ccdactivities.cpp 204 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
73278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458345934603461346234633464346534663467346834693470347134723473347434753476347734783479348034813482348334843485348634873488348934903491349234933494349534963497349834993500350135023503350435053506350735083509351035113512351335143515351635173518351935203521352235233524352535263527352835293530353135323533353435353536353735383539354035413542354335443545354635473548354935503551355235533554355535563557355835593560356135623563356435653566356735683569357035713572357335743575357635773578357935803581358235833584358535863587358835893590359135923593359435953596359735983599360036013602360336043605360636073608360936103611361236133614361536163617361836193620362136223623362436253626362736283629363036313632363336343635363636373638363936403641364236433644364536463647364836493650365136523653365436553656365736583659366036613662366336643665366636673668366936703671367236733674367536763677367836793680368136823683368436853686368736883689369036913692369336943695369636973698369937003701370237033704370537063707370837093710371137123713371437153716371737183719372037213722372337243725372637273728372937303731373237333734373537363737373837393740374137423743374437453746374737483749375037513752375337543755375637573758375937603761376237633764376537663767376837693770377137723773377437753776377
73778377937803781378237833784378537863787378837893790379137923793379437953796379737983799380038013802380338043805380638073808380938103811381238133814381538163817381838193820382138223823382438253826382738283829383038313832383338343835383638373838383938403841384238433844384538463847384838493850385138523853385438553856385738583859386038613862386338643865386638673868386938703871387238733874387538763877387838793880388138823883388438853886388738883889389038913892389338943895389638973898389939003901390239033904390539063907390839093910391139123913391439153916391739183919392039213922392339243925392639273928392939303931393239333934393539363937393839393940394139423943394439453946394739483949395039513952395339543955395639573958395939603961396239633964396539663967396839693970397139723973397439753976397739783979398039813982398339843985398639873988398939903991399239933994399539963997399839994000400140024003400440054006400740084009401040114012401340144015401640174018401940204021402240234024402540264027402840294030403140324033403440354036403740384039404040414042404340444045404640474048404940504051405240534054405540564057405840594060406140624063406440654066406740684069407040714072407340744075407640774078407940804081408240834084408540864087408840894090409140924093409440954096409740984099410041014102410341044105410641074108410941104111411241134114411541164117411841194120412141224123412441254126412741284129413041314132413341344135413641374138413941404141414241434144414541464147414841494150415141524153415441554156415741584159416041614162416341644165416641674168416941704171417241734174417541764177417841794180418141824183418441854186418741884189419041914192419341944195419641974198419942004201420242034204420542064207420842094210421142124213421442154216421742184219422042214222422342244225422642274228422942304231423242334234423542364237423842394240424142424243424442454246424742484249425042514252425342544255425642574258425942604261426242634264426542664267426842694270427142724273427442754276427
74278427942804281428242834284428542864287428842894290429142924293429442954296429742984299430043014302430343044305430643074308430943104311431243134314431543164317431843194320432143224323432443254326432743284329433043314332433343344335433643374338433943404341434243434344434543464347434843494350435143524353435443554356435743584359436043614362436343644365436643674368436943704371437243734374437543764377437843794380438143824383438443854386438743884389439043914392439343944395439643974398439944004401440244034404440544064407440844094410441144124413441444154416441744184419442044214422442344244425442644274428442944304431443244334434443544364437443844394440444144424443444444454446444744484449445044514452445344544455445644574458445944604461446244634464446544664467446844694470447144724473447444754476447744784479448044814482448344844485448644874488448944904491449244934494449544964497449844994500450145024503450445054506450745084509451045114512451345144515451645174518451945204521452245234524452545264527452845294530453145324533453445354536453745384539454045414542454345444545454645474548454945504551455245534554455545564557455845594560456145624563456445654566456745684569457045714572457345744575457645774578457945804581458245834584458545864587458845894590459145924593459445954596459745984599460046014602460346044605460646074608460946104611461246134614461546164617461846194620462146224623462446254626462746284629463046314632463346344635463646374638463946404641464246434644464546464647464846494650465146524653465446554656465746584659466046614662466346644665466646674668466946704671467246734674467546764677467846794680468146824683468446854686468746884689469046914692469346944695469646974698469947004701470247034704470547064707470847094710471147124713471447154716471747184719472047214722472347244725472647274728472947304731473247334734473547364737473847394740474147424743474447454746474747484749475047514752475347544755475647574758475947604761476247634764476547664767476847694770477147724773477447754776477
74778477947804781478247834784478547864787478847894790479147924793479447954796479747984799480048014802480348044805480648074808480948104811481248134814481548164817481848194820482148224823482448254826482748284829483048314832483348344835483648374838483948404841484248434844484548464847484848494850485148524853485448554856485748584859486048614862486348644865486648674868486948704871487248734874487548764877487848794880488148824883488448854886488748884889489048914892489348944895489648974898489949004901490249034904490549064907490849094910491149124913491449154916491749184919492049214922492349244925492649274928492949304931493249334934493549364937493849394940494149424943494449454946494749484949495049514952495349544955495649574958495949604961496249634964496549664967496849694970497149724973497449754976497749784979498049814982498349844985498649874988498949904991499249934994499549964997499849995000500150025003500450055006500750085009501050115012501350145015501650175018501950205021502250235024502550265027502850295030503150325033503450355036503750385039504050415042504350445045504650475048504950505051505250535054505550565057505850595060506150625063506450655066506750685069507050715072507350745075507650775078507950805081508250835084508550865087508850895090509150925093509450955096509750985099510051015102510351045105510651075108510951105111511251135114511551165117511851195120512151225123512451255126512751285129513051315132513351345135513651375138513951405141514251435144514551465147514851495150515151525153515451555156515751585159516051615162516351645165516651675168516951705171517251735174517551765177517851795180518151825183518451855186518751885189519051915192519351945195519651975198519952005201520252035204520552065207520852095210521152125213521452155216521752185219522052215222522352245225522652275228522952305231523252335234523552365237523852395240524152425243524452455246524752485249525052515252525352545255525652575258525952605261526252635264526552665267526852695270527152725273527452755276527
752785279528052815282528352845285528652875288528952905291529252935294529552965297529852995300530153025303530453055306530753085309531053115312531353145315531653175318531953205321532253235324532553265327532853295330533153325333533453355336533753385339534053415342534353445345534653475348534953505351535253535354535553565357535853595360536153625363536453655366536753685369537053715372537353745375537653775378537953805381538253835384538553865387538853895390
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include "jlib.hpp"
  15. #include "ccd.hpp"
  16. #include "ccdquery.hpp"
  17. #include "ccdstate.hpp"
  18. #include "ccdserver.hpp"
  19. #include "ccdcontext.hpp"
  20. #include "ccddebug.hpp"
  21. #include "ccdactivities.hpp"
  22. #include "ccdqueue.ipp"
  23. #include "ccdsnmp.hpp"
  24. #include "ccdfile.hpp"
  25. #include "ccdkey.hpp"
  26. #include "rtlkey.hpp"
  27. #include "eclrtl_imp.hpp"
  28. #include "rtlread_imp.hpp"
  29. #include "jhtree.hpp"
  30. #include "jlog.hpp"
  31. #include "jmisc.hpp"
  32. #include "udplib.hpp"
  33. #include "csvsplitter.hpp"
  34. #include "thorxmlread.hpp"
  35. #include "thorcommon.ipp"
  36. #include "jstats.h"
  37. size32_t diskReadBufferSize = 0x10000;
  38. using roxiemem::OwnedRoxieRow;
  39. using roxiemem::OwnedConstRoxieRow;
  40. using roxiemem::OwnedRoxieString;
  41. using roxiemem::IRowManager;
  42. #define maxContinuationSize 48000 // note - must fit in the 2-byte length field... but also needs to be possible to send back from Roxie server->slave in one packet
  43. size32_t serializeRow(IOutputRowSerializer * serializer, IMessagePacker *output, const void *unserialized)
  44. {
  45. CSizingSerializer sizer;
  46. serializer->serialize(sizer, (const byte *) unserialized);
  47. unsigned serializedLength = sizer.size();
  48. void *udpBuffer = output->getBuffer(serializedLength, true);
  49. CRawRowSerializer memSerializer(serializedLength, (byte *) udpBuffer);
  50. serializer->serialize(memSerializer, (const byte *) unserialized);
  51. assertex(memSerializer.size() == serializedLength);
  52. output->putBuffer(udpBuffer, serializedLength, true);
  53. return serializedLength;
  54. }
  55. inline void appendBuffer(IMessagePacker * output, size32_t size, const void * data, bool isVariable)
  56. {
  57. void *recBuffer = output->getBuffer(size, isVariable);
  58. memcpy(recBuffer, data, size);
  59. output->putBuffer(recBuffer, size, isVariable);
  60. }
  61. extern void putStatsValue(IPropertyTree *node, const char *statName, const char *statType, unsigned __int64 val)
  62. {
  63. if (val)
  64. {
  65. StringBuffer xpath;
  66. xpath.appendf("att[@name='%s']", statName);
  67. IPropertyTree *att = node->queryPropTree(xpath.str());
  68. if (!att)
  69. {
  70. att = node->addPropTree("att", createPTree());
  71. att->setProp("@name", statName);
  72. }
  73. att->setProp("@type", statType);
  74. att->setPropInt64("@value", val);
  75. }
  76. }
  77. extern void putStatsValue(StringBuffer &reply, const char *statName, const char *statType, unsigned __int64 val)
  78. {
  79. if (val)
  80. {
  81. reply.appendf(" <att name='%s' type='%s' value='%"I64F"d'/>\n", statName, statType, val);
  82. }
  83. }
  84. CActivityFactory::CActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind)
  85. : id(_id),
  86. subgraphId(_subgraphId),
  87. queryFactory(_queryFactory),
  88. helperFactory(_helperFactory),
  89. kind(_kind)
  90. {
  91. if (helperFactory)
  92. {
  93. Owned<IHThorArg> helper = helperFactory();
  94. meta.set(helper->queryOutputMeta());
  95. }
  96. }
  97. void CActivityFactory::addChildQuery(unsigned id, ActivityArray *childQuery)
  98. {
  99. childQueries.append(*childQuery);
  100. childQueryIndexes.append(id);
  101. }
  102. ActivityArray *CActivityFactory::queryChildQuery(unsigned idx, unsigned &id)
  103. {
  104. if (childQueries.isItem(idx))
  105. {
  106. id = childQueryIndexes.item(idx);
  107. return &childQueries.item(idx);
  108. }
  109. id = 0;
  110. return NULL;
  111. }
  112. class CSlaveActivityFactory : public CActivityFactory, implements ISlaveActivityFactory
  113. {
  114. public:
  115. CSlaveActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
  116. : CActivityFactory(_graphNode.getPropInt("@id", 0), _subgraphId, _queryFactory, _helperFactory, getActivityKind(_graphNode))
  117. {
  118. }
  119. virtual IQueryFactory &queryQueryFactory() const
  120. {
  121. return CActivityFactory::queryQueryFactory();
  122. }
  123. void addChildQuery(unsigned id, ActivityArray *childQuery)
  124. {
  125. CActivityFactory::addChildQuery(id, childQuery);
  126. }
  127. StringBuffer &toString(StringBuffer &ret) const
  128. {
  129. return ret.appendf("%p", this);
  130. }
  131. bool getEnableFieldTranslation() const
  132. {
  133. return queryFactory.getEnableFieldTranslation();
  134. }
  135. const char *queryQueryName() const
  136. {
  137. return queryFactory.queryQueryName();
  138. }
  139. virtual ActivityArray *queryChildQuery(unsigned idx, unsigned &id)
  140. {
  141. return CActivityFactory::queryChildQuery(idx, id);
  142. }
  143. virtual unsigned queryId() const
  144. {
  145. return CActivityFactory::queryId();
  146. }
  147. virtual ThorActivityKind getKind() const
  148. {
  149. return CActivityFactory::getKind();
  150. }
  151. virtual void noteStatistics(const StatsCollector &fromStats)
  152. {
  153. CActivityFactory::noteStatistics(fromStats);
  154. }
  155. virtual void getEdgeProgressInfo(unsigned idx, IPropertyTree &edge) const
  156. {
  157. CActivityFactory::getEdgeProgressInfo(idx, edge);
  158. }
  159. virtual void getNodeProgressInfo(IPropertyTree &node) const
  160. {
  161. CActivityFactory::getNodeProgressInfo(node);
  162. }
  163. virtual void resetNodeProgressInfo()
  164. {
  165. CActivityFactory::resetNodeProgressInfo();
  166. }
  167. virtual void getActivityMetrics(StringBuffer &reply) const
  168. {
  169. CActivityFactory::getActivityMetrics(reply);
  170. }
  171. IRoxieSlaveContext *createSlaveContext(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
  172. {
  173. return queryFactory.createSlaveContext(logctx, packet);
  174. }
  175. virtual void getXrefInfo(IPropertyTree &reply, const IRoxieContextLogger &logctx) const
  176. {
  177. if (datafile)
  178. addXrefFileInfo(reply, datafile);
  179. }
  180. IRoxieSlaveContext *createChildQueries(IHThorArg *colocalArg, IArrayOf<IActivityGraph> &childGraphs, IProbeManager *_probeManager, SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
  181. {
  182. if (!childQueries.length())
  183. logctx.setDebuggerActive(false);
  184. if (meta.needsDestruct() || meta.needsSerializeDisk() || childQueries.length())
  185. {
  186. Owned<IRoxieSlaveContext> queryContext = queryFactory.createSlaveContext(logctx, packet);
  187. ForEachItemIn(idx, childQueries)
  188. {
  189. if (!_probeManager) // MORE - the probeAllRows is a hack!
  190. _probeManager = queryContext->queryProbeManager();
  191. IActivityGraph *childGraph = createActivityGraph(NULL, childQueryIndexes.item(idx), childQueries.item(idx), NULL, _probeManager, logctx); // MORE - the parent is wrong!
  192. childGraphs.append(*childGraph);
  193. queryContext->noteChildGraph(childQueryIndexes.item(idx), childGraph);
  194. childGraph->onCreate(queryContext, colocalArg); //NB: onCreate() on helper for activities in child graph are delayed, otherwise this would go wrong.
  195. }
  196. return queryContext.getClear();
  197. }
  198. return NULL;
  199. }
  200. Owned<const IResolvedFile> datafile;
  201. protected:
  202. static IPropertyTree *queryStatsNode(IPropertyTree *parent, const char *xpath)
  203. {
  204. StringBuffer levelx;
  205. const char *sep = strchr(xpath, '/');
  206. if (sep)
  207. levelx.append(sep-xpath, xpath);
  208. else
  209. levelx.append(xpath);
  210. IPropertyTree *child = parent->queryPropTree(levelx);
  211. if (!child)
  212. {
  213. const char *id = strchr(levelx, '[');
  214. if (!id)
  215. {
  216. child = createPTree(levelx);
  217. parent->addPropTree(levelx, child);
  218. }
  219. else
  220. {
  221. StringBuffer elem;
  222. elem.append(id-levelx, levelx);
  223. child = createPTree(elem);
  224. parent->addPropTree(elem, child);
  225. loop
  226. {
  227. StringBuffer attr, val;
  228. id++;
  229. while (*id != '=')
  230. attr.append(*id++);
  231. id++;
  232. char qu = *id++;
  233. while (*id != qu)
  234. val.append(*id++);
  235. child->setProp(attr, val);
  236. id++;
  237. if (*id == ']')
  238. {
  239. if (id[1]!='[')
  240. break;
  241. id++;
  242. }
  243. else
  244. throwUnexpected();
  245. }
  246. }
  247. }
  248. if (sep)
  249. return queryStatsNode(child, sep+1);
  250. else
  251. return child;
  252. }
  253. };
  254. //================================================================================================
  255. class CRoxieSlaveActivity : public CInterface, implements IRoxieSlaveActivity, implements ICodeContext
  256. {
  257. protected:
  258. SlaveContextLogger &logctx;
  259. Linked<IRoxieQueryPacket> packet;
  260. mutable Owned<IRoxieSlaveContext> queryContext; // bit of a hack but easier than changing the ICodeContext callback interface to remove const
  261. const CSlaveActivityFactory *basefactory;
  262. IArrayOf<IActivityGraph> childGraphs;
  263. IHThorArg *basehelper;
  264. PartNoType lastPartNo;
  265. MemoryBuffer serializedCreate;
  266. MemoryBuffer resentInfo;
  267. CachedOutputMetaData meta;
  268. Owned<IOutputRowSerializer> serializer;
  269. Owned<IEngineRowAllocator> rowAllocator;
  270. #ifdef _DEBUG
  271. Owned<IProbeManager> probeManager;
  272. #endif
  273. bool aborted;
  274. bool resent;
  275. bool isOpt;
  276. bool variableFileName;
  277. bool allowFieldTranslation;
  278. Owned<const IResolvedFile> varFileInfo;
  279. virtual void setPartNo(bool filechanged) = 0;
  280. inline void checkPartChanged(PartNoType &newPart)
  281. {
  282. if (newPart.partNo!=lastPartNo.partNo || newPart.fileNo!=lastPartNo.fileNo)
  283. {
  284. lastPartNo.partNo = newPart.partNo;
  285. bool filechanged = lastPartNo.fileNo != newPart.fileNo;
  286. lastPartNo.fileNo = newPart.fileNo;
  287. setPartNo(filechanged);
  288. }
  289. }
  290. virtual bool needsRowAllocator()
  291. {
  292. return meta.needsSerializeDisk() || meta.isVariableSize();
  293. }
  294. virtual void onCreate()
  295. {
  296. #ifdef _DEBUG
  297. // MORE - need to consider debugging....
  298. if (probeAllRows)
  299. {
  300. probeManager.setown(createProbeManager());
  301. queryContext.setown(basefactory->createChildQueries(basehelper, childGraphs, probeManager, logctx, packet));
  302. }
  303. else
  304. #endif
  305. queryContext.setown(basefactory->createChildQueries(basehelper, childGraphs, NULL, logctx, packet));
  306. if (!queryContext)
  307. queryContext.setown(basefactory->createSlaveContext(logctx, packet));
  308. if (meta.needsSerializeDisk())
  309. serializer.setown(meta.createDiskSerializer(queryContext->queryCodeContext(), basefactory->queryId()));
  310. if (needsRowAllocator())
  311. rowAllocator.setown(getRowAllocator(meta.queryOriginal(), basefactory->queryId()));
  312. unsigned parentExtractSize;
  313. serializedCreate.read(parentExtractSize);
  314. const byte * parentExtract = serializedCreate.readDirect(parentExtractSize);
  315. basehelper->onCreate(this, NULL, &serializedCreate);
  316. basehelper->onStart(parentExtract, &serializedCreate);
  317. deserializeExtra(serializedCreate);
  318. if (variableFileName) // note - in keyed join with dependent index case, kj itself won't have variableFileName but indexread might
  319. {
  320. CDateTime cacheDate(serializedCreate);
  321. unsigned checksum;
  322. serializedCreate.read(checksum);
  323. OwnedRoxieString fname(queryDynamicFileName());
  324. varFileInfo.setown(querySlaveDynamicFileCache()->lookupDynamicFile(logctx, fname, cacheDate, checksum, &packet->queryHeader(), isOpt, true));
  325. setVariableFileInfo();
  326. }
  327. }
  328. virtual void deserializeExtra(MemoryBuffer &out)
  329. {
  330. }
  331. CRoxieSlaveActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_factory)
  332. : logctx(_logctx), packet(_packet), basefactory(_factory)
  333. {
  334. allowFieldTranslation = _factory->getEnableFieldTranslation();
  335. resent = packet->getContinuationLength() != 0;
  336. serializedCreate.setBuffer(packet->getContextLength(), (void *) packet->queryContextData(), false);
  337. if (resent)
  338. resentInfo.setBuffer(packet->getContinuationLength(), (void *) packet->queryContinuationData(), false);
  339. basehelper = _hFactory();
  340. aborted = false;
  341. lastPartNo.partNo = 0xffff;
  342. lastPartNo.fileNo = 0xffff;
  343. isOpt = false;
  344. variableFileName = false;
  345. meta.set(basehelper->queryOutputMeta());
  346. }
  347. ~CRoxieSlaveActivity()
  348. {
  349. ::Release(basehelper);
  350. }
  351. public:
  352. IMPLEMENT_IINTERFACE;
  // Name of the file to resolve at run time; onCreate() calls this only when variableFileName is set.
  353. virtual const char *queryDynamicFileName() const = 0;
  // Called by onCreate() once varFileInfo has been resolved, so the subclass can pick up the file.
  354. virtual void setVariableFileInfo() = 0;
  // Default implementation raises the "unexpected" internal error.
  355. virtual IIndexReadActivityInfo *queryIndexReadActivity() { throwUnexpected(); } // should only be called for index activity
  356. virtual unsigned queryId()
  357. {
  358. return basefactory->queryId();
  359. }
  360. virtual bool check()
  361. {
  362. Owned<IMessagePacker> output = ROQ->createOutputStream(packet->queryHeader(), false, logctx);
  363. doCheck(output);
  364. output->flush(true);
  365. return true;
  366. }
  367. virtual void doCheck(IMessagePacker *output)
  368. {
  369. // MORE - unsophisticated default - if this approach seems fruitful then we can add something more thorough
  370. void *recBuffer = output->getBuffer(sizeof(bool), false);
  371. bool ret = false;
  372. memcpy(recBuffer, &ret, sizeof(bool));
  373. }
  374. virtual void abort()
  375. {
  376. if (logctx.queryTraceLevel() > 2)
  377. {
  378. StringBuffer s;
  379. logctx.CTXLOG("Aborting running activity: %s", packet->queryHeader().toString(s).str());
  380. }
  381. logctx.requestAbort();
  382. aborted = true;
  383. if (queryContext)
  384. {
  385. Owned<IException> E = MakeStringException(ROXIE_ABORT_ERROR, "Roxie server requested abort for running activity");
  386. queryContext->notifyAbort(E);
  387. }
  388. }
  389. virtual IRoxieQueryPacket *queryPacket() const
  390. {
  391. return packet;
  392. }
  393. void limitExceeded(bool keyed = false)
  394. {
  395. RoxiePacketHeader &header = packet->queryHeader();
  396. StringBuffer s;
  397. logctx.CTXLOG("%sLIMIT EXCEEDED: %s", keyed ? "KEYED " : "", header.toString(s).str());
  398. header.activityId = keyed ? ROXIE_KEYEDLIMIT_EXCEEDED : ROXIE_LIMIT_EXCEEDED;
  399. Owned<IMessagePacker> output = ROQ->createOutputStream(header, false, logctx);
  400. output->flush(true);
  401. aborted = true;
  402. }
  403. virtual IThorChildGraph * resolveChildQuery(__int64 activityId, IHThorArg * colocal)
  404. {
  405. assertex(colocal == basehelper);
  406. return queryContext->queryCodeContext()->resolveChildQuery(activityId, colocal);
  407. }
  408. size32_t serializeRow(IMessagePacker *output, const void *unserialized) const
  409. {
  410. return ::serializeRow(serializer, output, unserialized);
  411. }
  412. virtual const char *loadResource(unsigned id)
  413. {
  414. return queryContext->queryCodeContext()->loadResource(id);
  415. }
  416. // Sets should not happen - they can only happen in the main Roxie server context
  417. virtual void setResultBool(const char *name, unsigned sequence, bool value) { throwUnexpected(); }
  418. virtual void setResultData(const char *name, unsigned sequence, int len, const void * data) { throwUnexpected(); }
  419. virtual void setResultDecimal(const char * stepname, unsigned sequence, int len, int precision, bool isSigned, const void *val) { throwUnexpected(); }
  420. virtual void setResultInt(const char *name, unsigned sequence, __int64 value) { throwUnexpected(); }
  421. virtual void setResultRaw(const char *name, unsigned sequence, int len, const void * data) { throwUnexpected(); }
  422. virtual void setResultReal(const char * stepname, unsigned sequence, double value) { throwUnexpected(); }
  423. virtual void setResultSet(const char *name, unsigned sequence, bool isAll, size32_t len, const void * data, ISetToXmlTransformer * transformer) { throwUnexpected(); }
  424. virtual void setResultString(const char *name, unsigned sequence, int len, const char * str) { throwUnexpected(); }
  425. virtual void setResultUInt(const char *name, unsigned sequence, unsigned __int64 value) { throwUnexpected(); }
  426. virtual void setResultUnicode(const char *name, unsigned sequence, int len, UChar const * str) { throwUnexpected(); }
  427. virtual void setResultVarString(const char * name, unsigned sequence, const char * value) { throwUnexpected(); }
  428. virtual void setResultVarUnicode(const char * name, unsigned sequence, UChar const * value) { throwUnexpected(); }
  429. // Some gets are allowed though (e.g. for ONCE values)
  430. virtual bool getResultBool(const char * name, unsigned sequence)
  431. {
  432. return queryContext->queryCodeContext()->getResultBool(name, sequence);
  433. }
  434. virtual void getResultData(unsigned & tlen, void * & tgt, const char * name, unsigned sequence)
  435. {
  436. queryContext->queryCodeContext()->getResultData(tlen, tgt, name, sequence);
  437. }
  438. virtual void getResultDecimal(unsigned tlen, int precision, bool isSigned, void * tgt, const char * stepname, unsigned sequence)
  439. {
  440. queryContext->queryCodeContext()->getResultDecimal(tlen, precision, isSigned, tgt, stepname, sequence);
  441. }
  442. virtual void getResultRaw(unsigned & tlen, void * & tgt, const char * name, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer)
  443. {
  444. queryContext->queryCodeContext()->getResultRaw(tlen, tgt, name, sequence, xmlTransformer, csvTransformer);
  445. }
  446. virtual void getResultSet(bool & isAll, size32_t & tlen, void * & tgt, const char * name, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer)
  447. {
  448. queryContext->queryCodeContext()->getResultSet(isAll, tlen, tgt, name, sequence, xmlTransformer, csvTransformer);
  449. }
  450. virtual __int64 getResultInt(const char * name, unsigned sequence)
  451. {
  452. return queryContext->queryCodeContext()->getResultInt(name, sequence);
  453. }
  454. virtual double getResultReal(const char * name, unsigned sequence)
  455. {
  456. return queryContext->queryCodeContext()->getResultReal(name, sequence);
  457. }
  458. virtual void getResultString(unsigned & tlen, char * & tgt, const char * name, unsigned sequence)
  459. {
  460. queryContext->queryCodeContext()->getResultString(tlen, tgt, name, sequence);
  461. }
  462. virtual void getResultStringF(unsigned tlen, char * tgt, const char * name, unsigned sequence)
  463. {
  464. queryContext->queryCodeContext()->getResultStringF(tlen, tgt, name, sequence);
  465. }
  466. virtual void getResultUnicode(unsigned & tlen, UChar * & tgt, const char * name, unsigned sequence)
  467. {
  468. queryContext->queryCodeContext()->getResultUnicode(tlen, tgt, name, sequence);
  469. }
  470. virtual char *getResultVarString(const char * name, unsigned sequence)
  471. {
  472. return queryContext->queryCodeContext()->getResultVarString(name, sequence);
  473. }
  474. virtual UChar *getResultVarUnicode(const char * name, unsigned sequence)
  475. {
  476. return queryContext->queryCodeContext()->getResultVarUnicode(name, sequence);
  477. }
  478. virtual void getResultRowset(size32_t & tcount, byte * * & tgt, const char * name, unsigned sequence, IEngineRowAllocator * _rowAllocator, bool isGrouped, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer)
  479. {
  480. return queryContext->queryCodeContext()->getResultRowset(tcount, tgt, name, sequence, _rowAllocator, isGrouped, xmlTransformer, csvTransformer);
  481. }
  482. virtual void getResultDictionary(size32_t & tcount, byte * * & tgt, IEngineRowAllocator * _rowAllocator, const char * name, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer, IHThorHashLookupInfo * hasher)
  483. {
  484. return queryContext->queryCodeContext()->getResultDictionary(tcount, tgt, _rowAllocator, name, sequence, xmlTransformer, csvTransformer, hasher);
  485. }
  486. virtual unsigned getResultHash(const char * name, unsigned sequence) { throwUnexpected(); }
  487. // Not yet thought about these....
  488. virtual char *getWuid() { throwUnexpected(); } // caller frees return string.
  489. virtual void getExternalResultRaw(unsigned & tlen, void * & tgt, const char * wuid, const char * stepname, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer) { throwUnexpected(); } // shouldn't really be here, but it broke thor.
  490. virtual void executeGraph(const char * graphName, bool realThor, size32_t parentExtractSize, const void * parentExtract) { throwUnexpected(); }
  491. virtual unsigned __int64 getDatasetHash(const char * name, unsigned __int64 hash) { throwUnexpected(); return 0; }
  492. virtual char * getExpandLogicalName(const char * logicalName) { throwUnexpected(); }
  493. virtual void addWuException(const char * text, unsigned code, unsigned severity) { throwUnexpected(); }
  494. virtual void addWuAssertFailure(unsigned code, const char * text, const char * filename, unsigned lineno, unsigned column, bool isAbort) { throwUnexpected(); }
  495. virtual IUserDescriptor *queryUserDescriptor() { throwUnexpected(); }
  496. virtual unsigned getNodes() { throwUnexpected(); }
  497. virtual unsigned getNodeNum() { throwUnexpected(); }
  498. virtual char *getFilePart(const char *logicalPart, bool create=false) { throwUnexpected(); } // caller frees return string.
  499. virtual unsigned __int64 getFileOffset(const char *logicalPart) { throwUnexpected(); }
  500. virtual IDistributedFileTransaction *querySuperFileTransaction() { throwUnexpected(); }
  501. virtual char *getJobName() { throwUnexpected(); } // caller frees return string.
  502. virtual char *getJobOwner() { throwUnexpected(); } // caller frees return string.
  503. virtual char *getClusterName() { throwUnexpected(); } // caller frees return str.
  504. virtual char *getGroupName() { throwUnexpected(); } // caller frees return string.
  505. virtual char * queryIndexMetaData(char const * lfn, char const * xpath) { throwUnexpected(); }
  506. virtual char *getDaliServers() { return queryContext->queryCodeContext()->getDaliServers(); }
  507. // The below are called on Roxie server and passed in context
  508. virtual unsigned getPriority() const { throwUnexpected(); }
  509. virtual char *getPlatform() { throwUnexpected(); }
  510. virtual char *getEnv(const char *name, const char *defaultValue) const { throwUnexpected(); }
  511. virtual char *getOS() { throwUnexpected(); }
  512. virtual unsigned logString(const char *text) const
  513. {
  514. if (text && *text)
  515. {
  516. logctx.CTXLOG("USER: %s", text);
  517. return strlen(text);
  518. }
  519. else
  520. return 0;
  521. }
  522. virtual const IContextLogger &queryContextLogger() const
  523. {
  524. return logctx;
  525. }
  526. virtual IEngineRowAllocator * getRowAllocator(IOutputMetaData * meta, unsigned activityId) const
  527. {
  528. return queryContext->queryCodeContext()->getRowAllocator(meta, activityId);
  529. }
  530. virtual const char *cloneVString(const char *str) const
  531. {
  532. return queryContext->queryCodeContext()->cloneVString(str);
  533. }
  534. virtual const char *cloneVString(size32_t len, const char *str) const
  535. {
  536. return queryContext->queryCodeContext()->cloneVString(len, str);
  537. }
  538. virtual void getRowXML(size32_t & lenResult, char * & result, IOutputMetaData & info, const void * row, unsigned flags)
  539. {
  540. convertRowToXML(lenResult, result, info, row, flags);
  541. }
  542. const void * fromXml(IEngineRowAllocator * rowAllocator, size32_t len, const char * utf8, IXmlToRowTransformer * xmlTransformer, bool stripWhitespace)
  543. {
  544. return createRowFromXml(rowAllocator, len, utf8, xmlTransformer, stripWhitespace);
  545. }
  546. virtual IEngineContext *queryEngineContext() { return NULL; }
  547. };
  548. //================================================================================================
  549. class OptimizedRowBuilder : public ARowBuilder, public CInterface
  550. {
  551. public:
  552. OptimizedRowBuilder(IEngineRowAllocator * _rowAllocator, const CachedOutputMetaData & _meta, IMessagePacker * _output, IOutputRowSerializer * _serializer)
  553. : dynamicBuilder(_rowAllocator, false), meta(_meta), serializer(_serializer), output(_output)
  554. {
  555. useDynamic = serializer != NULL || meta.isVariableSize();
  556. }
  557. IMPLEMENT_IINTERFACE
  558. virtual byte * createSelf()
  559. {
  560. if (useDynamic)
  561. {
  562. dynamicBuilder.ensureRow();
  563. self = dynamicBuilder.getSelf();
  564. }
  565. else
  566. self = static_cast<byte *>(output->getBuffer(meta.getFixedSize(), false));
  567. return self;
  568. }
  569. virtual byte * ensureCapacity(size32_t required, const char * fieldName)
  570. {
  571. if (useDynamic)
  572. {
  573. self = dynamicBuilder.ensureCapacity(required, fieldName);
  574. return static_cast<byte *>(self);
  575. }
  576. else
  577. {
  578. size32_t fixedLength = meta.getFixedSize();
  579. if (required <= fixedLength)
  580. return static_cast<byte *>(self);
  581. // This should never happen!
  582. rtlReportFieldOverflow(required, fixedLength, fieldName);
  583. return NULL;
  584. }
  585. }
  586. virtual void reportMissingRow() const
  587. {
  588. throw MakeStringException(MSGAUD_user, 1000, "OptimizedRowBuilder::row() is NULL");
  589. }
  590. inline void ensureRow()
  591. {
  592. if (!self)
  593. createSelf();
  594. }
  595. inline void clear()
  596. {
  597. if (useDynamic)
  598. dynamicBuilder.clear();
  599. self = NULL;
  600. }
  601. size_t writeToOutput(size32_t transformedSize, bool outputIfEmpty)
  602. {
  603. size32_t outputSize = transformedSize;
  604. if (transformedSize || outputIfEmpty)
  605. {
  606. if (useDynamic)
  607. {
  608. OwnedConstRoxieRow result = dynamicBuilder.finalizeRowClear(transformedSize);
  609. if (serializer)
  610. outputSize = serializeRow(serializer, output, result);
  611. else
  612. {
  613. self = static_cast<byte *>(output->getBuffer(transformedSize, true));
  614. memcpy(self, result, transformedSize);
  615. output->putBuffer(self, transformedSize, true);
  616. }
  617. }
  618. else
  619. {
  620. output->putBuffer(self, transformedSize, false);
  621. }
  622. }
  623. clear();
  624. return outputSize;
  625. }
  626. private:
  627. RtlDynamicRowBuilder dynamicBuilder;
  628. const CachedOutputMetaData & meta;
  629. IMessagePacker * output;
  630. IOutputRowSerializer * serializer;
  631. bool useDynamic;
  632. };
  633. class OptimizedKJRowBuilder : public ARowBuilder, public CInterface
  634. {
  635. // Rules are different enough that we can't easily derive from OptimizedRowBuilder
  636. public:
  637. IMPLEMENT_IINTERFACE;
  638. OptimizedKJRowBuilder(IEngineRowAllocator * _rowAllocator, const CachedOutputMetaData & _meta, IMessagePacker * _output)
  639. : dynamicBuilder(_rowAllocator, false), meta(_meta), output(_output)
  640. {
  641. useDynamic = meta.isVariableSize();
  642. }
  643. virtual byte * createSelf()
  644. {
  645. if (useDynamic)
  646. {
  647. dynamicBuilder.ensureRow();
  648. self = dynamicBuilder.getSelf();
  649. return self;
  650. }
  651. else
  652. {
  653. self = static_cast<byte *>(output->getBuffer(KEYEDJOIN_RECORD_SIZE(meta.getFixedSize()), true)) + KEYEDJOIN_RECORD_SIZE(0);
  654. return self ;
  655. }
  656. }
  657. virtual byte * ensureCapacity(size32_t required, const char * fieldName)
  658. {
  659. if (useDynamic)
  660. {
  661. self = dynamicBuilder.ensureCapacity(required, fieldName);
  662. return self;
  663. }
  664. else
  665. {
  666. size32_t fixedLength = meta.getFixedSize();
  667. if (required <= fixedLength)
  668. return self;
  669. rtlReportFieldOverflow(required, fixedLength, fieldName);
  670. return NULL;
  671. }
  672. }
  673. virtual void reportMissingRow() const
  674. {
  675. throw MakeStringException(MSGAUD_user, 1000, "OptimizedKJRowBuilder::row() is NULL");
  676. }
  677. inline void ensureRow()
  678. {
  679. if (!self)
  680. createSelf();
  681. }
  682. void writeToOutput(size32_t transformedSize, offset_t recptr, CJoinGroup *jg, unsigned short partNo)
  683. {
  684. KeyedJoinHeader *rec;
  685. if (useDynamic)
  686. {
  687. OwnedConstRoxieRow result = dynamicBuilder.finalizeRowClear(transformedSize);
  688. rec = (KeyedJoinHeader *) (output->getBuffer(KEYEDJOIN_RECORD_SIZE(transformedSize), true));
  689. memcpy(&rec->rhsdata, result, transformedSize);
  690. }
  691. else
  692. {
  693. rec = (KeyedJoinHeader *)(self - KEYEDJOIN_RECORD_SIZE(0));
  694. }
  695. rec->fpos = recptr;
  696. rec->thisGroup = jg;
  697. rec->partNo = partNo;
  698. output->putBuffer(rec, KEYEDJOIN_RECORD_SIZE(transformedSize), true);
  699. self = NULL;
  700. }
  701. private:
  702. RtlDynamicRowBuilder dynamicBuilder;
  703. const CachedOutputMetaData & meta;
  704. IMessagePacker * output;
  705. bool useDynamic;
  706. };
  707. //================================================================================================
  708. class CRoxieDiskReadBaseActivity : public CRoxieSlaveActivity, implements IIndexReadContext//, implements IDiskReadActivity
  709. {
  710. friend class RecordProcessor;
  711. friend class KeyedRecordProcessor;
  712. friend class UnkeyedRecordProcessor;
  713. friend class UnkeyedVariableRecordProcessor;
  714. friend class KeyedNormalizeRecordProcessor;
  715. friend class UnkeyedNormalizeRecordProcessor;
  716. friend class KeyedCountRecordProcessor;
  717. friend class UnkeyedCountRecordProcessor;
  718. friend class UnkeyedVariableCountRecordProcessor;
  719. friend class KeyedAggregateRecordProcessor;
  720. friend class UnkeyedAggregateRecordProcessor;
  721. friend class UnkeyedVariableAggregateRecordProcessor;
  722. friend class KeyedGroupAggregateRecordProcessor;
  723. friend class UnkeyedGroupAggregateRecordProcessor;
  724. friend class UnkeyedVariableGroupAggregateRecordProcessor;
  725. protected:
  726. IHThorDiskReadBaseArg *helper;
  727. unsigned processed;
  728. unsigned parallelPartNo;
  729. unsigned numParallel;
  730. bool isKeyed;
  731. bool forceUnkeyed;
  732. offset_t readPos;
  733. CachedOutputMetaData diskSize;
  734. Owned<IInMemoryIndexCursor> cursor;
  735. Linked<IInMemoryIndexManager> manager;
  736. Owned<IInMemoryFileProcessor> processor;
  737. Owned<IFileIOArray> varFiles;
  738. CriticalSection pcrit;
  739. public:
  740. IMPLEMENT_IINTERFACE;
  741. CRoxieDiskReadBaseActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
  742. IInMemoryIndexManager *_manager,
  743. unsigned _parallelPartNo, unsigned _numParallel, bool _forceUnkeyed)
  744. : CRoxieSlaveActivity(_logctx, _packet, _hFactory, _aFactory),
  745. manager(_manager),
  746. parallelPartNo(_parallelPartNo),
  747. numParallel(_numParallel),
  748. forceUnkeyed(_forceUnkeyed)
  749. {
  750. helper = (IHThorDiskReadBaseArg *) basehelper;
  751. variableFileName = allFilesDynamic || ((helper->getFlags() & (TDXvarfilename|TDXdynamicfilename)) != 0);
  752. isOpt = (helper->getFlags() & TDRoptional) != 0;
  753. diskSize.set(helper->queryDiskRecordSize());
  754. processed = 0;
  755. readPos = 0;
  756. isKeyed = false;
  757. if (resent)
  758. {
  759. bool usedKey;
  760. resentInfo.read(processed);
  761. resentInfo.read(usedKey);
  762. if (usedKey)
  763. {
  764. cursor.setown(manager->createCursor());
  765. cursor->deserializeCursorPos(resentInfo);
  766. isKeyed = true;
  767. }
  768. else
  769. resentInfo.read(readPos);
  770. assertex(resentInfo.remaining() == 0);
  771. }
  772. }
  773. virtual void onCreate()
  774. {
  775. CRoxieSlaveActivity::onCreate();
  776. helper->createSegmentMonitors(this);
  777. if (!resent)
  778. isKeyed = (cursor && !forceUnkeyed) ? cursor->selectKey() : false;
  779. }
  780. virtual const char *queryDynamicFileName() const
  781. {
  782. return helper->getFileName();
  783. }
  784. virtual void setVariableFileInfo()
  785. {
  786. unsigned channel = packet->queryHeader().channel;
  787. varFiles.setown(varFileInfo->getIFileIOArray(isOpt, channel)); // MORE could combine
  788. manager.setown(varFileInfo->getIndexManager(isOpt, channel, varFiles, diskSize, false, 0));
  789. }
  790. inline bool queryKeyed() const
  791. {
  792. return isKeyed;
  793. }
  794. void setParallel(unsigned _partno, unsigned _numParallel)
  795. {
  796. assertex(!processor);
  797. parallelPartNo = _partno;
  798. numParallel = _numParallel;
  799. }
  800. virtual StringBuffer &toString(StringBuffer &ret) const
  801. {
  802. return ret.appendf("DiskRead %u", packet->queryHeader().activityId);
  803. }
  804. virtual void append(IKeySegmentMonitor *segment)
  805. {
  806. if (!segment->isWild())
  807. {
  808. if (!cursor)
  809. cursor.setown(manager->createCursor());
  810. cursor->append(segment);
  811. }
  812. }
  813. virtual unsigned ordinality() const
  814. {
  815. return cursor ? cursor->ordinality() : 0;
  816. }
  817. virtual IKeySegmentMonitor *item(unsigned idx) const
  818. {
  819. return cursor ? cursor->item(idx) : 0;
  820. }
  821. virtual void setMergeBarrier(unsigned barrierOffset)
  822. {
  823. // no merging so no issue...
  824. }
  825. virtual void abort()
  826. {
  827. aborted = true;
  828. CriticalBlock p(pcrit);
  829. if (processor)
  830. processor->abort();
  831. }
  832. virtual bool process()
  833. {
  834. MTIME_SECTION(timer, "CRoxieDiskReadBaseActivity::process");
  835. atomic_inc(&diskReadStarted);
  836. Owned<IMessagePacker> output = ROQ->createOutputStream(packet->queryHeader(), false, logctx);
  837. doProcess(output);
  838. helper->setCallback(NULL);
  839. logctx.flush(true, aborted);
  840. if (aborted)
  841. {
  842. output->abort();
  843. return false;
  844. }
  845. else
  846. {
  847. output->flush(true);
  848. atomic_inc(&diskReadCompleted);
  849. return true;
  850. }
  851. }
  852. virtual void doCheck(IMessagePacker *output)
  853. {
  854. // for in-memory diskread activities, not a lot to check. If it got this far the answer is 'true'...
  855. void *recBuffer = output->getBuffer(sizeof(bool), false);
  856. bool ret = true;
  857. memcpy(recBuffer, &ret, sizeof(bool));
  858. }
  859. virtual void doProcess(IMessagePacker * output) = 0;
  860. virtual void setPartNo(bool filechanged)
  861. {
  862. throwUnexpected();
  863. }
  864. };
  865. class CRoxieDiskBaseActivityFactory : public CSlaveActivityFactory
  866. {
  867. protected:
  868. Owned<IFileIOArray> fileArray;
  869. Owned<IInMemoryIndexManager> manager;
  870. public:
  871. CRoxieDiskBaseActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
  872. : CSlaveActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory)
  873. {
  874. Owned<IHThorDiskReadBaseArg> helper = (IHThorDiskReadBaseArg *) helperFactory();
  875. bool variableFileName = allFilesDynamic || ((helper->getFlags() & (TDXvarfilename|TDXdynamicfilename)) != 0);
  876. if (!variableFileName)
  877. {
  878. bool isOpt = (helper->getFlags() & TDRoptional) != 0;
  879. OwnedRoxieString fileName(helper->getFileName());
  880. datafile.setown(_queryFactory.queryPackage().lookupFileName(fileName, isOpt, true, _queryFactory.queryWorkUnit()));
  881. if (datafile)
  882. {
  883. unsigned channel = queryFactory.queryChannel();
  884. fileArray.setown(datafile->getIFileIOArray(isOpt, channel));
  885. manager.setown(datafile->getIndexManager(isOpt, channel, fileArray, helper->queryDiskRecordSize(), _graphNode.getPropBool("att[@name=\"preload\"]/@value", false), _graphNode.getPropInt("att[@name=\"_preloadSize\"]/@value", 0)));
  886. Owned<IPropertyTreeIterator> memKeyInfo = queryFactory.queryPackage().getInMemoryIndexInfo(_graphNode);
  887. if (memKeyInfo)
  888. {
  889. ForEach(*memKeyInfo)
  890. {
  891. IPropertyTree &info = memKeyInfo->query();
  892. manager->setKeyInfo(info);
  893. }
  894. }
  895. }
  896. else
  897. manager.setown(getEmptyIndexManager());
  898. }
  899. }
  900. ~CRoxieDiskBaseActivityFactory()
  901. {
  902. }
  903. };
  904. //================================================================================================
  // Forward declarations: the processor factory functions below take the concrete
  // activity classes by reference, and those classes in turn call the factories.
  905. class CRoxieDiskReadActivity;
  906. class CRoxieCsvReadActivity;
  907. class CRoxieXmlReadActivity;
  // Processor factories used by the activities' doProcess() implementations.
  // Only declared here; the definitions are not part of this section.
  908. IInMemoryFileProcessor *createKeyedRecordProcessor(IInMemoryIndexCursor *cursor, CRoxieDiskReadActivity &owner, bool resent);
  909. IInMemoryFileProcessor *createUnkeyedRecordProcessor(IInMemoryIndexCursor *cursor, CRoxieDiskReadActivity &owner, bool variableDisk, IDirectReader *reader);
  910. IInMemoryFileProcessor *createCsvRecordProcessor(CRoxieCsvReadActivity &owner, IDirectReader *reader, bool _skipHeader, const IResolvedFile *datafile);
  911. IInMemoryFileProcessor *createXmlRecordProcessor(CRoxieXmlReadActivity &owner, IDirectReader *reader);
  912. class CRoxieDiskReadActivity : public CRoxieDiskReadBaseActivity
  913. {
  914. friend class ReadRecordProcessor;
  915. protected:
  916. IHThorDiskReadArg *helper;
  917. public:
  918. IMPLEMENT_IINTERFACE;
  919. CRoxieDiskReadActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
  920. IInMemoryIndexManager *_manager)
  921. : CRoxieDiskReadBaseActivity(_logctx, _packet, _hFactory, _aFactory, _manager, 0, 1, false)
  922. {
  923. onCreate();
  924. helper = (IHThorDiskReadArg *) basehelper;
  925. }
  926. virtual StringBuffer &toString(StringBuffer &ret) const
  927. {
  928. return ret.appendf("DiskRead %u", packet->queryHeader().activityId);
  929. }
  930. virtual void doProcess(IMessagePacker * output)
  931. {
  932. {
  933. CriticalBlock p(pcrit);
  934. processor.setown(isKeyed ? createKeyedRecordProcessor(cursor, *this, resent) :
  935. createUnkeyedRecordProcessor(cursor, *this, diskSize.isVariableSize(), manager->createReader(readPos, parallelPartNo, numParallel)));
  936. }
  937. unsigned __int64 rowLimit = helper->getRowLimit();
  938. unsigned __int64 stopAfter = helper->getChooseNLimit();
  939. processor->doQuery(output, processed, rowLimit, stopAfter);
  940. }
  941. size32_t doTransform(IMessagePacker * output, const void *src) const
  942. {
  943. OptimizedRowBuilder rowBuilder(rowAllocator, meta, output, serializer);
  944. unsigned transformedSize = helper->transform(rowBuilder, src);
  945. return rowBuilder.writeToOutput(transformedSize, false);
  946. }
  947. };
  948. class CRoxieCsvReadActivity : public CRoxieDiskReadBaseActivity
  949. {
  950. public:
  951. friend class CsvRecordProcessor;
  952. protected:
  953. IHThorCsvReadArg *helper;
  954. const IResolvedFile *datafile;
  955. public:
  956. IMPLEMENT_IINTERFACE;
  957. CRoxieCsvReadActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory,
  958. const CSlaveActivityFactory *_aFactory, IInMemoryIndexManager *_manager, const IResolvedFile *_datafile)
  959. : CRoxieDiskReadBaseActivity(_logctx, _packet, _hFactory, _aFactory, _manager, 0, 1, true), datafile(_datafile)
  960. {
  961. onCreate();
  962. helper = (IHThorCsvReadArg *) basehelper;
  963. }
  964. virtual StringBuffer &toString(StringBuffer &ret) const
  965. {
  966. return ret.appendf("CsvRead %u", packet->queryHeader().activityId);
  967. }
  968. virtual void doProcess(IMessagePacker * output)
  969. {
  970. {
  971. CriticalBlock p(pcrit);
  972. processor.setown(
  973. createCsvRecordProcessor(*this,
  974. manager->createReader(readPos, parallelPartNo, numParallel),
  975. packet->queryHeader().channel==1 && !resent,
  976. varFileInfo ? varFileInfo.get() : datafile));
  977. }
  978. unsigned __int64 rowLimit = helper->getRowLimit();
  979. unsigned __int64 stopAfter = helper->getChooseNLimit();
  980. processor->doQuery(output, processed, rowLimit, stopAfter);
  981. }
  982. size32_t doTransform(IMessagePacker * output, unsigned *srcLen, const char **src) const
  983. {
  984. OptimizedRowBuilder rowBuilder(rowAllocator, meta, output, serializer);
  985. unsigned transformedSize = helper->transform(rowBuilder, srcLen, src);
  986. return rowBuilder.writeToOutput(transformedSize, false);
  987. }
  988. };
  989. class CRoxieXmlReadActivity : public CRoxieDiskReadBaseActivity
  990. {
  991. public:
  992. friend class XmlRecordProcessor;
  993. protected:
  994. IHThorXmlReadArg *helper;
  995. public:
  996. IMPLEMENT_IINTERFACE;
  997. CRoxieXmlReadActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
  998. IInMemoryIndexManager *_manager)
  999. : CRoxieDiskReadBaseActivity(_logctx, _packet, _hFactory, _aFactory, _manager, 0, 1, true)
  1000. {
  1001. onCreate();
  1002. helper = (IHThorXmlReadArg *) basehelper;
  1003. }
  1004. virtual StringBuffer &toString(StringBuffer &ret) const
  1005. {
  1006. return ret.appendf("XmlRead %u", packet->queryHeader().activityId);
  1007. }
  1008. virtual void doProcess(IMessagePacker * output)
  1009. {
  1010. {
  1011. CriticalBlock p(pcrit);
  1012. processor.setown(createXmlRecordProcessor(*this, manager->createReader(readPos, parallelPartNo, numParallel)));
  1013. }
  1014. unsigned __int64 rowLimit = helper->getRowLimit();
  1015. unsigned __int64 stopAfter = helper->getChooseNLimit();
  1016. processor->doQuery(output, processed, rowLimit, stopAfter);
  1017. }
  1018. size32_t doTransform(IMessagePacker * output, IXmlToRowTransformer *rowTransformer, IColumnProvider *lastMatch, IThorDiskCallback *callback) const
  1019. {
  1020. OptimizedRowBuilder rowBuilder(rowAllocator, meta, output, serializer);
  1021. unsigned transformedSize = rowTransformer->transform(rowBuilder, lastMatch, callback);
  1022. return rowBuilder.writeToOutput(transformedSize, false);
  1023. }
  1024. };
  1025. class CRoxieDiskReadActivityFactory : public CRoxieDiskBaseActivityFactory
  1026. {
  1027. public:
  1028. IMPLEMENT_IINTERFACE;
  1029. CRoxieDiskReadActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
  1030. : CRoxieDiskBaseActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory)
  1031. {
  1032. }
  1033. virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
  1034. {
  1035. return new CRoxieDiskReadActivity(logctx, packet, helperFactory, this, manager);
  1036. }
  1037. virtual StringBuffer &toString(StringBuffer &s) const
  1038. {
  1039. return CSlaveActivityFactory::toString(s.append("DiskRead "));
  1040. }
  1041. };
  1042. class CRoxieCsvReadActivityFactory : public CRoxieDiskBaseActivityFactory
  1043. {
  1044. public:
  1045. IMPLEMENT_IINTERFACE;
  1046. CRoxieCsvReadActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
  1047. : CRoxieDiskBaseActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory)
  1048. {
  1049. }
  1050. virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
  1051. {
  1052. return new CRoxieCsvReadActivity(logctx, packet, helperFactory, this, manager, datafile);
  1053. }
  1054. virtual StringBuffer &toString(StringBuffer &s) const
  1055. {
  1056. return CSlaveActivityFactory::toString(s.append("DiskRead "));
  1057. }
  1058. };
  1059. class CRoxieXmlReadActivityFactory : public CRoxieDiskBaseActivityFactory
  1060. {
  1061. public:
  1062. IMPLEMENT_IINTERFACE;
  1063. CRoxieXmlReadActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
  1064. : CRoxieDiskBaseActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory)
  1065. {
  1066. }
  1067. virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
  1068. {
  1069. return new CRoxieXmlReadActivity(logctx, packet, helperFactory, this, manager);
  1070. }
  1071. virtual StringBuffer &toString(StringBuffer &s) const
  1072. {
  1073. return CSlaveActivityFactory::toString(s.append("DiskRead "));
  1074. }
  1075. };
  1076. //================================================================================================
  1077. // Note - the classes below could be commoned up to make the code smaller, but they have been deliberately unrolled to
  1078. // keep to a bare minimum the number of virtual calls/variable tests per record scanned. This is very speed critical.
  1079. class RecordProcessor : public CInterface, implements IInMemoryFileProcessor
  1080. {
  1081. protected:
  1082. IInMemoryIndexCursor *cursor; // Unkeyed variants still may need to check segmonitors in here
  1083. bool aborted;
  1084. const char *endRec;
  1085. static inline size32_t roundDown(size32_t got, size32_t fixedSize)
  1086. {
  1087. // Make sure that the buffer size we process is a multiple of the fixed record size
  1088. return (got / fixedSize) * fixedSize;
  1089. }
  1090. static size32_t getBufferSize(size32_t fixedSize)
  1091. {
  1092. // Calculate appropriate buffer size for fixed size record processors
  1093. assert(fixedSize);
  1094. unsigned recordsPerBuffer = diskReadBufferSize / fixedSize;
  1095. if (!recordsPerBuffer)
  1096. recordsPerBuffer = 1;
  1097. return fixedSize * recordsPerBuffer;
  1098. }
  1099. public:
  1100. IMPLEMENT_IINTERFACE;
  1101. RecordProcessor(IInMemoryIndexCursor *_cursor) : cursor(_cursor)
  1102. {
  1103. aborted = false;
  1104. endRec = NULL;
  1105. }
  1106. virtual void abort()
  1107. {
  1108. aborted = true;
  1109. endRec = NULL; // speeds up the abort in some of the derived classes
  1110. }
  1111. };
  1112. //================================================================================================
  1113. // Base class for all varieties of RecordProcessor used by disk read activity
  1114. class ReadRecordProcessor : public RecordProcessor
  1115. {
  1116. protected:
  1117. CRoxieDiskReadActivity &owner;
  1118. IHThorDiskReadArg *helper;
  1119. public:
  1120. ReadRecordProcessor(IInMemoryIndexCursor *_cursor, CRoxieDiskReadActivity &_owner)
  1121. : RecordProcessor(_cursor), owner(_owner)
  1122. {
  1123. helper = _owner.helper;
  1124. }
  1125. };
  1126. // Used by disk read when an in-memory index is available
  1127. class KeyedRecordProcessor : public ReadRecordProcessor
  1128. {
  1129. bool resent;
  1130. public:
  1131. KeyedRecordProcessor(IInMemoryIndexCursor *_cursor, CRoxieDiskReadActivity &_owner, bool _resent) : ReadRecordProcessor(_cursor, _owner)
  1132. {
  1133. resent = _resent;
  1134. helper->setCallback(cursor);
  1135. }
  1136. virtual void doQuery(IMessagePacker *output, unsigned processed, unsigned __int64 rowLimit, unsigned __int64 stopAfter)
  1137. {
  1138. // doQuery needs to be as fast as possible - we are making a virtual call to it per query in order to avoid tests of loop-invariants within it
  1139. unsigned totalSizeSent = 0;
  1140. IInMemoryIndexCursor *lc = cursor;
  1141. if (!resent)
  1142. lc->reset();
  1143. bool continuationFailed = false;
  1144. while (!aborted)
  1145. {
  1146. const void *nextCandidate = lc->nextMatch();
  1147. if (!nextCandidate)
  1148. break;
  1149. unsigned transformedSize = owner.doTransform(output, nextCandidate);
  1150. if (transformedSize)
  1151. {
  1152. processed++;
  1153. if (processed > rowLimit)
  1154. {
  1155. owner.limitExceeded();
  1156. break;
  1157. }
  1158. if (processed == stopAfter)
  1159. break;
  1160. totalSizeSent += transformedSize;
  1161. if (totalSizeSent > indexReadChunkSize && !continuationFailed)
  1162. {
  1163. MemoryBuffer si;
  1164. unsigned short siLen = 0;
  1165. si.append(siLen);
  1166. si.append(processed);
  1167. si.append(true); // using a key
  1168. lc->serializeCursorPos(si);
  1169. if (si.length() <= maxContinuationSize)
  1170. {
  1171. siLen = si.length() - sizeof(siLen);
  1172. si.writeDirect(0, sizeof(siLen), &siLen);
  1173. output->sendMetaInfo(si.toByteArray(), si.length());
  1174. return;
  1175. }
  1176. else
  1177. continuationFailed = true;
  1178. }
  1179. }
  1180. }
  1181. }
  1182. };
  1183. IInMemoryFileProcessor *createKeyedRecordProcessor(IInMemoryIndexCursor *cursor, CRoxieDiskReadActivity &owner, bool resent)
  1184. {
  1185. return new KeyedRecordProcessor(cursor, owner, resent);
  1186. }
  1187. // Used by disk read when an in-memory index is NOT available
  1188. // We use different variants for fixed versus variable sized records, in order to make the fixed version as fast as possible
  1189. class UnkeyedRecordProcessor : public ReadRecordProcessor
  1190. {
  1191. protected:
  1192. Owned<IDirectReader> reader;
  1193. public:
  1194. IMPLEMENT_IINTERFACE;
  1195. UnkeyedRecordProcessor(IInMemoryIndexCursor *_cursor, CRoxieDiskReadActivity &_owner, IDirectReader *_reader)
  1196. : ReadRecordProcessor(_cursor, _owner), reader(_reader)
  1197. {
  1198. }
  1199. virtual void doQuery(IMessagePacker *output, unsigned processed, unsigned __int64 rowLimit, unsigned __int64 stopAfter)
  1200. {
  1201. unsigned totalSizeSent = 0;
  1202. helper->setCallback(reader->queryThorDiskCallback());
  1203. size32_t recordSize = owner.diskSize.getFixedSize();
  1204. size32_t bufferSize = getBufferSize(recordSize);
  1205. while (!aborted && !reader->eos())
  1206. {
  1207. size32_t gotSize;
  1208. const char *firstRec = (const char *) reader->peek(bufferSize, gotSize);
  1209. if (!gotSize)
  1210. break;
  1211. gotSize = roundDown(gotSize, recordSize);
  1212. const char *nextRec = firstRec;
  1213. endRec = firstRec + gotSize;
  1214. while (nextRec < endRec)
  1215. {
  1216. size32_t transformedSize;
  1217. if (cursor && cursor->isFiltered(nextRec))
  1218. transformedSize = 0;
  1219. else
  1220. transformedSize = owner.doTransform(output, nextRec);
  1221. nextRec += recordSize;
  1222. if (transformedSize)
  1223. {
  1224. processed++;
  1225. if (processed > rowLimit)
  1226. {
  1227. owner.limitExceeded();
  1228. return;
  1229. }
  1230. if (processed == stopAfter)
  1231. return;
  1232. totalSizeSent += transformedSize;
  1233. if (totalSizeSent > indexReadChunkSize)
  1234. {
  1235. MemoryBuffer si;
  1236. unsigned short siLen = 0;
  1237. si.append(siLen);
  1238. si.append(processed);
  1239. si.append(false); // not using a key
  1240. offset_t readPos = reader->tell() + (nextRec - firstRec);
  1241. si.append(readPos);
  1242. siLen = si.length() - sizeof(siLen);
  1243. si.writeDirect(0, sizeof(siLen), &siLen);
  1244. output->sendMetaInfo(si.toByteArray(), si.length());
  1245. return;
  1246. }
  1247. }
  1248. }
  1249. reader->skip(gotSize);
  1250. }
  1251. }
  1252. };
  1253. class UnkeyedVariableRecordProcessor : public UnkeyedRecordProcessor
  1254. {
  1255. public:
  1256. UnkeyedVariableRecordProcessor(IInMemoryIndexCursor *_cursor, CRoxieDiskReadActivity &_owner, IDirectReader *_reader)
  1257. : UnkeyedRecordProcessor(_cursor, _owner, _reader), deserializeSource(_reader)
  1258. {
  1259. prefetcher.setown(owner.diskSize.queryOriginal()->createDiskPrefetcher(owner.queryContext->queryCodeContext(), owner.basefactory->queryId()));
  1260. }
  1261. virtual void doQuery(IMessagePacker *output, unsigned processed, unsigned __int64 rowLimit, unsigned __int64 stopAfter)
  1262. {
  1263. unsigned totalSizeSent = 0;
  1264. helper->setCallback(reader->queryThorDiskCallback());
  1265. while (!aborted && !deserializeSource.eos())
  1266. {
  1267. // This loop is the inner loop for memory diskreads - so keep it efficient!
  1268. prefetcher->readAhead(deserializeSource);
  1269. const byte *nextRec = deserializeSource.queryRow();
  1270. size32_t transformedSize;
  1271. if (cursor && cursor->isFiltered(nextRec))
  1272. transformedSize = 0;
  1273. else
  1274. transformedSize = owner.doTransform(output, nextRec);
  1275. deserializeSource.finishedRow();
  1276. if (transformedSize)
  1277. {
  1278. processed++;
  1279. if (processed > rowLimit)
  1280. {
  1281. owner.limitExceeded();
  1282. return;
  1283. }
  1284. if (processed == stopAfter)
  1285. return;
  1286. totalSizeSent += transformedSize;
  1287. if (totalSizeSent > indexReadChunkSize)
  1288. {
  1289. MemoryBuffer si;
  1290. unsigned short siLen = 0;
  1291. si.append(siLen);
  1292. si.append(processed);
  1293. si.append(false); // not using a key
  1294. offset_t readPos = deserializeSource.tell();
  1295. si.append(readPos);
  1296. siLen = si.length() - sizeof(siLen);
  1297. si.writeDirect(0, sizeof(siLen), &siLen);
  1298. output->sendMetaInfo(si.toByteArray(), si.length());
  1299. return;
  1300. }
  1301. }
  1302. }
  1303. }
  1304. protected:
  1305. CThorContiguousRowBuffer deserializeSource;
  1306. Owned<ISourceRowPrefetcher> prefetcher;
  1307. };
  1308. IInMemoryFileProcessor *createUnkeyedRecordProcessor(IInMemoryIndexCursor *cursor, CRoxieDiskReadActivity &owner, bool variableDisk, IDirectReader *_reader)
  1309. {
  1310. if (variableDisk)
  1311. return new UnkeyedVariableRecordProcessor(cursor, owner, _reader);
  1312. else
  1313. return new UnkeyedRecordProcessor(cursor, owner, _reader);
  1314. }
  1315. //================================================================================================
  1316. // RecordProcessor used by CSV read activity. We don't try to index these or optimize fixed size cases...
  1317. class CsvRecordProcessor : public RecordProcessor
  1318. {
  1319. protected:
  1320. CRoxieCsvReadActivity &owner;
  1321. IHThorCsvReadArg *helper;
  1322. Owned<IDirectReader> reader;
  1323. bool skipHeader;
  1324. const IResolvedFile *datafile;
  1325. public:
  1326. IMPLEMENT_IINTERFACE;
  1327. CsvRecordProcessor(CRoxieCsvReadActivity &_owner, IDirectReader *_reader, bool _skipHeader, const IResolvedFile *_datafile)
  1328. : RecordProcessor(NULL), owner(_owner), reader(_reader), datafile(_datafile)
  1329. {
  1330. helper = _owner.helper;
  1331. skipHeader = _skipHeader;
  1332. helper->setCallback(reader->queryThorDiskCallback());
  1333. }
  1334. virtual void doQuery(IMessagePacker *output, unsigned processed, unsigned __int64 rowLimit, unsigned __int64 stopAfter)
  1335. {
  1336. unsigned totalSizeSent = 0;
  1337. ICsvParameters * csvInfo = helper->queryCsvParameters();
  1338. unsigned headerLines = skipHeader ? csvInfo->queryHeaderLen() : 0;
  1339. const char *quotes = NULL;
  1340. const char *separators = NULL;
  1341. const char *terminators = NULL;
  1342. const char *escapes = NULL;
  1343. CSVSplitter csvSplitter;
  1344. if (datafile)
  1345. {
  1346. const IPropertyTree *options = datafile->queryProperties();
  1347. if (options)
  1348. {
  1349. quotes = options->queryProp("@csvQuote");
  1350. separators = options->queryProp("@csvSeparate");
  1351. terminators = options->queryProp("@csvTerminate");
  1352. escapes = options->queryProp("@csvEscape");
  1353. }
  1354. }
  1355. csvSplitter.init(helper->getMaxColumns(), csvInfo, quotes, separators, terminators, escapes);
  1356. while (!aborted)
  1357. {
  1358. // MORE - there are rumours of a csvSplitter that operates on a stream... if/when it exists, this should use it
  1359. if (reader->eos())
  1360. {
  1361. break;
  1362. }
  1363. size32_t rowSize = 4096; // MORE - make configurable
  1364. size32_t maxRowSize = 10*1024*1024; // MORE - make configurable
  1365. size32_t thisLineLength;
  1366. loop
  1367. {
  1368. size32_t avail;
  1369. const void *peek = reader->peek(rowSize, avail);
  1370. thisLineLength = csvSplitter.splitLine(avail, (const byte *)peek);
  1371. if (thisLineLength < rowSize || avail < rowSize)
  1372. break;
  1373. if (rowSize == maxRowSize)
  1374. throw MakeStringException(0, "Row too big");
  1375. if (rowSize >= maxRowSize/2)
  1376. rowSize = maxRowSize;
  1377. else
  1378. rowSize += rowSize;
  1379. }
  1380. if (!thisLineLength)
  1381. break;
  1382. if (headerLines)
  1383. {
  1384. headerLines--;
  1385. reader->skip(thisLineLength);
  1386. }
  1387. else
  1388. {
  1389. unsigned transformedSize = owner.doTransform(output, csvSplitter.queryLengths(), (const char * *)csvSplitter.queryData());
  1390. reader->skip(thisLineLength);
  1391. if (transformedSize)
  1392. {
  1393. processed++;
  1394. if (processed > rowLimit)
  1395. {
  1396. owner.limitExceeded();
  1397. return;
  1398. }
  1399. if (processed == stopAfter)
  1400. return;
  1401. totalSizeSent += transformedSize;
  1402. if (totalSizeSent > indexReadChunkSize)
  1403. {
  1404. MemoryBuffer si;
  1405. unsigned short siLen = 0;
  1406. si.append(siLen);
  1407. si.append(processed);
  1408. si.append(false); // not using a key
  1409. offset_t readPos = reader->tell();
  1410. si.append(readPos);
  1411. siLen = si.length() - sizeof(siLen);
  1412. si.writeDirect(0, sizeof(siLen), &siLen);
  1413. output->sendMetaInfo(si.toByteArray(), si.length());
  1414. return;
  1415. }
  1416. }
  1417. }
  1418. }
  1419. }
  1420. };
  1421. //================================================================================================
  1422. // RecordProcessor used by XML read activity. We don't try to index these or optimize fixed size cases...
  1423. class XmlRecordProcessor : public RecordProcessor, implements IXMLSelect
  1424. {
  1425. public:
  1426. IMPLEMENT_IINTERFACE;
  1427. XmlRecordProcessor(CRoxieXmlReadActivity &_owner, IDirectReader *_reader)
  1428. : RecordProcessor(NULL), owner(_owner), reader(_reader)
  1429. {
  1430. helper = _owner.helper;
  1431. helper->setCallback(reader->queryThorDiskCallback());
  1432. }
  1433. virtual void match(IColumnProvider &entry, offset_t startOffset, offset_t endOffset)
  1434. {
  1435. lastMatch.set(&entry);
  1436. }
  1437. virtual void doQuery(IMessagePacker *output, unsigned processed, unsigned __int64 rowLimit, unsigned __int64 stopAfter)
  1438. {
  1439. #if 0
  1440. // xml read does not support continuation record stuff as too hard to serialize state of xml parser
  1441. unsigned totalSizeSent = 0;
  1442. #endif
  1443. Linked<IXmlToRowTransformer> rowTransformer = helper->queryTransformer();
  1444. OwnedRoxieString xmlIterator(helper->getXmlIteratorPath());
  1445. Owned<IXMLParse> xmlParser = createXMLParse(*reader->querySimpleStream(), xmlIterator, *this, (0 != (TDRxmlnoroot & helper->getFlags()))?ptr_noRoot:ptr_none, (helper->getFlags() & TDRusexmlcontents) != 0);
  1446. while (!aborted)
  1447. {
  1448. //call to next() will callback on the IXmlSelect interface
  1449. bool gotNext = false;
  1450. gotNext = xmlParser->next();
  1451. if(!gotNext)
  1452. break;
  1453. else if (lastMatch)
  1454. {
  1455. unsigned transformedSize = owner.doTransform(output, rowTransformer, lastMatch, reader->queryThorDiskCallback());
  1456. lastMatch.clear();
  1457. if (transformedSize)
  1458. {
  1459. processed++;
  1460. if (processed > rowLimit)
  1461. {
  1462. owner.limitExceeded();
  1463. return;
  1464. }
  1465. if (processed == stopAfter)
  1466. return;
  1467. #if 0
  1468. // xml read does not support continuation record stuff as too hard to serialize state of xml parser
  1469. totalSizeSent += transformedSize;
  1470. if (totalSizeSent > indexReadChunkSize)
  1471. {
  1472. MemoryBuffer si;
  1473. unsigned short siLen = 0;
  1474. si.append(siLen);
  1475. si.append(processed);
  1476. si.append(false); // not using a key
  1477. readPos = inputFileIOStream->tell();
  1478. si.append(readPos);
  1479. siLen = si.length() - sizeof(siLen);
  1480. si.writeDirect(0, sizeof(siLen), &siLen);
  1481. output->sendMetaInfo(si.toByteArray(), si.length());
  1482. return;
  1483. }
  1484. #endif
  1485. }
  1486. }
  1487. }
  1488. }
  1489. protected:
  1490. CRoxieXmlReadActivity &owner;
  1491. IHThorXmlReadArg *helper;
  1492. Owned<IColumnProvider> lastMatch;
  1493. Owned<IDirectReader> reader;
  1494. };
  1495. IInMemoryFileProcessor *createCsvRecordProcessor(CRoxieCsvReadActivity &owner, IDirectReader *_reader, bool _skipHeader, const IResolvedFile *datafile)
  1496. {
  1497. return new CsvRecordProcessor(owner, _reader, _skipHeader, datafile);
  1498. }
  1499. IInMemoryFileProcessor *createXmlRecordProcessor(CRoxieXmlReadActivity &owner, IDirectReader *_reader)
  1500. {
  1501. return new XmlRecordProcessor(owner, _reader);
  1502. }
  1503. ISlaveActivityFactory *createRoxieCsvReadActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
  1504. {
  1505. return new CRoxieCsvReadActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory);
  1506. }
  1507. ISlaveActivityFactory *createRoxieXmlReadActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
  1508. {
  1509. return new CRoxieXmlReadActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory);
  1510. }
  1511. ISlaveActivityFactory *createRoxieDiskReadActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
  1512. {
  1513. return new CRoxieDiskReadActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory);
  1514. }
  1515. //================================================================================================
  1516. class CRoxieDiskNormalizeActivity;
  1517. IInMemoryFileProcessor *createKeyedNormalizeRecordProcessor(IInMemoryIndexCursor *cursor, CRoxieDiskNormalizeActivity &owner, bool resent);
  1518. IInMemoryFileProcessor *createUnkeyedNormalizeRecordProcessor(IInMemoryIndexCursor *cursor, CRoxieDiskNormalizeActivity &owner, IDirectReader *reader);
  1519. class CRoxieDiskNormalizeActivity : public CRoxieDiskReadBaseActivity
  1520. {
  1521. friend class NormalizeRecordProcessor;
  1522. protected:
  1523. IHThorDiskNormalizeArg *helper;
  1524. public:
  1525. IMPLEMENT_IINTERFACE;
  1526. CRoxieDiskNormalizeActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
  1527. IInMemoryIndexManager *_manager)
  1528. : CRoxieDiskReadBaseActivity(_logctx, _packet, _hFactory, _aFactory, _manager, 0, 1, false)
  1529. {
  1530. onCreate();
  1531. helper = (IHThorDiskNormalizeArg *) basehelper;
  1532. }
  1533. virtual StringBuffer &toString(StringBuffer &ret) const
  1534. {
  1535. return ret.appendf("DiskNormalize %u", packet->queryHeader().activityId);
  1536. }
  1537. virtual void doProcess(IMessagePacker * output)
  1538. {
  1539. {
  1540. CriticalBlock p(pcrit);
  1541. processor.setown(isKeyed ?
  1542. createKeyedNormalizeRecordProcessor(cursor, *this, resent) :
  1543. createUnkeyedNormalizeRecordProcessor(cursor, *this, manager->createReader(readPos, parallelPartNo, numParallel)));
  1544. }
  1545. unsigned __int64 rowLimit = helper->getRowLimit();
  1546. unsigned __int64 stopAfter = helper->getChooseNLimit();
  1547. processor->doQuery(output, processed, rowLimit, stopAfter);
  1548. }
  1549. size32_t doNormalizeTransform(IMessagePacker * output) const
  1550. {
  1551. OptimizedRowBuilder rowBuilder(rowAllocator, meta, output, serializer);
  1552. unsigned transformedSize = helper->transform(rowBuilder);
  1553. return rowBuilder.writeToOutput(transformedSize, false);
  1554. }
  1555. };
  1556. class CRoxieDiskNormalizeActivityFactory : public CRoxieDiskBaseActivityFactory
  1557. {
  1558. public:
  1559. IMPLEMENT_IINTERFACE;
  1560. CRoxieDiskNormalizeActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
  1561. : CRoxieDiskBaseActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory)
  1562. {
  1563. }
  1564. virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
  1565. {
  1566. return new CRoxieDiskNormalizeActivity(logctx, packet, helperFactory, this, manager);
  1567. }
  1568. virtual StringBuffer &toString(StringBuffer &s) const
  1569. {
  1570. return CSlaveActivityFactory::toString(s.append("DiskNormalize "));
  1571. }
  1572. };
  1573. ISlaveActivityFactory *createRoxieDiskNormalizeActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
  1574. {
  1575. return new CRoxieDiskNormalizeActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory);
  1576. }
  1577. //================================================================================================
  1578. // RecordProcessors used by Disk Normalize activity.
  1579. class NormalizeRecordProcessor : public RecordProcessor
  1580. {
  1581. public:
  1582. NormalizeRecordProcessor(IInMemoryIndexCursor *_cursor, CRoxieDiskNormalizeActivity &_owner)
  1583. : RecordProcessor(_cursor), owner(_owner)
  1584. {
  1585. helper = _owner.helper;
  1586. }
  1587. protected:
  1588. CRoxieDiskNormalizeActivity &owner;
  1589. IHThorDiskNormalizeArg *helper;
  1590. };
  1591. // Used when we have an in-memory key that matches at least some of the filter conditions
  1592. class KeyedNormalizeRecordProcessor : public NormalizeRecordProcessor
  1593. {
  1594. public:
  1595. KeyedNormalizeRecordProcessor(IInMemoryIndexCursor *_cursor, CRoxieDiskNormalizeActivity &_owner, bool _resent)
  1596. : NormalizeRecordProcessor(_cursor, _owner)
  1597. {
  1598. resent = _resent;
  1599. helper->setCallback(cursor);
  1600. }
  1601. virtual void doQuery(IMessagePacker *output, unsigned processed, unsigned __int64 rowLimit, unsigned __int64 stopAfter)
  1602. {
  1603. // doQuery needs to be as fast as possible - we are making a virtual call to it per query in order to avoid tests of loop-invariants within it
  1604. unsigned totalSizeSent = 0;
  1605. IInMemoryIndexCursor *lc = cursor;
  1606. if (!resent)
  1607. lc->reset();
  1608. bool continuationFailed = false;
  1609. while (!aborted)
  1610. {
  1611. const void *nextCandidate = lc->nextMatch();
  1612. if (!nextCandidate)
  1613. break;
  1614. if (helper->first(nextCandidate))
  1615. {
  1616. do
  1617. {
  1618. size32_t transformedSize = owner.doNormalizeTransform(output);
  1619. if (transformedSize)
  1620. {
  1621. processed++;
  1622. if (processed > rowLimit)
  1623. {
  1624. owner.limitExceeded();
  1625. return;
  1626. }
  1627. totalSizeSent += transformedSize;
  1628. if (processed == stopAfter)
  1629. return;
  1630. }
  1631. } while (helper->next());
  1632. if (totalSizeSent > indexReadChunkSize && !continuationFailed)
  1633. {
  1634. MemoryBuffer si;
  1635. unsigned short siLen = 0;
  1636. si.append(siLen);
  1637. si.append(processed);
  1638. si.append(true); // using a key
  1639. lc->serializeCursorPos(si);
  1640. if (si.length() <= maxContinuationSize)
  1641. {
  1642. siLen = si.length() - sizeof(siLen);
  1643. si.writeDirect(0, sizeof(siLen), &siLen);
  1644. output->sendMetaInfo(si.toByteArray(), si.length());
  1645. return;
  1646. }
  1647. else
  1648. continuationFailed = true;
  1649. }
  1650. }
  1651. }
  1652. }
  1653. private:
  1654. bool resent;
  1655. };
  1656. // Used when we have no key
  1657. // Not split into variable vs fixed varieties (unlike others). We could if there was a demand
  1658. class UnkeyedNormalizeRecordProcessor : public NormalizeRecordProcessor
  1659. {
  1660. public:
  1661. IMPLEMENT_IINTERFACE;
  1662. UnkeyedNormalizeRecordProcessor(IInMemoryIndexCursor *_cursor, CRoxieDiskNormalizeActivity &_owner, IDirectReader *_reader)
  1663. : NormalizeRecordProcessor(_cursor, _owner), reader(_reader), deserializeSource(_reader)
  1664. {
  1665. prefetcher.setown(owner.diskSize.queryOriginal()->createDiskPrefetcher(owner.queryContext->queryCodeContext(), owner.basefactory->queryId()));
  1666. }
  1667. virtual void doQuery(IMessagePacker *output, unsigned processed, unsigned __int64 rowLimit, unsigned __int64 stopAfter)
  1668. {
  1669. unsigned totalSizeSent = 0;
  1670. helper->setCallback(reader->queryThorDiskCallback());
  1671. while (!aborted && !deserializeSource.eos())
  1672. {
  1673. prefetcher->readAhead(deserializeSource);
  1674. const byte *nextRec = deserializeSource.queryRow();
  1675. if (!cursor || !cursor->isFiltered(nextRec))
  1676. {
  1677. if (helper->first(nextRec))
  1678. {
  1679. do
  1680. {
  1681. size32_t transformedSize = owner.doNormalizeTransform(output);
  1682. if (transformedSize)
  1683. {
  1684. processed++;
  1685. if (processed > rowLimit)
  1686. {
  1687. owner.limitExceeded();
  1688. return;
  1689. }
  1690. totalSizeSent += transformedSize;
  1691. if (processed == stopAfter)
  1692. return;
  1693. }
  1694. } while (helper->next());
  1695. }
  1696. }
  1697. deserializeSource.finishedRow();
  1698. if (totalSizeSent > indexReadChunkSize)
  1699. {
  1700. MemoryBuffer si;
  1701. unsigned short siLen = 0;
  1702. si.append(siLen);
  1703. si.append(processed);
  1704. si.append(false); // not using a key
  1705. offset_t readPos = deserializeSource.tell();
  1706. si.append(readPos);
  1707. siLen = si.length() - sizeof(siLen);
  1708. si.writeDirect(0, sizeof(siLen), &siLen);
  1709. output->sendMetaInfo(si.toByteArray(), si.length());
  1710. return;
  1711. }
  1712. }
  1713. }
  1714. protected:
  1715. Owned<IDirectReader> reader;
  1716. CThorContiguousRowBuffer deserializeSource;
  1717. Owned<ISourceRowPrefetcher> prefetcher;
  1718. };
  1719. IInMemoryFileProcessor *createKeyedNormalizeRecordProcessor(IInMemoryIndexCursor *cursor, CRoxieDiskNormalizeActivity &owner, bool resent)
  1720. {
  1721. return new KeyedNormalizeRecordProcessor(cursor, owner, resent);
  1722. }
  1723. IInMemoryFileProcessor *createUnkeyedNormalizeRecordProcessor(IInMemoryIndexCursor *cursor, CRoxieDiskNormalizeActivity &owner, IDirectReader *_reader)
  1724. {
  1725. return new UnkeyedNormalizeRecordProcessor(cursor, owner, _reader);
  1726. }
  1727. //================================================================================================
  1728. class CRoxieDiskCountActivity;
  1729. IInMemoryFileProcessor *createKeyedCountRecordProcessor(IInMemoryIndexCursor *cursor, CRoxieDiskCountActivity &owner);
  1730. IInMemoryFileProcessor *createUnkeyedCountRecordProcessor(IInMemoryIndexCursor *cursor, CRoxieDiskCountActivity &owner, bool variableDisk, IDirectReader *reader);
  1731. class CRoxieDiskCountActivity : public CRoxieDiskReadBaseActivity
  1732. {
  1733. friend class CountRecordProcessor;
  1734. protected:
  1735. IHThorDiskCountArg *helper;
  1736. public:
  1737. IMPLEMENT_IINTERFACE;
  1738. CRoxieDiskCountActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
  1739. IInMemoryIndexManager *_manager)
  1740. : CRoxieDiskReadBaseActivity(_logctx, _packet, _hFactory, _aFactory, _manager, 0, 1, false)
  1741. {
  1742. onCreate();
  1743. helper = (IHThorDiskCountArg *) basehelper;
  1744. }
  1745. virtual StringBuffer &toString(StringBuffer &ret) const
  1746. {
  1747. return ret.appendf("DiskCount %u", packet->queryHeader().activityId);
  1748. }
  1749. virtual void doProcess(IMessagePacker * output)
  1750. {
  1751. {
  1752. CriticalBlock p(pcrit);
  1753. processor.setown(isKeyed ?
  1754. createKeyedCountRecordProcessor(cursor, *this) :
  1755. createUnkeyedCountRecordProcessor(cursor, *this, diskSize.isVariableSize(), manager->createReader(readPos, parallelPartNo, numParallel)));
  1756. }
  1757. unsigned __int64 stopAfter = helper->getChooseNLimit();
  1758. processor->doQuery(output, processed, (unsigned __int64) -1, stopAfter);
  1759. }
  1760. };
  1761. class CRoxieDiskCountActivityFactory : public CRoxieDiskBaseActivityFactory
  1762. {
  1763. public:
  1764. IMPLEMENT_IINTERFACE;
  1765. CRoxieDiskCountActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
  1766. : CRoxieDiskBaseActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory)
  1767. {
  1768. }
  1769. virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
  1770. {
  1771. return new CRoxieDiskCountActivity(logctx, packet, helperFactory, this, manager);
  1772. }
  1773. virtual StringBuffer &toString(StringBuffer &s) const
  1774. {
  1775. return CSlaveActivityFactory::toString(s.append("DiskRead "));
  1776. }
  1777. };
  1778. //================================================================================================
// RecordProcessors used by Disk Count activity.
  1780. // Note - the classes below could be commoned up to make the code smaller, but they have been deliberately unrolled to
  1781. // keep to a bare minimum the number of virtual calls/variable tests per record scanned. This is very speed critical.
  1782. class CountRecordProcessor : public RecordProcessor
  1783. {
  1784. public:
  1785. CountRecordProcessor(IInMemoryIndexCursor *_cursor, CRoxieDiskCountActivity &_owner)
  1786. : RecordProcessor(_cursor), owner(_owner)
  1787. {
  1788. helper = _owner.helper;
  1789. }
  1790. protected:
  1791. CRoxieDiskCountActivity &owner;
  1792. IHThorDiskCountArg *helper;
  1793. };
  1794. // Used when we have an in-memory key that matches at least some of the filter conditions
  1795. class KeyedCountRecordProcessor : public CountRecordProcessor
  1796. {
  1797. public:
  1798. KeyedCountRecordProcessor(IInMemoryIndexCursor *_cursor, CRoxieDiskCountActivity &_owner) : CountRecordProcessor(_cursor, _owner)
  1799. {
  1800. helper->setCallback(cursor);
  1801. }
  1802. virtual void doQuery(IMessagePacker *output, unsigned processed, unsigned __int64 rowLimit, unsigned __int64 stopAfter)
  1803. {
  1804. // doQuery needs to be as fast as possible - we are making a virtual call to it per query in order to avoid tests of loop-invariants within it
  1805. unsigned recordSize = owner.meta.getFixedSize();
  1806. void *recBuffer = output->getBuffer(recordSize, false);
  1807. IInMemoryIndexCursor *lc = cursor;
  1808. lc->reset();
  1809. unsigned __int64 totalCount = 0;
  1810. while (!aborted)
  1811. {
  1812. const void *nextCandidate = lc->nextMatch();
  1813. if (!nextCandidate)
  1814. break;
  1815. totalCount += helper->numValid(nextCandidate);
  1816. if (totalCount >= stopAfter)
  1817. {
  1818. totalCount = stopAfter;
  1819. break;
  1820. }
  1821. }
  1822. if (!aborted)
  1823. {
  1824. assert(!owner.serializer); // A count can never need serializing, surely!
  1825. if (recordSize == 1)
  1826. *(byte *)recBuffer = (byte)totalCount;
  1827. else
  1828. {
  1829. assertex(recordSize == sizeof(unsigned __int64));
  1830. *(unsigned __int64 *)recBuffer = totalCount;
  1831. }
  1832. output->putBuffer(recBuffer, recordSize, false);
  1833. }
  1834. }
  1835. };
  1836. IInMemoryFileProcessor *createKeyedCountRecordProcessor(IInMemoryIndexCursor *cursor, CRoxieDiskCountActivity &owner)
  1837. {
  1838. return new KeyedCountRecordProcessor(cursor, owner);
  1839. }
  1840. // Used when there is no key, fixed records
  1841. class UnkeyedCountRecordProcessor : public CountRecordProcessor
  1842. {
  1843. protected:
  1844. Owned<IDirectReader> reader;
  1845. public:
  1846. UnkeyedCountRecordProcessor(IInMemoryIndexCursor *_cursor, CRoxieDiskCountActivity &_owner, IDirectReader *_reader)
  1847. : CountRecordProcessor(_cursor, _owner), reader(_reader)
  1848. {
  1849. }
  1850. // This version is used for fixed size rows only - variable size rows use more derived class which overrides
  1851. virtual void doQuery(IMessagePacker *output, unsigned processed, unsigned __int64 rowLimit, unsigned __int64 stopAfter)
  1852. {
  1853. unsigned outputRecordSize = owner.meta.getFixedSize();
  1854. void *recBuffer = output->getBuffer(outputRecordSize, false);
  1855. helper->setCallback(reader->queryThorDiskCallback());
  1856. unsigned __int64 totalCount = 0;
  1857. size32_t recordSize = owner.diskSize.getFixedSize();
  1858. size32_t bufferSize = getBufferSize(recordSize);
  1859. while (!aborted && !reader->eos())
  1860. {
  1861. size32_t gotSize;
  1862. const char *nextRec = (const char *) reader->peek(bufferSize, gotSize);
  1863. if (!gotSize)
  1864. break;
  1865. gotSize = roundDown(gotSize, recordSize);
  1866. if (cursor)
  1867. {
  1868. const char *endRec = nextRec + gotSize;
  1869. loop
  1870. {
  1871. // This loop is the inner loop for memory disk counts - so keep it efficient!
  1872. if (nextRec >= endRec)
  1873. break;
  1874. if (!cursor->isFiltered(nextRec))
  1875. {
  1876. totalCount += helper->numValid(nextRec);
  1877. if (totalCount >= stopAfter)
  1878. break;
  1879. }
  1880. nextRec += recordSize;
  1881. }
  1882. }
  1883. else
  1884. totalCount += helper->numValid(gotSize, nextRec);
  1885. if (totalCount >= stopAfter)
  1886. {
  1887. totalCount = stopAfter;
  1888. break;
  1889. }
  1890. reader->skip(gotSize);
  1891. }
  1892. if (!aborted)
  1893. {
  1894. assert(!owner.serializer); // A count can never need serializing, surely!
  1895. if (outputRecordSize == 1)
  1896. *(byte *)recBuffer = (byte)totalCount;
  1897. else
  1898. {
  1899. assertex(outputRecordSize == sizeof(unsigned __int64));
  1900. *(unsigned __int64 *)recBuffer = totalCount;
  1901. }
  1902. output->putBuffer(recBuffer, outputRecordSize, false);
  1903. }
  1904. }
  1905. };
  1906. // Used when there is no key, variable records
  1907. class UnkeyedVariableCountRecordProcessor : public UnkeyedCountRecordProcessor
  1908. {
  1909. public:
  1910. UnkeyedVariableCountRecordProcessor(IInMemoryIndexCursor *_cursor, CRoxieDiskCountActivity &_owner, IDirectReader *_reader)
  1911. : UnkeyedCountRecordProcessor(_cursor, _owner, _reader), deserializeSource(reader)
  1912. {
  1913. prefetcher.setown(owner.diskSize.queryOriginal()->createDiskPrefetcher(owner.queryContext->queryCodeContext(), owner.basefactory->queryId()));
  1914. }
  1915. // This version is used for variable size rows
  1916. virtual void doQuery(IMessagePacker *output, unsigned processed, unsigned __int64 rowLimit, unsigned __int64 stopAfter)
  1917. {
  1918. unsigned outputRecordSize = owner.meta.getFixedSize();
  1919. void *recBuffer = output->getBuffer(outputRecordSize, false);
  1920. helper->setCallback(reader->queryThorDiskCallback());
  1921. unsigned __int64 totalCount = 0;
  1922. while (!aborted && !deserializeSource.eos())
  1923. {
  1924. prefetcher->readAhead(deserializeSource);
  1925. const byte *nextRec = deserializeSource.queryRow();
  1926. if (!cursor || !cursor->isFiltered(nextRec))
  1927. {
  1928. totalCount += helper->numValid(nextRec);
  1929. if (totalCount >= stopAfter)
  1930. {
  1931. totalCount = stopAfter;
  1932. break;
  1933. }
  1934. }
  1935. deserializeSource.finishedRow();
  1936. }
  1937. if (!aborted)
  1938. {
  1939. assert(!owner.serializer); // A count can never need serializing, surely!
  1940. if (outputRecordSize == 1)
  1941. *(byte *)recBuffer = (byte)totalCount;
  1942. else
  1943. {
  1944. assertex(outputRecordSize == sizeof(unsigned __int64));
  1945. *(unsigned __int64 *)recBuffer = totalCount;
  1946. }
  1947. output->putBuffer(recBuffer, outputRecordSize, false);
  1948. }
  1949. }
  1950. protected:
  1951. CThorContiguousRowBuffer deserializeSource;
  1952. Owned<ISourceRowPrefetcher> prefetcher;
  1953. };
  1954. IInMemoryFileProcessor *createUnkeyedCountRecordProcessor(IInMemoryIndexCursor *cursor, CRoxieDiskCountActivity &owner, bool variableDisk, IDirectReader *reader)
  1955. {
  1956. if (variableDisk)
  1957. return new UnkeyedVariableCountRecordProcessor(cursor, owner, reader);
  1958. else
  1959. return new UnkeyedCountRecordProcessor(cursor, owner, reader);
  1960. }
  1961. ISlaveActivityFactory *createRoxieDiskCountActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
  1962. {
  1963. return new CRoxieDiskCountActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory);
  1964. }
  1965. //================================================================================================
  1966. class CRoxieDiskAggregateActivity;
  1967. IInMemoryFileProcessor *createKeyedAggregateRecordProcessor(IInMemoryIndexCursor *cursor, CRoxieDiskAggregateActivity &owner);
  1968. IInMemoryFileProcessor *createUnkeyedAggregateRecordProcessor(IInMemoryIndexCursor *cursor, CRoxieDiskAggregateActivity &owner, bool variableDisk, IDirectReader *reader);
  1969. class CRoxieDiskAggregateActivity : public CRoxieDiskReadBaseActivity
  1970. {
  1971. friend class AggregateRecordProcessor;
  1972. protected:
  1973. IHThorDiskAggregateArg *helper;
  1974. public:
  1975. IMPLEMENT_IINTERFACE;
  1976. CRoxieDiskAggregateActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
  1977. IInMemoryIndexManager *_manager,
  1978. unsigned _parallelPartNo, unsigned _numParallel, bool _forceUnkeyed)
  1979. : CRoxieDiskReadBaseActivity(_logctx, _packet, _hFactory, _aFactory, _manager, _parallelPartNo, _numParallel, _forceUnkeyed)
  1980. {
  1981. onCreate();
  1982. helper = (IHThorDiskAggregateArg *) basehelper;
  1983. }
  1984. virtual bool needsRowAllocator()
  1985. {
  1986. return true;
  1987. }
  1988. virtual StringBuffer &toString(StringBuffer &ret) const
  1989. {
  1990. return ret.appendf("DiskAggregate %u", packet->queryHeader().activityId);
  1991. }
  1992. virtual void doProcess(IMessagePacker * output)
  1993. {
  1994. {
  1995. CriticalBlock p(pcrit);
  1996. processor.setown(isKeyed ? createKeyedAggregateRecordProcessor(cursor, *this) :
  1997. createUnkeyedAggregateRecordProcessor(cursor, *this, diskSize.isVariableSize(), manager->createReader(readPos, parallelPartNo, numParallel)));
  1998. }
  1999. processor->doQuery(output, 0, 0, 0);
  2000. }
  2001. };
  2002. //================================================================================================
  2003. class CParallelRoxieActivity : public CRoxieSlaveActivity
  2004. {
  2005. protected:
  2006. CIArrayOf<CRoxieDiskReadBaseActivity> parts;
  2007. unsigned numParallel;
  2008. CriticalSection parCrit;
  2009. Owned<IOutputRowDeserializer> deserializer;
  2010. public:
  2011. IMPLEMENT_IINTERFACE;
  2012. CParallelRoxieActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_factory, unsigned _numParallel)
  2013. : CRoxieSlaveActivity(_logctx, _packet, _hFactory, _factory), numParallel(_numParallel)
  2014. {
  2015. assertex(numParallel > 1);
  2016. }
  2017. virtual void abort()
  2018. {
  2019. ForEachItemIn(idx, parts)
  2020. {
  2021. parts.item(idx).abort();
  2022. }
  2023. }
  2024. virtual void setPartNo(bool filechanged) { throwUnexpected(); }
  2025. virtual StringBuffer &toString(StringBuffer &ret) const
  2026. {
  2027. return parts.item(0).toString(ret);
  2028. }
  2029. virtual const char *queryDynamicFileName() const
  2030. {
  2031. throwUnexpected();
  2032. }
  2033. virtual void setVariableFileInfo()
  2034. {
  2035. throwUnexpected();
  2036. }
  2037. virtual void doProcess(IMessagePacker * output) = 0;
  2038. virtual void processRow(CDummyMessagePacker &output) = 0;
  2039. virtual bool process()
  2040. {
  2041. if (numParallel == 1)
  2042. {
  2043. return parts.item(0).process();
  2044. }
  2045. else
  2046. {
  2047. MTIME_SECTION(timer, "CParallelRoxieActivity::process");
  2048. atomic_inc(&diskReadStarted);
  2049. Owned<IMessagePacker> output = ROQ->createOutputStream(packet->queryHeader(), false, logctx);
  2050. class casyncfor: public CAsyncFor
  2051. {
  2052. CIArrayOf<CRoxieDiskReadBaseActivity> &parts;
  2053. CParallelRoxieActivity &parent;
  2054. public:
  2055. casyncfor(CIArrayOf<CRoxieDiskReadBaseActivity> &_parts, CParallelRoxieActivity &_parent)
  2056. : parts(_parts), parent(_parent)
  2057. {
  2058. }
  2059. void Do(unsigned i)
  2060. {
  2061. try
  2062. {
  2063. CDummyMessagePacker d;
  2064. parts.item(i).doProcess(&d);
  2065. d.flush(true);
  2066. parent.processRow(d);
  2067. }
  2068. catch (IException *)
  2069. {
  2070. // if one throws exception, may as well abort the rest
  2071. parent.abort();
  2072. throw;
  2073. }
  2074. }
  2075. } afor(parts, *this);
  2076. afor.For(numParallel, numParallel);
  2077. //for (unsigned i = 0; i < numParallel; i++) afor.Do(i); // use this instead of line above to make them serial - handy for debugging!
  2078. logctx.flush(true, aborted);
  2079. if (aborted)
  2080. {
  2081. output->abort();
  2082. return false;
  2083. }
  2084. else
  2085. {
  2086. doProcess(output);
  2087. output->flush(true);
  2088. atomic_inc(&diskReadCompleted);
  2089. return true;
  2090. }
  2091. return true;
  2092. }
  2093. }
  2094. };
  2095. class CParallelRoxieDiskAggregateActivity : public CParallelRoxieActivity
  2096. {
  2097. protected:
  2098. IHThorDiskAggregateArg *helper;
  2099. OwnedConstRoxieRow finalRow;
  2100. public:
  2101. IMPLEMENT_IINTERFACE;
  2102. CParallelRoxieDiskAggregateActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
  2103. IInMemoryIndexManager *_manager, unsigned _numParallel) :
  2104. CParallelRoxieActivity(_logctx, _packet, _hFactory, _aFactory, _numParallel)
  2105. {
  2106. helper = (IHThorDiskAggregateArg *) basehelper;
  2107. onCreate();
  2108. if (meta.needsSerializeDisk())
  2109. {
  2110. // MORE - avoiding serializing to dummy would be more efficient...
  2111. deserializer.setown(meta.createDiskDeserializer(queryContext->queryCodeContext(), basefactory->queryId()));
  2112. }
  2113. CRoxieDiskAggregateActivity *part0 = new CRoxieDiskAggregateActivity(_logctx, _packet, _hFactory, _aFactory, _manager, 0, numParallel, false);
  2114. parts.append(*part0);
  2115. if (part0->queryKeyed())
  2116. {
  2117. numParallel = 1;
  2118. part0->setParallel(0, 1);
  2119. }
  2120. else
  2121. {
  2122. for (unsigned i = 1; i < numParallel; i++)
  2123. parts.append(*new CRoxieDiskAggregateActivity(_logctx, _packet, _hFactory, _aFactory, _manager, i, numParallel, true));
  2124. }
  2125. }
  2126. ~CParallelRoxieDiskAggregateActivity()
  2127. {
  2128. finalRow.clear();
  2129. }
  2130. virtual bool needsRowAllocator()
  2131. {
  2132. return true;
  2133. }
  2134. virtual void doProcess(IMessagePacker *output)
  2135. {
  2136. if (!aborted)
  2137. {
  2138. if (!finalRow)
  2139. {
  2140. RtlDynamicRowBuilder rowBuilder(rowAllocator);
  2141. size32_t size = helper->clearAggregate(rowBuilder);
  2142. finalRow.setown(rowBuilder.finalizeRowClear(size));
  2143. }
  2144. //GH-RKC: This can probably be cleaner and more efficient using the OptimizedRowBuilder class introduced since I fixed this code
  2145. if (serializer)
  2146. serializeRow(output, finalRow);
  2147. else
  2148. {
  2149. size32_t size = meta.getRecordSize(finalRow);
  2150. appendBuffer(output, size, finalRow, meta.isVariableSize());
  2151. }
  2152. }
  2153. finalRow.clear();
  2154. helper->setCallback(NULL);
  2155. }
  2156. virtual void processRow(CDummyMessagePacker &d)
  2157. {
  2158. CriticalBlock c(parCrit);
  2159. MemoryBuffer &m = d.data;
  2160. RtlDynamicRowBuilder finalBuilder(rowAllocator, false);
  2161. if (deserializer)
  2162. {
  2163. Owned<ISerialStream> stream = createMemoryBufferSerialStream(m);
  2164. CThorStreamDeserializerSource rowSource(stream);
  2165. while (m.remaining())
  2166. {
  2167. RecordLengthType *rowLen = (RecordLengthType *) m.readDirect(sizeof(RecordLengthType));
  2168. if (!*rowLen)
  2169. break;
  2170. RecordLengthType len = *rowLen;
  2171. RtlDynamicRowBuilder rowBuilder(rowAllocator);
  2172. size_t outsize = deserializer->deserialize(rowBuilder, rowSource);
  2173. if (!finalBuilder.exists())
  2174. finalBuilder.swapWith(rowBuilder);
  2175. else
  2176. {
  2177. const void * deserialized = rowBuilder.finalizeRowClear(outsize);
  2178. helper->mergeAggregate(finalBuilder, deserialized);
  2179. ReleaseRoxieRow(deserialized);
  2180. }
  2181. }
  2182. }
  2183. else
  2184. {
  2185. RecordLengthType len = meta.getFixedSize();
  2186. while (m.remaining())
  2187. {
  2188. const void *row;
  2189. if (!meta.isFixedSize())
  2190. {
  2191. RecordLengthType *rowLen = (RecordLengthType *) m.readDirect(sizeof(RecordLengthType));
  2192. if (!*rowLen)
  2193. break;
  2194. len = *rowLen;
  2195. }
  2196. row = m.readDirect(len);
  2197. if (!finalBuilder.exists())
  2198. cloneRow(finalBuilder, row, meta);
  2199. else
  2200. helper->mergeAggregate(finalBuilder, row);
  2201. }
  2202. }
  2203. if (finalBuilder.exists())
  2204. {
  2205. size32_t finalSize = meta.getRecordSize(finalBuilder.getSelf()); // MORE - can probably track it above...
  2206. finalRow.setown(finalBuilder.finalizeRowClear(finalSize));
  2207. }
  2208. }
  2209. };
  2210. class CRoxieDiskAggregateActivityFactory : public CRoxieDiskBaseActivityFactory
  2211. {
  2212. public:
  2213. IMPLEMENT_IINTERFACE;
  2214. CRoxieDiskAggregateActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
  2215. : CRoxieDiskBaseActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory)
  2216. {
  2217. }
  2218. virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
  2219. {
  2220. if (parallelAggregate > 1)
  2221. return new CParallelRoxieDiskAggregateActivity(logctx, packet, helperFactory, this, manager, parallelAggregate);
  2222. else
  2223. return new CRoxieDiskAggregateActivity(logctx, packet, helperFactory, this, manager, 0, 1, false);
  2224. }
  2225. virtual StringBuffer &toString(StringBuffer &s) const
  2226. {
  2227. return CSlaveActivityFactory::toString(s.append("DiskRead "));
  2228. }
  2229. };
  2230. //================================================================================================
  2231. // RecordProcessors used by Disk Aggregate activity.
  2232. // Note - the classes below could be commoned up to make the code smaller, but they have been deliberately unrolled to
  2233. // keep to a bare minimum the number of virtual calls/variable tests per record scanned. This is very speed critical.
  2234. class AggregateRecordProcessor : public RecordProcessor
  2235. {
  2236. public:
  2237. AggregateRecordProcessor(IInMemoryIndexCursor *_cursor, CRoxieDiskAggregateActivity &_owner) : RecordProcessor(_cursor), owner(_owner)
  2238. {
  2239. helper = _owner.helper;
  2240. }
  2241. protected:
  2242. CRoxieDiskAggregateActivity &owner;
  2243. IHThorDiskAggregateArg *helper;
  2244. };
  2245. // Used when we have an in-memory key that matches at least some of the filter conditions
  2246. class KeyedAggregateRecordProcessor : public AggregateRecordProcessor
  2247. {
  2248. public:
  2249. KeyedAggregateRecordProcessor(IInMemoryIndexCursor *_cursor, CRoxieDiskAggregateActivity &_owner) : AggregateRecordProcessor(_cursor, _owner)
  2250. {
  2251. helper->setCallback(cursor);
  2252. }
  2253. virtual void doQuery(IMessagePacker *output, unsigned processed, unsigned __int64 rowLimit, unsigned __int64 stopAfter)
  2254. {
  2255. // doQuery needs to be as fast as possible - we are making a virtual call to it per query in order to avoid tests of loop-invariants within it
  2256. OptimizedRowBuilder rowBuilder(owner.rowAllocator, owner.meta, output, owner.serializer);
  2257. helper->clearAggregate(rowBuilder);
  2258. IInMemoryIndexCursor *lc = cursor;
  2259. lc->reset();
  2260. while (!aborted)
  2261. {
  2262. const void *nextCandidate = lc->nextMatch();
  2263. if (!nextCandidate)
  2264. break;
  2265. helper->processRow(rowBuilder, nextCandidate);
  2266. }
  2267. if (!aborted)
  2268. {
  2269. if (helper->processedAnyRows())
  2270. {
  2271. size32_t finalSize = owner.meta.getRecordSize(rowBuilder.getSelf());
  2272. rowBuilder.writeToOutput(finalSize, true);
  2273. }
  2274. }
  2275. }
  2276. };
  2277. IInMemoryFileProcessor *createKeyedAggregateRecordProcessor(IInMemoryIndexCursor *cursor, CRoxieDiskAggregateActivity &owner)
  2278. {
  2279. return new KeyedAggregateRecordProcessor(cursor, owner);
  2280. }
  2281. // Used when we have no key - fixed size records
  2282. class UnkeyedAggregateRecordProcessor : public AggregateRecordProcessor
  2283. {
  2284. public:
  2285. UnkeyedAggregateRecordProcessor(IInMemoryIndexCursor *_cursor, CRoxieDiskAggregateActivity &_owner, IDirectReader *_reader)
  2286. : AggregateRecordProcessor(_cursor, _owner), reader(_reader)
  2287. {
  2288. helper->setCallback(reader->queryThorDiskCallback());
  2289. }
  2290. // Note that variable size record handler overrides this class
  2291. virtual void doQuery(IMessagePacker *output, unsigned processed, unsigned __int64 rowLimit, unsigned __int64 stopAfter)
  2292. {
  2293. OptimizedRowBuilder rowBuilder(owner.rowAllocator, owner.meta, output, owner.serializer);
  2294. helper->clearAggregate(rowBuilder);
  2295. size32_t recordSize = owner.diskSize.getFixedSize();
  2296. size32_t bufferSize = getBufferSize(recordSize);
  2297. while (!aborted)
  2298. {
  2299. size32_t gotSize;
  2300. const char *firstRec = (const char *) reader->peek(bufferSize, gotSize);
  2301. if (!gotSize)
  2302. break;
  2303. gotSize = roundDown(gotSize, recordSize);
  2304. const char *nextRec = firstRec;
  2305. endRec = firstRec + gotSize;
  2306. // This loop is the inner loop for memory diskreads - so keep it efficient!
  2307. if (cursor) // Moved this test out of the loop below for speed!
  2308. {
  2309. while (nextRec <= endRec)
  2310. {
  2311. if (!cursor->isFiltered(nextRec))
  2312. helper->processRow(rowBuilder, nextRec);
  2313. nextRec += recordSize;
  2314. }
  2315. }
  2316. else
  2317. {
  2318. while (nextRec <= endRec)
  2319. {
  2320. helper->processRow(rowBuilder, nextRec);
  2321. nextRec += recordSize;
  2322. }
  2323. }
  2324. reader->skip(gotSize);
  2325. }
  2326. if (!aborted)
  2327. {
  2328. if (helper->processedAnyRows())
  2329. {
  2330. size32_t finalSize = owner.meta.getRecordSize(rowBuilder.getSelf());
  2331. rowBuilder.writeToOutput(finalSize, true);
  2332. }
  2333. }
  2334. }
  2335. protected:
  2336. Owned<IDirectReader> reader;
  2337. };
  2338. // Used when we have no key - variablesize records
  2339. class UnkeyedVariableAggregateRecordProcessor : public UnkeyedAggregateRecordProcessor
  2340. {
  2341. public:
  2342. UnkeyedVariableAggregateRecordProcessor(IInMemoryIndexCursor *_cursor, CRoxieDiskAggregateActivity &_owner, IDirectReader *_reader)
  2343. : UnkeyedAggregateRecordProcessor(_cursor, _owner, _reader), deserializeSource(_reader)
  2344. {
  2345. prefetcher.setown(owner.diskSize.queryOriginal()->createDiskPrefetcher(owner.queryContext->queryCodeContext(), owner.basefactory->queryId()));
  2346. }
  2347. virtual void doQuery(IMessagePacker *output, unsigned processed, unsigned __int64 rowLimit, unsigned __int64 stopAfter)
  2348. {
  2349. OptimizedRowBuilder rowBuilder(owner.rowAllocator, owner.meta, output, owner.serializer);
  2350. helper->clearAggregate(rowBuilder);
  2351. while (!aborted && !deserializeSource.eos())
  2352. {
  2353. prefetcher->readAhead(deserializeSource);
  2354. const byte *nextRec = deserializeSource.queryRow();
  2355. if (!cursor || !cursor->isFiltered(nextRec))
  2356. {
  2357. helper->processRow(rowBuilder, nextRec);
  2358. }
  2359. deserializeSource.finishedRow();
  2360. }
  2361. if (!aborted)
  2362. {
  2363. if (helper->processedAnyRows())
  2364. {
  2365. size32_t finalSize = owner.meta.getRecordSize(rowBuilder.getSelf());
  2366. rowBuilder.writeToOutput(finalSize, true);
  2367. }
  2368. }
  2369. }
  2370. protected:
  2371. CThorContiguousRowBuffer deserializeSource;
  2372. Owned<ISourceRowPrefetcher> prefetcher;
  2373. };
  2374. IInMemoryFileProcessor *createUnkeyedAggregateRecordProcessor(IInMemoryIndexCursor *_cursor, CRoxieDiskAggregateActivity &_owner, bool variableDisk, IDirectReader *_reader)
  2375. {
  2376. if (variableDisk)
  2377. return new UnkeyedVariableAggregateRecordProcessor(_cursor, _owner, _reader);
  2378. else
  2379. return new UnkeyedAggregateRecordProcessor(_cursor, _owner, _reader);
  2380. }
  2381. ISlaveActivityFactory *createRoxieDiskAggregateActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
  2382. {
  2383. return new CRoxieDiskAggregateActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory);
  2384. }
  2385. //================================================================================================
  2386. class CRoxieDiskGroupAggregateActivity : public CRoxieDiskReadBaseActivity
  2387. {
  2388. protected:
  2389. IHThorDiskGroupAggregateArg *helper;
  2390. RowAggregator results;
  2391. void outputResults(IMessagePacker *output)
  2392. {
  2393. if (!aborted)
  2394. {
  2395. loop
  2396. {
  2397. Owned<AggregateRowBuilder> next = results.nextResult();
  2398. if (!next)
  2399. break;
  2400. unsigned rowSize = next->querySize();
  2401. OwnedConstRoxieRow row(next->finalizeRowClear());
  2402. if (serializer)
  2403. {
  2404. serializeRow(output, row);
  2405. }
  2406. else
  2407. {
  2408. void *recBuffer = output->getBuffer(rowSize, meta.isVariableSize());
  2409. memcpy(recBuffer, row, rowSize);
  2410. output->putBuffer(recBuffer, rowSize, meta.isVariableSize());
  2411. }
  2412. }
  2413. }
  2414. }
  2415. public:
  2416. IMPLEMENT_IINTERFACE;
  2417. CRoxieDiskGroupAggregateActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
  2418. IInMemoryIndexManager *_manager,
  2419. unsigned partNo, unsigned numParts, bool _forceUnkeyed)
  2420. : CRoxieDiskReadBaseActivity(_logctx, _packet, _hFactory, _aFactory, _manager, partNo, numParts, _forceUnkeyed),
  2421. helper((IHThorDiskGroupAggregateArg *) basehelper),
  2422. results(*helper, *helper)
  2423. {
  2424. onCreate();
  2425. results.start(rowAllocator);
  2426. }
  2427. virtual bool needsRowAllocator()
  2428. {
  2429. return true;
  2430. }
  2431. virtual StringBuffer &toString(StringBuffer &ret) const
  2432. {
  2433. return ret.appendf("DiskGroupAggregate %u", packet->queryHeader().activityId);
  2434. }
  2435. virtual void doProcess(IMessagePacker * output)
  2436. {
  2437. {
  2438. CriticalBlock p(pcrit);
  2439. processor.setown(isKeyed ?
  2440. createKeyedGroupAggregateRecordProcessor(cursor, results, *helper) :
  2441. createUnkeyedGroupAggregateRecordProcessor(cursor, results, *helper, manager->createReader(readPos, parallelPartNo, numParallel),
  2442. queryContext->queryCodeContext(), basefactory->queryId()));
  2443. }
  2444. processor->doQuery(output, 0, 0, 0);
  2445. if (!aborted)
  2446. outputResults(output);
  2447. results.reset();
  2448. }
  2449. };
  2450. class CParallelRoxieDiskGroupAggregateActivity : public CParallelRoxieActivity
  2451. {
  2452. protected:
  2453. IHThorDiskGroupAggregateArg *helper;
  2454. RowAggregator resultAggregator;
  2455. Owned<IRowManager> rowManager;
  2456. public:
  2457. IMPLEMENT_IINTERFACE;
  2458. CParallelRoxieDiskGroupAggregateActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
  2459. IInMemoryIndexManager *_manager, unsigned _numParallel) :
  2460. CParallelRoxieActivity(_logctx, _packet, _hFactory, _aFactory, _numParallel),
  2461. helper((IHThorDiskGroupAggregateArg *) basehelper),
  2462. resultAggregator(*helper, *helper)
  2463. {
  2464. onCreate();
  2465. resultAggregator.start(rowAllocator);
  2466. if (meta.needsSerializeDisk())
  2467. {
  2468. // MORE - avoiding serializing to dummy would be more efficient...
  2469. deserializer.setown(meta.createDiskDeserializer(queryContext->queryCodeContext(), basefactory->queryId()));
  2470. }
  2471. CRoxieDiskGroupAggregateActivity *part0 = new CRoxieDiskGroupAggregateActivity(_logctx, _packet, _hFactory, _aFactory, _manager, 0, numParallel, false);
  2472. parts.append(*part0);
  2473. if (part0->queryKeyed())
  2474. {
  2475. numParallel = 1;
  2476. part0->setParallel(0, 1);
  2477. }
  2478. else
  2479. {
  2480. for (unsigned i = 1; i < numParallel; i++)
  2481. parts.append(*new CRoxieDiskGroupAggregateActivity(_logctx, _packet, _hFactory, _aFactory, _manager, i, numParallel, true));
  2482. }
  2483. }
  2484. virtual bool needsRowAllocator()
  2485. {
  2486. return true;
  2487. }
  2488. virtual void doProcess(IMessagePacker *output)
  2489. {
  2490. if (!aborted)
  2491. {
  2492. loop
  2493. {
  2494. Owned<AggregateRowBuilder> next = resultAggregator.nextResult();
  2495. if (!next)
  2496. break;
  2497. unsigned rowSize = next->querySize();
  2498. OwnedConstRoxieRow row(next->finalizeRowClear());
  2499. if (serializer)
  2500. {
  2501. serializeRow(output, row);
  2502. }
  2503. else
  2504. {
  2505. void *recBuffer = output->getBuffer(rowSize, meta.isVariableSize());
  2506. memcpy(recBuffer, row, rowSize);
  2507. output->putBuffer(recBuffer, rowSize, meta.isVariableSize());
  2508. }
  2509. }
  2510. }
  2511. resultAggregator.reset();
  2512. helper->setCallback(NULL);
  2513. }
  2514. void processRow(CDummyMessagePacker &d)
  2515. {
  2516. CriticalBlock b(parCrit); // MORE - use a spinlock
  2517. MemoryBuffer &m = d.data;
  2518. Owned<ISerialStream> stream = createMemoryBufferSerialStream(m);
  2519. CThorStreamDeserializerSource rowSource(stream);
  2520. while (m.remaining())
  2521. {
  2522. const void *row;
  2523. if (meta.isFixedSize() && !deserializer)
  2524. {
  2525. row = m.readDirect(meta.getFixedSize());
  2526. resultAggregator.mergeElement(row);
  2527. }
  2528. else
  2529. {
  2530. RecordLengthType *rowLen = (RecordLengthType *) m.readDirect(sizeof(RecordLengthType));
  2531. if (!*rowLen)
  2532. break;
  2533. RecordLengthType len = *rowLen;
  2534. if (deserializer)
  2535. {
  2536. RtlDynamicRowBuilder rowBuilder(rowAllocator);
  2537. size_t outsize = deserializer->deserialize(rowBuilder, rowSource);
  2538. OwnedConstRoxieRow deserialized = rowBuilder.finalizeRowClear(outsize);
  2539. resultAggregator.mergeElement(deserialized);
  2540. }
  2541. else
  2542. {
  2543. row = m.readDirect(len);
  2544. resultAggregator.mergeElement(row);
  2545. }
  2546. }
  2547. }
  2548. }
  2549. };
  2550. class CRoxieDiskGroupAggregateActivityFactory : public CRoxieDiskBaseActivityFactory
  2551. {
  2552. public:
  2553. IMPLEMENT_IINTERFACE;
  2554. CRoxieDiskGroupAggregateActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
  2555. : CRoxieDiskBaseActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory)
  2556. {
  2557. }
  2558. virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
  2559. {
  2560. if (parallelAggregate > 1)
  2561. return new CParallelRoxieDiskGroupAggregateActivity(logctx, packet, helperFactory, this, manager, parallelAggregate);
  2562. else
  2563. return new CRoxieDiskGroupAggregateActivity(logctx, packet, helperFactory, this, manager, 0, 1, false);
  2564. }
  2565. virtual StringBuffer &toString(StringBuffer &s) const
  2566. {
  2567. return CSlaveActivityFactory::toString(s.append("DiskRead "));
  2568. }
  2569. };
  2570. //================================================================================================
  2571. // RecordProcessors used by Disk Group Aggregate activity.
  2572. // Note - the classes below could be commoned up to make the code smaller, but they have been deliberately unrolled to
  2573. // keep to a bare minimum the number of virtual calls/variable tests per record scanned. This is very speed critical.
  2574. class GroupAggregateRecordProcessor : public RecordProcessor, implements IHThorGroupAggregateCallback
  2575. {
  2576. public:
  2577. IMPLEMENT_IINTERFACE;
  2578. GroupAggregateRecordProcessor(IInMemoryIndexCursor *_cursor, RowAggregator &_results, IHThorDiskGroupAggregateArg &_helper)
  2579. : RecordProcessor(_cursor),
  2580. results(_results),
  2581. helper(_helper)
  2582. {
  2583. }
  2584. virtual void processRow(const void * next)
  2585. {
  2586. results.addRow(next);
  2587. }
  2588. protected:
  2589. RowAggregator &results;
  2590. IHThorDiskGroupAggregateArg &helper;
  2591. };
  2592. // Used when we have an in-memory key that matches at least some of the filter conditions
  2593. class KeyedGroupAggregateRecordProcessor : public GroupAggregateRecordProcessor
  2594. {
  2595. public:
  2596. KeyedGroupAggregateRecordProcessor(IInMemoryIndexCursor *_cursor, RowAggregator &_results, IHThorDiskGroupAggregateArg &_helper)
  2597. : GroupAggregateRecordProcessor(_cursor, _results, _helper)
  2598. {
  2599. helper.setCallback(cursor);
  2600. }
  2601. virtual void doQuery(IMessagePacker *output, unsigned processed, unsigned __int64 rowLimit, unsigned __int64 stopAfter)
  2602. {
  2603. // doQuery needs to be as fast as possible - we are making a virtual call to it per query in order to avoid tests of loop-invariants within it
  2604. IInMemoryIndexCursor *lc = cursor;
  2605. lc->reset();
  2606. while (!aborted)
  2607. {
  2608. const void *nextCandidate = lc->nextMatch();
  2609. if (!nextCandidate)
  2610. break;
  2611. helper.processRow(nextCandidate, this);
  2612. }
  2613. }
  2614. };
  2615. IInMemoryFileProcessor *createKeyedGroupAggregateRecordProcessor(IInMemoryIndexCursor *cursor, RowAggregator &results, IHThorDiskGroupAggregateArg &helper)
  2616. {
  2617. return new KeyedGroupAggregateRecordProcessor(cursor, results, helper);
  2618. }
  2619. // Used when we have no key, fixed size records. Variable size records use more derived class
  2620. class UnkeyedGroupAggregateRecordProcessor : public GroupAggregateRecordProcessor
  2621. {
  2622. public:
  2623. IMPLEMENT_IINTERFACE;
  2624. UnkeyedGroupAggregateRecordProcessor(IInMemoryIndexCursor *_cursor, RowAggregator &_results, IHThorDiskGroupAggregateArg &_helper, IDirectReader *_reader)
  2625. : GroupAggregateRecordProcessor(_cursor, _results, _helper), reader(_reader)
  2626. {
  2627. helper.setCallback(reader->queryThorDiskCallback());
  2628. }
  2629. virtual void doQuery(IMessagePacker *output, unsigned processed, unsigned __int64 rowLimit, unsigned __int64 stopAfter)
  2630. {
  2631. size32_t recordSize = helper.queryDiskRecordSize()->getFixedSize();
  2632. size32_t bufferSize = getBufferSize(recordSize);
  2633. while (!aborted && !reader->eos())
  2634. {
  2635. size32_t gotSize;
  2636. const char *firstRec = (const char *) reader->peek(bufferSize, gotSize);
  2637. if (!gotSize)
  2638. break;
  2639. gotSize = roundDown(gotSize, recordSize);
  2640. const char *nextRec = firstRec;
  2641. endRec = firstRec + gotSize;
  2642. // This loop is the inner loop for memory diskreads - so keep it efficient!
  2643. if (cursor)
  2644. {
  2645. while (nextRec <= endRec)
  2646. {
  2647. if (!cursor->isFiltered(nextRec))
  2648. helper.processRow(nextRec, this);
  2649. nextRec += recordSize;
  2650. }
  2651. }
  2652. else
  2653. {
  2654. helper.processRows(gotSize, firstRec, this);
  2655. }
  2656. reader->skip(gotSize);
  2657. }
  2658. }
  2659. protected:
  2660. Owned<IDirectReader> reader;
  2661. };
  2662. // Used when we have no key, variable size records.
  2663. class UnkeyedVariableGroupAggregateRecordProcessor : public UnkeyedGroupAggregateRecordProcessor
  2664. {
  2665. public:
  2666. UnkeyedVariableGroupAggregateRecordProcessor(IInMemoryIndexCursor *_cursor, RowAggregator &_results, IHThorDiskGroupAggregateArg &_helper, IDirectReader *_reader,
  2667. ICodeContext *ctx, unsigned activityId)
  2668. : UnkeyedGroupAggregateRecordProcessor(_cursor, _results, _helper, _reader), deserializeSource(_reader)
  2669. {
  2670. prefetcher.setown(helper.queryDiskRecordSize()->createDiskPrefetcher(ctx, activityId));
  2671. }
  2672. virtual void doQuery(IMessagePacker *output, unsigned processed, unsigned __int64 rowLimit, unsigned __int64 stopAfter)
  2673. {
  2674. helper.setCallback(reader->queryThorDiskCallback());
  2675. while (!aborted && !deserializeSource.eos())
  2676. {
  2677. // This loop is the inner loop for memory diskreads - so keep it efficient!
  2678. prefetcher->readAhead(deserializeSource);
  2679. const byte *nextRec = deserializeSource.queryRow();
  2680. if (!cursor || !cursor->isFiltered(nextRec))
  2681. helper.processRow(nextRec, this);
  2682. deserializeSource.finishedRow();
  2683. }
  2684. }
  2685. protected:
  2686. CThorContiguousRowBuffer deserializeSource;
  2687. Owned<ISourceRowPrefetcher> prefetcher;
  2688. };
  2689. IInMemoryFileProcessor *createUnkeyedGroupAggregateRecordProcessor(IInMemoryIndexCursor *cursor, RowAggregator &results, IHThorDiskGroupAggregateArg &helper, IDirectReader *reader, ICodeContext *ctx, unsigned activityId)
  2690. {
  2691. if (helper.queryDiskRecordSize()->isVariableSize())
  2692. return new UnkeyedVariableGroupAggregateRecordProcessor(cursor, results, helper, reader, ctx, activityId);
  2693. else
  2694. return new UnkeyedGroupAggregateRecordProcessor(cursor, results, helper, reader);
  2695. }
  2696. ISlaveActivityFactory *createRoxieDiskGroupAggregateActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
  2697. {
  2698. return new CRoxieDiskGroupAggregateActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory);
  2699. }
  2700. //================================================================================================
  2701. class CRoxieKeyedActivityFactory : public CSlaveActivityFactory
  2702. {
  2703. protected:
  2704. Owned<IKeyArray> keyArray;
  2705. Owned<TranslatorArray> layoutTranslators;
  2706. Owned<IDefRecordMeta> activityMeta;
  2707. CRoxieKeyedActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
  2708. : CSlaveActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory)
  2709. {
  2710. }
  2711. public:
  2712. inline IKeyArray *queryKeyArray() const { return keyArray; }
  2713. inline TranslatorArray *queryLayoutTranslators() const { return layoutTranslators; }
  2714. inline IDefRecordMeta *queryActivityMeta() const { return activityMeta; }
  2715. };
  2716. class CRoxieIndexActivityFactory : public CRoxieKeyedActivityFactory
  2717. {
  2718. public:
  2719. IMPLEMENT_IINTERFACE;
  2720. CRoxieIndexActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
  2721. : CRoxieKeyedActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory)
  2722. {
  2723. }
  2724. void init(IHThorIndexReadBaseArg * helper, IPropertyTree &graphNode)
  2725. {
  2726. rtlDataAttr indexLayoutMeta;
  2727. size32_t indexLayoutSize;
  2728. if(!helper->getIndexLayout(indexLayoutSize, indexLayoutMeta.refdata()))
  2729. assertex(indexLayoutSize== 0);
  2730. MemoryBuffer m;
  2731. m.setBuffer(indexLayoutSize, indexLayoutMeta.getdata());
  2732. activityMeta.setown(deserializeRecordMeta(m, true));
  2733. layoutTranslators.setown(new TranslatorArray);
  2734. bool variableFileName = allFilesDynamic || ((helper->getFlags() & (TIRvarfilename|TIRdynamicfilename)) != 0);
  2735. if (!variableFileName)
  2736. {
  2737. bool isOpt = (helper->getFlags() & TIRoptional) != 0;
  2738. OwnedRoxieString indexName(helper->getFileName());
  2739. datafile.setown(queryFactory.queryPackage().lookupFileName(indexName, isOpt, true, queryFactory.queryWorkUnit()));
  2740. if (datafile)
  2741. keyArray.setown(datafile->getKeyArray(activityMeta, layoutTranslators, isOpt, queryFactory.queryChannel(), queryFactory.getEnableFieldTranslation()));
  2742. }
  2743. }
  2744. };
  2745. class CRoxieKeyedActivity : public CRoxieSlaveActivity
  2746. {
  2747. // Common base class for all activities that deal with keys - keyed join or indexread and its allies
  2748. protected:
  2749. Owned<IKeyManager> tlk;
  2750. Linked<TranslatorArray> layoutTranslators;
  2751. Linked<IKeyArray> keyArray;
  2752. IDefRecordMeta *activityMeta;
  2753. bool createSegmentMonitorsPending;
  2754. virtual void createSegmentMonitors() = 0;
  2755. virtual void setPartNo(bool filechanged)
  2756. {
  2757. if (!lastPartNo.partNo)
  2758. {
  2759. assertex(filechanged);
  2760. Owned<IKeyIndexSet> allKeys = createKeyIndexSet();
  2761. for (unsigned subpart = 0; subpart < keyArray->length(); subpart++)
  2762. {
  2763. IKeyIndexBase *kib = keyArray->queryKeyPart(subpart);
  2764. if (kib)
  2765. {
  2766. IKeyIndex *k = kib->queryPart(lastPartNo.fileNo);
  2767. if (k)
  2768. {
  2769. assertex(!k->isTopLevelKey());
  2770. allKeys->addIndex(LINK(k));
  2771. }
  2772. }
  2773. }
  2774. if (allKeys->numParts())
  2775. {
  2776. tlk.setown(createKeyMerger(allKeys, 0, 0, &logctx));
  2777. createSegmentMonitorsPending = true;
  2778. }
  2779. else
  2780. tlk.clear();
  2781. }
  2782. else
  2783. {
  2784. IKeyIndexBase *kib = keyArray->queryKeyPart(lastPartNo.partNo);
  2785. assertex(kib != NULL);
  2786. IKeyIndex *k = kib->queryPart(lastPartNo.fileNo);
  2787. if (filechanged)
  2788. {
  2789. tlk.setown(createKeyManager(k, 0, &logctx));
  2790. createSegmentMonitorsPending = true;
  2791. }
  2792. else
  2793. tlk->setKey(k);
  2794. }
  2795. }
  2796. virtual void setVariableFileInfo()
  2797. {
  2798. layoutTranslators.setown(new TranslatorArray);
  2799. keyArray.setown(varFileInfo->getKeyArray(activityMeta, layoutTranslators, isOpt, packet->queryHeader().channel, allowFieldTranslation));
  2800. }
  2801. CRoxieKeyedActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieKeyedActivityFactory *_aFactory)
  2802. : CRoxieSlaveActivity(_logctx, _packet, _hFactory, _aFactory),
  2803. keyArray(_aFactory->queryKeyArray()),
  2804. layoutTranslators(_aFactory->queryLayoutTranslators()),
  2805. activityMeta(_aFactory->queryActivityMeta()),
  2806. createSegmentMonitorsPending(true)
  2807. {
  2808. }
  2809. };
  2810. class CRoxieIndexActivity : public CRoxieKeyedActivity
  2811. {
  2812. // Common base class for indexread, indexcount and related activities
  2813. protected:
  2814. const CRoxieIndexActivityFactory *factory;
  2815. PartNoType *inputData; // list of channels
  2816. IHThorIndexReadBaseArg * indexHelper;
  2817. unsigned inputCount;
  2818. unsigned inputsDone;
  2819. unsigned processed;
  2820. unsigned keyprocessed;
  2821. unsigned steppingOffset;
  2822. unsigned steppingLength;
  2823. unsigned short numSkipFields;
  2824. unsigned numSeeks;
  2825. bool seeksAreEof;
  2826. bool lastRowCompleteMatch;
  2827. CIndexTransformCallback callback;
  2828. SmartStepExtra stepExtra; // just used for flags - a little unnecessary...
  2829. const byte *steppingRow;
  2830. __int64 getCount()
  2831. {
  2832. assertex(!resent);
  2833. unsigned __int64 result = 0;
  2834. unsigned inputsDone = 0;
  2835. while (!aborted && inputsDone < inputCount)
  2836. {
  2837. checkPartChanged(inputData[inputsDone]);
  2838. if (tlk)
  2839. {
  2840. createSegmentMonitors();
  2841. result += tlk->getCount();
  2842. }
  2843. inputsDone++;
  2844. }
  2845. return result;
  2846. }
  2847. bool checkLimit(unsigned __int64 limit)
  2848. {
  2849. assertex(!resent);
  2850. unsigned __int64 result = 0;
  2851. unsigned inputsDone = 0;
  2852. bool ret = true;
  2853. while (!aborted && inputsDone < inputCount)
  2854. {
  2855. checkPartChanged(inputData[inputsDone]);
  2856. if (tlk)
  2857. {
  2858. createSegmentMonitors();
  2859. result += tlk->checkCount(limit-result);
  2860. if (result > limit)
  2861. {
  2862. ret = false;
  2863. break;
  2864. }
  2865. }
  2866. inputsDone++;
  2867. }
  2868. return ret;
  2869. }
  2870. public:
  2871. IMPLEMENT_IINTERFACE;
  2872. CRoxieIndexActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieIndexActivityFactory *_aFactory, unsigned _steppingOffset)
  2873. : CRoxieKeyedActivity(_logctx, _packet, _hFactory, _aFactory),
  2874. factory(_aFactory),
  2875. steppingOffset(_steppingOffset),
  2876. stepExtra(SSEFreadAhead, NULL)
  2877. {
  2878. indexHelper = (IHThorIndexReadBaseArg *) basehelper;
  2879. variableFileName = allFilesDynamic || ((indexHelper->getFlags() & (TIRvarfilename|TIRdynamicfilename)) != 0);
  2880. isOpt = (indexHelper->getFlags() & TDRoptional) != 0;
  2881. inputData = NULL;
  2882. inputCount = 0;
  2883. inputsDone = 0;
  2884. processed = 0;
  2885. keyprocessed = 0;
  2886. numSkipFields = 0;
  2887. lastRowCompleteMatch = true; // default is we only return complete matches....
  2888. seeksAreEof = false;
  2889. steppingLength = 0;
  2890. steppingRow = NULL;
  2891. numSeeks = 0;
  2892. if (packet->getSmartStepInfoLength())
  2893. {
  2894. const byte *smartStepInfoValue = packet->querySmartStepInfoData();
  2895. numSkipFields = * (unsigned short *) smartStepInfoValue;
  2896. smartStepInfoValue += sizeof(unsigned short);
  2897. steppingLength = * (unsigned short *) smartStepInfoValue;
  2898. smartStepInfoValue += sizeof(unsigned short);
  2899. unsigned flags = * (unsigned short *) smartStepInfoValue;
  2900. smartStepInfoValue += sizeof(unsigned short);
  2901. seeksAreEof = * (bool *) smartStepInfoValue;
  2902. smartStepInfoValue += sizeof(bool);
  2903. numSeeks = * (unsigned *) smartStepInfoValue;
  2904. smartStepInfoValue += sizeof(unsigned);
  2905. assertex(numSeeks); // Given that we put the first seek in here to there should always be at least one!
  2906. steppingRow = smartStepInfoValue; // the first of them...
  2907. stepExtra.set(flags, NULL);
  2908. if (logctx.queryTraceLevel() > 10)
  2909. {
  2910. logctx.CTXLOG("%d seek rows provided. mismatch(%d) readahead(%d) onlyfirst(%d)", numSeeks,
  2911. (int)stepExtra.returnMismatches(), (int)stepExtra.readAheadManyResults(), (int)stepExtra.onlyReturnFirstSeekMatch());
  2912. if (logctx.queryTraceLevel() > 15)
  2913. {
  2914. for (unsigned i = 0; i < numSeeks; i++)
  2915. {
  2916. StringBuffer b;
  2917. for (unsigned j = 0; j < steppingLength; j++)
  2918. b.appendf("%02x ", steppingRow[i*steppingLength + j]);
  2919. logctx.CTXLOG("Seek row %d: %s", i+1, b.str());
  2920. }
  2921. }
  2922. }
  2923. }
  2924. else
  2925. {
  2926. if (logctx.queryTraceLevel() > 10)
  2927. logctx.CTXLOG("0 seek rows provided.");
  2928. }
  2929. }
  2930. virtual void onCreate()
  2931. {
  2932. CRoxieKeyedActivity::onCreate();
  2933. inputData = (PartNoType *) serializedCreate.readDirect(0);
  2934. inputCount = (serializedCreate.length() - serializedCreate.getPos()) / sizeof(*inputData);
  2935. indexHelper->setCallback(&callback);
  2936. }
  2937. virtual const char *queryDynamicFileName() const
  2938. {
  2939. return indexHelper->getFileName();
  2940. }
  2941. virtual void createSegmentMonitors()
  2942. {
  2943. if (createSegmentMonitorsPending)
  2944. {
  2945. createSegmentMonitorsPending = false;
  2946. tlk->setLayoutTranslator(layoutTranslators->item(lastPartNo.fileNo));
  2947. indexHelper->createSegmentMonitors(tlk);
  2948. tlk->finishSegmentMonitors();
  2949. }
  2950. }
  2951. bool sendContinuation(IMessagePacker * output)
  2952. {
  2953. MemoryBuffer si;
  2954. unsigned short siLen = 0;
  2955. si.append(siLen);
  2956. si.append(lastRowCompleteMatch);
  2957. si.append(inputsDone);
  2958. si.append(processed);
  2959. si.append(keyprocessed);
  2960. si.append(lastPartNo.partNo);
  2961. si.append(lastPartNo.fileNo);
  2962. tlk->serializeCursorPos(si);
  2963. if (si.length() <= maxContinuationSize)
  2964. {
  2965. siLen = si.length() - sizeof(siLen);
  2966. si.writeDirect(0, sizeof(siLen), &siLen);
  2967. output->sendMetaInfo(si.toByteArray(), si.length());
  2968. logctx.flush(true, aborted);
  2969. output->flush(true);
  2970. return true;
  2971. }
  2972. else
  2973. return false;
  2974. }
  2975. void readContinuationInfo()
  2976. {
  2977. resentInfo.read(lastRowCompleteMatch);
  2978. resentInfo.read(inputsDone);
  2979. resentInfo.read(processed);
  2980. resentInfo.read(keyprocessed);
  2981. resentInfo.read(lastPartNo.partNo);
  2982. resentInfo.read(lastPartNo.fileNo);
  2983. setPartNo(true);
  2984. tlk->deserializeCursorPos(resentInfo);
  2985. assertex(resentInfo.remaining() == 0);
  2986. }
  2987. virtual void setPartNo(bool fileChanged)
  2988. {
  2989. // NOTE - may be used by both indexread and normalize...
  2990. if (steppingOffset) // MORE - may be other cases too - eg want output sorted and there are multiple subfiles...
  2991. {
  2992. unsigned i = 0;
  2993. Owned<IKeyIndexSet> allKeys = createKeyIndexSet();
  2994. while (!aborted && i < inputCount)
  2995. {
  2996. PartNoType &part = inputData[i];
  2997. lastPartNo.partNo = part.partNo;
  2998. lastPartNo.fileNo = part.fileNo; // This is a hack so that the translator can be retrieved. We don't support record translation properly when doing keymerging...)
  2999. // MORE - this code looks like it could be commoned up with code in CRoxieKeyedActivity::setPartNo
  3000. if (!lastPartNo.partNo)
  3001. {
  3002. for (unsigned subpart = 0; subpart < keyArray->length(); subpart++)
  3003. {
  3004. IKeyIndexBase *kib = keyArray->queryKeyPart(subpart);
  3005. if (kib)
  3006. {
  3007. IKeyIndex *k = kib->queryPart(lastPartNo.fileNo);
  3008. if (k)
  3009. {
  3010. assertex(!k->isTopLevelKey());
  3011. allKeys->addIndex(LINK(k));
  3012. }
  3013. }
  3014. }
  3015. }
  3016. else
  3017. {
  3018. IKeyIndexBase *kib = keyArray->queryKeyPart(part.partNo);
  3019. assertex(kib != NULL);
  3020. IKeyIndex *k = kib->queryPart(part.fileNo);
  3021. allKeys->addIndex(LINK(k));
  3022. }
  3023. i++;
  3024. }
  3025. if (allKeys->numParts())
  3026. tlk.setown(::createKeyMerger(allKeys, 0, steppingOffset, &logctx));
  3027. else
  3028. tlk.clear();
  3029. createSegmentMonitorsPending = true;
  3030. }
  3031. else
  3032. CRoxieKeyedActivity::setPartNo(fileChanged);
  3033. }
  3034. };
  3035. //================================================================================================
  3036. class CRoxieIndexReadActivity : public CRoxieIndexActivity, implements IIndexReadActivityInfo
  3037. {
  3038. protected:
  3039. IHThorCompoundReadExtra * readHelper;
  3040. public:
  3041. IMPLEMENT_IINTERFACE;
  3042. CRoxieIndexReadActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieIndexActivityFactory *_aFactory, unsigned _steppingOffset)
  3043. : CRoxieIndexActivity(_logctx, _packet, _hFactory, _aFactory, _steppingOffset)
  3044. {
  3045. onCreate();
  3046. readHelper = (IHThorIndexReadArg *) basehelper;
  3047. if (resent)
  3048. readContinuationInfo();
  3049. }
  3050. virtual StringBuffer &toString(StringBuffer &ret) const
  3051. {
  3052. return ret.appendf("IndexRead %u", packet->queryHeader().activityId);
  3053. }
  3054. /* Notes on Global smart stepping implementation:
  3055. When smart stepping, I get from the Roxie server:
  3056. 1 or more seek positions
  3057. some flags
  3058. I can read from the index in an order that matches the seek positions order (because of index merger), and which match my hard filter (they may not match my postfilter)
  3059. I can skip forwards in the index to the first record GE a skip position
  3060. I am not going to try to implement a (possible future) flag to return mismatches after any but the last of the seek positions (yet)
  3061. I want to return M matching (keyed matches, seek field matches, and postfilter matches) rows for all provided seek positions
  3062. where M is 1 if SSEFonlyReturnFirstSeekMatch flag set, otherwise all...
  3063. THEN once we are beyond the last provided seek:
  3064. if returnMismatches flag set,
  3065. current row = (next row matching keyed filter)
  3066. if someNewAsYetUnnamedAndUnimplementedFlag flag set
  3067. //if the post filter matches then there may be scope for returning all records which match the seek fields of that first match
  3068. // But not very likely to help unless terms correlated, and may well hinder
  3069. return current row and all following rows matching keyed filter, seek data from current row, and postfilter
  3070. return next row matching keyed filter (even if doesn't match postfilter)
  3071. else
  3072. return next N rows matching keyed filter and postfilter (where N depends on readAheadManyResults flag - 1 if not set)
  3073. */
  3074. inline void advanceToNextSeek()
  3075. {
  3076. assertex(numSeeks != 0);
  3077. if (--numSeeks)
  3078. steppingRow += steppingLength;
  3079. else
  3080. steppingRow = NULL;
  3081. }
  3082. virtual bool process()
  3083. {
  3084. MTIME_SECTION(timer, "CRoxieIndexReadActivity ::process");
  3085. unsigned __int64 keyedLimit = readHelper->getKeyedLimit();
  3086. unsigned __int64 limit = readHelper->getRowLimit();
  3087. if (!resent && (keyedLimit != (unsigned __int64) -1) && ((keyedLimit > preabortIndexReadsThreshold) || (indexHelper->getFlags() & TIRcountkeyedlimit) != 0)) // Don't recheck the limit every time!
  3088. {
  3089. if (!checkLimit(keyedLimit))
  3090. {
  3091. limitExceeded(true);
  3092. return true;
  3093. }
  3094. }
  3095. unsigned __int64 stopAfter = readHelper->getChooseNLimit();
  3096. Owned<IMessagePacker> output = ROQ->createOutputStream(packet->queryHeader(), false, logctx);
  3097. OptimizedRowBuilder rowBuilder(rowAllocator, meta, output, serializer);
  3098. unsigned totalSizeSent = 0;
  3099. unsigned skipped = 0;
  3100. unsigned processedBefore = processed;
  3101. unsigned keyprocessedBefore = keyprocessed;
  3102. bool continuationFailed = false;
  3103. const byte *rawSeek = NULL;
  3104. if (steppingRow)
  3105. rawSeek = steppingRow;
  3106. bool continuationNeeded = false;
  3107. while (!aborted && inputsDone < inputCount)
  3108. {
  3109. if (!resent || !steppingOffset) // Bit of a hack... In the resent case, we have already set up the tlk, and all keys are processed at once in the steppingOffset case (which makes checkPartChanged gives a false positive in this case)
  3110. checkPartChanged(inputData[inputsDone]);
  3111. if (tlk)
  3112. {
  3113. createSegmentMonitors();
  3114. tlk->reset(resent);
  3115. resent = false;
  3116. {
  3117. TransformCallbackAssociation associate(callback, tlk); // want to destroy this before we advance to next key...
  3118. while (!aborted && rawSeek ? tlk->lookupSkip(rawSeek, steppingOffset, steppingLength) : tlk->lookup(true))
  3119. {
  3120. rawSeek = NULL; // only want to do the seek first time we look for a particular seek value
  3121. keyprocessed++;
  3122. if ((keyedLimit != (unsigned __int64) -1) && keyprocessed > keyedLimit)
  3123. {
  3124. logctx.noteStatistic(STATS_ACCEPTED, keyprocessed-keyprocessedBefore, 1);
  3125. logctx.noteStatistic(STATS_REJECTED, skipped, 1);
  3126. limitExceeded(true);
  3127. break;
  3128. }
  3129. atomic_inc(&indexRecordsRead);
  3130. size32_t transformedSize;
  3131. const byte * keyRow = tlk->queryKeyBuffer(callback.getFPosRef());
  3132. int diff = 0;
  3133. if (steppingRow)
  3134. {
  3135. diff = memcmp(keyRow+steppingOffset, steppingRow, steppingLength);
  3136. assertex(diff >= 0);
  3137. }
  3138. while (diff > 0)
  3139. {
  3140. advanceToNextSeek();
  3141. if (!steppingRow)
  3142. break;
  3143. diff = memcmp(keyRow+steppingOffset, steppingRow, steppingLength);
  3144. if (diff < 0)
  3145. {
  3146. rawSeek = steppingRow;
  3147. break;
  3148. }
  3149. }
  3150. if (diff >= 0)
  3151. {
  3152. if (diff > 0 && seeksAreEof)
  3153. {
  3154. assertex(!steppingRow);
  3155. break;
  3156. }
  3157. rowBuilder.ensureRow();
  3158. transformedSize = readHelper->transform(rowBuilder, keyRow);
  3159. callback.finishedRow();
  3160. if (transformedSize)
  3161. {
  3162. if (logctx.queryTraceLevel() > 15)
  3163. {
  3164. StringBuffer b;
  3165. for (unsigned j = 0; j < (steppingLength ? steppingLength : 6); j++)
  3166. b.appendf("%02x ", keyRow[steppingOffset + j]);
  3167. logctx.CTXLOG("Returning seek row %s", b.str());
  3168. }
  3169. // Did get a match
  3170. processed++;
  3171. if (limit && processed > limit)
  3172. {
  3173. logctx.noteStatistic(STATS_ACCEPTED, processed-processedBefore, 1);
  3174. logctx.noteStatistic(STATS_REJECTED, skipped, 1);
  3175. limitExceeded(false);
  3176. break;
  3177. }
  3178. if (processed > stopAfter)
  3179. {
  3180. logctx.noteStatistic(STATS_ACCEPTED, processed-processedBefore, 1);
  3181. logctx.noteStatistic(STATS_REJECTED, skipped, 1);
  3182. logctx.flush(true, false);
  3183. output->flush(true);
  3184. return true;
  3185. }
  3186. rowBuilder.writeToOutput(transformedSize, true);
  3187. totalSizeSent += transformedSize;
  3188. if (totalSizeSent > indexReadChunkSize || (steppingOffset && !steppingRow && !stepExtra.readAheadManyResults()))
  3189. continuationNeeded = true;
  3190. lastRowCompleteMatch = true;
  3191. if (steppingRow && stepExtra.onlyReturnFirstSeekMatch())
  3192. advanceToNextSeek();
  3193. }
  3194. else
  3195. {
  3196. // Didn't get a match
  3197. if (steppingOffset && !steppingRow && stepExtra.returnMismatches())
  3198. {
  3199. transformedSize = readHelper->unfilteredTransform(rowBuilder, keyRow);
  3200. if (transformedSize) // will only be zero in odd situations where codegen can't work out how to transform (eg because of a skip)
  3201. {
  3202. callback.finishedRow();
  3203. rowBuilder.writeToOutput(transformedSize, true);
  3204. totalSizeSent += transformedSize;
  3205. continuationNeeded = true;
  3206. lastRowCompleteMatch = false;
  3207. }
  3208. }
  3209. else
  3210. {
  3211. atomic_inc(&postFiltered);
  3212. skipped++;
  3213. }
  3214. }
  3215. }
  3216. if (continuationNeeded && !continuationFailed)
  3217. {
  3218. if (logctx.queryTraceLevel() > 10)
  3219. logctx.CTXLOG("Indexread returning partial result set %d rows from %d seeks, %d scans, %d skips", processed-processedBefore, tlk->querySeeks(), tlk->queryScans(), tlk->querySkips());
  3220. if (sendContinuation(output))
  3221. {
  3222. logctx.noteStatistic(STATS_ACCEPTED, processed-processedBefore, 1);
  3223. logctx.noteStatistic(STATS_REJECTED, skipped, 1);
  3224. return true;
  3225. }
  3226. else
  3227. {
  3228. // This is actually pretty fatal for smart-stepping case
  3229. if (logctx.queryTraceLevel())
  3230. logctx.CTXLOG("Indexread unable to return partial result set");
  3231. continuationFailed = true;
  3232. }
  3233. }
  3234. rowBuilder.clear();
  3235. }
  3236. }
  3237. }
  3238. if (steppingOffset)
  3239. inputsDone = inputCount;
  3240. else
  3241. inputsDone++;
  3242. }
  3243. if (tlk) // a very early abort can mean it is NULL.... MORE is this the right place to put it or should it be inside the loop??
  3244. {
  3245. if (logctx.queryTraceLevel() > 10 && !aborted)
  3246. {
  3247. logctx.CTXLOG("Indexread returning result set %d rows from %d seeks, %d scans, %d skips", processed-processedBefore, tlk->querySeeks(), tlk->queryScans(), tlk->querySkips());
  3248. if (steppingOffset)
  3249. logctx.CTXLOG("Indexread return: steppingOffset %d, steppingRow %p, stepExtra.returnMismatches() %d",steppingOffset, steppingRow, (int) stepExtra.returnMismatches());
  3250. }
  3251. logctx.noteStatistic(STATS_ACCEPTED, processed-processedBefore, 1);
  3252. logctx.noteStatistic(STATS_REJECTED, skipped, 1);
  3253. }
  3254. logctx.flush(true, aborted);
  3255. if (aborted)
  3256. output->abort();
  3257. else
  3258. output->flush(true);
  3259. return !aborted;
  3260. }
  3261. virtual IIndexReadActivityInfo *queryIndexReadActivity()
  3262. {
  3263. return this;
  3264. }
  3265. virtual IKeyArray *getKeySet() const
  3266. {
  3267. return keyArray.getLink();
  3268. }
  3269. virtual const IResolvedFile *getVarFileInfo() const
  3270. {
  3271. return varFileInfo.getLink();
  3272. }
  3273. virtual TranslatorArray *getTranslators() const
  3274. {
  3275. return layoutTranslators.getLink();
  3276. }
  3277. virtual void mergeSegmentMonitors(IIndexReadContext *irc) const
  3278. {
  3279. indexHelper->createSegmentMonitors(irc); // NOTE: they will merge;
  3280. }
  3281. virtual IRoxieServerActivity *queryActivity() { throwUnexpected(); }
  3282. virtual const RemoteActivityId &queryRemoteId() const { throwUnexpected(); }
  3283. };
  3284. //================================================================================================
  3285. class CRoxieIndexReadActivityFactory : public CRoxieIndexActivityFactory
  3286. {
  3287. unsigned steppingOffset;
  3288. public:
  3289. CRoxieIndexReadActivityFactory(IPropertyTree &graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
  3290. : CRoxieIndexActivityFactory(graphNode, _subgraphId, _queryFactory, _helperFactory)
  3291. {
  3292. Owned<IHThorIndexReadArg> helper = (IHThorIndexReadArg *) helperFactory();
  3293. init(helper, graphNode);
  3294. ISteppingMeta *rawMeta = helper->queryRawSteppingMeta();
  3295. if (rawMeta)
  3296. {
  3297. // MORE - should check all keys in maxFields list can actually be keyed.
  3298. const CFieldOffsetSize * fields = rawMeta->queryFields();
  3299. steppingOffset = fields[0].offset;
  3300. }
  3301. else
  3302. {
  3303. steppingOffset = 0;
  3304. }
  3305. }
  3306. virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
  3307. {
  3308. return new CRoxieIndexReadActivity(logctx, packet, helperFactory, this, steppingOffset);
  3309. }
  3310. virtual StringBuffer &toString(StringBuffer &s) const
  3311. {
  3312. return CSlaveActivityFactory::toString(s.append("INDEXREAD "));
  3313. }
  3314. };
  3315. ISlaveActivityFactory *createRoxieIndexReadActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
  3316. {
  3317. return new CRoxieIndexReadActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory);
  3318. }
  3319. //================================================================================================
  3320. //MORE: Very similar to the indexRead code, but I'm not sure it's worth commoning up....
  3321. class CRoxieIndexNormalizeActivity : public CRoxieIndexActivity
  3322. {
  3323. protected:
  3324. IHThorCompoundNormalizeExtra * normalizeHelper;
  3325. public:
  3326. IMPLEMENT_IINTERFACE;
  3327. CRoxieIndexNormalizeActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieIndexActivityFactory *_aFactory)
  3328. : CRoxieIndexActivity(_logctx, _packet, _hFactory, _aFactory, 0) //MORE - stepping?
  3329. {
  3330. onCreate();
  3331. normalizeHelper = (IHThorIndexNormalizeArg *) basehelper;
  3332. if (resent)
  3333. readContinuationInfo();
  3334. }
  3335. virtual StringBuffer &toString(StringBuffer &ret) const
  3336. {
  3337. return ret.appendf("IndexNormalize %u", packet->queryHeader().activityId);
  3338. }
  3339. virtual bool process()
  3340. {
  3341. MTIME_SECTION(timer, "CRoxieIndexNormalizeActivity ::process");
  3342. unsigned __int64 keyedLimit = normalizeHelper->getKeyedLimit();
  3343. unsigned __int64 rowLimit = normalizeHelper->getRowLimit();
  3344. if (!resent && (keyedLimit != (unsigned __int64) -1) && (indexHelper->getFlags() & TIRcountkeyedlimit) != 0) // Don't recheck the limit every time!
  3345. {
  3346. if (!checkLimit(keyedLimit))
  3347. {
  3348. limitExceeded(true);
  3349. return true;
  3350. }
  3351. }
  3352. unsigned __int64 stopAfter = normalizeHelper->getChooseNLimit();
  3353. Owned<IMessagePacker> output = ROQ->createOutputStream(packet->queryHeader(), false, logctx);
  3354. OptimizedRowBuilder rowBuilder(rowAllocator, meta, output, serializer);
  3355. unsigned totalSizeSent = 0;
  3356. unsigned skipped = 0;
  3357. unsigned processedBefore = processed;
  3358. bool continuationFailed = false;
  3359. while (!aborted && inputsDone < inputCount)
  3360. {
  3361. checkPartChanged(inputData[inputsDone]);
  3362. if (tlk)
  3363. {
  3364. createSegmentMonitors();
  3365. tlk->reset(resent);
  3366. resent = false;
  3367. TransformCallbackAssociation associate(callback, tlk);
  3368. while (!aborted && tlk->lookup(true))
  3369. {
  3370. keyprocessed++;
  3371. if (keyedLimit && processed > keyedLimit)
  3372. {
  3373. logctx.noteStatistic(STATS_ACCEPTED, processed-processedBefore, 1);
  3374. logctx.noteStatistic(STATS_REJECTED, skipped, 1);
  3375. limitExceeded(true);
  3376. break;
  3377. }
  3378. atomic_inc(&indexRecordsRead);
  3379. if (normalizeHelper->first(tlk->queryKeyBuffer(callback.getFPosRef())))
  3380. {
  3381. do
  3382. {
  3383. rowBuilder.ensureRow();
  3384. size32_t transformedSize = normalizeHelper->transform(rowBuilder);
  3385. if (transformedSize)
  3386. {
  3387. processed++;
  3388. if (processed > rowLimit)
  3389. {
  3390. logctx.noteStatistic(STATS_ACCEPTED, processed-processedBefore, 1);
  3391. logctx.noteStatistic(STATS_REJECTED, skipped, 1);
  3392. limitExceeded(false);
  3393. break;
  3394. }
  3395. if (processed > stopAfter)
  3396. {
  3397. logctx.noteStatistic(STATS_ACCEPTED, processed-processedBefore, 1);
  3398. logctx.noteStatistic(STATS_REJECTED, skipped, 1);
  3399. logctx.flush(true, aborted);
  3400. output->flush(true);
  3401. return true;
  3402. }
  3403. totalSizeSent += rowBuilder.writeToOutput(transformedSize, true);
  3404. }
  3405. } while (normalizeHelper->next());
  3406. callback.finishedRow();
  3407. if (totalSizeSent > indexReadChunkSize && !continuationFailed)
  3408. {
  3409. if (sendContinuation(output))
  3410. {
  3411. logctx.noteStatistic(STATS_ACCEPTED, processed-processedBefore, 1);
  3412. logctx.noteStatistic(STATS_REJECTED, skipped, 1);
  3413. return true;
  3414. }
  3415. else
  3416. continuationFailed = true;
  3417. }
  3418. }
  3419. else
  3420. {
  3421. atomic_inc(&postFiltered);
  3422. skipped++;
  3423. }
  3424. }
  3425. }
  3426. inputsDone++;
  3427. }
  3428. if (tlk) // a very early abort can mean it is NULL....
  3429. {
  3430. logctx.noteStatistic(STATS_ACCEPTED, processed-processedBefore, 1);
  3431. logctx.noteStatistic(STATS_REJECTED, skipped, 1);
  3432. }
  3433. logctx.flush(true, aborted);
  3434. if (aborted)
  3435. output->abort();
  3436. else
  3437. output->flush(true);
  3438. return !aborted;
  3439. }
  3440. };
  3441. //================================================================================================
  3442. class CRoxieIndexNormalizeActivityFactory : public CRoxieIndexActivityFactory
  3443. {
  3444. public:
  3445. CRoxieIndexNormalizeActivityFactory(IPropertyTree &graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
  3446. : CRoxieIndexActivityFactory(graphNode, _subgraphId, _queryFactory, _helperFactory)
  3447. {
  3448. Owned<IHThorIndexNormalizeArg> helper = (IHThorIndexNormalizeArg *) helperFactory();
  3449. init(helper, graphNode);
  3450. }
  3451. virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
  3452. {
  3453. return new CRoxieIndexNormalizeActivity(logctx, packet, helperFactory, this);
  3454. }
  3455. virtual StringBuffer &toString(StringBuffer &s) const
  3456. {
  3457. return CSlaveActivityFactory::toString(s.append("IndexNormalize "));
  3458. }
  3459. };
  3460. ISlaveActivityFactory *createRoxieIndexNormalizeActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
  3461. {
  3462. return new CRoxieIndexNormalizeActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory);
  3463. }
  3464. //================================================================================================
  3465. class CRoxieIndexCountActivity : public CRoxieIndexActivity
  3466. {
  3467. protected:
  3468. IHThorCompoundCountExtra * countHelper;
  3469. IHThorSourceCountLimit * limitHelper;
  3470. unsigned __int64 choosenLimit;
  3471. unsigned __int64 rowLimit;
  3472. unsigned __int64 keyedLimit;
  3473. public:
  3474. IMPLEMENT_IINTERFACE;
  3475. CRoxieIndexCountActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieIndexActivityFactory *_aFactory)
  3476. : CRoxieIndexActivity(_logctx, _packet, _hFactory, _aFactory, 0)
  3477. {
  3478. onCreate();
  3479. countHelper = (IHThorIndexCountArg *) basehelper;
  3480. limitHelper = static_cast<IHThorSourceCountLimit *>(basehelper->selectInterface(TAIsourcecountlimit_1));
  3481. assertex(!resent);
  3482. choosenLimit = countHelper->getChooseNLimit();
  3483. if (limitHelper)
  3484. {
  3485. rowLimit = limitHelper->getRowLimit();
  3486. keyedLimit = limitHelper->getKeyedLimit();
  3487. }
  3488. else
  3489. rowLimit = keyedLimit = (unsigned __int64) -1;
  3490. }
  3491. virtual StringBuffer &toString(StringBuffer &ret) const
  3492. {
  3493. return ret.appendf("IndexCount %u", packet->queryHeader().activityId);
  3494. }
  3495. virtual bool process()
  3496. {
  3497. MTIME_SECTION(timer, "CRoxieIndexCountActivity ::process");
  3498. Owned<IMessagePacker> output = ROQ->createOutputStream(packet->queryHeader(), false, logctx);
  3499. unsigned skipped = 0;
  3500. unsigned processedBefore = processed;
  3501. unsigned __int64 count = 0;
  3502. while (!aborted && inputsDone < inputCount && count < choosenLimit)
  3503. {
  3504. checkPartChanged(inputData[inputsDone]);
  3505. if (tlk)
  3506. {
  3507. createSegmentMonitors();
  3508. tlk->reset(false);
  3509. if (countHelper->hasFilter())
  3510. {
  3511. callback.setManager(tlk);
  3512. while (!aborted && (count < choosenLimit) && tlk->lookup(true))
  3513. {
  3514. keyprocessed++;
  3515. atomic_inc(&indexRecordsRead);
  3516. count += countHelper->numValid(tlk->queryKeyBuffer(callback.getFPosRef()));
  3517. if (count > rowLimit)
  3518. limitExceeded(false);
  3519. else if (count > keyedLimit)
  3520. limitExceeded(true);
  3521. callback.finishedRow();
  3522. }
  3523. callback.setManager(NULL);
  3524. }
  3525. else
  3526. {
  3527. //MORE: GH->RKC There should be value in providing a choosenLimit to getCount()
  3528. //MORE: note that tlk->checkCount() is NOT suitable as it makes assumptions about the segmonitors (only checks leading ones)
  3529. count += tlk->getCount();
  3530. if (count > rowLimit)
  3531. limitExceeded(false);
  3532. else if (count > keyedLimit)
  3533. limitExceeded(true); // MORE - is this right?
  3534. }
  3535. }
  3536. inputsDone++;
  3537. }
  3538. if (!aborted && count)
  3539. {
  3540. if (count > choosenLimit)
  3541. count = choosenLimit;
  3542. processed++;
  3543. assertex(!serializer);
  3544. void *recBuffer = output->getBuffer(meta.getFixedSize(), false);
  3545. if (meta.getFixedSize() == 1)
  3546. *(byte *)recBuffer = (byte)count;
  3547. else
  3548. {
  3549. assertex(meta.getFixedSize() == sizeof(unsigned __int64));
  3550. *(unsigned __int64 *)recBuffer = count;
  3551. }
  3552. output->putBuffer(recBuffer, meta.getFixedSize(), false);
  3553. }
  3554. if (tlk) // a very early abort can mean it is NULL....
  3555. {
  3556. logctx.noteStatistic(STATS_ACCEPTED, processed-processedBefore, 1);
  3557. logctx.noteStatistic(STATS_REJECTED, skipped, 1);
  3558. }
  3559. if (aborted)
  3560. output->abort();
  3561. else
  3562. output->flush(true);
  3563. return !aborted;
  3564. }
  3565. };
  3566. //================================================================================================
  3567. class CRoxieIndexCountActivityFactory : public CRoxieIndexActivityFactory
  3568. {
  3569. public:
  3570. CRoxieIndexCountActivityFactory(IPropertyTree &graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
  3571. : CRoxieIndexActivityFactory(graphNode, _subgraphId, _queryFactory, _helperFactory)
  3572. {
  3573. Owned<IHThorIndexCountArg> helper = (IHThorIndexCountArg *) helperFactory();
  3574. init(helper, graphNode);
  3575. }
  3576. virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
  3577. {
  3578. return new CRoxieIndexCountActivity(logctx, packet, helperFactory, this);
  3579. }
  3580. virtual StringBuffer &toString(StringBuffer &s) const
  3581. {
  3582. return CSlaveActivityFactory::toString(s.append("INDEXCOUNT "));
  3583. }
  3584. };
  3585. ISlaveActivityFactory *createRoxieIndexCountActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
  3586. {
  3587. return new CRoxieIndexCountActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory);
  3588. }
  3589. //================================================================================================
  3590. class CRoxieIndexAggregateActivity : public CRoxieIndexActivity
  3591. {
  3592. protected:
  3593. IHThorCompoundAggregateExtra * aggregateHelper;
  3594. public:
  3595. IMPLEMENT_IINTERFACE;
  3596. CRoxieIndexAggregateActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieIndexActivityFactory *_aFactory)
  3597. : CRoxieIndexActivity(_logctx, _packet, _hFactory, _aFactory, 0)
  3598. {
  3599. onCreate();
  3600. aggregateHelper = (IHThorIndexAggregateArg *) basehelper;
  3601. assertex(!resent);
  3602. }
  3603. virtual StringBuffer &toString(StringBuffer &ret) const
  3604. {
  3605. return ret.appendf("IndexAggregate %u", packet->queryHeader().activityId);
  3606. }
  3607. virtual bool process()
  3608. {
  3609. MTIME_SECTION(timer, "CRoxieIndexAggregateActivity ::process");
  3610. Owned<IMessagePacker> output = ROQ->createOutputStream(packet->queryHeader(), false, logctx);
  3611. OptimizedRowBuilder rowBuilder(rowAllocator, meta, output, serializer);
  3612. rowBuilder.ensureRow();
  3613. aggregateHelper->clearAggregate(rowBuilder);
  3614. unsigned skipped = 0;
  3615. unsigned processedBefore = processed;
  3616. while (!aborted && inputsDone < inputCount)
  3617. {
  3618. checkPartChanged(inputData[inputsDone]);
  3619. if (tlk)
  3620. {
  3621. createSegmentMonitors();
  3622. tlk->reset(false);
  3623. callback.setManager(tlk);
  3624. while (!aborted && tlk->lookup(true))
  3625. {
  3626. keyprocessed++;
  3627. atomic_inc(&indexRecordsRead);
  3628. aggregateHelper->processRow(rowBuilder, tlk->queryKeyBuffer(callback.getFPosRef()));
  3629. callback.finishedRow();
  3630. }
  3631. callback.setManager(NULL);
  3632. }
  3633. inputsDone++;
  3634. }
  3635. if (!aborted && aggregateHelper->processedAnyRows())
  3636. {
  3637. processed++;
  3638. size32_t transformedSize = meta.getRecordSize(rowBuilder.getSelf());
  3639. rowBuilder.writeToOutput(transformedSize, true);
  3640. }
  3641. if (tlk) // a very early abort can mean it is NULL....
  3642. {
  3643. logctx.noteStatistic(STATS_ACCEPTED, processed-processedBefore, 1);
  3644. logctx.noteStatistic(STATS_REJECTED, skipped, 1);
  3645. }
  3646. logctx.flush(true, aborted);
  3647. if (aborted)
  3648. output->abort();
  3649. else
  3650. output->flush(true);
  3651. return !aborted;
  3652. }
  3653. };
  3654. //================================================================================================
  3655. class CRoxieIndexAggregateActivityFactory : public CRoxieIndexActivityFactory
  3656. {
  3657. public:
  3658. CRoxieIndexAggregateActivityFactory(IPropertyTree &graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
  3659. : CRoxieIndexActivityFactory(graphNode, _subgraphId, _queryFactory, _helperFactory)
  3660. {
  3661. Owned<IHThorIndexAggregateArg> helper = (IHThorIndexAggregateArg *) helperFactory();
  3662. init(helper, graphNode);
  3663. }
  3664. virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
  3665. {
  3666. return new CRoxieIndexAggregateActivity(logctx, packet, helperFactory, this);
  3667. }
  3668. virtual StringBuffer &toString(StringBuffer &s) const
  3669. {
  3670. return CSlaveActivityFactory::toString(s.append("INDEXAGGREGATE "));
  3671. }
  3672. };
  3673. ISlaveActivityFactory *createRoxieIndexAggregateActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
  3674. {
  3675. return new CRoxieIndexAggregateActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory);
  3676. }
  3677. //================================================================================================
  3678. class CRoxieIndexGroupAggregateActivity : public CRoxieIndexActivity, implements IHThorGroupAggregateCallback
  3679. {
  3680. protected:
  3681. IHThorCompoundGroupAggregateExtra * aggregateHelper;
  3682. RowAggregator results;
  3683. unsigned groupSegCount;
  3684. ThorActivityKind kind;
  3685. public:
  3686. IMPLEMENT_IINTERFACE;
  3687. CRoxieIndexGroupAggregateActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieIndexActivityFactory *_aFactory, ThorActivityKind _kind)
  3688. : CRoxieIndexActivity(_logctx, _packet, _hFactory, _aFactory, 0),
  3689. aggregateHelper((IHThorIndexGroupAggregateArg *) basehelper),
  3690. results(*aggregateHelper, *aggregateHelper), kind(_kind)
  3691. {
  3692. onCreate();
  3693. results.start(rowAllocator);
  3694. assertex(!resent);
  3695. groupSegCount = 0;
  3696. }
  3697. virtual bool needsRowAllocator()
  3698. {
  3699. return true;
  3700. }
  3701. virtual StringBuffer &toString(StringBuffer &ret) const
  3702. {
  3703. return ret.appendf("IndexGroupAggregate %u", packet->queryHeader().activityId);
  3704. }
  3705. virtual void processRow(const void * next)
  3706. {
  3707. results.addRow(next);
  3708. }
  3709. virtual void createSegmentMonitors()
  3710. {
  3711. if (createSegmentMonitorsPending)
  3712. {
  3713. unsigned groupSegSize;
  3714. if ((kind==TAKindexgroupcount || kind==TAKindexgroupexists))
  3715. groupSegSize = aggregateHelper->getGroupSegmentMonitorsSize();
  3716. else
  3717. groupSegSize = 0;
  3718. tlk->setMergeBarrier(groupSegSize);
  3719. CRoxieIndexActivity::createSegmentMonitors();
  3720. if (groupSegSize)
  3721. {
  3722. // MORE - this code should be moved to somewhere common so ccdserver can share it
  3723. unsigned numSegs = tlk->ordinality();
  3724. for (unsigned segNo = 0; segNo < numSegs; segNo++)
  3725. {
  3726. IKeySegmentMonitor *seg = tlk->item(segNo);
  3727. if (seg->getOffset()+seg->getSize()==groupSegSize)
  3728. {
  3729. groupSegCount = segNo+1;
  3730. break;
  3731. }
  3732. }
  3733. assertex(groupSegCount);
  3734. }
  3735. else
  3736. groupSegCount = 0;
  3737. }
  3738. }
  3739. virtual bool process()
  3740. {
  3741. MTIME_SECTION(timer, "CRoxieIndexGroupAggregateActivity ::process");
  3742. Owned<IRowManager> rowManager = roxiemem::createRowManager(0, NULL, logctx, NULL, true); // MORE - should not really use default limits
  3743. Owned<IMessagePacker> output = ROQ->createOutputStream(packet->queryHeader(), false, logctx);
  3744. unsigned processedBefore = processed;
  3745. try
  3746. {
  3747. while (!aborted && inputsDone < inputCount)
  3748. {
  3749. checkPartChanged(inputData[inputsDone]);
  3750. if (tlk)
  3751. {
  3752. createSegmentMonitors();
  3753. tlk->reset(false);
  3754. callback.setManager(tlk);
  3755. while (!aborted && tlk->lookup(true))
  3756. {
  3757. if (groupSegCount && !layoutTranslators->item(lastPartNo.fileNo))
  3758. {
  3759. AggregateRowBuilder &rowBuilder = results.addRow(tlk->queryKeyBuffer(callback.getFPosRef()));
  3760. callback.finishedRow();
  3761. if (kind == TAKindexgroupcount)
  3762. {
  3763. unsigned __int64 count = tlk->getCurrentRangeCount(groupSegCount);
  3764. aggregateHelper->processCountGrouping(rowBuilder, count-1);
  3765. }
  3766. if (!tlk->nextRange(groupSegCount))
  3767. break;
  3768. }
  3769. else
  3770. {
  3771. keyprocessed++;
  3772. atomic_inc(&indexRecordsRead);
  3773. aggregateHelper->processRow(tlk->queryKeyBuffer(callback.getFPosRef()), this);
  3774. callback.finishedRow();
  3775. }
  3776. }
  3777. callback.setManager(NULL);
  3778. }
  3779. inputsDone++;
  3780. }
  3781. if (!aborted)
  3782. {
  3783. loop
  3784. {
  3785. Owned<AggregateRowBuilder> next = results.nextResult();
  3786. if (!next)
  3787. break;
  3788. unsigned rowSize = next->querySize();
  3789. OwnedConstRoxieRow row(next->finalizeRowClear());
  3790. if (serializer)
  3791. {
  3792. serializeRow(output, row);
  3793. }
  3794. else
  3795. {
  3796. void *recBuffer = output->getBuffer(rowSize, meta.isVariableSize());
  3797. memcpy(recBuffer, row, rowSize);
  3798. output->putBuffer(recBuffer, rowSize, meta.isVariableSize());
  3799. }
  3800. }
  3801. }
  3802. }
  3803. catch (...)
  3804. {
  3805. results.reset(); // kill entries before the rowManager dies.
  3806. throw;
  3807. }
  3808. results.reset();
  3809. if (tlk) // a very early abort can mean it is NULL....
  3810. {
  3811. logctx.noteStatistic(STATS_ACCEPTED, processed-processedBefore, 1);
  3812. logctx.noteStatistic(STATS_REJECTED, 0, 1);
  3813. }
  3814. logctx.flush(true, aborted);
  3815. if (aborted)
  3816. output->abort();
  3817. else
  3818. output->flush(true);
  3819. return !aborted;
  3820. }
  3821. };
  3822. //================================================================================================
  3823. class CRoxieIndexGroupAggregateActivityFactory : public CRoxieIndexActivityFactory
  3824. {
  3825. ThorActivityKind kind;
  3826. public:
  3827. CRoxieIndexGroupAggregateActivityFactory(IPropertyTree &graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind)
  3828. : CRoxieIndexActivityFactory(graphNode, _subgraphId, _queryFactory, _helperFactory), kind(_kind)
  3829. {
  3830. Owned<IHThorIndexGroupAggregateArg> helper = (IHThorIndexGroupAggregateArg *) helperFactory();
  3831. init(helper, graphNode);
  3832. }
  3833. virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
  3834. {
  3835. return new CRoxieIndexGroupAggregateActivity(logctx, packet, helperFactory, this, kind);
  3836. }
  3837. virtual StringBuffer &toString(StringBuffer &s) const
  3838. {
  3839. return CSlaveActivityFactory::toString(s.append("INDEXGROUPAGGREGATE "));
  3840. }
  3841. };
  3842. ISlaveActivityFactory *createRoxieIndexGroupAggregateActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind)
  3843. {
  3844. return new CRoxieIndexGroupAggregateActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory, _kind);
  3845. }
  3846. //================================================================================================
  3847. class CRoxieFetchActivityFactory : public CSlaveActivityFactory
  3848. {
  3849. public:
  3850. IMPLEMENT_IINTERFACE;
  3851. Owned<IFileIOArray> fileArray;
  3852. CRoxieFetchActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
  3853. : CSlaveActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory)
  3854. {
  3855. Owned<IHThorFetchBaseArg> helper = (IHThorFetchBaseArg *) helperFactory();
  3856. IHThorFetchContext * fetchContext = static_cast<IHThorFetchContext *>(helper->selectInterface(TAIfetchcontext_1));
  3857. bool variableFileName = allFilesDynamic || ((fetchContext->getFetchFlags() & (FFvarfilename|FFdynamicfilename)) != 0);
  3858. if (!variableFileName)
  3859. {
  3860. bool isOpt = (fetchContext->getFetchFlags() & FFdatafileoptional) != 0;
  3861. OwnedRoxieString fname(fetchContext->getFileName());
  3862. datafile.setown(_queryFactory.queryPackage().lookupFileName(fname, isOpt, true, _queryFactory.queryWorkUnit()));
  3863. if (datafile)
  3864. fileArray.setown(datafile->getIFileIOArray(isOpt, queryFactory.queryChannel()));
  3865. }
  3866. }
  3867. virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const;
  3868. virtual StringBuffer &toString(StringBuffer &s) const
  3869. {
  3870. return CSlaveActivityFactory::toString(s.append("FETCH "));
  3871. }
  3872. inline IFileIO *getFilePart(unsigned partNo, offset_t &_base) const
  3873. {
  3874. return fileArray->getFilePart(partNo, _base);
  3875. }
  3876. };
  3877. class CRoxieFetchActivityBase : public CRoxieSlaveActivity
  3878. {
  3879. protected:
  3880. IHThorFetchBaseArg *helper;
  3881. IHThorFetchContext * fetchContext;
  3882. const CRoxieFetchActivityFactory *factory;
  3883. Owned<IFileIO> rawFile;
  3884. Owned<ISerialStream> rawStream;
  3885. CThorStreamDeserializerSource deserializeSource;
  3886. offset_t base;
  3887. char *inputData;
  3888. char *inputLimit;
  3889. Owned<IFileIOArray> varFiles;
  3890. bool needsRHS;
  3891. virtual size32_t doFetch(ARowBuilder & rowBuilder, offset_t pos, offset_t rawpos, void *inputData) = 0;
  3892. public:
  3893. IMPLEMENT_IINTERFACE;
  3894. CRoxieFetchActivityBase(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieFetchActivityFactory *_aFactory)
  3895. : CRoxieSlaveActivity(_logctx, _packet, _hFactory, _aFactory), factory(_aFactory)
  3896. {
  3897. helper = (IHThorFetchBaseArg *) basehelper;
  3898. fetchContext = static_cast<IHThorFetchContext *>(helper->selectInterface(TAIfetchcontext_1));
  3899. base = 0;
  3900. variableFileName = allFilesDynamic || ((fetchContext->getFetchFlags() & (FFvarfilename|FFdynamicfilename)) != 0);
  3901. isOpt = (fetchContext->getFetchFlags() & FFdatafileoptional) != 0;
  3902. onCreate();
  3903. inputData = (char *) serializedCreate.readDirect(0);
  3904. inputLimit = inputData + (serializedCreate.length() - serializedCreate.getPos());
  3905. needsRHS = helper->transformNeedsRhs();
  3906. }
  3907. virtual const char *queryDynamicFileName() const
  3908. {
  3909. return fetchContext->getFileName();
  3910. }
  3911. virtual void setVariableFileInfo()
  3912. {
  3913. varFiles.setown(varFileInfo->getIFileIOArray(isOpt, packet->queryHeader().channel));
  3914. }
  3915. virtual bool process();
  3916. virtual StringBuffer &toString(StringBuffer &ret) const
  3917. {
  3918. return ret.appendf("Fetch %u", packet->queryHeader().activityId);
  3919. }
  3920. virtual void setPartNo(bool filechanged);
  3921. };
  3922. bool CRoxieFetchActivityBase::process()
  3923. {
  3924. MTIME_SECTION(timer, "CRoxieFetchActivityBase::process");
  3925. Owned<IMessagePacker> output = ROQ->createOutputStream(packet->queryHeader(), false, logctx);
  3926. unsigned accepted = 0;
  3927. unsigned rejected = 0;
  3928. unsigned __int64 rowLimit = helper->getRowLimit();
  3929. OptimizedRowBuilder rowBuilder(rowAllocator, meta, output, serializer);
  3930. while (!aborted && inputData < inputLimit)
  3931. {
  3932. checkPartChanged(*(PartNoType *) inputData);
  3933. inputData += sizeof(PartNoType);
  3934. offset_t rp = *(offset_t *)inputData;
  3935. inputData += sizeof(offset_t);
  3936. unsigned rhsSize;
  3937. if (needsRHS)
  3938. {
  3939. rhsSize = *(unsigned *)inputData;
  3940. inputData += sizeof(unsigned);
  3941. }
  3942. else
  3943. rhsSize = 0;
  3944. offset_t pos;
  3945. if (isLocalFpos(rp))
  3946. pos = getLocalFposOffset(rp);
  3947. else
  3948. pos = rp-base;
  3949. unsigned thisSize = doFetch(rowBuilder, pos, rp, inputData);
  3950. inputData += rhsSize;
  3951. if (thisSize)
  3952. {
  3953. rowBuilder.writeToOutput(thisSize, true);
  3954. accepted++;
  3955. if (accepted > rowLimit)
  3956. {
  3957. logctx.noteStatistic(STATS_DISK_SEEKS, accepted+rejected, 1);
  3958. logctx.noteStatistic(STATS_ACCEPTED, accepted, 1);
  3959. logctx.noteStatistic(STATS_REJECTED, rejected, 1);
  3960. limitExceeded();
  3961. return true;
  3962. }
  3963. }
  3964. else
  3965. rejected++;
  3966. }
  3967. logctx.noteStatistic(STATS_DISK_SEEKS, accepted+rejected, 1);
  3968. logctx.noteStatistic(STATS_ACCEPTED, accepted, 1);
  3969. logctx.noteStatistic(STATS_REJECTED, rejected, 1);
  3970. logctx.flush(true, aborted);
  3971. if (aborted)
  3972. output->abort();
  3973. else
  3974. output->flush(true);
  3975. return !aborted;
  3976. }
  3977. class CRoxieFetchActivity : public CRoxieFetchActivityBase
  3978. {
  3979. Owned<IEngineRowAllocator> diskAllocator;
  3980. Owned<IOutputRowDeserializer> rowDeserializer;
  3981. public:
  3982. CRoxieFetchActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieFetchActivityFactory *_aFactory)
  3983. : CRoxieFetchActivityBase(_logctx, _packet, _hFactory, _aFactory)
  3984. {
  3985. IHThorFetchContext * fetchContext = static_cast<IHThorFetchContext *>(helper->selectInterface(TAIfetchcontext_1));
  3986. IOutputMetaData *diskMeta = fetchContext->queryDiskRecordSize();
  3987. diskAllocator.setown(getRowAllocator(diskMeta, basefactory->queryId()));
  3988. rowDeserializer.setown(diskMeta->createDiskDeserializer(queryContext->queryCodeContext(), basefactory->queryId()));
  3989. }
  3990. virtual size32_t doFetch(ARowBuilder & rowBuilder, offset_t pos, offset_t rawpos, void *inputData)
  3991. {
  3992. RtlDynamicRowBuilder diskRowBuilder(diskAllocator);
  3993. deserializeSource.reset(pos);
  3994. unsigned sizeRead = rowDeserializer->deserialize(diskRowBuilder.ensureRow(), deserializeSource);
  3995. OwnedConstRoxieRow rawBuffer = diskRowBuilder.finalizeRowClear(sizeRead);
  3996. // note the swapped parameters - left and right map to input and raw differently for JOIN vs FETCH
  3997. IHThorFetchArg *h = (IHThorFetchArg *) helper;
  3998. return h->transform(rowBuilder, rawBuffer, inputData, rawpos);
  3999. }
  4000. };
  4001. IRoxieSlaveActivity *CRoxieFetchActivityFactory::createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
  4002. {
  4003. return new CRoxieFetchActivity(logctx, packet, helperFactory, this);
  4004. }
  4005. //------------------------------------------------------------------------------------
  4006. class CRoxieCSVFetchActivity : public CRoxieFetchActivityBase
  4007. {
  4008. CSVSplitter csvSplitter;
  4009. public:
  4010. CRoxieCSVFetchActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieFetchActivityFactory *_aFactory, unsigned _maxColumns)
  4011. : CRoxieFetchActivityBase(_logctx, _packet, _hFactory, _aFactory)
  4012. {
  4013. const char * quotes = NULL;
  4014. const char * separators = NULL;
  4015. const char * terminators = NULL;
  4016. const char * escapes = NULL;
  4017. const IResolvedFile *fileInfo = varFileInfo ? varFileInfo : factory->datafile;
  4018. if (fileInfo)
  4019. {
  4020. const IPropertyTree *options = fileInfo->queryProperties();
  4021. if (options)
  4022. {
  4023. quotes = options->queryProp("@csvQuote");
  4024. separators = options->queryProp("@csvSeparate");
  4025. terminators = options->queryProp("@csvTerminate");
  4026. escapes = options->queryProp("@csvEscape");
  4027. }
  4028. }
  4029. IHThorCsvFetchArg *h = (IHThorCsvFetchArg *) helper;
  4030. ICsvParameters *csvInfo = h->queryCsvParameters();
  4031. csvSplitter.init(_maxColumns, csvInfo, quotes, separators, terminators, escapes);
  4032. }
  4033. virtual size32_t doFetch(ARowBuilder & rowBuilder, offset_t pos, offset_t rawpos, void *inputData)
  4034. {
  4035. IHThorCsvFetchArg *h = (IHThorCsvFetchArg *) helper;
  4036. rawStream->reset(pos);
  4037. size32_t rowSize = 4096; // MORE - make configurable
  4038. size32_t maxRowSize = 10*1024*1024; // MORE - make configurable
  4039. loop
  4040. {
  4041. size32_t avail;
  4042. const void *peek = rawStream->peek(rowSize, avail);
  4043. if (csvSplitter.splitLine(avail, (const byte *)peek) < rowSize || avail < rowSize)
  4044. break;
  4045. if (rowSize == maxRowSize)
  4046. throw MakeStringException(0, "Row too big");
  4047. if (rowSize >= maxRowSize/2)
  4048. rowSize = maxRowSize;
  4049. else
  4050. rowSize += rowSize;
  4051. }
  4052. return h->transform(rowBuilder, csvSplitter.queryLengths(), (const char * *)csvSplitter.queryData(), inputData, rawpos);
  4053. }
  4054. };
  4055. class CRoxieXMLFetchActivity : public CRoxieFetchActivityBase, implements IXMLSelect
  4056. {
  4057. Owned<IXMLParse> parser;
  4058. Owned<IColumnProvider> lastMatch;
  4059. Owned<IFileIOStream> rawStreamX;
  4060. unsigned streamBufferSize;
  4061. public:
  4062. IMPLEMENT_IINTERFACE;
  4063. CRoxieXMLFetchActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieFetchActivityFactory *_aFactory, unsigned _streamBufferSize)
  4064. : CRoxieFetchActivityBase(_logctx, _packet, _hFactory, _aFactory),
  4065. streamBufferSize(_streamBufferSize)
  4066. {
  4067. }
  4068. virtual size32_t doFetch(ARowBuilder & rowBuilder, offset_t pos, offset_t rawpos, void *inputData)
  4069. {
  4070. rawStreamX->seek(pos, IFSbegin);
  4071. try
  4072. {
  4073. while(!lastMatch)
  4074. if(!parser->next())
  4075. throw MakeStringException(ROXIE_RECORD_FETCH_ERROR, "XML parse error at position %"I64F"d", pos);
  4076. IHThorXmlFetchArg *h = (IHThorXmlFetchArg *) helper;
  4077. unsigned thisSize = h->transform(rowBuilder, lastMatch, inputData, rawpos);
  4078. lastMatch.clear();
  4079. parser->reset();
  4080. return thisSize;
  4081. }
  4082. catch (IException *E)
  4083. {
  4084. ::Release(E);
  4085. throw MakeStringException(ROXIE_RECORD_FETCH_ERROR, "XML parse error at position %"I64F"d", pos);
  4086. }
  4087. }
  4088. virtual void match(IColumnProvider & entry, offset_t startOffset, offset_t endOffset)
  4089. {
  4090. lastMatch.set(&entry);
  4091. }
  4092. virtual void setPartNo(bool filechanged)
  4093. {
  4094. CRoxieFetchActivityBase::setPartNo(filechanged);
  4095. rawStreamX.setown(createBufferedIOStream(rawFile, streamBufferSize));
  4096. parser.setown(createXMLParse(*rawStreamX, "/", *this));
  4097. }
  4098. };
  4099. void CRoxieFetchActivityBase::setPartNo(bool filechanged)
  4100. {
  4101. rawFile.setown(variableFileName ? varFiles->getFilePart(lastPartNo.partNo, base) : factory->getFilePart(lastPartNo.partNo, base)); // MORE - superfiles
  4102. assertex(rawFile != NULL);
  4103. rawStream.setown(createFileSerialStream(rawFile, 0, -1, 0));
  4104. deserializeSource.setStream(rawStream);
  4105. }
  4106. class CRoxieCSVFetchActivityFactory : public CRoxieFetchActivityFactory
  4107. {
  4108. unsigned maxColumns;
  4109. public:
  4110. CRoxieCSVFetchActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
  4111. : CRoxieFetchActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory)
  4112. {
  4113. Owned<IHThorCsvFetchArg> helper = (IHThorCsvFetchArg*) helperFactory();
  4114. maxColumns = helper->getMaxColumns();
  4115. ICsvParameters *csvInfo = helper->queryCsvParameters();
  4116. assertex(!csvInfo->queryEBCDIC());
  4117. }
  4118. virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
  4119. {
  4120. return new CRoxieCSVFetchActivity(logctx, packet, helperFactory, this, maxColumns);
  4121. }
  4122. };
  4123. class CRoxieXMLFetchActivityFactory : public CRoxieFetchActivityFactory
  4124. {
  4125. public:
  4126. CRoxieXMLFetchActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
  4127. : CRoxieFetchActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory)
  4128. {
  4129. }
  4130. virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
  4131. {
  4132. return new CRoxieXMLFetchActivity(logctx, packet, helperFactory, this, 4096);
  4133. }
  4134. };
  4135. ISlaveActivityFactory *createRoxieFetchActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
  4136. {
  4137. return new CRoxieFetchActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory);
  4138. }
  4139. ISlaveActivityFactory *createRoxieCSVFetchActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
  4140. {
  4141. return new CRoxieCSVFetchActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory);
  4142. }
  4143. ISlaveActivityFactory *createRoxieXMLFetchActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
  4144. {
  4145. return new CRoxieXMLFetchActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory);
  4146. }
  4147. //================================================================================================
  4148. class CRoxieKeyedJoinIndexActivityFactory : public CRoxieKeyedActivityFactory
  4149. {
  4150. public:
  4151. IMPLEMENT_IINTERFACE;
  4152. CRoxieKeyedJoinIndexActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
  4153. : CRoxieKeyedActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory)
  4154. {
  4155. Owned<IHThorKeyedJoinArg> helper = (IHThorKeyedJoinArg *) helperFactory();
  4156. rtlDataAttr indexLayoutMeta;
  4157. size32_t indexLayoutSize;
  4158. if(!helper->getIndexLayout(indexLayoutSize, indexLayoutMeta.refdata()))
  4159. assertex(indexLayoutSize== 0);
  4160. MemoryBuffer m;
  4161. m.setBuffer(indexLayoutSize, indexLayoutMeta.getdata());
  4162. activityMeta.setown(deserializeRecordMeta(m, true));
  4163. layoutTranslators.setown(new TranslatorArray);
  4164. bool variableFileName = allFilesDynamic || ((helper->getJoinFlags() & (JFvarindexfilename|JFdynamicindexfilename)) != 0);
  4165. if (!variableFileName)
  4166. {
  4167. bool isOpt = (helper->getJoinFlags() & JFindexoptional) != 0;
  4168. OwnedRoxieString indexFileName(helper->getIndexFileName());
  4169. datafile.setown(_queryFactory.queryPackage().lookupFileName(indexFileName, isOpt, true, _queryFactory.queryWorkUnit()));
  4170. if (datafile)
  4171. keyArray.setown(datafile->getKeyArray(activityMeta, layoutTranslators, isOpt, queryFactory.queryChannel(), queryFactory.getEnableFieldTranslation()));
  4172. }
  4173. }
  4174. virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const;
  4175. virtual StringBuffer &toString(StringBuffer &s) const
  4176. {
  4177. return CSlaveActivityFactory::toString(s.append("KEYEDJOIN INDEX "));
  4178. }
  4179. };
  4180. class CRoxieKeyedJoinIndexActivity : public CRoxieKeyedActivity
  4181. {
  4182. IHThorKeyedJoinArg *helper;
  4183. const CRoxieKeyedJoinIndexActivityFactory *factory;
  4184. unsigned inputLength;
  4185. char *inputData;
  4186. Owned<IRoxieSlaveActivity> rootIndexActivity;
  4187. IIndexReadActivityInfo *rootIndex;
  4188. unsigned processed;
  4189. unsigned candidateCount;
  4190. unsigned keepCount;
  4191. unsigned inputDone;
  4192. public:
  4193. IMPLEMENT_IINTERFACE;
  4194. CRoxieKeyedJoinIndexActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieKeyedJoinIndexActivityFactory *_aFactory)
  4195. : factory(_aFactory), CRoxieKeyedActivity(_logctx, _packet, _hFactory, _aFactory)
  4196. {
  4197. helper = (IHThorKeyedJoinArg *) basehelper;
  4198. variableFileName = allFilesDynamic || ((helper->getJoinFlags() & (JFvarindexfilename|JFdynamicindexfilename)) != 0);
  4199. inputDone = 0;
  4200. processed = 0;
  4201. candidateCount = 0;
  4202. keepCount = 0;
  4203. rootIndex = NULL;
  4204. onCreate();
  4205. inputData = (char *) serializedCreate.readDirect(0);
  4206. inputLength = (serializedCreate.length() - serializedCreate.getPos());
  4207. if (resent)
  4208. {
  4209. resentInfo.read(inputDone);
  4210. inputData += inputDone;
  4211. resentInfo.read(processed);
  4212. resentInfo.read(candidateCount);
  4213. resentInfo.read(keepCount);
  4214. resentInfo.read(lastPartNo.partNo);
  4215. resentInfo.read(lastPartNo.fileNo);
  4216. setPartNo(true);
  4217. tlk->deserializeCursorPos(resentInfo);
  4218. assertex(resentInfo.remaining() == 0);
  4219. }
  4220. }
  4221. ~CRoxieKeyedJoinIndexActivity()
  4222. {
  4223. }
  4224. virtual void deserializeExtra(MemoryBuffer &buff)
  4225. {
  4226. if (helper->getJoinFlags() & JFindexfromactivity)
  4227. {
  4228. RemoteActivityId indexActivityId(buff);
  4229. assertex(indexActivityId.activityId);
  4230. unsigned indexCtxLen;
  4231. buff.read(indexCtxLen);
  4232. const void *indexCtx = buff.readDirect(indexCtxLen);
  4233. //We create a packet for the index activity to use. Header, trace info and parentextract are clone of mine, context info is copied from buff
  4234. MemoryBuffer indexPacketData;
  4235. indexPacketData.append(sizeof(RoxiePacketHeader), &packet->queryHeader());
  4236. indexPacketData.append(packet->getTraceLength(), packet->queryTraceInfo());
  4237. const byte *parentExtract = (const byte *) packet->queryContextData();
  4238. unsigned parentExtractLen = *(unsigned*) parentExtract;
  4239. indexPacketData.append(parentExtractLen);
  4240. indexPacketData.append(parentExtractLen, parentExtract + sizeof(unsigned));
  4241. indexPacketData.append(indexCtxLen, indexCtx);
  4242. RoxiePacketHeader *newHeader = (RoxiePacketHeader *) indexPacketData.toByteArray();
  4243. newHeader->continueSequence = 0;
  4244. newHeader->activityId = indexActivityId.activityId;
  4245. newHeader->queryHash = indexActivityId.queryHash;
  4246. Owned<IRoxieQueryPacket> indexPacket = createRoxiePacket(indexPacketData);
  4247. Owned<ISlaveActivityFactory> indexActivityFactory = factory->queryQueryFactory().getSlaveActivityFactory(indexActivityId.activityId);
  4248. assertex(indexActivityFactory != NULL);
  4249. rootIndexActivity.setown(indexActivityFactory->createActivity(logctx, indexPacket));
  4250. rootIndex = rootIndexActivity->queryIndexReadActivity();
  4251. varFileInfo.setown(rootIndex->getVarFileInfo());
  4252. layoutTranslators.setown(rootIndex->getTranslators());
  4253. keyArray.setown(rootIndex->getKeySet());
  4254. }
  4255. }
  4256. virtual const char *queryDynamicFileName() const
  4257. {
  4258. return helper->getIndexFileName();
  4259. }
  4260. virtual bool process();
  4261. virtual StringBuffer &toString(StringBuffer &ret) const
  4262. {
  4263. return ret.appendf("KeyedJoinIndex %u", packet->queryHeader().activityId);
  4264. }
  4265. virtual void createSegmentMonitors()
  4266. {
  4267. // This is called to create the segmonitors that apply to ALL rows - not the per-row ones
  4268. // At present there are none. However we should still set up the layout translation.
  4269. if (createSegmentMonitorsPending)
  4270. {
  4271. createSegmentMonitorsPending = false;
  4272. tlk->setLayoutTranslator(layoutTranslators->item(lastPartNo.fileNo));
  4273. }
  4274. }
  4275. };
  4276. IRoxieSlaveActivity *CRoxieKeyedJoinIndexActivityFactory::createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
  4277. {
  4278. return new CRoxieKeyedJoinIndexActivity(logctx, packet, helperFactory, this);
  4279. }
  4280. bool CRoxieKeyedJoinIndexActivity::process()
  4281. {
  4282. MTIME_SECTION(timer, "CRoxieKeyedJoinIndexActivity::process");
  4283. Owned<IMessagePacker> output = ROQ->createOutputStream(packet->queryHeader(), false, logctx);
  4284. IOutputMetaData *joinFieldsMeta = helper->queryJoinFieldsRecordSize();
  4285. Owned<IEngineRowAllocator> joinFieldsAllocator = getRowAllocator(joinFieldsMeta, basefactory->queryId());
  4286. OptimizedKJRowBuilder rowBuilder(joinFieldsAllocator, joinFieldsMeta, output);
  4287. unsigned __int64 rowLimit = helper->getRowLimit();
  4288. unsigned atmost = helper->getJoinLimit();
  4289. if (!atmost) atmost = (unsigned) -1;
  4290. unsigned abortLimit = helper->getMatchAbortLimit();
  4291. if (!abortLimit) abortLimit = (unsigned) -1;
  4292. if (abortLimit < atmost)
  4293. atmost = abortLimit;
  4294. unsigned keepLimit = helper->getKeepLimit();
  4295. unsigned joinFlags = helper->getJoinFlags();
  4296. if (joinFlags & (JFtransformMaySkip | JFfetchMayFilter))
  4297. keepLimit = 0;
  4298. if ((joinFlags & (JFexclude|JFleftouter)) == (JFexclude|JFleftouter) && (!(joinFlags & JFfetchMayFilter))) // For left-only joins, all we care about is existance of a match. Return as soon as we know that there is one
  4299. keepLimit = 1;
  4300. unsigned processedBefore = processed;
  4301. unsigned rejected = 0;
  4302. CachedOutputMetaData inputFields(helper->queryIndexReadInputRecordSize());
  4303. unsigned inputSize = inputFields.getFixedSize();
  4304. unsigned totalSizeSent = 0;
  4305. // Now go fetch the records
  4306. bool continuationFailed = false;
  4307. while (!aborted && inputDone < inputLength)
  4308. {
  4309. checkPartChanged(*(PartNoType *) inputData);
  4310. CJoinGroup *jg = *(CJoinGroup **) (inputData + sizeof(PartNoType)); // NOTE - this is a pointer in Roxie server's address space - don't go following it!
  4311. char *inputRow = inputData+sizeof(PartNoType)+sizeof(const void *);
  4312. if (inputFields.isVariableSize())
  4313. {
  4314. inputSize = *(unsigned *)inputRow;
  4315. inputRow += sizeof(unsigned);
  4316. }
  4317. if (tlk)
  4318. {
  4319. createSegmentMonitors();
  4320. helper->createSegmentMonitors(tlk, inputRow);
  4321. if (rootIndex)
  4322. rootIndex->mergeSegmentMonitors(tlk);
  4323. tlk->finishSegmentMonitors();
  4324. if (logctx.queryTraceLevel() >= 20)
  4325. {
  4326. StringBuffer out;
  4327. printKeyedValues(out, tlk, helper->queryIndexRecordSize());
  4328. logctx.CTXLOG("Using filter %s", out.str());
  4329. }
  4330. if (!resent && (atmost != (unsigned) -1) && ((atmost > preabortKeyedJoinsThreshold) || (joinFlags & JFcountmatchabortlimit) || (keepLimit != 0)))
  4331. {
  4332. unsigned __int64 precount = tlk->checkCount(atmost);
  4333. if (precount > atmost)
  4334. {
  4335. candidateCount = atmost+1;
  4336. if (logctx.queryTraceLevel() > 5)
  4337. logctx.CTXLOG("Pre-aborting since candidate count is at least %"I64F"d", precount);
  4338. }
  4339. else
  4340. {
  4341. if (logctx.queryTraceLevel() > 10)
  4342. logctx.CTXLOG("NOT Pre-aborting since candidate count is %"I64F"d", precount);
  4343. tlk->reset(false);
  4344. }
  4345. }
  4346. else
  4347. tlk->reset(resent);
  4348. resent = false;
  4349. while (candidateCount <= atmost)
  4350. {
  4351. if (tlk->lookup(true))
  4352. {
  4353. candidateCount++;
  4354. atomic_inc(&indexRecordsRead);
  4355. KLBlobProviderAdapter adapter(tlk);
  4356. offset_t recptr;
  4357. const byte *indexRow = tlk->queryKeyBuffer(recptr);
  4358. if (helper->indexReadMatch(inputRow, indexRow, recptr, &adapter))
  4359. {
  4360. processed++;
  4361. if (keepLimit)
  4362. {
  4363. keepCount++;
  4364. if (keepCount > keepLimit)
  4365. break;
  4366. }
  4367. if (processed > rowLimit)
  4368. {
  4369. if (logctx.queryTraceLevel() > 1)
  4370. {
  4371. StringBuffer s;
  4372. logctx.CTXLOG("limit exceeded for %s", packet->queryHeader().toString(s).str());
  4373. }
  4374. logctx.noteStatistic(STATS_ACCEPTED, processed-processedBefore, 1);
  4375. logctx.noteStatistic(STATS_REJECTED, rejected, 1);
  4376. limitExceeded();
  4377. return true;
  4378. }
  4379. unsigned totalSize = 0;
  4380. if (helper->diskAccessRequired())
  4381. {
  4382. const void *self = output->getBuffer(KEYEDJOIN_RECORD_SIZE(0), true);
  4383. KeyedJoinHeader *rec = (KeyedJoinHeader *) self;
  4384. rec->fpos = recptr;
  4385. rec->thisGroup = jg;
  4386. rec->partNo = lastPartNo.partNo;
  4387. output->putBuffer(self, KEYEDJOIN_RECORD_SIZE(0), true);
  4388. }
  4389. else
  4390. {
  4391. KLBlobProviderAdapter adapter(tlk);
  4392. totalSize = helper->extractJoinFields(rowBuilder, indexRow, recptr, &adapter);
  4393. rowBuilder.writeToOutput(totalSize, recptr, jg, lastPartNo.partNo);
  4394. }
  4395. totalSizeSent += KEYEDJOIN_RECORD_SIZE(totalSize);
  4396. if (totalSizeSent > indexReadChunkSize && !continuationFailed)
  4397. {
  4398. MemoryBuffer si;
  4399. unsigned short siLen = 0;
  4400. si.append(siLen);
  4401. si.append(inputDone);
  4402. si.append(processed);
  4403. si.append(candidateCount);
  4404. si.append(keepCount);
  4405. si.append(lastPartNo.partNo);
  4406. si.append(lastPartNo.fileNo);
  4407. tlk->serializeCursorPos(si);
  4408. if (si.length() <= maxContinuationSize)
  4409. {
  4410. siLen = si.length() - sizeof(siLen);
  4411. si.writeDirect(0, sizeof(siLen), &siLen);
  4412. output->sendMetaInfo(si.toByteArray(), si.length());
  4413. logctx.flush(true, aborted);
  4414. output->flush(true);
  4415. logctx.noteStatistic(STATS_ACCEPTED, processed-processedBefore, 1);
  4416. logctx.noteStatistic(STATS_REJECTED, rejected, 1);
  4417. return true;
  4418. }
  4419. else
  4420. continuationFailed = true;
  4421. }
  4422. }
  4423. else
  4424. {
  4425. rejected++;
  4426. atomic_inc(&postFiltered);
  4427. }
  4428. }
  4429. else
  4430. break;
  4431. }
  4432. tlk->releaseSegmentMonitors();
  4433. }
  4434. // output an end marker for the matches to this group
  4435. KeyedJoinHeader *rec = (KeyedJoinHeader *) output->getBuffer(KEYEDJOIN_RECORD_SIZE(0), true);
  4436. rec->fpos = candidateCount;
  4437. rec->thisGroup = jg;
  4438. rec->partNo = (unsigned short) -1;
  4439. output->putBuffer(rec, KEYEDJOIN_RECORD_SIZE(0), true);
  4440. totalSizeSent += KEYEDJOIN_RECORD_SIZE(0); // note - don't interrupt here though - too complicated.
  4441. candidateCount = 0;
  4442. keepCount = 0;
  4443. inputData = inputRow + inputSize;
  4444. inputDone += sizeof(PartNoType) + sizeof(const void *);
  4445. if (inputFields.isVariableSize())
  4446. inputDone += sizeof(unsigned);
  4447. inputDone += inputSize;
  4448. }
  4449. if (tlk)
  4450. {
  4451. logctx.noteStatistic(STATS_ACCEPTED, processed-processedBefore, 1);
  4452. logctx.noteStatistic(STATS_REJECTED, rejected, 1);
  4453. }
  4454. logctx.flush(true, aborted);
  4455. if (aborted)
  4456. output->abort();
  4457. else
  4458. output->flush(true);
  4459. return !aborted;
  4460. }
  4461. //================================================================================================
  4462. ISlaveActivityFactory *createRoxieKeyedJoinIndexActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
  4463. {
  4464. return new CRoxieKeyedJoinIndexActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory);
  4465. }
  4466. //================================================================================================
  4467. class CRoxieKeyedJoinFetchActivityFactory : public CSlaveActivityFactory
  4468. {
  4469. public:
  4470. IMPLEMENT_IINTERFACE;
  4471. Owned<IFileIOArray> fileArray;
  4472. CRoxieKeyedJoinFetchActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
  4473. : CSlaveActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory)
  4474. {
  4475. Owned<IHThorKeyedJoinArg> helper = (IHThorKeyedJoinArg *) helperFactory();
  4476. assertex(helper->diskAccessRequired());
  4477. bool variableFileName = allFilesDynamic || ((helper->getFetchFlags() & (FFvarfilename|FFdynamicfilename)) != 0);
  4478. if (!variableFileName)
  4479. {
  4480. bool isOpt = (helper->getFetchFlags() & FFdatafileoptional) != 0;
  4481. OwnedRoxieString fileName(helper->getFileName());
  4482. datafile.setown(_queryFactory.queryPackage().lookupFileName(fileName, isOpt, true, _queryFactory.queryWorkUnit()));
  4483. if (datafile)
  4484. fileArray.setown(datafile->getIFileIOArray(isOpt, queryFactory.queryChannel()));
  4485. }
  4486. }
  4487. virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const;
  4488. virtual StringBuffer &toString(StringBuffer &s) const
  4489. {
  4490. return CSlaveActivityFactory::toString(s.append("KEYEDJOIN FETCH "));
  4491. }
  4492. IFileIO *getFilePart(unsigned partNo, offset_t &_base) const
  4493. {
  4494. return fileArray->getFilePart(partNo, _base);
  4495. }
  4496. };
  4497. class CRoxieKeyedJoinFetchActivity : public CRoxieSlaveActivity
  4498. {
  4499. IHThorKeyedJoinArg *helper;
  4500. Owned<IFileIO> rawFile;
  4501. const CRoxieKeyedJoinFetchActivityFactory *factory;
  4502. offset_t base;
  4503. const char *inputLimit;
  4504. const char *inputData;
  4505. Owned<IFileIOArray> varFiles;
  4506. Owned<ISerialStream> rawStream;
  4507. CThorStreamDeserializerSource deserializeSource;
  4508. virtual void setPartNo(bool filechanged)
  4509. {
  4510. rawFile.setown(variableFileName ? varFiles->getFilePart(lastPartNo.partNo, base) : factory->getFilePart(lastPartNo.partNo, base)); // MORE - superfiles
  4511. rawStream.setown(createFileSerialStream(rawFile, 0, -1, 0));
  4512. deserializeSource.setStream(rawStream);
  4513. }
  4514. public:
  4515. IMPLEMENT_IINTERFACE;
  4516. CRoxieKeyedJoinFetchActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieKeyedJoinFetchActivityFactory *_aFactory)
  4517. : factory(_aFactory),
  4518. CRoxieSlaveActivity(_logctx, _packet, _hFactory, _aFactory)
  4519. {
  4520. // MORE - no continuation row support?
  4521. base = 0;
  4522. helper = (IHThorKeyedJoinArg *) basehelper;
  4523. variableFileName = allFilesDynamic || ((helper->getFetchFlags() & (FFvarfilename|FFdynamicfilename)) != 0);
  4524. onCreate();
  4525. inputData = (const char *) serializedCreate.readDirect(0);
  4526. inputLimit = inputData + (serializedCreate.length() - serializedCreate.getPos());
  4527. }
  4528. ~CRoxieKeyedJoinFetchActivity()
  4529. {
  4530. }
  4531. virtual const char *queryDynamicFileName() const
  4532. {
  4533. return helper->getFileName();
  4534. }
  4535. virtual void setVariableFileInfo()
  4536. {
  4537. varFiles.setown(varFileInfo->getIFileIOArray(isOpt, packet->queryHeader().channel));
  4538. }
  4539. virtual bool process();
  4540. virtual StringBuffer &toString(StringBuffer &ret) const
  4541. {
  4542. return ret.appendf("KeyedJoinFetch %u", packet->queryHeader().activityId);
  4543. }
  4544. };
  4545. bool CRoxieKeyedJoinFetchActivity::process()
  4546. {
  4547. MTIME_SECTION(timer, "CRoxieKeyedJoinFetchActivity::process");
  4548. // MORE - where we are returning everything there is an optimization or two to be had
  4549. Owned<IMessagePacker> output = ROQ->createOutputStream(packet->queryHeader(), false, logctx);
  4550. unsigned processed = 0;
  4551. unsigned skipped = 0;
  4552. unsigned __int64 rowLimit = helper->getRowLimit();
  4553. unsigned totalSizeSent = 0;
  4554. Owned<IOutputRowDeserializer> rowDeserializer = helper->queryDiskRecordSize()->createDiskDeserializer(queryContext->queryCodeContext(), basefactory->queryId());
  4555. Owned<IEngineRowAllocator> diskAllocator = getRowAllocator(helper->queryDiskRecordSize(), basefactory->queryId());
  4556. RtlDynamicRowBuilder diskRowBuilder(diskAllocator);
  4557. IOutputMetaData *joinFieldsMeta = helper->queryJoinFieldsRecordSize();
  4558. Owned<IEngineRowAllocator> joinFieldsAllocator = getRowAllocator(joinFieldsMeta, basefactory->queryId());
  4559. OptimizedKJRowBuilder jfRowBuilder(joinFieldsAllocator, joinFieldsMeta, output);
  4560. CachedOutputMetaData inputFields(helper->queryFetchInputRecordSize());
  4561. size32_t inputSize = inputFields.getFixedSize();
  4562. while (!aborted && inputData < inputLimit)
  4563. {
  4564. checkPartChanged(*(PartNoType *) inputData);
  4565. inputData += sizeof(PartNoType);
  4566. offset_t rp;
  4567. memcpy(&rp, inputData, sizeof(rp));
  4568. offset_t pos;
  4569. if (isLocalFpos(rp))
  4570. pos = getLocalFposOffset(rp);
  4571. else
  4572. pos = rp-base;
  4573. deserializeSource.reset(pos);
  4574. unsigned sizeRead = rowDeserializer->deserialize(diskRowBuilder.ensureRow(), deserializeSource);
  4575. OwnedConstRoxieRow rawBuffer = diskRowBuilder.finalizeRowClear(sizeRead);
  4576. const KeyedJoinHeader *headerPtr = (KeyedJoinHeader *) inputData;
  4577. inputData = &headerPtr->rhsdata[0];
  4578. if (inputFields.isVariableSize())
  4579. {
  4580. memcpy(&inputSize, inputData, sizeof(inputSize));
  4581. inputData += sizeof(inputSize);
  4582. }
  4583. if (helper->fetchMatch(inputData, rawBuffer))
  4584. {
  4585. unsigned thisSize = helper->extractJoinFields(jfRowBuilder, rawBuffer, rp, (IBlobProvider*)NULL);
  4586. jfRowBuilder.writeToOutput(thisSize, headerPtr->fpos, headerPtr->thisGroup, headerPtr->partNo);
  4587. totalSizeSent += KEYEDJOIN_RECORD_SIZE(thisSize);
  4588. processed++;
  4589. if (processed > rowLimit)
  4590. {
  4591. logctx.noteStatistic(STATS_DISK_SEEKS, processed+skipped, 1);
  4592. logctx.noteStatistic(STATS_ACCEPTED, processed, 1);
  4593. logctx.noteStatistic(STATS_REJECTED, skipped, 1);
  4594. limitExceeded();
  4595. return true;
  4596. }
  4597. }
  4598. else
  4599. {
  4600. skipped++;
  4601. KeyedJoinHeader *out = (KeyedJoinHeader *) output->getBuffer(KEYEDJOIN_RECORD_SIZE(0), true);
  4602. out->fpos = 0;
  4603. out->thisGroup = headerPtr->thisGroup;
  4604. out->partNo = (unsigned short) -1;
  4605. output->putBuffer(out, KEYEDJOIN_RECORD_SIZE(0), true);
  4606. totalSizeSent += KEYEDJOIN_RECORD_SIZE(0);
  4607. }
  4608. inputData += inputSize;
  4609. }
  4610. logctx.noteStatistic(STATS_DISK_SEEKS, processed+skipped, 1);
  4611. logctx.noteStatistic(STATS_ACCEPTED, processed, 1);
  4612. logctx.noteStatistic(STATS_REJECTED, skipped, 1);
  4613. logctx.flush(true, aborted);
  4614. if (aborted)
  4615. {
  4616. output->abort();
  4617. if (logctx.queryTraceLevel() > 5)
  4618. {
  4619. StringBuffer s;
  4620. logctx.CTXLOG("CRoxieKeyedJoinFetchActivity aborted: %s", packet->queryHeader().toString(s).str());
  4621. }
  4622. }
  4623. else
  4624. {
  4625. output->flush(true);
  4626. if (logctx.queryTraceLevel() > 5)
  4627. {
  4628. StringBuffer s;
  4629. logctx.CTXLOG("CRoxieKeyedJoinFetchActivity completed: %d records returned(%d bytes), %d skipped: %s", processed, totalSizeSent, skipped, packet->queryHeader().toString(s).str());
  4630. }
  4631. }
  4632. return !aborted;
  4633. }
  4634. IRoxieSlaveActivity *CRoxieKeyedJoinFetchActivityFactory::createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
  4635. {
  4636. return new CRoxieKeyedJoinFetchActivity(logctx, packet, helperFactory, this);
  4637. }
  4638. ISlaveActivityFactory *createRoxieKeyedJoinFetchActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
  4639. {
  4640. return new CRoxieKeyedJoinFetchActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory);
  4641. }
  4642. //================================================================================================
  4643. class CRoxieRemoteActivity : public CRoxieSlaveActivity
  4644. {
  4645. protected:
  4646. IHThorRemoteArg * remoteHelper;
  4647. unsigned processed;
  4648. unsigned remoteId;
  4649. public:
  4650. IMPLEMENT_IINTERFACE;
  4651. CRoxieRemoteActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory, unsigned _remoteId)
  4652. : CRoxieSlaveActivity(_logctx, _packet, _hFactory, _aFactory),
  4653. remoteId(_remoteId)
  4654. {
  4655. remoteHelper = (IHThorRemoteArg *) basehelper;
  4656. processed = 0;
  4657. onCreate();
  4658. }
  4659. virtual StringBuffer &toString(StringBuffer &ret) const
  4660. {
  4661. return ret.appendf("Remote %u", packet->queryHeader().activityId);
  4662. }
  4663. virtual const char *queryDynamicFileName() const
  4664. {
  4665. throwUnexpected();
  4666. }
  4667. virtual void setVariableFileInfo()
  4668. {
  4669. throwUnexpected();
  4670. }
  4671. virtual bool process()
  4672. {
  4673. MTIME_SECTION(timer, "CRoxieRemoteActivity ::process");
  4674. Owned<IMessagePacker> output = ROQ->createOutputStream(packet->queryHeader(), false, logctx);
  4675. unsigned __int64 rowLimit = remoteHelper->getRowLimit();
  4676. rtlRowBuilder remoteExtractBuilder;
  4677. remoteHelper->createParentExtract(remoteExtractBuilder);
  4678. Linked<IActivityGraph> remoteQuery = queryContext->queryChildGraph(remoteId);
  4679. Linked<IRoxieServerChildGraph> remoteGraph = remoteQuery->queryLoopGraph();
  4680. try
  4681. {
  4682. remoteGraph->beforeExecute();
  4683. Owned<IRoxieInput> input = remoteGraph->startOutput(0, remoteExtractBuilder.size(), remoteExtractBuilder.getbytes(), false);
  4684. while (!aborted)
  4685. {
  4686. const void * next = input->nextInGroup();
  4687. if (!next)
  4688. {
  4689. next = input->nextInGroup();
  4690. if (!next)
  4691. break;
  4692. }
  4693. size32_t nextSize = meta.getRecordSize(next);
  4694. //MORE - what about grouping?
  4695. processed++;
  4696. if (processed > rowLimit)
  4697. {
  4698. ReleaseRoxieRow(next);
  4699. limitExceeded();
  4700. break;
  4701. }
  4702. if (serializer)
  4703. serializeRow(output, next);
  4704. else
  4705. {
  4706. void * recBuffer = output->getBuffer(nextSize, meta.isVariableSize());
  4707. memcpy(recBuffer, next, nextSize);
  4708. output->putBuffer(recBuffer, nextSize, meta.isVariableSize());
  4709. }
  4710. ReleaseRoxieRow(next);
  4711. }
  4712. remoteGraph->afterExecute();
  4713. }
  4714. catch (IException *E)
  4715. {
  4716. remoteGraph->afterExecute();
  4717. if (aborted)
  4718. ::Release(E);
  4719. else
  4720. throw;
  4721. }
  4722. catch (...)
  4723. {
  4724. remoteGraph->afterExecute();
  4725. throw;
  4726. }
  4727. logctx.flush(true, aborted);
  4728. if (aborted)
  4729. output->abort();
  4730. else
  4731. output->flush(true);
  4732. return !aborted;
  4733. }
  4734. virtual void setPartNo(bool filechanged)
  4735. {
  4736. throwUnexpected();
  4737. }
  4738. };
  4739. //================================================================================================
  4740. class CRoxieRemoteActivityFactory : public CSlaveActivityFactory
  4741. {
  4742. unsigned remoteId;
  4743. public:
  4744. IMPLEMENT_IINTERFACE;
  4745. CRoxieRemoteActivityFactory(IPropertyTree &graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, unsigned _remoteId)
  4746. : CSlaveActivityFactory(graphNode, _subgraphId, _queryFactory, _helperFactory), remoteId(_remoteId)
  4747. {
  4748. }
  4749. virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
  4750. {
  4751. return new CRoxieRemoteActivity(logctx, packet, helperFactory, this, remoteId);
  4752. }
  4753. virtual StringBuffer &toString(StringBuffer &s) const
  4754. {
  4755. return CSlaveActivityFactory::toString(s.append("Remote "));
  4756. }
  4757. };
  4758. ISlaveActivityFactory *createRoxieRemoteActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, unsigned _remoteId)
  4759. {
  4760. return new CRoxieRemoteActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory, _remoteId);
  4761. }
  4762. //================================================================================================
  4763. class CRoxieDummyActivityFactory : public CSlaveActivityFactory // not a real activity - just used to properly link files
  4764. {
  4765. protected:
  4766. Owned<const IResolvedFile> indexfile;
  4767. Owned<IKeyArray> keyArray;
  4768. Owned<IFileIOArray> fileArray;
  4769. TranslatorArray layoutTranslators;
  4770. public:
  4771. IMPLEMENT_IINTERFACE;
  4772. CRoxieDummyActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, bool isLoadDataOnly)
  4773. : CSlaveActivityFactory(_graphNode, _subgraphId, _queryFactory, NULL)
  4774. {
  4775. if (_graphNode.getPropBool("att[@name='_isSpill']/@value", false) || _graphNode.getPropBool("att[@name='_isSpillGlobal']/@value", false))
  4776. return; // ignore 'spills'
  4777. try // operations does not want any missing file errors to be fatal, or throw traps - just log it
  4778. {
  4779. bool isOpt = _graphNode.getPropBool("att[@name='_isOpt']/@value") || pretendAllOpt;
  4780. const char *fileName = queryNodeFileName(_graphNode);
  4781. const char *indexName = queryNodeIndexName(_graphNode);
  4782. if (indexName && (!fileName || !streq(indexName, fileName)))
  4783. {
  4784. indexfile.setown(_queryFactory.queryPackage().lookupFileName(indexName, isOpt, true, _queryFactory.queryWorkUnit()));
  4785. if (indexfile)
  4786. keyArray.setown(indexfile->getKeyArray(NULL, &layoutTranslators, isOpt, queryFactory.queryChannel(), queryFactory.getEnableFieldTranslation()));
  4787. }
  4788. if (fileName)
  4789. {
  4790. datafile.setown(_queryFactory.queryPackage().lookupFileName(fileName, isOpt, true, _queryFactory.queryWorkUnit()));
  4791. if (datafile)
  4792. fileArray.setown(datafile->getIFileIOArray(isOpt, queryFactory.queryChannel()));
  4793. }
  4794. }
  4795. catch(IException *E)
  4796. {
  4797. StringBuffer errors;
  4798. E->errorMessage(errors);
  4799. DBGLOG("%s File error = %s", (isLoadDataOnly) ? "LOADDATAONLY" : "SUSPENDED QUERY", errors.str());
  4800. E->Release();
  4801. }
  4802. }
  4803. virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
  4804. {
  4805. throwUnexpected(); // don't actually want to create an activity
  4806. }
  4807. };
  4808. //================================================================================================
  4809. ISlaveActivityFactory *createRoxieDummyActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, bool isLoadDataOnly)
  4810. {
  4811. // MORE - bool isLoadDataOnly may need to be an enum if more than just LOADDATAONLY and suspended queries use this
  4812. return new CRoxieDummyActivityFactory(_graphNode, _subgraphId, _queryFactory, isLoadDataOnly);
  4813. }