cassandrawu.cpp 181 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
73278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458345934603461346234633464346534663467346834693470347134723473347434753476347734783479348034813482348334843485348634873488348934903491349234933494349534963497349834993500350135023503350435053506350735083509351035113512351335143515351635173518351935203521352235233524352535263527352835293530353135323533353435353536353735383539354035413542354335443545354635473548354935503551355235533554355535563557355835593560356135623563356435653566356735683569357035713572357335743575357635773578357935803581358235833584358535863587358835893590359135923593359435953596359735983599360036013602360336043605360636073608360936103611361236133614361536163617361836193620362136223623362436253626362736283629363036313632363336343635363636373638363936403641364236433644364536463647364836493650365136523653365436553656365736583659366036613662366336643665366636673668366936703671367236733674367536763677367836793680368136823683368436853686368736883689369036913692369336943695369636973698369937003701370237033704370537063707370837093710371137123713371437153716371737183719372037213722372337243725372637273728372937303731373237333734373537363737373837393740374137423743374437453746374737483749375037513752375337543755375637573758375937603761376237633764376537663767376837693770377137723773377437753776377
73778377937803781378237833784378537863787378837893790379137923793379437953796379737983799380038013802380338043805380638073808380938103811381238133814381538163817381838193820382138223823382438253826382738283829383038313832383338343835383638373838383938403841384238433844384538463847384838493850385138523853385438553856385738583859386038613862386338643865386638673868386938703871387238733874387538763877387838793880388138823883388438853886388738883889389038913892389338943895389638973898389939003901390239033904390539063907390839093910391139123913391439153916391739183919392039213922392339243925392639273928392939303931393239333934393539363937393839393940394139423943394439453946394739483949395039513952395339543955395639573958395939603961396239633964396539663967396839693970397139723973397439753976397739783979398039813982398339843985398639873988398939903991399239933994399539963997399839994000400140024003400440054006400740084009401040114012401340144015401640174018401940204021402240234024402540264027402840294030403140324033403440354036403740384039404040414042404340444045404640474048404940504051405240534054405540564057405840594060406140624063406440654066406740684069407040714072407340744075407640774078407940804081408240834084408540864087408840894090409140924093409440954096409740984099410041014102410341044105410641074108410941104111411241134114411541164117411841194120412141224123412441254126412741284129413041314132413341344135413641374138413941404141414241434144414541464147414841494150415141524153415441554156415741584159416041614162416341644165416641674168416941704171417241734174417541764177417841794180418141824183418441854186418741884189419041914192419341944195419641974198419942004201420242034204420542064207420842094210421142124213421442154216421742184219422042214222422342244225422642274228422942304231423242334234423542364237423842394240424142424243424442454246424742484249425042514252425342544255425642574258425942604261426242634264426542664267426842694270427142724273427442754276427
74278427942804281428242834284428542864287428842894290429142924293429442954296429742984299430043014302430343044305430643074308430943104311431243134314431543164317431843194320432143224323432443254326432743284329433043314332433343344335433643374338433943404341434243434344434543464347434843494350435143524353435443554356435743584359436043614362436343644365436643674368436943704371437243734374437543764377437843794380438143824383438443854386438743884389439043914392439343944395439643974398439944004401440244034404440544064407440844094410441144124413441444154416441744184419442044214422442344244425442644274428442944304431443244334434443544364437443844394440444144424443444444454446444744484449445044514452445344544455445644574458445944604461446244634464446544664467446844694470447144724473447444754476447744784479448044814482
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2013 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include "cassandra.h"
  15. #include "jexcept.hpp"
  16. #include "jthread.hpp"
  17. #include "hqlplugins.hpp"
  18. #include "deftype.hpp"
  19. #include "eclhelper.hpp"
  20. #include "eclrtl.hpp"
  21. #include "eclrtl_imp.hpp"
  22. #include "rtlds_imp.hpp"
  23. #include "rtlfield.hpp"
  24. #include "rtlembed.hpp"
  25. #include "roxiemem.hpp"
  26. #include "nbcd.hpp"
  27. #include "jsort.hpp"
  28. #include "jptree.hpp"
  29. #include "jlzw.hpp"
  30. #include "jregexp.hpp"
  31. #include "dadfs.hpp"
  32. #include "dasds.hpp"
  33. #include "wuerror.hpp"
  34. #include "workunit.hpp"
  35. #include "workunit.ipp"
  36. #include "cassandraembed.hpp"
  37. #define EXPORT DECL_EXPORT
  38. namespace cassandraembed {
  // Workunit-query lifetime: 1000*60*5, i.e. five minutes if the unit is milliseconds
  // (NOTE(review): unit inferred from the factorisation - confirm at the use site).
  #define CASS_WU_QUERY_EXPIRES (1000*60*5)
  // Row limit associated with workunit post-sorting (use site not visible here).
  #define CASS_WORKUNIT_POSTSORT_LIMIT 10000

  // Search-prefix size: lower bound, default and upper bound
  // (presumably used to validate/clamp a configured value).
  #define MIN_PREFIX_SIZE 2
  #define DEFAULT_PREFIX_SIZE 2
  #define MAX_PREFIX_SIZE 8

  // Partition count: lower bound, default and upper bound
  // (presumably used to validate/clamp a configured value).
  #define MIN_PARTITIONS 1
  #define DEFAULT_PARTITIONS 2
  #define MAX_PARTITIONS 10
  47. static const CassValue *getSingleResult(const CassResult *result)
  48. {
  49. const CassRow *row = cass_result_first_row(result);
  50. if (row)
  51. return cass_row_get_column(row, 0);
  52. else
  53. return NULL;
  54. }
  55. static StringBuffer &getCassString(StringBuffer &str, const CassValue *value)
  56. {
  57. const char *output;
  58. size_t length;
  59. check(cass_value_get_string(value, &output, &length));
  60. return str.append(length, output);
  61. }
  62. struct CassandraXmlMapping;
  63. interface ICassandraSession : public IInterface // MORE - rename!
  64. {
  65. virtual CassSession *querySession() const = 0;
  66. virtual CassandraPrepared *prepareStatement(const char *query) const = 0;
  67. virtual void executeAsync(CIArrayOf<CassandraStatement> &batch, const char *what) const = 0;
  68. virtual unsigned queryTraceLevel() const = 0;
  69. virtual const CassResult *fetchDataForWuid(const CassandraXmlMapping *mappings, const char *wuid, bool includeWuid) const = 0;
  70. virtual const CassResult *fetchDataForWuidAndKey(const CassandraXmlMapping *mappings, const char *wuid, const char *key) const = 0;
  71. virtual void deleteChildByWuid(const CassandraXmlMapping *mappings, const char *wuid, CassBatch *batch) const = 0;
  72. virtual IPTree *cassandraToWorkunitXML(const char *wuid) const = 0;
  73. virtual unsigned queryPrefixSize() const = 0;
  74. virtual unsigned queryPartitions() const = 0;
  75. };
  76. struct CassandraColumnMapper
  77. {
  78. virtual ~CassandraColumnMapper() {}
  79. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) = 0;
  80. virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) = 0;
  81. };
  82. static class StringColumnMapper : implements CassandraColumnMapper
  83. {
  84. public:
  85. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) override
  86. {
  87. rtlDataAttr str;
  88. unsigned chars;
  89. getUTF8Result(NULL, value, chars, str.refstr());
  90. StringAttr s(str.getstr(), rtlUtf8Size(chars, str.getstr()));
  91. row->setProp(name, s);
  92. return row;
  93. }
  94. virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
  95. {
  96. const char *value = row->queryProp(name);
  97. if (!value)
  98. return false;
  99. if (statement)
  100. statement->bindString(idx, value);
  101. return true;
  102. }
  103. } stringColumnMapper;
  104. static class RequiredStringColumnMapper : public StringColumnMapper
  105. {
  106. public:
  107. virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
  108. {
  109. const char *value = row->queryProp(name);
  110. if (!value)
  111. value = "";
  112. if (statement)
  113. statement->bindString(idx, value);
  114. return true;
  115. }
  116. } requiredStringColumnMapper;
  117. static class SuppliedStringColumnMapper : public StringColumnMapper
  118. {
  119. public:
  120. virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
  121. {
  122. if (statement)
  123. statement->bindString(idx, userVal);
  124. return true;
  125. }
  126. } suppliedStringColumnMapper;
  127. static class BlobColumnMapper : implements CassandraColumnMapper
  128. {
  129. public:
  130. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) override
  131. {
  132. rtlDataAttr str;
  133. unsigned chars;
  134. getDataResult(NULL, value, chars, str.refdata());
  135. row->setPropBin(name, chars, str.getbytes());
  136. return row;
  137. }
  138. virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
  139. {
  140. MemoryBuffer value;
  141. row->getPropBin(name, value);
  142. if (value.length())
  143. {
  144. if (statement)
  145. statement->bindBytes(idx, (const cass_byte_t *) value.toByteArray(), value.length());
  146. return true;
  147. }
  148. else
  149. return false;
  150. }
  151. } blobColumnMapper;
  152. static class compressTreeColumnMapper : implements CassandraColumnMapper
  153. {
  154. public:
  155. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) override
  156. {
  157. rtlDataAttr str;
  158. unsigned chars;
  159. getDataResult(NULL, value, chars, str.refdata());
  160. if (chars)
  161. {
  162. MemoryBuffer compressed, decompressed;
  163. compressed.setBuffer(chars, str.getbytes(), false);
  164. decompressToBuffer(decompressed, compressed);
  165. Owned<IPTree> p = createPTree(decompressed);
  166. row->setPropTree(name, p.getClear());
  167. }
  168. return row;
  169. }
  170. virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
  171. {
  172. IPTree *child = row->queryPropTree(name);
  173. if (child && child->hasChildren())
  174. {
  175. if (statement)
  176. {
  177. MemoryBuffer decompressed, compressed;
  178. child->serialize(decompressed);
  179. compressToBuffer(compressed, decompressed.length(), decompressed.toByteArray());
  180. statement->bindBytes(idx, (const cass_byte_t *) compressed.toByteArray(), compressed.length());
  181. }
  182. return true;
  183. }
  184. else
  185. return false;
  186. }
  187. } compressTreeColumnMapper;
  188. static class TimeStampColumnMapper : implements CassandraColumnMapper
  189. {
  190. public:
  191. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) override
  192. {
  193. // never fetched (that may change?)
  194. return row;
  195. }
  196. virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
  197. {
  198. // never bound, but does need to be included in the ?
  199. return true;
  200. }
  201. } timestampColumnMapper;
  202. static class HashRootNameColumnMapper : implements CassandraColumnMapper
  203. {
  204. public:
  205. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) override
  206. {
  207. throwUnexpected(); // we never return the partition column
  208. }
  209. virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
  210. {
  211. if (statement)
  212. {
  213. int hash = rtlHash32VStr(row->queryName(), 0) % session->queryPartitions();
  214. statement->bindInt32(idx, hash);
  215. }
  216. return true;
  217. }
  218. } hashRootNameColumnMapper;
  219. static class RootNameColumnMapper : implements CassandraColumnMapper
  220. {
  221. public:
  222. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) override
  223. {
  224. rtlDataAttr str;
  225. unsigned chars;
  226. getUTF8Result(NULL, value, chars, str.refstr());
  227. StringAttr s(str.getstr(), rtlUtf8Size(chars, str.getstr()));
  228. row->renameProp("/", s);
  229. return row;
  230. }
  231. virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
  232. {
  233. if (statement)
  234. {
  235. const char *value = row->queryName();
  236. statement->bindString(idx, value);
  237. }
  238. return true;
  239. }
  240. } rootNameColumnMapper;
  241. // WuidColumnMapper is used for columns containing a wuid that is NOT in the resulting XML - it
  242. // is an error to try to map such a column to/from the XML representation
  243. static class WuidColumnMapper : implements CassandraColumnMapper
  244. {
  245. public:
  246. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) override
  247. {
  248. throwUnexpected();
  249. }
  250. virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
  251. {
  252. throwUnexpected();
  253. }
  254. } wuidColumnMapper;
  255. static class BoolColumnMapper : implements CassandraColumnMapper
  256. {
  257. public:
  258. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) override
  259. {
  260. row->addPropBool(name, getBooleanResult(NULL, value));
  261. return row;
  262. }
  263. virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
  264. {
  265. if (row->hasProp(name))
  266. {
  267. if (statement)
  268. {
  269. bool value = row->getPropBool(name, false);
  270. statement->bindBool(idx, value ? cass_true : cass_false);
  271. }
  272. return true;
  273. }
  274. else
  275. return false;
  276. }
  277. } boolColumnMapper;
  278. static class PrefixSearchColumnMapper : implements CassandraColumnMapper
  279. {
  280. public:
  281. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) override
  282. {
  283. return row;
  284. }
  285. virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
  286. {
  287. return _fromXML(statement, idx, row, userVal, session->queryPrefixSize(), true);
  288. }
  289. protected:
  290. static bool _fromXML(CassandraStatement *statement, unsigned idx, IPTree *row, const char *xpath, unsigned prefixLength, bool uc)
  291. {
  292. const char *columnVal = row->queryProp(xpath);
  293. if (columnVal)
  294. {
  295. if (statement)
  296. {
  297. StringBuffer buf(columnVal);
  298. if (uc)
  299. buf.toUpperCase();
  300. if (prefixLength)
  301. statement->bindString_n(idx, buf, prefixLength);
  302. else
  303. statement->bindString(idx, buf);
  304. }
  305. return true;
  306. }
  307. else
  308. return false;
  309. }
  310. } prefixSearchColumnMapper;
  311. static class SearchColumnMapper : public PrefixSearchColumnMapper
  312. {
  313. public:
  314. virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
  315. {
  316. return _fromXML(statement, idx, row, userVal, 0, true);
  317. }
  318. } searchColumnMapper;
  319. static class LCSearchColumnMapper : public PrefixSearchColumnMapper
  320. {
  321. public:
  322. virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
  323. {
  324. return _fromXML(statement, idx, row, userVal, 0, false);
  325. }
  326. } lcSearchColumnMapper;
  327. static class IntColumnMapper : implements CassandraColumnMapper
  328. {
  329. public:
  330. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) override
  331. {
  332. if (name)
  333. row->addPropInt(name, getSignedResult(NULL, value));
  334. return row;
  335. }
  336. virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
  337. {
  338. if (row->hasProp(name))
  339. {
  340. if (statement)
  341. {
  342. int value = row->getPropInt(name);
  343. statement->bindInt32(idx, value);
  344. }
  345. return true;
  346. }
  347. else
  348. return false;
  349. }
  350. } intColumnMapper;
  351. static class DefaultedIntColumnMapper : public IntColumnMapper
  352. {
  353. public:
  354. virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
  355. {
  356. if (statement)
  357. {
  358. int value = row->getPropInt(name, atoi(userVal));
  359. statement->bindInt32(idx, value);
  360. }
  361. return true;
  362. }
  363. } defaultedIntColumnMapper;
  364. static class BigIntColumnMapper : implements CassandraColumnMapper
  365. {
  366. public:
  367. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) override
  368. {
  369. row->addPropInt64(name, getSignedResult(NULL, value));
  370. return row;
  371. }
  372. virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
  373. {
  374. if (row->hasProp(name))
  375. {
  376. if (statement)
  377. {
  378. __int64 value = row->getPropInt64(name);
  379. statement->bindInt64(idx, value);
  380. }
  381. return true;
  382. }
  383. else
  384. return false;
  385. }
  386. } bigintColumnMapper;
  387. static class SimpleMapColumnMapper : implements CassandraColumnMapper
  388. {
  389. public:
  390. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) override
  391. {
  392. Owned<IPTree> map = createPTree(name);
  393. CassandraIterator elems(cass_iterator_from_map(value));
  394. while (cass_iterator_next(elems))
  395. {
  396. rtlDataAttr str;
  397. unsigned chars;
  398. getStringResult(NULL, cass_iterator_get_map_key(elems), chars, str.refstr());
  399. StringAttr s(str.getstr(), chars);
  400. stringColumnMapper.toXML(map, s, cass_iterator_get_map_value(elems));
  401. }
  402. row->addPropTree(name, map.getClear());
  403. return row;
  404. }
  405. virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
  406. {
  407. Owned<IPTree> child = row->getPropTree(name);
  408. if (child)
  409. {
  410. unsigned numItems = child->numChildren();
  411. if (numItems)
  412. {
  413. if (statement)
  414. {
  415. CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_MAP, numItems));
  416. Owned<IPTreeIterator> items = child->getElements("*");
  417. ForEach(*items)
  418. {
  419. IPTree &item = items->query();
  420. const char *key = item.queryName();
  421. const char *value = item.queryProp(NULL);
  422. if (key && value)
  423. {
  424. check(cass_collection_append_string(collection, key));
  425. check(cass_collection_append_string(collection, value));
  426. }
  427. }
  428. statement->bindCollection(idx, collection);
  429. }
  430. return true;
  431. }
  432. }
  433. return false;
  434. }
  435. } simpleMapColumnMapper;
  436. static class AttributeMapColumnMapper : implements CassandraColumnMapper
  437. {
  438. public:
  439. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) override
  440. {
  441. CassandraIterator elems(cass_iterator_from_map(value));
  442. while (cass_iterator_next(elems))
  443. {
  444. rtlDataAttr str;
  445. unsigned chars;
  446. getStringResult(NULL, cass_iterator_get_map_key(elems), chars, str.refstr());
  447. StringBuffer s("@");
  448. s.append(chars, str.getstr());
  449. stringColumnMapper.toXML(row, s, cass_iterator_get_map_value(elems));
  450. }
  451. return row;
  452. }
  453. virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
  454. {
  455. // NOTE - name here provides a list of attributes that we should NOT be mapping
  456. Owned<IAttributeIterator> attrs = row->getAttributes();
  457. unsigned numItems = 0;
  458. ForEach(*attrs)
  459. {
  460. StringBuffer key = attrs->queryName();
  461. key.append('@');
  462. if (strstr(name, key) == NULL)
  463. numItems++;
  464. }
  465. if (numItems)
  466. {
  467. if (statement)
  468. {
  469. CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_MAP, numItems));
  470. ForEach(*attrs)
  471. {
  472. StringBuffer key = attrs->queryName();
  473. key.append('@');
  474. if (strstr(name, key) == NULL)
  475. {
  476. const char *value = attrs->queryValue();
  477. check(cass_collection_append_string(collection, attrs->queryName()+1)); // skip the @
  478. check(cass_collection_append_string(collection, value));
  479. }
  480. }
  481. statement->bindCollection(idx, collection);
  482. }
  483. return true;
  484. }
  485. else
  486. return false;
  487. }
  488. } attributeMapColumnMapper;
  489. static class ElementMapColumnMapper : implements CassandraColumnMapper
  490. {
  491. public:
  492. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) override
  493. {
  494. CassandraIterator elems(cass_iterator_from_map(value));
  495. while (cass_iterator_next(elems))
  496. {
  497. rtlDataAttr str;
  498. unsigned chars;
  499. getStringResult(NULL, cass_iterator_get_map_key(elems), chars, str.refstr());
  500. StringBuffer elemName(chars, str.getstr());
  501. stringColumnMapper.toXML(row, elemName, cass_iterator_get_map_value(elems));
  502. }
  503. return row;
  504. }
  505. virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
  506. {
  507. // NOTE - name here provides a list of elements that we should NOT be mapping
  508. Owned<IPTreeIterator> elems = row->getElements("*");
  509. unsigned numItems = 0;
  510. ForEach(*elems)
  511. {
  512. IPTree &item = elems->query();
  513. StringBuffer key('@');
  514. key.append(item.queryName());
  515. key.append('@');
  516. if (strstr(name, key) == NULL)
  517. {
  518. const char *value = item.queryProp(".");
  519. if (value)
  520. numItems++;
  521. }
  522. }
  523. if (numItems)
  524. {
  525. if (statement)
  526. {
  527. CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_MAP, numItems));
  528. ForEach(*elems)
  529. {
  530. IPTree &item = elems->query();
  531. StringBuffer key('@');
  532. key.append(item.queryName());
  533. key.append('@');
  534. if (strstr(name, key) == NULL)
  535. {
  536. const char *value = item.queryProp(".");
  537. if (value)
  538. {
  539. check(cass_collection_append_string(collection, item.queryName()));
  540. check(cass_collection_append_string(collection, value));
  541. }
  542. }
  543. }
  544. statement->bindCollection(idx, collection);
  545. }
  546. return true;
  547. }
  548. else
  549. return false;
  550. }
  551. } elementMapColumnMapper;
  552. static class SubtreeMapColumnMapper : implements CassandraColumnMapper
  553. {
  554. public:
  555. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) override
  556. {
  557. CassandraIterator elems(cass_iterator_from_map(value));
  558. while (cass_iterator_next(elems))
  559. {
  560. rtlDataAttr str;
  561. unsigned chars;
  562. getStringResult(NULL, cass_iterator_get_map_key(elems), chars, str.refstr());
  563. StringBuffer elemName(chars, str.getstr());
  564. const CassValue *value = cass_iterator_get_map_value(elems);
  565. StringBuffer valStr;
  566. getCassString(valStr, value);
  567. if (valStr.length() && valStr.charAt(0)== '<')
  568. {
  569. IPTree *sub = createPTreeFromXMLString(valStr);
  570. row->setPropTree(elemName, sub);
  571. }
  572. }
  573. return row;
  574. }
  575. virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
  576. {
  577. // NOTE - name here provides a list of elements that we SHOULD be mapping
  578. Owned<IPTreeIterator> elems = row->getElements("*");
  579. unsigned numItems = 0;
  580. ForEach(*elems)
  581. {
  582. IPTree &item = elems->query();
  583. StringBuffer key("@");
  584. key.append(item.queryName());
  585. key.append('@');
  586. if (strstr(name, key) != NULL)
  587. numItems++;
  588. }
  589. if (numItems)
  590. {
  591. if (statement)
  592. {
  593. CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_MAP, numItems));
  594. ForEach(*elems)
  595. {
  596. IPTree &item = elems->query();
  597. StringBuffer key("@");
  598. key.append(item.queryName());
  599. key.append('@');
  600. if (strstr(name, key) != NULL)
  601. {
  602. StringBuffer x;
  603. ::toXML(&item, x);
  604. check(cass_collection_append_string(collection, item.queryName()));
  605. check(cass_collection_append_string(collection, x));
  606. }
  607. }
  608. statement->bindCollection(idx, collection);
  609. }
  610. return true;
  611. }
  612. else
  613. return false;
  614. }
  615. } subTreeMapColumnMapper;
  616. /*
  617. static class QueryTextColumnMapper : public StringColumnMapper
  618. {
  619. public:
  620. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) override
  621. {
  622. // Name is "Query/Text ...
  623. IPTree *query = row->queryPropTree("Query");
  624. if (!query)
  625. {
  626. query = createPTree("Query");
  627. query = row->setPropTree("Query", query);
  628. row->setProp("Query/@fetchEntire", "1"); // Compatibility...
  629. }
  630. return StringColumnMapper::toXML(query, "Text", value);
  631. }
  632. } queryTextColumnMapper;
  633. */
  634. static class GraphMapColumnMapper : implements CassandraColumnMapper
  635. {
  636. public:
  637. GraphMapColumnMapper(const char *_elemName, const char *_nameAttr)
  638. : elemName(_elemName), nameAttr(_nameAttr)
  639. {
  640. }
  641. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) override
  642. {
  643. Owned<IPTree> map = createPTree(name);
  644. CassandraIterator elems(cass_iterator_from_map(value));
  645. while (cass_iterator_next(elems))
  646. {
  647. rtlDataAttr str;
  648. unsigned chars;
  649. getStringResult(NULL, cass_iterator_get_map_value(elems), chars, str.refstr());
  650. Owned<IPTree> child = createPTreeFromXMLString(chars, str.getstr());
  651. map->addPropTree(elemName, child.getClear());
  652. }
  653. row->addPropTree(name, map.getClear());
  654. return row;
  655. }
  656. virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
  657. {
  658. Owned<IPTree> child = row->getPropTree(name);
  659. if (child)
  660. {
  661. unsigned numItems = child->numChildren();
  662. if (numItems)
  663. {
  664. if (statement)
  665. {
  666. CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_MAP, numItems));
  667. Owned<IPTreeIterator> items = child->getElements("*");
  668. ForEach(*items)
  669. {
  670. IPTree &item = items->query();
  671. const char *key = item.queryProp(nameAttr);
  672. // MORE - may need to read, and probably should write, compressed. At least for graphs
  673. StringBuffer value;
  674. ::toXML(&item, value, 0, 0);
  675. if (key && value.length())
  676. {
  677. check(cass_collection_append_string(collection, key));
  678. check(cass_collection_append_string(collection, value));
  679. }
  680. }
  681. statement->bindCollection(idx, collection);
  682. }
  683. return true;
  684. }
  685. }
  686. return false;
  687. }
  688. private:
  689. const char *elemName;
  690. const char *nameAttr;
  691. } graphMapColumnMapper("Graph", "@name"), workflowMapColumnMapper("Item", "@wfid"), associationsMapColumnMapper("File", "@filename"), usedFieldsMapColumnMapper("field", "@name");
  692. static class WarningsMapColumnMapper : implements CassandraColumnMapper
  693. {
  694. public:
  695. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) override
  696. {
  697. CassandraIterator elems(cass_iterator_from_map(value));
  698. while (cass_iterator_next(elems))
  699. {
  700. unsigned code = getUnsignedResult(NULL, cass_iterator_get_map_key(elems));
  701. VStringBuffer xpath("OnWarnings/OnWarning[@code='%u']", code);
  702. IPropertyTree * mapping = row->queryPropTree(xpath);
  703. if (!mapping)
  704. {
  705. IPropertyTree * onWarnings = ensurePTree(row, "OnWarnings");
  706. mapping = onWarnings->addPropTree("OnWarning", createPTree());
  707. mapping->setPropInt("@code", code);
  708. }
  709. rtlDataAttr str;
  710. unsigned chars;
  711. getStringResult(NULL, cass_iterator_get_map_value(elems), chars, str.refstr());
  712. StringBuffer s(chars, str.getstr());
  713. mapping->setProp("@severity", s);
  714. }
  715. return row;
  716. }
  717. virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
  718. {
  719. if (!row->hasProp("OnWarnings/OnWarning"))
  720. return false;
  721. else
  722. {
  723. if (statement)
  724. {
  725. CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_MAP, 5));
  726. Owned<IPTreeIterator> elems = row->getElements("OnWarnings/OnWarning");
  727. ForEach(*elems)
  728. {
  729. IPTree &item = elems->query();
  730. unsigned code = item.getPropInt("@code", 0);
  731. const char *value = item.queryProp("@severity");
  732. if (value)
  733. {
  734. check(cass_collection_append_int32(collection, code));
  735. check(cass_collection_append_string(collection, value));
  736. }
  737. }
  738. statement->bindCollection(idx, collection);
  739. }
  740. return true;
  741. }
  742. }
  743. } warningsMapColumnMapper;
  744. static class PluginListColumnMapper : implements CassandraColumnMapper
  745. {
  746. public:
  747. PluginListColumnMapper(const char *_elemName, const char *_nameAttr)
  748. : elemName(_elemName), nameAttr(_nameAttr)
  749. {
  750. }
  751. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) override
  752. {
  753. Owned<IPTree> map = name ? createPTree(name) : LINK(row);
  754. CassandraIterator elems(cass_iterator_from_collection(value));
  755. while (cass_iterator_next(elems))
  756. {
  757. Owned<IPTree> child = createPTree(elemName);
  758. stringColumnMapper.toXML(child, nameAttr, cass_iterator_get_value(elems));
  759. map->addPropTree(elemName, child.getClear());
  760. }
  761. if (name)
  762. row->addPropTree(name, map.getClear());
  763. return row;
  764. }
  765. virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
  766. {
  767. Owned<IPTree> child = row->getPropTree(name);
  768. if (child)
  769. {
  770. unsigned numItems = child->numChildren();
  771. if (numItems)
  772. {
  773. if (statement)
  774. {
  775. CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_LIST, numItems));
  776. Owned<IPTreeIterator> items = child->getElements("*");
  777. ForEach(*items)
  778. {
  779. IPTree &item = items->query();
  780. const char *value = item.queryProp(nameAttr);
  781. if (value)
  782. check(cass_collection_append_string(collection, value));
  783. }
  784. statement->bindCollection(idx, collection);
  785. }
  786. return true;
  787. }
  788. }
  789. return false;
  790. }
  791. private:
  792. const char *elemName;
  793. const char *nameAttr;
  794. } pluginListColumnMapper("Plugin", "@dllname"), subfileListColumnMapper("Subfile", "@name");
  795. struct CassandraXmlMapping
  796. {
  797. const char *columnName;
  798. const char *columnType;
  799. const char *xpath;
  800. CassandraColumnMapper &mapper;
  801. };
  802. struct CassandraTableInfo
  803. {
  804. const char *x;
  805. const CassandraXmlMapping *mappings;
  806. };
  807. static const int majorVersion = 1; // If this does not match the value in the repository, you cannot proceed - a conversion tool is needed
  808. static const int minorVersion = 2; // If this is less that the value in the repository, we should be fine (but there may be columns we don't know about and thus don't read - and will write as NULL in new rows)
  809. // If this is greater than the value in the repository, we need to update the repository (using add column) and its version before proceeding
  810. // Make sure to increment this if any column is ever added below
  811. static const CassandraXmlMapping workunitsMappings [] =
  812. {
  813. {"partition", "int", NULL, hashRootNameColumnMapper},
  814. {"wuid", "text", NULL, rootNameColumnMapper},
  815. {"clustername", "text", "@clusterName", stringColumnMapper},
  816. {"jobname", "text", "@jobName", stringColumnMapper},
  817. {"priorityclass", "text", "@priorityClass", stringColumnMapper},
  818. {"prioritylevel", "int", "@priorityLevel", intColumnMapper},
  819. {"wuScope", "text", "@scope", stringColumnMapper},
  820. {"submitID", "text", "@submitID", stringColumnMapper},
  821. {"state", "text", "@state", stringColumnMapper},
  822. {"action", "text", "Action", stringColumnMapper},
  823. {"protected", "boolean", "@protected", boolColumnMapper},
  824. {"scheduled", "text", "@timeScheduled", stringColumnMapper}, // Should store as a date?
  825. {"totalThorTime", "text", "@totalThorTime", stringColumnMapper}, // We store in the wu ptree as a collatable string (with leading spaces to force to one partition)
  826. {"appvalues", "map<text, text>", "@Application@", subTreeMapColumnMapper},
  827. {"agentSession", "bigint", "@agentSession", bigintColumnMapper},
  828. {"debug", "map<text, text>", "Debug", simpleMapColumnMapper},
  829. {"attributes", "map<text, text>", "@agentSession@wuid@clusterName@jobName@priorityClass@priorityLevel@protected@scope@submitID@state@timeScheduled@totalThorTime@", attributeMapColumnMapper}, // name is the suppression list, note trailing @
  830. {"plugins", "list<text>", "Plugins", pluginListColumnMapper},
  831. {"workflow", "map<text, text>", "Workflow", workflowMapColumnMapper},
  832. {"onWarnings", "map<int, text>", "OnWarnings/OnWarning", warningsMapColumnMapper},
  833. // These are catchalls for anything not processed above or in a child table
  834. {"elements", "map<text, text>", "@Action@Application@Debug@Exceptions@Files@FilesRead@Graphs@Results@Statistics@Plugins@Query@State@Variables@Temporaries@Workflow@", elementMapColumnMapper}, // name is the suppression list, note trailing @
  835. {"subtrees", "map<text, text>", "@DiskUsageStats@Parameters@Process@Tracing@", subTreeMapColumnMapper}, // name is the INCLUSION list, note trailing @
  836. { NULL, "workunits", "((partition), wuid)|CLUSTERING ORDER BY (wuid DESC)", stringColumnMapper}
  837. };
  838. static const CassandraXmlMapping workunitInfoMappings [] = // A cut down version of the workunit mappings - used when querying with no key
  839. {
  840. {"partition", "int", NULL, hashRootNameColumnMapper},
  841. {"wuid", "text", NULL, rootNameColumnMapper},
  842. {"clustername", "text", "@clusterName", stringColumnMapper},
  843. {"jobname", "text", "@jobName", stringColumnMapper},
  844. {"priorityclass", "text", "@priorityClass", stringColumnMapper},
  845. {"prioritylevel", "int", "@priorityLevel", intColumnMapper},
  846. {"wuScope", "text", "@scope", stringColumnMapper},
  847. {"submitID", "text", "@submitID", stringColumnMapper},
  848. {"state", "text", "@state", stringColumnMapper},
  849. {"action", "text", "Action", stringColumnMapper},
  850. {"protected", "boolean", "@protected", boolColumnMapper},
  851. {"scheduled", "text", "@timeScheduled", stringColumnMapper}, // Should store as a date?
  852. {"totalThorTime", "text", "@totalThorTime", stringColumnMapper}, // We store in the wu ptree as a collatable string. Need to force to one partition too
  853. {"appvalues", "map<text, text>", "@Application@", subTreeMapColumnMapper},
  854. { NULL, "workunits", "((partition), wuid)|CLUSTERING ORDER BY (wuid DESC)", stringColumnMapper}
  855. };
  856. // The following describes the search table - this contains copies of the basic wu information but keyed by different fields
  857. static const CassandraXmlMapping searchMappings [] =
  858. {
  859. {"xpath", "text", NULL, suppliedStringColumnMapper},
  860. {"fieldPrefix", "text", NULL, prefixSearchColumnMapper},
  861. {"fieldValue", "text", NULL, searchColumnMapper},
  862. {"wuid", "text", NULL, rootNameColumnMapper},
  863. {"clustername", "text", "@clusterName", stringColumnMapper},
  864. {"jobname", "text", "@jobName", stringColumnMapper},
  865. {"priorityclass", "text", "@priorityClass", stringColumnMapper},
  866. {"prioritylevel", "int", "@priorityLevel", intColumnMapper},
  867. {"scope", "text", "@scope", stringColumnMapper},
  868. {"submitID", "text", "@submitID", stringColumnMapper},
  869. {"state", "text", "@state", stringColumnMapper},
  870. {"action", "text", "Action", stringColumnMapper},
  871. {"protected", "boolean", "@protected", boolColumnMapper},
  872. {"scheduled", "text", "@timeScheduled", stringColumnMapper}, // Should store as a date?
  873. {"totalThorTime", "text", "@totalThorTime", stringColumnMapper}, // We store in the wu ptree as a collatable string. Need to force to one partition too
  874. {"appvalues", "map<text, text>", "@Application@", subTreeMapColumnMapper},
  875. { NULL, "workunitsSearch", "((xpath, fieldPrefix), fieldValue, wuid)|CLUSTERING ORDER BY (fieldValue ASC, wuid DESC)", stringColumnMapper}
  876. };
  877. // The fields we can search by. These are a subset of the fields in the basic workunit info that is returned from a search. A row is created in the search table for each of these, for each workunit.
  878. const char * searchPaths[] = { "@submitID", "@clusterName", "@jobName", "@priorityClass", "@protected", "@scope", "@state", "@totalThorTime", NULL};
  879. static const CassandraXmlMapping uniqueSearchMappings [] =
  880. {
  881. {"xpath", "text", NULL, suppliedStringColumnMapper},
  882. {"fieldPrefix", "text", NULL, prefixSearchColumnMapper}, // Leading N chars, upper-cased
  883. {"fieldValue", "text", NULL, searchColumnMapper}, // upper-cased
  884. {"origFieldValue", "text", NULL, lcSearchColumnMapper}, // original case
  885. { NULL, "uniqueSearchValues", "((xpath, fieldPrefix), fieldValue, origFieldValue)|CLUSTERING ORDER BY (fieldValue ASC)", stringColumnMapper}
  886. };
  887. // The fields we can wild search by. We store these in the uniqueSearchMappings table so we can translate wildcards into sets
  888. // We also add application name/key combinations, but we have to special-case that
  889. const char * wildSearchPaths[] = { "@submitID", "@clusterName", "@jobName", NULL};
  890. static const CassandraXmlMapping filesSearchMappings [] =
  891. {
  892. {"name", "text", "@name", stringColumnMapper},
  893. {"read", "boolean", "@read", boolColumnMapper},
  894. {"wuid", "text", NULL, suppliedStringColumnMapper},
  895. { NULL, "filesSearchValues", "((name, read), wuid)|CLUSTERING ORDER BY (wuid DESC)", stringColumnMapper}
  896. };
  897. // The version table is keyed by a partition value because (a) you need to key by something and (b) we can use it to spread the load of
  898. // version lookups (pick a partition at random).
  899. // Note that this table must have the same minimum layout on all versions.
  900. static const CassandraXmlMapping versionMappings [] =
  901. {
  902. {"partition", "int", "@partition", intColumnMapper},
  903. {"major", "int", "@major", intColumnMapper},
  904. {"minor", "int", "@minor", intColumnMapper},
  905. {"attributes", "map<text, text>", "@major@minor@partition@", attributeMapColumnMapper}, // name is the suppression list, note trailing @
  906. { NULL, "version", "((partition))", stringColumnMapper}
  907. };
/*
 * Some thoughts on the secondary tables:
 * 1. To support (trailing) wildcards we will need to split the key into two - the leading N chars and the rest. Exactly what N is will depend on the installation size.
 *    Too large and users will complain, but too small would hinder partitioning of the values across Cassandra nodes. 1 or 2 may be enough.
 * 2. I could combine all the secondary tables into 1 with a field indicating the type of the key. The key field would be repeated though... Would it help?
 *    I'm not sure it really changes a lot - it adds a bit of noise into the partitioner...
 *    Actually, it does mean that the updates and deletes can all be done with a single Cassandra query, though whether that has any advantages over multiple queries in a batch I don't know.
 *    It MAY well make it easier to make sure that searches are case-insensitive, since we'll generally need to separate out the search field from the display field to achieve that.
 * 3. Sort orders are tricky - I can use the secondary table to deliver results sorted by one field as long as it is the one I am filtering by (but if it is, I probably don't need it sorted!)
 *
 */
  919. // The following describe child tables - all keyed by wuid
  920. enum ChildTablesEnum { WuQueryChild, WuExceptionsChild, WuStatisticsChild, WuGraphsChild, WuResultsChild, WuVariablesChild, WuTemporariesChild, WuFilesReadChild, WuFilesWrittenChild, WuFieldUsage, ChildTablesSize };
  921. struct ChildTableInfo
  922. {
  923. const char *parentElement;
  924. const char *childElement;
  925. ChildTablesEnum index;
  926. const CassandraXmlMapping *mappings;
  927. };
  928. // wuQueries table is slightly unusual among the child tables as is is 1:1 - it is split out for lazy load purposes.
  929. static const CassandraXmlMapping wuQueryMappings [] =
  930. {
  931. {"partition", "int", NULL, hashRootNameColumnMapper},
  932. {"wuid", "text", NULL, rootNameColumnMapper},
  933. {"associations", "map<text, text>", "Associated", associationsMapColumnMapper},
  934. {"attributes", "map<text, text>", "", attributeMapColumnMapper},
  935. {"query", "text", "Text", stringColumnMapper}, // May want to make this even lazier...
  936. {"shortQuery", "text", "ShortText", stringColumnMapper},
  937. { NULL, "wuQueries", "((partition), wuid)", stringColumnMapper}
  938. };
  939. static const ChildTableInfo wuQueriesTable =
  940. {
  941. "Query", NULL,
  942. WuQueryChild,
  943. wuQueryMappings
  944. };
  945. // wuExceptions table holds the exceptions associated with a wuid
  946. static const CassandraXmlMapping wuExceptionsMappings [] =
  947. {
  948. {"partition", "int", NULL, hashRootNameColumnMapper},
  949. {"wuid", "text", NULL, rootNameColumnMapper},
  950. {"sequence", "int", "@sequence", intColumnMapper},
  951. {"attributes", "map<text, text>", "", attributeMapColumnMapper},
  952. {"value", "text", ".", stringColumnMapper},
  953. { NULL, "wuExceptions", "((partition), wuid, sequence)", stringColumnMapper}
  954. };
  955. static const ChildTableInfo wuExceptionsTable =
  956. {
  957. "Exceptions", "Exception",
  958. WuExceptionsChild,
  959. wuExceptionsMappings
  960. };
  961. static const CassandraXmlMapping wuStatisticsMappings [] =
  962. {
  963. {"partition", "int", NULL, hashRootNameColumnMapper},
  964. {"wuid", "text", NULL, rootNameColumnMapper},
  965. {"ts", "bigint", "@ts", bigintColumnMapper}, // MORE - should change this to a timeuuid ?
  966. {"kind", "text", "@kind", stringColumnMapper},
  967. {"creator", "text", "@creator", stringColumnMapper},
  968. {"scope", "text", "@scope", stringColumnMapper},
  969. {"attributes", "map<text, text>", "@ts@kind@creator@scope@", attributeMapColumnMapper},
  970. { NULL, "wuStatistics", "((partition), wuid, ts, kind, creator, scope)", stringColumnMapper}
  971. };
  972. static const ChildTableInfo wuStatisticsTable =
  973. {
  974. "Statistics", "Statistic",
  975. WuStatisticsChild,
  976. wuStatisticsMappings
  977. };
  978. static const CassandraXmlMapping wuGraphsMappings [] =
  979. {
  980. {"partition", "int", NULL, hashRootNameColumnMapper},
  981. {"wuid", "text", NULL, rootNameColumnMapper},
  982. {"name", "text", "@name", stringColumnMapper},
  983. {"attributes", "map<text, text>", "@name@", attributeMapColumnMapper},
  984. {"xgmml", "blob", "xgmml", compressTreeColumnMapper},
  985. { NULL, "wuGraphs", "((partition), wuid, name)", stringColumnMapper} // Note - we do occasionally search by type - but that is done in a postfilter having preloaded/cached all
  986. };
  987. static const ChildTableInfo wuGraphsTable =
  988. {
  989. "Graphs", "Graph",
  990. WuGraphsChild,
  991. wuGraphsMappings
  992. };
  993. // A cut down version of the above - note this does not represent a different table!
  994. static const CassandraXmlMapping wuGraphMetasMappings [] =
  995. {
  996. {"partition", "int", NULL, hashRootNameColumnMapper},
  997. {"wuid", "text", NULL, rootNameColumnMapper},
  998. {"name", "text", "@name", stringColumnMapper},
  999. {"attributes", "map<text, text>", "@name@", attributeMapColumnMapper},
  1000. { NULL, "wuGraphs", "((partition), wuid, name)", stringColumnMapper}
  1001. };
  1002. static const ChildTableInfo wuGraphMetasTable =
  1003. {
  1004. "Graphs", "Graph",
  1005. WuGraphsChild,
  1006. wuGraphMetasMappings
  1007. };
  1008. #define resultTableFields \
  1009. {"partition", "int", NULL, hashRootNameColumnMapper}, \
  1010. {"wuid", "text", NULL, rootNameColumnMapper}, \
  1011. {"sequence", "int", "@sequence", defaultedIntColumnMapper}, \
  1012. {"name", "text", "@name", stringColumnMapper}, \
  1013. {"attributes", "map<text, text>", "@sequence@name@", attributeMapColumnMapper}, /* name is the suppression list */ \
  1014. {"rowcount", "int", "rowCount", intColumnMapper}, /* This is the number of rows in result (which may be stored in a file rather than in value) */ \
  1015. {"totalrowcount", "bigint", "totalRowCount", bigintColumnMapper}, /* This is the number of rows in value */ \
  1016. {"schemaRaw", "blob", "SchemaRaw", blobColumnMapper}, \
  1017. {"logicalName", "text", "logicalName", stringColumnMapper}, /* either this or value will be present once result status is "calculated" */ \
  1018. {"value", "blob", "Value", blobColumnMapper}, \
  1019. {"graph", "text", "@graph", stringColumnMapper}, \
  1020. {"activity", "int", "@activity", intColumnMapper}
  1021. static const CassandraXmlMapping wuResultsMappings [] =
  1022. {
  1023. resultTableFields,
  1024. { NULL, "wuResults", "((partition), wuid, sequence)", stringColumnMapper}
  1025. };
  1026. static const ChildTableInfo wuResultsTable =
  1027. {
  1028. "Results", "Result",
  1029. WuResultsChild,
  1030. wuResultsMappings
  1031. };
  1032. // This looks very similar to the above, but the key is different...
  1033. static const CassandraXmlMapping wuVariablesMappings [] =
  1034. {
  1035. resultTableFields,
  1036. {"xmlValue", "text", "xmlValue", stringColumnMapper},
  1037. { NULL, "wuVariables", "((partition), wuid, sequence, name)", stringColumnMapper}
  1038. };
  1039. static const ChildTableInfo wuVariablesTable =
  1040. {
  1041. "Variables", "Variable",
  1042. WuVariablesChild,
  1043. wuVariablesMappings
  1044. };
  1045. // Again, very similar, but mapped to a different area of the XML
  1046. static const CassandraXmlMapping wuTemporariesMappings [] =
  1047. {
  1048. resultTableFields,
  1049. { NULL, "wuTemporaries", "((partition), wuid, sequence, name)", stringColumnMapper}
  1050. };
  1051. static const ChildTableInfo wuTemporariesTable =
  1052. {
  1053. "Temporaries", "Variable",
  1054. WuTemporariesChild,
  1055. wuTemporariesMappings
  1056. };
  1057. static const CassandraXmlMapping wuFilesReadMappings [] =
  1058. {
  1059. {"partition", "int", NULL, hashRootNameColumnMapper},
  1060. {"wuid", "text", NULL, rootNameColumnMapper},
  1061. {"name", "text", "@name", stringColumnMapper},
  1062. {"attributes", "map<text, text>", "@name@", attributeMapColumnMapper}, /* name is the suppression list */
  1063. {"subfiles", "list<text>", NULL, subfileListColumnMapper},
  1064. { NULL, "wuFilesRead", "((partition), wuid, name)", stringColumnMapper}
  1065. };
  1066. static const ChildTableInfo wuFilesReadTable =
  1067. {
  1068. "FilesRead", "File",
  1069. WuFilesReadChild,
  1070. wuFilesReadMappings
  1071. };
  1072. static const CassandraXmlMapping wuFilesWrittenMappings [] =
  1073. {
  1074. {"partition", "int", NULL, hashRootNameColumnMapper},
  1075. {"wuid", "text", NULL, rootNameColumnMapper},
  1076. {"name", "text", "@name", stringColumnMapper},
  1077. {"attributes", "map<text, text>", "@name@", attributeMapColumnMapper}, /* name is the suppression list */
  1078. { NULL, "wuFilesWritten", "((partition), wuid, name)", stringColumnMapper}
  1079. };
  1080. static const ChildTableInfo wuFilesWrittenTable =
  1081. {
  1082. "Files", "File",
  1083. WuFilesWrittenChild,
  1084. wuFilesWrittenMappings
  1085. };
  1086. static const CassandraXmlMapping wuFieldUsageMappings [] =
  1087. {
  1088. {"partition", "int", NULL, hashRootNameColumnMapper},
  1089. {"wuid", "text", NULL, rootNameColumnMapper},
  1090. {"name", "text", "@name", stringColumnMapper},
  1091. {"type", "text", "@type", stringColumnMapper},
  1092. {"numFields", "int", "@numFields", intColumnMapper},
  1093. {"numFieldsUsed", "int", "@numFieldsUsed", intColumnMapper},
  1094. {"fields", "map<text, text>", "fields", usedFieldsMapColumnMapper},
  1095. { NULL, "wuFieldUsage", "((partition), wuid, name)", stringColumnMapper}
  1096. };
  1097. static const ChildTableInfo wuFieldUsageTable =
  1098. {
  1099. "usedsources", "datasource",
  1100. WuFieldUsage,
  1101. wuFieldUsageMappings
  1102. };
  1103. // Order should match the enum above
  1104. static const ChildTableInfo * const childTables [] = { &wuQueriesTable, &wuExceptionsTable, &wuStatisticsTable, &wuGraphsTable, &wuResultsTable, &wuVariablesTable, &wuTemporariesTable, &wuFilesReadTable, &wuFilesWrittenTable, &wuFieldUsageTable, NULL };
  1105. // Graph progress tables are read directly, XML mappers not used
  1106. static const CassandraXmlMapping wuGraphProgressMappings [] =
  1107. {
  1108. {"partition", "int", NULL, hashRootNameColumnMapper},
  1109. {"wuid", "text", NULL, rootNameColumnMapper},
  1110. {"graphID", "text", NULL, stringColumnMapper},
  1111. {"subgraphID", "bigint", NULL, bigintColumnMapper},
  1112. {"creator", "text", NULL, stringColumnMapper},
  1113. {"progress", "blob", NULL, blobColumnMapper},
  1114. { NULL, "wuGraphProgress", "((partition), wuid, graphID, subgraphID, creator)", stringColumnMapper}
  1115. };
  1116. static const CassandraXmlMapping wuGraphStateMappings [] =
  1117. {
  1118. {"partition", "int", NULL, hashRootNameColumnMapper},
  1119. {"wuid", "text", NULL, rootNameColumnMapper},
  1120. {"graphID", "text", NULL, stringColumnMapper},
  1121. {"subgraphID", "bigint", NULL, bigintColumnMapper},
  1122. {"state", "int", NULL, intColumnMapper},
  1123. { NULL, "wuGraphState", "((partition), wuid, graphID, subgraphID)", stringColumnMapper}
  1124. };
  1125. static const CassandraXmlMapping wuGraphRunningMappings [] =
  1126. {
  1127. {"partition", "int", NULL, hashRootNameColumnMapper},
  1128. {"wuid", "text", NULL, rootNameColumnMapper},
  1129. {"graphID", "text", NULL, stringColumnMapper},
  1130. {"subgraphID", "bigint", NULL, bigintColumnMapper},
  1131. { NULL, "wuGraphRunning", "((partition), wuid)", stringColumnMapper}
  1132. };
  1133. void getBoundFieldNames(const ICassandraSession *session, const CassandraXmlMapping *mappings, StringBuffer &names, StringBuffer &bindings, IPTree *inXML, const char *userVal, StringBuffer &tableName)
  1134. {
  1135. while (mappings->columnName)
  1136. {
  1137. if (!inXML || mappings->mapper.fromXML(session, NULL, 0, inXML, mappings->xpath, userVal))
  1138. {
  1139. names.appendf(",%s", mappings->columnName);
  1140. if (strcmp(mappings->columnType, "timeuuid")==0)
  1141. bindings.appendf(",now()");
  1142. else
  1143. bindings.appendf(",?");
  1144. }
  1145. mappings++;
  1146. }
  1147. tableName.append(mappings->columnType);
  1148. }
  1149. void getFieldNames(const CassandraXmlMapping *mappings, StringBuffer &names, StringBuffer &tableName)
  1150. {
  1151. while (mappings->columnName)
  1152. {
  1153. names.appendf(",%s", mappings->columnName);
  1154. mappings++;
  1155. }
  1156. tableName.append(mappings->columnType);
  1157. }
  1158. const char *queryTableName(const CassandraXmlMapping *mappings)
  1159. {
  1160. while (mappings->columnName)
  1161. mappings++;
  1162. return mappings->columnType;
  1163. }
  1164. StringBuffer & describeTable(const CassandraXmlMapping *mappings, StringBuffer &out)
  1165. {
  1166. StringBuffer fields;
  1167. while (mappings->columnName)
  1168. {
  1169. fields.appendf("%s %s,", mappings->columnName, mappings->columnType);
  1170. mappings++;
  1171. }
  1172. StringArray options;
  1173. options.appendList(mappings->xpath, "|");
  1174. assertex(options.length()); // Primary key at least should be present!
  1175. out.appendf("CREATE TABLE IF NOT EXISTS %s (%s PRIMARY KEY %s)", mappings->columnType, fields.str(), options.item(0));
  1176. unsigned idx = 1;
  1177. while (options.isItem(idx))
  1178. {
  1179. if (idx==1)
  1180. out.append(" WITH ");
  1181. else
  1182. out.append(", ");
  1183. out.append(options.item(idx));
  1184. idx++;
  1185. }
  1186. out.append(';');
  1187. return out;
  1188. }
  1189. const CassResult *executeQuery(CassSession *session, CassStatement *statement)
  1190. {
  1191. CassandraFuture future(cass_session_execute(session, statement));
  1192. future.wait("executeQuery");
  1193. return cass_future_get_result(future);
  1194. }
  1195. void deleteSecondaryByKey(const char * xpath, const char *key, const char *wuid, const ICassandraSession *sessionCache, CIArrayOf<CassandraStatement> &batch)
  1196. {
  1197. if (key)
  1198. {
  1199. StringBuffer ucKey(key);
  1200. ucKey.toUpperCase();
  1201. StringBuffer names;
  1202. StringBuffer tableName;
  1203. getFieldNames(searchMappings, names, tableName);
  1204. VStringBuffer deleteQuery("DELETE from %s where xpath=? and fieldPrefix=? and fieldValue=? and wuid=?;", tableName.str());
  1205. CassandraStatement &update = *new CassandraStatement(sessionCache->prepareStatement(deleteQuery));
  1206. update.bindString(0, xpath);
  1207. update.bindString_n(1, ucKey, sessionCache->queryPrefixSize());
  1208. update.bindString(2, ucKey);
  1209. update.bindString(3, wuid);
  1210. batch.append(update);
  1211. }
  1212. }
  1213. void executeSimpleCommand(CassSession *session, const char *command)
  1214. {
  1215. CassandraStatement statement(cass_statement_new(command, 0));
  1216. CassandraFuture future(cass_session_execute(session, statement));
  1217. future.wait("execute");
  1218. }
  1219. void ensureTable(CassSession *session, const CassandraXmlMapping *mappings)
  1220. {
  1221. StringBuffer schema;
  1222. executeSimpleCommand(session, describeTable(mappings, schema));
  1223. }
  1224. extern void simpleXMLtoCassandra(const ICassandraSession *session, CassBatch *batch, const CassandraXmlMapping *mappings, IPTree *inXML, const char *userVal = NULL)
  1225. {
  1226. StringBuffer names;
  1227. StringBuffer bindings;
  1228. StringBuffer tableName;
  1229. getBoundFieldNames(session, mappings, names, bindings, inXML, userVal, tableName);
  1230. VStringBuffer insertQuery("INSERT into %s (%s) values (%s);", tableName.str(), names.str()+1, bindings.str()+1);
  1231. CassandraStatement update(session->prepareStatement(insertQuery));
  1232. unsigned bindidx = 0;
  1233. while (mappings->columnName)
  1234. {
  1235. if (mappings->mapper.fromXML(session, &update, bindidx, inXML, mappings->xpath, userVal))
  1236. bindidx++;
  1237. mappings++;
  1238. }
  1239. check(cass_batch_add_statement(batch, update));
  1240. }
  1241. extern void simpleXMLtoCassandra(const ICassandraSession *session, CIArrayOf<CassandraStatement> &batch, const CassandraXmlMapping *mappings, IPTree *inXML, const char *userVal = NULL)
  1242. {
  1243. StringBuffer names;
  1244. StringBuffer bindings;
  1245. StringBuffer tableName;
  1246. getBoundFieldNames(session, mappings, names, bindings, inXML, userVal, tableName);
  1247. VStringBuffer insertQuery("INSERT into %s (%s) values (%s);", tableName.str(), names.str()+1, bindings.str()+1);
  1248. CassandraStatement &update = *new CassandraStatement(session->prepareStatement(insertQuery));
  1249. unsigned bindidx = 0;
  1250. while (mappings->columnName)
  1251. {
  1252. if (mappings->mapper.fromXML(session, &update, bindidx, inXML, mappings->xpath, userVal))
  1253. bindidx++;
  1254. mappings++;
  1255. }
  1256. batch.append(update);
  1257. }
  1258. extern void deleteFileSearch(const ICassandraSession *session, CIArrayOf<CassandraStatement> &batch, const char *name, bool read, const char *wuid)
  1259. {
  1260. StringBuffer names;
  1261. StringBuffer tableName;
  1262. getFieldNames(filesSearchMappings, names, tableName);
  1263. VStringBuffer deleteQuery("DELETE from %s where name=? and read=? and wuid=?", tableName.str());
  1264. CassandraStatement &update = *new CassandraStatement(session->prepareStatement(deleteQuery));
  1265. update.bindString(0, name);
  1266. update.bindBool(1, read ? cass_true : cass_false);
  1267. update.bindString(2, wuid);
  1268. batch.append(update);
  1269. }
  1270. extern void addFileSearch(const ICassandraSession *session, CIArrayOf<CassandraStatement> &batch, const char *name, bool read, const char *wuid)
  1271. {
  1272. StringBuffer bindings;
  1273. StringBuffer names;
  1274. StringBuffer tableName;
  1275. getBoundFieldNames(session, filesSearchMappings, names, bindings, NULL, NULL, tableName);
  1276. VStringBuffer insertQuery("INSERT INTO %s (%s) values (%s)", tableName.str(), names.str()+1, bindings.str()+1);
  1277. CassandraStatement &update = *new CassandraStatement(session->prepareStatement(insertQuery));
  1278. update.bindString(0, name);
  1279. update.bindBool(1, read ? cass_true : cass_false);
  1280. update.bindString(2, wuid);
  1281. batch.append(update);
  1282. }
  1283. extern void addUniqueValue(const ICassandraSession *session, CIArrayOf<CassandraStatement> &batch, const char *xpath, const char *value)
  1284. {
  1285. StringBuffer bindings;
  1286. StringBuffer names;
  1287. StringBuffer tableName;
  1288. getBoundFieldNames(session, uniqueSearchMappings, names, bindings, NULL, NULL, tableName);
  1289. VStringBuffer insertQuery("INSERT into %s (%s) values (%s);", tableName.str(), names.str()+1, bindings.str()+1);
  1290. CassandraStatement &update = *new CassandraStatement(session->prepareStatement(insertQuery));
  1291. update.bindString(0, xpath);
  1292. StringBuffer ucValue(value);
  1293. ucValue.toUpperCase();
  1294. update.bindString_n(1, ucValue, session->queryPrefixSize());
  1295. update.bindString(2, ucValue);
  1296. update.bindString(3, value);
  1297. batch.append(update);
  1298. }
  1299. extern void childXMLRowtoCassandra(const ICassandraSession *session, CassBatch *batch, const CassandraXmlMapping *mappings, const char *wuid, IPTree &row, const char *userVal)
  1300. {
  1301. StringBuffer bindings;
  1302. StringBuffer names;
  1303. StringBuffer tableName;
  1304. // Note that we bind all fields, even where there is no value in the XML
  1305. // This ensures that values are correctly deleted where necessary - it also has
  1306. // the fortuitous benefit of reducing the number of variants of the query that we need to prepare and cache.
  1307. getBoundFieldNames(session, mappings, names, bindings, NULL, userVal, tableName);
  1308. VStringBuffer insertQuery("INSERT into %s (%s) values (%s);", tableName.str(), names.str()+1, bindings.str()+1);
  1309. CassandraStatement update(session->prepareStatement(insertQuery));
  1310. update.bindInt32(0, rtlHash32VStr(wuid, 0) % session->queryPartitions());
  1311. update.bindString(1, wuid);
  1312. unsigned colidx = 2; // We already bound wuid and partition
  1313. while (mappings[colidx].columnName)
  1314. {
  1315. if (!mappings[colidx].mapper.fromXML(session, &update, colidx, &row, mappings[colidx].xpath, userVal))
  1316. update.bindNull(colidx);
  1317. colidx++;
  1318. }
  1319. check(cass_batch_add_statement(batch, update));
  1320. }
  1321. extern unsigned childCount(const ICassandraSession *session, const CassandraXmlMapping *mappings, const char *wuid)
  1322. {
  1323. VStringBuffer countQuery("SELECT count(*) FROM %s WHERE partition=? AND wuid=?;", queryTableName(mappings));
  1324. CassandraStatement count(session->prepareStatement(countQuery));
  1325. count.bindInt32(0, rtlHash32VStr(wuid, 0) % session->queryPartitions());
  1326. count.bindString(1, wuid);
  1327. CassandraFuture future(cass_session_execute(session->querySession(), count));
  1328. future.wait("select count(*)");
  1329. CassandraResult result(cass_future_get_result(future));
  1330. return getUnsignedResult(NULL, getSingleResult(result));
  1331. }
  1332. extern void childXMLtoCassandra(const ICassandraSession *session, CassBatch *batch, const CassandraXmlMapping *mappings, const char *wuid, IPTreeIterator *elements, const char *userVal)
  1333. {
  1334. if (elements->first())
  1335. {
  1336. do
  1337. {
  1338. childXMLRowtoCassandra(session, batch, mappings, wuid, elements->query(), userVal);
  1339. }
  1340. while (elements->next());
  1341. }
  1342. }
  1343. extern void childXMLtoCassandra(const ICassandraSession *session, CassBatch *batch, const CassandraXmlMapping *mappings, IPTree *inXML, const char *xpath, const char *defaultValue)
  1344. {
  1345. Owned<IPTreeIterator> elements = inXML->getElements(xpath);
  1346. childXMLtoCassandra(session, batch, mappings, inXML->queryName(), elements, defaultValue);
  1347. }
  1348. static IPTree *rowToPTree(const char *xpath, const char *key, const CassandraXmlMapping *mappings, const CassRow *row)
  1349. {
  1350. CassandraIterator cols(cass_iterator_from_row(row));
  1351. Owned<IPTree> xml = createPTree("row"); // May be overwritten below if wuid field is processed
  1352. if (xpath && *xpath && key && *key)
  1353. xml->setProp(xpath, key);
  1354. while (cass_iterator_next(cols))
  1355. {
  1356. assertex(mappings->columnName);
  1357. const CassValue *value = cass_iterator_get_column(cols);
  1358. if (value && !cass_value_is_null(value))
  1359. mappings->mapper.toXML(xml, mappings->xpath, value);
  1360. mappings++;
  1361. }
  1362. return xml.getClear();
  1363. }
  1364. /*
  1365. * PostFilter represents a filter to be applied to a ConstWorkUnitInfo tree representation prior to returning it from an iterator
  1366. */
  1367. interface IPostFilter : public IInterface
  1368. {
  1369. virtual bool matches(IPTree &p) const = 0;
  1370. virtual const char *queryValue() const = 0;
  1371. virtual const char *queryXPath() const = 0;
  1372. virtual WUSortField queryField() const = 0;
  1373. };
  1374. class PostFilter : public CInterfaceOf<IPostFilter>
  1375. {
  1376. public:
  1377. PostFilter(WUSortField _field, const char *_value, bool _wild)
  1378. : field(_field), xpath(queryFilterXPath(_field)), wild(_wild)
  1379. {
  1380. setValue(_value);
  1381. }
  1382. virtual bool matches(IPTree &p) const
  1383. {
  1384. const char *val = p.queryProp(xpath);
  1385. if (val)
  1386. return wild ? WildMatch(val, pattern) : strieq(val, pattern);
  1387. else
  1388. return false;
  1389. }
  1390. virtual const char *queryValue() const
  1391. {
  1392. return value.str();
  1393. }
  1394. void setValue(const char *_value)
  1395. {
  1396. if (wild)
  1397. {
  1398. VStringBuffer filter("*%s*", _value);
  1399. pattern.set(filter);
  1400. }
  1401. else
  1402. pattern.set(_value);
  1403. value.set(_value);
  1404. }
  1405. virtual const char *queryXPath() const
  1406. {
  1407. return xpath;
  1408. }
  1409. virtual WUSortField queryField() const
  1410. {
  1411. return field;
  1412. }
  1413. protected:
  1414. const char *xpath;
  1415. StringAttr pattern;
  1416. StringAttr value;
  1417. WUSortField field;
  1418. bool wild;
  1419. };
  1420. class MultiValuePostFilter : public PostFilter
  1421. {
  1422. public:
  1423. MultiValuePostFilter(WUSortField _field, const char *_value)
  1424. : PostFilter(_field, _value, false)
  1425. {
  1426. setValue(_value);
  1427. }
  1428. virtual bool matches(IPTree &p) const
  1429. {
  1430. const char *val = p.queryProp(xpath);
  1431. if (val)
  1432. {
  1433. ForEachItemIn(idx, values)
  1434. {
  1435. if (strieq(val, values.item(idx)))
  1436. return true;
  1437. }
  1438. }
  1439. return false;
  1440. }
  1441. void setValue(const char *_value)
  1442. {
  1443. values.appendList(_value, "|");
  1444. }
  1445. private:
  1446. StringArray values;
  1447. };
  1448. class AppValuePostFilter : public CInterfaceOf<IPostFilter>
  1449. {
  1450. public:
  1451. AppValuePostFilter(const char *_name, const char *_value, bool _wild) : wild(_wild)
  1452. {
  1453. xpath.appendf("Application/%s", _name);
  1454. setValue(_value);
  1455. }
  1456. virtual bool matches(IPTree &p) const
  1457. {
  1458. const char *val = p.queryProp(xpath);
  1459. if (val)
  1460. return wild ? WildMatch(val, pattern) : strieq(val, pattern);
  1461. else
  1462. return false;
  1463. }
  1464. virtual const char *queryValue() const
  1465. {
  1466. return value.str();
  1467. }
  1468. void setValue(const char *_value)
  1469. {
  1470. if (wild)
  1471. {
  1472. VStringBuffer filter("*%s*", _value);
  1473. pattern.set(filter);
  1474. }
  1475. else
  1476. pattern.set(_value);
  1477. value.set(_value);
  1478. }
  1479. virtual const char *queryXPath() const
  1480. {
  1481. return xpath;
  1482. }
  1483. virtual WUSortField queryField() const
  1484. {
  1485. return WUSFappvalue;
  1486. }
  1487. private:
  1488. StringBuffer xpath;
  1489. StringAttr pattern;
  1490. StringAttr value;
  1491. bool wild;
  1492. };
  1493. class CassSortableIterator : public CassandraIterator
  1494. {
  1495. public:
  1496. CassSortableIterator(CassIterator *_iterator, unsigned _idx, int _compareColumn, bool _descending)
  1497. : CassandraIterator(_iterator), idx(_idx), compareColumn(_compareColumn), descending(_descending)
  1498. {
  1499. }
  1500. const CassSortableIterator *nextRow()
  1501. {
  1502. if (iterator && cass_iterator_next(iterator))
  1503. {
  1504. if (compareColumn != -1)
  1505. {
  1506. const CassRow *row = cass_iterator_get_row(iterator);
  1507. getCassString(value.clear(), cass_row_get_column(row, compareColumn));
  1508. }
  1509. return this;
  1510. }
  1511. else
  1512. return NULL;
  1513. }
  1514. void stop()
  1515. {
  1516. value.clear();
  1517. set(NULL);
  1518. }
  1519. int compare(const CassSortableIterator *to) const
  1520. {
  1521. if (compareColumn==-1)
  1522. return idx - to->idx; // concat mode
  1523. int ret = strcmp(value, to->value); // Note - empty StringBuffer always returns ""
  1524. return descending ? -ret : ret;
  1525. }
  1526. private:
  1527. StringBuffer value;
  1528. unsigned idx;
  1529. int compareColumn;
  1530. bool descending;
  1531. };
  1532. interface IConstWorkUnitIteratorEx : public IConstWorkUnitIterator
  1533. {
  1534. virtual bool hasPostFilters() const = 0;
  1535. virtual bool isMerging() const = 0;
  1536. virtual void notePosition() const = 0;
  1537. };
  1538. /*
  1539. *
  1540. * The cache entries serve two purposes:
  1541. *
  1542. * 1. They allow us to map row numbers to values for the end of each page returned, which can make forward paging efficient when not post-sorting
  1543. * 2. They allow us to preserve post-sort results in order to avoid having to re-retrieve them.
  1544. */
  1545. class CCassandraWuUQueryCacheEntry : public CInterfaceOf<IInterface>
  1546. {
  1547. public:
  1548. CCassandraWuUQueryCacheEntry()
  1549. {
  1550. hint = get_cycles_now(); // MORE - should do better perhaps?
  1551. lastAccess = msTick();
  1552. }
  1553. __int64 queryHint() const
  1554. {
  1555. return hint;
  1556. }
  1557. void noteWuid(const char *wuid, const char *fieldValue, unsigned row)
  1558. {
  1559. CriticalBlock b(crit);
  1560. // NOTE - we store one set of row information per page retrieved - and we normally traverse the pages
  1561. // in order so appending to the end is better than (for example) binchopping
  1562. ForEachItemInRev(idx, rows)
  1563. {
  1564. unsigned foundRow = rows.item(idx);
  1565. if (foundRow==row)
  1566. {
  1567. assert(streq(wuids.item(idx), wuid));
  1568. assert(streq(fieldValues.item(idx), fieldValue));
  1569. return;
  1570. }
  1571. if (foundRow < row)
  1572. break;
  1573. }
  1574. rows.add(row, idx+1);
  1575. wuids.add(wuid, idx+1);
  1576. fieldValues.add(fieldValue ? fieldValue : "", idx+1);
  1577. }
  1578. IConstWorkUnitIteratorEx *getResult() const
  1579. {
  1580. CriticalBlock b(crit);
  1581. return result.getLink();
  1582. }
  1583. void setResult(IConstWorkUnitIteratorEx *_result)
  1584. {
  1585. CriticalBlock b(crit);
  1586. result.set(_result);
  1587. }
  1588. unsigned lookupStartRow(StringBuffer &wuid, StringBuffer &fieldValue, unsigned startOffset) const
  1589. {
  1590. // See if we can provide a base wuid to search above/below
  1591. CriticalBlock b(crit);
  1592. ForEachItemInRev(idx, rows)
  1593. {
  1594. unsigned foundRow = rows.item(idx);
  1595. if (foundRow <= startOffset)
  1596. {
  1597. wuid.set(wuids.item(idx));
  1598. fieldValue.set(fieldValues.item(idx));
  1599. return foundRow;
  1600. }
  1601. }
  1602. return 0;
  1603. }
  1604. void touch()
  1605. {
  1606. lastAccess = msTick();
  1607. }
  1608. inline unsigned queryLastAccess() const
  1609. {
  1610. return lastAccess;
  1611. }
  1612. private:
  1613. mutable CriticalSection crit; // It's POSSIBLE that we could get two queries in hitting the cache at the same time, I think...
  1614. UnsignedArray rows;
  1615. StringArray wuids;
  1616. StringArray fieldValues;
  1617. Owned<IConstWorkUnitIteratorEx> result;
  1618. __uint64 hint;
  1619. unsigned lastAccess;
  1620. };
  1621. class CassMultiIterator : public CInterface, implements IRowProvider, implements ICompare, implements IConstWorkUnitIteratorEx
  1622. {
  1623. public:
  1624. IMPLEMENT_IINTERFACE;
  1625. CassMultiIterator(CCassandraWuUQueryCacheEntry *_cache, unsigned _startRowNum, int _compareColumn, bool _descending)
  1626. : cache(_cache)
  1627. {
  1628. compareColumn = _compareColumn;
  1629. descending = _descending;
  1630. startRowNum = _startRowNum;
  1631. rowNum = 0;
  1632. }
  1633. void setStartOffset(unsigned start)
  1634. {
  1635. startRowNum = start; // we managed to do a seek forward via a filter
  1636. }
  1637. void setCompareColumn(int _compareColumn)
  1638. {
  1639. assert(!inputs.length());
  1640. compareColumn = _compareColumn;
  1641. }
  1642. void addResult(CassandraResult &result)
  1643. {
  1644. results.append(result);
  1645. }
  1646. void addPostFilters(IArrayOf<IPostFilter> &filters, unsigned start)
  1647. {
  1648. unsigned len = filters.length();
  1649. while (start<len)
  1650. postFilters.append(OLINK(filters.item(start++)));
  1651. }
  1652. void addPostFilter(PostFilter &filter)
  1653. {
  1654. postFilters.append(filter);
  1655. }
  1656. virtual bool hasPostFilters() const
  1657. {
  1658. return postFilters.length() != 0;
  1659. }
  1660. virtual bool isMerging() const
  1661. {
  1662. return results.length() > 1;
  1663. }
  1664. virtual bool first()
  1665. {
  1666. inputs.kill();
  1667. ForEachItemIn(idx, results)
  1668. {
  1669. inputs.append(*new CassSortableIterator(cass_iterator_from_result(results.item(idx)), idx, compareColumn, descending));
  1670. }
  1671. merger.setown(createRowStreamMerger(inputs.length(), *this, this, false));
  1672. rowNum = startRowNum;
  1673. return next();
  1674. }
  1675. virtual void notePosition() const
  1676. {
  1677. if (cache && current)
  1678. {
  1679. cache->noteWuid(current->queryWuid(), lastThorTime, rowNum);
  1680. }
  1681. }
  1682. virtual bool next()
  1683. {
  1684. Owned<IConstWorkUnitInfo> last = current.getClear();
  1685. for (;;)
  1686. {
  1687. const CassandraIterator *nextSource = nextMergedSource();
  1688. if (!nextSource)
  1689. {
  1690. if (cache && last)
  1691. {
  1692. cache->noteWuid(last->queryWuid(), lastThorTime, rowNum);
  1693. }
  1694. return false;
  1695. }
  1696. Owned<IPTree> wuXML = rowToPTree(NULL, NULL, workunitInfoMappings+1, cass_iterator_get_row(*nextSource)); // NOTE - this is relying on search mappings and wuInfoMappings being the same
  1697. bool postFiltered = false;
  1698. ForEachItemIn(pfIdx, postFilters)
  1699. {
  1700. if (!postFilters.item(pfIdx).matches(*wuXML))
  1701. {
  1702. postFiltered = true;
  1703. break;
  1704. }
  1705. }
  1706. if (!postFiltered)
  1707. {
  1708. current.setown(createConstWorkUnitInfo(*wuXML));
  1709. lastThorTime.set(wuXML->queryProp("@totalThorTime"));
  1710. rowNum++;
  1711. return true;
  1712. }
  1713. }
  1714. }
  1715. virtual bool isValid()
  1716. {
  1717. return current != NULL;
  1718. }
  1719. virtual IConstWorkUnitInfo & query()
  1720. {
  1721. assertex(current);
  1722. return *current.get();
  1723. }
  1724. const CassandraIterator *nextMergedSource()
  1725. {
  1726. return (const CassSortableIterator *) merger->nextRow();
  1727. }
  1728. protected:
  1729. virtual void linkRow(const void *row) { }
  1730. virtual void releaseRow(const void *row) { }
  1731. virtual const void *nextRow(unsigned idx)
  1732. {
  1733. CassSortableIterator &it = inputs.item(idx);
  1734. return it.nextRow(); // returns either a pointer to the iterator, or NULL
  1735. }
  1736. virtual void stop(unsigned idx)
  1737. {
  1738. inputs.item(idx).stop();
  1739. }
  1740. virtual int docompare(const void *a, const void *b) const
  1741. {
  1742. // a and b point to to CassSortableIterator objects
  1743. const CassSortableIterator *aa = (const CassSortableIterator *) a;
  1744. const CassSortableIterator *bb = (const CassSortableIterator *) b;
  1745. return aa->compare(bb);
  1746. }
  1747. private:
  1748. IArrayOf<CassandraResult> results;
  1749. IArrayOf<CassSortableIterator> inputs;
  1750. Owned<IRowStream> merger; // NOTE - must be destroyed before inputs is destroyed
  1751. IArrayOf<IPostFilter> postFilters;
  1752. Owned<IConstWorkUnitInfo> current;
  1753. Linked<CCassandraWuUQueryCacheEntry> cache;
  1754. StringAttr lastThorTime;
  1755. int compareColumn;
  1756. unsigned startRowNum;
  1757. unsigned rowNum;
  1758. bool descending;
  1759. };
  1760. class CassPostSortIterator : public CInterfaceOf<IConstWorkUnitIteratorEx>, implements ICompare
  1761. {
  1762. public:
  1763. CassPostSortIterator(IConstWorkUnitIterator * _input, unsigned _sortorder, unsigned _limit)
  1764. : input(_input), sortorder(_sortorder), limit(_limit)
  1765. {
  1766. idx = 0;
  1767. }
  1768. virtual bool first()
  1769. {
  1770. if (input)
  1771. {
  1772. readFirst();
  1773. input.clear();
  1774. }
  1775. idx = 0;
  1776. return sorted.isItem(idx);
  1777. }
  1778. virtual bool next()
  1779. {
  1780. idx++;
  1781. if (sorted.isItem(idx))
  1782. return true;
  1783. return false;
  1784. }
  1785. virtual void notePosition() const
  1786. {
  1787. }
  1788. virtual bool isValid()
  1789. {
  1790. return sorted.isItem(idx);
  1791. }
  1792. virtual IConstWorkUnitInfo & query()
  1793. {
  1794. return sorted.item(idx);
  1795. }
  1796. virtual bool hasPostFilters() const
  1797. {
  1798. return false; // they are done by my input. But we may want to rename this function to indicate "may return more than asked" in which case would be true
  1799. }
  1800. virtual bool isMerging() const
  1801. {
  1802. return false;
  1803. }
  1804. private:
  1805. void readFirst()
  1806. {
  1807. ForEach(*input)
  1808. {
  1809. sorted.append(OLINK(input->query()));
  1810. if (sorted.length()>=limit)
  1811. break;
  1812. }
  1813. qsortvec((void **)sorted.getArray(0), sorted.length(), *this);
  1814. }
  1815. virtual int docompare(const void *a, const void *b) const
  1816. {
  1817. // a and b point to to IConstWorkUnitInfo objects
  1818. const IConstWorkUnitInfo *aa = (const IConstWorkUnitInfo *) a;
  1819. const IConstWorkUnitInfo *bb = (const IConstWorkUnitInfo *) b;
  1820. int diff;
  1821. switch (sortorder & 0xff)
  1822. {
  1823. case WUSFuser:
  1824. diff = stricmp(aa->queryUser(), bb->queryUser());
  1825. break;
  1826. case WUSFcluster:
  1827. diff = stricmp(aa->queryClusterName(), bb->queryClusterName());
  1828. break;
  1829. case WUSFjob:
  1830. diff = stricmp(aa->queryJobName(), bb->queryJobName());
  1831. break;
  1832. case WUSFstate:
  1833. diff = stricmp(aa->queryStateDesc(), bb->queryStateDesc());
  1834. break;
  1835. case WUSFprotected:
  1836. diff = (int) bb->isProtected() - (int) aa->isProtected();
  1837. break;
  1838. case WUSFtotalthortime:
  1839. diff = (int) (bb->getTotalThorTime() - bb->getTotalThorTime());
  1840. break;
  1841. case WUSFwuid:
  1842. diff = stricmp(aa->queryWuid(), bb->queryWuid()); // Should never happen, since we always fetch with a wuid sort
  1843. break;
  1844. default:
  1845. throwUnexpected();
  1846. }
  1847. if (sortorder & WUSFreverse)
  1848. return -diff;
  1849. else
  1850. return diff;
  1851. }
  1852. Owned<IConstWorkUnitIterator> input;
  1853. IArrayOf<IConstWorkUnitInfo> sorted;
  1854. unsigned sortorder;
  1855. unsigned idx;
  1856. unsigned limit;
  1857. };
  1858. class SubPageIterator : public CInterfaceOf<IConstWorkUnitIteratorEx>
  1859. {
  1860. public:
  1861. SubPageIterator(IConstWorkUnitIteratorEx *_input, unsigned _startOffset, unsigned _pageSize)
  1862. : input(_input), startOffset(_startOffset), pageSize(_pageSize), idx(0)
  1863. {
  1864. }
  1865. virtual bool first()
  1866. {
  1867. idx = 0;
  1868. // MORE - put a seek into the Ex interface
  1869. if (input->first())
  1870. {
  1871. for (int i = 0; i < startOffset;i++)
  1872. {
  1873. if (!input->next())
  1874. return false;
  1875. }
  1876. return true;
  1877. }
  1878. else
  1879. return false;
  1880. }
  1881. virtual bool next()
  1882. {
  1883. idx++;
  1884. if (idx >= pageSize)
  1885. {
  1886. input->notePosition();
  1887. return false;
  1888. }
  1889. return input->next();
  1890. }
  1891. virtual void notePosition() const
  1892. {
  1893. input->notePosition();
  1894. }
  1895. virtual bool isValid()
  1896. {
  1897. return idx < pageSize && input->isValid();
  1898. }
  1899. virtual IConstWorkUnitInfo & query()
  1900. {
  1901. return input->query();
  1902. }
  1903. virtual bool hasPostFilters() const
  1904. {
  1905. return false;
  1906. }
  1907. virtual bool isMerging() const
  1908. {
  1909. return false;
  1910. }
  1911. private:
  1912. Owned<IConstWorkUnitIteratorEx> input;
  1913. unsigned startOffset;
  1914. unsigned pageSize;
  1915. unsigned idx;
  1916. };
  1917. class CassJoinIterator : implements IConstWorkUnitIteratorEx, public CInterface
  1918. {
  1919. public:
  1920. IMPLEMENT_IINTERFACE;
  1921. CassJoinIterator(unsigned _compareColumn, bool _descending)
  1922. {
  1923. compareColumn = _compareColumn;
  1924. descending = _descending;
  1925. }
  1926. void addResult(CassandraResult &result)
  1927. {
  1928. results.append(result);
  1929. }
  1930. void addPostFilter(IPostFilter &post)
  1931. {
  1932. postFilters.append(post);
  1933. }
  1934. virtual bool first()
  1935. {
  1936. if (!results.length())
  1937. return false;
  1938. inputs.kill();
  1939. ForEachItemIn(idx, results)
  1940. {
  1941. Owned <CassSortableIterator> input = new CassSortableIterator(cass_iterator_from_result(results.item(idx)), idx, compareColumn, descending);
  1942. if (!input->nextRow())
  1943. return false;
  1944. inputs.append(*input.getClear());
  1945. }
  1946. return next();
  1947. }
  1948. virtual bool next()
  1949. {
  1950. current.clear();
  1951. for (;;)
  1952. {
  1953. unsigned idx = 0;
  1954. unsigned target = 0;
  1955. unsigned matches = 1; // I always match myself!
  1956. unsigned sources = inputs.length();
  1957. if (!sources)
  1958. return false;
  1959. while (matches < sources)
  1960. {
  1961. idx++;
  1962. if (idx==sources)
  1963. idx = 0;
  1964. int diff;
  1965. for (;;)
  1966. {
  1967. assert(idx != target);
  1968. diff = inputs.item(idx).compare(&inputs.item(target));
  1969. if (diff >= 0)
  1970. break;
  1971. if (!inputs.item(idx).nextRow())
  1972. {
  1973. inputs.kill(); // Once any reaches EOF, we are done
  1974. return false;
  1975. }
  1976. }
  1977. if (diff > 0)
  1978. {
  1979. target = idx;
  1980. matches = 1;
  1981. }
  1982. else
  1983. matches++;
  1984. }
  1985. Owned<IPTree> wuXML = rowToPTree(NULL, NULL, workunitInfoMappings+1, cass_iterator_get_row(inputs.item(0)));
  1986. bool postFiltered = false;
  1987. ForEachItemIn(pfIdx, postFilters)
  1988. {
  1989. if (!postFilters.item(pfIdx).matches(*wuXML))
  1990. {
  1991. postFiltered = true;
  1992. break;
  1993. }
  1994. }
  1995. if (!postFiltered)
  1996. {
  1997. current.setown(createConstWorkUnitInfo(*wuXML));
  1998. ForEachItemIn(idx2, inputs)
  1999. {
  2000. if (!inputs.item(idx2).nextRow())
  2001. {
  2002. inputs.clear(); // Make sure next() fails next time it is called
  2003. break;
  2004. }
  2005. }
  2006. return true;
  2007. }
  2008. }
  2009. }
  2010. virtual bool isValid()
  2011. {
  2012. return current != NULL;
  2013. }
  2014. virtual IConstWorkUnitInfo & query()
  2015. {
  2016. assertex(current);
  2017. return *current.get();
  2018. }
  2019. private:
  2020. IArrayOf<CassandraResult> results;
  2021. IArrayOf<CassSortableIterator> inputs;
  2022. IArrayOf<IPostFilter> postFilters;
  2023. Owned<IConstWorkUnitInfo> current;
  2024. unsigned compareColumn;
  2025. bool descending;
  2026. };
  2027. static void lockWuid(Owned<IRemoteConnection> &connection, const char *wuid)
  2028. {
  2029. VStringBuffer wuRoot("/WorkUnitLocks/%s", wuid);
  2030. if (connection)
  2031. connection->changeMode(RTM_LOCK_WRITE, SDS_LOCK_TIMEOUT); // Would it ever be anything else?
  2032. else
  2033. connection.setown(querySDS().connect(wuRoot.str(), myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT));
  2034. if (!connection)
  2035. throw makeStringExceptionV(WUERR_LockFailed, "Failed to get connection for xpath %s", wuRoot.str());
  2036. }
  2037. class CCassandraWorkUnit : public CPersistedWorkUnit
  2038. {
  2039. public:
  2040. CCassandraWorkUnit(ICassandraSession *_sessionCache, IPTree *wuXML, ISecManager *secmgr, ISecUser *secuser, IRemoteConnection *_daliLock, bool _allDirty)
  2041. : sessionCache(_sessionCache), CPersistedWorkUnit(secmgr, secuser), daliLock(_daliLock), allDirty(_allDirty)
  2042. {
  2043. CPersistedWorkUnit::loadPTree(wuXML);
  2044. memset(childLoaded, 0, sizeof(childLoaded));
  2045. actionChanged = false;
  2046. stateChanged = false;
  2047. abortDirty = false;
  2048. }
  2049. ~CCassandraWorkUnit()
  2050. {
  2051. }
  2052. virtual void forceReload()
  2053. {
  2054. synchronized sync(locked); // protect locked workunits (uncommitted writes) from reload
  2055. loadPTree(sessionCache->cassandraToWorkunitXML(queryWuid()));
  2056. memset(childLoaded, 0, sizeof(childLoaded));
  2057. allDirty = false;
  2058. actionChanged = false;
  2059. stateChanged = false;
  2060. abortDirty = true;
  2061. }
  2062. void executeBatch(CassandraBatch &batch, const char * what) const
  2063. {
  2064. if (sessionCache->queryTraceLevel() > 1)
  2065. DBGLOG("Executing batch %s", what);
  2066. batch.execute(sessionCache->querySession(), what);
  2067. }
  2068. void executeAsync(CIArrayOf<CassandraStatement> &batch, const char * what) const
  2069. {
  2070. if (sessionCache->queryTraceLevel() > 1)
  2071. DBGLOG("Executing async batch %s (%d elements)", what, batch.length());
  2072. sessionCache->executeAsync(batch, what);
  2073. }
  2074. virtual void cleanupAndDelete(bool deldll, bool deleteOwned, const StringArray *deleteExclusions)
  2075. {
  2076. const char *wuid = queryWuid();
  2077. CPersistedWorkUnit::cleanupAndDelete(deldll, deleteOwned, deleteExclusions);
  2078. // Note we need to gather the information about what secondaries to delete before we delete the parent/children,
  2079. // but we actually do the deletion afterwards
  2080. CIArrayOf<CassandraStatement> deleteSearches;
  2081. deleteSecondaries(wuid, deleteSearches);
  2082. CassandraBatch mainBatch(CASS_BATCH_TYPE_UNLOGGED);
  2083. deleteChildren(wuid, mainBatch);
  2084. sessionCache->deleteChildByWuid(wuGraphProgressMappings, wuid, mainBatch);
  2085. sessionCache->deleteChildByWuid(wuGraphStateMappings, wuid, mainBatch);
  2086. sessionCache->deleteChildByWuid(wuGraphRunningMappings, wuid, mainBatch);
  2087. // If the partitioning of the main workunits table does not match the partitioning of the other tables, then would be better to
  2088. // execute the deletes of the child tables and the main record as two separate batches.
  2089. CassandraStatement update(sessionCache->prepareStatement("DELETE from workunits where partition=? and wuid=?;"));
  2090. update.bindInt32(0, rtlHash32VStr(wuid, 0) % sessionCache->queryPartitions());
  2091. update.bindString(1, wuid);
  2092. check(cass_batch_add_statement(mainBatch, update));
  2093. executeBatch(mainBatch, "delete wu");
  2094. executeAsync(deleteSearches, "delete wu");
  2095. }
  2096. virtual void commit()
  2097. {
  2098. CPersistedWorkUnit::commit();
  2099. if (sessionCache->queryTraceLevel() >= 8)
  2100. {
  2101. StringBuffer s; toXML(p, s); DBGLOG("CCassandraWorkUnit::commit\n%s", s.str());
  2102. }
  2103. CIArrayOf<CassandraStatement> secondaryBatch;
  2104. CassandraBatch batch(CASS_BATCH_TYPE_UNLOGGED);
  2105. Owned<CassandraBatch> deletesBatch;
  2106. const char *wuid = queryWuid();
  2107. bool isGlobal = streq(wuid, GLOBAL_WORKUNIT);
  2108. if (!isGlobal) // Global workunit only has child rows, no parent
  2109. {
  2110. if (prev) // Holds the values of the "basic" info at the last commit
  2111. updateSecondaries(wuid, secondaryBatch);
  2112. simpleXMLtoCassandra(sessionCache, batch, workunitsMappings, p); // This just does the parent row
  2113. }
  2114. if (allDirty && !isGlobal)
  2115. {
  2116. // MORE - this delete is technically correct, but if we assert that the only place that copyWorkUnit is used is to populate an
  2117. // empty newly-created WU, it is unnecessary.
  2118. // deleteChildren(wuid, deletesBatch);
  2119. // MORE can use the table?
  2120. childXMLtoCassandra(sessionCache, batch, wuGraphsMappings, p, "Graphs/Graph", 0);
  2121. childXMLtoCassandra(sessionCache, batch, wuResultsMappings, p, "Results/Result", "0");
  2122. childXMLtoCassandra(sessionCache, batch, wuVariablesMappings, p, "Variables/Variable", "-1"); // ResultSequenceStored
  2123. childXMLtoCassandra(sessionCache, batch, wuTemporariesMappings, p, "Temporaries/Variable", "-3"); // ResultSequenceInternal // NOTE - lookups may also request ResultSequenceOnce
  2124. childXMLtoCassandra(sessionCache, batch, wuExceptionsMappings, p, "Exceptions/Exception", 0);
  2125. childXMLtoCassandra(sessionCache, batch, wuStatisticsMappings, p, "Statistics/Statistic", 0);
  2126. childXMLtoCassandra(sessionCache, batch, wuFilesReadMappings, p, "FilesRead/File", 0);
  2127. childXMLtoCassandra(sessionCache, batch, wuFilesWrittenMappings, p, "Files/File", 0);
  2128. childXMLtoCassandra(sessionCache, batch, wuFieldUsageMappings, p, "usedsources/datasource", 0);
  2129. IPTree *query = p->queryPropTree("Query");
  2130. if (query)
  2131. childXMLRowtoCassandra(sessionCache, batch, wuQueryMappings, wuid, *query, 0);
  2132. }
  2133. else
  2134. {
  2135. HashIterator iter(dirtyPaths);
  2136. ForEach (iter)
  2137. {
  2138. const char *path = (const char *) iter.query().getKey();
  2139. const CassandraXmlMapping *table = *dirtyPaths.mapToValue(&iter.query());
  2140. if (sessionCache->queryTraceLevel()>2)
  2141. DBGLOG("Updating dirty path %s", path);
  2142. if (*path == '*')
  2143. {
  2144. if (!deletesBatch)
  2145. deletesBatch.setown(new CassandraBatch(CASS_BATCH_TYPE_UNLOGGED));
  2146. sessionCache->deleteChildByWuid(table, wuid, *deletesBatch);
  2147. childXMLtoCassandra(sessionCache, batch, table, p, path+1, 0);
  2148. }
  2149. else
  2150. {
  2151. IPTree *dirty = p->queryPropTree(path);
  2152. if (dirty)
  2153. childXMLRowtoCassandra(sessionCache, batch, table, wuid, *dirty, 0);
  2154. else if (sessionCache->queryTraceLevel())
  2155. {
  2156. StringBuffer xml;
  2157. toXML(p, xml);
  2158. DBGLOG("Missing dirty element %s in %s", path, xml.str());
  2159. }
  2160. }
  2161. }
  2162. ForEachItemIn(d, dirtyResults)
  2163. {
  2164. IWUResult &result = dirtyResults.item(d);
  2165. switch (result.getResultSequence())
  2166. {
  2167. case ResultSequenceStored:
  2168. childXMLRowtoCassandra(sessionCache, batch, wuVariablesMappings, wuid, *result.queryPTree(), "-1");
  2169. break;
  2170. case ResultSequenceInternal:
  2171. case ResultSequenceOnce:
  2172. childXMLRowtoCassandra(sessionCache, batch, wuTemporariesMappings, wuid, *result.queryPTree(), "-3");
  2173. break;
  2174. default:
  2175. childXMLRowtoCassandra(sessionCache, batch, wuResultsMappings, wuid, *result.queryPTree(), "0");
  2176. break;
  2177. }
  2178. }
  2179. }
  2180. if (sessionCache->queryTraceLevel() > 1)
  2181. DBGLOG("Executing commit batches");
  2182. if (deletesBatch)
  2183. {
  2184. CassandraFuture futureBatch(cass_session_execute_batch(sessionCache->querySession(), *deletesBatch));
  2185. futureBatch.wait("commit deletes");
  2186. }
  2187. CassandraFuture futureBatch(cass_session_execute_batch(sessionCache->querySession(), batch));
  2188. futureBatch.wait("commit updates");
  2189. executeAsync(secondaryBatch, "commit");
  2190. if (stateChanged)
  2191. {
  2192. // Signal changes to state to anyone that might be watching via Dali
  2193. VStringBuffer xpath("/WorkUnitStates/%s/State", wuid);
  2194. Owned<IRemoteConnection> conn = querySDS().connect(xpath.str(), myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE, SDS_LOCK_TIMEOUT);
  2195. conn->queryRoot()->setProp(NULL, p->queryProp("@state"));
  2196. }
  2197. if (actionChanged)
  2198. {
  2199. // Signal changes to action to anyone that might be watching via Dali
  2200. VStringBuffer xpath("/WorkUnitStates/%s/Action", wuid);
  2201. Owned<IRemoteConnection> conn = querySDS().connect(xpath.str(), myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE, SDS_LOCK_TIMEOUT);
  2202. conn->queryRoot()->setProp(NULL, p->queryProp("Action"));
  2203. }
  2204. prev.clear();
  2205. allDirty = false;
  2206. stateChanged = false;
  2207. actionChanged = false;
  2208. dirtyPaths.kill();
  2209. dirtyResults.kill();
  2210. }
  2211. virtual IConstWUGraph *getGraph(const char *qname) const
  2212. {
  2213. // Just because we read one graph, does not mean we are likely to read more. So don't cache this result.
  2214. // Also note that graphs are generally read-only
  2215. CassandraResult result(sessionCache->fetchDataForWuidAndKey(wuGraphsMappings, queryWuid(), qname));
  2216. const CassRow *row = cass_result_first_row(result);
  2217. if (row)
  2218. {
  2219. Owned<IPTree> graph = createPTree("Graph");
  2220. unsigned colidx = 2; // We did not fetch wuid or partition
  2221. CassandraIterator cols(cass_iterator_from_row(row));
  2222. while (cass_iterator_next(cols))
  2223. {
  2224. assertex(wuGraphsMappings[colidx].columnName);
  2225. const CassValue *value = cass_iterator_get_column(cols);
  2226. if (value && !cass_value_is_null(value))
  2227. wuGraphsMappings[colidx].mapper.toXML(graph, wuGraphsMappings[colidx].xpath, value);
  2228. colidx++;
  2229. }
  2230. return new CLocalWUGraph(*this, graph.getClear());
  2231. }
  2232. else
  2233. return NULL;
  2234. }
  2235. virtual unsigned getResultCount() const
  2236. {
  2237. return childCount(sessionCache, wuResultsMappings, queryWuid());
  2238. }
  2239. virtual unsigned getGraphCount() const
  2240. {
  2241. return childCount(sessionCache, wuGraphsMappings, queryWuid());
  2242. }
  2243. virtual unsigned getSourceFileCount() const
  2244. {
  2245. return childCount(sessionCache, wuFilesReadMappings, queryWuid());
  2246. }
  2247. virtual unsigned getVariableCount() const
  2248. {
  2249. return childCount(sessionCache, wuVariablesMappings, queryWuid());
  2250. }
  2251. virtual void setUser(const char *user)
  2252. {
  2253. if (trackSecondaryChange(user, "@submitID"))
  2254. CPersistedWorkUnit::setUser(user);
  2255. }
  2256. virtual void setClusterName(const char *cluster)
  2257. {
  2258. if (trackSecondaryChange(cluster, "@clusterName"))
  2259. CPersistedWorkUnit::setClusterName(cluster);
  2260. }
  2261. virtual void setJobName(const char *jobname)
  2262. {
  2263. if (trackSecondaryChange(jobname, "@jobName"))
  2264. CPersistedWorkUnit::setJobName(jobname);
  2265. }
  2266. virtual void setState(WUState state)
  2267. {
  2268. if (trackSecondaryChange(getWorkunitStateStr(state), "@state"))
  2269. {
  2270. stateChanged = true;
  2271. CPersistedWorkUnit::setState(state);
  2272. }
  2273. }
  2274. virtual void setAction(WUAction action)
  2275. {
  2276. actionChanged = true;
  2277. CPersistedWorkUnit::setAction(action);
  2278. }
  2279. virtual void setApplicationValue(const char *app, const char *propname, const char *value, bool overwrite)
  2280. {
  2281. VStringBuffer xpath("Application/%s/%s", app, propname);
  2282. if (trackSecondaryChange(value, xpath))
  2283. CPersistedWorkUnit::setApplicationValue(app, propname, value, overwrite);
  2284. }
  2285. virtual void _lockRemote()
  2286. {
  2287. lockWuid(daliLock, queryWuid());
  2288. }
  2289. virtual void _unlockRemote()
  2290. {
  2291. commit();
  2292. if (daliLock)
  2293. {
  2294. daliLock->close(true);
  2295. daliLock.clear();
  2296. }
  2297. }
  2298. virtual void createGraph(const char * name, const char *label, WUGraphType type, IPropertyTree *xgmml, unsigned wfid)
  2299. {
  2300. CPersistedWorkUnit::createGraph(name, label, type, xgmml, wfid);
  2301. VStringBuffer xpath("Graphs/Graph[@name='%s']", name);
  2302. noteDirty(xpath, wuGraphsMappings);
  2303. }
  2304. virtual IWUResult * updateResultByName(const char * name)
  2305. {
  2306. return noteDirty(CPersistedWorkUnit::updateResultByName(name));
  2307. }
  2308. virtual IWUResult * updateResultBySequence(unsigned seq)
  2309. {
  2310. return noteDirty(CPersistedWorkUnit::updateResultBySequence(seq));
  2311. }
  2312. virtual IWUResult * updateTemporaryByName(const char * name)
  2313. {
  2314. return noteDirty(CPersistedWorkUnit::updateTemporaryByName(name));
  2315. }
  2316. virtual IWUResult * updateVariableByName(const char * name)
  2317. {
  2318. return noteDirty(CPersistedWorkUnit::updateVariableByName(name));
  2319. }
  2320. virtual IWUQuery * updateQuery()
  2321. {
  2322. noteDirty("Query", wuQueryMappings);
  2323. return CPersistedWorkUnit::updateQuery();
  2324. }
  2325. virtual IConstWUQuery *getQuery() const
  2326. {
  2327. checkChildLoaded(wuQueriesTable);
  2328. return CPersistedWorkUnit::getQuery();
  2329. }
  2330. virtual IConstWUFileUsageIterator * getFieldUsage() const
  2331. {
  2332. checkChildLoaded(wuFieldUsageTable);
  2333. return CPersistedWorkUnit::getFieldUsage();
  2334. }
  2335. virtual IWUException *createException()
  2336. {
  2337. IWUException *result = CPersistedWorkUnit::createException();
  2338. VStringBuffer xpath("Exceptions/Exception[@sequence='%d']", result->getSequence());
  2339. noteDirty(xpath, wuExceptionsMappings);
  2340. return result;
  2341. }
  2342. virtual void copyWorkUnit(IConstWorkUnit *cached, bool copyStats, bool all)
  2343. {
  2344. // Make sure that any required updates to the secondary files happen
  2345. IPropertyTree *fromP = queryExtendedWU(cached)->queryPTree();
  2346. for (const char * const *search = searchPaths; *search; search++)
  2347. trackSecondaryChange(fromP->queryProp(*search), *search);
  2348. for (const ChildTableInfo * const * table = childTables; *table != NULL; table++)
  2349. checkChildLoaded(**table);
  2350. CPersistedWorkUnit::copyWorkUnit(cached, copyStats, all);
  2351. memset(childLoaded, 1, sizeof(childLoaded));
  2352. allDirty = true;
  2353. actionChanged = true;
  2354. stateChanged = true;
  2355. }
  2356. virtual void noteFileRead(IDistributedFile *file)
  2357. {
  2358. if (file)
  2359. {
  2360. childLoaded[WuFilesReadChild] = true; // Prevent duplicates if someone tries to read back files read (unlikely)
  2361. CPersistedWorkUnit::noteFileRead(file);
  2362. VStringBuffer xpath("FilesRead/File[@name='%s']", file->queryLogicalName());
  2363. noteDirty(xpath, wuFilesReadMappings);
  2364. }
  2365. else
  2366. {
  2367. // A hack for testing!
  2368. Owned<IPropertyTreeIterator> files = p->getElements("FilesRead/File");
  2369. ForEach(*files)
  2370. {
  2371. VStringBuffer xpath("FilesRead/File[@name='%s']", files->query().queryProp("@name"));
  2372. noteDirty(xpath, wuFilesReadMappings);
  2373. }
  2374. }
  2375. }
  2376. virtual void addFile(const char *fileName, StringArray *clusters, unsigned usageCount, WUFileKind fileKind, const char *graphOwner)
  2377. {
  2378. if (fileName)
  2379. {
  2380. childLoaded[WuFilesWrittenChild] = true; // Prevent duplicates if someone tries to read back files written from same object (unlikely)
  2381. CPersistedWorkUnit::addFile(fileName, clusters, usageCount, fileKind, graphOwner);
  2382. VStringBuffer xpath("Files/File[@name='%s']", fileName);
  2383. noteDirty(xpath, wuFilesWrittenMappings);
  2384. }
  2385. }
  2386. virtual void clearGraphProgress() const
  2387. {
  2388. const char *wuid = queryWuid();
  2389. CassandraBatch batch(CASS_BATCH_TYPE_UNLOGGED);
  2390. sessionCache->deleteChildByWuid(wuGraphProgressMappings, wuid, batch);
  2391. sessionCache->deleteChildByWuid(wuGraphStateMappings, wuid, batch);
  2392. sessionCache->deleteChildByWuid(wuGraphRunningMappings, wuid, batch);
  2393. executeBatch(batch, "clearGraphProgress");
  2394. }
  2395. virtual bool getRunningGraph(IStringVal &graphName, WUGraphIDType &subId) const
  2396. {
  2397. CassandraStatement statement(sessionCache->prepareStatement("SELECT graphID, subgraphID FROM wuGraphRunning where partition=? and wuid=?;"));
  2398. const char *wuid = queryWuid();
  2399. statement.bindInt32(0, rtlHash32VStr(wuid, 0) % sessionCache->queryPartitions());
  2400. statement.bindString(1, wuid);
  2401. CassandraFuture future(cass_session_execute(sessionCache->querySession(), statement));
  2402. future.wait("getRunningGraph");
  2403. CassandraResult result(cass_future_get_result(future));
  2404. if (cass_result_row_count(result))
  2405. {
  2406. const CassRow *row = cass_result_first_row(result);
  2407. assertex(row);
  2408. StringBuffer b;
  2409. getCassString(b, cass_row_get_column(row, 0));
  2410. graphName.set(b);
  2411. subId = getUnsignedResult(NULL, cass_row_get_column(row, 1));
  2412. return true;
  2413. }
  2414. else
  2415. return false;
  2416. }
  2417. virtual IConstWUGraphProgress *getGraphProgress(const char *graphName) const
  2418. {
  2419. CassandraStatement statement(sessionCache->prepareStatement("SELECT subgraphID, creator, progress FROM wuGraphProgress where partition=? and wuid=? and graphID=?;"));
  2420. const char *wuid = queryWuid();
  2421. statement.bindInt32(0, rtlHash32VStr(wuid, 0) % sessionCache->queryPartitions());
  2422. statement.bindString(1, wuid);
  2423. statement.bindString(2, graphName);
  2424. CassandraFuture future(cass_session_execute(sessionCache->querySession(), statement));
  2425. future.wait("getGraphProgress");
  2426. CassandraResult result(cass_future_get_result(future));
  2427. CassandraIterator rows(cass_iterator_from_result(result));
  2428. if (!cass_result_row_count(result))
  2429. return NULL;
  2430. Owned<IPropertyTree> progress = createPTree(graphName);
  2431. progress->setPropBool("@stats", true);
  2432. progress->setPropInt("@format", PROGRESS_FORMAT_V);
  2433. while (cass_iterator_next(rows))
  2434. {
  2435. const CassRow *row = cass_iterator_get_row(rows);
  2436. WUGraphIDType subId = getUnsignedResult(NULL, cass_row_get_column(row, 0));
  2437. StringBuffer creator, xml;
  2438. getCassString(creator, cass_row_get_column(row, 1));
  2439. getCassString(xml, cass_row_get_column(row, 2));
  2440. IPTree *stats = createPTreeFromXMLString(xml);
  2441. // We could check that atoi(stats->queryName()+2)==subgraphID, and that stats->queryProp(@creator)==creator)....
  2442. progress->addPropTree(stats->queryName(), stats);
  2443. }
  2444. return createConstGraphProgress(queryWuid(), graphName, progress); // Links progress
  2445. }
  2446. WUGraphState queryGraphState(const char *graphName) const
  2447. {
  2448. return queryNodeState(graphName, 0);
  2449. }
  2450. WUGraphState queryNodeState(const char *graphName, WUGraphIDType nodeId) const
  2451. {
  2452. CassandraStatement statement(sessionCache->prepareStatement("SELECT state FROM wuGraphState where partition=? and wuid=? and graphID=? and subgraphID=?;"));
  2453. const char *wuid = queryWuid();
  2454. statement.bindInt32(0, rtlHash32VStr(wuid, 0) % sessionCache->queryPartitions());
  2455. statement.bindString(1, wuid);
  2456. statement.bindString(2, graphName);
  2457. statement.bindInt64(3, nodeId);
  2458. CassandraFuture future(cass_session_execute(sessionCache->querySession(), statement));
  2459. future.wait("queryNodeState");
  2460. CassandraResult result(cass_future_get_result(future));
  2461. if (cass_result_row_count(result))
  2462. return (WUGraphState) getUnsignedResult(NULL, getSingleResult(result));
  2463. else
  2464. return WUGraphUnknown;
  2465. }
  2466. void setGraphState(const char *graphName, WUGraphState state) const
  2467. {
  2468. setNodeState(graphName, 0, state);
  2469. }
  2470. void setNodeState(const char *graphName, WUGraphIDType nodeId, WUGraphState state) const
  2471. {
  2472. CassandraStatement statement(sessionCache->prepareStatement("INSERT INTO wuGraphState (partition, wuid, graphID, subgraphID, state) values (?,?,?,?,?);"));
  2473. const char *wuid = queryWuid();
  2474. statement.bindInt32(0, rtlHash32VStr(wuid, 0) % sessionCache->queryPartitions());
  2475. statement.bindString(1, wuid);
  2476. statement.bindString(2, graphName);
  2477. statement.bindInt64(3, nodeId);
  2478. statement.bindInt32(4, (int) state);
  2479. CassandraFuture future(cass_session_execute(sessionCache->querySession(), statement));
  2480. future.wait("setNodeState update state");
  2481. if (nodeId)
  2482. {
  2483. switch (state)
  2484. {
  2485. case WUGraphRunning:
  2486. {
  2487. CassandraStatement statement2(sessionCache->prepareStatement("INSERT INTO wuGraphRunning (partition, wuid, graphID, subgraphID) values (?,?,?,?);"));
  2488. statement2.bindInt32(0, rtlHash32VStr(wuid, 0) % sessionCache->queryPartitions());
  2489. statement2.bindString(1, wuid);
  2490. statement2.bindString(2, graphName);
  2491. statement2.bindInt64(3, nodeId);
  2492. CassandraFuture future(cass_session_execute(sessionCache->querySession(), statement2));
  2493. future.wait("setNodeState update running");
  2494. break;
  2495. }
  2496. case WUGraphComplete:
  2497. {
  2498. CassandraStatement statement3(sessionCache->prepareStatement("DELETE FROM wuGraphRunning where partition=? and wuid=?;"));
  2499. statement3.bindInt32(0, rtlHash32VStr(wuid, 0) % sessionCache->queryPartitions());
  2500. statement3.bindString(1, wuid);
  2501. CassandraFuture future(cass_session_execute(sessionCache->querySession(), statement3));
  2502. future.wait("setNodeState remove running");
  2503. break;
  2504. }
  2505. }
  2506. }
  2507. }
  2508. class CCassandraWuGraphStats : public CWuGraphStats
  2509. {
  2510. public:
  2511. CCassandraWuGraphStats(const CCassandraWorkUnit *_parent, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id)
  2512. : CWuGraphStats(createPTree(_rootScope), _creatorType, _creator, _wfid, _rootScope, _id),
  2513. parent(_parent)
  2514. {
  2515. }
  2516. virtual void beforeDispose()
  2517. {
  2518. CWuGraphStats::beforeDispose(); // Sets up progress - should contain a single child tree sqNN where nn==id
  2519. parent->setGraphProgress(progress, progress->queryName(), id, creator);
  2520. }
  2521. protected:
  2522. Linked<const CCassandraWorkUnit> parent;
  2523. StringAttr wuid;
  2524. };
  2525. IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned wfid, unsigned subgraph) const override
  2526. {
  2527. return new CCassandraWuGraphStats(this, creatorType, creator, wfid, graphName, subgraph);
  2528. }
  2529. virtual void _loadFilesRead() const
  2530. {
  2531. checkChildLoaded(wuFilesReadTable); // Lazy populate the FilesRead branch of p from Cassandra
  2532. CPersistedWorkUnit::_loadFilesRead();
  2533. }
  2534. virtual void _loadFilesWritten() const
  2535. {
  2536. checkChildLoaded(wuFilesWrittenTable); // Lazy populate the Files branch of p from Cassandra
  2537. CPersistedWorkUnit::_loadFilesWritten();
  2538. }
  2539. virtual void _loadResults() const
  2540. {
  2541. checkChildLoaded(wuResultsTable); // Lazy populate the Results branch of p from Cassandra
  2542. CPersistedWorkUnit::_loadResults();
  2543. }
  2544. virtual void _loadGraphs(bool heavy) const
  2545. {
  2546. // Lazy populate the Graphs branch of p from Cassandra
  2547. if (heavy)
  2548. {
  2549. // If we loaded light before, and are now loading heavy, we need to force the reload. Unlikely to happen in practice.
  2550. if (graphsCached==1)
  2551. {
  2552. p->removeProp("Graphs");
  2553. childLoaded[WuGraphsChild] = false;
  2554. }
  2555. checkChildLoaded(wuGraphsTable);
  2556. }
  2557. else
  2558. {
  2559. checkChildLoaded(wuGraphMetasTable);
  2560. }
  2561. CPersistedWorkUnit::_loadGraphs(heavy);
  2562. }
  2563. virtual void _loadVariables() const
  2564. {
  2565. checkChildLoaded(wuVariablesTable); // Lazy populate the Variables branch of p from Cassandra
  2566. CPersistedWorkUnit::_loadVariables();
  2567. }
  2568. virtual void _loadTemporaries() const
  2569. {
  2570. checkChildLoaded(wuTemporariesTable); // Lazy populate the Temporaries branch of p from Cassandra
  2571. CPersistedWorkUnit::_loadTemporaries();
  2572. }
  2573. virtual void _loadStatistics() const
  2574. {
  2575. checkChildLoaded(wuStatisticsTable); // Lazy populate the Statistics branch of p from Cassandra
  2576. CPersistedWorkUnit::_loadStatistics();
  2577. }
  2578. virtual void _loadExceptions() const
  2579. {
  2580. checkChildLoaded(wuExceptionsTable); // Lazy populate the Exceptions branch of p from Cassandra
  2581. CPersistedWorkUnit::_loadExceptions();
  2582. }
  2583. virtual void clearExceptions()
  2584. {
  2585. CriticalBlock b(crit);
  2586. noteDirty("*Exceptions/Exception", wuExceptionsMappings);
  2587. CPersistedWorkUnit::clearExceptions();
  2588. }
  2589. virtual IPropertyTree *getUnpackedTree(bool includeProgress) const
  2590. {
  2591. // If anyone wants the whole ptree, we'd better make sure we have fully loaded it...
  2592. CriticalBlock b(crit);
  2593. for (const ChildTableInfo * const * table = childTables; *table != NULL; table++)
  2594. checkChildLoaded(**table);
  2595. return CPersistedWorkUnit::getUnpackedTree(includeProgress);
  2596. }
  2597. virtual IPropertyTree *queryPTree() const
  2598. {
  2599. // If anyone wants the whole ptree, we'd better make sure we have fully loaded it...
  2600. CriticalBlock b(crit);
  2601. for (const ChildTableInfo * const * table = childTables; *table != NULL; table++)
  2602. checkChildLoaded(**table);
  2603. // And a hack for the fact that Dali stores state in both @state and <state>
  2604. const char *stateStr = p->queryProp("@state");
  2605. if (stateStr)
  2606. p->setProp("State", stateStr);
  2607. return p;
  2608. }
  2609. void setGraphProgress(IPropertyTree *progress, const char *gid, unsigned subid, const char *creator) const
  2610. {
  2611. const char *wuid=queryWuid();
  2612. CassandraStatement statement(sessionCache->prepareStatement("INSERT INTO wuGraphProgress (partition, wuid, graphID, subgraphID, creator, progress) values (?,?,?,?,?,?);"));
  2613. statement.bindInt32(0, rtlHash32VStr(wuid, 0) % sessionCache->queryPartitions());
  2614. statement.bindString(1, wuid);
  2615. statement.bindString(2, gid);
  2616. statement.bindInt64(3, subid);
  2617. statement.bindString(4, creator);
  2618. StringBuffer tag;
  2619. tag.append("sg").append(subid);
  2620. IPTree *sq = progress->queryPropTree(tag);
  2621. assertex(sq);
  2622. StringBuffer xml;
  2623. toXML(sq, xml);
  2624. statement.bindString(5, xml);
  2625. CassandraFuture future(cass_session_execute(sessionCache->querySession(), statement));
  2626. future.wait("update stats");
  2627. }
  2628. virtual IPropertyTree *getGraphProgressTree() const
  2629. {
  2630. CassandraStatement graphQuery(sessionCache->prepareStatement("SELECT graphId, subgraphID, creator, progress FROM wuGraphProgress where partition=? and wuid=?;"));
  2631. const char *wuid = queryWuid();
  2632. graphQuery.bindInt32(0, rtlHash32VStr(wuid, 0) % sessionCache->queryPartitions());
  2633. graphQuery.bindString(1, wuid);
  2634. CassandraFuture future(cass_session_execute(sessionCache->querySession(), graphQuery));
  2635. future.wait("getGraphProgress");
  2636. CassandraResult result(cass_future_get_result(future));
  2637. if (!cass_result_row_count(result))
  2638. return NULL;
  2639. Owned<IPTree> progress = createPTree("GraphProgress");
  2640. CassandraIterator rows(cass_iterator_from_result(result));
  2641. while (cass_iterator_next(rows))
  2642. {
  2643. const CassRow *row = cass_iterator_get_row(rows);
  2644. StringBuffer graphName, creator, xml;
  2645. getCassString(graphName, cass_row_get_column(row, 0));
  2646. WUGraphIDType subId = getUnsignedResult(NULL, cass_row_get_column(row, 1));
  2647. getCassString(creator, cass_row_get_column(row, 2));
  2648. getCassString(xml, cass_row_get_column(row, 3));
  2649. if (!progress->hasProp(graphName))
  2650. progress->setPropTree(graphName, createPTree(graphName));
  2651. IPTree *graph = progress->queryPropTree(graphName);
  2652. graph->setPropBool("@stats", true);
  2653. graph->setPropInt("@format", PROGRESS_FORMAT_V);
  2654. IPTree *stats = createPTreeFromXMLString(xml);
  2655. // We could check that atoi(stats->queryName()+2)==subgraphID, and that stats->queryProp(@creator)==creator)....
  2656. graph->addPropTree(stats->queryName(), stats);
  2657. }
  2658. // Now fill in the graph/node states
  2659. CassandraStatement stateQuery(sessionCache->prepareStatement("SELECT graphId, subgraphId, state FROM wuGraphState where partition=? and wuid=?;"));
  2660. stateQuery.bindInt32(0, rtlHash32VStr(wuid, 0) % sessionCache->queryPartitions());
  2661. stateQuery.bindString(1, wuid);
  2662. CassandraFuture stateFuture(cass_session_execute(sessionCache->querySession(), stateQuery));
  2663. stateFuture.wait("getGraphStateProgress");
  2664. CassandraResult stateResult(cass_future_get_result(stateFuture));
  2665. CassandraIterator stateRows(cass_iterator_from_result(stateResult));
  2666. if (cass_result_row_count(stateResult))
  2667. {
  2668. CassandraIterator stateRows(cass_iterator_from_result(stateResult));
  2669. while (cass_iterator_next(stateRows))
  2670. {
  2671. const CassRow *row = cass_iterator_get_row(stateRows);
  2672. StringBuffer graphName;
  2673. getCassString(graphName, cass_row_get_column(row, 0));
  2674. WUGraphIDType subId = getUnsignedResult(NULL, cass_row_get_column(row, 1));
  2675. unsigned state = getUnsignedResult(NULL, cass_row_get_column(row, 2));
  2676. IPTree *node = progress->queryPropTree(graphName);
  2677. if (node)
  2678. {
  2679. if (subId)
  2680. {
  2681. // This is what you might expect it to say...
  2682. //StringBuffer sg("sg");
  2683. //sg.append(subId);
  2684. //node = node->queryPropTree(sg);
  2685. // but in fact the node states are stored in separate elements. I need to see if that is something I broke.
  2686. StringBuffer xpath("node[@id='");
  2687. xpath.append(subId).append("'])");
  2688. node->removeProp(xpath); // Shouldn't be one, just playing safe
  2689. node = node->addPropTree("node", createPTree("node"));
  2690. node->setPropInt("@id", subId);
  2691. node->setPropInt("@_state", state);
  2692. }
  2693. else
  2694. node->setPropInt("@_state", state);
  2695. }
  2696. }
  2697. }
  2698. return progress.getClear();
  2699. }
  2700. protected:
  2701. // Delete child table rows
  2702. void deleteChildren(const char *wuid, CassBatch *useBatch)
  2703. {
  2704. for (const ChildTableInfo * const * table = childTables; *table != NULL; table++)
  2705. sessionCache->deleteChildByWuid(table[0]->mappings, wuid, useBatch);
  2706. }
  2707. // Lazy-populate a portion of WU xml from a child table
  2708. void checkChildLoaded(const ChildTableInfo &childTable) const
  2709. {
  2710. // NOTE - should be called inside critsec
  2711. if (!childLoaded[childTable.index])
  2712. {
  2713. const CassResult* cassResult;
  2714. try
  2715. {
  2716. cassResult = sessionCache->fetchDataForWuid(childTable.mappings, queryWuid(), false);
  2717. }
  2718. catch (IException* e)
  2719. {
  2720. int errorCode = e->errorCode();
  2721. StringBuffer origErrorMsg;
  2722. e->errorMessage(origErrorMsg);
  2723. e->Release();
  2724. const char* tableName = queryTableName(childTable.mappings);
  2725. VStringBuffer newErrorMsg("Failed to read from cassandra table '%s' (Have you run wutool to initialize cassandra repository?), [%s]", tableName, origErrorMsg.str());
  2726. rtlFail(errorCode, newErrorMsg);
  2727. }
  2728. CassandraResult result(cassResult);
  2729. IPTree *results = p->queryPropTree(childTable.parentElement);
  2730. CassandraIterator rows(cass_iterator_from_result(result));
  2731. while (cass_iterator_next(rows))
  2732. {
  2733. CassandraIterator cols(cass_iterator_from_row(cass_iterator_get_row(rows)));
  2734. Owned<IPTree> child;
  2735. if (!results)
  2736. results = ensurePTree(p, childTable.parentElement);
  2737. if (childTable.childElement)
  2738. child.setown(createPTree(childTable.childElement));
  2739. else
  2740. child.set(results);
  2741. unsigned colidx = 2; // We did not fetch wuid or partition
  2742. while (cass_iterator_next(cols))
  2743. {
  2744. assertex(childTable.mappings[colidx].columnName);
  2745. const CassValue *value = cass_iterator_get_column(cols);
  2746. if (value && !cass_value_is_null(value))
  2747. childTable.mappings[colidx].mapper.toXML(child, childTable.mappings[colidx].xpath, value);
  2748. colidx++;
  2749. }
  2750. if (childTable.childElement)
  2751. {
  2752. const char *childName = child->queryName();
  2753. results->addPropTree(childName, child.getClear());
  2754. }
  2755. }
  2756. childLoaded[childTable.index] = true;
  2757. }
  2758. }
  2759. // Update secondary tables (used to search wuids by owner, state, jobname etc)
  2760. void updateSecondaryTable(const char *xpath, const char *prevKey, const char *wuid, CIArrayOf<CassandraStatement> &batch)
  2761. {
  2762. if (prevKey && *prevKey)
  2763. deleteSecondaryByKey(xpath, prevKey, wuid, sessionCache, batch);
  2764. const char *value = p->queryProp(xpath);
  2765. if (value && *value)
  2766. simpleXMLtoCassandra(sessionCache, batch, searchMappings, p, xpath);
  2767. }
  2768. void deleteAppSecondaries(IPTree &pt, const char *wuid, CIArrayOf<CassandraStatement> &batch)
  2769. {
  2770. Owned<IPTreeIterator> apps = pt.getElements("Application");
  2771. ForEach(*apps)
  2772. {
  2773. IPTree &app = apps->query();
  2774. Owned<IPTreeIterator> names = app.getElements("*");
  2775. ForEach(*names)
  2776. {
  2777. IPTree &name = names->query();
  2778. Owned<IPTreeIterator> values = name.getElements("*");
  2779. ForEach(*values)
  2780. {
  2781. IPTree &value = values->query();
  2782. const char *appValue = value.queryProp(".");
  2783. if (appValue && *appValue)
  2784. {
  2785. VStringBuffer xpath("%s/%s/%s", app.queryName(), name.queryName(), value.queryName());
  2786. deleteSecondaryByKey(xpath, appValue, wuid, sessionCache, batch);
  2787. }
  2788. }
  2789. }
  2790. }
  2791. }
  2792. void deleteSecondaries(const char *wuid, CIArrayOf<CassandraStatement> &batch)
  2793. {
  2794. for (const char * const *search = searchPaths; *search; search++)
  2795. deleteSecondaryByKey(*search, p->queryProp(*search), wuid, sessionCache, batch);
  2796. deleteAppSecondaries(*p, wuid, batch);
  2797. Owned<IPropertyTreeIterator> filesRead = &getFilesReadIterator();
  2798. ForEach(*filesRead)
  2799. {
  2800. deleteFileSearch(sessionCache, batch, filesRead->query().queryProp("@name"), true, wuid);
  2801. }
  2802. Owned<IPropertyTreeIterator> filesWritten = &getFileIterator();
  2803. ForEach(*filesWritten)
  2804. {
  2805. deleteFileSearch(sessionCache, batch, filesWritten->query().queryProp("@name"), false, wuid);
  2806. }
  2807. }
  2808. void updateSecondaries(const char *wuid, CIArrayOf<CassandraStatement> &batch)
  2809. {
  2810. const char * const *search;
  2811. for (search = searchPaths; *search; search++)
  2812. updateSecondaryTable(*search, prev->queryProp(*search), wuid, batch);
  2813. for (search = wildSearchPaths; *search; search++)
  2814. {
  2815. const char *value = p->queryProp(*search);
  2816. if (value && *value)
  2817. addUniqueValue(sessionCache, batch, *search, value);
  2818. }
  2819. deleteAppSecondaries(*prev, wuid, batch);
  2820. Owned<IConstWUAppValueIterator> appValues = &getApplicationValues();
  2821. ForEach(*appValues)
  2822. {
  2823. IConstWUAppValue& val=appValues->query();
  2824. addUniqueValue(sessionCache, batch, "Application", val.queryApplication()); // Used to populate droplists of applications
  2825. VStringBuffer key("@@%s", val.queryApplication());
  2826. addUniqueValue(sessionCache, batch, key, val.queryName()); // Used to populate droplists of value names for a given application
  2827. VStringBuffer xpath("Application/%s/%s", val.queryApplication(), val.queryName());
  2828. addUniqueValue(sessionCache, batch, xpath, val.queryValue()); // Used to get lists of values for a given app and name, and for filtering
  2829. simpleXMLtoCassandra(sessionCache, batch, searchMappings, p, xpath);
  2830. }
  2831. Owned<IPropertyTreeIterator> filesRead = &getFilesReadIterator();
  2832. ForEach(*filesRead)
  2833. {
  2834. addFileSearch(sessionCache, batch, filesRead->query().queryProp("@name"), true, wuid);
  2835. }
  2836. Owned<IPropertyTreeIterator> filesWritten = &getFileIterator();
  2837. ForEach(*filesWritten)
  2838. {
  2839. addFileSearch(sessionCache, batch, filesWritten->query().queryProp("@name"), false, wuid);
  2840. }
  2841. }
  2842. // Keep track of previously committed values for fields that we have a secondary table for, so that we can update them appropriately when we commit
  2843. bool trackSecondaryChange(const char *newval, const char *xpath)
  2844. {
  2845. if (!newval)
  2846. newval = "";
  2847. const char *oldval = p->queryProp(xpath);
  2848. if (!oldval)
  2849. oldval = "";
  2850. if (streq(newval, oldval))
  2851. return false; // No change
  2852. bool add;
  2853. if (!prev)
  2854. {
  2855. prev.setown(createPTree());
  2856. add = true;
  2857. }
  2858. else add = !prev->hasProp(xpath);
  2859. if (add)
  2860. {
  2861. const char *tailptr = strrchr(xpath, '/');
  2862. if (tailptr)
  2863. {
  2864. StringBuffer head(tailptr-xpath, xpath);
  2865. ensurePTree(prev, head)->setProp(tailptr+1, oldval);
  2866. }
  2867. else
  2868. prev->setProp(xpath, oldval);
  2869. }
  2870. return true;
  2871. }
  2872. IWUResult *noteDirty(IWUResult *result)
  2873. {
  2874. if (result)
  2875. dirtyResults.append(*LINK(result));
  2876. return result;
  2877. }
  2878. void noteDirty(const char *xpath, const CassandraXmlMapping *table)
  2879. {
  2880. dirtyPaths.setValue(xpath, table);
  2881. }
  // Member state of the Cassandra-backed workunit
  2882. Linked<const ICassandraSession> sessionCache; // Session used for all Cassandra access (fetchDataForWuid, deleteChildByWuid, ...)
  2883. mutable bool childLoaded[ChildTablesSize]; // Per child table: already merged into p by checkChildLoaded (mutable because that method is const)
  2884. bool allDirty; // NOTE(review): presumably "write everything on next commit" - _restoreWorkUnit passes true for the last ctor arg; confirm
  2885. bool stateChanged; // NOTE(review): presumably set when the state changes before commit - confirm
  2886. bool actionChanged; // NOTE(review): presumably set when the action changes before commit - confirm
  2887. Owned<IPTree> prev; // Previously committed values of secondary-indexed fields, captured by trackSecondaryChange
  2888. MapStringTo<const CassandraXmlMapping *> dirtyPaths; // xpath -> table, recorded by noteDirty(xpath, table)
  2889. IArrayOf<IWUResult> dirtyResults; // Results recorded by noteDirty(IWUResult *); each entry holds a link
  2890. Owned<IRemoteConnection> daliLock; // We still use dali for locking
  2891. };
  2892. class CCassandraWorkUnitWatcher : public CWorkUnitWatcher
  2893. {
  2894. public:
  2895. CCassandraWorkUnitWatcher(IWorkUnitSubscriber *_subscriber, WUSubscribeOptions flags, const char *wuid)
  2896. : CWorkUnitWatcher(_subscriber, (WUSubscribeOptions) (flags & SubscribeOptionAbort), wuid)
  2897. {
  2898. if (flags & SubscribeOptionState)
  2899. {
  2900. VStringBuffer xpath("/WorkUnitStates/%s/State", wuid);
  2901. stateId = querySDS().subscribe(xpath.str(), *this);
  2902. }
  2903. if (flags & SubscribeOptionAction)
  2904. {
  2905. VStringBuffer xpath("/WorkUnitStates/%s/Action", wuid);
  2906. actionId = querySDS().subscribe(xpath.str(), *this);
  2907. }
  2908. }
  2909. };
  2910. class CCasssandraWorkUnitFactory : public CWorkUnitFactory, implements ICassandraSession
  2911. {
  2912. IMPLEMENT_IINTERFACE;
  2913. public:
  2914. CCasssandraWorkUnitFactory(const SharedObject *_dll, const IPropertyTree *props) : cluster(cass_cluster_new()), randomizeSuffix(0), randState((unsigned) get_cycles_now()), cacheRetirer(*this)
  2915. {
  2916. StringArray options;
  2917. options.append("write_bytes_high_water_mark=1000000"); // Set the default HWM - workunits get big. This can be overridden by supplied options
  2918. Owned<IPTreeIterator> it = props->getElements("Option");
  2919. ForEach(*it)
  2920. {
  2921. IPTree &item = it->query();
  2922. const char *opt = item.queryProp("@name");
  2923. const char *val = item.queryProp("@value");
  2924. if (opt && val)
  2925. {
  2926. if (strieq(opt, "randomWuidSuffix"))
  2927. randomizeSuffix = atoi(val);
  2928. else if (strieq(opt, "traceLevel"))
  2929. traceLevel = atoi(val);
  2930. else if (strieq(opt, "partitions"))
  2931. {
  2932. partitions = atoi(val); // Note this value is only used when creating a new repo
  2933. if (partitions < MIN_PARTITIONS)
  2934. partitions = MIN_PARTITIONS;
  2935. else if (partitions > MAX_PARTITIONS)
  2936. partitions = MAX_PARTITIONS;
  2937. }
  2938. else if (strieq(opt, "prefixSize"))
  2939. {
  2940. prefixSize = atoi(val); // Note this value is only used when creating a new repo
  2941. if (prefixSize < MIN_PREFIX_SIZE)
  2942. prefixSize = MIN_PREFIX_SIZE;
  2943. else if (prefixSize > MAX_PREFIX_SIZE)
  2944. prefixSize = MAX_PREFIX_SIZE;
  2945. }
  2946. else
  2947. {
  2948. VStringBuffer optstr("%s=%s", opt, val);
  2949. options.append(optstr);
  2950. }
  2951. }
  2952. }
  2953. cluster.setOptions(options);
  2954. if (!cluster.queryKeySpace())
  2955. cluster.setKeySpace("hpcc");
  2956. try
  2957. {
  2958. cluster.connect();
  2959. Owned<IPTree> versionInfo = getVersionInfo();
  2960. if (versionInfo)
  2961. {
  2962. int major = versionInfo->getPropInt("@major", 0);
  2963. int minor = versionInfo->getPropInt("@minor", 0);
  2964. partitions = versionInfo->getPropInt("@numPartitions", DEFAULT_PARTITIONS);
  2965. prefixSize = versionInfo->getPropInt("@searchPrefixSize", DEFAULT_PREFIX_SIZE);
  2966. if (major && minor)
  2967. {
  2968. // Note that if there is no version info at all, we have to assume that the repository is not yet created. We don't fail, otherwise no-one can call createRepository the first time...
  2969. if (major != majorVersion)
  2970. throw makeStringExceptionV(WUERR_WorkunitVersionMismatch, "Incompatible workunit repository version (wanted %d.%d, found %d.%d)", majorVersion, minorVersion, major, minor);
  2971. if (minor != minorVersion)
  2972. {
  2973. if (minor < minorVersion)
  2974. {
  2975. DBGLOG("WARNING: repository version %d.%d is older than current version %d.%d - adding required columns", major, minor, majorVersion, minorVersion);
  2976. switch (minor)
  2977. {
  2978. case 1:
  2979. executeSimpleCommand(querySession(), "ALTER TABLE wuresults ADD graph text;");
  2980. executeSimpleCommand(querySession(), "ALTER TABLE wuresults ADD activity int;");
  2981. executeSimpleCommand(querySession(), "ALTER TABLE wuvariables ADD graph text;");
  2982. executeSimpleCommand(querySession(), "ALTER TABLE wuvariables ADD activity int;");
  2983. executeSimpleCommand(querySession(), "ALTER TABLE wutemporaries ADD graph text;");
  2984. executeSimpleCommand(querySession(), "ALTER TABLE wutemporaries ADD activity int;");
  2985. break;
  2986. }
  2987. createVersionTable(true);
  2988. }
  2989. else
  2990. DBGLOG("WARNING: repository version %d.%d is newer than current version %d.%d - some columns will not be updated", major, minor, majorVersion, minorVersion);
  2991. }
  2992. }
  2993. }
  2994. else
  2995. {
  2996. DBGLOG("WARNING: repository version could not be retrieved (repository not yet created?)");
  2997. cluster.disconnect();
  2998. }
  2999. }
  3000. catch (IException *E)
  3001. {
  3002. EXCLOG(E);
  3003. E->Release();
  3004. DBGLOG("WARNING: repository version could not be retrieved (repository not yet created?)");
  3005. }
  3006. cacheRetirer.start();
  3007. LINK(_dll); // Yes, this leaks. Not really sure how to avoid that.
  3008. }
  3009. ~CCasssandraWorkUnitFactory()
  3010. {
  3011. cacheRetirer.stop();
  3012. cacheRetirer.join();
  3013. if (traceLevel)
  3014. DBGLOG("CCasssandraWorkUnitFactory destroyed");
  3015. }
  3016. virtual bool initializeStore()
  3017. {
  3018. createRepository();
  3019. return true;
  3020. }
  3021. virtual IWorkUnitWatcher *getWatcher(IWorkUnitSubscriber *subscriber, WUSubscribeOptions options, const char *wuid) const
  3022. {
  3023. return new CCassandraWorkUnitWatcher(subscriber, options, wuid);
  3024. }
  3025. virtual CLocalWorkUnit* _createWorkUnit(const char *wuid, ISecManager *secmgr, ISecUser *secuser)
  3026. {
  3027. unsigned suffix;
  3028. unsigned suffixLength;
  3029. if (randomizeSuffix) // May need to enable this option if you are expecting to create hundreds of workunits / second
  3030. {
  3031. suffix = rand_r(&randState);
  3032. suffixLength = randomizeSuffix;
  3033. }
  3034. else
  3035. {
  3036. suffix = 0;
  3037. suffixLength = 0;
  3038. }
  3039. Owned<CassandraPrepared> prepared = prepareStatement("INSERT INTO workunits (partition, wuid) VALUES (?,?) IF NOT EXISTS;");
  3040. for (;;)
  3041. {
  3042. // Create a unique WUID by adding suffixes until we managed to add a new value
  3043. StringBuffer useWuid(wuid);
  3044. if (suffix)
  3045. {
  3046. useWuid.append("-");
  3047. for (unsigned i = 0; i < suffixLength; i++)
  3048. {
  3049. useWuid.appendf("%c", '0'+suffix%10);
  3050. suffix /= 10;
  3051. }
  3052. }
  3053. CassandraStatement statement(prepared.getLink());
  3054. statement.bindInt32(0, rtlHash32VStr(useWuid.str(), 0) % partitions);
  3055. statement.bindString(1, useWuid.str());
  3056. if (traceLevel >= 2)
  3057. DBGLOG("Try creating %s", useWuid.str());
  3058. CassandraFuture future(cass_session_execute(querySession(), statement));
  3059. future.wait("execute");
  3060. CassandraResult result(cass_future_get_result(future));
  3061. if (cass_result_column_count(result)==1)
  3062. {
  3063. // A single column result indicates success, - the single column should be called '[applied]' and have the value 'true'
  3064. // If there are multiple columns it will be '[applied]' (value false) and the fields of the existing row
  3065. Owned<IPTree> wuXML = createPTree(useWuid);
  3066. wuXML->setProp("@xmlns:xsi", "http://www.w3.org/1999/XMLSchema-instance");
  3067. wuXML->setPropInt("@wuidVersion", WUID_VERSION); // we implement the latest version.
  3068. wuXML->setProp("@totalThorTime", ""); // must be non null, otherwise sorting by thor time excludes the values
  3069. Owned<IRemoteConnection> daliLock;
  3070. lockWuid(daliLock, useWuid);
  3071. Owned<CLocalWorkUnit> wu = new CCassandraWorkUnit(this, wuXML.getClear(), secmgr, secuser, daliLock.getClear(), false);
  3072. return wu.getClear();
  3073. }
  3074. suffix = rand_r(&randState);
  3075. if (suffixLength<9)
  3076. suffixLength++;
  3077. }
  3078. }
  3079. virtual CLocalWorkUnit* _openWorkUnit(const char *wuid, ISecManager *secmgr, ISecUser *secuser)
  3080. {
  3081. Owned<IPTree> wuXML = cassandraToWorkunitXML(wuid);
  3082. if (wuXML)
  3083. return new CCassandraWorkUnit(this, wuXML.getClear(), secmgr, secuser, NULL, false);
  3084. else
  3085. return NULL;
  3086. }
  3087. virtual CLocalWorkUnit* _updateWorkUnit(const char *wuid, ISecManager *secmgr, ISecUser *secuser)
  3088. {
  3089. // We still use dali for the locks
  3090. Owned<IRemoteConnection> daliLock;
  3091. lockWuid(daliLock, wuid);
  3092. Owned<IPTree> wuXML = cassandraToWorkunitXML(wuid);
  3093. Owned<CLocalWorkUnit> wu = new CCassandraWorkUnit(this, wuXML.getClear(), secmgr, secuser, daliLock.getClear(), false);
  3094. return wu.getClear();
  3095. }
  3096. virtual bool _restoreWorkUnit(IPTree *_pt, const char *wuid)
  3097. {
  3098. Owned<IPTree> pt(_pt);
  3099. try
  3100. {
  3101. Owned<IRemoteConnection> daliLock;
  3102. lockWuid(daliLock, wuid);
  3103. Owned<IPropertyTree> gProgress = pruneBranch(pt, "GraphProgress[1]");
  3104. Owned<CCassandraWorkUnit> wu = new CCassandraWorkUnit(this, pt.getClear(), NULL, NULL, daliLock.getClear(), true);
  3105. if (gProgress)
  3106. {
  3107. Owned<IPTreeIterator> graphs = gProgress->getElements("*");
  3108. ForEach(*graphs)
  3109. {
  3110. IPTree &graph = graphs->query();
  3111. const char *graphName = graph.queryName();
  3112. Owned<IPTreeIterator> subs = graph.getElements("*");
  3113. ForEach(*subs)
  3114. {
  3115. IPTree &sub = subs->query();
  3116. const char *name=sub.queryName();
  3117. if (name[0]=='s' && name[1]=='g')
  3118. {
  3119. wu->setGraphProgress(&graph, graphName, atoi(name+2), sub.queryProp("@creator"));
  3120. }
  3121. else if (streq(name, "node"))
  3122. {
  3123. unsigned subid = sub.getPropInt("@id");
  3124. if (subid)
  3125. {
  3126. if (sub.hasChildren()) // Old format
  3127. wu->setGraphProgress(&sub, graphName, subid, sub.queryProp("@creator"));
  3128. if (sub.hasProp("@_state"))
  3129. wu->setNodeState(graphName, subid, (WUGraphState) sub.getPropInt("@_state"));
  3130. }
  3131. }
  3132. }
  3133. if (graph.hasProp("@_state"))
  3134. wu->setGraphState(graphName, (WUGraphState) graph.getPropInt("@_state"));
  3135. }
  3136. }
  3137. wu->commit();
  3138. return true;
  3139. }
  3140. catch (IException *E)
  3141. {
  3142. EXCLOG(E);
  3143. ::Release(E);
  3144. return false;
  3145. }
  3146. }
  3147. virtual IWorkUnit * getGlobalWorkUnit(ISecManager *secmgr = NULL, ISecUser *secuser = NULL)
  3148. {
  3149. // MORE - should it check security? Dali version never did...
  3150. Owned<IRemoteConnection> daliLock;
  3151. lockWuid(daliLock, GLOBAL_WORKUNIT);
  3152. Owned<IPTree> wuXML = createPTree(GLOBAL_WORKUNIT);
  3153. Owned<CLocalWorkUnit> wu = new CCassandraWorkUnit(this, wuXML.getClear(), NULL, NULL, daliLock.getClear(), false);
  3154. return &wu->lockRemote(false);
  3155. }
  3156. virtual IConstWorkUnitIterator * getWorkUnitsByOwner(const char * owner, ISecManager *secmgr, ISecUser *secuser)
  3157. {
  3158. return getWorkUnitsByXXX("@submitID", owner, secmgr, secuser);
  3159. }
  3160. virtual IConstWorkUnitIterator * getScheduledWorkUnits(ISecManager *secmgr, ISecUser *secuser)
  3161. {
  3162. return getWorkUnitsByXXX("@state", getWorkunitStateStr(WUStateScheduled), secmgr, secuser); // MORE - there may be more efficient ways to do this?
  3163. }
  3164. virtual IConstWorkUnitIterator * getWorkUnitsSorted(WUSortField sortorder, WUSortField * filters, const void * filterbuf,
  3165. unsigned startOffset, unsigned pageSize, __int64 * cachehint, unsigned *total,
  3166. ISecManager *secmgr, ISecUser *secuser)
  3167. {
  3168. // To assist in the efficient implementation of this function without requiring local sorting and filtering,
  3169. // we maintain a couple of additional search tables in addition to the main workunit table.
  3170. //
  3171. // The workunitsSearch table allows us to map from a given field's value to a workunit - to avoid the need
  3172. // for a second lookup this table contains a copy of all the 'lightweight' fields in the workunit. The table
  3173. // has a partition key of xpath, searchPrefix allowing it to be used for range lookups provided at least
  3174. // 2 characters are provided, while hopefully spreading the load a little between Cassandra partitions.
  3175. //
  3176. // The uniqueValues table is used to track what values are present for some wild-searchable fields, so we do
  3177. // two lookups - one to translate the wildcard to a set, then others to retrieve the wus matching each value
  3178. // in the set. These are done as N parallel reads rather than a single query (which might naively be expected
  3179. // to be more efficient) for two reasons. Firstly, we can get them back sorted that way and merge the results
  3180. // on the fly. Secondly, it is actually more efficient, at least in the case when there are multiple Cassandra
  3181. // partitions, since it in-effect cuts out the step of talking to a coordinator node which would talk to
  3182. // multiple other nodes to get the data.
  3183. //
  3184. // We go to some lengths to avoid post-sorting if we can, but any sort order other than by wuid or totalThorTime
  3185. // will post-sort it. If a post-sort is required, we will fetch up to WUID_LOCALSORT_LIMIT rows, - if there are
  3186. // more then we should fail, and the user should be invited to add filters.
  3187. //
  3188. // We can do at most one 'hard' filter, plus a filter on wuid range - anything else will require post-filtering.
  3189. // Most 'wild' searches can only be done with post-filtering, but some can be translated to multiple hard values
  3190. // using the unique values table. In such cases we merge results in the fly to avoid a post-sort if possible
  3191. //
  3192. // Note that Cassandra does not presently support filtering before returning the values except where a
  3193. // key or secondary index is available - even if ALLOW FILTERING is specified. If it did, some of the post-
  3194. // filtering would be better off done at the Cassandra side.
  3195. //
  3196. // We should encourage the UI to present drop-lists of users for filtering, to avoid the use of wildcard
  3197. // searches just because people can't remember the name.
  3198. //
  3199. // Searching by files probably needs to be done differently - a separate table mapping filenames to wuids.
  3200. // This can perhaps be join-merged if other filters are present. This is still TBD at the moment.
  3201. Owned<CCassandraWuUQueryCacheEntry> cached;
  3202. if (cachehint && *cachehint)
  3203. {
  3204. CriticalBlock b(cacheCrit);
  3205. cached.set(cacheIdMap.getValue(*cachehint));
  3206. }
  3207. if (cached)
  3208. cached->touch();
  3209. else
  3210. cached.setown(new CCassandraWuUQueryCacheEntry());
  3211. if (pageSize > INT_MAX)
  3212. pageSize = INT_MAX;
  3213. const WUSortField *thisFilter = filters;
  3214. IArrayOf<IPostFilter> goodFilters;
  3215. IArrayOf<IPostFilter> wuidFilters;
  3216. IArrayOf<IPostFilter> poorFilters;
  3217. IArrayOf<IPostFilter> fileFilters;
  3218. IArrayOf<IPostFilter> remoteWildFilters;
  3219. Owned<IConstWorkUnitIteratorEx> result;
  3220. WUSortField baseSort = (WUSortField) (sortorder & 0xff);
  3221. StringBuffer thorTimeThreshold;
  3222. bool sortByThorTime = (baseSort == WUSFtotalthortime);
  3223. bool needsPostSort = (baseSort != WUSFwuid && baseSort != WUSFtotalthortime);
  3224. bool sortDescending = (sortorder & WUSFreverse) || needsPostSort;
  3225. if (!result)
  3226. {
  3227. Owned<CassMultiIterator> merger = new CassMultiIterator(needsPostSort ? NULL : cached, 0, 0, sortDescending); // We always merge by wuid (except when we merge by thor time... we turn the compare off then to make it an appender)
  3228. if (startOffset)
  3229. {
  3230. StringBuffer startWuid;
  3231. unsigned found = cached->lookupStartRow(startWuid, thorTimeThreshold, startOffset);
  3232. if (found)
  3233. {
  3234. if (!sortByThorTime)
  3235. {
  3236. if (sortDescending)
  3237. startWuid.setCharAt(startWuid.length()-1, startWuid.charAt(startWuid.length()-1)-1); // we want to find the last wuid BEFORE
  3238. else
  3239. startWuid.append('\x21'); // we want to find the first wuid AFTER. This is printable but not going to be in any wuid
  3240. thorTimeThreshold.clear();
  3241. }
  3242. wuidFilters.append(*new PostFilter(sortorder==WUSFwuid ? WUSFwuid : WUSFwuidhigh, startWuid, true));
  3243. startOffset -= found;
  3244. merger->setStartOffset(found);
  3245. }
  3246. }
  3247. const char *fv = (const char *) filterbuf;
  3248. while (thisFilter && *thisFilter)
  3249. {
  3250. WUSortField field = (WUSortField) (*thisFilter & 0xff);
  3251. bool isWild = (*thisFilter & WUSFwild) != 0;
  3252. switch (field)
  3253. {
  3254. case WUSFappvalue:
  3255. {
  3256. assertex(fv);
  3257. const char *name = fv;
  3258. fv = fv + strlen(fv)+1;
  3259. if (isWild)
  3260. {
  3261. StringBuffer s(fv);
  3262. if (s.charAt(s.length()-1)== '*')
  3263. s.remove(s.length()-1, 1);
  3264. if (s.length())
  3265. remoteWildFilters.append(*new AppValuePostFilter(name, s, true)); // Should we allow wild on the app and/or name too? Not at the moment
  3266. }
  3267. else
  3268. goodFilters.append(*new AppValuePostFilter(name, fv, false));
  3269. break;
  3270. }
  3271. case WUSFuser:
  3272. case WUSFcluster:
  3273. case WUSFjob:
  3274. if (isWild)
  3275. {
  3276. StringBuffer s(fv);
  3277. if (s.charAt(s.length()-1)== '*')
  3278. s.remove(s.length()-1, 1);
  3279. if (s.length())
  3280. remoteWildFilters.append(*new PostFilter(field, s, true)); // Trailing-only wildcards can be done remotely
  3281. }
  3282. else if (strchr(fv, '|'))
  3283. goodFilters.append(*new MultiValuePostFilter(field, fv));
  3284. else
  3285. goodFilters.append(*new PostFilter(field, fv, false));
  3286. break;
  3287. case WUSFstate:
  3288. case WUSFpriority:
  3289. case WUSFprotected:
  3290. // These can't be wild, but are not very good filters
  3291. if (strchr(fv, '|'))
  3292. poorFilters.append(*new MultiValuePostFilter(field, fv));
  3293. else
  3294. poorFilters.append(*new PostFilter(field, fv, false));
  3295. break;
  3296. case WUSFwuid: // Acts as wuidLo when specified as a filter
  3297. case WUSFwuidhigh:
  3298. // Wuid filters can be added to good and poor filters, and to remoteWild if they are done via merged sets rather than ranges...
  3299. if (sortByThorTime)
  3300. remoteWildFilters.append(*new PostFilter(field, fv, true));
  3301. else
  3302. mergeFilter(wuidFilters, field, fv);
  3303. break;
  3304. case WUSFfileread:
  3305. case WUSFfilewritten:
  3306. fileFilters.append(*new PostFilter(field, fv, true));
  3307. break;
  3308. case WUSFtotalthortime:
  3309. // This should be treated as a low value - i.e. return only wu's that took longer than the supplied value
  3310. if (thorTimeThreshold.isEmpty()) // If not a continuation
  3311. formatTimeCollatable(thorTimeThreshold, milliToNano(atoi(fv)), false);
  3312. break;
  3313. case WUSFwildwuid:
  3314. // Translate into a range - note that we only support trailing * wildcard.
  3315. if (fv && *fv)
  3316. {
  3317. StringBuffer s(fv);
  3318. if (s.charAt(s.length()-1)== '*')
  3319. s.remove(s.length()-1, 1);
  3320. if (s.length())
  3321. {
  3322. mergeFilter(wuidFilters, WUSFwuid, s);
  3323. s.append('\x7e'); // '~' - higher than anything that should occur in a wuid (but still printable)
  3324. mergeFilter(wuidFilters, WUSFwuidhigh, s);
  3325. }
  3326. }
  3327. break;
  3328. case WUSFecl: // This is different...
  3329. if (isWild)
  3330. merger->addPostFilter(*new PostFilter(field, fv, true)); // Wildcards on ECL are trailing and leading - no way to do remotely
  3331. else
  3332. goodFilters.append(*new PostFilter(field, fv, false)); // A hard filter on exact ecl match is possible but very unlikely
  3333. default:
  3334. UNSUPPORTED("Workunit filter criteria");
  3335. }
  3336. thisFilter++;
  3337. if (fv)
  3338. fv = fv + strlen(fv)+1;
  3339. }
  3340. if (fileFilters.length())
  3341. {
  3342. // We can't postfilter by these - we COULD in some cases do a join between these and some other filtered set
  3343. // but we will leave that as an exercise to the reader. So if there is a fileFilter, read it first, and turn it into a merge set of the resulting wus.
  3344. // MORE read and written are not the same
  3345. assertex(fileFilters.length()==1); // If we supported more there would be a join phase here
  3346. merger->addPostFilters(goodFilters, 0);
  3347. merger->addPostFilters(poorFilters, 0);
  3348. merger->addPostFilters(remoteWildFilters, 0);
  3349. const IPostFilter &fileFilter = fileFilters.item(0);
  3350. CassandraResult wuids(fetchDataForFiles(fileFilter.queryValue(), wuidFilters, fileFilter.queryField()==WUSFfileread));
  3351. CassandraIterator rows(cass_iterator_from_result(wuids));
  3352. StringBuffer value;
  3353. while (cass_iterator_next(rows))
  3354. {
  3355. const CassRow *row = cass_iterator_get_row(rows);
  3356. getCassString(value.clear(), cass_row_get_column(row, 0));
  3357. merger->addResult(*new CassandraResult(fetchDataForWuid(workunitInfoMappings, value, true)));
  3358. }
  3359. }
  3360. else if (sortByThorTime)
  3361. {
  3362. merger->addPostFilters(goodFilters, 0);
  3363. merger->addPostFilters(poorFilters, 0);
  3364. merger->addPostFilters(remoteWildFilters, 0);
  3365. if (wuidFilters.length())
  3366. {
  3367. // We are doing a continuation of a prior search that is sorted by a searchField, which may not be unique
  3368. // We need two queries - one where searchField==startSearchField and wuid > startWuid,
  3369. // and one where searchField > startSearchField. We know that there are no other filters in play (as Cassandra would not support them)
  3370. // though there may be postfilters
  3371. assertex(wuidFilters.length()==1);
  3372. merger->addResult(*new CassandraResult(fetchMoreDataByThorTime(thorTimeThreshold, wuidFilters.item(0).queryValue(), sortDescending, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
  3373. merger->addResult(*new CassandraResult(fetchMoreDataByThorTime(thorTimeThreshold, NULL, sortDescending, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
  3374. merger->setCompareColumn(-1); // we want to preserve the order of these two results
  3375. }
  3376. else
  3377. merger->addResult(*new CassandraResult(fetchDataByThorTime(thorTimeThreshold, sortDescending, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
  3378. }
  3379. else if (goodFilters.length())
  3380. {
  3381. merger->addPostFilters(goodFilters, 1);
  3382. merger->addPostFilters(poorFilters, 0);
  3383. merger->addPostFilters(remoteWildFilters, 0);
  3384. const IPostFilter &best = goodFilters.item(0);
  3385. const char *queryValue = best.queryValue();
  3386. if (strchr(queryValue, '|'))
  3387. {
  3388. StringArray values;
  3389. values.appendListUniq(queryValue, "|");
  3390. ForEachItemIn(vidx, values)
  3391. {
  3392. const char *thisValue = values.item(vidx);
  3393. if (!isEmptyString(thisValue))
  3394. merger->addResult(*new CassandraResult(fetchDataForKeyWithFilter(best.queryXPath(), thisValue, wuidFilters, sortorder, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
  3395. }
  3396. }
  3397. else
  3398. merger->addResult(*new CassandraResult(fetchDataForKeyWithFilter(best.queryXPath(), best.queryValue(), wuidFilters, sortorder, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
  3399. }
  3400. else if (poorFilters.length())
  3401. {
  3402. merger->addPostFilters(poorFilters, 1);
  3403. merger->addPostFilters(remoteWildFilters, 0);
  3404. const IPostFilter &best= poorFilters.item(0);
  3405. const char *queryValue =best.queryValue();
  3406. if (strchr(queryValue, '|'))
  3407. {
  3408. StringArray values;
  3409. values.appendListUniq(queryValue, "|");
  3410. ForEachItemIn(vidx, values)
  3411. {
  3412. const char *thisValue = values.item(vidx);
  3413. if (!isEmptyString(thisValue))
  3414. merger->addResult(*new CassandraResult(fetchDataForKeyWithFilter(best.queryXPath(), thisValue, wuidFilters, sortorder, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
  3415. }
  3416. }
  3417. else
  3418. merger->addResult(*new CassandraResult(fetchDataForKeyWithFilter(best.queryXPath(), best.queryValue(), wuidFilters, sortorder, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
  3419. }
  3420. else if (remoteWildFilters.length())
  3421. {
  3422. merger->addPostFilters(remoteWildFilters, 1); // Any other filters have to be done locally
  3423. // Convert into a value IN [] which we do via a merge
  3424. // NOTE - If we want sorted by filter (or don't care about sort order), we could do directly as a range - but the wuid range filters then don't work, and the merger would be invalid
  3425. StringArray fieldValues;
  3426. const IPostFilter &best= remoteWildFilters.item(0);
  3427. _getUniqueValues(best.queryXPath(), best.queryValue(), fieldValues);
  3428. ForEachItemIn(idx, fieldValues)
  3429. {
  3430. merger->addResult(*new CassandraResult(fetchDataForKeyWithFilter(best.queryXPath(), fieldValues.item(idx), wuidFilters, sortorder, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
  3431. }
  3432. }
  3433. else
  3434. {
  3435. // If all we have is a wuid range (or nothing), search the wuid table and/or return everything
  3436. for (int i = 0; i < partitions; i++)
  3437. {
  3438. merger->addResult(*new CassandraResult(fetchDataByPartition(workunitInfoMappings, i, wuidFilters, sortorder, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
  3439. }
  3440. }
  3441. // The result we have will be sorted by wuid (ascending or descending)
  3442. if (needsPostSort)
  3443. {
  3444. // A post-sort will be required.
  3445. // Result should be limited in (to CASS_WORKUNIT_POSTSORT_LIMIT * number of results being merged)
  3446. result.setown(new CassPostSortIterator(merger.getClear(), sortorder, pageSize > CASS_WORKUNIT_POSTSORT_LIMIT ? pageSize : CASS_WORKUNIT_POSTSORT_LIMIT));
  3447. cached->setResult(result);
  3448. }
  3449. else
  3450. result.setown(merger.getClear());
  3451. }
  3452. if (startOffset || needsPostSort || result->hasPostFilters() || result->isMerging()) // we need a subpage if we have fetched anything other than exactly the rows requested
  3453. result.setown(new SubPageIterator(result.getClear(), startOffset, pageSize));
  3454. if (cachehint)
  3455. {
  3456. *cachehint = cached->queryHint();
  3457. CriticalBlock b(cacheCrit);
  3458. cacheIdMap.setValue(*cachehint, cached); // Links its parameter
  3459. }
  3460. if (total)
  3461. *total = 0; // We don't know
  3462. return result.getClear();
  3463. }
  3464. virtual StringArray &getUniqueValues(WUSortField field, const char *prefix, StringArray &result) const
  3465. {
  3466. return _getUniqueValues(queryFilterXPath(field), prefix, result);
  3467. }
  3468. virtual unsigned numWorkUnits()
  3469. {
  3470. unsigned total = 0;
  3471. CIArrayOf<CassandraFuture> futures;
  3472. for (int i = 0; i < partitions; i++)
  3473. {
  3474. CassandraStatement statement(prepareStatement("SELECT COUNT(*) FROM workunits where partition=?;"));
  3475. statement.bindInt32(0, i);
  3476. futures.append(*new CassandraFuture(cass_session_execute(querySession(), statement)));
  3477. }
  3478. ForEachItemIn(idx, futures)
  3479. {
  3480. CassandraFuture &future = futures.item(idx);
  3481. future.wait("select count(*)");
  3482. CassandraResult result(cass_future_get_result(future));
  3483. total += getUnsignedResult(NULL, getSingleResult(result));
  3484. }
  3485. return total;
  3486. }
  3487. /*
  3488. virtual bool isAborting(const char *wuid) const - done in the base class using dali
  3489. virtual void clearAborting(const char *wuid) - done in the base class using dali
  3490. */
  3491. virtual WUState waitForWorkUnit(const char * wuid, unsigned timeout, bool compiled, bool returnOnWaitState)
  3492. {
  3493. Owned<WorkUnitWaiter> waiter = new WorkUnitWaiter(wuid, SubscribeOptionState);
  3494. LocalIAbortHandler abortHandler(*waiter);
  3495. CassandraStatement statement(prepareStatement("select state, agentSession from workunits where partition=? and wuid=?;"));
  3496. statement.bindInt32(0, rtlHash32VStr(wuid, 0) % partitions);
  3497. statement.bindString(1, wuid);
  3498. SessionId agent = 0;
  3499. bool agentSessionStopped = false;
  3500. unsigned start = msTick();
  3501. for (;;)
  3502. {
  3503. CassandraFuture future(cass_session_execute(querySession(), statement));
  3504. future.wait("Lookup wu state");
  3505. CassandraResult result(cass_future_get_result(future));
  3506. const CassRow *row = cass_result_first_row(result);
  3507. if (!row)
  3508. return WUStateUnknown;
  3509. const CassValue *stateVal = cass_row_get_column(row, 0);
  3510. if (!stateVal)
  3511. return WUStateUnknown;
  3512. StringBuffer stateStr;
  3513. getCassString(stateStr, stateVal);
  3514. WUState state = getWorkUnitState(stateStr);
  3515. switch (state)
  3516. {
  3517. case WUStateCompiled:
  3518. case WUStateUploadingFiles:
  3519. if (compiled)
  3520. return state;
  3521. break;
  3522. case WUStateCompleted:
  3523. case WUStateFailed:
  3524. case WUStateAborted:
  3525. return state;
  3526. case WUStateWait:
  3527. if (returnOnWaitState)
  3528. return state;
  3529. break;
  3530. case WUStateCompiling:
  3531. case WUStateRunning:
  3532. case WUStateDebugPaused:
  3533. case WUStateDebugRunning:
  3534. case WUStateBlocked:
  3535. case WUStateAborting:
  3536. if (agentSessionStopped)
  3537. {
  3538. reportAbnormalTermination(wuid, state, agent);
  3539. return state;
  3540. }
  3541. if (queryDaliServerVersion().compare("2.1")>=0)
  3542. {
  3543. agent = getUnsignedResult(NULL, cass_row_get_column(row, 1));
  3544. if(agent && querySessionManager().sessionStopped(agent, 0))
  3545. {
  3546. agentSessionStopped = true;
  3547. continue;
  3548. }
  3549. }
  3550. break;
  3551. }
  3552. agentSessionStopped = false; // reset for state changes such as WUStateWait then WUStateRunning again
  3553. unsigned waited = msTick() - start;
  3554. if (timeout==-1 || waited + 20000 < timeout)
  3555. {
  3556. waiter->wait(20000); // recheck state every 20 seconds, in case eclagent has crashed.
  3557. if (waiter->isAborted())
  3558. return WUStateUnknown; // MORE - throw an exception?
  3559. }
  3560. else if (waited > timeout || !waiter->wait(timeout-waited))
  3561. return WUStateUnknown; // MORE - throw an exception?
  3562. }
  3563. }
  3564. virtual WUAction waitForWorkUnitAction(const char * wuid, WUAction original)
  3565. {
  3566. StringAttr origStr(getWorkunitActionStr(original));
  3567. Owned<WorkUnitWaiter> waiter = new WorkUnitWaiter(wuid, SubscribeOptionAction);
  3568. LocalIAbortHandler abortHandler(*waiter);
  3569. CassandraStatement statement(prepareStatement("select action from workunits where partition=? and wuid=?;"));
  3570. statement.bindInt32(0, rtlHash32VStr(wuid, 0) % partitions);
  3571. statement.bindString(1, wuid);
  3572. WUAction ret = WUActionUnknown;
  3573. for (;;)
  3574. {
  3575. CassandraFuture future(cass_session_execute(querySession(), statement));
  3576. future.wait("Lookup wu action");
  3577. CassandraResult result(cass_future_get_result(future));
  3578. const CassRow *row = cass_result_first_row(result);
  3579. if (!row)
  3580. {
  3581. PROGLOG("While waiting for job %s, WU no longer exists", wuid);
  3582. break;
  3583. }
  3584. const CassValue *actionVal = cass_row_get_column(row, 0);
  3585. if (!actionVal)
  3586. {
  3587. PROGLOG("While waiting for job %s, WU action cannot be read", wuid);
  3588. break;
  3589. }
  3590. StringBuffer actionStr;
  3591. getCassString(actionStr, actionVal);
  3592. if (!streq(actionStr, origStr))
  3593. {
  3594. ret = getWorkunitAction(actionStr);
  3595. break;
  3596. }
  3597. waiter->wait(10000); // recheck state every 20 seconds even if no notifications... just because we used to before
  3598. if (waiter->isAborted())
  3599. break;
  3600. }
  3601. return ret;
  3602. }
  3603. unsigned validateRepository(bool fix)
  3604. {
  3605. unsigned errCount = 0;
  3606. // 1. Check that every entry in main wu table has matching entries in secondary tables
  3607. CassandraResult result(fetchData(workunitInfoMappings+1));
  3608. CassandraIterator rows(cass_iterator_from_result(result));
  3609. if (fix)
  3610. {
  3611. // Delete the unique values table - the validate process recreates it afresh
  3612. executeSimpleCommand(querySession(), "TRUNCATE uniqueSearchValues;");
  3613. }
  3614. while (cass_iterator_next(rows))
  3615. {
  3616. Owned<IPTree> wuXML = rowToPTree(NULL, NULL, workunitInfoMappings+1, cass_iterator_get_row(rows));
  3617. const char *wuid = wuXML->queryName();
  3618. // For each search entry, check that we get matching XML
  3619. for (const char * const *search = searchPaths; *search; search++)
  3620. errCount += validateSearch(*search, wuid, wuXML, fix);
  3621. }
  3622. // 2. Check that there are no orphaned entries in search or child tables
  3623. errCount += checkOrphans(searchMappings, 3, fix);
  3624. for (const ChildTableInfo * const * table = childTables; *table != NULL; table++)
  3625. errCount += checkOrphans(table[0]->mappings, 1, fix);
  3626. errCount += checkOrphans(wuGraphProgressMappings, 1, fix);
  3627. errCount += checkOrphans(wuGraphStateMappings, 1, fix);
  3628. errCount += checkOrphans(wuGraphRunningMappings, 1, fix);
  3629. return errCount;
  3630. }
  3631. virtual void deleteRepository(bool recreate)
  3632. {
  3633. // USE WITH CARE!
  3634. CassandraSession s(cass_session_new());
  3635. CassandraFuture future(cass_session_connect(s, cluster.queryCluster()));
  3636. future.wait("connect without keyspace to delete");
  3637. VStringBuffer deleteKeyspace("DROP KEYSPACE IF EXISTS %s;", cluster.queryKeySpace());
  3638. executeSimpleCommand(s, deleteKeyspace);
  3639. s.set(NULL);
  3640. cluster.disconnect();
  3641. if (recreate)
  3642. createRepository();
  3643. }
  3644. virtual void createRepository()
  3645. {
  3646. cluster.disconnect();
  3647. CassandraSession s(cass_session_new());
  3648. CassandraFuture future(cass_session_connect(s, cluster.queryCluster()));
  3649. future.wait("connect without keyspace");
  3650. VStringBuffer create("CREATE KEYSPACE IF NOT EXISTS %s WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '1' };", cluster.queryKeySpace()); // MORE - options from props? Not 100% sure if they are appropriate.
  3651. executeSimpleCommand(s, create);
  3652. s.set(NULL);
  3653. cluster.connect();
  3654. createVersionTable(false);
  3655. ensureTable(querySession(), workunitsMappings);
  3656. ensureTable(querySession(), searchMappings);
  3657. ensureTable(querySession(), uniqueSearchMappings);
  3658. ensureTable(querySession(), filesSearchMappings);
  3659. for (const ChildTableInfo * const * table = childTables; *table != NULL; table++)
  3660. ensureTable(querySession(), table[0]->mappings);
  3661. ensureTable(querySession(), wuGraphProgressMappings);
  3662. ensureTable(querySession(), wuGraphStateMappings);
  3663. ensureTable(querySession(), wuGraphRunningMappings);
  3664. }
  3665. virtual const char *queryStoreType() const
  3666. {
  3667. return "Cassandra";
  3668. }
  3669. // Interface ICassandraSession
  3670. virtual CassSession *querySession() const { return cluster.querySession(); };
  3671. virtual unsigned queryTraceLevel() const { return traceLevel; };
  3672. virtual CassandraPrepared *prepareStatement(const char *query) const
  3673. {
  3674. return cluster.prepareStatement(query, traceLevel>=2);
  3675. }
  3676. virtual void executeAsync(CIArrayOf<CassandraStatement> &batch, const char *what) const override
  3677. {
  3678. if (batch.ordinality())
  3679. {
  3680. if (queryTraceLevel() > 1)
  3681. DBGLOG("Executing async batch %s", what);
  3682. cluster.executeAsync(batch, what);
  3683. }
  3684. }
  3685. virtual unsigned queryPartitions() const override
  3686. {
  3687. return partitions;
  3688. }
  3689. virtual unsigned queryPrefixSize() const override
  3690. {
  3691. return prefixSize;
  3692. }
  3693. private:
  3694. virtual void executeBatch(CassandraBatch &batch, const char *what) const
  3695. {
  3696. if (queryTraceLevel() > 1)
  3697. DBGLOG("Executing batch %s", what);
  3698. CassandraFuture futureBatch(cass_session_execute_batch(querySession(), batch));
  3699. futureBatch.wait(what);
  3700. }
  3701. void createVersionTable(bool force)
  3702. {
  3703. StringBuffer schema;
  3704. executeSimpleCommand(querySession(), describeTable(versionMappings, schema));
  3705. Owned<IPTree> oldVersion = getVersionInfo();
  3706. if (force || !oldVersion)
  3707. {
  3708. VStringBuffer versionInfo("<Version major='%d' minor='%d' numPartitions='%d' searchPrefixSize='%d'/>", majorVersion, minorVersion, partitions, prefixSize);
  3709. CassandraBatch versionBatch(CASS_BATCH_TYPE_LOGGED);
  3710. Owned<IPTree> pt = createPTreeFromXMLString(versionInfo);
  3711. for (int i = 0; i < DEFAULT_PARTITIONS; i++) // NOTE - version table always has DEFAULT_PARTITIONS partitions
  3712. {
  3713. pt->setPropInt("@partition", i);
  3714. simpleXMLtoCassandra(this, versionBatch, versionMappings, pt, NULL);
  3715. }
  3716. executeBatch(versionBatch, "createVersionTable");
  3717. }
  3718. }
  3719. IPTree *getVersionInfo()
  3720. {
  3721. try
  3722. {
  3723. StringBuffer names;
  3724. StringBuffer tableName;
  3725. getFieldNames(versionMappings, names, tableName);
  3726. VStringBuffer selectQuery("select %s from %s where partition=?;", names.str()+1, tableName.str());
  3727. CassandraStatement select(prepareStatement(selectQuery));
  3728. select.bindInt32(0, rand_r(&randState) % DEFAULT_PARTITIONS); // NOTE - version table always has DEFAULT_PARTITIONS partitions
  3729. CassandraFuture future(cass_session_execute(querySession(), select));
  3730. future.wait("read version");
  3731. CassandraResult result(cass_future_get_result(future));
  3732. const CassRow *row = cass_result_first_row(result);
  3733. if (row)
  3734. return rowToPTree(NULL, NULL, versionMappings, row);
  3735. }
  3736. catch (IException *E)
  3737. {
  3738. EXCLOG(E);
  3739. E->Release();
  3740. }
  3741. catch (...)
  3742. {
  3743. DBGLOG("WARNING: Unknown exception caught while trying to retrieve Cassandra repository version information");
  3744. }
  3745. return NULL;
  3746. }
  3747. bool checkWuExists(const char *wuid)
  3748. {
  3749. CassandraStatement statement(prepareStatement("SELECT COUNT(*) FROM workunits where partition=? and wuid=?;"));
  3750. statement.bindInt32(0, rtlHash32VStr(wuid, 0) % partitions);
  3751. statement.bindString(1, wuid);
  3752. CassandraFuture future(cass_session_execute(querySession(), statement));
  3753. future.wait("select count(*)");
  3754. CassandraResult result(cass_future_get_result(future));
  3755. return getUnsignedResult(NULL, getSingleResult(result)) != 0; // Shouldn't be more than 1, either
  3756. }
  3757. void mergeFilter(IArrayOf<IPostFilter> &filters, WUSortField field, const char *value)
  3758. {
  3759. // Combine multiple filters on wuid - Cassandra doesn't like seeing more than one.
  3760. ForEachItemIn(idx, filters)
  3761. {
  3762. PostFilter &filter = static_cast<PostFilter &>(filters.item(idx));
  3763. if (filter.queryField()==field)
  3764. {
  3765. const char *prevLimit = filter.queryValue();
  3766. int diff = strcmp(prevLimit, value);
  3767. if (diff && ((diff < 0) == (field==WUSFwuid)))
  3768. filter.setValue(value);
  3769. return;
  3770. }
  3771. }
  3772. // Not found - add new filter
  3773. filters.append(*new PostFilter(field, value, true));
  3774. }
  3775. IConstWorkUnitIterator * getWorkUnitsByXXX(const char *xpath, const char *key, ISecManager *secmgr, ISecUser *secuser)
  3776. {
  3777. Owned<CassMultiIterator> merger = new CassMultiIterator(NULL, 0, 0, true); // Merge by wuid
  3778. if (!key || !*key)
  3779. {
  3780. IArrayOf<IPostFilter> wuidFilters;
  3781. for (int i = 0; i < partitions; i++)
  3782. {
  3783. merger->addResult(*new CassandraResult(fetchDataByPartition(workunitInfoMappings, i, wuidFilters)));
  3784. }
  3785. }
  3786. else
  3787. merger->addResult(*new CassandraResult(fetchDataForKey(xpath, key)));
  3788. return createSecureConstWUIterator(merger.getClear(), secmgr, secuser);
  3789. }
  3790. StringArray &_getUniqueValues(const char *xpath, const char *prefix, StringArray &result) const
  3791. {
  3792. if (prefix && strlen(prefix) >= prefixSize)
  3793. {
  3794. CassandraResult r(fetchDataForWildSearch(xpath, prefix, uniqueSearchMappings));
  3795. CassandraIterator rows(cass_iterator_from_result(r));
  3796. StringBuffer value;
  3797. while (cass_iterator_next(rows))
  3798. {
  3799. const CassRow *row = cass_iterator_get_row(rows);
  3800. getCassString(value.clear(), cass_row_get_column(row, 0));
  3801. result.append(value);
  3802. }
  3803. }
  3804. return result;
  3805. }
  3806. unsigned validateSearch(const char *xpath, const char *wuid, IPTree *wuXML, bool fix)
  3807. {
  3808. unsigned errCount = 0;
  3809. const char *childKey = wuXML->queryProp(xpath);
  3810. if (childKey && *childKey)
  3811. {
  3812. CIArrayOf<CassandraStatement> batch;
  3813. CIArrayOf<CassandraStatement> deletes;
  3814. CassandraResult result(fetchDataForKeyAndWuid(xpath, childKey, wuid));
  3815. if (fix)
  3816. simpleXMLtoCassandra(this, batch, uniqueSearchMappings, wuXML, xpath);
  3817. switch (cass_result_row_count(result))
  3818. {
  3819. case 0:
  3820. DBGLOG("Missing search data for %s for wuid=%s key=%s", xpath, wuid, childKey);
  3821. if (fix)
  3822. simpleXMLtoCassandra(this, batch, searchMappings, wuXML, xpath);
  3823. errCount++;
  3824. break;
  3825. case 1:
  3826. {
  3827. Owned<IPTree> secXML = rowToPTree(xpath, childKey, searchMappings+4, cass_result_first_row(result)); // type, prefix, key, and wuid are not returned
  3828. secXML->renameProp("/", wuid);
  3829. if (!areMatchingPTrees(wuXML, secXML))
  3830. {
  3831. DBGLOG("Mismatched search data for %s for wuid %s", xpath, wuid);
  3832. if (fix)
  3833. simpleXMLtoCassandra(this, batch, searchMappings, wuXML, xpath);
  3834. errCount++;
  3835. }
  3836. break;
  3837. }
  3838. default:
  3839. DBGLOG("Multiple secondary data %d for %s for wuid %s", (int) cass_result_row_count(result), xpath, wuid); // This should be impossible!
  3840. if (fix)
  3841. {
  3842. deleteSecondaryByKey(xpath, childKey, wuid, this, deletes);
  3843. simpleXMLtoCassandra(this, batch, searchMappings, wuXML, xpath);
  3844. }
  3845. break;
  3846. }
  3847. if (fix)
  3848. {
  3849. executeAsync(deletes, "delete search");
  3850. executeAsync(batch, "fix search");
  3851. }
  3852. }
  3853. return errCount;
  3854. }
  3855. unsigned checkOrphans(const CassandraXmlMapping *mappings, unsigned wuidIndex, bool fix)
  3856. {
  3857. unsigned errCount = 0;
  3858. CassandraResult result(fetchData(mappings));
  3859. CassandraIterator rows(cass_iterator_from_result(result));
  3860. while (cass_iterator_next(rows))
  3861. {
  3862. const CassRow *row = cass_iterator_get_row(rows);
  3863. StringBuffer wuid;
  3864. getCassString(wuid, cass_row_get_column(row, wuidIndex));
  3865. if (!streq(wuid, GLOBAL_WORKUNIT) && !checkWuExists(wuid))
  3866. {
  3867. DBGLOG("Orphaned data in %s for wuid=%s", queryTableName(mappings), wuid.str());
  3868. if (fix)
  3869. {
  3870. if (wuidIndex)
  3871. {
  3872. CIArrayOf<CassandraStatement> secondaryBatch;
  3873. StringBuffer xpath, fieldValue;
  3874. getCassString(xpath, cass_row_get_column(row, 0));
  3875. getCassString(fieldValue, cass_row_get_column(row, 2));
  3876. deleteSecondaryByKey(xpath, fieldValue, wuid, this, secondaryBatch);
  3877. executeAsync(secondaryBatch, "Delete orphans");
  3878. }
  3879. else
  3880. {
  3881. CassandraBatch batch(CASS_BATCH_TYPE_UNLOGGED);
  3882. deleteChildByWuid(mappings, wuid, batch);
  3883. executeBatch(batch, "Delete orphans");
  3884. }
  3885. }
  3886. errCount++;
  3887. }
  3888. }
  3889. return errCount;
  3890. }
  3891. IPTree *cassandraToWorkunitXML(const char *wuid) const
  3892. {
  3893. CassandraResult result(fetchDataForWuid(workunitsMappings, wuid, false));
  3894. CassandraIterator rows(cass_iterator_from_result(result));
  3895. if (cass_iterator_next(rows)) // should just be one
  3896. {
  3897. Owned<IPTree> wuXML = createPTree(wuid);
  3898. wuXML->setProp("@xmlns:xsi", "http://www.w3.org/1999/XMLSchema-instance");
  3899. CassandraIterator cols(cass_iterator_from_row(cass_iterator_get_row(rows)));
  3900. unsigned colidx = 2; // wuid and partition are not returned
  3901. while (cass_iterator_next(cols))
  3902. {
  3903. assertex(workunitsMappings[colidx].columnName);
  3904. const CassValue *value = cass_iterator_get_column(cols);
  3905. if (value && !cass_value_is_null(value))
  3906. workunitsMappings[colidx].mapper.toXML(wuXML, workunitsMappings[colidx].xpath, value);
  3907. colidx++;
  3908. }
  3909. return wuXML.getClear();
  3910. }
  3911. else
  3912. return NULL;
  3913. }
  3914. // Fetch all rows from a table
  3915. const CassResult *fetchData(const CassandraXmlMapping *mappings) const
  3916. {
  3917. StringBuffer names;
  3918. StringBuffer tableName;
  3919. getFieldNames(mappings, names, tableName);
  3920. VStringBuffer selectQuery("select %s from %s;", names.str()+1, tableName.str());
  3921. if (traceLevel >= 2)
  3922. DBGLOG("%s", selectQuery.str());
  3923. CassandraStatement statement(cass_statement_new(selectQuery.str(), 0));
  3924. return executeQuery(querySession(), statement);
  3925. }
  3926. // Fetch all rows from a single partition of a table
  3927. const CassResult *fetchDataByPartition(const CassandraXmlMapping *mappings, int partition, const IArrayOf<IPostFilter> &wuidFilters, unsigned sortOrder=WUSFwuid|WUSFreverse, unsigned limit=0) const
  3928. {
  3929. StringBuffer names;
  3930. StringBuffer tableName;
  3931. getFieldNames(mappings+1, names, tableName); // Don't fetch partition column
  3932. VStringBuffer selectQuery("select %s from %s where partition=?", names.str()+1, tableName.str());
  3933. ForEachItemIn(idx, wuidFilters)
  3934. {
  3935. const IPostFilter &wuidFilter = wuidFilters.item(idx);
  3936. selectQuery.appendf(" and wuid %s ?", wuidFilter.queryField()==WUSFwuidhigh ? "<=" : ">=");
  3937. }
  3938. switch (sortOrder)
  3939. {
  3940. case WUSFwuid:
  3941. selectQuery.append(" ORDER BY WUID ASC");
  3942. break;
  3943. case WUSFwuid|WUSFreverse:
  3944. // If not wuid, descending, we will have to post-sort
  3945. selectQuery.append(" ORDER BY WUID DESC");
  3946. break;
  3947. default:
  3948. // If not wuid, descending, we will have to post-sort. We still need in wuid desc order for the merge though.
  3949. selectQuery.append(" ORDER BY WUID DESC");
  3950. if (!limit)
  3951. limit = CASS_WORKUNIT_POSTSORT_LIMIT;
  3952. break;
  3953. }
  3954. if (limit)
  3955. selectQuery.appendf(" LIMIT %u", limit);
  3956. selectQuery.append(';');
  3957. CassandraStatement select(prepareStatement(selectQuery));
  3958. select.bindInt32(0, partition);
  3959. ForEachItemIn(idx2, wuidFilters)
  3960. {
  3961. const IPostFilter &wuidFilter = wuidFilters.item(idx2);
  3962. select.bindString(idx2+1, wuidFilter.queryValue());
  3963. }
  3964. return executeQuery(querySession(), select);
  3965. }
  3966. // Fetch matching rows from a child table, or the main wu table
  3967. const CassResult *fetchDataForWuid(const CassandraXmlMapping *mappings, const char *wuid, bool includeWuid) const
  3968. {
  3969. assertex(wuid && *wuid);
  3970. StringBuffer names;
  3971. StringBuffer tableName;
  3972. getFieldNames(mappings + (includeWuid ? 1 : 2), names, tableName); // mappings+2 means we don't return the partition or wuid columns
  3973. VStringBuffer selectQuery("select %s from %s where partition=? and wuid=?;", names.str()+1, tableName.str());
  3974. CassandraStatement select(prepareStatement(selectQuery));
  3975. select.bindInt32(0, rtlHash32VStr(wuid, 0) % partitions);
  3976. select.bindString(1, wuid);
  3977. return executeQuery(querySession(), select);
  3978. }
  3979. const CassResult *fetchDataForWuidAndKey(const CassandraXmlMapping *mappings, const char *wuid, const char *key) const
  3980. {
  3981. assertex(wuid && *wuid);
  3982. StringBuffer names;
  3983. StringBuffer tableName;
  3984. getFieldNames(mappings+2, names, tableName); // mappings+2 means we don't return the partition or wuid columns. We do return the key.
  3985. VStringBuffer selectQuery("select %s from %s where partition=? and wuid=? and %s=?;", names.str()+1, tableName.str(), mappings[2].columnName);
  3986. CassandraStatement select(prepareStatement(selectQuery));
  3987. select.bindInt32(0, rtlHash32VStr(wuid, 0) % partitions);
  3988. select.bindString(1, wuid);
  3989. select.bindString(2, key);
  3990. return executeQuery(querySession(), select);
  3991. }
  3992. // Fetch matching rows from the search table, for all wuids, sorted by wuid
  3993. const CassResult *fetchDataForKey(const char *xpath, const char *key) const
  3994. {
  3995. assertex(key);
  3996. StringBuffer names;
  3997. StringBuffer tableName;
  3998. StringBuffer ucKey(key);
  3999. ucKey.toUpperCase();
  4000. getFieldNames(searchMappings+3, names, tableName); // mappings+3 means we don't return the key columns (xpath, upper(keyPrefix), upper(key))
  4001. VStringBuffer selectQuery("select %s from %s where xpath=? and fieldPrefix=? and fieldValue=?", names.str()+1, tableName.str());
  4002. selectQuery.append(" ORDER BY fieldValue ASC, WUID desc;");
  4003. CassandraStatement select(prepareStatement(selectQuery));
  4004. select.bindString(0, xpath);
  4005. select.bindString_n(1, ucKey, prefixSize);
  4006. select.bindString(2, ucKey);
  4007. return executeQuery(querySession(), select);
  4008. }
  4009. // Fetch matching rows from the search table, for all wuids, sorted by wuid
  4010. const CassResult *fetchDataForKeyWithFilter(const char *xpath, const char *key, const IArrayOf<IPostFilter> &wuidFilters, unsigned sortOrder, unsigned limit) const
  4011. {
  4012. StringBuffer names;
  4013. StringBuffer tableName;
  4014. StringBuffer ucKey(key);
  4015. ucKey.toUpperCase();
  4016. getFieldNames(searchMappings+3, names, tableName); // mappings+3 means we don't return the key columns (xpath, upper(keyPrefix), upper(key))
  4017. VStringBuffer selectQuery("select %s from %s where xpath=? and fieldPrefix=? and fieldValue=?", names.str()+1, tableName.str());
  4018. ForEachItemIn(idx, wuidFilters)
  4019. {
  4020. const IPostFilter &wuidFilter = wuidFilters.item(idx);
  4021. selectQuery.appendf(" and wuid %s ?", wuidFilter.queryField()==WUSFwuidhigh ? "<=" : ">=");
  4022. }
  4023. switch (sortOrder)
  4024. {
  4025. case WUSFwuid:
  4026. selectQuery.append(" ORDER BY fieldValue DESC, WUID ASC");
  4027. break;
  4028. case WUSFwuid|WUSFreverse:
  4029. selectQuery.append(" ORDER BY fieldValue ASC, WUID DESC");
  4030. break;
  4031. default:
  4032. // If not wuid, descending, we will have to post-sort. We still need in wuid desc order for the merge though.
  4033. selectQuery.appendf(" ORDER BY fieldvalue ASC, WUID DESC");
  4034. limit = CASS_WORKUNIT_POSTSORT_LIMIT;
  4035. break;
  4036. }
  4037. if (limit)
  4038. selectQuery.appendf(" LIMIT %u", limit);
  4039. CassandraStatement select(prepareStatement(selectQuery));
  4040. select.bindString(0, xpath);
  4041. select.bindString_n(1, ucKey, prefixSize);
  4042. select.bindString(2, ucKey);
  4043. ForEachItemIn(idx2, wuidFilters)
  4044. {
  4045. const IPostFilter &wuidFilter = wuidFilters.item(idx2);
  4046. select.bindString(3+idx2, wuidFilter.queryValue());
  4047. }
  4048. return executeQuery(querySession(), select);
  4049. }
  4050. // Fetch matching rows from the search or uniqueSearch table, for a given prefix
  4051. const CassResult *fetchDataForWildSearch(const char *xpath, const char *prefix, const CassandraXmlMapping *mappings) const
  4052. {
  4053. assertex(prefix && *prefix);
  4054. StringBuffer names;
  4055. StringBuffer tableName;
  4056. StringBuffer ucKey(prefix);
  4057. ucKey.toUpperCase();
  4058. StringBuffer ucKeyEnd(ucKey);
  4059. size32_t len = ucKeyEnd.length();
  4060. assertex(len);
  4061. ucKeyEnd.setCharAt(len-1, ucKeyEnd.charAt(len-1)+1);
  4062. getFieldNames(mappings+3, names, tableName); // mappings+3 means we don't return the key columns (xpath, upper(keyPrefix), upper(key))
  4063. VStringBuffer selectQuery("select %s from %s where xpath=? and fieldPrefix=? and fieldValue>=? and fieldValue<?;", names.str()+1, tableName.str());
  4064. CassandraStatement select(prepareStatement(selectQuery));
  4065. select.bindString(0, xpath);
  4066. select.bindString_n(1, ucKey, prefixSize);
  4067. select.bindString(2, ucKey);
  4068. select.bindString(3, ucKeyEnd);
  4069. return executeQuery(querySession(), select);
  4070. }
  4071. // Fetch rows from the search table, by thorTime, above a threshold
  4072. const CassResult *fetchDataByThorTime(const char *threshold, bool descending, unsigned limit) const
  4073. {
  4074. StringBuffer names;
  4075. StringBuffer tableName;
  4076. getFieldNames(searchMappings+3, names, tableName); // mappings+3 means we don't return the key columns (xpath, upper(keyPrefix), upper(key))
  4077. VStringBuffer selectQuery("select %s from %s where xpath=? and fieldPrefix=?", names.str()+1, tableName.str());
  4078. if (threshold && *threshold)
  4079. selectQuery.appendf(" where fieldValue >= ?");
  4080. if (descending)
  4081. selectQuery.append(" ORDER BY fieldValue DESC, wuid ASC");
  4082. else
  4083. selectQuery.append(" ORDER BY fieldValue ASC, wuid DESC");
  4084. if (limit)
  4085. selectQuery.appendf(" LIMIT %u", limit);
  4086. selectQuery.append(';');
  4087. CassandraStatement select(prepareStatement(selectQuery));
  4088. select.bindString(0, "@totalThorTime");
  4089. select.bindString_n(1, " ", prefixSize); // This would stop working if we ever set the search prefix to > 8 chars. So don't.
  4090. if (threshold && *threshold)
  4091. select.bindString(2, threshold);
  4092. return executeQuery(querySession(), select);
  4093. }
  4094. // Fetch rows from the search table, continuing a previous query that was sorted by thor time - part one
  4095. // This technique only works for thor time where we have forced to a single partition. Otherwise it gets even more complicated, and not worth it.
  4096. const CassResult *fetchMoreDataByThorTime(const char *threshold, const char *wuid, bool descending, unsigned limit) const
  4097. {
  4098. StringBuffer names;
  4099. StringBuffer tableName;
  4100. getFieldNames(searchMappings+3, names, tableName); // mappings+3 means we don't return the key columns (xpath, upper(keyPrefix), upper(key))
  4101. const char *wuidTest;
  4102. const char *fieldTest;
  4103. if (descending)
  4104. {
  4105. wuidTest = ">";
  4106. fieldTest = wuid ? "=" : "<";
  4107. }
  4108. else
  4109. {
  4110. wuidTest = "<";
  4111. fieldTest = wuid ? "=" : ">";
  4112. }
  4113. VStringBuffer selectQuery("select %s from %s where xpath=? and fieldPrefix=? and fieldValue %s ?", names.str()+1, tableName.str(), fieldTest);
  4114. if (wuid)
  4115. selectQuery.appendf(" and wuid %s ?", wuidTest);
  4116. if (descending)
  4117. selectQuery.append(" ORDER BY fieldValue DESC, WUID ASC");
  4118. else
  4119. selectQuery.append(" ORDER BY fieldValue ASC, WUID DESC");
  4120. if (limit)
  4121. selectQuery.appendf(" LIMIT %u", limit);
  4122. selectQuery.append(';');
  4123. CassandraStatement select(prepareStatement(selectQuery));
  4124. select.bindString(0, "@totalThorTime");
  4125. select.bindString_n(1, threshold, prefixSize);
  4126. select.bindString(2, threshold);
  4127. if (wuid)
  4128. select.bindString(3, wuid);
  4129. return executeQuery(querySession(), select);
  4130. }
  4131. // Fetch rows from the file search table (covers files read and files written)
  4132. const CassResult *fetchDataForFiles(const char *name, const IArrayOf<IPostFilter> &wuidFilters, bool read) const
  4133. {
  4134. StringBuffer names;
  4135. StringBuffer tableName;
  4136. getFieldNames(filesSearchMappings+2, names, tableName); // mappings+2 means we don't return the key columns (name and readmode)
  4137. VStringBuffer selectQuery("select %s from %s where name=? and read=?", names.str()+1, tableName.str());
  4138. ForEachItemIn(idx, wuidFilters)
  4139. {
  4140. const IPostFilter &wuidFilter = wuidFilters.item(idx);
  4141. selectQuery.appendf(" and wuid %s ?", wuidFilter.queryField()==WUSFwuidhigh ? "<=" : ">=");
  4142. }
  4143. CassandraStatement select(prepareStatement(selectQuery));
  4144. select.bindString(0, name);
  4145. select.bindBool(1, read ? cass_true : cass_false);
  4146. ForEachItemIn(idx2, wuidFilters)
  4147. {
  4148. const IPostFilter &wuidFilter = wuidFilters.item(idx2);
  4149. select.bindString(idx2+2, wuidFilter.queryValue());
  4150. }
  4151. return executeQuery(querySession(), select);
  4152. }
  4153. // Fetch matching rows from the search table, for a single wuid
  4154. const CassResult *fetchDataForKeyAndWuid(const char *xpath, const char *key, const char *wuid) const
  4155. {
  4156. assertex(key);
  4157. StringBuffer names;
  4158. StringBuffer tableName;
  4159. StringBuffer ucKey(key);
  4160. ucKey.toUpperCase();
  4161. getFieldNames(searchMappings+4, names, tableName); // mappings+4 means we don't return the key columns (xpath, upper(keyPrefix), upper(key), and wuid)
  4162. VStringBuffer selectQuery("select %s from %s where xpath=? and fieldPrefix=? and fieldValue =? and wuid=?;", names.str()+1, tableName.str());
  4163. CassandraStatement select(prepareStatement(selectQuery));
  4164. select.bindString(0, xpath);
  4165. select.bindString_n(1, ucKey, prefixSize);
  4166. select.bindString(2, ucKey);
  4167. select.bindString(3, wuid);
  4168. return executeQuery(querySession(), select);
  4169. }
  4170. // Delete matching rows from a child table
  4171. virtual void deleteChildByWuid(const CassandraXmlMapping *mappings, const char *wuid, CassBatch *batch) const
  4172. {
  4173. StringBuffer names;
  4174. StringBuffer tableName;
  4175. getFieldNames(mappings, names, tableName);
  4176. VStringBuffer insertQuery("DELETE from %s where partition=? and wuid=?;", tableName.str());
  4177. CassandraStatement update(prepareStatement(insertQuery));
  4178. update.bindInt32(0, rtlHash32VStr(wuid, 0) % partitions);
  4179. update.bindString(1, wuid);
  4180. check(cass_batch_add_statement(batch, update));
  4181. }
  4182. unsigned retireCache()
  4183. {
  4184. CriticalBlock b(cacheCrit); // Is this too coarse-grained?
  4185. unsigned expires = CASS_WU_QUERY_EXPIRES;
  4186. unsigned now = msTick();
  4187. ICopyArrayOf<CCassandraWuUQueryCacheEntry> goers;
  4188. HashIterator iter(cacheIdMap);
  4189. ForEach(iter)
  4190. {
  4191. CCassandraWuUQueryCacheEntry *entry = cacheIdMap.mapToValue(&iter.query());
  4192. unsigned age = now - entry->queryLastAccess();
  4193. int ttl = CASS_WU_QUERY_EXPIRES-age;
  4194. if (ttl<= 0)
  4195. goers.append(*entry);
  4196. else if (ttl< expires)
  4197. expires = ttl;
  4198. }
  4199. ForEachItemIn(idx, goers)
  4200. {
  4201. DBGLOG("Expiring cache entry %p", &goers.item(idx));
  4202. cacheIdMap.remove(goers.item(idx).queryHint());
  4203. }
  4204. return expires;
  4205. }
  4206. class CacheRetirer : public Thread
  4207. {
  4208. public:
  4209. CacheRetirer(CCasssandraWorkUnitFactory &_parent) : Thread("WorkunitListCacheRetirer"), parent(_parent)
  4210. {
  4211. stopping = false;
  4212. }
  4213. virtual int run()
  4214. {
  4215. while (!stopping)
  4216. {
  4217. unsigned delay = parent.retireCache();
  4218. sem.wait(delay);
  4219. }
  4220. return 0;
  4221. }
  4222. void stop()
  4223. {
  4224. stopping = true;
  4225. sem.signal();
  4226. }
  4227. private:
  4228. Semaphore sem;
  4229. CCasssandraWorkUnitFactory &parent;
  4230. bool stopping;
  4231. } cacheRetirer;
  4232. unsigned randomizeSuffix; // not referenced in this section; NOTE(review): confirm purpose/initialisation in the constructor
  4233. unsigned traceLevel; // NOTE(review): presumably controls trace verbosity - confirm where it is set
  4234. unsigned randState; // NOTE(review): presumably random-number state - confirm where it is seeded
  4235. int partitions = DEFAULT_PARTITIONS; // partition count: deleteChildByWuid() binds rtlHash32VStr(wuid, 0) % partitions as the partition key
  4236. int prefixSize = DEFAULT_PREFIX_SIZE; // number of leading characters of a search value bound to the fieldPrefix column
  4237. CassandraClusterSession cluster; // NOTE(review): cluster/session wrapper; presumably the source of querySession() - confirm
  4238. mutable CriticalSection cacheCrit; // guards cacheIdMap (retireCache() holds it for a whole sweep); mutable so it can be locked from const members
  4239. mutable MapXToMyClass<__uint64, __uint64, CCassandraWuUQueryCacheEntry> cacheIdMap; // query-cache entries keyed by their 64-bit hint (removed via queryHint())
  4240. };
  4241. } // namespace
  4242. extern "C" EXPORT IWorkUnitFactory *createWorkUnitFactory(const SharedObject *dll, const IPropertyTree *props)
  4243. {
  4244. return new cassandraembed::CCasssandraWorkUnitFactory(dll, props);
  4245. }