hthorkey.cpp 130 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
73278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458345934603461346234633464346534663467346834693470347134723473347434753476347734783479348034813482348334843485348634873488348934903491349234933494349534963497349834993500350135023503350435053506350735083509351035113512351335143515351635173518351935203521352235233524352535263527352835293530353135323533353435353536353735383539354035413542354335443545354635473548354935503551355235533554355535563557355835593560356135623563356435653566356735683569357035713572357335743575357635773578357935803581358235833584358535863587358835893590359135923593359435953596359735983599360036013602360336043605360636073608360936103611361236133614361536163617361836193620362136223623362436253626362736283629363036313632363336343635363636373638363936403641364236433644364536463647364836493650365136523653365436553656365736583659366036613662366336643665366636673668366936703671367236733674367536763677367836793680368136823683368436853686368736883689369036913692369336943695369636973698369937003701370237033704370537063707370837093710371137123713371437153716371737183719372037213722372337243725372637273728372937303731373237333734373537363737373837393740374137423743374437453746374737483749375037513752375337543755375637573758375937603761376237633764376537663767376837693770377137723773377437753776377
7377837793780378137823783378437853786378737883789379037913792379337943795379637973798379938003801380238033804380538063807380838093810381138123813381438153816381738183819382038213822382338243825382638273828382938303831383238333834383538363837383838393840384138423843384438453846384738483849385038513852385338543855385638573858385938603861386238633864386538663867386838693870387138723873387438753876387738783879388038813882388338843885388638873888388938903891389238933894389538963897389838993900390139023903390439053906390739083909391039113912391339143915391639173918391939203921392239233924392539263927392839293930393139323933393439353936393739383939394039413942394339443945394639473948394939503951395239533954395539563957395839593960396139623963396439653966396739683969397039713972397339743975397639773978397939803981398239833984398539863987398839893990399139923993399439953996399739983999400040014002400340044005400640074008400940104011401240134014401540164017401840194020402140224023402440254026402740284029403040314032403340344035403640374038403940404041404240434044404540464047404840494050405140524053405440554056405740584059406040614062406340644065406640674068406940704071407240734074407540764077407840794080408140824083408440854086408740884089409040914092409340944095
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "hthor.ipp"
  14. #include "rtlkey.hpp"
  15. #include "jhtree.hpp"
  16. #include "eclhelper.hpp"
  17. #include "jthread.hpp"
  18. #include "jqueue.tpp"
  19. #include "dasess.hpp"
  20. #include "thorxmlwrite.hpp"
  21. #include "layouttrans.hpp"
  22. #include "thorstep.ipp"
  23. #include "roxiedebug.hpp"
  24. #define MAX_FETCH_LOOKAHEAD 1000
  25. #define IGNORE_FORMAT_CRC_MISMATCH_WHEN_NO_METADATA
  26. #define DEFAULT_KJ_PRESERVES_ORDER 1
  27. using roxiemem::IRowManager;
  28. using roxiemem::OwnedRoxieRow;
  29. using roxiemem::OwnedConstRoxieRow;
  30. using roxiemem::OwnedRoxieString;
  31. static IKeyIndex *openKeyFile(IDistributedFilePart & keyFile)
  32. {
  33. unsigned numCopies = keyFile.numCopies();
  34. assertex(numCopies);
  35. Owned<IException> exc;
  36. for (unsigned copy=0; copy < numCopies; copy++)
  37. {
  38. RemoteFilename rfn;
  39. try
  40. {
  41. OwnedIFile ifile = createIFile(keyFile.getFilename(rfn,copy));
  42. unsigned __int64 thissize = ifile->size();
  43. if (thissize != -1)
  44. {
  45. StringBuffer remotePath;
  46. rfn.getRemotePath(remotePath);
  47. unsigned crc = 0;
  48. keyFile.getCrc(crc);
  49. return createKeyIndex(remotePath.str(), crc, false, false);
  50. }
  51. }
  52. catch (IException *E)
  53. {
  54. EXCLOG(E, "While opening index file");
  55. if (exc)
  56. E->Release();
  57. else
  58. exc.setown(E);
  59. }
  60. }
  61. if (exc)
  62. throw exc.getClear();
  63. StringBuffer url;
  64. RemoteFilename rfn;
  65. keyFile.getFilename(rfn).getRemotePath(url);
  66. throw MakeStringException(1001, "Could not open key file at %s%s", url.str(), (numCopies > 1) ? " or any alternate location." : ".");
  67. }
  68. void enterSingletonSuperfiles(Shared<IDistributedFile> & file)
  69. {
  70. IDistributedSuperFile * super = file->querySuperFile();
  71. while(super && (super->numSubFiles() == 1))
  72. {
  73. file.setown(super->getSubFile(0));
  74. super = file->querySuperFile();
  75. }
  76. }
  77. bool rltEnabled(IConstWorkUnit const * wu)
  78. {
  79. if(wu->hasDebugValue("layoutTranslationEnabled"))
  80. return wu->getDebugValueBool("layoutTranslationEnabled", false);
  81. else
  82. return wu->getDebugValueBool("hthorLayoutTranslationEnabled", false);
  83. }
  84. IRecordLayoutTranslator * getRecordLayoutTranslator(IDefRecordMeta const * activityMeta, size32_t activityMetaSize, void const * activityMetaBuff, IDistributedFile * df, IRecordLayoutTranslatorCache * cache)
  85. {
  86. IPropertyTree const & props = df->queryAttributes();
  87. MemoryBuffer diskMetaBuff;
  88. if(!props.getPropBin("_record_layout", diskMetaBuff))
  89. #ifdef IGNORE_FORMAT_CRC_MISMATCH_WHEN_NO_METADATA
  90. {
  91. WARNLOG("On reading index %s, formatCRC mismatch ignored because file had no record layout metadata and so assumed old", df->queryLogicalName());
  92. return NULL;
  93. }
  94. #else
  95. throw MakeStringException(0, "Unable to recover from record layout mismatch for index %s: no record layout metadata in file", df->queryLogicalName());
  96. #endif
  97. try
  98. {
  99. if(cache)
  100. return cache->get(diskMetaBuff.length(), diskMetaBuff.bufferBase(), activityMetaSize, activityMetaBuff, activityMeta);
  101. else
  102. return createRecordLayoutTranslator(diskMetaBuff.length(), diskMetaBuff.bufferBase(), activityMetaSize, activityMetaBuff);
  103. }
  104. catch (IException *E)
  105. {
  106. StringBuffer m;
  107. m.appendf("In index %s: ", df->queryLogicalName());
  108. E->errorMessage(m);
  109. E->Release();
  110. Owned<IDefRecordMeta> diskMeta = deserializeRecordMeta(diskMetaBuff, true);
  111. StringBuffer diskMetaDesc;
  112. getRecordMetaAsString(diskMetaDesc, diskMeta);
  113. StringBuffer activityMetaDesc;
  114. getRecordMetaAsString(activityMetaDesc, activityMeta);
  115. ERRLOG("RecordLayoutTranslator error: %s\nDisk meta: %s\nActivity meta: %s", m.str(), diskMetaDesc.str(), activityMetaDesc.str());
  116. throw MakeStringExceptionDirect(0, m.str());
  117. }
  118. }
  119. static void setProgress(IPropertyTree &node, const char *name, const char *value)
  120. {
  121. StringBuffer attr("@");
  122. node.setProp(attr.append(name).str(), value);
  123. }
  124. static void setProgress(IPropertyTree &node, const char *name, unsigned __int64 value)
  125. {
  126. StringBuffer attr("@");
  127. node.setPropInt64(attr.append(name).str(), value);
  128. }
  129. class TransformCallback : public CInterface, implements IThorIndexCallback
  130. {
  131. public:
  132. TransformCallback() { keyManager = NULL; };
  133. IMPLEMENT_IINTERFACE
  134. //IThorIndexCallback
  135. virtual unsigned __int64 getFilePosition(const void * row)
  136. {
  137. return filepos;
  138. }
  139. virtual byte * lookupBlob(unsigned __int64 id)
  140. {
  141. size32_t dummy;
  142. return (byte *) keyManager->loadBlob(id, dummy);
  143. }
  144. public:
  145. offset_t & getFPosRef() { return filepos; }
  146. void setManager(IKeyManager * _manager)
  147. {
  148. finishedRow();
  149. keyManager = _manager;
  150. }
  151. void finishedRow()
  152. {
  153. if (keyManager)
  154. keyManager->releaseBlobs();
  155. }
  156. protected:
  157. IKeyManager * keyManager;
  158. offset_t filepos;
  159. };
  160. //-------------------------------------------------------------------------------------------------------------
  161. class CHThorNullAggregateActivity : public CHThorNullActivity
  162. {
  163. public:
  164. CHThorNullAggregateActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorArg & _arg, IHThorCompoundAggregateExtra &_extra, ThorActivityKind _kind) : CHThorNullActivity(agent, _activityId, _subgraphId, _arg, _kind), helper(_extra) {}
  165. //interface IHThorInput
  166. virtual void ready();
  167. virtual const void *nextInGroup();
  168. virtual bool needsAllocator() const { return true; }
  169. protected:
  170. IHThorCompoundAggregateExtra &helper;
  171. bool finished;
  172. };
  173. void CHThorNullAggregateActivity::ready()
  174. {
  175. CHThorNullActivity::ready();
  176. finished = false;
  177. }
  178. const void *CHThorNullAggregateActivity::nextInGroup()
  179. {
  180. if (finished) return NULL;
  181. processed++;
  182. finished = true;
  183. RtlDynamicRowBuilder rowBuilder(rowAllocator);
  184. try
  185. {
  186. size32_t newSize = helper.clearAggregate(rowBuilder);
  187. return rowBuilder.finalizeRowClear(newSize);
  188. }
  189. catch(IException * e)
  190. {
  191. throw makeWrappedException(e);
  192. }
  193. }
  194. //-------------------------------------------------------------------------------------------------------------
  195. class CHThorNullCountActivity : public CHThorNullActivity
  196. {
  197. public:
  198. CHThorNullCountActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorArg & _arg, ThorActivityKind _kind)
  199. : CHThorNullActivity(agent, _activityId, _subgraphId, _arg, _kind), finished(false) {}
  200. //interface IHThorInput
  201. virtual void ready();
  202. virtual const void *nextInGroup();
  203. virtual bool needsAllocator() const { return true; }
  204. protected:
  205. bool finished;
  206. };
  207. void CHThorNullCountActivity::ready()
  208. {
  209. CHThorNullActivity::ready();
  210. finished = false;
  211. }
  212. const void *CHThorNullCountActivity::nextInGroup()
  213. {
  214. if (finished) return NULL;
  215. processed++;
  216. finished = true;
  217. size32_t outSize = outputMeta.getFixedSize();
  218. void * ret = rowAllocator->createRow(); //meta: outputMeta
  219. if (outSize == 1)
  220. *(byte *)ret = 0;
  221. else
  222. *(unsigned __int64 *)ret = 0;
  223. return rowAllocator->finalizeRow(outSize, ret, outSize);
  224. }
  225. //-------------------------------------------------------------------------------------------------------------
  226. class CHThorIndexReadActivityBase : public CHThorActivityBase
  227. {
  228. public:
  229. CHThorIndexReadActivityBase(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexReadBaseArg &_arg, ThorActivityKind _kind, IDistributedFile * df);
  230. ~CHThorIndexReadActivityBase();
  231. virtual void ready();
  232. virtual void done();
  233. IHThorInput *queryOutput(unsigned index) { return this; }
  234. virtual bool needsAllocator() const { return true; }
  235. //interface IHThorInput
  236. virtual bool isGrouped() { return false; }
  237. virtual const char *getFileName() { return NULL; }
  238. virtual bool outputToFile(const char *) { return false; }
  239. virtual IOutputMetaData * queryOutputMeta() const { return outputMeta; }
  240. virtual void updateProgress(IWUGraphProgress &progress) const
  241. {
  242. CHThorActivityBase::updateProgress(progress);
  243. IPropertyTree &node = progress.updateNode(subgraphId, activityId);
  244. setProgress(node, "postfiltered", queryPostFiltered());
  245. setProgress(node, "seeks", querySeeks());
  246. setProgress(node, "scans", queryScans());
  247. }
  248. virtual unsigned querySeeks() const
  249. {
  250. return seeks + (klManager ? klManager->querySeeks() : 0);
  251. }
  252. virtual unsigned queryScans() const
  253. {
  254. return scans + (klManager ? klManager->queryScans() : 0);
  255. }
  256. virtual unsigned queryPostFiltered() const
  257. {
  258. return postFiltered;
  259. }
  260. virtual void fail(char const * msg)
  261. {
  262. throw MakeStringExceptionDirect(0, msg);
  263. }
  264. protected:
  265. bool doPreopenLimit(unsigned __int64 limit);
  266. bool doPreopenLimitFile(unsigned __int64 & count, unsigned __int64 limit);
  267. IKeyIndex * doPreopenLimitPart(unsigned __int64 & count, unsigned __int64 limit, unsigned part);
  268. const void * createKeyedLimitOnFailRow();
  269. void getLayoutTranslators();
  270. IRecordLayoutTranslator * getLayoutTranslator(IDistributedFile * f);
  271. void verifyIndex(IKeyIndex * idx);
  272. void initManager(IKeyManager *manager);
  273. bool firstPart();
  274. virtual bool nextPart();
  275. virtual void initPart();
  276. private:
  277. bool firstMultiPart();
  278. bool nextMultiPart();
  279. void killPart();
  280. bool setCurrentPart(unsigned whichPart);
  281. void clearTlk() { tlk.clear(); tlManager.clear(); }
  282. void openTlk();
  283. bool doNextSuper();
  284. protected:
  285. IHThorIndexReadBaseArg &helper;
  286. IHThorSourceLimitTransformExtra * limitTransformExtra;
  287. CachedOutputMetaData eclKeySize;
  288. size32_t keySize;
  289. void * activityRecordMetaBuff;
  290. size32_t activityRecordMetaSize;
  291. Owned<IDefRecordMeta> activityRecordMeta;
  292. // current part
  293. Owned<IDistributedFilePart> curPart;
  294. Owned<IKeyManager> klManager;
  295. Owned<IKeyIndex> keyIndex;
  296. unsigned nextPartNumber;
  297. //multi files
  298. Owned<IDistributedFile> df;
  299. Owned<IKeyIndex> tlk;
  300. Owned<IKeyManager> tlManager;
  301. //super files:
  302. Owned<IDistributedFileIterator> superIterator;
  303. unsigned superIndex;
  304. unsigned superCount;
  305. StringBuffer superName;
  306. TransformCallback callback;
  307. //for preopening (when need counts for keyed skip limit):
  308. Owned<IKeyIndexSet> keyIndexCache;
  309. UnsignedArray superIndexCache;
  310. unsigned keyIndexCacheIdx;
  311. unsigned seeks;
  312. unsigned scans;
  313. unsigned postFiltered;
  314. bool singlePart; // a single part index, not part of a super file - optimize so never reload the part.
  315. bool localSortKey;
  316. //for layout translation
  317. Owned<IRecordLayoutTranslator> layoutTrans;
  318. PointerIArrayOf<IRecordLayoutTranslator> layoutTransArray;
  319. bool gotLayoutTrans;
  320. };
  321. CHThorIndexReadActivityBase::CHThorIndexReadActivityBase(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexReadBaseArg &_arg, ThorActivityKind _kind, IDistributedFile * _df)
  322. : CHThorActivityBase(_agent, _activityId, _subgraphId, _arg, _kind), helper(_arg), df(LINK(_df)), activityRecordMetaBuff(NULL)
  323. {
  324. singlePart = false;
  325. localSortKey = (df->queryAttributes().hasProp("@local"));
  326. IDistributedSuperFile *super = df->querySuperFile();
  327. superCount = 1;
  328. superIndex = 0;
  329. nextPartNumber = 0;
  330. if (super)
  331. {
  332. superIterator.setown(super->getSubFileIterator(true));
  333. superCount = super->numSubFiles(true);
  334. if (helper.getFlags() & TIRsorted)
  335. throw MakeStringException(1000, "SORTED attribute is not supported when reading from superkey");
  336. superName.append(df->queryLogicalName());
  337. df.clear();
  338. }
  339. else if (df->numParts() == 1)
  340. {
  341. singlePart = true;
  342. }
  343. eclKeySize.set(helper.queryDiskRecordSize());
  344. postFiltered = 0;
  345. seeks = 0;
  346. scans = 0;
  347. helper.setCallback(&callback);
  348. limitTransformExtra = static_cast<IHThorSourceLimitTransformExtra *>(helper.selectInterface(TAIsourcelimittransformextra_1));
  349. gotLayoutTrans = false;
  350. }
  351. CHThorIndexReadActivityBase::~CHThorIndexReadActivityBase()
  352. {
  353. // ReleaseRoxieRow(recBuffer);
  354. rtlFree(activityRecordMetaBuff);
  355. }
  356. void CHThorIndexReadActivityBase::ready()
  357. {
  358. CHThorActivityBase::ready();
  359. if(!gotLayoutTrans)
  360. {
  361. getLayoutTranslators();
  362. gotLayoutTrans = true;
  363. }
  364. firstPart();
  365. }
  366. void CHThorIndexReadActivityBase::done()
  367. {
  368. killPart();
  369. CHThorActivityBase::done();
  370. }
  371. bool CHThorIndexReadActivityBase::doPreopenLimit(unsigned __int64 limit)
  372. {
  373. if(!helper.canMatchAny())
  374. return false;
  375. keyIndexCache.setown(createKeyIndexSet());
  376. unsigned __int64 count = 0;
  377. if(superIterator)
  378. {
  379. superIterator->first();
  380. do
  381. {
  382. df.set(&superIterator->query());
  383. if(doPreopenLimitFile(count, limit))
  384. return true;
  385. ++superIndex;
  386. } while(superIterator->next());
  387. return false;
  388. }
  389. else
  390. {
  391. return doPreopenLimitFile(count, limit);
  392. }
  393. }
  394. bool CHThorIndexReadActivityBase::doPreopenLimitFile(unsigned __int64 & count, unsigned __int64 limit)
  395. {
  396. unsigned num = df->numParts()-1;
  397. if(num)
  398. {
  399. if(localSortKey)
  400. {
  401. Owned<IKeyIndex> tlk = openKeyFile(df->queryPart(num));
  402. verifyIndex(tlk);
  403. for(unsigned idx = 0; idx < num; ++idx)
  404. {
  405. keyIndexCache->addIndex(doPreopenLimitPart(count, limit, idx));
  406. if(superIterator)
  407. superIndexCache.append(superIndex);
  408. }
  409. }
  410. else
  411. {
  412. Owned<IKeyIndex> tlk = openKeyFile(df->queryPart(num));
  413. verifyIndex(tlk);
  414. Owned<IKeyManager> tlman = createKeyManager(tlk, keySize, NULL);
  415. initManager(tlman);
  416. while(tlman->lookup(false) && (count<=limit))
  417. {
  418. unsigned slavePart = (unsigned)tlman->queryFpos();
  419. if (slavePart)
  420. {
  421. keyIndexCache->addIndex(doPreopenLimitPart(count, limit, slavePart-1));
  422. if(superIterator)
  423. superIndexCache.append(superIndex);
  424. }
  425. }
  426. if (count>limit)
  427. {
  428. if ( agent.queryCodeContext()->queryDebugContext())
  429. agent.queryCodeContext()->queryDebugContext()->checkBreakpoint(DebugStateLimit, NULL, static_cast<IActivityBase *>(this));
  430. }
  431. }
  432. }
  433. else
  434. {
  435. keyIndexCache->addIndex(doPreopenLimitPart(count, limit, 0));
  436. if(superIterator)
  437. superIndexCache.append(superIndex);
  438. }
  439. return (count>limit);
  440. }
  441. IKeyIndex * CHThorIndexReadActivityBase::doPreopenLimitPart(unsigned __int64 & result, unsigned __int64 limit, unsigned part)
  442. {
  443. Owned<IKeyIndex> kidx;
  444. kidx.setown(openKeyFile(df->queryPart(part)));
  445. if(df->numParts() == 1)
  446. verifyIndex(kidx);
  447. if (limit != (unsigned) -1)
  448. {
  449. Owned<IKeyManager> kman = createKeyManager(kidx, keySize, NULL);
  450. initManager(kman);
  451. result += kman->checkCount(limit-result);
  452. }
  453. return kidx.getClear();
  454. }
  455. void CHThorIndexReadActivityBase::openTlk()
  456. {
  457. tlk.setown(openKeyFile(df->queryPart(df->numParts()-1)));
  458. }
  459. const void * CHThorIndexReadActivityBase::createKeyedLimitOnFailRow()
  460. {
  461. RtlDynamicRowBuilder rowBuilder(rowAllocator);
  462. size32_t newSize = limitTransformExtra->transformOnKeyedLimitExceeded(rowBuilder);
  463. if (newSize)
  464. return rowBuilder.finalizeRowClear(newSize);
  465. return NULL;
  466. }
  467. bool CHThorIndexReadActivityBase::firstPart()
  468. {
  469. killPart();
  470. if (helper.canMatchAny())
  471. {
  472. if(keyIndexCache)
  473. {
  474. keyIndexCacheIdx = 0;
  475. return nextPart();
  476. }
  477. if (singlePart)
  478. {
  479. //part is cached and not reloaded - for efficiency in subqueries.
  480. if (!keyIndex)
  481. return setCurrentPart(0);
  482. initPart();
  483. return true;
  484. }
  485. if (superIterator)
  486. {
  487. superIterator->first();
  488. superIndex = 0;
  489. return doNextSuper();
  490. }
  491. else
  492. return firstMultiPart();
  493. }
  494. return false;
  495. }
  496. bool CHThorIndexReadActivityBase::nextPart()
  497. {
  498. killPart();
  499. if(keyIndexCache)
  500. {
  501. if(keyIndexCacheIdx >= keyIndexCache->numParts())
  502. return false;
  503. keyIndex.set(keyIndexCache->queryPart(keyIndexCacheIdx));
  504. if(superIterator)
  505. {
  506. superIndex = superIndexCache.item(keyIndexCacheIdx);
  507. layoutTrans.set(layoutTransArray.item(superIndex));
  508. keySize = keyIndex->keySize();
  509. }
  510. ++keyIndexCacheIdx;
  511. initPart();
  512. return true;
  513. }
  514. if (singlePart)
  515. return false;
  516. if (nextMultiPart())
  517. return true;
  518. if (superIterator && superIterator->next())
  519. {
  520. ++superIndex;
  521. return doNextSuper();
  522. }
  523. return false;
  524. }
// Prepares a key manager for use by this activity: installs the current layout translator
// (if there is one), lets the helper add its segment monitors, finishes them and resets
// the manager.
void CHThorIndexReadActivityBase::initManager(IKeyManager *manager)
{
    if(layoutTrans)
        manager->setLayoutTranslator(layoutTrans);
    helper.createSegmentMonitors(manager);
    manager->finishSegmentMonitors();
    manager->reset();
}
// Creates and initialises the key manager (klManager) for the current keyIndex and
// registers it with the callback.
void CHThorIndexReadActivityBase::initPart()
{
    klManager.setown(createKeyManager(keyIndex, keySize, NULL));
    initManager(klManager);
    callback.setManager(klManager);
}
  539. void CHThorIndexReadActivityBase::killPart()
  540. {
  541. callback.setManager(NULL);
  542. if (klManager)
  543. {
  544. seeks += klManager->querySeeks();
  545. scans += klManager->queryScans();
  546. klManager.clear();
  547. }
  548. }
  549. bool CHThorIndexReadActivityBase::setCurrentPart(unsigned whichPart)
  550. {
  551. keyIndex.setown(openKeyFile(df->queryPart(whichPart)));
  552. if(df->numParts() == 1)
  553. verifyIndex(keyIndex);
  554. initPart();
  555. return true;
  556. }
// Starts reading a multi-part file: opens tlk if it is not already open, verifies it,
// creates and initialises tlManager over it, then selects the first part via nextMultiPart().
bool CHThorIndexReadActivityBase::firstMultiPart()
{
    if(!tlk)
        openTlk();
    verifyIndex(tlk);
    tlManager.setown(createKeyManager(tlk, keySize, NULL));
    initManager(tlManager);
    nextPartNumber = 0;
    return nextMultiPart();
}
  567. bool CHThorIndexReadActivityBase::nextMultiPart()
  568. {
  569. //tlManager may be null for a single part index within a superfile.
  570. if (tlManager)
  571. {
  572. if (localSortKey)
  573. {
  574. if (nextPartNumber<(df->numParts()-1))
  575. return setCurrentPart(nextPartNumber++);
  576. }
  577. else
  578. {
  579. while (tlManager->lookup(false))
  580. {
  581. if (tlManager->queryFpos())
  582. return setCurrentPart((unsigned)tlManager->queryFpos()-1);
  583. }
  584. }
  585. }
  586. return false;
  587. }
  588. bool CHThorIndexReadActivityBase::doNextSuper()
  589. {
  590. do
  591. {
  592. clearTlk();
  593. df.set(&superIterator->query());
  594. unsigned numParts = df->numParts();
  595. if (numParts==1)
  596. return setCurrentPart(0);
  597. if (firstMultiPart())
  598. return true;
  599. ++superIndex;
  600. } while (superIterator->next());
  601. return false;
  602. }
  603. void CHThorIndexReadActivityBase::getLayoutTranslators()
  604. {
  605. if(superIterator)
  606. {
  607. superIterator->first();
  608. do
  609. {
  610. IDistributedFile & f = superIterator->query();
  611. layoutTrans.setown(getLayoutTranslator(&f));
  612. if(layoutTrans)
  613. {
  614. StringBuffer buff;
  615. buff.append("Using record layout translation to correct layout mismatch on reading index ").append(f.queryLogicalName());
  616. WARNLOG("%s", buff.str());
  617. agent.addWuException(buff.str(), 0, ExceptionSeverityWarning, "hthor");
  618. }
  619. layoutTransArray.append(layoutTrans.getClear());
  620. } while(superIterator->next());
  621. }
  622. else
  623. {
  624. layoutTrans.setown(getLayoutTranslator(df));
  625. if(layoutTrans)
  626. {
  627. StringBuffer buff;
  628. buff.append("Using record layout translation to correct layout mismatch on reading index ").append(df->queryLogicalName());
  629. WARNLOG("%s", buff.str());
  630. agent.addWuException(buff.str(), 0, ExceptionSeverityWarning, "hthor");
  631. }
  632. }
  633. }
  634. IRecordLayoutTranslator * CHThorIndexReadActivityBase::getLayoutTranslator(IDistributedFile * f)
  635. {
  636. if(agent.queryWorkUnit()->getDebugValueBool("skipFileFormatCrcCheck", false))
  637. return NULL;
  638. if(!rltEnabled(agent.queryWorkUnit()))
  639. {
  640. verifyFormatCrc(helper.getFormatCrc(), f, (superIterator ? superName.str() : NULL) , true, true);
  641. return NULL;
  642. }
  643. if(verifyFormatCrc(helper.getFormatCrc(), f, (superIterator ? superName.str() : NULL) , true, false))
  644. return NULL;
  645. if(!activityRecordMeta)
  646. {
  647. if(!helper.getIndexLayout(activityRecordMetaSize, activityRecordMetaBuff))
  648. throw MakeStringException(0, "Unable to recover from record layout mismatch for index %s: no record layout metadata in activity", f->queryLogicalName());
  649. MemoryBuffer buff;
  650. buff.setBuffer(activityRecordMetaSize, activityRecordMetaBuff, false);
  651. activityRecordMeta.setown(deserializeRecordMeta(buff, true));
  652. }
  653. return getRecordLayoutTranslator(activityRecordMeta, activityRecordMetaSize, activityRecordMetaBuff, f, agent.queryRecordLayoutTranslatorCache());
  654. }
  655. void CHThorIndexReadActivityBase::verifyIndex(IKeyIndex * idx)
  656. {
  657. if(superIterator)
  658. layoutTrans.set(layoutTransArray.item(superIndex));
  659. keySize = idx->keySize();
  660. if (eclKeySize.isFixedSize())
  661. {
  662. if(layoutTrans)
  663. layoutTrans->checkSizes(df->queryLogicalName(), eclKeySize.getFixedSize(), keySize);
  664. else
  665. if (keySize != eclKeySize.getFixedSize())
  666. throw MakeStringException(0, "Key size mismatch reading index %s: index indicates size %u, ECL indicates size %u", df->queryLogicalName(), keySize, eclKeySize.getFixedSize());
  667. }
  668. }
  669. //-------------------------------------------------------------------------------------------------------------
  670. class CHThorIndexReadActivity : public CHThorIndexReadActivityBase
  671. {
  672. public:
  673. CHThorIndexReadActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexReadArg &_arg, ThorActivityKind _kind, IDistributedFile * df);
  674. ~CHThorIndexReadActivity();
  675. //interface IHThorInput
  676. virtual void ready();
  677. virtual const void *nextInGroup();
  678. virtual const void * nextGE(const void * seek, unsigned numFields);
  679. virtual IInputSteppingMeta * querySteppingMeta();
  680. protected:
  681. virtual bool nextPart();
  682. virtual void initPart();
  683. protected:
  684. IHThorIndexReadArg &helper;
  685. IHThorSteppedSourceExtra * steppedExtra;
  686. unsigned __int64 keyedProcessed;
  687. unsigned __int64 keyedLimit;
  688. unsigned __int64 rowLimit;
  689. unsigned __int64 stopAfter;
  690. ISteppingMeta * rawMeta;
  691. ISteppingMeta * projectedMeta;
  692. size32_t seekGEOffset;
  693. unsigned * seekSizes;
  694. CSteppingMeta steppingMeta;
  695. bool needTransform;
  696. bool keyedLimitReached;
  697. bool keyedLimitSkips;
  698. bool keyedLimitCreates;
  699. bool keyedLimitRowCreated;
  700. };
  701. CHThorIndexReadActivity::CHThorIndexReadActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexReadArg &_arg, ThorActivityKind _kind, IDistributedFile * _df)
  702. : CHThorIndexReadActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _df), helper(_arg)
  703. {
  704. steppedExtra = static_cast<IHThorSteppedSourceExtra *>(helper.selectInterface(TAIsteppedsourceextra_1));
  705. needTransform = helper.needTransform();
  706. keyedLimit = (unsigned __int64)-1;
  707. rowLimit = (unsigned __int64)-1;
  708. stopAfter = (unsigned __int64)-1;
  709. keyedLimitReached = false;
  710. keyedLimitSkips = ((helper.getFlags() & TIRkeyedlimitskips) != 0);
  711. keyedLimitCreates = ((helper.getFlags() & TIRkeyedlimitcreates) != 0);
  712. keyedLimitRowCreated = false;
  713. keyedProcessed = 0;
  714. rawMeta = helper.queryRawSteppingMeta();
  715. projectedMeta = helper.queryProjectedSteppingMeta();
  716. seekGEOffset = 0;
  717. seekSizes = 0;
  718. if (rawMeta)
  719. {
  720. //should check that no translation, also should check all keys in maxFields list can actually be keyed.
  721. const CFieldOffsetSize * fields = rawMeta->queryFields();
  722. unsigned maxFields = rawMeta->getNumFields();
  723. seekGEOffset = fields[0].offset;
  724. seekSizes = new unsigned[maxFields];
  725. seekSizes[0] = fields[0].size;
  726. for (unsigned i=1; i < maxFields; i++)
  727. seekSizes[i] = seekSizes[i-1] + fields[i].size;
  728. if (projectedMeta)
  729. steppingMeta.init(projectedMeta, false);
  730. else
  731. steppingMeta.init(rawMeta, false);
  732. }
  733. }
// Releases the seekSizes array allocated by the constructor (it is NULL when the helper
// supplied no raw stepping metadata).
CHThorIndexReadActivity::~CHThorIndexReadActivity()
{
    delete [] seekSizes;
}
  738. void CHThorIndexReadActivity::ready()
  739. {
  740. keyedLimitReached = false;
  741. keyedLimitRowCreated = false;
  742. keyedLimit = helper.getKeyedLimit();
  743. rowLimit = helper.getRowLimit();
  744. if (helper.getFlags() & TIRlimitskips)
  745. rowLimit = (unsigned __int64) -1;
  746. stopAfter = helper.getChooseNLimit();
  747. keyedProcessed = 0;
  748. if(!gotLayoutTrans)
  749. {
  750. getLayoutTranslators();
  751. gotLayoutTrans = true;
  752. }
  753. if (seekGEOffset || localSortKey || ((keyedLimit != (unsigned __int64) -1) && ((helper.getFlags() & TIRcountkeyedlimit) != 0) && !singlePart))
  754. keyedLimitReached = doPreopenLimit(keyedLimit);
  755. CHThorIndexReadActivityBase::ready();
  756. if (steppedExtra)
  757. steppingMeta.setExtra(steppedExtra);
  758. if(klManager && (keyedLimit != (unsigned __int64) -1) && ((helper.getFlags() & TIRcountkeyedlimit) != 0) && singlePart && !seekGEOffset)
  759. {
  760. unsigned __int64 result = klManager->checkCount(keyedLimit);
  761. keyedLimitReached = (result > keyedLimit);
  762. klManager->reset();
  763. }
  764. }
  765. bool CHThorIndexReadActivity::nextPart()
  766. {
  767. if(keyIndexCache && (seekGEOffset || localSortKey))
  768. {
  769. klManager.setown(createKeyMerger(keyIndexCache, keySize, seekGEOffset, NULL));
  770. keyIndexCache.clear();
  771. initManager(klManager);
  772. callback.setManager(klManager);
  773. return true;
  774. }
  775. else if (seekGEOffset || localSortKey)
  776. return false;
  777. else
  778. return CHThorIndexReadActivityBase::nextPart();
  779. }
// Forwards to the base class implementation; no extra work is done at this level.
void CHThorIndexReadActivity::initPart()
{
    CHThorIndexReadActivityBase::initPart();
}
  784. const void *CHThorIndexReadActivity::nextInGroup()
  785. {
  786. if(keyedLimitReached)
  787. {
  788. if (keyedLimitSkips)
  789. return NULL;
  790. if (keyedLimitCreates)
  791. {
  792. if (!keyedLimitRowCreated)
  793. {
  794. keyedLimitRowCreated = true;
  795. return createKeyedLimitOnFailRow();
  796. }
  797. return NULL;
  798. }
  799. helper.onKeyedLimitExceeded(); // should throw exception
  800. }
  801. if((stopAfter && (processed-initialProcessed)==stopAfter) || !klManager)
  802. return NULL;
  803. loop
  804. {
  805. agent.reportProgress(NULL);
  806. if (klManager->lookup(true))
  807. {
  808. keyedProcessed++;
  809. if ((keyedLimit != (unsigned __int64) -1) && keyedProcessed > keyedLimit)
  810. helper.onKeyedLimitExceeded();
  811. byte const * keyRow = klManager->queryKeyBuffer(callback.getFPosRef());
  812. if (needTransform)
  813. {
  814. try
  815. {
  816. size32_t recSize;
  817. RtlDynamicRowBuilder rowBuilder(rowAllocator);
  818. recSize = helper.transform(rowBuilder, keyRow);
  819. callback.finishedRow();
  820. if (recSize)
  821. {
  822. processed++;
  823. if ((processed-initialProcessed) > rowLimit)
  824. {
  825. helper.onLimitExceeded();
  826. if ( agent.queryCodeContext()->queryDebugContext())
  827. agent.queryCodeContext()->queryDebugContext()->checkBreakpoint(DebugStateLimit, NULL, static_cast<IActivityBase *>(this));
  828. }
  829. return rowBuilder.finalizeRowClear(recSize);
  830. }
  831. else
  832. {
  833. postFiltered++;
  834. }
  835. }
  836. catch(IException * e)
  837. {
  838. throw makeWrappedException(e);
  839. }
  840. }
  841. else
  842. {
  843. processed++;
  844. if ((processed-initialProcessed) > rowLimit)
  845. {
  846. helper.onLimitExceeded();
  847. if ( agent.queryCodeContext()->queryDebugContext())
  848. agent.queryCodeContext()->queryDebugContext()->checkBreakpoint(DebugStateLimit, NULL, static_cast<IActivityBase *>(this));
  849. }
  850. try
  851. {
  852. RtlDynamicRowBuilder rowBuilder(rowAllocator);
  853. size32_t finalSize = cloneRow(rowBuilder, keyRow, outputMeta);
  854. return rowBuilder.finalizeRowClear(finalSize);
  855. }
  856. catch(IException * e)
  857. {
  858. throw makeWrappedException(e);
  859. }
  860. }
  861. }
  862. else if (!nextPart())
  863. return NULL;
  864. }
  865. }
  866. const void *CHThorIndexReadActivity::nextGE(const void * seek, unsigned numFields)
  867. {
  868. if(keyedLimitReached && !keyedLimitSkips)
  869. helper.onKeyedLimitExceeded(); // should throw exception
  870. if(keyedLimitReached || (stopAfter && (processed-initialProcessed)==stopAfter) || !klManager)
  871. return NULL;
  872. const byte * rawSeek = (const byte *)seek + seekGEOffset;
  873. unsigned seekSize = seekSizes[numFields-1];
  874. if (projectedMeta)
  875. {
  876. byte *temp = (byte *) alloca(seekSize);
  877. RtlStaticRowBuilder tempBuilder(temp - seekGEOffset, seekSize + seekGEOffset);
  878. helper.mapOutputToInput(tempBuilder, seek, numFields); // NOTE - weird interface to mapOutputToInput means that it STARTS writing at seekGEOffset...
  879. rawSeek = (byte *)temp;
  880. }
  881. loop
  882. {
  883. agent.reportProgress(NULL);
  884. if (klManager->lookupSkip(rawSeek, seekGEOffset, seekSize))
  885. {
  886. const byte * row = klManager->queryKeyBuffer(callback.getFPosRef());
  887. #ifdef _DEBUG
  888. if (memcmp(row + seekGEOffset, rawSeek, seekSize) < 0)
  889. assertex("smart seek failure");
  890. #endif
  891. keyedProcessed++;
  892. if ((keyedLimit != (unsigned __int64) -1) && keyedProcessed > keyedLimit)
  893. helper.onKeyedLimitExceeded();
  894. if (needTransform)
  895. {
  896. try
  897. {
  898. size32_t recSize;
  899. RtlDynamicRowBuilder rowBuilder(rowAllocator);
  900. recSize = helper.transform(rowBuilder, row);
  901. callback.finishedRow();
  902. if (recSize)
  903. {
  904. processed++;
  905. if ((processed-initialProcessed) > rowLimit)
  906. {
  907. helper.onLimitExceeded();
  908. if ( agent.queryCodeContext()->queryDebugContext())
  909. agent.queryCodeContext()->queryDebugContext()->checkBreakpoint(DebugStateLimit, NULL, static_cast<IActivityBase *>(this));
  910. }
  911. return rowBuilder.finalizeRowClear(recSize);
  912. }
  913. else
  914. {
  915. postFiltered++;
  916. }
  917. }
  918. catch(IException * e)
  919. {
  920. throw makeWrappedException(e);
  921. }
  922. }
  923. else
  924. {
  925. processed++;
  926. if ((processed-initialProcessed) > rowLimit)
  927. {
  928. helper.onLimitExceeded();
  929. if ( agent.queryCodeContext()->queryDebugContext())
  930. agent.queryCodeContext()->queryDebugContext()->checkBreakpoint(DebugStateLimit, NULL, static_cast<IActivityBase *>(this));
  931. }
  932. try
  933. {
  934. RtlDynamicRowBuilder rowBuilder(rowAllocator);
  935. size32_t finalSize = cloneRow(rowBuilder, row, outputMeta);
  936. return rowBuilder.finalizeRowClear(finalSize);
  937. }
  938. catch(IException * e)
  939. {
  940. throw makeWrappedException(e);
  941. }
  942. }
  943. }
  944. else if (!nextPart())
  945. return NULL;
  946. }
  947. }
  948. IInputSteppingMeta * CHThorIndexReadActivity::querySteppingMeta()
  949. {
  950. if (rawMeta)
  951. return &steppingMeta;
  952. return NULL;
  953. }
  954. extern HTHOR_API IHThorActivity *createIndexReadActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexReadArg &arg, ThorActivityKind _kind)
  955. {
  956. // A logical filename for the key should refer to a single physical file - either the TLK or a monolithic key
  957. OwnedRoxieString lfn(arg.getFileName());
  958. Owned<ILocalOrDistributedFile> ldFile = _agent.resolveLFN(lfn, "IndexRead", 0 != (arg.getFlags() & TIRoptional));
  959. Linked<IDistributedFile> dFile = ldFile ? ldFile->queryDistributedFile() : NULL;
  960. if (!dFile)
  961. {
  962. StringBuffer buff;
  963. buff.append("Skipping OPT index read of nonexistent file ").append(lfn);
  964. WARNLOG("%s", buff.str());
  965. _agent.addWuException(buff.str(), 0, ExceptionSeverityWarning, "hthor");
  966. return new CHThorNullActivity(_agent, _activityId, _subgraphId, arg, _kind);
  967. }
  968. _agent.logFileAccess(dFile, "HThor", "READ");
  969. enterSingletonSuperfiles(dFile);
  970. return new CHThorIndexReadActivity(_agent, _activityId, _subgraphId, arg, _kind, dFile);
  971. }
  972. //-------------------------------------------------------------------------------------------------------------
  973. class CHThorIndexNormalizeActivity : public CHThorIndexReadActivityBase
  974. {
  975. public:
  976. CHThorIndexNormalizeActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexNormalizeArg &_arg, ThorActivityKind _kind, IDistributedFile * df);
  977. ~CHThorIndexNormalizeActivity();
  978. virtual void ready();
  979. virtual void done();
  980. virtual const void *nextInGroup();
  981. virtual bool needsAllocator() const { return true; }
  982. protected:
  983. const void * createNextRow();
  984. protected:
  985. IHThorIndexNormalizeArg &helper;
  986. unsigned __int64 rowLimit;
  987. unsigned __int64 stopAfter;
  988. RtlDynamicRowBuilder outBuilder;
  989. unsigned __int64 keyedProcessed;
  990. unsigned __int64 keyedLimit;
  991. bool skipLimitReached;
  992. bool expanding;
  993. };
  994. CHThorIndexNormalizeActivity::CHThorIndexNormalizeActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexNormalizeArg &_arg, ThorActivityKind _kind, IDistributedFile * _df) : CHThorIndexReadActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _df), helper(_arg), outBuilder(NULL)
  995. {
  996. keyedLimit = (unsigned __int64)-1;
  997. skipLimitReached = false;
  998. keyedProcessed = 0;
  999. rowLimit = (unsigned __int64)-1;
  1000. stopAfter = (unsigned __int64)-1;
  1001. expanding = false;
  1002. }
// Nothing to release explicitly here.
CHThorIndexNormalizeActivity::~CHThorIndexNormalizeActivity()
{
}
  1006. void CHThorIndexNormalizeActivity::ready()
  1007. {
  1008. keyedLimit = helper.getKeyedLimit();
  1009. skipLimitReached = false;
  1010. keyedProcessed = 0;
  1011. rowLimit = helper.getRowLimit();
  1012. if (helper.getFlags() & TIRlimitskips)
  1013. rowLimit = (unsigned __int64) -1;
  1014. stopAfter = helper.getChooseNLimit();
  1015. expanding = false;
  1016. CHThorIndexReadActivityBase::ready();
  1017. outBuilder.setAllocator(rowAllocator);
  1018. }
// Clears the output row builder, then lets the base class finish up.
void CHThorIndexNormalizeActivity::done()
{
    outBuilder.clear();
    CHThorIndexReadActivityBase::done();
}
  1024. const void *CHThorIndexNormalizeActivity::nextInGroup()
  1025. {
  1026. if ((stopAfter && (processed-initialProcessed)==stopAfter) || !klManager)
  1027. return NULL;
  1028. if (skipLimitReached || (stopAfter && (processed-initialProcessed)==stopAfter) || !klManager)
  1029. return NULL;
  1030. if ((keyedLimit != (unsigned __int64) -1) && (helper.getFlags() & TIRcountkeyedlimit) != 0)
  1031. {
  1032. unsigned __int64 result = klManager->checkCount(keyedLimit);
  1033. if (result > keyedLimit)
  1034. {
  1035. if((helper.getFlags() & TIRkeyedlimitskips) != 0)
  1036. skipLimitReached = true;
  1037. else if((helper.getFlags() & TIRkeyedlimitcreates) != 0)
  1038. {
  1039. skipLimitReached = true;
  1040. return createKeyedLimitOnFailRow();
  1041. }
  1042. else
  1043. helper.onKeyedLimitExceeded(); // should throw exception
  1044. return NULL;
  1045. }
  1046. klManager->reset();
  1047. keyedLimit = (unsigned __int64) -1; // to avoid checking it again
  1048. }
  1049. assertex(!((keyedLimit != (unsigned __int64) -1) && ((helper.getFlags() & TIRkeyedlimitskips) != 0)));
  1050. loop
  1051. {
  1052. loop
  1053. {
  1054. if (expanding)
  1055. {
  1056. loop
  1057. {
  1058. expanding = helper.next();
  1059. if (!expanding)
  1060. break;
  1061. const void * ret = createNextRow();
  1062. if (ret)
  1063. return ret;
  1064. }
  1065. }
  1066. callback.finishedRow();
  1067. while (!klManager->lookup(true))
  1068. {
  1069. keyedProcessed++;
  1070. if ((keyedLimit != (unsigned __int64) -1) && keyedProcessed > keyedLimit)
  1071. helper.onKeyedLimitExceeded();
  1072. if (!nextPart())
  1073. return NULL;
  1074. }
  1075. agent.reportProgress(NULL);
  1076. expanding = helper.first(klManager->queryKeyBuffer(callback.getFPosRef()));
  1077. if (expanding)
  1078. {
  1079. const void * ret = createNextRow();
  1080. if (ret)
  1081. return ret;
  1082. }
  1083. }
  1084. }
  1085. }
  1086. const void * CHThorIndexNormalizeActivity::createNextRow()
  1087. {
  1088. try
  1089. {
  1090. outBuilder.ensureRow();
  1091. size32_t thisSize = helper.transform(outBuilder);
  1092. if (thisSize == 0)
  1093. {
  1094. return NULL;
  1095. }
  1096. OwnedConstRoxieRow ret = outBuilder.finalizeRowClear(thisSize);
  1097. if ((processed - initialProcessed) >=rowLimit)
  1098. {
  1099. helper.onLimitExceeded();
  1100. if ( agent.queryCodeContext()->queryDebugContext())
  1101. agent.queryCodeContext()->queryDebugContext()->checkBreakpoint(DebugStateLimit, NULL, static_cast<IActivityBase *>(this));
  1102. return NULL;
  1103. }
  1104. processed++;
  1105. return ret.getClear();
  1106. }
  1107. catch(IException * e)
  1108. {
  1109. throw makeWrappedException(e);
  1110. }
  1111. }
  1112. extern HTHOR_API IHThorActivity *createIndexNormalizeActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexNormalizeArg &arg, ThorActivityKind _kind)
  1113. {
  1114. // A logical filename for the key should refer to a single physical file - either the TLK or a monolithic key
  1115. OwnedRoxieString lfn(arg.getFileName());
  1116. Owned<ILocalOrDistributedFile> ldFile = _agent.resolveLFN(lfn, "IndexNormalize", 0 != (arg.getFlags() & TIRoptional),true,true);
  1117. Linked<IDistributedFile> dFile = ldFile ? ldFile->queryDistributedFile() : NULL;
  1118. if (!dFile)
  1119. {
  1120. StringBuffer buff;
  1121. buff.append("Skipping OPT index normalize of nonexistent file ").append(lfn);
  1122. WARNLOG("%s", buff.str());
  1123. _agent.addWuException(buff.str(), 0, ExceptionSeverityWarning, "hthor");
  1124. return new CHThorNullActivity(_agent, _activityId, _subgraphId, arg, _kind);
  1125. }
  1126. _agent.logFileAccess(dFile, "HThor", "READ");
  1127. enterSingletonSuperfiles(dFile);
  1128. return new CHThorIndexNormalizeActivity(_agent, _activityId, _subgraphId, arg, _kind, dFile);
  1129. }
  1130. //-------------------------------------------------------------------------------------------------------------
  1131. class CHThorIndexAggregateActivity : public CHThorIndexReadActivityBase
  1132. {
  1133. public:
  1134. CHThorIndexAggregateActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexAggregateArg &_arg, ThorActivityKind _kind, IDistributedFile * df);
  1135. ~CHThorIndexAggregateActivity();
  1136. //interface IHThorInput
  1137. virtual void done();
  1138. virtual void ready();
  1139. virtual const void *nextInGroup();
  1140. virtual bool needsAllocator() const { return true; }
  1141. protected:
  1142. void * createNextRow();
  1143. void gather();
  1144. protected:
  1145. IHThorIndexAggregateArg &helper;
  1146. RtlDynamicRowBuilder outBuilder;
  1147. bool finished;
  1148. };
  1149. CHThorIndexAggregateActivity::CHThorIndexAggregateActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexAggregateArg &_arg, ThorActivityKind _kind, IDistributedFile * _df)
  1150. : CHThorIndexReadActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _df), helper(_arg), outBuilder(NULL)
  1151. {
  1152. }
// Nothing to release explicitly here.
CHThorIndexAggregateActivity::~CHThorIndexAggregateActivity()
{
}
  1156. void CHThorIndexAggregateActivity::ready()
  1157. {
  1158. CHThorIndexReadActivityBase::ready();
  1159. outBuilder.setAllocator(rowAllocator);
  1160. finished = false;
  1161. }
// Clears the output row builder, then lets the base class finish up.
void CHThorIndexAggregateActivity::done()
{
    outBuilder.clear();
    CHThorIndexReadActivityBase::done();
}
  1167. void CHThorIndexAggregateActivity::gather()
  1168. {
  1169. outBuilder.ensureRow();
  1170. try
  1171. {
  1172. helper.clearAggregate(outBuilder);
  1173. }
  1174. catch(IException * e)
  1175. {
  1176. throw makeWrappedException(e);
  1177. }
  1178. if(!klManager)
  1179. return;
  1180. loop
  1181. {
  1182. while (!klManager->lookup(true))
  1183. {
  1184. if (!nextPart())
  1185. return;
  1186. }
  1187. agent.reportProgress(NULL);
  1188. try
  1189. {
  1190. helper.processRow(outBuilder, klManager->queryKeyBuffer(callback.getFPosRef()));
  1191. }
  1192. catch(IException * e)
  1193. {
  1194. throw makeWrappedException(e);
  1195. }
  1196. callback.finishedRow();
  1197. }
  1198. }
  1199. const void *CHThorIndexAggregateActivity::nextInGroup()
  1200. {
  1201. if (finished) return NULL;
  1202. gather();
  1203. processed++;
  1204. finished = true;
  1205. size32_t size = outputMeta.getRecordSize(outBuilder.getSelf());
  1206. return outBuilder.finalizeRowClear(size);
  1207. }
  1208. extern HTHOR_API IHThorActivity *createIndexAggregateActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexAggregateArg &arg, ThorActivityKind _kind)
  1209. {
  1210. // A logical filename for the key should refer to a single physical file - either the TLK or a monolithic key
  1211. OwnedRoxieString lfn(arg.getFileName());
  1212. Owned<ILocalOrDistributedFile> ldFile = _agent.resolveLFN(lfn, "IndexAggregate", 0 != (arg.getFlags() & TIRoptional));
  1213. Linked<IDistributedFile> dFile = ldFile ? ldFile->queryDistributedFile() : NULL;
  1214. if (!dFile)
  1215. {
  1216. StringBuffer buff;
  1217. buff.append("Skipping OPT index aggregate of nonexistent file ").append(lfn);
  1218. WARNLOG("%s", buff.str());
  1219. _agent.addWuException(buff.str(), 0, ExceptionSeverityWarning, "hthor");
  1220. return new CHThorNullAggregateActivity(_agent, _activityId, _subgraphId, arg, arg, _kind);
  1221. }
  1222. _agent.logFileAccess(dFile, "HThor", "READ");
  1223. enterSingletonSuperfiles(dFile);
  1224. return new CHThorIndexAggregateActivity(_agent, _activityId, _subgraphId, arg, _kind, dFile);
  1225. }
  1226. //-------------------------------------------------------------------------------------------------------------
  1227. class CHThorIndexCountActivity : public CHThorIndexReadActivityBase
  1228. {
  1229. public:
  1230. CHThorIndexCountActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexCountArg &_arg, ThorActivityKind _kind, IDistributedFile * df);
  1231. //interface IHThorInput
  1232. virtual void ready();
  1233. virtual const void *nextInGroup();
  1234. protected:
  1235. void * createNextRow();
  1236. protected:
  1237. IHThorIndexCountArg &helper;
  1238. unsigned __int64 choosenLimit;
  1239. bool finished;
  1240. };
  1241. CHThorIndexCountActivity::CHThorIndexCountActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexCountArg &_arg, ThorActivityKind _kind, IDistributedFile * _df)
  1242. : CHThorIndexReadActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _df), helper(_arg)
  1243. {
  1244. choosenLimit = (unsigned __int64)-1;
  1245. finished = false;
  1246. }
  1247. void CHThorIndexCountActivity::ready()
  1248. {
  1249. CHThorIndexReadActivityBase::ready();
  1250. finished = false;
  1251. choosenLimit = helper.getChooseNLimit();
  1252. }
  1253. const void *CHThorIndexCountActivity::nextInGroup()
  1254. {
  1255. if (finished) return NULL;
  1256. unsigned __int64 totalCount = 0;
  1257. if(klManager)
  1258. {
  1259. loop
  1260. {
  1261. if (helper.hasFilter())
  1262. {
  1263. loop
  1264. {
  1265. agent.reportProgress(NULL);
  1266. if (!klManager->lookup(true))
  1267. break;
  1268. totalCount += helper.numValid(klManager->queryKeyBuffer(callback.getFPosRef()));
  1269. callback.finishedRow();
  1270. if ((totalCount > choosenLimit))
  1271. break;
  1272. }
  1273. }
  1274. else
  1275. totalCount += klManager->getCount();
  1276. if ((totalCount > choosenLimit) || !nextPart())
  1277. break;
  1278. }
  1279. }
  1280. finished = true;
  1281. processed++;
  1282. if (totalCount > choosenLimit)
  1283. totalCount = choosenLimit;
  1284. size32_t outSize = outputMeta.getFixedSize();
  1285. void * ret = rowAllocator->createRow(); //meta: outputMeta
  1286. if (outSize == 1)
  1287. {
  1288. assertex(choosenLimit == 1);
  1289. *(byte *)ret = (byte)totalCount;
  1290. }
  1291. else
  1292. {
  1293. assertex(outSize == sizeof(unsigned __int64));
  1294. *(unsigned __int64 *)ret = totalCount;
  1295. }
  1296. return ret = rowAllocator->finalizeRow(outSize, ret, outSize);
  1297. }
  1298. extern HTHOR_API IHThorActivity *createIndexCountActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexCountArg &arg, ThorActivityKind _kind)
  1299. {
  1300. // A logical filename for the key should refer to a single physical file - either the TLK or a monolithic key
  1301. OwnedRoxieString lfn(arg.getFileName());
  1302. Owned<ILocalOrDistributedFile> ldFile = _agent.resolveLFN(lfn, "IndexCount", 0 != (arg.getFlags() & TIRoptional));
  1303. Linked<IDistributedFile> dFile = ldFile ? ldFile->queryDistributedFile() : NULL;
  1304. if (!dFile)
  1305. {
  1306. StringBuffer buff;
  1307. buff.append("Skipping OPT index count of nonexistent file ").append(lfn);
  1308. WARNLOG("%s", buff.str());
  1309. _agent.addWuException(buff.str(), 0, ExceptionSeverityWarning, "hthor");
  1310. return new CHThorNullCountActivity(_agent, _activityId, _subgraphId, arg, _kind);
  1311. }
  1312. _agent.logFileAccess(dFile, "HThor", "READ");
  1313. enterSingletonSuperfiles(dFile);
  1314. return new CHThorIndexCountActivity(_agent, _activityId, _subgraphId, arg, _kind, dFile);
  1315. }
  1316. //-------------------------------------------------------------------------------------------------------------
  1317. class CHThorIndexGroupAggregateActivity : public CHThorIndexReadActivityBase, implements IHThorGroupAggregateCallback
  1318. {
  1319. public:
  1320. CHThorIndexGroupAggregateActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexGroupAggregateArg &_arg, ThorActivityKind _kind, IDistributedFile * df);
  1321. IMPLEMENT_IINTERFACE
  1322. //interface IHThorInput
  1323. virtual void ready();
  1324. virtual const void *nextInGroup();
  1325. virtual bool needsAllocator() const { return true; }
  1326. virtual void processRow(const void * next);
  1327. protected:
  1328. void * createNextRow();
  1329. void gather();
  1330. protected:
  1331. IHThorIndexGroupAggregateArg &helper;
  1332. RowAggregator aggregated;
  1333. bool eof;
  1334. bool gathered;
  1335. };
  1336. CHThorIndexGroupAggregateActivity::CHThorIndexGroupAggregateActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexGroupAggregateArg &_arg, ThorActivityKind _kind, IDistributedFile * _df) : CHThorIndexReadActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _df), helper(_arg), aggregated(_arg, _arg)
  1337. {
  1338. eof = false;
  1339. gathered = false;
  1340. }
  1341. void CHThorIndexGroupAggregateActivity::ready()
  1342. {
  1343. CHThorIndexReadActivityBase::ready();
  1344. eof = false;
  1345. gathered = false;
  1346. aggregated.reset();
  1347. aggregated.start(rowAllocator);
  1348. }
// Adds the supplied row to the aggregator.
// NOTE(review): presumably the IHThorGroupAggregateCallback entry point that the helper invokes
// from gather() (which passes 'this' to helper.processRow) - confirm against the interface.
void CHThorIndexGroupAggregateActivity::processRow(const void * next)
{
    aggregated.addRow(next);
}
  1353. void CHThorIndexGroupAggregateActivity::gather()
  1354. {
  1355. gathered = true;
  1356. if(!klManager)
  1357. return;
  1358. loop
  1359. {
  1360. while (!klManager->lookup(true))
  1361. {
  1362. if (!nextPart())
  1363. return;
  1364. }
  1365. agent.reportProgress(NULL);
  1366. try
  1367. {
  1368. helper.processRow(klManager->queryKeyBuffer(callback.getFPosRef()), this);
  1369. }
  1370. catch(IException * e)
  1371. {
  1372. throw makeWrappedException(e);
  1373. }
  1374. callback.finishedRow();
  1375. }
  1376. }
  1377. const void *CHThorIndexGroupAggregateActivity::nextInGroup()
  1378. {
  1379. if (eof)
  1380. return NULL;
  1381. if (!gathered)
  1382. gather();
  1383. Owned<AggregateRowBuilder> next = aggregated.nextResult();
  1384. if (next)
  1385. {
  1386. processed++;
  1387. return next->finalizeRowClear();
  1388. }
  1389. eof = true;
  1390. return NULL;
  1391. }
  1392. extern HTHOR_API IHThorActivity *createIndexGroupAggregateActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexGroupAggregateArg &arg, ThorActivityKind _kind)
  1393. {
  1394. // A logical filename for the key should refer to a single physical file - either the TLK or a monolithic key
  1395. OwnedRoxieString lfn(arg.getFileName());
  1396. Owned<ILocalOrDistributedFile> ldFile = _agent.resolveLFN(lfn, "IndexGroupAggregate", 0 != (arg.getFlags() & TIRoptional));
  1397. Linked<IDistributedFile> dFile = ldFile ? ldFile->queryDistributedFile() : NULL;
  1398. if (!dFile)
  1399. {
  1400. StringBuffer buff;
  1401. buff.append("Skipping OPT index group aggregate of nonexistent file ").append(lfn);
  1402. WARNLOG("%s", buff.str());
  1403. _agent.addWuException(buff.str(), 0, ExceptionSeverityWarning, "hthor");
  1404. return new CHThorNullActivity(_agent, _activityId, _subgraphId, arg, _kind);
  1405. }
  1406. _agent.logFileAccess(dFile, "HThor", "READ");
  1407. enterSingletonSuperfiles(dFile);
  1408. return new CHThorIndexGroupAggregateActivity(_agent, _activityId, _subgraphId, arg, _kind, dFile);
  1409. }
  1410. //-------------------------------------------------------------------------------------------------------------
// Callback interface through which a worker thread hands an exception it caught to another object.
// The implementations visible in this file either keep the pointer for later rethrow or
// Release() it, so the caller gives up ownership of E.
interface IThreadedExceptionHandler
{
    virtual void noteException(IException *E) = 0;
};
  1415. template <class ROW, class OWNER>
  1416. class PartHandlerThread : public CInterface, implements IPooledThread
  1417. {
  1418. public:
  1419. typedef PartHandlerThread<ROW, OWNER> SELF;
  1420. IMPLEMENT_IINTERFACE;
  1421. PartHandlerThread() : owner(0)
  1422. {
  1423. }
  1424. virtual void init(void * _owner) { owner = (OWNER *)_owner; }
  1425. virtual void main()
  1426. {
  1427. try
  1428. {
  1429. owner->openPart();
  1430. loop
  1431. {
  1432. ROW * row = owner->getRow();
  1433. if (!row)
  1434. break;
  1435. owner->doRequest(row);
  1436. }
  1437. }
  1438. catch (IException *E)
  1439. {
  1440. owner->noteException(E);
  1441. }
  1442. }
  1443. bool stop()
  1444. {
  1445. owner->stop();
  1446. return true;
  1447. }
  1448. virtual bool canReuse() { return true; }
  1449. private:
  1450. OWNER * owner;
  1451. };
  1452. template <class ROW>
  1453. class ThreadedPartHandler : public CInterface
  1454. {
  1455. protected:
  1456. Linked<IThreadPool> threadPool;
  1457. PooledThreadHandle threadHandle;
  1458. QueueOf<ROW, true> pending;
  1459. CriticalSection crit;
  1460. Semaphore limit;
  1461. bool started;
  1462. Owned<IDistributedFilePart> part;
  1463. IThreadedExceptionHandler *handler;
  1464. public:
  1465. typedef ThreadedPartHandler<ROW> SELF;
  1466. ThreadedPartHandler(IDistributedFilePart *_part, IThreadedExceptionHandler *_handler, IThreadPool * _threadPool)
  1467. : limit(MAX_FETCH_LOOKAHEAD), part(_part), handler(_handler), threadHandle(0), threadPool(_threadPool)
  1468. {
  1469. started = false;
  1470. }
  1471. ~ThreadedPartHandler()
  1472. {
  1473. //is it the responsibility of the derived class to clean up the list on destruction --- can do nothing but assert here, since implementations different and VMTs gone by now
  1474. assertex(pending.ordinality() == 0);
  1475. }
  1476. void addRow(ROW * row)
  1477. {
  1478. limit.wait();
  1479. CriticalBlock procedure(crit);
  1480. pending.enqueue(row);
  1481. if (!started)
  1482. {
  1483. started = true;
  1484. start();
  1485. }
  1486. }
  1487. void stop()
  1488. {
  1489. }
  1490. void start()
  1491. {
  1492. threadHandle = threadPool->start(this);
  1493. }
  1494. void join()
  1495. {
  1496. threadPool->join(threadHandle);
  1497. started = false;
  1498. }
  1499. ROW * getRow()
  1500. {
  1501. CriticalBlock procedure(crit);
  1502. if(pending.ordinality())
  1503. {
  1504. limit.signal();
  1505. return pending.dequeue();
  1506. }
  1507. else
  1508. {
  1509. started = false; //because returning NULL will cause thread to terminate (has to be within this CriticalBlock to avoid race cond.)
  1510. return NULL;
  1511. }
  1512. }
  1513. void noteException(IException * e)
  1514. {
  1515. handler->noteException(e);
  1516. }
  1517. private:
  1518. friend class PartHandlerThread<ROW, SELF>;
  1519. virtual void doRequest(ROW * row) = 0; // Must be implemented by derived class
  1520. virtual void openPart() = 0; // Must be implemented by derived class
  1521. };
// Thread factory that creates PartHandlerThread objects whose owner type is ThreadedPartHandler<ROW>.
template <class ROW>
class PartHandlerThreadFactory : public CInterface, implements IThreadFactory
{
    IMPLEMENT_IINTERFACE;
    typedef ThreadedPartHandler<ROW> OWNER;
    // Creates a fresh, unbound worker; it learns its owner later through init().
    IPooledThread * createNew() { return new PartHandlerThread<ROW, OWNER>(); }
};
// One queued fetch: the left-hand row, the file position to fetch and the sequence number
// issued for it. The request owns 'left' and releases it on destruction.
class FetchRequest : public CInterface
{
public:
    const void * left;  // left-hand row; released by the destructor
    offset_t pos;       // file position to fetch
    offset_t seq;       // sequence number issued for this request
    FetchRequest(const void * _left, offset_t _pos, offset_t _seq) : left(_left), pos(_pos), seq(_seq) {}
    ~FetchRequest() { ReleaseRoxieRow(left); }
};
// Callback used by FlatFetchPartHandler to hand a request, its translated position and the
// part's serial stream back to the activity that processes it.
class IFlatFetchHandlerCallback
{
public:
    virtual void processFetch(FetchRequest const * fetch, offset_t pos, ISerialStream *rawStream) = 0;
};
// Callback interface for an XML fetch part handler (the handler itself is not defined in this
// part of the file).
class IXmlFetchHandlerCallback
{
public:
    // NOTE(review): presumably called with the column provider for the matched XML row - confirm against the caller.
    virtual void processFetched(FetchRequest const * fetch, IColumnProvider * lastMatch) = 0;
    // Wraps an exception, with some extra text, on behalf of the handler.
    virtual IException * makeWrappedException(IException * e, char const * extra) const = 0;
};
  1549. // this class base for all three fetch activities and keyed join
  1550. class FetchPartHandlerBase
  1551. {
  1552. protected:
  1553. Owned<IFileIO> rawFile;
  1554. Owned<ISerialStream> rawStream;
  1555. offset_t base;
  1556. offset_t top;
  1557. bool blockcompressed;
  1558. MemoryAttr encryptionkey;
  1559. unsigned activityId;
  1560. CachedOutputMetaData const & outputMeta;
  1561. IEngineRowAllocator * rowAllocator;
  1562. IOutputRowDeserializer * rowDeserializer;
  1563. public:
  1564. FetchPartHandlerBase(offset_t _base, offset_t _size, bool _blockcompressed, MemoryAttr &_encryptionkey, unsigned _activityId, CachedOutputMetaData const & _outputMeta, IOutputRowDeserializer * _rowDeserializer, IEngineRowAllocator *_rowAllocator)
  1565. : blockcompressed(_blockcompressed),
  1566. encryptionkey(_encryptionkey),
  1567. activityId(_activityId),
  1568. outputMeta(_outputMeta),
  1569. rowDeserializer(_rowDeserializer),
  1570. rowAllocator(_rowAllocator)
  1571. {
  1572. base = _base;
  1573. top = _base + _size;
  1574. }
  1575. int compare(offset_t offset)
  1576. {
  1577. if (offset < base)
  1578. return -1;
  1579. else if (offset >= top)
  1580. return 1;
  1581. else
  1582. return 0;
  1583. }
  1584. offset_t translateFPos(offset_t rp)
  1585. {
  1586. if(isLocalFpos(rp))
  1587. return getLocalFposOffset(rp);
  1588. else
  1589. return rp-base;
  1590. }
  1591. virtual void openPart()
  1592. {
  1593. // MORE - cached file handles?
  1594. if(rawFile)
  1595. return;
  1596. IDistributedFilePart * part = queryPart();
  1597. unsigned numCopies = part->numCopies();
  1598. for (unsigned copy=0; copy < numCopies; copy++)
  1599. {
  1600. RemoteFilename rfn;
  1601. try
  1602. {
  1603. OwnedIFile ifile = createIFile(part->getFilename(rfn,copy));
  1604. unsigned __int64 thissize = ifile->size();
  1605. if (thissize != -1)
  1606. {
  1607. IPropertyTree & props = part->queryAttributes();
  1608. unsigned __int64 expectedSize;
  1609. Owned<IExpander> eexp;
  1610. if (encryptionkey.length()!=0) {
  1611. eexp.setown(createAESExpander256(encryptionkey.length(),encryptionkey.get()));
  1612. blockcompressed = true;
  1613. }
  1614. if(blockcompressed)
  1615. expectedSize = props.getPropInt64("@compressedSize", -1);
  1616. else
  1617. expectedSize = props.getPropInt64("@size", -1);
  1618. if(thissize != expectedSize && expectedSize != -1)
  1619. throw MakeStringException(0, "File size mismatch: file %s was supposed to be %"I64F"d bytes but appears to be %"I64F"d bytes", ifile->queryFilename(), expectedSize, thissize);
  1620. if(blockcompressed)
  1621. rawFile.setown(createCompressedFileReader(ifile,eexp));
  1622. else
  1623. rawFile.setown(ifile->open(IFOread));
  1624. break;
  1625. }
  1626. }
  1627. catch (IException *E)
  1628. {
  1629. EXCLOG(E, "Opening key part");
  1630. E->Release();
  1631. }
  1632. }
  1633. if(!rawFile)
  1634. {
  1635. RemoteFilename rfn;
  1636. StringBuffer rmtPath;
  1637. part->getFilename(rfn).getRemotePath(rmtPath);
  1638. throw MakeStringException(1001, "Could not open file part at %s%s", rmtPath.str(), (numCopies > 1) ? " or any alternate location." : ".");
  1639. }
  1640. rawStream.setown(createFileSerialStream(rawFile, 0, -1, 0));
  1641. }
  1642. virtual IDistributedFilePart * queryPart() = 0;
  1643. };
  1644. // this class base for all three fetch activities, but not keyed join
  1645. class SimpleFetchPartHandlerBase : public FetchPartHandlerBase, public ThreadedPartHandler<FetchRequest>
  1646. {
  1647. public:
  1648. SimpleFetchPartHandlerBase(IDistributedFilePart *_part, offset_t _base, offset_t _size, IThreadedExceptionHandler *_handler, IThreadPool * _threadPool, bool _blockcompressed, MemoryAttr &_encryptionkey, unsigned _activityId, CachedOutputMetaData const & _outputMeta, IOutputRowDeserializer * _rowDeserializer, IEngineRowAllocator *_rowAllocator)
  1649. : FetchPartHandlerBase(_base, _size, _blockcompressed, _encryptionkey, _activityId, _outputMeta, _rowDeserializer, _rowAllocator),
  1650. ThreadedPartHandler<FetchRequest>(_part, _handler, _threadPool)
  1651. {
  1652. }
  1653. ~SimpleFetchPartHandlerBase()
  1654. {
  1655. while(FetchRequest * fetch = pending.dequeue())
  1656. fetch->Release();
  1657. }
  1658. IMPLEMENT_IINTERFACE;
  1659. virtual IDistributedFilePart * queryPart() { return part; }
  1660. private:
  1661. virtual void openPart() { FetchPartHandlerBase::openPart(); }
  1662. };
  1663. // this class used for flat and CSV fetch activities, but not XML fetch or keyed join
  1664. class FlatFetchPartHandler : public SimpleFetchPartHandlerBase
  1665. {
  1666. public:
  1667. FlatFetchPartHandler(IFlatFetchHandlerCallback & _owner, IDistributedFilePart * _part, offset_t _base, offset_t _size, IThreadedExceptionHandler *_handler, IThreadPool * _threadPool, bool _blockcompressed, MemoryAttr &_encryptionkey, unsigned _activityId, CachedOutputMetaData const & _outputMeta, IOutputRowDeserializer * _rowDeserializer, IEngineRowAllocator *_rowAllocator)
  1668. : SimpleFetchPartHandlerBase(_part, _base, _size, _handler, _threadPool, _blockcompressed, _encryptionkey, _activityId, _outputMeta, _rowDeserializer, _rowAllocator),
  1669. owner(_owner)
  1670. {
  1671. }
  1672. virtual void doRequest(FetchRequest * _fetch)
  1673. {
  1674. Owned<FetchRequest> fetch(_fetch);
  1675. offset_t pos = translateFPos(fetch->pos);
  1676. if(pos >= rawFile->size())
  1677. throw MakeStringException(0, "Attempted to fetch at invalid filepos");
  1678. owner.processFetch(fetch, pos, rawStream);
  1679. }
  1680. private:
  1681. IFlatFetchHandlerCallback & owner;
  1682. };
  1683. class DistributedFileFetchHandlerBase : public CInterface, implements IInterface, implements IThreadedExceptionHandler
  1684. {
  1685. public:
  1686. IMPLEMENT_IINTERFACE;
  1687. DistributedFileFetchHandlerBase() {}
  1688. virtual ~DistributedFileFetchHandlerBase() {}
  1689. virtual void noteException(IException *E)
  1690. {
  1691. CriticalBlock procedure(exceptionCrit);
  1692. if (exception)
  1693. E->Release();
  1694. else
  1695. exception = E;
  1696. }
  1697. protected:
  1698. static offset_t getPartSize(IDistributedFilePart *part)
  1699. {
  1700. offset_t partsize = part->queryAttributes().getPropInt64("@size", -1);
  1701. if (partsize==-1)
  1702. {
  1703. MTIME_SECTION(timer, "Fetch remote file size");
  1704. unsigned numCopies = part->numCopies();
  1705. for (unsigned copy=0; copy < numCopies; copy++)
  1706. {
  1707. RemoteFilename rfn;
  1708. try
  1709. {
  1710. OwnedIFile ifile = createIFile(part->getFilename(rfn,copy));
  1711. partsize = ifile->size();
  1712. if (partsize != -1)
  1713. {
  1714. // TODO: Create DistributedFilePropertyLock for parts
  1715. part->lockProperties();
  1716. part->queryAttributes().setPropInt64("@size", partsize);
  1717. part->unlockProperties();
  1718. break;
  1719. }
  1720. }
  1721. catch(IException *E)
  1722. {
  1723. EXCLOG(E, "Open remote file");
  1724. E->Release();
  1725. }
  1726. }
  1727. }
  1728. if (partsize==-1)
  1729. throw MakeStringException(0, "Unable to determine size of filepart");
  1730. return partsize;
  1731. }
  1732. protected:
  1733. CriticalSection exceptionCrit;
  1734. IException * exception;
  1735. };
// Factory interface used by DistributedFileFetchHandler to create one PARTHANDLER per file part.
// 'base' and 'size' are the running start offset and the size of the part as computed by the caller.
template <class PARTHANDLER>
class IFetchHandlerFactory
{
public:
    virtual PARTHANDLER * createFetchPartHandler(IDistributedFilePart * part, offset_t base, offset_t size, IThreadedExceptionHandler * handler, bool blockcompressed, MemoryAttr &encryptionkey, IOutputRowDeserializer * rowDeserializer, IEngineRowAllocator *rowAllocator) = 0;
};
  1742. template <class PARTHANDLER, class LEFTPTR, class REQUEST>
  1743. class DistributedFileFetchHandler : public DistributedFileFetchHandlerBase
  1744. {
  1745. public:
  1746. typedef DistributedFileFetchHandler<PARTHANDLER, LEFTPTR, REQUEST> SELF;
  1747. DistributedFileFetchHandler(IDistributedFile * f, IFetchHandlerFactory<PARTHANDLER> & factory, MemoryAttr &encryptionkey, IOutputRowDeserializer * rowDeserializer, IEngineRowAllocator *rowAllocator) : file(f)
  1748. {
  1749. numParts = f->numParts();
  1750. parts = new PARTHANDLER *[numParts];
  1751. Owned<IFileDescriptor> fdesc = f->getFileDescriptor();
  1752. bool blockcompressed = fdesc->isCompressed(); //assume new compression, old compression was never handled on fetch
  1753. offset_t base = 0;
  1754. unsigned idx;
  1755. for (idx = 0; idx < numParts; idx++)
  1756. {
  1757. IDistributedFilePart *part = f->getPart(idx);
  1758. offset_t size = getPartSize(part);
  1759. parts[idx] = factory.createFetchPartHandler(part, base, size, this, blockcompressed, encryptionkey, rowDeserializer, rowAllocator);
  1760. base += size;
  1761. }
  1762. exception = NULL;
  1763. }
  1764. ~DistributedFileFetchHandler()
  1765. {
  1766. unsigned idx;
  1767. for (idx = 0; idx < numParts; idx++)
  1768. {
  1769. delete parts[idx];
  1770. }
  1771. delete [] parts;
  1772. }
  1773. int compare(offset_t l, PARTHANDLER * r)
  1774. {
  1775. return r->compare(l);
  1776. }
  1777. void addRow(LEFTPTR left, offset_t rp, offset_t seq)
  1778. {
  1779. PARTHANDLER * part = binsearch(rp, parts, numParts, this);
  1780. if(!part)
  1781. throw MakeStringException(1002, "FETCH: file position %"I64F"d out of range", rp);
  1782. part->addRow(new REQUEST(left, rp, seq));
  1783. }
  1784. void stop()
  1785. {
  1786. unsigned idx;
  1787. for (idx = 0; idx < numParts; idx++)
  1788. {
  1789. parts[idx]->stop();
  1790. parts[idx]->join();
  1791. }
  1792. if (exception)
  1793. throw (exception);
  1794. }
  1795. private:
  1796. Linked<IDistributedFile> file;
  1797. unsigned numParts;
  1798. PARTHANDLER * * parts;
  1799. };
  1800. //-------------------------------------------------------------------------------------------------------------
  1801. class CHThorThreadedActivityBase : public CHThorActivityBase, implements IThreadedExceptionHandler
  1802. {
  1803. class InputHandler : extends Thread
  1804. {
  1805. CHThorThreadedActivityBase *parent;
  1806. public:
  1807. InputHandler(CHThorThreadedActivityBase *_parent) : parent(_parent)
  1808. {
  1809. }
  1810. virtual int run()
  1811. {
  1812. try
  1813. {
  1814. parent->fetchAll();
  1815. }
  1816. catch (IException *E)
  1817. {
  1818. parent->noteException(E);
  1819. }
  1820. catch (...)
  1821. {
  1822. parent->noteException(MakeStringException(0, "Unknown exception caught in Fetch::InputHandler"));
  1823. }
  1824. return 0;
  1825. }
  1826. };
  1827. public:
  1828. CHThorThreadedActivityBase (IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorArg &_arg, IHThorFetchContext &_fetch, ThorActivityKind _kind, IRecordSize *diskSize)
  1829. : CHThorActivityBase(_agent, _activityId, _subgraphId, _arg, _kind), fetch(_fetch)
  1830. {
  1831. exception = NULL;
  1832. rowLimit = 0;
  1833. }
  1834. virtual ~CHThorThreadedActivityBase ()
  1835. {
  1836. }
  1837. virtual void waitForThreads()
  1838. {
  1839. aborting = true;
  1840. if (inputThread)
  1841. inputThread->join();
  1842. inputThread.clear();
  1843. threadPool.clear();
  1844. }
  1845. virtual void fetchAll() = 0;
  1846. virtual void ready()
  1847. {
  1848. CHThorActivityBase::ready();
  1849. started = false;
  1850. stopped = false;
  1851. aborting = false;
  1852. initializeThreadPool();
  1853. }
  1854. virtual void initializeThreadPool() = 0;
  1855. virtual void done()
  1856. {
  1857. aborting = true;
  1858. stop();
  1859. if (inputThread)
  1860. inputThread->join();
  1861. while (!stopped)
  1862. {
  1863. const void * row = getRow();
  1864. ReleaseRoxieRow(row);
  1865. }
  1866. clearQueue();
  1867. waitForThreads();
  1868. avail.reinit(0);
  1869. CHThorActivityBase::done();
  1870. }
  1871. virtual const void * getRow() = 0;
  1872. virtual void clearQueue() = 0;
  1873. IHThorInput *queryOutput(unsigned index) { return this; }
  1874. //interface IHThorInput
  1875. virtual bool isGrouped() { return false; }
  1876. virtual const char *getFileName() { return NULL; }
  1877. virtual bool outputToFile(const char *) { return false; }
  1878. virtual IOutputMetaData * queryOutputMeta() const { return CHThorActivityBase::outputMeta; }
  1879. protected:
  1880. Semaphore avail;
  1881. bool stopped;
  1882. bool started;
  1883. bool aborting;
  1884. IHThorFetchContext &fetch;
  1885. Owned<InputHandler> inputThread;
  1886. unsigned numParts;
  1887. unsigned __int64 rowLimit;
  1888. Owned<IThreadPool> threadPool;
  1889. CriticalSection pendingCrit;
  1890. IException *exception;
  1891. public:
  1892. virtual void noteException(IException *E)
  1893. {
  1894. CriticalBlock procedure(pendingCrit);
  1895. if (exception)
  1896. E->Release();
  1897. else
  1898. exception = E;
  1899. avail.signal();
  1900. }
  1901. void stop()
  1902. {
  1903. avail.signal();
  1904. }
  1905. virtual const void *nextInGroup()
  1906. {
  1907. if (!started)
  1908. {
  1909. started = true;
  1910. start();
  1911. }
  1912. try
  1913. {
  1914. const void *ret = getRow();
  1915. if (ret)
  1916. {
  1917. processed++;
  1918. if ((processed-initialProcessed) > rowLimit)
  1919. {
  1920. onLimitExceeded();
  1921. if ( agent.queryCodeContext()->queryDebugContext())
  1922. agent.queryCodeContext()->queryDebugContext()->checkBreakpoint(DebugStateLimit, NULL, static_cast<IActivityBase *>(this));
  1923. }
  1924. }
  1925. return ret;
  1926. }
  1927. catch(...)
  1928. {
  1929. stopParts();
  1930. throw;
  1931. }
  1932. }
  1933. virtual void initParts(IDistributedFile * f) = 0;
  1934. virtual void stopParts() = 0;
  1935. virtual void onLimitExceeded() = 0;
  1936. virtual void start()
  1937. {
  1938. OwnedRoxieString lfn(fetch.getFileName());
  1939. if (lfn.get())
  1940. {
  1941. Owned <ILocalOrDistributedFile> ldFile = agent.resolveLFN(lfn, "Fetch", 0 != (fetch.getFetchFlags() & FFdatafileoptional));
  1942. IDistributedFile * dFile = ldFile ? ldFile->queryDistributedFile() : NULL;
  1943. if(dFile)
  1944. {
  1945. verifyFetchFormatCrc(dFile);
  1946. agent.logFileAccess(dFile, "HThor", "READ");
  1947. initParts(dFile);
  1948. }
  1949. else
  1950. {
  1951. StringBuffer buff;
  1952. buff.append("Skipping OPT fetch of nonexistent file ").append(lfn);
  1953. WARNLOG("%s", buff.str());
  1954. agent.addWuException(buff.str(), 0, ExceptionSeverityWarning, "hthor");
  1955. }
  1956. }
  1957. inputThread.setown(new InputHandler(this));
  1958. inputThread->start();
  1959. }
  1960. protected:
  1961. virtual void verifyFetchFormatCrc(IDistributedFile * f) {} // do nothing here as (currently, and probably by design) not available for CSV and XML, so only implement for binary
  1962. };
  1963. class CHThorFetchActivityBase : public CHThorThreadedActivityBase, public IFetchHandlerFactory<SimpleFetchPartHandlerBase>
  1964. {
  1965. public:
  1966. CHThorFetchActivityBase(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorArg &_arg, IHThorFetchContext &_fetch, ThorActivityKind _kind, IRecordSize *diskSize)
  1967. : CHThorThreadedActivityBase (_agent, _activityId, _subgraphId, _arg, _fetch, _kind, diskSize)
  1968. {
  1969. pendingSeq = 0;
  1970. signalSeq = 0;
  1971. dequeuedSeq = 0;
  1972. }
  1973. ~CHThorFetchActivityBase()
  1974. {
  1975. clearQueue();
  1976. }
  1977. virtual void initializeThreadPool()
  1978. {
  1979. threadPool.setown(createThreadPool("hthor fetch activity thread pool", &threadFactory));
  1980. }
  1981. virtual void initParts(IDistributedFile * f)
  1982. {
  1983. size32_t kl;
  1984. void *k;
  1985. fetch.getFileEncryptKey(kl,k);
  1986. MemoryAttr encryptionkey;
  1987. encryptionkey.setOwn(kl,k);
  1988. parts.setown(new DistributedFileFetchHandler<SimpleFetchPartHandlerBase, const void *, FetchRequest>(f, *this, encryptionkey, rowDeserializer, rowAllocator));
  1989. }
  1990. virtual void stopParts()
  1991. {
  1992. if(parts)
  1993. parts->stop();
  1994. }
  1995. virtual void fetchAll()
  1996. {
  1997. if(parts)
  1998. {
  1999. loop
  2000. {
  2001. if (aborting)
  2002. break;
  2003. const void *row = input->nextInGroup();
  2004. if (!row)
  2005. {
  2006. row = input->nextInGroup();
  2007. if (!row)
  2008. break;
  2009. }
  2010. offset_t rp = fetch.extractPosition(row);
  2011. offset_t seq = addRowPlaceholder();
  2012. parts->addRow(row, rp, seq);
  2013. }
  2014. parts->stop();
  2015. }
  2016. stop();
  2017. }
  2018. // to preserve order, we enqueue NULLs onto the queue and issue sequence numbers, and we only signal avail when rows in correct sequence are available
  2019. // pendingSeq gives the next sequence number to issue; signalSeq gives the next sequence number to signal for; and dequeuedSeq gives the number actually dequeued
  2020. offset_t addRowPlaceholder()
  2021. {
  2022. CriticalBlock procedure(pendingCrit);
  2023. pending.enqueue(NULL);
  2024. return pendingSeq++;
  2025. }
  2026. void setRow(const void *row, offset_t seq)
  2027. {
  2028. CriticalBlock procedure(pendingCrit);
  2029. //GH->? Why does this append allocated nulls instead of having a queue of const void??
  2030. pending.set((unsigned)(seq-dequeuedSeq), new const void*(row));
  2031. if(seq!=signalSeq)
  2032. return;
  2033. do
  2034. {
  2035. avail.signal();
  2036. ++signalSeq;
  2037. } while((signalSeq < pendingSeq) && (pending.query((unsigned)(signalSeq-dequeuedSeq)) != NULL));
  2038. }
  2039. const void * getRow()
  2040. {
  2041. while(!stopped)
  2042. {
  2043. avail.wait();
  2044. CriticalBlock procedure(pendingCrit);
  2045. if (exception)
  2046. {
  2047. IException *E = exception;
  2048. exception = NULL;
  2049. throw E;
  2050. }
  2051. if(pending.ordinality() == 0)
  2052. {
  2053. stopped = true;
  2054. break;
  2055. }
  2056. const void * * ptr = pending.dequeue();
  2057. ++dequeuedSeq;
  2058. const void * ret = *ptr;
  2059. delete ptr;
  2060. if(ret)
  2061. return ret;
  2062. }
  2063. return NULL;
  2064. }
  2065. virtual void clearQueue()
  2066. {
  2067. while(pending.ordinality())
  2068. {
  2069. const void * * ptr = pending.dequeue();
  2070. if(ptr)
  2071. {
  2072. ReleaseRoxieRow(*ptr);
  2073. delete ptr;
  2074. }
  2075. }
  2076. pendingSeq = 0;
  2077. signalSeq = 0;
  2078. dequeuedSeq = 0;
  2079. }
  2080. protected:
  2081. Owned<IOutputRowDeserializer> rowDeserializer;
  2082. private:
  2083. PartHandlerThreadFactory<FetchRequest> threadFactory;
  2084. Owned<DistributedFileFetchHandler<SimpleFetchPartHandlerBase, const void *, FetchRequest> > parts;
  2085. offset_t pendingSeq, signalSeq, dequeuedSeq;
  2086. QueueOf<const void *, true> pending;
  2087. };
  2088. class CHThorFlatFetchActivity : public CHThorFetchActivityBase, public IFlatFetchHandlerCallback
  2089. {
  2090. public:
  2091. CHThorFlatFetchActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorFetchArg &_arg, IHThorFetchContext &_fetch, ThorActivityKind _kind, IRecordSize *diskSize, MemoryAttr &encryptionkey)
  2092. : CHThorFetchActivityBase (_agent, _activityId, _subgraphId, _arg, _fetch, _kind, diskSize), helper(_arg)
  2093. {}
  2094. ~CHThorFlatFetchActivity()
  2095. {
  2096. waitForThreads();
  2097. }
  2098. virtual void ready()
  2099. {
  2100. CHThorFetchActivityBase::ready();
  2101. rowLimit = helper.getRowLimit();
  2102. rowDeserializer.setown(helper.queryDiskRecordSize()->createDiskDeserializer(agent.queryCodeContext(), activityId));
  2103. diskAllocator.setown(agent.queryCodeContext()->getRowAllocator(helper.queryDiskRecordSize(), activityId));
  2104. }
  2105. virtual bool needsAllocator() const { return true; }
  2106. virtual void processFetch(FetchRequest const * fetch, offset_t pos, ISerialStream *rawStream)
  2107. {
  2108. CThorStreamDeserializerSource deserializeSource;
  2109. deserializeSource.setStream(rawStream);
  2110. deserializeSource.reset(pos);
  2111. RtlDynamicRowBuilder rowBuilder(diskAllocator);
  2112. unsigned sizeRead = rowDeserializer->deserialize(rowBuilder.ensureRow(), deserializeSource);
  2113. OwnedConstRoxieRow rawBuffer(rowBuilder.finalizeRowClear(sizeRead));
  2114. CriticalBlock procedure(transformCrit);
  2115. size32_t thisSize;
  2116. try
  2117. {
  2118. RtlDynamicRowBuilder rowBuilder(rowAllocator);
  2119. thisSize = helper.transform(rowBuilder, rawBuffer, fetch->left, fetch->pos);
  2120. if(thisSize)
  2121. {
  2122. setRow(rowBuilder.finalizeRowClear(thisSize), fetch->seq);
  2123. }
  2124. else
  2125. {
  2126. setRow(NULL, fetch->seq);
  2127. }
  2128. }
  2129. catch(IException * e)
  2130. {
  2131. throw makeWrappedException(e);
  2132. }
  2133. }
  2134. virtual void onLimitExceeded()
  2135. {
  2136. helper.onLimitExceeded();
  2137. }
  2138. virtual SimpleFetchPartHandlerBase * createFetchPartHandler(IDistributedFilePart * part, offset_t base, offset_t size, IThreadedExceptionHandler * handler, bool blockcompressed, MemoryAttr &encryptionkey, IOutputRowDeserializer * rowDeserializer, IEngineRowAllocator *rowAllocator)
  2139. {
  2140. return new FlatFetchPartHandler(*this, part, base, size, handler, threadPool, blockcompressed, encryptionkey, activityId, outputMeta, rowDeserializer, rowAllocator);
  2141. }
  2142. protected:
  2143. virtual void verifyFetchFormatCrc(IDistributedFile * f)
  2144. {
  2145. if(!agent.queryWorkUnit()->getDebugValueBool("skipFileFormatCrcCheck", false))
  2146. ::verifyFormatCrcSuper(helper.getDiskFormatCrc(), f, false, true);
  2147. }
  2148. protected:
  2149. CriticalSection transformCrit;
  2150. IHThorFetchArg & helper;
  2151. Owned<IEngineRowAllocator> diskAllocator;
  2152. };
  2153. extern HTHOR_API IHThorActivity *createFetchActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorFetchArg &arg, ThorActivityKind _kind)
  2154. {
  2155. size32_t kl;
  2156. void *k;
  2157. arg.getFileEncryptKey(kl,k);
  2158. MemoryAttr encryptionkey;
  2159. encryptionkey.setOwn(kl,k);
  2160. return new CHThorFlatFetchActivity(_agent, _activityId, _subgraphId, arg, arg, _kind, arg.queryDiskRecordSize(),encryptionkey);
  2161. }
  2162. //------------------------------------------------------------------------------------------
  2163. class CHThorCsvFetchActivity : public CHThorFetchActivityBase, public IFlatFetchHandlerCallback
  2164. {
  2165. public:
  2166. CHThorCsvFetchActivity (IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorCsvFetchArg &_arg, ThorActivityKind _kind)
  2167. : CHThorFetchActivityBase(_agent, _activityId, _subgraphId, _arg, _arg, _kind, NULL), helper(_arg)
  2168. {
  2169. //MORE: I have no idea what should be passed for recordSize in the line above, either something that reads a fixed size, or
  2170. //reads a record based on the csv information
  2171. ICsvParameters * csvInfo = _arg.queryCsvParameters();
  2172. OwnedRoxieString lfn(fetch.getFileName());
  2173. Owned<ILocalOrDistributedFile> ldFile = agent.resolveLFN(lfn, "CsvFetch", 0 != (_arg.getFetchFlags() & FFdatafileoptional));
  2174. IDistributedFile * dFile = ldFile ? ldFile->queryDistributedFile() : NULL;
  2175. const char * quotes = NULL;
  2176. const char * separators = NULL;
  2177. const char * terminators = NULL;
  2178. const char * escapes = NULL;
  2179. if (dFile)
  2180. {
  2181. IPropertyTree & options = dFile->queryAttributes();
  2182. quotes = options.queryProp("@csvQuote");
  2183. separators = options.queryProp("@csvSeparate");
  2184. terminators = options.queryProp("@csvTerminate");
  2185. escapes = options.queryProp("@csvEscape");
  2186. agent.logFileAccess(dFile, "HThor", "READ");
  2187. }
  2188. else
  2189. {
  2190. StringBuffer buff;
  2191. buff.append("Skipping OPT fetch of nonexistent file ").append(lfn);
  2192. WARNLOG("%s", buff.str());
  2193. agent.addWuException(buff.str(), 0, ExceptionSeverityWarning, "hthor");
  2194. }
  2195. csvSplitter.init(_arg.getMaxColumns(), csvInfo, quotes, separators, terminators, escapes);
  2196. }
  2197. ~CHThorCsvFetchActivity()
  2198. {
  2199. waitForThreads();
  2200. }
  2201. virtual bool needsAllocator() const { return true; }
  2202. virtual void processFetch(FetchRequest const * fetch, offset_t pos, ISerialStream *rawStream)
  2203. {
  2204. rawStream->reset(pos);
  2205. CriticalBlock procedure(transformCrit);
  2206. size32_t rowSize = 4096; // MORE - make configurable
  2207. size32_t maxRowSize = 10*1024*1024; // MORE - make configurable
  2208. loop
  2209. {
  2210. size32_t avail;
  2211. const void *peek = rawStream->peek(rowSize, avail);
  2212. if (csvSplitter.splitLine(avail, (const byte *)peek) < rowSize || avail < rowSize)
  2213. break;
  2214. if (rowSize == maxRowSize)
  2215. throw MakeStringException(0, "Row too big");
  2216. if (rowSize >= maxRowSize/2)
  2217. rowSize = maxRowSize;
  2218. else
  2219. rowSize += rowSize;
  2220. }
  2221. size32_t thisSize;
  2222. try
  2223. {
  2224. RtlDynamicRowBuilder rowBuilder(rowAllocator);
  2225. thisSize = helper.transform(rowBuilder, csvSplitter.queryLengths(), (const char * *)csvSplitter.queryData(), fetch->left, fetch->pos);
  2226. if(thisSize)
  2227. {
  2228. setRow(rowBuilder.finalizeRowClear(thisSize), fetch->seq);
  2229. }
  2230. else
  2231. {
  2232. setRow(NULL, fetch->seq);
  2233. }
  2234. }
  2235. catch(IException * e)
  2236. {
  2237. throw makeWrappedException(e);
  2238. }
  2239. }
  2240. virtual void ready()
  2241. {
  2242. CHThorFetchActivityBase::ready();
  2243. rowLimit = helper.getRowLimit();
  2244. rowDeserializer.setown(helper.queryDiskRecordSize()->createDiskDeserializer(agent.queryCodeContext(), activityId));
  2245. }
  2246. virtual void onLimitExceeded()
  2247. {
  2248. helper.onLimitExceeded();
  2249. }
  2250. virtual SimpleFetchPartHandlerBase * createFetchPartHandler(IDistributedFilePart * part, offset_t base, offset_t size, IThreadedExceptionHandler * handler, bool blockcompressed, MemoryAttr &encryptionkey, IOutputRowDeserializer * rowDeserializer, IEngineRowAllocator *rowAllocator)
  2251. {
  2252. return new FlatFetchPartHandler(*this, part, base, size, handler, threadPool, blockcompressed, encryptionkey, activityId, outputMeta, rowDeserializer, rowAllocator);
  2253. }
  2254. protected:
  2255. CSVSplitter csvSplitter;
  2256. CriticalSection transformCrit;
  2257. IHThorCsvFetchArg & helper;
  2258. };
  2259. extern HTHOR_API IHThorActivity *createCsvFetchActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorCsvFetchArg &arg, ThorActivityKind _kind)
  2260. {
  2261. return new CHThorCsvFetchActivity(_agent, _activityId, _subgraphId, arg, _kind);
  2262. }
  2263. //------------------------------------------------------------------------------------------
  2264. class XmlFetchPartHandler : public SimpleFetchPartHandlerBase, public IXMLSelect
  2265. {
  2266. public:
  2267. IMPLEMENT_IINTERFACE;
  2268. XmlFetchPartHandler(IXmlFetchHandlerCallback & _owner, IDistributedFilePart * _part, offset_t _base, offset_t _size, IThreadedExceptionHandler * _handler, unsigned _streamBufferSize, IThreadPool * _threadPool, bool _blockcompressed, MemoryAttr &_encryptionkey, unsigned _activityId, CachedOutputMetaData const & _outputMeta)
  2269. : SimpleFetchPartHandlerBase(_part, _base, _size, _handler, _threadPool, _blockcompressed, _encryptionkey, _activityId, _outputMeta, NULL, NULL),
  2270. owner(_owner),
  2271. streamBufferSize(_streamBufferSize)
  2272. {
  2273. }
  2274. virtual void doRequest(FetchRequest * _fetch)
  2275. {
  2276. Owned<FetchRequest> fetch(_fetch);
  2277. offset_t pos = translateFPos(fetch->pos);
  2278. rawStream->seek(pos, IFSbegin);
  2279. while(!lastMatch)
  2280. {
  2281. bool gotNext = false;
  2282. try
  2283. {
  2284. gotNext = parser->next();
  2285. }
  2286. catch(IException * e)
  2287. {
  2288. StringBuffer fname;
  2289. RemoteFilename rfn;
  2290. part->getFilename(rfn).getPath(fname);
  2291. throw owner.makeWrappedException(e, fname.str());
  2292. }
  2293. if(!gotNext)
  2294. {
  2295. StringBuffer fname;
  2296. RemoteFilename rfn;
  2297. part->getFilename(rfn).getPath(fname);
  2298. throw MakeStringException(0, "Fetch fpos at EOF of %s", fname.str());
  2299. }
  2300. }
  2301. owner.processFetched(fetch, lastMatch);
  2302. parser->reset();
  2303. }
  2304. virtual void openPart()
  2305. {
  2306. if(parser)
  2307. return;
  2308. FetchPartHandlerBase::openPart();
  2309. rawStream.setown(createBufferedIOStream(rawFile, streamBufferSize));
  2310. parser.setown(createXMLParse(*rawStream, "/", *this));
  2311. }
  2312. //iface IXMLSelect
  2313. void match(IColumnProvider & entry, offset_t startOffset, offset_t endOffset)
  2314. {
  2315. lastMatch.set(&entry);
  2316. }
  2317. protected:
  2318. IXmlFetchHandlerCallback & owner;
  2319. Owned<IFileIOStream> rawStream;
  2320. Owned<IXMLParse> parser;
  2321. Owned<IColumnProvider> lastMatch;
  2322. unsigned streamBufferSize;
  2323. };
  2324. class CHThorXmlFetchActivity : public CHThorFetchActivityBase, public IXmlFetchHandlerCallback
  2325. {
  2326. public:
  2327. CHThorXmlFetchActivity(IAgentContext & _agent, unsigned _activityId, unsigned _subgraphId, IHThorXmlFetchArg & _arg, ThorActivityKind _kind)
  2328. : CHThorFetchActivityBase(_agent, _activityId, _subgraphId, _arg, _arg, _kind, NULL), helper(_arg)
  2329. {
  2330. }
  2331. ~CHThorXmlFetchActivity()
  2332. {
  2333. waitForThreads();
  2334. }
  2335. virtual bool needsAllocator() const { return true; }
  2336. virtual void processFetched(FetchRequest const * fetch, IColumnProvider * lastMatch)
  2337. {
  2338. CriticalBlock procedure(transformCrit);
  2339. size32_t thisSize;
  2340. try
  2341. {
  2342. RtlDynamicRowBuilder rowBuilder(rowAllocator);
  2343. thisSize = helper.transform(rowBuilder, lastMatch, fetch->left, fetch->pos);
  2344. if(thisSize)
  2345. {
  2346. setRow(rowBuilder.finalizeRowClear(thisSize), fetch->seq);
  2347. }
  2348. else
  2349. {
  2350. setRow(NULL, fetch->seq);
  2351. }
  2352. }
  2353. catch(IException * e)
  2354. {
  2355. throw makeWrappedException(e);
  2356. }
  2357. }
  2358. IException * makeWrappedException(IException * e) const { return CHThorActivityBase::makeWrappedException(e); }
  2359. virtual IException * makeWrappedException(IException * e, char const * extra) const { return CHThorActivityBase::makeWrappedException(e, extra); }
  2360. virtual void ready()
  2361. {
  2362. CHThorFetchActivityBase::ready();
  2363. rowLimit = helper.getRowLimit();
  2364. }
  2365. virtual void onLimitExceeded()
  2366. {
  2367. helper.onLimitExceeded();
  2368. }
  2369. virtual SimpleFetchPartHandlerBase * createFetchPartHandler(IDistributedFilePart * part, offset_t base, offset_t size, IThreadedExceptionHandler * handler, bool blockcompressed, MemoryAttr &encryptionkey, IOutputRowDeserializer * rowDeserializer, IEngineRowAllocator *rowAllocator)
  2370. {
  2371. return new XmlFetchPartHandler(*this, part, base, size, handler, 4096, threadPool, blockcompressed, encryptionkey, activityId, outputMeta); //MORE: need to put correct stream buffer size here, when Gavin provides it
  2372. }
  2373. protected:
  2374. CriticalSection transformCrit;
  2375. IHThorXmlFetchArg & helper;
  2376. };
  2377. extern HTHOR_API IHThorActivity *createXmlFetchActivity(IAgentContext & _agent, unsigned _activityId, unsigned _subgraphId, IHThorXmlFetchArg & arg, ThorActivityKind _kind)
  2378. {
  2379. return new CHThorXmlFetchActivity(_agent, _activityId, _subgraphId, arg, _kind);
  2380. }
  2381. //------------------------------------------------------------------------------------------
  2382. class CJoinGroup;
  2383. class MatchSet : public CInterface
  2384. {
  2385. public:
  2386. MatchSet(CJoinGroup * _jg) : jg(_jg)
  2387. {
  2388. }
  2389. ~MatchSet()
  2390. {
  2391. ForEachItemIn(idx, rows)
  2392. ReleaseRoxieRow(rows.item(idx));
  2393. }
  2394. void addRightMatch(void * right, offset_t fpos);
  2395. offset_t addRightPending();
  2396. void setPendingRightMatch(offset_t seq, void * right, offset_t fpos);
  2397. void incRightMatchCount();
  2398. unsigned count() const { return rows.ordinality(); }
  2399. CJoinGroup * queryJoinGroup() const { return jg; }
  2400. void * queryRow(unsigned idx) const { return rows.item(idx); }
  2401. offset_t queryOffset(unsigned idx) const { return offsets.item(idx); }
  2402. private:
  2403. CJoinGroup * jg;
  2404. PointerArray rows;
  2405. Int64Array offsets;
  2406. };
  2407. interface IJoinProcessor
  2408. {
  2409. virtual CJoinGroup *createJoinGroup(const void *row) = 0;
  2410. virtual void readyManager(IKeyManager * manager, const void * row) = 0;
  2411. virtual void doneManager(IKeyManager * manager) = 0;
  2412. virtual bool addMatch(MatchSet * ms, IKeyManager * manager) = 0;
  2413. virtual void onComplete(CJoinGroup * jg) = 0;
  2414. virtual bool leftCanMatch(const void *_left) = 0;
  2415. virtual IRecordLayoutTranslator * getLayoutTranslator(IDistributedFile * f) = 0;
  2416. virtual void verifyIndex(IDistributedFile * f, IKeyIndex * idx, IRecordLayoutTranslator * trans) = 0;
  2417. };
  2418. class CJoinGroup : public CInterface, implements IInterface
  2419. {
  2420. public:
  2421. class MatchIterator
  2422. {
  2423. public:
  2424. // Single threaded by now
  2425. void const * queryRow() const { return owner.matchsets.item(ms).queryRow(idx); }
  2426. offset_t queryOffset() const { return owner.matchsets.item(ms).queryOffset(idx); }
  2427. bool start()
  2428. {
  2429. idx = 0;
  2430. for(ms = 0; owner.matchsets.isItem(ms); ++ms)
  2431. if(owner.matchsets.item(ms).count())
  2432. return true;
  2433. return false;
  2434. }
  2435. bool next()
  2436. {
  2437. if(++idx < owner.matchsets.item(ms).count())
  2438. return true;
  2439. idx = 0;
  2440. while(owner.matchsets.isItem(++ms))
  2441. if(owner.matchsets.item(ms).count())
  2442. return true;
  2443. return false;
  2444. }
  2445. private:
  2446. friend class CJoinGroup;
  2447. MatchIterator(CJoinGroup const & _owner) : owner(_owner) {}
  2448. CJoinGroup const & owner;
  2449. unsigned ms;
  2450. unsigned idx;
  2451. } matches;
  2452. CJoinGroup *prev; // Doubly-linked list to allow us to keep track of ones that are still in use
  2453. CJoinGroup *next;
  2454. CJoinGroup() : matches(*this)
  2455. {
  2456. // Used for head object only
  2457. left = NULL;
  2458. prev = NULL;
  2459. next = NULL;
  2460. atomic_set(&endMarkersPending,0);
  2461. groupStart = NULL;
  2462. matchcount = 0;
  2463. }
  2464. IMPLEMENT_IINTERFACE;
  2465. CJoinGroup(const void *_left, IJoinProcessor *_join, CJoinGroup *_groupStart) : join(_join), matches(*this)
  2466. {
  2467. candidates = 0;
  2468. left = _left;
  2469. if (_groupStart)
  2470. {
  2471. groupStart = _groupStart;
  2472. atomic_inc(&_groupStart->endMarkersPending);
  2473. }
  2474. else
  2475. {
  2476. groupStart = this;
  2477. atomic_set(&endMarkersPending, 1);
  2478. }
  2479. matchcount = 0;
  2480. }
  2481. ~CJoinGroup()
  2482. {
  2483. ReleaseRoxieRow(left);
  2484. }
  2485. MatchSet * getMatchSet()
  2486. {
  2487. CriticalBlock b(crit);
  2488. MatchSet * ms = new MatchSet(this);
  2489. matchsets.append(*ms);
  2490. return ms;
  2491. }
  2492. inline void notePending()
  2493. {
  2494. // assertex(!complete());
  2495. atomic_inc(&groupStart->endMarkersPending);
  2496. }
  2497. inline bool complete() const
  2498. {
  2499. return atomic_read(&groupStart->endMarkersPending) == 0;
  2500. }
  2501. inline bool inGroup(CJoinGroup *leader) const
  2502. {
  2503. return groupStart==leader;
  2504. }
  2505. inline void noteEnd()
  2506. {
  2507. assertex(!complete());
  2508. if (atomic_dec_and_test(&groupStart->endMarkersPending))
  2509. {
  2510. join->onComplete(groupStart);
  2511. }
  2512. }
  2513. inline unsigned noteCandidate()
  2514. {
  2515. CriticalBlock b(crit);
  2516. return ++candidates;
  2517. }
  2518. inline const void *queryLeft() const
  2519. {
  2520. return left;
  2521. }
  2522. inline unsigned rowsSeen() const
  2523. {
  2524. CriticalBlock b(crit);
  2525. return matchcount;
  2526. }
  2527. inline unsigned candidateCount() const
  2528. {
  2529. CriticalBlock b(crit);
  2530. return candidates;
  2531. }
  2532. protected:
  2533. friend class MatchSet;
  2534. friend class MatchIterator;
  2535. const void *left;
  2536. unsigned matchcount;
  2537. CIArrayOf<MatchSet> matchsets;
  2538. atomic_t endMarkersPending;
  2539. IJoinProcessor *join;
  2540. mutable CriticalSection crit;
  2541. CJoinGroup *groupStart;
  2542. unsigned candidates;
  2543. };
  2544. void MatchSet::addRightMatch(void * right, offset_t fpos)
  2545. {
  2546. assertex(!jg->complete());
  2547. CriticalBlock b(jg->crit);
  2548. rows.append(right);
  2549. offsets.append(fpos);
  2550. jg->matchcount++;
  2551. }
  2552. offset_t MatchSet::addRightPending()
  2553. {
  2554. assertex(!jg->complete());
  2555. CriticalBlock b(jg->crit);
  2556. offset_t seq = rows.ordinality();
  2557. rows.append(NULL);
  2558. offsets.append(0);
  2559. return seq;
  2560. }
  2561. void MatchSet::setPendingRightMatch(offset_t seq, void * right, offset_t fpos)
  2562. {
  2563. assertex(!jg->complete());
  2564. CriticalBlock b(jg->crit);
  2565. rows.replace(right, (aindex_t)seq);
  2566. offsets.replace(fpos, (aindex_t)seq);
  2567. jg->matchcount++;
  2568. }
  2569. void MatchSet::incRightMatchCount()
  2570. {
  2571. assertex(!jg->complete());
  2572. CriticalBlock b(jg->crit);
  2573. jg->matchcount++;
  2574. }
  2575. class JoinGroupPool : public CInterface
  2576. {
  2577. CJoinGroup *groupStart;
  2578. public:
  2579. CJoinGroup head;
  2580. CriticalSection crit;
  2581. bool preserveGroups;
  2582. JoinGroupPool(bool _preserveGroups)
  2583. {
  2584. head.next = &head;
  2585. head.prev = &head;
  2586. preserveGroups = _preserveGroups;
  2587. groupStart = NULL;
  2588. }
  2589. ~JoinGroupPool()
  2590. {
  2591. CJoinGroup *finger = head.next;
  2592. while (finger != &head)
  2593. {
  2594. CJoinGroup *next = finger->next;
  2595. finger->Release();
  2596. finger = next;
  2597. }
  2598. }
  2599. CJoinGroup *createJoinGroup(const void *row, IJoinProcessor *join)
  2600. {
  2601. CJoinGroup *jg = new CJoinGroup(row, join, groupStart);
  2602. if (preserveGroups && !groupStart)
  2603. {
  2604. jg->notePending(); // Make sure we wait for the group end
  2605. groupStart = jg;
  2606. }
  2607. CriticalBlock c(crit);
  2608. jg->next = &head;
  2609. jg->prev = head.prev;
  2610. head.prev->next = jg;
  2611. head.prev = jg;
  2612. return jg;
  2613. }
  2614. void endGroup()
  2615. {
  2616. if (groupStart)
  2617. groupStart->noteEnd();
  2618. groupStart = NULL;
  2619. }
  2620. void releaseJoinGroup(CJoinGroup *goer)
  2621. {
  2622. CriticalBlock c(crit);
  2623. goer->next->prev = goer->prev;
  2624. goer->prev->next = goer->next;
  2625. goer->Release(); // MORE - could put onto another list to reuse....
  2626. }
  2627. };
  2628. //=============================================================================================
  2629. class DistributedKeyLookupHandler;
  2630. class KeyedLookupPartHandler : extends ThreadedPartHandler<MatchSet>, implements IInterface
  2631. {
  2632. IJoinProcessor &owner;
  2633. Owned<IKeyManager> manager;
  2634. IAgentContext &agent;
  2635. DistributedKeyLookupHandler * tlk;
  2636. unsigned subno;
  2637. public:
  2638. IMPLEMENT_IINTERFACE;
  2639. KeyedLookupPartHandler(IJoinProcessor &_owner, IDistributedFilePart *_part, DistributedKeyLookupHandler * _tlk, unsigned _subno, IThreadPool * _threadPool, IAgentContext &_agent);
  2640. ~KeyedLookupPartHandler()
  2641. {
  2642. while(pending.dequeue())
  2643. ; //do nothing but dequeue as don't own MatchSets
  2644. }
  2645. private:
  2646. virtual void doRequest(MatchSet * ms)
  2647. {
  2648. agent.reportProgress(NULL);
  2649. CJoinGroup * jg = ms->queryJoinGroup();
  2650. owner.readyManager(manager, jg->queryLeft());
  2651. while(manager->lookup(true))
  2652. {
  2653. if(owner.addMatch(ms, manager))
  2654. break;
  2655. }
  2656. jg->noteEnd();
  2657. owner.doneManager(manager);
  2658. }
  2659. virtual void openPart();
  2660. };
  2661. interface IKeyLookupHandler : extends IInterface
  2662. {
  2663. virtual void addRow(const void *row) = 0;
  2664. virtual void stop() = 0;
  2665. };
  2666. class DistributedKeyLookupHandler : public CInterface, implements IThreadedExceptionHandler, implements IKeyLookupHandler
  2667. {
  2668. bool opened;
  2669. IArrayOf<IKeyManager> managers;
  2670. Owned<IRecordLayoutTranslator> trans;
  2671. unsigned subStart;
  2672. UnsignedArray keyNumParts;
  2673. IArrayOf<KeyedLookupPartHandler> parts;
  2674. IArrayOf<IDistributedFile> keyFiles;
  2675. IArrayOf<IDistributedFilePart> tlks;
  2676. IJoinProcessor &owner;
  2677. CriticalSection exceptionCrit;
  2678. IException *exception;
  2679. Linked<IDistributedFile> file;
  2680. PartHandlerThreadFactory<MatchSet> threadFactory;
  2681. Owned<IThreadPool> threadPool;
  2682. IntArray subSizes;
  2683. IAgentContext &agent;
  2684. void addFile(IDistributedFile &f)
  2685. {
  2686. if((f.numParts() == 1) || (f.queryAttributes().hasProp("@local")))
  2687. throw MakeStringException(0, "Superfile %s contained mixed monolithic/local/noroot and regular distributed keys --- not supported", file->queryLogicalName());
  2688. subSizes.append(parts.length());
  2689. unsigned numParts = f.numParts()-1;
  2690. for (unsigned idx = 0; idx < numParts; idx++)
  2691. {
  2692. IDistributedFilePart *part = f.getPart(idx);
  2693. parts.append(*new KeyedLookupPartHandler(owner, part, this, tlks.ordinality(), threadPool, agent));
  2694. }
  2695. keyFiles.append(OLINK(f));
  2696. tlks.append(*f.getPart(numParts));
  2697. keyNumParts.append(numParts);
  2698. }
  2699. public:
  2700. IMPLEMENT_IINTERFACE;
  2701. DistributedKeyLookupHandler(IDistributedFile *f, IJoinProcessor &_owner, IAgentContext &_agent)
  2702. : file(f), owner(_owner), agent(_agent)
  2703. {
  2704. threadPool.setown(createThreadPool("hthor keyed join lookup thread pool", &threadFactory));
  2705. IDistributedSuperFile *super = f->querySuperFile();
  2706. if (super)
  2707. {
  2708. Owned<IDistributedFileIterator> it = super->getSubFileIterator(true);
  2709. ForEach(*it)
  2710. addFile(it->query());
  2711. }
  2712. else
  2713. addFile(*f);
  2714. opened = false;
  2715. exception = NULL;
  2716. }
  2717. ~DistributedKeyLookupHandler()
  2718. {
  2719. threadPool.clear();
  2720. }
  2721. void addRow(const void *row)
  2722. {
  2723. if (owner.leftCanMatch(row))
  2724. {
  2725. if(!opened)
  2726. openTLK();
  2727. CJoinGroup *jg = owner.createJoinGroup(row);
  2728. ForEachItemIn(subno, managers)
  2729. {
  2730. agent.reportProgress(NULL);
  2731. subStart = subSizes.item(subno);
  2732. IKeyManager & manager = managers.item(subno);
  2733. owner.readyManager(&manager, row);
  2734. while(manager.lookup(false))
  2735. {
  2736. unsigned recptr = (unsigned)manager.queryFpos();
  2737. if (recptr)
  2738. {
  2739. jg->notePending();
  2740. parts.item(recptr+subStart-1).addRow(jg->getMatchSet());
  2741. }
  2742. }
  2743. owner.doneManager(&manager);
  2744. }
  2745. jg->noteEnd();
  2746. }
  2747. else
  2748. {
  2749. CJoinGroup *jg = owner.createJoinGroup(row);
  2750. jg->noteEnd();
  2751. }
  2752. }
  2753. void openTLK()
  2754. {
  2755. ForEachItemIn(idx, tlks)
  2756. {
  2757. IDistributedFile & f = keyFiles.item(idx);
  2758. IDistributedFilePart &tlk = tlks.item(idx);
  2759. Owned<IKeyIndex> index = openKeyFile(tlk);
  2760. //Owned<IRecordLayoutTranslator>
  2761. trans.setown(owner.getLayoutTranslator(&f));
  2762. owner.verifyIndex(&f, index, trans);
  2763. Owned<IKeyManager> manager = createKeyManager(index, index->keySize(), NULL);
  2764. if(trans)
  2765. manager->setLayoutTranslator(trans);
  2766. managers.append(*manager.getLink());
  2767. }
  2768. opened = true;
  2769. }
  2770. void stop()
  2771. {
  2772. ForEachItemIn(idx, parts)
  2773. {
  2774. parts.item(idx).stop();
  2775. parts.item(idx).join();
  2776. }
  2777. if (exception)
  2778. throw exception;
  2779. }
  2780. virtual void noteException(IException *E)
  2781. {
  2782. CriticalBlock procedure(exceptionCrit);
  2783. if (exception)
  2784. E->Release();
  2785. else
  2786. exception = E;
  2787. }
  2788. IRecordLayoutTranslator * queryRecordLayoutTranslator() const { return trans; }
  2789. };
  2790. KeyedLookupPartHandler::KeyedLookupPartHandler(IJoinProcessor &_owner, IDistributedFilePart *_part, DistributedKeyLookupHandler * _tlk, unsigned _subno, IThreadPool * _threadPool, IAgentContext &_agent)
  2791. : owner(_owner), ThreadedPartHandler<MatchSet>(_part, _tlk, _threadPool), agent(_agent), tlk(_tlk), subno(_subno)
  2792. {
  2793. }
  2794. void KeyedLookupPartHandler::openPart()
  2795. {
  2796. if(manager)
  2797. return;
  2798. Owned<IKeyIndex> index = openKeyFile(*part);
  2799. manager.setown(createKeyManager(index, index->keySize(), NULL));
  2800. IRecordLayoutTranslator * trans = tlk->queryRecordLayoutTranslator();
  2801. if(trans)
  2802. manager->setLayoutTranslator(trans);
  2803. }
  2804. class MonolithicKeyLookupHandler : public CInterface, implements IKeyLookupHandler
  2805. {
  2806. IArrayOf<IKeyManager> managers;
  2807. Linked<IDistributedFile> file;
  2808. IDistributedSuperFile * super;
  2809. IArrayOf<IDistributedFile> keyFiles;
  2810. IJoinProcessor &owner;
  2811. IAgentContext &agent;
  2812. bool opened;
  2813. public:
  2814. IMPLEMENT_IINTERFACE;
  2815. MonolithicKeyLookupHandler(IDistributedFile *f, IJoinProcessor &_owner, IAgentContext &_agent)
  2816. : file(f), owner(_owner), agent(_agent), opened(false)
  2817. {
  2818. super = f->querySuperFile();
  2819. if (super)
  2820. {
  2821. Owned<IDistributedFileIterator> it = super->getSubFileIterator(true);
  2822. ForEach(*it)
  2823. addFile(it->query());
  2824. }
  2825. else
  2826. addFile(*f);
  2827. }
  2828. void addFile(IDistributedFile &f)
  2829. {
  2830. if((f.numParts() != 1) && (!f.queryAttributes().hasProp("@local")))
  2831. throw MakeStringException(0, "Superfile %s contained mixed monolithic/local/noroot and regular distributed keys --- not supported", file->queryLogicalName());
  2832. keyFiles.append(OLINK(f));
  2833. }
  2834. void addRow(const void *row)
  2835. {
  2836. if (owner.leftCanMatch(row))
  2837. {
  2838. if(!opened)
  2839. openKey();
  2840. CJoinGroup *jg = owner.createJoinGroup(row);
  2841. ForEachItemIn(idx, managers)
  2842. {
  2843. agent.reportProgress(NULL);
  2844. IKeyManager & manager = managers.item(idx);
  2845. owner.readyManager(&manager, row);
  2846. while(manager.lookup(true))
  2847. {
  2848. if(owner.addMatch(jg->getMatchSet(), &manager))
  2849. break;
  2850. }
  2851. owner.doneManager(&manager);
  2852. }
  2853. jg->noteEnd();
  2854. }
  2855. else
  2856. {
  2857. CJoinGroup *jg = owner.createJoinGroup(row);
  2858. jg->noteEnd();
  2859. }
  2860. }
  2861. void openKey()
  2862. {
  2863. ForEachItemIn(idx, keyFiles)
  2864. {
  2865. IDistributedFile & f = keyFiles.item(idx);
  2866. Owned<IRecordLayoutTranslator> trans = owner.getLayoutTranslator(&f);
  2867. Owned<IKeyManager> manager;
  2868. if(f.numParts() == 1)
  2869. {
  2870. Owned<IKeyIndex> index = openKeyFile(f.queryPart(0));
  2871. owner.verifyIndex(&f, index, trans);
  2872. manager.setown(createKeyManager(index, index->keySize(), NULL));
  2873. }
  2874. else
  2875. {
  2876. unsigned num = f.numParts()-1;
  2877. Owned<IKeyIndexSet> parts = createKeyIndexSet();
  2878. Owned<IKeyIndex> index;
  2879. for(unsigned i=0; i<num; ++i)
  2880. {
  2881. index.setown(openKeyFile(f.queryPart(i)));
  2882. parts->addIndex(index.getLink());
  2883. }
  2884. owner.verifyIndex(&f, index, trans);
  2885. manager.setown(createKeyMerger(parts, index->keySize(), 0, NULL));
  2886. }
  2887. if(trans)
  2888. manager->setLayoutTranslator(trans);
  2889. managers.append(*manager.getLink());
  2890. }
  2891. opened = true;
  2892. }
  2893. void stop()
  2894. {
  2895. }
  2896. };
  2897. //------------------------------------------------------------------------------------------
  2898. class KeyedJoinFetchRequest : public CInterface
  2899. {
  2900. public:
  2901. MatchSet * ms;
  2902. offset_t pos;
  2903. offset_t seq;
  2904. KeyedJoinFetchRequest(MatchSet * _ms, offset_t _pos, offset_t _seq) : ms(_ms), pos(_pos), seq(_seq) {}
  2905. };
  2906. class IKeyedJoinFetchHandlerCallback
  2907. {
  2908. public:
  2909. virtual void processFetch(KeyedJoinFetchRequest const * fetch, offset_t pos, ISerialStream *rawStream) = 0;
  2910. };
  2911. class KeyedJoinFetchPartHandler : public FetchPartHandlerBase, public ThreadedPartHandler<KeyedJoinFetchRequest>
  2912. {
  2913. public:
  2914. KeyedJoinFetchPartHandler(IKeyedJoinFetchHandlerCallback & _owner, IDistributedFilePart *_part, offset_t _base, offset_t _size, IThreadedExceptionHandler *_handler, IThreadPool * _threadPool, bool _blockcompressed, MemoryAttr &_encryptionkey, unsigned _activityId, CachedOutputMetaData const & _outputMeta, IOutputRowDeserializer * _rowDeserializer, IEngineRowAllocator *_rowAllocator)
  2915. : FetchPartHandlerBase(_base, _size, _blockcompressed, _encryptionkey, _activityId, _outputMeta, _rowDeserializer, _rowAllocator),
  2916. ThreadedPartHandler<KeyedJoinFetchRequest>(_part, _handler, _threadPool),
  2917. owner(_owner)
  2918. {
  2919. }
  2920. virtual ~KeyedJoinFetchPartHandler()
  2921. {
  2922. while(KeyedJoinFetchRequest * fetch = pending.dequeue())
  2923. fetch->Release();
  2924. }
  2925. IMPLEMENT_IINTERFACE;
  2926. virtual IDistributedFilePart * queryPart() { return part; }
  2927. private:
  2928. virtual void openPart()
  2929. {
  2930. FetchPartHandlerBase::openPart();
  2931. }
  2932. virtual void doRequest(KeyedJoinFetchRequest * _fetch)
  2933. {
  2934. Owned<KeyedJoinFetchRequest> fetch(_fetch);
  2935. offset_t pos = translateFPos(fetch->pos);
  2936. if(pos >= rawFile->size())
  2937. throw MakeStringException(0, "Attempted to fetch at invalid filepos");
  2938. owner.processFetch(fetch, pos, rawStream);
  2939. }
  2940. private:
  2941. IKeyedJoinFetchHandlerCallback & owner;
  2942. };
  2943. class CHThorKeyedJoinActivity : public CHThorThreadedActivityBase, implements IJoinProcessor, public IKeyedJoinFetchHandlerCallback, public IFetchHandlerFactory<KeyedJoinFetchPartHandler>
  2944. {
  2945. PartHandlerThreadFactory<FetchRequest> threadFactory;
  2946. Owned<DistributedFileFetchHandler<KeyedJoinFetchPartHandler, MatchSet *, KeyedJoinFetchRequest> > parts;
  2947. IHThorKeyedJoinArg &helper;
  2948. Owned<IKeyLookupHandler> lookup;
  2949. Owned<IEngineRowAllocator> defaultRightAllocator;
  2950. OwnedConstRoxieRow defaultRight;
  2951. bool leftOuter;
  2952. bool exclude;
  2953. bool extractJoinFields;
  2954. bool limitFail;
  2955. bool limitOnFail;
  2956. bool needsDiskRead;
  2957. unsigned atMost;
  2958. unsigned abortLimit;
  2959. unsigned keepLimit;
  2960. bool preserveOrder;
  2961. bool preserveGroups;
  2962. Owned<JoinGroupPool> pool;
  2963. QueueOf<const void, true> pending;
  2964. CriticalSection statsCrit, imatchCrit, fmatchCrit;
  2965. atomic_t prefiltered;
  2966. atomic_t postfiltered;
  2967. atomic_t skips;
  2968. unsigned seeks;
  2969. unsigned scans;
  2970. OwnedRowArray extractedRows;
  2971. Owned <ILocalOrDistributedFile> ldFile;
  2972. IDistributedFile * dFile;
  2973. IDistributedSuperFile * super;
  2974. CachedOutputMetaData eclKeySize;
  2975. Owned<IOutputRowDeserializer> rowDeserializer;
  2976. Owned<IEngineRowAllocator> diskAllocator;
  2977. public:
  2978. CHThorKeyedJoinActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorKeyedJoinArg &_arg, ThorActivityKind _kind)
  2979. : CHThorThreadedActivityBase(_agent, _activityId, _subgraphId, _arg, _arg, _kind, _arg.queryDiskRecordSize()), helper(_arg), activityRecordMetaBuff(NULL)
  2980. {
  2981. atomic_set(&prefiltered, 0);
  2982. atomic_set(&postfiltered, 0);
  2983. atomic_set(&skips, 0);
  2984. seeks = 0;
  2985. scans = 0;
  2986. eclKeySize.set(helper.queryIndexRecordSize());
  2987. }
  2988. ~CHThorKeyedJoinActivity()
  2989. {
  2990. clearQueue();
  2991. waitForThreads();
  2992. rtlFree(activityRecordMetaBuff);
  2993. }
  2994. virtual bool needsAllocator() const { return true; }
  2995. virtual void ready()
  2996. {
  2997. CHThorThreadedActivityBase::ready();
  2998. preserveOrder = ((helper.getJoinFlags() & JFkeepsorted) != 0) || agent.queryWorkUnit()->getDebugValueBool("__hthor_kj_always_preserve_order", DEFAULT_KJ_PRESERVES_ORDER);
  2999. preserveGroups = helper.queryOutputMeta()->isGrouped();
  3000. needsDiskRead = helper.diskAccessRequired();
  3001. extractJoinFields = ((helper.getJoinFlags() & JFextractjoinfields) != 0);
  3002. atMost = helper.getJoinLimit();
  3003. if (atMost == 0) atMost = (unsigned)-1;
  3004. abortLimit = helper.getMatchAbortLimit();
  3005. if (abortLimit == 0) abortLimit = (unsigned)-1;
  3006. leftOuter = ((helper.getJoinFlags() & JFleftouter) != 0);
  3007. exclude = ((helper.getJoinFlags() & JFexclude) != 0);
  3008. keepLimit = helper.getKeepLimit();
  3009. if (keepLimit == 0) keepLimit = (unsigned)-1;
  3010. rowLimit = helper.getRowLimit();
  3011. pool.setown(new JoinGroupPool(preserveGroups));
  3012. limitOnFail = ((helper.getJoinFlags() & JFonfail) != 0);
  3013. limitFail = !limitOnFail && ((helper.getJoinFlags() & JFmatchAbortLimitSkips) == 0);
  3014. if(leftOuter || limitOnFail)
  3015. {
  3016. if (!defaultRight)
  3017. {
  3018. RtlDynamicRowBuilder rowBuilder(queryRightRowAllocator());
  3019. size32_t thisSize = helper.createDefaultRight(rowBuilder);
  3020. defaultRight.setown(rowBuilder.finalizeRowClear(thisSize));
  3021. }
  3022. }
  3023. if (needsDiskRead)
  3024. {
  3025. rowDeserializer.setown(helper.queryDiskRecordSize()->createDiskDeserializer(agent.queryCodeContext(), activityId));
  3026. diskAllocator.setown(agent.queryCodeContext()->getRowAllocator(helper.queryDiskRecordSize(), activityId));
  3027. }
  3028. }
  3029. virtual void initializeThreadPool()
  3030. {
  3031. threadPool.setown(createThreadPool("hthor keyed join fetch thread pool", &threadFactory));
  3032. }
  3033. virtual void initParts(IDistributedFile * f)
  3034. {
  3035. size32_t kl;
  3036. void *k;
  3037. fetch.getFileEncryptKey(kl,k);
  3038. MemoryAttr encryptionkey;
  3039. encryptionkey.setOwn(kl,k);
  3040. Owned<IEngineRowAllocator> inputRowAllocator;
  3041. if (needsDiskRead)
  3042. {
  3043. inputRowAllocator.setown(agent.queryCodeContext()->getRowAllocator(helper.queryDiskRecordSize(), activityId));
  3044. parts.setown(new DistributedFileFetchHandler<KeyedJoinFetchPartHandler, MatchSet *, KeyedJoinFetchRequest>(f, *this, encryptionkey, rowDeserializer, inputRowAllocator));
  3045. }
  3046. }
  3047. virtual void stopParts()
  3048. {
  3049. if(parts)
  3050. parts->stop();
  3051. }
  3052. virtual bool isGrouped() { return preserveGroups; }
  3053. virtual void waitForThreads()
  3054. {
  3055. aborting = true;
  3056. if (inputThread)
  3057. inputThread->join();
  3058. lookup.clear();
  3059. threadPool.clear();
  3060. }
  3061. virtual void clearQueue()
  3062. {
  3063. while (pending.ordinality())
  3064. ReleaseRoxieRow(pending.dequeue());
  3065. }
  3066. void addRow(const void *row)
  3067. {
  3068. CriticalBlock procedure(pendingCrit);
  3069. pending.enqueue(row);
  3070. avail.signal();
  3071. }
  3072. const void * getRow()
  3073. {
  3074. if (stopped)
  3075. return NULL;
  3076. avail.wait();
  3077. CriticalBlock procedure(pendingCrit);
  3078. if (exception)
  3079. {
  3080. IException *E = exception;
  3081. exception = NULL;
  3082. throw E;
  3083. }
  3084. if (pending.ordinality())
  3085. return pending.dequeue();
  3086. else
  3087. {
  3088. stopped = true;
  3089. return NULL;
  3090. }
  3091. }
  3092. virtual void fetchAll()
  3093. {
  3094. bool eogSeen = false; // arguably true makes more sense
  3095. loop
  3096. {
  3097. if (aborting)
  3098. break;
  3099. const void *row = input->nextInGroup();
  3100. if (!row)
  3101. {
  3102. if (eogSeen)
  3103. break;
  3104. else
  3105. eogSeen = true;
  3106. pool->endGroup();
  3107. }
  3108. else
  3109. {
  3110. eogSeen = false;
  3111. if(lookup)
  3112. {
  3113. lookup->addRow(row);
  3114. }
  3115. else
  3116. {
  3117. CJoinGroup *jg = createJoinGroup(row);
  3118. jg->noteEnd();
  3119. }
  3120. }
  3121. }
  3122. if(lookup)
  3123. lookup->stop();
  3124. if (parts)
  3125. parts->stop();
  3126. stop();
  3127. }
  3128. virtual KeyedJoinFetchPartHandler * createFetchPartHandler(IDistributedFilePart * part, offset_t base, offset_t size, IThreadedExceptionHandler * handler, bool blockcompressed, MemoryAttr &encryptionkey, IOutputRowDeserializer * rowDeserializer, IEngineRowAllocator *rowAllocator)
  3129. {
  3130. return new KeyedJoinFetchPartHandler(*this, part, base, size, handler, threadPool, blockcompressed, encryptionkey, activityId, outputMeta, rowDeserializer, rowAllocator);
  3131. }
  3132. virtual void processFetch(KeyedJoinFetchRequest const * fetch, offset_t pos, ISerialStream *rawStream)
  3133. {
  3134. CThorStreamDeserializerSource deserializeSource;
  3135. deserializeSource.setStream(rawStream);
  3136. deserializeSource.reset(pos);
  3137. RtlDynamicRowBuilder rowBuilder(diskAllocator);
  3138. unsigned sizeRead = rowDeserializer->deserialize(rowBuilder.ensureRow(), deserializeSource);
  3139. OwnedConstRoxieRow row = rowBuilder.finalizeRowClear(sizeRead);
  3140. if(match(fetch->ms, row))
  3141. {
  3142. if(exclude)
  3143. {
  3144. fetch->ms->incRightMatchCount();
  3145. }
  3146. else
  3147. {
  3148. RtlDynamicRowBuilder extractBuilder(queryRightRowAllocator());
  3149. size32_t size = helper.extractJoinFields(extractBuilder, row, fetch->pos, NULL);
  3150. void * ret = (void *) extractBuilder.finalizeRowClear(size);
  3151. fetch->ms->setPendingRightMatch(fetch->seq, ret, fetch->pos);
  3152. }
  3153. }
  3154. fetch->ms->queryJoinGroup()->noteEnd();
  3155. }
  3156. bool match(MatchSet * ms, const void * right)
  3157. {
  3158. CriticalBlock proc(fmatchCrit);
  3159. bool ret = helper.fetchMatch(ms->queryJoinGroup()->queryLeft(), right);
  3160. if (!ret)
  3161. atomic_inc(&postfiltered);
  3162. return ret;
  3163. }
  3164. virtual bool leftCanMatch(const void * _left)
  3165. {
  3166. bool ret = helper.leftCanMatch(_left);
  3167. if (!ret)
  3168. atomic_inc(&prefiltered);
  3169. return ret;
  3170. }
  3171. virtual CJoinGroup *createJoinGroup(const void *row)
  3172. {
  3173. // NOTE - single threaded
  3174. return pool->createJoinGroup(row, this);
  3175. }
  3176. virtual void onComplete(CJoinGroup *jg)
  3177. {
  3178. CriticalBlock c(pool->crit);
  3179. if (preserveOrder)
  3180. {
  3181. CJoinGroup *finger = pool->head.next;
  3182. if(preserveGroups)
  3183. {
  3184. unsigned joinGroupSize = 0;
  3185. Linked<CJoinGroup> firstInGroup = finger;
  3186. while(finger != &pool->head)
  3187. {
  3188. CJoinGroup *next = finger->next;
  3189. if(finger->complete())
  3190. joinGroupSize += doJoinGroup(finger);
  3191. else
  3192. break;
  3193. finger = next;
  3194. if(!finger->inGroup(firstInGroup))
  3195. {
  3196. if(joinGroupSize)
  3197. addRow(NULL);
  3198. joinGroupSize = 0;
  3199. firstInGroup.set(finger);
  3200. }
  3201. }
  3202. assertex(finger == firstInGroup.get());
  3203. }
  3204. else
  3205. {
  3206. while(finger != &pool->head)
  3207. {
  3208. CJoinGroup *next = finger->next;
  3209. if(finger->complete())
  3210. doJoinGroup(finger);
  3211. else
  3212. break;
  3213. finger = next;
  3214. }
  3215. }
  3216. }
  3217. else if (preserveGroups)
  3218. {
  3219. Linked<CJoinGroup> head = jg; // Must avoid releasing head until the end, or while loop can overrun if head is reused
  3220. assertex(jg->inGroup(jg));
  3221. CJoinGroup *finger = jg;
  3222. unsigned joinGroupSize = 0;
  3223. while (finger->inGroup(jg))
  3224. {
  3225. CJoinGroup *next = finger->next;
  3226. joinGroupSize += doJoinGroup(finger);
  3227. finger = next;
  3228. }
  3229. if (joinGroupSize)
  3230. addRow(NULL);
  3231. }
  3232. else
  3233. doJoinGroup(jg);
  3234. }
  3235. void failLimit(const void * left)
  3236. {
  3237. helper.onMatchAbortLimitExceeded();
  3238. CommonXmlWriter xmlwrite(0);
  3239. if (input && input->queryOutputMeta() && input->queryOutputMeta()->hasXML())
  3240. {
  3241. input->queryOutputMeta()->toXML((byte *) left, xmlwrite);
  3242. }
  3243. throw MakeStringException(0, "More than %d match candidates in keyed join for row %s", abortLimit, xmlwrite.str());
  3244. }
  3245. unsigned doJoinGroup(CJoinGroup *jg)
  3246. {
  3247. unsigned matched = jg->rowsSeen();
  3248. unsigned added = 0;
  3249. const void *left = jg->queryLeft();
  3250. if (jg->candidateCount() > abortLimit)
  3251. {
  3252. if(limitFail)
  3253. failLimit(left);
  3254. if(limitOnFail)
  3255. {
  3256. Owned<IException> except;
  3257. try
  3258. {
  3259. failLimit(left);
  3260. }
  3261. catch(IException * e)
  3262. {
  3263. except.setown(e);
  3264. }
  3265. assertex(except);
  3266. size32_t transformedSize;
  3267. RtlDynamicRowBuilder rowBuilder(rowAllocator);
  3268. try
  3269. {
  3270. transformedSize = helper.onFailTransform(rowBuilder, left, defaultRight, 0, except);
  3271. }
  3272. catch(IException * e)
  3273. {
  3274. throw makeWrappedException(e);
  3275. }
  3276. if(transformedSize)
  3277. {
  3278. const void * shrunk = rowBuilder.finalizeRowClear(transformedSize);
  3279. addRow(shrunk);
  3280. added++;
  3281. }
  3282. else
  3283. {
  3284. atomic_inc(&skips);
  3285. }
  3286. }
  3287. else
  3288. return 0;
  3289. }
  3290. else if(!matched || jg->candidateCount() > atMost)
  3291. {
  3292. if(leftOuter)
  3293. {
  3294. switch(kind)
  3295. {
  3296. case TAKkeyedjoin:
  3297. case TAKkeyeddenormalizegroup:
  3298. {
  3299. size32_t transformedSize = 0;
  3300. try
  3301. {
  3302. RtlDynamicRowBuilder rowBuilder(rowAllocator);
  3303. if (kind == TAKkeyedjoin)
  3304. transformedSize = helper.transform(rowBuilder, left, defaultRight, 0);
  3305. else if (kind == TAKkeyeddenormalizegroup)
  3306. transformedSize = helper.transform(rowBuilder, left, defaultRight, 0, (const void * *)NULL);
  3307. if (transformedSize)
  3308. {
  3309. const void * shrunk = rowBuilder.finalizeRowClear(transformedSize);
  3310. addRow(shrunk);
  3311. added++;
  3312. }
  3313. else
  3314. {
  3315. atomic_inc(&skips);
  3316. }
  3317. }
  3318. catch(IException * e)
  3319. {
  3320. throw makeWrappedException(e);
  3321. }
  3322. break;
  3323. }
  3324. case TAKkeyeddenormalize:
  3325. {
  3326. LinkRoxieRow(left);
  3327. addRow((void *) left );
  3328. added++;
  3329. break;
  3330. }
  3331. default:
  3332. throwUnexpected();
  3333. }
  3334. }
  3335. }
  3336. else if(!exclude)
  3337. {
  3338. switch(kind)
  3339. {
  3340. case TAKkeyedjoin:
  3341. {
  3342. if(jg->matches.start())
  3343. {
  3344. do
  3345. {
  3346. try
  3347. {
  3348. RtlDynamicRowBuilder rowBuilder(rowAllocator);
  3349. void const * row = jg->matches.queryRow();
  3350. if(!row) continue;
  3351. offset_t fpos = jg->matches.queryOffset();
  3352. size32_t transformedSize;
  3353. transformedSize = helper.transform(rowBuilder, left, row, fpos);
  3354. if (transformedSize)
  3355. {
  3356. const void * shrunk = rowBuilder.finalizeRowClear(transformedSize);
  3357. addRow(shrunk);
  3358. added++;
  3359. if (added==keepLimit)
  3360. break;
  3361. }
  3362. else
  3363. {
  3364. atomic_inc(&skips);
  3365. }
  3366. }
  3367. catch(IException * e)
  3368. {
  3369. throw makeWrappedException(e);
  3370. }
  3371. } while(jg->matches.next());
  3372. }
  3373. break;
  3374. }
  3375. case TAKkeyeddenormalize:
  3376. {
  3377. OwnedConstRoxieRow newLeft;
  3378. newLeft.set(left);
  3379. unsigned rowSize = 0;
  3380. unsigned count = 0;
  3381. unsigned rightAdded = 0;
  3382. if(jg->matches.start())
  3383. {
  3384. do
  3385. {
  3386. void const * row = jg->matches.queryRow();
  3387. if(!row) continue;
  3388. ++count;
  3389. offset_t fpos = jg->matches.queryOffset();
  3390. size32_t transformedSize;
  3391. try
  3392. {
  3393. RtlDynamicRowBuilder rowBuilder(rowAllocator);
  3394. transformedSize = helper.transform(rowBuilder, newLeft, row, fpos, count);
  3395. if (transformedSize)
  3396. {
  3397. newLeft.setown(rowBuilder.finalizeRowClear(transformedSize));
  3398. rowSize = transformedSize;
  3399. rightAdded++;
  3400. if (rightAdded==keepLimit)
  3401. break;
  3402. }
  3403. else
  3404. {
  3405. atomic_inc(&skips);
  3406. }
  3407. }
  3408. catch(IException * e)
  3409. {
  3410. throw makeWrappedException(e);
  3411. }
  3412. } while(jg->matches.next());
  3413. }
  3414. if (rowSize)
  3415. {
  3416. addRow(newLeft.getClear());
  3417. ReleaseRoxieRow(newLeft);
  3418. added++;
  3419. }
  3420. break;
  3421. }
  3422. case TAKkeyeddenormalizegroup:
  3423. {
  3424. extractedRows.clear();
  3425. unsigned count = 0;
  3426. if(jg->matches.start())
  3427. do
  3428. {
  3429. const void * row = jg->matches.queryRow();
  3430. if(!row) continue;
  3431. if(++count > keepLimit)
  3432. break;
  3433. LinkRoxieRow(row);
  3434. extractedRows.append(row);
  3435. } while(jg->matches.next());
  3436. size32_t transformedSize;
  3437. try
  3438. {
  3439. RtlDynamicRowBuilder rowBuilder(rowAllocator);
  3440. transformedSize = helper.transform(rowBuilder, left, extractedRows.item(0), extractedRows.ordinality(), (const void * *)extractedRows.getArray());
  3441. extractedRows.clear();
  3442. if (transformedSize)
  3443. {
  3444. const void * shrunk = rowBuilder.finalizeRowClear(transformedSize);
  3445. addRow(shrunk);
  3446. added++;
  3447. }
  3448. else
  3449. {
  3450. atomic_inc(&skips);
  3451. }
  3452. }
  3453. catch(IException * e)
  3454. {
  3455. throw makeWrappedException(e);
  3456. }
  3457. break;
  3458. }
  3459. default:
  3460. throwUnexpected();
  3461. }
  3462. }
  3463. pool->releaseJoinGroup(jg); // releases link to gotten row
  3464. return added;
  3465. }
  3466. static bool useMonolithic(IDistributedFile & f)
  3467. {
  3468. return ((f.numParts() == 1) || (f.queryAttributes().hasProp("@local")));
  3469. }
  3470. virtual void start()
  3471. {
  3472. OwnedRoxieString lfn(helper.getIndexFileName());
  3473. ldFile.setown(agent.resolveLFN(lfn, "KeyedJoin", 0 != (helper.getJoinFlags() & JFindexoptional)));
  3474. dFile = ldFile ? ldFile->queryDistributedFile() : NULL;
  3475. if (dFile)
  3476. {
  3477. Owned<IDistributedFile> odFile;
  3478. odFile.setown(dFile);
  3479. LINK(odFile);
  3480. enterSingletonSuperfiles(odFile);
  3481. bool mono;
  3482. super = dFile->querySuperFile();
  3483. if(super)
  3484. {
  3485. if(super->numSubFiles()==0)
  3486. throw MakeStringException(0, "Superkey %s empty", super->queryLogicalName());
  3487. mono = useMonolithic(super->querySubFile(0));
  3488. }
  3489. else
  3490. {
  3491. mono = useMonolithic(*dFile);
  3492. }
  3493. if (mono)
  3494. lookup.setown(new MonolithicKeyLookupHandler(dFile, *this, agent));
  3495. else
  3496. lookup.setown(new DistributedKeyLookupHandler(dFile, *this, agent));
  3497. agent.logFileAccess(dFile, "HThor", "READ");
  3498. }
  3499. else
  3500. {
  3501. StringBuffer buff;
  3502. buff.append("Skipping OPT keyed join against nonexistent file ").append(lfn);
  3503. WARNLOG("%s", buff.str());
  3504. agent.addWuException(buff.str(), 0, ExceptionSeverityWarning, "hthor");
  3505. }
  3506. CHThorThreadedActivityBase::start();
  3507. }
  3508. virtual void readyManager(IKeyManager * manager, const void * row)
  3509. {
  3510. helper.createSegmentMonitors(manager, row);
  3511. manager->finishSegmentMonitors();
  3512. manager->reset();
  3513. manager->resetCounts();
  3514. }
  3515. virtual void doneManager(IKeyManager * manager)
  3516. {
  3517. manager->releaseSegmentMonitors();
  3518. CriticalBlock b(statsCrit);
  3519. seeks += manager->querySeeks();
  3520. scans += manager->queryScans();
  3521. }
  3522. virtual bool addMatch(MatchSet * ms, IKeyManager * manager)
  3523. {
  3524. CJoinGroup * jg = ms->queryJoinGroup();
  3525. unsigned candTotal = jg->noteCandidate();
  3526. if (candTotal > atMost || candTotal > abortLimit)
  3527. {
  3528. if ( agent.queryCodeContext()->queryDebugContext())
  3529. agent.queryCodeContext()->queryDebugContext()->checkBreakpoint(DebugStateLimit, NULL, static_cast<IActivityBase *>(this));
  3530. return true;
  3531. }
  3532. KLBlobProviderAdapter adapter(manager);
  3533. offset_t recptr;
  3534. byte const * rhs = manager->queryKeyBuffer(recptr);
  3535. if(indexReadMatch(jg->queryLeft(), rhs, recptr, &adapter))
  3536. {
  3537. if(needsDiskRead)
  3538. {
  3539. jg->notePending();
  3540. offset_t seq = ms->addRightPending();
  3541. parts->addRow(ms, recptr, seq);
  3542. }
  3543. else
  3544. {
  3545. if(exclude)
  3546. ms->incRightMatchCount();
  3547. else
  3548. {
  3549. RtlDynamicRowBuilder rowBuilder(queryRightRowAllocator());
  3550. size32_t size = helper.extractJoinFields(rowBuilder, rhs, recptr, &adapter);
  3551. void * ret = (void *)rowBuilder.finalizeRowClear(size);
  3552. ms->addRightMatch(ret, recptr);
  3553. }
  3554. }
  3555. }
  3556. else
  3557. {
  3558. atomic_inc(&postfiltered);
  3559. }
  3560. return false;
  3561. }
  3562. bool indexReadMatch(const void * indexRow, const void * inputRow, unsigned __int64 keyedFpos, IBlobProvider * blobs)
  3563. {
  3564. CriticalBlock proc(imatchCrit);
  3565. return helper.indexReadMatch(indexRow, inputRow, keyedFpos, blobs);
  3566. }
  3567. IEngineRowAllocator * queryRightRowAllocator()
  3568. {
  3569. if (!defaultRightAllocator)
  3570. defaultRightAllocator.setown(agent.queryCodeContext()->getRowAllocator(helper.queryJoinFieldsRecordSize(), activityId));
  3571. return defaultRightAllocator;
  3572. }
  3573. virtual void onLimitExceeded()
  3574. {
  3575. helper.onLimitExceeded();
  3576. }
  3577. virtual void updateProgress(IWUGraphProgress &progress) const
  3578. {
  3579. CHThorThreadedActivityBase::updateProgress(progress);
  3580. IPropertyTree &node = progress.updateNode(subgraphId, activityId);
  3581. setProgress(node, "prefiltered", atomic_read(&prefiltered));
  3582. setProgress(node, "postfiltered", atomic_read(&postfiltered));
  3583. setProgress(node, "skips", atomic_read(&skips));
  3584. setProgress(node, "seeks", seeks);
  3585. setProgress(node, "scans", scans);
  3586. }
  3587. protected:
  3588. void * activityRecordMetaBuff;
  3589. size32_t activityRecordMetaSize;
  3590. Owned<IDefRecordMeta> activityRecordMeta;
  3591. virtual IRecordLayoutTranslator * getLayoutTranslator(IDistributedFile * f)
  3592. {
  3593. if(agent.queryWorkUnit()->getDebugValueBool("skipFileFormatCrcCheck", false))
  3594. {
  3595. return NULL;
  3596. }
  3597. if(!rltEnabled(agent.queryWorkUnit()))
  3598. {
  3599. verifyFormatCrc(helper.getIndexFormatCrc(), f, super ? super->queryLogicalName() : NULL, true, true);
  3600. return NULL;
  3601. }
  3602. if(verifyFormatCrc(helper.getIndexFormatCrc(), f, super ? super->queryLogicalName() : NULL, true, false))
  3603. {
  3604. return NULL;
  3605. }
  3606. if(!activityRecordMeta)
  3607. {
  3608. if(!helper.getIndexLayout(activityRecordMetaSize, activityRecordMetaBuff))
  3609. throw MakeStringException(0, "Unable to recover from record layout mismatch for index %s: no record layout metadata in activity", f->queryLogicalName());
  3610. MemoryBuffer buff;
  3611. buff.setBuffer(activityRecordMetaSize, activityRecordMetaBuff, false);
  3612. activityRecordMeta.setown(deserializeRecordMeta(buff, true));
  3613. }
  3614. return getRecordLayoutTranslator(activityRecordMeta, activityRecordMetaSize, activityRecordMetaBuff, f, agent.queryRecordLayoutTranslatorCache());
  3615. }
  3616. virtual void verifyIndex(IDistributedFile * f, IKeyIndex * idx, IRecordLayoutTranslator * trans)
  3617. {
  3618. if (eclKeySize.isFixedSize())
  3619. {
  3620. if(trans)
  3621. trans->checkSizes(f->queryLogicalName(), eclKeySize.getFixedSize(), idx->keySize());
  3622. else
  3623. if(idx->keySize() != eclKeySize.getFixedSize())
  3624. throw MakeStringException(1002, "Key size mismatch on key %s: key file indicates record size should be %u, but ECL declaration was %u", f->queryLogicalName(), idx->keySize(), eclKeySize.getFixedSize());
  3625. }
  3626. }
  3627. virtual void verifyFetchFormatCrc(IDistributedFile * f)
  3628. {
  3629. if(!agent.queryWorkUnit()->getDebugValueBool("skipFileFormatCrcCheck", false))
  3630. ::verifyFormatCrcSuper(helper.getDiskFormatCrc(), f, false, true);
  3631. }
  3632. virtual void warn(char const * msg)
  3633. {
  3634. StringBuffer buff;
  3635. buff.append(msg).append(" for index ").append(dFile->queryLogicalName());
  3636. WARNLOG("%s", buff.str());
  3637. agent.addWuException(buff.str(), 0, ExceptionSeverityWarning, "hthor");
  3638. }
  3639. virtual void fail(char const * msg)
  3640. {
  3641. throw MakeStringExceptionDirect(0, msg);
  3642. }
  3643. };
  3644. extern HTHOR_API IHThorActivity *createKeyedJoinActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorKeyedJoinArg &arg, ThorActivityKind _kind)
  3645. {
  3646. return new CHThorKeyedJoinActivity(_agent, _activityId, _subgraphId, arg, _kind);
  3647. }