cassandrawu.cpp 175 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
73278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458345934603461346234633464346534663467346834693470347134723473347434753476347734783479348034813482348334843485348634873488348934903491349234933494349534963497349834993500350135023503350435053506350735083509351035113512351335143515351635173518351935203521352235233524352535263527352835293530353135323533353435353536353735383539354035413542354335443545354635473548354935503551355235533554355535563557355835593560356135623563356435653566356735683569357035713572357335743575357635773578357935803581358235833584358535863587358835893590359135923593359435953596359735983599360036013602360336043605360636073608360936103611361236133614361536163617361836193620362136223623362436253626362736283629363036313632363336343635363636373638363936403641364236433644364536463647364836493650365136523653365436553656365736583659366036613662366336643665366636673668366936703671367236733674367536763677367836793680368136823683368436853686368736883689369036913692369336943695369636973698369937003701370237033704370537063707370837093710371137123713371437153716371737183719372037213722372337243725372637273728372937303731373237333734373537363737373837393740374137423743374437453746374737483749375037513752375337543755375637573758375937603761376237633764376537663767376837693770377137723773377437753776377
73778377937803781378237833784378537863787378837893790379137923793379437953796379737983799380038013802380338043805380638073808380938103811381238133814381538163817381838193820382138223823382438253826382738283829383038313832383338343835383638373838383938403841384238433844384538463847384838493850385138523853385438553856385738583859386038613862386338643865386638673868386938703871387238733874387538763877387838793880388138823883388438853886388738883889389038913892389338943895389638973898389939003901390239033904390539063907390839093910391139123913391439153916391739183919392039213922392339243925392639273928392939303931393239333934393539363937393839393940394139423943394439453946394739483949395039513952395339543955395639573958395939603961396239633964396539663967396839693970397139723973397439753976397739783979398039813982398339843985398639873988398939903991399239933994399539963997399839994000400140024003400440054006400740084009401040114012401340144015401640174018401940204021402240234024402540264027402840294030403140324033403440354036403740384039404040414042404340444045404640474048404940504051405240534054405540564057405840594060406140624063406440654066406740684069407040714072407340744075407640774078407940804081408240834084408540864087408840894090409140924093409440954096409740984099410041014102410341044105410641074108410941104111411241134114411541164117411841194120412141224123412441254126412741284129413041314132413341344135413641374138413941404141414241434144414541464147414841494150415141524153415441554156415741584159416041614162416341644165416641674168416941704171417241734174417541764177417841794180418141824183418441854186418741884189419041914192419341944195419641974198419942004201420242034204420542064207420842094210421142124213421442154216421742184219422042214222422342244225422642274228422942304231423242334234423542364237423842394240424142424243424442454246424742484249425042514252425342544255425642574258425942604261426242634264426542664267426842694270427142724273427442754276427
74278427942804281428242834284428542864287428842894290429142924293429442954296429742984299430043014302430343044305430643074308430943104311431243134314431543164317431843194320432143224323432443254326432743284329433043314332433343344335433643374338433943404341434243434344434543464347434843494350435143524353435443554356435743584359436043614362436343644365436643674368436943704371437243734374437543764377437843794380438143824383
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2013 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include "cassandra.h"
  15. #include "jexcept.hpp"
  16. #include "jthread.hpp"
  17. #include "hqlplugins.hpp"
  18. #include "deftype.hpp"
  19. #include "eclhelper.hpp"
  20. #include "eclrtl.hpp"
  21. #include "eclrtl_imp.hpp"
  22. #include "rtlds_imp.hpp"
  23. #include "rtlfield.hpp"
  24. #include "rtlembed.hpp"
  25. #include "roxiemem.hpp"
  26. #include "nbcd.hpp"
  27. #include "jsort.hpp"
  28. #include "jptree.hpp"
  29. #include "jlzw.hpp"
  30. #include "jregexp.hpp"
  31. #include "dadfs.hpp"
  32. #include "dasds.hpp"
  33. #include "wuerror.hpp"
  34. #include "workunit.hpp"
  35. #include "workunit.ipp"
  36. #include "cassandraembed.hpp"
  37. #define EXPORT DECL_EXPORT
  38. namespace cassandraembed {
  // Compile-time constants for this file. Only two of them are used in the visible part of the file.
  // NOTE(review): the expression evaluates to 300000; looks like 5 minutes in milliseconds - confirm against its users.
  39. #define CASS_WU_QUERY_EXPIRES (1000*60*5)
  // NOTE(review): not referenced in the visible part of the file; meaning of the limit to be confirmed against its users.
  40. #define CASS_WORKUNIT_POSTSORT_LIMIT 10000
  // Prefix length passed by PrefixSearchColumnMapper::fromXML when binding a search value.
  41. #define CASS_SEARCH_PREFIX_SIZE 2
  // Modulus applied to the hash of the row's root name in HashRootNameColumnMapper::fromXML.
  42. #define NUM_PARTITIONS 2
  43. static const CassValue *getSingleResult(const CassResult *result)
  44. {
  45. const CassRow *row = cass_result_first_row(result);
  46. if (row)
  47. return cass_row_get_column(row, 0);
  48. else
  49. return NULL;
  50. }
  51. static StringBuffer &getCassString(StringBuffer &str, const CassValue *value)
  52. {
  53. const char *output;
  54. size_t length;
  55. check(cass_value_get_string(value, &output, &length));
  56. return str.append(length, output);
  57. }
  58. struct CassandraColumnMapper
  59. {
  60. virtual ~CassandraColumnMapper() {}
  61. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) = 0;
  62. virtual bool fromXML(CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) = 0;
  63. };
  64. static class StringColumnMapper : implements CassandraColumnMapper
  65. {
  66. public:
  67. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  68. {
  69. rtlDataAttr str;
  70. unsigned chars;
  71. getUTF8Result(NULL, value, chars, str.refstr());
  72. StringAttr s(str.getstr(), rtlUtf8Size(chars, str.getstr()));
  73. row->setProp(name, s);
  74. return row;
  75. }
  76. virtual bool fromXML(CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal)
  77. {
  78. const char *value = row->queryProp(name);
  79. if (!value)
  80. return false;
  81. if (statement)
  82. statement->bindString(idx, value);
  83. return true;
  84. }
  85. } stringColumnMapper;
  86. static class RequiredStringColumnMapper : public StringColumnMapper
  87. {
  88. public:
  89. virtual bool fromXML(CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal)
  90. {
  91. const char *value = row->queryProp(name);
  92. if (!value)
  93. value = "";
  94. if (statement)
  95. statement->bindString(idx, value);
  96. return true;
  97. }
  98. } requiredStringColumnMapper;
  99. static class SuppliedStringColumnMapper : public StringColumnMapper
  100. {
  101. public:
  102. virtual bool fromXML(CassandraStatement *statement, unsigned idx, IPTree *row, const char *, const char *userVal)
  103. {
  104. if (statement)
  105. statement->bindString(idx, userVal);
  106. return true;
  107. }
  108. } suppliedStringColumnMapper;
  109. static class BlobColumnMapper : implements CassandraColumnMapper
  110. {
  111. public:
  112. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  113. {
  114. rtlDataAttr str;
  115. unsigned chars;
  116. getDataResult(NULL, value, chars, str.refdata());
  117. row->setPropBin(name, chars, str.getbytes());
  118. return row;
  119. }
  120. virtual bool fromXML(CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char * userVal)
  121. {
  122. MemoryBuffer value;
  123. row->getPropBin(name, value);
  124. if (value.length())
  125. {
  126. if (statement)
  127. statement->bindBytes(idx, (const cass_byte_t *) value.toByteArray(), value.length());
  128. return true;
  129. }
  130. else
  131. return false;
  132. }
  133. } blobColumnMapper;
  134. static class compressTreeColumnMapper : implements CassandraColumnMapper
  135. {
  136. public:
  137. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  138. {
  139. rtlDataAttr str;
  140. unsigned chars;
  141. getDataResult(NULL, value, chars, str.refdata());
  142. if (chars)
  143. {
  144. MemoryBuffer compressed, decompressed;
  145. compressed.setBuffer(chars, str.getbytes(), false);
  146. decompressToBuffer(decompressed, compressed);
  147. Owned<IPTree> p = createPTree(decompressed);
  148. row->setPropTree(name, p.getClear());
  149. }
  150. return row;
  151. }
  152. virtual bool fromXML(CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char * userVal)
  153. {
  154. IPTree *child = row->queryPropTree(name);
  155. if (child && child->hasChildren())
  156. {
  157. if (statement)
  158. {
  159. MemoryBuffer decompressed, compressed;
  160. child->serialize(decompressed);
  161. compressToBuffer(compressed, decompressed.length(), decompressed.toByteArray());
  162. statement->bindBytes(idx, (const cass_byte_t *) compressed.toByteArray(), compressed.length());
  163. }
  164. return true;
  165. }
  166. else
  167. return false;
  168. }
  169. } compressTreeColumnMapper;
  170. static class TimeStampColumnMapper : implements CassandraColumnMapper
  171. {
  172. public:
  173. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  174. {
  175. // never fetched (that may change?)
  176. return row;
  177. }
  178. virtual bool fromXML(CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char * userVal)
  179. {
  180. // never bound, but does need to be included in the ?
  181. return true;
  182. }
  183. } timestampColumnMapper;
  184. static class HashRootNameColumnMapper : implements CassandraColumnMapper
  185. {
  186. public:
  187. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  188. {
  189. throwUnexpected(); // we never return the partition column
  190. }
  191. virtual bool fromXML(CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char * userVal)
  192. {
  193. if (statement)
  194. {
  195. int hash = rtlHash32VStr(row->queryName(), 0) % NUM_PARTITIONS;
  196. statement->bindInt32(idx, hash);
  197. }
  198. return true;
  199. }
  200. } hashRootNameColumnMapper;
  201. static class RootNameColumnMapper : implements CassandraColumnMapper
  202. {
  203. public:
  204. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  205. {
  206. rtlDataAttr str;
  207. unsigned chars;
  208. getUTF8Result(NULL, value, chars, str.refstr());
  209. StringAttr s(str.getstr(), rtlUtf8Size(chars, str.getstr()));
  210. row->renameProp("/", s);
  211. return row;
  212. }
  213. virtual bool fromXML(CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char * userVal)
  214. {
  215. if (statement)
  216. {
  217. const char *value = row->queryName();
  218. statement->bindString(idx, value);
  219. }
  220. return true;
  221. }
  222. } rootNameColumnMapper;
  223. // WuidColumnMapper is used for columns containing a wuid that is NOT in the resulting XML - it
  224. // is an error to try to map such a column to/from the XML representation
  225. static class WuidColumnMapper : implements CassandraColumnMapper
  226. {
  227. public:
  228. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  229. {
  230. throwUnexpected();
  231. }
  232. virtual bool fromXML(CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char * userVal)
  233. {
  234. throwUnexpected();
  235. }
  236. } wuidColumnMapper;
  237. static class BoolColumnMapper : implements CassandraColumnMapper
  238. {
  239. public:
  240. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  241. {
  242. row->addPropBool(name, getBooleanResult(NULL, value));
  243. return row;
  244. }
  245. virtual bool fromXML(CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char * userVal)
  246. {
  247. if (row->hasProp(name))
  248. {
  249. if (statement)
  250. {
  251. bool value = row->getPropBool(name, false);
  252. statement->bindBool(idx, value ? cass_true : cass_false);
  253. }
  254. return true;
  255. }
  256. else
  257. return false;
  258. }
  259. } boolColumnMapper;
  260. static class PrefixSearchColumnMapper : implements CassandraColumnMapper
  261. {
  262. public:
  263. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  264. {
  265. return row;
  266. }
  267. virtual bool fromXML(CassandraStatement *statement, unsigned idx, IPTree *row, const char *, const char *userVal)
  268. {
  269. return _fromXML(statement, idx, row, userVal, CASS_SEARCH_PREFIX_SIZE, true);
  270. }
  271. protected:
  272. static bool _fromXML(CassandraStatement *statement, unsigned idx, IPTree *row, const char *xpath, unsigned prefixLength, bool uc)
  273. {
  274. const char *columnVal = row->queryProp(xpath);
  275. if (columnVal)
  276. {
  277. if (statement)
  278. {
  279. StringBuffer buf(columnVal);
  280. if (uc)
  281. buf.toUpperCase();
  282. if (prefixLength)
  283. statement->bindString_n(idx, buf, prefixLength);
  284. else
  285. statement->bindString(idx, buf);
  286. }
  287. return true;
  288. }
  289. else
  290. return false;
  291. }
  292. } prefixSearchColumnMapper;
  293. static class SearchColumnMapper : public PrefixSearchColumnMapper
  294. {
  295. public:
  296. virtual bool fromXML(CassandraStatement *statement, unsigned idx, IPTree *row, const char *, const char *userVal)
  297. {
  298. return _fromXML(statement, idx, row, userVal, 0, true);
  299. }
  300. } searchColumnMapper;
  301. static class LCSearchColumnMapper : public PrefixSearchColumnMapper
  302. {
  303. public:
  304. virtual bool fromXML(CassandraStatement *statement, unsigned idx, IPTree *row, const char *, const char *userVal)
  305. {
  306. return _fromXML(statement, idx, row, userVal, 0, false);
  307. }
  308. } lcSearchColumnMapper;
  309. static class IntColumnMapper : implements CassandraColumnMapper
  310. {
  311. public:
  312. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  313. {
  314. if (name)
  315. row->addPropInt(name, getSignedResult(NULL, value));
  316. return row;
  317. }
  318. virtual bool fromXML(CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal)
  319. {
  320. if (row->hasProp(name))
  321. {
  322. if (statement)
  323. {
  324. int value = row->getPropInt(name);
  325. statement->bindInt32(idx, value);
  326. }
  327. return true;
  328. }
  329. else
  330. return false;
  331. }
  332. } intColumnMapper;
  333. static class DefaultedIntColumnMapper : public IntColumnMapper
  334. {
  335. public:
  336. virtual bool fromXML(CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char * defaultValue)
  337. {
  338. if (statement)
  339. {
  340. int value = row->getPropInt(name, atoi(defaultValue));
  341. statement->bindInt32(idx, value);
  342. }
  343. return true;
  344. }
  345. } defaultedIntColumnMapper;
  346. static class BigIntColumnMapper : implements CassandraColumnMapper
  347. {
  348. public:
  349. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  350. {
  351. row->addPropInt64(name, getSignedResult(NULL, value));
  352. return row;
  353. }
  354. virtual bool fromXML(CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal)
  355. {
  356. if (row->hasProp(name))
  357. {
  358. if (statement)
  359. {
  360. __int64 value = row->getPropInt64(name);
  361. statement->bindInt64(idx, value);
  362. }
  363. return true;
  364. }
  365. else
  366. return false;
  367. }
  368. } bigintColumnMapper;
  369. static class SimpleMapColumnMapper : implements CassandraColumnMapper
  370. {
  371. public:
  372. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  373. {
  374. Owned<IPTree> map = createPTree(name);
  375. CassandraIterator elems(cass_iterator_from_map(value));
  376. while (cass_iterator_next(elems))
  377. {
  378. rtlDataAttr str;
  379. unsigned chars;
  380. getStringResult(NULL, cass_iterator_get_map_key(elems), chars, str.refstr());
  381. StringAttr s(str.getstr(), chars);
  382. stringColumnMapper.toXML(map, s, cass_iterator_get_map_value(elems));
  383. }
  384. row->addPropTree(name, map.getClear());
  385. return row;
  386. }
  387. virtual bool fromXML(CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal)
  388. {
  389. Owned<IPTree> child = row->getPropTree(name);
  390. if (child)
  391. {
  392. unsigned numItems = child->numChildren();
  393. if (numItems)
  394. {
  395. if (statement)
  396. {
  397. CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_MAP, numItems));
  398. Owned<IPTreeIterator> items = child->getElements("*");
  399. ForEach(*items)
  400. {
  401. IPTree &item = items->query();
  402. const char *key = item.queryName();
  403. const char *value = item.queryProp(NULL);
  404. if (key && value)
  405. {
  406. check(cass_collection_append_string(collection, key));
  407. check(cass_collection_append_string(collection, value));
  408. }
  409. }
  410. statement->bindCollection(idx, collection);
  411. }
  412. return true;
  413. }
  414. }
  415. return false;
  416. }
  417. } simpleMapColumnMapper;
  418. static class AttributeMapColumnMapper : implements CassandraColumnMapper
  419. {
  420. public:
  421. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  422. {
  423. CassandraIterator elems(cass_iterator_from_map(value));
  424. while (cass_iterator_next(elems))
  425. {
  426. rtlDataAttr str;
  427. unsigned chars;
  428. getStringResult(NULL, cass_iterator_get_map_key(elems), chars, str.refstr());
  429. StringBuffer s("@");
  430. s.append(chars, str.getstr());
  431. stringColumnMapper.toXML(row, s, cass_iterator_get_map_value(elems));
  432. }
  433. return row;
  434. }
  435. virtual bool fromXML(CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal)
  436. {
  437. // NOTE - name here provides a list of attributes that we should NOT be mapping
  438. Owned<IAttributeIterator> attrs = row->getAttributes();
  439. unsigned numItems = 0;
  440. ForEach(*attrs)
  441. {
  442. StringBuffer key = attrs->queryName();
  443. key.append('@');
  444. if (strstr(name, key) == NULL)
  445. numItems++;
  446. }
  447. if (numItems)
  448. {
  449. if (statement)
  450. {
  451. CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_MAP, numItems));
  452. ForEach(*attrs)
  453. {
  454. StringBuffer key = attrs->queryName();
  455. key.append('@');
  456. if (strstr(name, key) == NULL)
  457. {
  458. const char *value = attrs->queryValue();
  459. check(cass_collection_append_string(collection, attrs->queryName()+1)); // skip the @
  460. check(cass_collection_append_string(collection, value));
  461. }
  462. }
  463. statement->bindCollection(idx, collection);
  464. }
  465. return true;
  466. }
  467. else
  468. return false;
  469. }
  470. } attributeMapColumnMapper;
  471. static class ElementMapColumnMapper : implements CassandraColumnMapper
  472. {
  473. public:
  474. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  475. {
  476. CassandraIterator elems(cass_iterator_from_map(value));
  477. while (cass_iterator_next(elems))
  478. {
  479. rtlDataAttr str;
  480. unsigned chars;
  481. getStringResult(NULL, cass_iterator_get_map_key(elems), chars, str.refstr());
  482. StringBuffer elemName(chars, str.getstr());
  483. stringColumnMapper.toXML(row, elemName, cass_iterator_get_map_value(elems));
  484. }
  485. return row;
  486. }
  487. virtual bool fromXML(CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal)
  488. {
  489. // NOTE - name here provides a list of elements that we should NOT be mapping
  490. Owned<IPTreeIterator> elems = row->getElements("*");
  491. unsigned numItems = 0;
  492. ForEach(*elems)
  493. {
  494. IPTree &item = elems->query();
  495. StringBuffer key('@');
  496. key.append(item.queryName());
  497. key.append('@');
  498. if (strstr(name, key) == NULL)
  499. {
  500. const char *value = item.queryProp(".");
  501. if (value)
  502. numItems++;
  503. }
  504. }
  505. if (numItems)
  506. {
  507. if (statement)
  508. {
  509. CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_MAP, numItems));
  510. ForEach(*elems)
  511. {
  512. IPTree &item = elems->query();
  513. StringBuffer key('@');
  514. key.append(item.queryName());
  515. key.append('@');
  516. if (strstr(name, key) == NULL)
  517. {
  518. const char *value = item.queryProp(".");
  519. if (value)
  520. {
  521. check(cass_collection_append_string(collection, item.queryName()));
  522. check(cass_collection_append_string(collection, value));
  523. }
  524. }
  525. }
  526. statement->bindCollection(idx, collection);
  527. }
  528. return true;
  529. }
  530. else
  531. return false;
  532. }
  533. } elementMapColumnMapper;
  534. static class SubtreeMapColumnMapper : implements CassandraColumnMapper
  535. {
  536. public:
  537. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  538. {
  539. CassandraIterator elems(cass_iterator_from_map(value));
  540. while (cass_iterator_next(elems))
  541. {
  542. rtlDataAttr str;
  543. unsigned chars;
  544. getStringResult(NULL, cass_iterator_get_map_key(elems), chars, str.refstr());
  545. StringBuffer elemName(chars, str.getstr());
  546. const CassValue *value = cass_iterator_get_map_value(elems);
  547. StringBuffer valStr;
  548. getCassString(valStr, value);
  549. if (valStr.length() && valStr.charAt(0)== '<')
  550. {
  551. IPTree *sub = createPTreeFromXMLString(valStr);
  552. row->setPropTree(elemName, sub);
  553. }
  554. }
  555. return row;
  556. }
  557. virtual bool fromXML(CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal)
  558. {
  559. // NOTE - name here provides a list of elements that we SHOULD be mapping
  560. Owned<IPTreeIterator> elems = row->getElements("*");
  561. unsigned numItems = 0;
  562. ForEach(*elems)
  563. {
  564. IPTree &item = elems->query();
  565. StringBuffer key("@");
  566. key.append(item.queryName());
  567. key.append('@');
  568. if (strstr(name, key) != NULL)
  569. numItems++;
  570. }
  571. if (numItems)
  572. {
  573. if (statement)
  574. {
  575. CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_MAP, numItems));
  576. ForEach(*elems)
  577. {
  578. IPTree &item = elems->query();
  579. StringBuffer key("@");
  580. key.append(item.queryName());
  581. key.append('@');
  582. if (strstr(name, key) != NULL)
  583. {
  584. StringBuffer x;
  585. ::toXML(&item, x);
  586. check(cass_collection_append_string(collection, item.queryName()));
  587. check(cass_collection_append_string(collection, x));
  588. }
  589. }
  590. statement->bindCollection(idx, collection);
  591. }
  592. return true;
  593. }
  594. else
  595. return false;
  596. }
  597. } subTreeMapColumnMapper;
  598. /*
  599. static class QueryTextColumnMapper : public StringColumnMapper
  600. {
  601. public:
  602. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  603. {
  604. // Name is "Query/Text ...
  605. IPTree *query = row->queryPropTree("Query");
  606. if (!query)
  607. {
  608. query = createPTree("Query");
  609. query = row->setPropTree("Query", query);
  610. row->setProp("Query/@fetchEntire", "1"); // Compatibility...
  611. }
  612. return StringColumnMapper::toXML(query, "Text", value);
  613. }
  614. } queryTextColumnMapper;
  615. */
  616. static class GraphMapColumnMapper : implements CassandraColumnMapper
  617. {
  618. public:
  619. GraphMapColumnMapper(const char *_elemName, const char *_nameAttr)
  620. : elemName(_elemName), nameAttr(_nameAttr)
  621. {
  622. }
  623. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  624. {
  625. Owned<IPTree> map = createPTree(name);
  626. CassandraIterator elems(cass_iterator_from_map(value));
  627. while (cass_iterator_next(elems))
  628. {
  629. rtlDataAttr str;
  630. unsigned chars;
  631. getStringResult(NULL, cass_iterator_get_map_value(elems), chars, str.refstr());
  632. Owned<IPTree> child = createPTreeFromXMLString(chars, str.getstr());
  633. map->addPropTree(elemName, child.getClear());
  634. }
  635. row->addPropTree(name, map.getClear());
  636. return row;
  637. }
  638. virtual bool fromXML(CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal)
  639. {
  640. Owned<IPTree> child = row->getPropTree(name);
  641. if (child)
  642. {
  643. unsigned numItems = child->numChildren();
  644. if (numItems)
  645. {
  646. if (statement)
  647. {
  648. CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_MAP, numItems));
  649. Owned<IPTreeIterator> items = child->getElements("*");
  650. ForEach(*items)
  651. {
  652. IPTree &item = items->query();
  653. const char *key = item.queryProp(nameAttr);
  654. // MORE - may need to read, and probably should write, compressed. At least for graphs
  655. StringBuffer value;
  656. ::toXML(&item, value, 0, 0);
  657. if (key && value.length())
  658. {
  659. check(cass_collection_append_string(collection, key));
  660. check(cass_collection_append_string(collection, value));
  661. }
  662. }
  663. statement->bindCollection(idx, collection);
  664. }
  665. return true;
  666. }
  667. }
  668. return false;
  669. }
  670. private:
  671. const char *elemName;
  672. const char *nameAttr;
  673. } graphMapColumnMapper("Graph", "@name"), workflowMapColumnMapper("Item", "@wfid"), associationsMapColumnMapper("File", "@filename"), usedFieldsMapColumnMapper("field", "@name");
  674. static class WarningsMapColumnMapper : implements CassandraColumnMapper
  675. {
  676. public:
  677. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  678. {
  679. CassandraIterator elems(cass_iterator_from_map(value));
  680. while (cass_iterator_next(elems))
  681. {
  682. unsigned code = getUnsignedResult(NULL, cass_iterator_get_map_key(elems));
  683. VStringBuffer xpath("OnWarnings/OnWarning[@code='%u']", code);
  684. IPropertyTree * mapping = row->queryPropTree(xpath);
  685. if (!mapping)
  686. {
  687. IPropertyTree * onWarnings = ensurePTree(row, "OnWarnings");
  688. mapping = onWarnings->addPropTree("OnWarning", createPTree());
  689. mapping->setPropInt("@code", code);
  690. }
  691. rtlDataAttr str;
  692. unsigned chars;
  693. getStringResult(NULL, cass_iterator_get_map_value(elems), chars, str.refstr());
  694. StringBuffer s(chars, str.getstr());
  695. mapping->setProp("@severity", s);
  696. }
  697. return row;
  698. }
  699. virtual bool fromXML(CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal)
  700. {
  701. if (!row->hasProp("OnWarnings/OnWarning"))
  702. return false;
  703. else
  704. {
  705. if (statement)
  706. {
  707. CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_MAP, 5));
  708. Owned<IPTreeIterator> elems = row->getElements("OnWarnings/OnWarning");
  709. ForEach(*elems)
  710. {
  711. IPTree &item = elems->query();
  712. unsigned code = item.getPropInt("@code", 0);
  713. const char *value = item.queryProp("@severity");
  714. if (value)
  715. {
  716. check(cass_collection_append_int32(collection, code));
  717. check(cass_collection_append_string(collection, value));
  718. }
  719. }
  720. statement->bindCollection(idx, collection);
  721. }
  722. return true;
  723. }
  724. }
  725. } warningsMapColumnMapper;
  726. static class PluginListColumnMapper : implements CassandraColumnMapper
  727. {
  728. public:
  729. PluginListColumnMapper(const char *_elemName, const char *_nameAttr)
  730. : elemName(_elemName), nameAttr(_nameAttr)
  731. {
  732. }
  733. virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
  734. {
  735. Owned<IPTree> map = name ? createPTree(name) : LINK(row);
  736. CassandraIterator elems(cass_iterator_from_collection(value));
  737. while (cass_iterator_next(elems))
  738. {
  739. Owned<IPTree> child = createPTree(elemName);
  740. stringColumnMapper.toXML(child, nameAttr, cass_iterator_get_value(elems));
  741. map->addPropTree(elemName, child.getClear());
  742. }
  743. if (name)
  744. row->addPropTree(name, map.getClear());
  745. return row;
  746. }
  747. virtual bool fromXML(CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal)
  748. {
  749. Owned<IPTree> child = row->getPropTree(name);
  750. if (child)
  751. {
  752. unsigned numItems = child->numChildren();
  753. if (numItems)
  754. {
  755. if (statement)
  756. {
  757. CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_LIST, numItems));
  758. Owned<IPTreeIterator> items = child->getElements("*");
  759. ForEach(*items)
  760. {
  761. IPTree &item = items->query();
  762. const char *value = item.queryProp(nameAttr);
  763. if (value)
  764. check(cass_collection_append_string(collection, value));
  765. }
  766. statement->bindCollection(idx, collection);
  767. }
  768. return true;
  769. }
  770. }
  771. return false;
  772. }
  773. private:
  774. const char *elemName;
  775. const char *nameAttr;
  776. } pluginListColumnMapper("Plugin", "@dllname"), subfileListColumnMapper("Subfile", "@name");
  777. struct CassandraXmlMapping
  778. {
  779. const char *columnName;
  780. const char *columnType;
  781. const char *xpath;
  782. CassandraColumnMapper &mapper;
  783. };
  784. struct CassandraTableInfo
  785. {
  786. const char *x;
  787. const CassandraXmlMapping *mappings;
  788. };
  // Repository layout version.
  // Major: if this does not match the value in the repository, you cannot proceed - a conversion tool is needed.
  static const int majorVersion = 1;
  // Minor: if this is less than the value in the repository, we should be fine (but there may be columns we
  // don't know about and thus don't read - and will write as NULL in new rows).
  // If this is greater than the value in the repository, we need to update the repository (using add column)
  // and its version before proceeding.
  // Make sure to increment this if any column is ever added below.
  static const int minorVersion = 2;
  793. static const CassandraXmlMapping workunitsMappings [] =
  794. {
  795. {"partition", "int", NULL, hashRootNameColumnMapper},
  796. {"wuid", "text", NULL, rootNameColumnMapper},
  797. {"clustername", "text", "@clusterName", stringColumnMapper},
  798. {"jobname", "text", "@jobName", stringColumnMapper},
  799. {"priorityclass", "text", "@priorityClass", stringColumnMapper},
  800. {"prioritylevel", "int", "@priorityLevel", intColumnMapper},
  801. {"wuScope", "text", "@scope", stringColumnMapper},
  802. {"submitID", "text", "@submitID", stringColumnMapper},
  803. {"state", "text", "@state", stringColumnMapper},
  804. {"action", "text", "Action", stringColumnMapper},
  805. {"protected", "boolean", "@protected", boolColumnMapper},
  806. {"scheduled", "text", "@timeScheduled", stringColumnMapper}, // Should store as a date?
  807. {"totalThorTime", "text", "@totalThorTime", stringColumnMapper}, // We store in the wu ptree as a collatable string (with leading spaces to force to one partition)
  808. {"appvalues", "map<text, text>", "@Application@", subTreeMapColumnMapper},
  809. {"agentSession", "bigint", "@agentSession", bigintColumnMapper},
  810. {"debug", "map<text, text>", "Debug", simpleMapColumnMapper},
  811. {"attributes", "map<text, text>", "@agentSession@wuid@clusterName@jobName@priorityClass@priorityLevel@protected@scope@submitID@state@timeScheduled@totalThorTime@", attributeMapColumnMapper}, // name is the suppression list, note trailing @
  812. {"plugins", "list<text>", "Plugins", pluginListColumnMapper},
  813. {"workflow", "map<text, text>", "Workflow", workflowMapColumnMapper},
  814. {"onWarnings", "map<int, text>", "OnWarnings/OnWarning", warningsMapColumnMapper},
  815. // These are catchalls for anything not processed above or in a child table
  816. {"elements", "map<text, text>", "@Action@Application@Debug@Exceptions@Files@FilesRead@Graphs@Results@Statistics@Plugins@Query@State@Variables@Temporaries@Workflow@", elementMapColumnMapper}, // name is the suppression list, note trailing @
  817. {"subtrees", "map<text, text>", "@DiskUsageStats@Parameters@Process@Tracing@", subTreeMapColumnMapper}, // name is the INCLUSION list, note trailing @
  818. { NULL, "workunits", "((partition), wuid)|CLUSTERING ORDER BY (wuid DESC)", stringColumnMapper}
  819. };
  820. static const CassandraXmlMapping workunitInfoMappings [] = // A cut down version of the workunit mappings - used when querying with no key
  821. {
  822. {"partition", "int", NULL, hashRootNameColumnMapper},
  823. {"wuid", "text", NULL, rootNameColumnMapper},
  824. {"clustername", "text", "@clusterName", stringColumnMapper},
  825. {"jobname", "text", "@jobName", stringColumnMapper},
  826. {"priorityclass", "text", "@priorityClass", stringColumnMapper},
  827. {"prioritylevel", "int", "@priorityLevel", intColumnMapper},
  828. {"wuScope", "text", "@scope", stringColumnMapper},
  829. {"submitID", "text", "@submitID", stringColumnMapper},
  830. {"state", "text", "@state", stringColumnMapper},
  831. {"action", "text", "Action", stringColumnMapper},
  832. {"protected", "boolean", "@protected", boolColumnMapper},
  833. {"scheduled", "text", "@timeScheduled", stringColumnMapper}, // Should store as a date?
  834. {"totalThorTime", "text", "@totalThorTime", stringColumnMapper}, // We store in the wu ptree as a collatable string. Need to force to one partition too
  835. {"appvalues", "map<text, text>", "@Application@", subTreeMapColumnMapper},
  836. { NULL, "workunits", "((partition), wuid)|CLUSTERING ORDER BY (wuid DESC)", stringColumnMapper}
  837. };
  838. // The following describes the search table - this contains copies of the basic wu information but keyed by different fields
  839. static const CassandraXmlMapping searchMappings [] =
  840. {
  841. {"xpath", "text", NULL, suppliedStringColumnMapper},
  842. {"fieldPrefix", "text", NULL, prefixSearchColumnMapper},
  843. {"fieldValue", "text", NULL, searchColumnMapper},
  844. {"wuid", "text", NULL, rootNameColumnMapper},
  845. {"clustername", "text", "@clusterName", stringColumnMapper},
  846. {"jobname", "text", "@jobName", stringColumnMapper},
  847. {"priorityclass", "text", "@priorityClass", stringColumnMapper},
  848. {"prioritylevel", "int", "@priorityLevel", intColumnMapper},
  849. {"scope", "text", "@scope", stringColumnMapper},
  850. {"submitID", "text", "@submitID", stringColumnMapper},
  851. {"state", "text", "@state", stringColumnMapper},
  852. {"action", "text", "Action", stringColumnMapper},
  853. {"protected", "boolean", "@protected", boolColumnMapper},
  854. {"scheduled", "text", "@timeScheduled", stringColumnMapper}, // Should store as a date?
  855. {"totalThorTime", "text", "@totalThorTime", stringColumnMapper}, // We store in the wu ptree as a collatable string. Need to force to one partition too
  856. {"appvalues", "map<text, text>", "@Application@", subTreeMapColumnMapper},
  857. { NULL, "workunitsSearch", "((xpath, fieldPrefix), fieldValue, wuid)|CLUSTERING ORDER BY (fieldValue ASC, wuid DESC)", stringColumnMapper}
  858. };
  // The fields we can search by (NULL-terminated). These are a subset of the fields in the basic
  // workunit info that is returned from a search. A row is created in the search table for each
  // of these, for each workunit.
  const char * searchPaths[] =
  {
      "@submitID",
      "@clusterName",
      "@jobName",
      "@priorityClass",
      "@protected",
      "@scope",
      "@state",
      "@totalThorTime",
      NULL
  };
  861. static const CassandraXmlMapping uniqueSearchMappings [] =
  862. {
  863. {"xpath", "text", NULL, suppliedStringColumnMapper},
  864. {"fieldPrefix", "text", NULL, prefixSearchColumnMapper}, // Leading N chars, upper-cased
  865. {"fieldValue", "text", NULL, searchColumnMapper}, // upper-cased
  866. {"origFieldValue", "text", NULL, lcSearchColumnMapper}, // original case
  867. { NULL, "uniqueSearchValues", "((xpath, fieldPrefix), fieldValue, origFieldValue)|CLUSTERING ORDER BY (fieldValue ASC)", stringColumnMapper}
  868. };
  // The fields we can wild search by (NULL-terminated). We store these in the uniqueSearchMappings
  // table so we can translate wildcards into sets.
  // We also add application name/key combinations, but we have to special-case that.
  const char * wildSearchPaths[] =
  {
      "@submitID",
      "@clusterName",
      "@jobName",
      NULL
  };
  872. static const CassandraXmlMapping filesSearchMappings [] =
  873. {
  874. {"name", "text", "@name", stringColumnMapper},
  875. {"read", "boolean", "@read", boolColumnMapper},
  876. {"wuid", "text", NULL, suppliedStringColumnMapper},
  877. { NULL, "filesSearchValues", "((name, read), wuid)|CLUSTERING ORDER BY (wuid DESC)", stringColumnMapper}
  878. };
  879. // The version table is keyed by a partition value because (a) you need to key by something and (b) we can use it to spread the load of
  880. // version lookups (pick a partition at random).
  881. // Note that this table must have the same minimum layout on all versions.
  882. static const CassandraXmlMapping versionMappings [] =
  883. {
  884. {"partition", "int", "@partition", intColumnMapper},
  885. {"major", "int", "@major", intColumnMapper},
  886. {"minor", "int", "@minor", intColumnMapper},
  887. {"attributes", "map<text, text>", "@major@minor@partition@", attributeMapColumnMapper}, // name is the suppression list, note trailing @
  888. { NULL, "version", "((partition))", stringColumnMapper}
  889. };
  890. /*
  891. * Some thoughts on the secondary tables:
  892. * 1. To support (trailing) wildcards we will need to split the key into two - the leading N chars and the rest. Exactly what N is will depend on the installation size.
  893. * Too large and users will complain, but too small would hinder partitioning of the values across Cassandra nodes. 1 or 2 may be enough.
  894. * 2. I could combine all the secondary tables into 1 with a field indicating the type of the key. The key field would be repeated though... Would it help?
  895. * I'm not sure it really changes a lot - adds a bit of noise into the partitioner...
  896. * Actually, it does mean that the updates and deletes can all be done with a single Cassandra query, though whether that has any advantages over multiple in a batch I don't know
  897. * It MAY well make it easier to make sure that searches are case-insensitive, since we'll generally need to separate out the search field from the display field to achieve that
  * 3. Sort orders are tricky - I can use the secondary table to deliver sorted by one field as long as it is the one I am filtering by (but if it is, I probably don't need it sorted!)
  899. *
  900. */
  // The following describe child tables - all keyed by wuid.
  // ChildTablesSize is the number of child tables (it must stay last).
  enum ChildTablesEnum
  {
      WuQueryChild,
      WuExceptionsChild,
      WuStatisticsChild,
      WuGraphsChild,
      WuResultsChild,
      WuVariablesChild,
      WuTemporariesChild,
      WuFilesReadChild,
      WuFilesWrittenChild,
      WuFieldUsage,
      ChildTablesSize
  };
  903. struct ChildTableInfo
  904. {
  905. const char *parentElement;
  906. const char *childElement;
  907. ChildTablesEnum index;
  908. const CassandraXmlMapping *mappings;
  909. };
  910. // wuQueries table is slightly unusual among the child tables as is is 1:1 - it is split out for lazy load purposes.
  911. static const CassandraXmlMapping wuQueryMappings [] =
  912. {
  913. {"partition", "int", NULL, hashRootNameColumnMapper},
  914. {"wuid", "text", NULL, rootNameColumnMapper},
  915. {"associations", "map<text, text>", "Associated", associationsMapColumnMapper},
  916. {"attributes", "map<text, text>", "", attributeMapColumnMapper},
  917. {"query", "text", "Text", stringColumnMapper}, // May want to make this even lazier...
  918. {"shortQuery", "text", "ShortText", stringColumnMapper},
  919. { NULL, "wuQueries", "((partition), wuid)", stringColumnMapper}
  920. };
  921. static const ChildTableInfo wuQueriesTable =
  922. {
  923. "Query", NULL,
  924. WuQueryChild,
  925. wuQueryMappings
  926. };
  927. // wuExceptions table holds the exceptions associated with a wuid
  928. static const CassandraXmlMapping wuExceptionsMappings [] =
  929. {
  930. {"partition", "int", NULL, hashRootNameColumnMapper},
  931. {"wuid", "text", NULL, rootNameColumnMapper},
  932. {"sequence", "int", "@sequence", intColumnMapper},
  933. {"attributes", "map<text, text>", "", attributeMapColumnMapper},
  934. {"value", "text", ".", stringColumnMapper},
  935. { NULL, "wuExceptions", "((partition), wuid, sequence)", stringColumnMapper}
  936. };
  937. static const ChildTableInfo wuExceptionsTable =
  938. {
  939. "Exceptions", "Exception",
  940. WuExceptionsChild,
  941. wuExceptionsMappings
  942. };
  943. static const CassandraXmlMapping wuStatisticsMappings [] =
  944. {
  945. {"partition", "int", NULL, hashRootNameColumnMapper},
  946. {"wuid", "text", NULL, rootNameColumnMapper},
  947. {"ts", "bigint", "@ts", bigintColumnMapper}, // MORE - should change this to a timeuuid ?
  948. {"kind", "text", "@kind", stringColumnMapper},
  949. {"creator", "text", "@creator", stringColumnMapper},
  950. {"scope", "text", "@scope", stringColumnMapper},
  951. {"attributes", "map<text, text>", "@ts@kind@creator@scope@", attributeMapColumnMapper},
  952. { NULL, "wuStatistics", "((partition), wuid, ts, kind, creator, scope)", stringColumnMapper}
  953. };
  954. static const ChildTableInfo wuStatisticsTable =
  955. {
  956. "Statistics", "Statistic",
  957. WuStatisticsChild,
  958. wuStatisticsMappings
  959. };
  960. static const CassandraXmlMapping wuGraphsMappings [] =
  961. {
  962. {"partition", "int", NULL, hashRootNameColumnMapper},
  963. {"wuid", "text", NULL, rootNameColumnMapper},
  964. {"name", "text", "@name", stringColumnMapper},
  965. {"attributes", "map<text, text>", "@name@", attributeMapColumnMapper},
  966. {"xgmml", "blob", "xgmml", compressTreeColumnMapper},
  967. { NULL, "wuGraphs", "((partition), wuid, name)", stringColumnMapper} // Note - we do occasionally search by type - but that is done in a postfilter having preloaded/cached all
  968. };
  969. static const ChildTableInfo wuGraphsTable =
  970. {
  971. "Graphs", "Graph",
  972. WuGraphsChild,
  973. wuGraphsMappings
  974. };
  975. // A cut down version of the above - note this does not represent a different table!
  976. static const CassandraXmlMapping wuGraphMetasMappings [] =
  977. {
  978. {"partition", "int", NULL, hashRootNameColumnMapper},
  979. {"wuid", "text", NULL, rootNameColumnMapper},
  980. {"name", "text", "@name", stringColumnMapper},
  981. {"attributes", "map<text, text>", "@name@", attributeMapColumnMapper},
  982. { NULL, "wuGraphs", "((partition), wuid, name)", stringColumnMapper}
  983. };
  984. static const ChildTableInfo wuGraphMetasTable =
  985. {
  986. "Graphs", "Graph",
  987. WuGraphsChild,
  988. wuGraphMetasMappings
  989. };
  // Column rows shared by the wuResults, wuVariables and wuTemporaries mapping tables.
  // Expands to a comma-separated list of initializers with no trailing comma.
  #define resultTableFields \
      { "partition",     "int",             NULL,              hashRootNameColumnMapper }, \
      { "wuid",          "text",            NULL,              rootNameColumnMapper }, \
      { "sequence",      "int",             "@sequence",       defaultedIntColumnMapper }, \
      { "name",          "text",            "@name",           stringColumnMapper }, \
      { "attributes",    "map<text, text>", "@sequence@name@", attributeMapColumnMapper }, /* name is the suppression list */ \
      { "rowcount",      "int",             "rowCount",        intColumnMapper },         /* This is the number of rows in result (which may be stored in a file rather than in value) */ \
      { "totalrowcount", "bigint",          "totalRowCount",   bigintColumnMapper },      /* This is the number of rows in value */ \
      { "schemaRaw",     "blob",            "SchemaRaw",       blobColumnMapper }, \
      { "logicalName",   "text",            "logicalName",     stringColumnMapper },      /* either this or value will be present once result status is "calculated" */ \
      { "value",         "blob",            "Value",           blobColumnMapper }, \
      { "graph",         "text",            "@graph",          stringColumnMapper }, \
      { "activity",      "int",             "@activity",       intColumnMapper }
  1003. static const CassandraXmlMapping wuResultsMappings [] =
  1004. {
  1005. resultTableFields,
  1006. { NULL, "wuResults", "((partition), wuid, sequence)", stringColumnMapper}
  1007. };
  1008. static const ChildTableInfo wuResultsTable =
  1009. {
  1010. "Results", "Result",
  1011. WuResultsChild,
  1012. wuResultsMappings
  1013. };
  1014. // This looks very similar to the above, but the key is different...
  1015. static const CassandraXmlMapping wuVariablesMappings [] =
  1016. {
  1017. resultTableFields,
  1018. {"xmlValue", "text", "xmlValue", stringColumnMapper},
  1019. { NULL, "wuVariables", "((partition), wuid, sequence, name)", stringColumnMapper}
  1020. };
  1021. static const ChildTableInfo wuVariablesTable =
  1022. {
  1023. "Variables", "Variable",
  1024. WuVariablesChild,
  1025. wuVariablesMappings
  1026. };
  1027. // Again, very similar, but mapped to a different area of the XML
  1028. static const CassandraXmlMapping wuTemporariesMappings [] =
  1029. {
  1030. resultTableFields,
  1031. { NULL, "wuTemporaries", "((partition), wuid, sequence, name)", stringColumnMapper}
  1032. };
  1033. static const ChildTableInfo wuTemporariesTable =
  1034. {
  1035. "Temporaries", "Variable",
  1036. WuTemporariesChild,
  1037. wuTemporariesMappings
  1038. };
  1039. static const CassandraXmlMapping wuFilesReadMappings [] =
  1040. {
  1041. {"partition", "int", NULL, hashRootNameColumnMapper},
  1042. {"wuid", "text", NULL, rootNameColumnMapper},
  1043. {"name", "text", "@name", stringColumnMapper},
  1044. {"attributes", "map<text, text>", "@name@", attributeMapColumnMapper}, /* name is the suppression list */
  1045. {"subfiles", "list<text>", NULL, subfileListColumnMapper},
  1046. { NULL, "wuFilesRead", "((partition), wuid, name)", stringColumnMapper}
  1047. };
  1048. static const ChildTableInfo wuFilesReadTable =
  1049. {
  1050. "FilesRead", "File",
  1051. WuFilesReadChild,
  1052. wuFilesReadMappings
  1053. };
  1054. static const CassandraXmlMapping wuFilesWrittenMappings [] =
  1055. {
  1056. {"partition", "int", NULL, hashRootNameColumnMapper},
  1057. {"wuid", "text", NULL, rootNameColumnMapper},
  1058. {"name", "text", "@name", stringColumnMapper},
  1059. {"attributes", "map<text, text>", "@name@", attributeMapColumnMapper}, /* name is the suppression list */
  1060. { NULL, "wuFilesWritten", "((partition), wuid, name)", stringColumnMapper}
  1061. };
  1062. static const ChildTableInfo wuFilesWrittenTable =
  1063. {
  1064. "Files", "File",
  1065. WuFilesWrittenChild,
  1066. wuFilesWrittenMappings
  1067. };
  1068. static const CassandraXmlMapping wuFieldUsageMappings [] =
  1069. {
  1070. {"partition", "int", NULL, hashRootNameColumnMapper},
  1071. {"wuid", "text", NULL, rootNameColumnMapper},
  1072. {"name", "text", "@name", stringColumnMapper},
  1073. {"type", "text", "@type", stringColumnMapper},
  1074. {"numFields", "int", "@numFields", intColumnMapper},
  1075. {"numFieldsUsed", "int", "@numFieldsUsed", intColumnMapper},
  1076. {"fields", "map<text, text>", "fields", usedFieldsMapColumnMapper},
  1077. { NULL, "wuFieldUsage", "((partition), wuid, name)", stringColumnMapper}
  1078. };
  1079. static const ChildTableInfo wuFieldUsageTable =
  1080. {
  1081. "usedsources", "datasource",
  1082. WuFieldUsage,
  1083. wuFieldUsageMappings
  1084. };
  1085. // Order should match the enum above
  1086. static const ChildTableInfo * const childTables [] = { &wuQueriesTable, &wuExceptionsTable, &wuStatisticsTable, &wuGraphsTable, &wuResultsTable, &wuVariablesTable, &wuTemporariesTable, &wuFilesReadTable, &wuFilesWrittenTable, &wuFieldUsageTable, NULL };
  1087. // Graph progress tables are read directly, XML mappers not used
  1088. static const CassandraXmlMapping wuGraphProgressMappings [] =
  1089. {
  1090. {"partition", "int", NULL, hashRootNameColumnMapper},
  1091. {"wuid", "text", NULL, rootNameColumnMapper},
  1092. {"graphID", "text", NULL, stringColumnMapper},
  1093. {"subgraphID", "bigint", NULL, bigintColumnMapper},
  1094. {"creator", "text", NULL, stringColumnMapper},
  1095. {"progress", "blob", NULL, blobColumnMapper},
  1096. { NULL, "wuGraphProgress", "((partition), wuid, graphID, subgraphID, creator)", stringColumnMapper}
  1097. };
  1098. static const CassandraXmlMapping wuGraphStateMappings [] =
  1099. {
  1100. {"partition", "int", NULL, hashRootNameColumnMapper},
  1101. {"wuid", "text", NULL, rootNameColumnMapper},
  1102. {"graphID", "text", NULL, stringColumnMapper},
  1103. {"subgraphID", "bigint", NULL, bigintColumnMapper},
  1104. {"state", "int", NULL, intColumnMapper},
  1105. { NULL, "wuGraphState", "((partition), wuid, graphID, subgraphID)", stringColumnMapper}
  1106. };
  1107. static const CassandraXmlMapping wuGraphRunningMappings [] =
  1108. {
  1109. {"partition", "int", NULL, hashRootNameColumnMapper},
  1110. {"wuid", "text", NULL, rootNameColumnMapper},
  1111. {"graphID", "text", NULL, stringColumnMapper},
  1112. {"subgraphID", "bigint", NULL, bigintColumnMapper},
  1113. { NULL, "wuGraphRunning", "((partition), wuid)", stringColumnMapper}
  1114. };
  1115. interface ICassandraSession : public IInterface // MORE - rename!
  1116. {
  1117. virtual CassSession *querySession() const = 0;
  1118. virtual CassandraPrepared *prepareStatement(const char *query) const = 0;
  1119. virtual void executeAsync(CIArrayOf<CassandraStatement> &batch, const char *what) const = 0;
  1120. virtual unsigned queryTraceLevel() const = 0;
  1121. virtual const CassResult *fetchDataForWuid(const CassandraXmlMapping *mappings, const char *wuid, bool includeWuid) const = 0;
  1122. virtual const CassResult *fetchDataForWuidAndKey(const CassandraXmlMapping *mappings, const char *wuid, const char *key) const = 0;
  1123. virtual void deleteChildByWuid(const CassandraXmlMapping *mappings, const char *wuid, CassBatch *batch) const = 0;
  1124. virtual IPTree *cassandraToWorkunitXML(const char *wuid) const = 0;
  1125. };
  1126. void getBoundFieldNames(const CassandraXmlMapping *mappings, StringBuffer &names, StringBuffer &bindings, IPTree *inXML, const char *userVal, StringBuffer &tableName)
  1127. {
  1128. while (mappings->columnName)
  1129. {
  1130. if (!inXML || mappings->mapper.fromXML(NULL, 0, inXML, mappings->xpath, userVal))
  1131. {
  1132. names.appendf(",%s", mappings->columnName);
  1133. if (strcmp(mappings->columnType, "timeuuid")==0)
  1134. bindings.appendf(",now()");
  1135. else
  1136. bindings.appendf(",?");
  1137. }
  1138. mappings++;
  1139. }
  1140. tableName.append(mappings->columnType);
  1141. }
  1142. void getFieldNames(const CassandraXmlMapping *mappings, StringBuffer &names, StringBuffer &tableName)
  1143. {
  1144. while (mappings->columnName)
  1145. {
  1146. names.appendf(",%s", mappings->columnName);
  1147. mappings++;
  1148. }
  1149. tableName.append(mappings->columnType);
  1150. }
  1151. const char *queryTableName(const CassandraXmlMapping *mappings)
  1152. {
  1153. while (mappings->columnName)
  1154. mappings++;
  1155. return mappings->columnType;
  1156. }
  1157. StringBuffer & describeTable(const CassandraXmlMapping *mappings, StringBuffer &out)
  1158. {
  1159. StringBuffer fields;
  1160. while (mappings->columnName)
  1161. {
  1162. fields.appendf("%s %s,", mappings->columnName, mappings->columnType);
  1163. mappings++;
  1164. }
  1165. StringArray options;
  1166. options.appendList(mappings->xpath, "|");
  1167. assertex(options.length()); // Primary key at least should be present!
  1168. out.appendf("CREATE TABLE IF NOT EXISTS %s (%s PRIMARY KEY %s)", mappings->columnType, fields.str(), options.item(0));
  1169. unsigned idx = 1;
  1170. while (options.isItem(idx))
  1171. {
  1172. if (idx==1)
  1173. out.append(" WITH ");
  1174. else
  1175. out.append(", ");
  1176. out.append(options.item(idx));
  1177. idx++;
  1178. }
  1179. out.append(';');
  1180. return out;
  1181. }
  1182. const CassResult *executeQuery(CassSession *session, CassStatement *statement)
  1183. {
  1184. CassandraFuture future(cass_session_execute(session, statement));
  1185. future.wait("executeQuery");
  1186. return cass_future_get_result(future);
  1187. }
  1188. void deleteSecondaryByKey(const char * xpath, const char *key, const char *wuid, const ICassandraSession *sessionCache, CIArrayOf<CassandraStatement> &batch)
  1189. {
  1190. if (key)
  1191. {
  1192. StringBuffer ucKey(key);
  1193. ucKey.toUpperCase();
  1194. StringBuffer names;
  1195. StringBuffer tableName;
  1196. getFieldNames(searchMappings, names, tableName);
  1197. VStringBuffer deleteQuery("DELETE from %s where xpath=? and fieldPrefix=? and fieldValue=? and wuid=?;", tableName.str());
  1198. CassandraStatement &update = *new CassandraStatement(sessionCache->prepareStatement(deleteQuery));
  1199. update.bindString(0, xpath);
  1200. update.bindString_n(1, ucKey, CASS_SEARCH_PREFIX_SIZE);
  1201. update.bindString(2, ucKey);
  1202. update.bindString(3, wuid);
  1203. batch.append(update);
  1204. }
  1205. }
  1206. void executeSimpleCommand(CassSession *session, const char *command)
  1207. {
  1208. CassandraStatement statement(cass_statement_new(command, 0));
  1209. CassandraFuture future(cass_session_execute(session, statement));
  1210. future.wait("execute");
  1211. }
  1212. void ensureTable(CassSession *session, const CassandraXmlMapping *mappings)
  1213. {
  1214. StringBuffer schema;
  1215. executeSimpleCommand(session, describeTable(mappings, schema));
  1216. }
  1217. extern void simpleXMLtoCassandra(const ICassandraSession *session, CassBatch *batch, const CassandraXmlMapping *mappings, IPTree *inXML, const char *userVal = NULL)
  1218. {
  1219. StringBuffer names;
  1220. StringBuffer bindings;
  1221. StringBuffer tableName;
  1222. getBoundFieldNames(mappings, names, bindings, inXML, userVal, tableName);
  1223. VStringBuffer insertQuery("INSERT into %s (%s) values (%s);", tableName.str(), names.str()+1, bindings.str()+1);
  1224. CassandraStatement update(session->prepareStatement(insertQuery));
  1225. unsigned bindidx = 0;
  1226. while (mappings->columnName)
  1227. {
  1228. if (mappings->mapper.fromXML(&update, bindidx, inXML, mappings->xpath, userVal))
  1229. bindidx++;
  1230. mappings++;
  1231. }
  1232. check(cass_batch_add_statement(batch, update));
  1233. }
  1234. extern void simpleXMLtoCassandra(const ICassandraSession *session, CIArrayOf<CassandraStatement> &batch, const CassandraXmlMapping *mappings, IPTree *inXML, const char *userVal = NULL)
  1235. {
  1236. StringBuffer names;
  1237. StringBuffer bindings;
  1238. StringBuffer tableName;
  1239. getBoundFieldNames(mappings, names, bindings, inXML, userVal, tableName);
  1240. VStringBuffer insertQuery("INSERT into %s (%s) values (%s);", tableName.str(), names.str()+1, bindings.str()+1);
  1241. CassandraStatement &update = *new CassandraStatement(session->prepareStatement(insertQuery));
  1242. unsigned bindidx = 0;
  1243. while (mappings->columnName)
  1244. {
  1245. if (mappings->mapper.fromXML(&update, bindidx, inXML, mappings->xpath, userVal))
  1246. bindidx++;
  1247. mappings++;
  1248. }
  1249. batch.append(update);
  1250. }
  1251. extern void deleteFileSearch(const ICassandraSession *session, CIArrayOf<CassandraStatement> &batch, const char *name, bool read, const char *wuid)
  1252. {
  1253. StringBuffer names;
  1254. StringBuffer tableName;
  1255. getFieldNames(filesSearchMappings, names, tableName);
  1256. VStringBuffer deleteQuery("DELETE from %s where name=? and read=? and wuid=?", tableName.str());
  1257. CassandraStatement &update = *new CassandraStatement(session->prepareStatement(deleteQuery));
  1258. update.bindString(0, name);
  1259. update.bindBool(1, read ? cass_true : cass_false);
  1260. update.bindString(2, wuid);
  1261. batch.append(update);
  1262. }
  1263. extern void addFileSearch(const ICassandraSession *session, CIArrayOf<CassandraStatement> &batch, const char *name, bool read, const char *wuid)
  1264. {
  1265. StringBuffer bindings;
  1266. StringBuffer names;
  1267. StringBuffer tableName;
  1268. getBoundFieldNames(filesSearchMappings, names, bindings, NULL, NULL, tableName);
  1269. VStringBuffer insertQuery("INSERT INTO %s (%s) values (%s)", tableName.str(), names.str()+1, bindings.str()+1);
  1270. CassandraStatement &update = *new CassandraStatement(session->prepareStatement(insertQuery));
  1271. update.bindString(0, name);
  1272. update.bindBool(1, read ? cass_true : cass_false);
  1273. update.bindString(2, wuid);
  1274. batch.append(update);
  1275. }
  1276. extern void addUniqueValue(const ICassandraSession *session, CIArrayOf<CassandraStatement> &batch, const char *xpath, const char *value)
  1277. {
  1278. StringBuffer bindings;
  1279. StringBuffer names;
  1280. StringBuffer tableName;
  1281. getBoundFieldNames(uniqueSearchMappings, names, bindings, NULL, NULL, tableName);
  1282. VStringBuffer insertQuery("INSERT into %s (%s) values (%s);", tableName.str(), names.str()+1, bindings.str()+1);
  1283. CassandraStatement &update = *new CassandraStatement(session->prepareStatement(insertQuery));
  1284. update.bindString(0, xpath);
  1285. StringBuffer ucValue(value);
  1286. ucValue.toUpperCase();
  1287. update.bindString_n(1, ucValue, CASS_SEARCH_PREFIX_SIZE);
  1288. update.bindString(2, ucValue);
  1289. update.bindString(3, value);
  1290. batch.append(update);
  1291. }
  1292. extern void childXMLRowtoCassandra(const ICassandraSession *session, CassBatch *batch, const CassandraXmlMapping *mappings, const char *wuid, IPTree &row, const char *userVal)
  1293. {
  1294. StringBuffer bindings;
  1295. StringBuffer names;
  1296. StringBuffer tableName;
  1297. // Note that we bind all fields, even where there is no value in the XML
  1298. // This ensures that values are correctly deleted where necessary - it also has
  1299. // the fortuitous benefit of reducing the number of variants of the query that we need to prepare and cache.
  1300. getBoundFieldNames(mappings, names, bindings, NULL, userVal, tableName);
  1301. VStringBuffer insertQuery("INSERT into %s (%s) values (%s);", tableName.str(), names.str()+1, bindings.str()+1);
  1302. CassandraStatement update(session->prepareStatement(insertQuery));
  1303. update.bindInt32(0, rtlHash32VStr(wuid, 0) % NUM_PARTITIONS);
  1304. update.bindString(1, wuid);
  1305. unsigned colidx = 2; // We already bound wuid and partition
  1306. while (mappings[colidx].columnName)
  1307. {
  1308. if (!mappings[colidx].mapper.fromXML(&update, colidx, &row, mappings[colidx].xpath, userVal))
  1309. update.bindNull(colidx);
  1310. colidx++;
  1311. }
  1312. check(cass_batch_add_statement(batch, update));
  1313. }
  1314. extern unsigned childCount(const ICassandraSession *session, const CassandraXmlMapping *mappings, const char *wuid)
  1315. {
  1316. VStringBuffer countQuery("SELECT count(*) FROM %s WHERE partition=? AND wuid=?;", queryTableName(mappings));
  1317. CassandraStatement count(session->prepareStatement(countQuery));
  1318. count.bindInt32(0, rtlHash32VStr(wuid, 0) % NUM_PARTITIONS);
  1319. count.bindString(1, wuid);
  1320. CassandraFuture future(cass_session_execute(session->querySession(), count));
  1321. future.wait("select count(*)");
  1322. CassandraResult result(cass_future_get_result(future));
  1323. return getUnsignedResult(NULL, getSingleResult(result));
  1324. }
  1325. extern void childXMLtoCassandra(const ICassandraSession *session, CassBatch *batch, const CassandraXmlMapping *mappings, const char *wuid, IPTreeIterator *elements, const char *userVal)
  1326. {
  1327. if (elements->first())
  1328. {
  1329. do
  1330. {
  1331. childXMLRowtoCassandra(session, batch, mappings, wuid, elements->query(), userVal);
  1332. }
  1333. while (elements->next());
  1334. }
  1335. }
  1336. extern void childXMLtoCassandra(const ICassandraSession *session, CassBatch *batch, const CassandraXmlMapping *mappings, IPTree *inXML, const char *xpath, const char *defaultValue)
  1337. {
  1338. Owned<IPTreeIterator> elements = inXML->getElements(xpath);
  1339. childXMLtoCassandra(session, batch, mappings, inXML->queryName(), elements, defaultValue);
  1340. }
  1341. static IPTree *rowToPTree(const char *xpath, const char *key, const CassandraXmlMapping *mappings, const CassRow *row)
  1342. {
  1343. CassandraIterator cols(cass_iterator_from_row(row));
  1344. Owned<IPTree> xml = createPTree("row"); // May be overwritten below if wuid field is processed
  1345. if (xpath && *xpath && key && *key)
  1346. xml->setProp(xpath, key);
  1347. while (cass_iterator_next(cols))
  1348. {
  1349. assertex(mappings->columnName);
  1350. const CassValue *value = cass_iterator_get_column(cols);
  1351. if (value && !cass_value_is_null(value))
  1352. mappings->mapper.toXML(xml, mappings->xpath, value);
  1353. mappings++;
  1354. }
  1355. return xml.getClear();
  1356. }
  1357. /*
  1358. * PostFilter represents a filter to be applied to a ConstWorkUnitInfo tree representation prior to returning it from an iterator
  1359. */
  1360. interface IPostFilter : public IInterface
  1361. {
  1362. virtual bool matches(IPTree &p) const = 0;
  1363. virtual const char *queryValue() const = 0;
  1364. virtual const char *queryXPath() const = 0;
  1365. virtual WUSortField queryField() const = 0;
  1366. };
  1367. class PostFilter : public CInterfaceOf<IPostFilter>
  1368. {
  1369. public:
  1370. PostFilter(WUSortField _field, const char *_value, bool _wild)
  1371. : field(_field), xpath(queryFilterXPath(_field)), wild(_wild)
  1372. {
  1373. setValue(_value);
  1374. }
  1375. virtual bool matches(IPTree &p) const
  1376. {
  1377. const char *val = p.queryProp(xpath);
  1378. if (val)
  1379. return wild ? WildMatch(val, pattern) : strieq(val, pattern);
  1380. else
  1381. return false;
  1382. }
  1383. virtual const char *queryValue() const
  1384. {
  1385. return value.str();
  1386. }
  1387. void setValue(const char *_value)
  1388. {
  1389. if (wild)
  1390. {
  1391. VStringBuffer filter("*%s*", _value);
  1392. pattern.set(filter);
  1393. }
  1394. else
  1395. pattern.set(_value);
  1396. value.set(_value);
  1397. }
  1398. virtual const char *queryXPath() const
  1399. {
  1400. return xpath;
  1401. }
  1402. virtual WUSortField queryField() const
  1403. {
  1404. return field;
  1405. }
  1406. private:
  1407. const char *xpath;
  1408. StringAttr pattern;
  1409. StringAttr value;
  1410. WUSortField field;
  1411. bool wild;
  1412. };
  1413. class AppValuePostFilter : public CInterfaceOf<IPostFilter>
  1414. {
  1415. public:
  1416. AppValuePostFilter(const char *_name, const char *_value, bool _wild) : wild(_wild)
  1417. {
  1418. xpath.appendf("Application/%s", _name);
  1419. setValue(_value);
  1420. }
  1421. virtual bool matches(IPTree &p) const
  1422. {
  1423. const char *val = p.queryProp(xpath);
  1424. if (val)
  1425. return wild ? WildMatch(val, pattern) : strieq(val, pattern);
  1426. else
  1427. return false;
  1428. }
  1429. virtual const char *queryValue() const
  1430. {
  1431. return value.str();
  1432. }
  1433. void setValue(const char *_value)
  1434. {
  1435. if (wild)
  1436. {
  1437. VStringBuffer filter("*%s*", _value);
  1438. pattern.set(filter);
  1439. }
  1440. else
  1441. pattern.set(_value);
  1442. value.set(_value);
  1443. }
  1444. virtual const char *queryXPath() const
  1445. {
  1446. return xpath;
  1447. }
  1448. virtual WUSortField queryField() const
  1449. {
  1450. return WUSFappvalue;
  1451. }
  1452. private:
  1453. StringBuffer xpath;
  1454. StringAttr pattern;
  1455. StringAttr value;
  1456. bool wild;
  1457. };
  1458. class CassSortableIterator : public CassandraIterator
  1459. {
  1460. public:
  1461. CassSortableIterator(CassIterator *_iterator, unsigned _idx, int _compareColumn, bool _descending)
  1462. : CassandraIterator(_iterator), idx(_idx), compareColumn(_compareColumn), descending(_descending)
  1463. {
  1464. }
  1465. const CassSortableIterator *nextRow()
  1466. {
  1467. if (iterator && cass_iterator_next(iterator))
  1468. {
  1469. if (compareColumn != -1)
  1470. {
  1471. const CassRow *row = cass_iterator_get_row(iterator);
  1472. getCassString(value.clear(), cass_row_get_column(row, compareColumn));
  1473. }
  1474. return this;
  1475. }
  1476. else
  1477. return NULL;
  1478. }
  1479. void stop()
  1480. {
  1481. value.clear();
  1482. set(NULL);
  1483. }
  1484. int compare(const CassSortableIterator *to) const
  1485. {
  1486. if (compareColumn==-1)
  1487. return idx - to->idx; // concat mode
  1488. int ret = strcmp(value, to->value); // Note - empty StringBuffer always returns ""
  1489. return descending ? -ret : ret;
  1490. }
  1491. private:
  1492. StringBuffer value;
  1493. unsigned idx;
  1494. int compareColumn;
  1495. bool descending;
  1496. };
  1497. interface IConstWorkUnitIteratorEx : public IConstWorkUnitIterator
  1498. {
  1499. virtual bool hasPostFilters() const = 0;
  1500. virtual bool isMerging() const = 0;
  1501. virtual void notePosition() const = 0;
  1502. };
  1503. /*
  1504. *
  1505. * The cache entries serve two purposes:
  1506. *
  1507. * 1. They allow us to map row numbers to values for the end of each page returned, which can make forward paging efficient when not post-sorting
  1508. * 2. They allow us to preserve post-sort results in order to avoid having to re-retrieve them.
  1509. */
  1510. class CCassandraWuUQueryCacheEntry : public CInterfaceOf<IInterface>
  1511. {
  1512. public:
  1513. CCassandraWuUQueryCacheEntry()
  1514. {
  1515. hint = get_cycles_now(); // MORE - should do better perhaps?
  1516. lastAccess = msTick();
  1517. }
  1518. __int64 queryHint() const
  1519. {
  1520. return hint;
  1521. }
  1522. void noteWuid(const char *wuid, const char *fieldValue, unsigned row)
  1523. {
  1524. CriticalBlock b(crit);
  1525. // NOTE - we store one set of row information per page retrieved - and we normally traverse the pages
  1526. // in order so appending to the end is better than (for example) binchopping
  1527. ForEachItemInRev(idx, rows)
  1528. {
  1529. unsigned foundRow = rows.item(idx);
  1530. if (foundRow==row)
  1531. {
  1532. assert(streq(wuids.item(idx), wuid));
  1533. assert(streq(fieldValues.item(idx), fieldValue));
  1534. return;
  1535. }
  1536. if (foundRow < row)
  1537. break;
  1538. }
  1539. rows.add(row, idx+1);
  1540. wuids.add(wuid, idx+1);
  1541. fieldValues.add(fieldValue ? fieldValue : "", idx+1);
  1542. }
  1543. IConstWorkUnitIteratorEx *getResult() const
  1544. {
  1545. CriticalBlock b(crit);
  1546. return result.getLink();
  1547. }
  1548. void setResult(IConstWorkUnitIteratorEx *_result)
  1549. {
  1550. CriticalBlock b(crit);
  1551. result.set(_result);
  1552. }
  1553. unsigned lookupStartRow(StringBuffer &wuid, StringBuffer &fieldValue, unsigned startOffset) const
  1554. {
  1555. // See if we can provide a base wuid to search above/below
  1556. CriticalBlock b(crit);
  1557. ForEachItemInRev(idx, rows)
  1558. {
  1559. unsigned foundRow = rows.item(idx);
  1560. if (foundRow <= startOffset)
  1561. {
  1562. wuid.set(wuids.item(idx));
  1563. fieldValue.set(fieldValues.item(idx));
  1564. return foundRow;
  1565. }
  1566. }
  1567. return 0;
  1568. }
  1569. void touch()
  1570. {
  1571. lastAccess = msTick();
  1572. }
  1573. inline unsigned queryLastAccess() const
  1574. {
  1575. return lastAccess;
  1576. }
  1577. private:
  1578. mutable CriticalSection crit; // It's POSSIBLE that we could get two queries in hitting the cache at the same time, I think...
  1579. UnsignedArray rows;
  1580. StringArray wuids;
  1581. StringArray fieldValues;
  1582. Owned<IConstWorkUnitIteratorEx> result;
  1583. __uint64 hint;
  1584. unsigned lastAccess;
  1585. };
  1586. class CassMultiIterator : public CInterface, implements IRowProvider, implements ICompare, implements IConstWorkUnitIteratorEx
  1587. {
  1588. public:
  1589. IMPLEMENT_IINTERFACE;
  1590. CassMultiIterator(CCassandraWuUQueryCacheEntry *_cache, unsigned _startRowNum, int _compareColumn, bool _descending)
  1591. : cache(_cache)
  1592. {
  1593. compareColumn = _compareColumn;
  1594. descending = _descending;
  1595. startRowNum = _startRowNum;
  1596. rowNum = 0;
  1597. }
  1598. void setStartOffset(unsigned start)
  1599. {
  1600. startRowNum = start; // we managed to do a seek forward via a filter
  1601. }
  1602. void setCompareColumn(int _compareColumn)
  1603. {
  1604. assert(!inputs.length());
  1605. compareColumn = _compareColumn;
  1606. }
  1607. void addResult(CassandraResult &result)
  1608. {
  1609. results.append(result);
  1610. }
  1611. void addPostFilters(IArrayOf<IPostFilter> &filters, unsigned start)
  1612. {
  1613. unsigned len = filters.length();
  1614. while (start<len)
  1615. postFilters.append(OLINK(filters.item(start++)));
  1616. }
  1617. void addPostFilter(PostFilter &filter)
  1618. {
  1619. postFilters.append(filter);
  1620. }
  1621. virtual bool hasPostFilters() const
  1622. {
  1623. return postFilters.length() != 0;
  1624. }
  1625. virtual bool isMerging() const
  1626. {
  1627. return results.length() > 1;
  1628. }
  1629. virtual bool first()
  1630. {
  1631. inputs.kill();
  1632. ForEachItemIn(idx, results)
  1633. {
  1634. inputs.append(*new CassSortableIterator(cass_iterator_from_result(results.item(idx)), idx, compareColumn, descending));
  1635. }
  1636. merger.setown(createRowStreamMerger(inputs.length(), *this, this, false));
  1637. rowNum = startRowNum;
  1638. return next();
  1639. }
  1640. virtual void notePosition() const
  1641. {
  1642. if (cache && current)
  1643. {
  1644. cache->noteWuid(current->queryWuid(), lastThorTime, rowNum);
  1645. }
  1646. }
  1647. virtual bool next()
  1648. {
  1649. Owned<IConstWorkUnitInfo> last = current.getClear();
  1650. loop
  1651. {
  1652. const CassandraIterator *nextSource = nextMergedSource();
  1653. if (!nextSource)
  1654. {
  1655. if (cache && last)
  1656. {
  1657. cache->noteWuid(last->queryWuid(), lastThorTime, rowNum);
  1658. }
  1659. return false;
  1660. }
  1661. Owned<IPTree> wuXML = rowToPTree(NULL, NULL, workunitInfoMappings+1, cass_iterator_get_row(*nextSource)); // NOTE - this is relying on search mappings and wuInfoMappings being the same
  1662. bool postFiltered = false;
  1663. ForEachItemIn(pfIdx, postFilters)
  1664. {
  1665. if (!postFilters.item(pfIdx).matches(*wuXML))
  1666. {
  1667. postFiltered = true;
  1668. break;
  1669. }
  1670. }
  1671. if (!postFiltered)
  1672. {
  1673. current.setown(createConstWorkUnitInfo(*wuXML));
  1674. lastThorTime.set(wuXML->queryProp("@totalThorTime"));
  1675. rowNum++;
  1676. return true;
  1677. }
  1678. }
  1679. }
  1680. virtual bool isValid()
  1681. {
  1682. return current != NULL;
  1683. }
  1684. virtual IConstWorkUnitInfo & query()
  1685. {
  1686. assertex(current);
  1687. return *current.get();
  1688. }
  1689. const CassandraIterator *nextMergedSource()
  1690. {
  1691. return (const CassSortableIterator *) merger->nextRow();
  1692. }
  1693. protected:
  1694. virtual void linkRow(const void *row) { }
  1695. virtual void releaseRow(const void *row) { }
  1696. virtual const void *nextRow(unsigned idx)
  1697. {
  1698. CassSortableIterator &it = inputs.item(idx);
  1699. return it.nextRow(); // returns either a pointer to the iterator, or NULL
  1700. }
  1701. virtual void stop(unsigned idx)
  1702. {
  1703. inputs.item(idx).stop();
  1704. }
  1705. virtual int docompare(const void *a, const void *b) const
  1706. {
  1707. // a and b point to to CassSortableIterator objects
  1708. const CassSortableIterator *aa = (const CassSortableIterator *) a;
  1709. const CassSortableIterator *bb = (const CassSortableIterator *) b;
  1710. return aa->compare(bb);
  1711. }
  1712. private:
  1713. IArrayOf<CassandraResult> results;
  1714. IArrayOf<CassSortableIterator> inputs;
  1715. Owned<IRowStream> merger; // NOTE - must be destroyed before inputs is destroyed
  1716. IArrayOf<IPostFilter> postFilters;
  1717. Owned<IConstWorkUnitInfo> current;
  1718. Linked<CCassandraWuUQueryCacheEntry> cache;
  1719. StringAttr lastThorTime;
  1720. int compareColumn;
  1721. unsigned startRowNum;
  1722. unsigned rowNum;
  1723. bool descending;
  1724. };
  1725. class CassPostSortIterator : public CInterfaceOf<IConstWorkUnitIteratorEx>, implements ICompare
  1726. {
  1727. public:
  1728. CassPostSortIterator(IConstWorkUnitIterator * _input, unsigned _sortorder, unsigned _limit)
  1729. : input(_input), sortorder(_sortorder), limit(_limit)
  1730. {
  1731. idx = 0;
  1732. }
  1733. virtual bool first()
  1734. {
  1735. if (input)
  1736. {
  1737. readFirst();
  1738. input.clear();
  1739. }
  1740. idx = 0;
  1741. return sorted.isItem(idx);
  1742. }
  1743. virtual bool next()
  1744. {
  1745. idx++;
  1746. if (sorted.isItem(idx))
  1747. return true;
  1748. return false;
  1749. }
  1750. virtual void notePosition() const
  1751. {
  1752. }
  1753. virtual bool isValid()
  1754. {
  1755. return sorted.isItem(idx);
  1756. }
  1757. virtual IConstWorkUnitInfo & query()
  1758. {
  1759. return sorted.item(idx);
  1760. }
  1761. virtual bool hasPostFilters() const
  1762. {
  1763. return false; // they are done by my input. But we may want to rename this function to indicate "may return more than asked" in which case would be true
  1764. }
  1765. virtual bool isMerging() const
  1766. {
  1767. return false;
  1768. }
  1769. private:
  1770. void readFirst()
  1771. {
  1772. ForEach(*input)
  1773. {
  1774. sorted.append(OLINK(input->query()));
  1775. if (sorted.length()>=limit)
  1776. break;
  1777. }
  1778. qsortvec((void **)sorted.getArray(0), sorted.length(), *this);
  1779. }
  1780. virtual int docompare(const void *a, const void *b) const
  1781. {
  1782. // a and b point to to IConstWorkUnitInfo objects
  1783. const IConstWorkUnitInfo *aa = (const IConstWorkUnitInfo *) a;
  1784. const IConstWorkUnitInfo *bb = (const IConstWorkUnitInfo *) b;
  1785. int diff;
  1786. switch (sortorder & 0xff)
  1787. {
  1788. case WUSFuser:
  1789. diff = stricmp(aa->queryUser(), bb->queryUser());
  1790. break;
  1791. case WUSFcluster:
  1792. diff = stricmp(aa->queryClusterName(), bb->queryClusterName());
  1793. break;
  1794. case WUSFjob:
  1795. diff = stricmp(aa->queryJobName(), bb->queryJobName());
  1796. break;
  1797. case WUSFstate:
  1798. diff = stricmp(aa->queryStateDesc(), bb->queryStateDesc());
  1799. break;
  1800. case WUSFprotected:
  1801. diff = (int) bb->isProtected() - (int) aa->isProtected();
  1802. break;
  1803. case WUSFtotalthortime:
  1804. diff = (int) (bb->getTotalThorTime() - bb->getTotalThorTime());
  1805. break;
  1806. case WUSFwuid:
  1807. diff = stricmp(aa->queryWuid(), bb->queryWuid()); // Should never happen, since we always fetch with a wuid sort
  1808. break;
  1809. default:
  1810. throwUnexpected();
  1811. }
  1812. if (sortorder & WUSFreverse)
  1813. return -diff;
  1814. else
  1815. return diff;
  1816. }
  1817. Owned<IConstWorkUnitIterator> input;
  1818. IArrayOf<IConstWorkUnitInfo> sorted;
  1819. unsigned sortorder;
  1820. unsigned idx;
  1821. unsigned limit;
  1822. };
  1823. class SubPageIterator : public CInterfaceOf<IConstWorkUnitIteratorEx>
  1824. {
  1825. public:
  1826. SubPageIterator(IConstWorkUnitIteratorEx *_input, unsigned _startOffset, unsigned _pageSize)
  1827. : input(_input), startOffset(_startOffset), pageSize(_pageSize), idx(0)
  1828. {
  1829. }
  1830. virtual bool first()
  1831. {
  1832. idx = 0;
  1833. // MORE - put a seek into the Ex interface
  1834. if (input->first())
  1835. {
  1836. for (int i = 0; i < startOffset;i++)
  1837. {
  1838. if (!input->next())
  1839. return false;
  1840. }
  1841. return true;
  1842. }
  1843. else
  1844. return false;
  1845. }
  1846. virtual bool next()
  1847. {
  1848. idx++;
  1849. if (idx >= pageSize)
  1850. {
  1851. input->notePosition();
  1852. return false;
  1853. }
  1854. return input->next();
  1855. }
  1856. virtual void notePosition() const
  1857. {
  1858. input->notePosition();
  1859. }
  1860. virtual bool isValid()
  1861. {
  1862. return idx < pageSize && input->isValid();
  1863. }
  1864. virtual IConstWorkUnitInfo & query()
  1865. {
  1866. return input->query();
  1867. }
  1868. virtual bool hasPostFilters() const
  1869. {
  1870. return false;
  1871. }
  1872. virtual bool isMerging() const
  1873. {
  1874. return false;
  1875. }
  1876. private:
  1877. Owned<IConstWorkUnitIteratorEx> input;
  1878. unsigned startOffset;
  1879. unsigned pageSize;
  1880. unsigned idx;
  1881. };
  1882. class CassJoinIterator : implements IConstWorkUnitIteratorEx, public CInterface
  1883. {
  1884. public:
  1885. IMPLEMENT_IINTERFACE;
  1886. CassJoinIterator(unsigned _compareColumn, bool _descending)
  1887. {
  1888. compareColumn = _compareColumn;
  1889. descending = _descending;
  1890. }
  1891. void addResult(CassandraResult &result)
  1892. {
  1893. results.append(result);
  1894. }
  1895. void addPostFilter(IPostFilter &post)
  1896. {
  1897. postFilters.append(post);
  1898. }
  1899. virtual bool first()
  1900. {
  1901. if (!results.length())
  1902. return false;
  1903. inputs.kill();
  1904. ForEachItemIn(idx, results)
  1905. {
  1906. Owned <CassSortableIterator> input = new CassSortableIterator(cass_iterator_from_result(results.item(idx)), idx, compareColumn, descending);
  1907. if (!input->nextRow())
  1908. return false;
  1909. inputs.append(*input.getClear());
  1910. }
  1911. return next();
  1912. }
  1913. virtual bool next()
  1914. {
  1915. current.clear();
  1916. loop
  1917. {
  1918. unsigned idx = 0;
  1919. unsigned target = 0;
  1920. unsigned matches = 1; // I always match myself!
  1921. unsigned sources = inputs.length();
  1922. if (!sources)
  1923. return false;
  1924. while (matches < sources)
  1925. {
  1926. idx++;
  1927. if (idx==sources)
  1928. idx = 0;
  1929. int diff;
  1930. loop
  1931. {
  1932. assert(idx != target);
  1933. diff = inputs.item(idx).compare(&inputs.item(target));
  1934. if (diff >= 0)
  1935. break;
  1936. if (!inputs.item(idx).nextRow())
  1937. {
  1938. inputs.kill(); // Once any reaches EOF, we are done
  1939. return false;
  1940. }
  1941. }
  1942. if (diff > 0)
  1943. {
  1944. target = idx;
  1945. matches = 1;
  1946. }
  1947. else
  1948. matches++;
  1949. }
  1950. Owned<IPTree> wuXML = rowToPTree(NULL, NULL, workunitInfoMappings+1, cass_iterator_get_row(inputs.item(0)));
  1951. bool postFiltered = false;
  1952. ForEachItemIn(pfIdx, postFilters)
  1953. {
  1954. if (!postFilters.item(pfIdx).matches(*wuXML))
  1955. {
  1956. postFiltered = true;
  1957. break;
  1958. }
  1959. }
  1960. if (!postFiltered)
  1961. {
  1962. current.setown(createConstWorkUnitInfo(*wuXML));
  1963. ForEachItemIn(idx2, inputs)
  1964. {
  1965. if (!inputs.item(idx2).nextRow())
  1966. {
  1967. inputs.clear(); // Make sure next() fails next time it is called
  1968. break;
  1969. }
  1970. }
  1971. return true;
  1972. }
  1973. }
  1974. }
  1975. virtual bool isValid()
  1976. {
  1977. return current != NULL;
  1978. }
  1979. virtual IConstWorkUnitInfo & query()
  1980. {
  1981. assertex(current);
  1982. return *current.get();
  1983. }
  1984. private:
  1985. IArrayOf<CassandraResult> results;
  1986. IArrayOf<CassSortableIterator> inputs;
  1987. IArrayOf<IPostFilter> postFilters;
  1988. Owned<IConstWorkUnitInfo> current;
  1989. unsigned compareColumn;
  1990. bool descending;
  1991. };
  1992. static void lockWuid(Owned<IRemoteConnection> &connection, const char *wuid)
  1993. {
  1994. VStringBuffer wuRoot("/WorkUnitLocks/%s", wuid);
  1995. if (connection)
  1996. connection->changeMode(RTM_LOCK_WRITE, SDS_LOCK_TIMEOUT); // Would it ever be anything else?
  1997. else
  1998. connection.setown(querySDS().connect(wuRoot.str(), myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT));
  1999. if (!connection)
  2000. throw makeStringExceptionV(WUERR_LockFailed, "Failed to get connection for xpath %s", wuRoot.str());
  2001. }
  2002. class CCassandraWorkUnit : public CPersistedWorkUnit
  2003. {
  2004. public:
  2005. CCassandraWorkUnit(ICassandraSession *_sessionCache, IPTree *wuXML, ISecManager *secmgr, ISecUser *secuser, IRemoteConnection *_daliLock, bool _allDirty)
  2006. : sessionCache(_sessionCache), CPersistedWorkUnit(secmgr, secuser), daliLock(_daliLock), allDirty(_allDirty)
  2007. {
  2008. CPersistedWorkUnit::loadPTree(wuXML);
  2009. memset(childLoaded, 0, sizeof(childLoaded));
  2010. actionChanged = false;
  2011. stateChanged = false;
  2012. abortDirty = false;
  2013. }
  2014. ~CCassandraWorkUnit()
  2015. {
  2016. }
  2017. virtual void forceReload()
  2018. {
  2019. synchronized sync(locked); // protect locked workunits (uncommitted writes) from reload
  2020. loadPTree(sessionCache->cassandraToWorkunitXML(queryWuid()));
  2021. memset(childLoaded, 0, sizeof(childLoaded));
  2022. allDirty = false;
  2023. actionChanged = false;
  2024. stateChanged = false;
  2025. abortDirty = true;
  2026. }
  2027. void executeBatch(CassandraBatch &batch, const char * what) const
  2028. {
  2029. if (sessionCache->queryTraceLevel() > 1)
  2030. DBGLOG("Executing batch %s", what);
  2031. batch.execute(sessionCache->querySession(), what);
  2032. }
  2033. void executeAsync(CIArrayOf<CassandraStatement> &batch, const char * what) const
  2034. {
  2035. if (sessionCache->queryTraceLevel() > 1)
  2036. DBGLOG("Executing async batch %s (%d elements)", what, batch.length());
  2037. sessionCache->executeAsync(batch, what);
  2038. }
  2039. virtual void cleanupAndDelete(bool deldll, bool deleteOwned, const StringArray *deleteExclusions)
  2040. {
  2041. const char *wuid = queryWuid();
  2042. CPersistedWorkUnit::cleanupAndDelete(deldll, deleteOwned, deleteExclusions);
  2043. // Note we need to gather the information about what secondaries to delete before we delete the parent/children,
  2044. // but we actually do the deletion afterwards
  2045. CIArrayOf<CassandraStatement> deleteSearches;
  2046. deleteSecondaries(wuid, deleteSearches);
  2047. CassandraBatch main(CASS_BATCH_TYPE_UNLOGGED);
  2048. deleteChildren(wuid, main);
  2049. sessionCache->deleteChildByWuid(wuGraphProgressMappings, wuid, main);
  2050. sessionCache->deleteChildByWuid(wuGraphStateMappings, wuid, main);
  2051. sessionCache->deleteChildByWuid(wuGraphRunningMappings, wuid, main);
  2052. // If the partitioning of the main workunits table does not match the partitioning of the other tables, then would be better to
  2053. // execute the deletes of the child tables and the main record as two separate batches.
  2054. CassandraStatement update(sessionCache->prepareStatement("DELETE from workunits where partition=? and wuid=?;"));
  2055. update.bindInt32(0, rtlHash32VStr(wuid, 0) % NUM_PARTITIONS);
  2056. update.bindString(1, wuid);
  2057. check(cass_batch_add_statement(main, update));
  2058. executeBatch(main, "delete wu");
  2059. executeAsync(deleteSearches, "delete wu");
  2060. }
  2061. virtual void commit()
  2062. {
  2063. CPersistedWorkUnit::commit();
  2064. if (sessionCache->queryTraceLevel() >= 8)
  2065. {
  2066. StringBuffer s; toXML(p, s); DBGLOG("CCassandraWorkUnit::commit\n%s", s.str());
  2067. }
  2068. CIArrayOf<CassandraStatement> secondaryBatch;
  2069. CassandraBatch batch(CASS_BATCH_TYPE_UNLOGGED);
  2070. Owned<CassandraBatch> deletesBatch;
  2071. const char *wuid = queryWuid();
  2072. bool isGlobal = streq(wuid, GLOBAL_WORKUNIT);
  2073. if (!isGlobal) // Global workunit only has child rows, no parent
  2074. {
  2075. if (prev) // Holds the values of the "basic" info at the last commit
  2076. updateSecondaries(wuid, secondaryBatch);
  2077. simpleXMLtoCassandra(sessionCache, batch, workunitsMappings, p); // This just does the parent row
  2078. }
  2079. if (allDirty && !isGlobal)
  2080. {
  2081. // MORE - this delete is technically correct, but if we assert that the only place that copyWorkUnit is used is to populate an
  2082. // empty newly-created WU, it is unnecessary.
  2083. // deleteChildren(wuid, deletesBatch);
  2084. // MORE can use the table?
  2085. childXMLtoCassandra(sessionCache, batch, wuGraphsMappings, p, "Graphs/Graph", 0);
  2086. childXMLtoCassandra(sessionCache, batch, wuResultsMappings, p, "Results/Result", "0");
  2087. childXMLtoCassandra(sessionCache, batch, wuVariablesMappings, p, "Variables/Variable", "-1"); // ResultSequenceStored
  2088. childXMLtoCassandra(sessionCache, batch, wuTemporariesMappings, p, "Temporaries/Variable", "-3"); // ResultSequenceInternal // NOTE - lookups may also request ResultSequenceOnce
  2089. childXMLtoCassandra(sessionCache, batch, wuExceptionsMappings, p, "Exceptions/Exception", 0);
  2090. childXMLtoCassandra(sessionCache, batch, wuStatisticsMappings, p, "Statistics/Statistic", 0);
  2091. childXMLtoCassandra(sessionCache, batch, wuFilesReadMappings, p, "FilesRead/File", 0);
  2092. childXMLtoCassandra(sessionCache, batch, wuFilesWrittenMappings, p, "Files/File", 0);
  2093. childXMLtoCassandra(sessionCache, batch, wuFieldUsageMappings, p, "usedsources/datasource", 0);
  2094. IPTree *query = p->queryPropTree("Query");
  2095. if (query)
  2096. childXMLRowtoCassandra(sessionCache, batch, wuQueryMappings, wuid, *query, 0);
  2097. }
  2098. else
  2099. {
  2100. HashIterator iter(dirtyPaths);
  2101. ForEach (iter)
  2102. {
  2103. const char *path = (const char *) iter.query().getKey();
  2104. const CassandraXmlMapping *table = *dirtyPaths.mapToValue(&iter.query());
  2105. if (sessionCache->queryTraceLevel()>2)
  2106. DBGLOG("Updating dirty path %s", path);
  2107. if (*path == '*')
  2108. {
  2109. if (!deletesBatch)
  2110. deletesBatch.setown(new CassandraBatch(CASS_BATCH_TYPE_UNLOGGED));
  2111. sessionCache->deleteChildByWuid(table, wuid, *deletesBatch);
  2112. childXMLtoCassandra(sessionCache, batch, table, p, path+1, 0);
  2113. }
  2114. else
  2115. {
  2116. IPTree *dirty = p->queryPropTree(path);
  2117. if (dirty)
  2118. childXMLRowtoCassandra(sessionCache, batch, table, wuid, *dirty, 0);
  2119. else if (sessionCache->queryTraceLevel())
  2120. {
  2121. StringBuffer xml;
  2122. toXML(p, xml);
  2123. DBGLOG("Missing dirty element %s in %s", path, xml.str());
  2124. }
  2125. }
  2126. }
  2127. ForEachItemIn(d, dirtyResults)
  2128. {
  2129. IWUResult &result = dirtyResults.item(d);
  2130. switch (result.getResultSequence())
  2131. {
  2132. case ResultSequenceStored:
  2133. childXMLRowtoCassandra(sessionCache, batch, wuVariablesMappings, wuid, *result.queryPTree(), "-1");
  2134. break;
  2135. case ResultSequenceInternal:
  2136. case ResultSequenceOnce:
  2137. childXMLRowtoCassandra(sessionCache, batch, wuTemporariesMappings, wuid, *result.queryPTree(), "-3");
  2138. break;
  2139. default:
  2140. childXMLRowtoCassandra(sessionCache, batch, wuResultsMappings, wuid, *result.queryPTree(), "0");
  2141. break;
  2142. }
  2143. }
  2144. }
  2145. if (sessionCache->queryTraceLevel() > 1)
  2146. DBGLOG("Executing commit batches");
  2147. if (deletesBatch)
  2148. {
  2149. CassandraFuture futureBatch(cass_session_execute_batch(sessionCache->querySession(), *deletesBatch));
  2150. futureBatch.wait("commit deletes");
  2151. }
  2152. CassandraFuture futureBatch(cass_session_execute_batch(sessionCache->querySession(), batch));
  2153. futureBatch.wait("commit updates");
  2154. executeAsync(secondaryBatch, "commit");
  2155. if (stateChanged)
  2156. {
  2157. // Signal changes to state to anyone that might be watching via Dali
  2158. VStringBuffer xpath("/WorkUnitStates/%s/State", wuid);
  2159. Owned<IRemoteConnection> conn = querySDS().connect(xpath.str(), myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE, SDS_LOCK_TIMEOUT);
  2160. conn->queryRoot()->setProp(NULL, p->queryProp("@state"));
  2161. }
  2162. if (actionChanged)
  2163. {
  2164. // Signal changes to action to anyone that might be watching via Dali
  2165. VStringBuffer xpath("/WorkUnitStates/%s/Action", wuid);
  2166. Owned<IRemoteConnection> conn = querySDS().connect(xpath.str(), myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE, SDS_LOCK_TIMEOUT);
  2167. conn->queryRoot()->setProp(NULL, p->queryProp("Action"));
  2168. }
  2169. prev.clear();
  2170. allDirty = false;
  2171. stateChanged = false;
  2172. actionChanged = false;
  2173. dirtyPaths.kill();
  2174. dirtyResults.kill();
  2175. }
  2176. virtual IConstWUGraph *getGraph(const char *qname) const
  2177. {
  2178. // Just because we read one graph, does not mean we are likely to read more. So don't cache this result.
  2179. // Also note that graphs are generally read-only
  2180. CassandraResult result(sessionCache->fetchDataForWuidAndKey(wuGraphsMappings, queryWuid(), qname));
  2181. const CassRow *row = cass_result_first_row(result);
  2182. if (row)
  2183. {
  2184. Owned<IPTree> graph = createPTree("Graph");
  2185. unsigned colidx = 2; // We did not fetch wuid or partition
  2186. CassandraIterator cols(cass_iterator_from_row(row));
  2187. while (cass_iterator_next(cols))
  2188. {
  2189. assertex(wuGraphsMappings[colidx].columnName);
  2190. const CassValue *value = cass_iterator_get_column(cols);
  2191. if (value && !cass_value_is_null(value))
  2192. wuGraphsMappings[colidx].mapper.toXML(graph, wuGraphsMappings[colidx].xpath, value);
  2193. colidx++;
  2194. }
  2195. return new CLocalWUGraph(*this, graph.getClear());
  2196. }
  2197. else
  2198. return NULL;
  2199. }
  2200. virtual unsigned getResultCount() const
  2201. {
  2202. return childCount(sessionCache, wuResultsMappings, queryWuid());
  2203. }
  2204. virtual unsigned getGraphCount() const
  2205. {
  2206. return childCount(sessionCache, wuGraphsMappings, queryWuid());
  2207. }
  2208. virtual unsigned getSourceFileCount() const
  2209. {
  2210. return childCount(sessionCache, wuFilesReadMappings, queryWuid());
  2211. }
  2212. virtual unsigned getVariableCount() const
  2213. {
  2214. return childCount(sessionCache, wuVariablesMappings, queryWuid());
  2215. }
  2216. virtual void setUser(const char *user)
  2217. {
  2218. if (trackSecondaryChange(user, "@submitID"))
  2219. CPersistedWorkUnit::setUser(user);
  2220. }
  2221. virtual void setClusterName(const char *cluster)
  2222. {
  2223. if (trackSecondaryChange(cluster, "@clusterName"))
  2224. CPersistedWorkUnit::setClusterName(cluster);
  2225. }
  2226. virtual void setJobName(const char *jobname)
  2227. {
  2228. if (trackSecondaryChange(jobname, "@jobName"))
  2229. CPersistedWorkUnit::setJobName(jobname);
  2230. }
  2231. virtual void setState(WUState state)
  2232. {
  2233. if (trackSecondaryChange(getWorkunitStateStr(state), "@state"))
  2234. {
  2235. stateChanged = true;
  2236. CPersistedWorkUnit::setState(state);
  2237. }
  2238. }
  2239. virtual void setAction(WUAction action)
  2240. {
  2241. actionChanged = true;
  2242. CPersistedWorkUnit::setAction(action);
  2243. }
  2244. virtual void setApplicationValue(const char *app, const char *propname, const char *value, bool overwrite)
  2245. {
  2246. VStringBuffer xpath("Application/%s/%s", app, propname);
  2247. if (trackSecondaryChange(value, xpath))
  2248. CPersistedWorkUnit::setApplicationValue(app, propname, value, overwrite);
  2249. }
  2250. virtual void _lockRemote()
  2251. {
  2252. lockWuid(daliLock, queryWuid());
  2253. }
  2254. virtual void _unlockRemote()
  2255. {
  2256. commit();
  2257. if (daliLock)
  2258. {
  2259. daliLock->close(true);
  2260. daliLock.clear();
  2261. }
  2262. }
  2263. virtual void createGraph(const char * name, const char *label, WUGraphType type, IPropertyTree *xgmml)
  2264. {
  2265. CPersistedWorkUnit::createGraph(name, label, type, xgmml);
  2266. VStringBuffer xpath("Graphs/Graph[@name='%s']", name);
  2267. noteDirty(xpath, wuGraphsMappings);
  2268. }
  2269. virtual IWUResult * updateResultByName(const char * name)
  2270. {
  2271. return noteDirty(CPersistedWorkUnit::updateResultByName(name));
  2272. }
  2273. virtual IWUResult * updateResultBySequence(unsigned seq)
  2274. {
  2275. return noteDirty(CPersistedWorkUnit::updateResultBySequence(seq));
  2276. }
  2277. virtual IWUResult * updateTemporaryByName(const char * name)
  2278. {
  2279. return noteDirty(CPersistedWorkUnit::updateTemporaryByName(name));
  2280. }
  2281. virtual IWUResult * updateVariableByName(const char * name)
  2282. {
  2283. return noteDirty(CPersistedWorkUnit::updateVariableByName(name));
  2284. }
  2285. virtual IWUQuery * updateQuery()
  2286. {
  2287. noteDirty("Query", wuQueryMappings);
  2288. return CPersistedWorkUnit::updateQuery();
  2289. }
  2290. virtual IConstWUQuery *getQuery() const
  2291. {
  2292. checkChildLoaded(wuQueriesTable);
  2293. return CPersistedWorkUnit::getQuery();
  2294. }
  2295. virtual IConstWUFileUsageIterator * getFieldUsage() const
  2296. {
  2297. checkChildLoaded(wuFieldUsageTable);
  2298. return CPersistedWorkUnit::getFieldUsage();
  2299. }
  2300. virtual IWUException *createException()
  2301. {
  2302. IWUException *result = CPersistedWorkUnit::createException();
  2303. VStringBuffer xpath("Exceptions/Exception[@sequence='%d']", result->getSequence());
  2304. noteDirty(xpath, wuExceptionsMappings);
  2305. return result;
  2306. }
  2307. virtual void copyWorkUnit(IConstWorkUnit *cached, bool all)
  2308. {
  2309. // Make sure that any required updates to the secondary files happen
  2310. IPropertyTree *fromP = queryExtendedWU(cached)->queryPTree();
  2311. for (const char * const *search = searchPaths; *search; search++)
  2312. trackSecondaryChange(fromP->queryProp(*search), *search);
  2313. for (const ChildTableInfo * const * table = childTables; *table != NULL; table++)
  2314. checkChildLoaded(**table);
  2315. CPersistedWorkUnit::copyWorkUnit(cached, all);
  2316. memset(childLoaded, 1, sizeof(childLoaded));
  2317. allDirty = true;
  2318. actionChanged = true;
  2319. stateChanged = true;
  2320. }
  2321. virtual void noteFileRead(IDistributedFile *file)
  2322. {
  2323. if (file)
  2324. {
  2325. childLoaded[WuFilesReadChild] = true; // Prevent duplicates if someone tries to read back files read (unlikely)
  2326. CPersistedWorkUnit::noteFileRead(file);
  2327. VStringBuffer xpath("FilesRead/File[@name='%s']", file->queryLogicalName());
  2328. noteDirty(xpath, wuFilesReadMappings);
  2329. }
  2330. else
  2331. {
  2332. // A hack for testing!
  2333. Owned<IPropertyTreeIterator> files = p->getElements("FilesRead/File");
  2334. ForEach(*files)
  2335. {
  2336. VStringBuffer xpath("FilesRead/File[@name='%s']", files->query().queryProp("@name"));
  2337. noteDirty(xpath, wuFilesReadMappings);
  2338. }
  2339. }
  2340. }
  2341. virtual void addFile(const char *fileName, StringArray *clusters, unsigned usageCount, WUFileKind fileKind, const char *graphOwner)
  2342. {
  2343. if (fileName)
  2344. {
  2345. childLoaded[WuFilesWrittenChild] = true; // Prevent duplicates if someone tries to read back files written from same object (unlikely)
  2346. CPersistedWorkUnit::addFile(fileName, clusters, usageCount, fileKind, graphOwner);
  2347. VStringBuffer xpath("Files/File[@name='%s']", fileName);
  2348. noteDirty(xpath, wuFilesWrittenMappings);
  2349. }
  2350. }
  2351. virtual void clearGraphProgress() const
  2352. {
  2353. const char *wuid = queryWuid();
  2354. CassandraBatch batch(CASS_BATCH_TYPE_UNLOGGED);
  2355. sessionCache->deleteChildByWuid(wuGraphProgressMappings, wuid, batch);
  2356. sessionCache->deleteChildByWuid(wuGraphStateMappings, wuid, batch);
  2357. sessionCache->deleteChildByWuid(wuGraphRunningMappings, wuid, batch);
  2358. executeBatch(batch, "clearGraphProgress");
  2359. }
  2360. virtual bool getRunningGraph(IStringVal &graphName, WUGraphIDType &subId) const
  2361. {
  2362. CassandraStatement statement(sessionCache->prepareStatement("SELECT graphID, subgraphID FROM wuGraphRunning where partition=? and wuid=?;"));
  2363. const char *wuid = queryWuid();
  2364. statement.bindInt32(0, rtlHash32VStr(wuid, 0) % NUM_PARTITIONS);
  2365. statement.bindString(1, wuid);
  2366. CassandraFuture future(cass_session_execute(sessionCache->querySession(), statement));
  2367. future.wait("getRunningGraph");
  2368. CassandraResult result(cass_future_get_result(future));
  2369. if (cass_result_row_count(result))
  2370. {
  2371. const CassRow *row = cass_result_first_row(result);
  2372. assertex(row);
  2373. StringBuffer b;
  2374. getCassString(b, cass_row_get_column(row, 0));
  2375. graphName.set(b);
  2376. subId = getUnsignedResult(NULL, cass_row_get_column(row, 1));
  2377. return true;
  2378. }
  2379. else
  2380. return false;
  2381. }
  2382. virtual IConstWUGraphProgress *getGraphProgress(const char *graphName) const
  2383. {
  2384. CassandraStatement statement(sessionCache->prepareStatement("SELECT subgraphID, creator, progress FROM wuGraphProgress where partition=? and wuid=? and graphID=?;"));
  2385. const char *wuid = queryWuid();
  2386. statement.bindInt32(0, rtlHash32VStr(wuid, 0) % NUM_PARTITIONS);
  2387. statement.bindString(1, wuid);
  2388. statement.bindString(2, graphName);
  2389. CassandraFuture future(cass_session_execute(sessionCache->querySession(), statement));
  2390. future.wait("getGraphProgress");
  2391. CassandraResult result(cass_future_get_result(future));
  2392. CassandraIterator rows(cass_iterator_from_result(result));
  2393. if (!cass_result_row_count(result))
  2394. return NULL;
  2395. Owned<IPropertyTree> progress = createPTree(graphName);
  2396. progress->setPropBool("@stats", true);
  2397. progress->setPropInt("@format", PROGRESS_FORMAT_V);
  2398. while (cass_iterator_next(rows))
  2399. {
  2400. const CassRow *row = cass_iterator_get_row(rows);
  2401. WUGraphIDType subId = getUnsignedResult(NULL, cass_row_get_column(row, 0));
  2402. StringBuffer creator, xml;
  2403. getCassString(creator, cass_row_get_column(row, 1));
  2404. getCassString(xml, cass_row_get_column(row, 2));
  2405. IPTree *stats = createPTreeFromXMLString(xml);
  2406. // We could check that atoi(stats->queryName()+2)==subgraphID, and that stats->queryProp(@creator)==creator)....
  2407. progress->addPropTree(stats->queryName(), stats);
  2408. }
  2409. return createConstGraphProgress(queryWuid(), graphName, progress); // Links progress
  2410. }
  2411. WUGraphState queryGraphState(const char *graphName) const
  2412. {
  2413. return queryNodeState(graphName, 0);
  2414. }
  2415. WUGraphState queryNodeState(const char *graphName, WUGraphIDType nodeId) const
  2416. {
  2417. CassandraStatement statement(sessionCache->prepareStatement("SELECT state FROM wuGraphState where partition=? and wuid=? and graphID=? and subgraphID=?;"));
  2418. const char *wuid = queryWuid();
  2419. statement.bindInt32(0, rtlHash32VStr(wuid, 0) % NUM_PARTITIONS);
  2420. statement.bindString(1, wuid);
  2421. statement.bindString(2, graphName);
  2422. statement.bindInt64(3, nodeId);
  2423. CassandraFuture future(cass_session_execute(sessionCache->querySession(), statement));
  2424. future.wait("queryNodeState");
  2425. CassandraResult result(cass_future_get_result(future));
  2426. if (cass_result_row_count(result))
  2427. return (WUGraphState) getUnsignedResult(NULL, getSingleResult(result));
  2428. else
  2429. return WUGraphUnknown;
  2430. }
  2431. void setGraphState(const char *graphName, WUGraphState state) const
  2432. {
  2433. setNodeState(graphName, 0, state);
  2434. }
  2435. void setNodeState(const char *graphName, WUGraphIDType nodeId, WUGraphState state) const
  2436. {
  2437. CassandraStatement statement(sessionCache->prepareStatement("INSERT INTO wuGraphState (partition, wuid, graphID, subgraphID, state) values (?,?,?,?,?);"));
  2438. const char *wuid = queryWuid();
  2439. statement.bindInt32(0, rtlHash32VStr(wuid, 0) % NUM_PARTITIONS);
  2440. statement.bindString(1, wuid);
  2441. statement.bindString(2, graphName);
  2442. statement.bindInt64(3, nodeId);
  2443. statement.bindInt32(4, (int) state);
  2444. CassandraFuture future(cass_session_execute(sessionCache->querySession(), statement));
  2445. future.wait("setNodeState update state");
  2446. if (nodeId)
  2447. {
  2448. switch (state)
  2449. {
  2450. case WUGraphRunning:
  2451. {
  2452. CassandraStatement statement2(sessionCache->prepareStatement("INSERT INTO wuGraphRunning (partition, wuid, graphID, subgraphID) values (?,?,?,?);"));
  2453. statement2.bindInt32(0, rtlHash32VStr(wuid, 0) % NUM_PARTITIONS);
  2454. statement2.bindString(1, wuid);
  2455. statement2.bindString(2, graphName);
  2456. statement2.bindInt64(3, nodeId);
  2457. CassandraFuture future(cass_session_execute(sessionCache->querySession(), statement2));
  2458. future.wait("setNodeState update running");
  2459. break;
  2460. }
  2461. case WUGraphComplete:
  2462. {
  2463. CassandraStatement statement3(sessionCache->prepareStatement("DELETE FROM wuGraphRunning where partition=? and wuid=?;"));
  2464. statement3.bindInt32(0, rtlHash32VStr(wuid, 0) % NUM_PARTITIONS);
  2465. statement3.bindString(1, wuid);
  2466. CassandraFuture future(cass_session_execute(sessionCache->querySession(), statement3));
  2467. future.wait("setNodeState remove running");
  2468. break;
  2469. }
  2470. }
  2471. }
  2472. }
  2473. class CCassandraWuGraphStats : public CWuGraphStats
  2474. {
  2475. public:
  2476. CCassandraWuGraphStats(const CCassandraWorkUnit *_parent, StatisticCreatorType _creatorType, const char * _creator, const char * _rootScope, unsigned _id)
  2477. : CWuGraphStats(createPTree(_rootScope), _creatorType, _creator, _rootScope, _id),
  2478. parent(_parent)
  2479. {
  2480. }
  2481. virtual void beforeDispose()
  2482. {
  2483. CWuGraphStats::beforeDispose(); // Sets up progress - should contain a single child tree sqNN where nn==id
  2484. parent->setGraphProgress(progress, progress->queryName(), id, creator);
  2485. }
  2486. protected:
  2487. Linked<const CCassandraWorkUnit> parent;
  2488. StringAttr wuid;
  2489. };
  2490. IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned subgraph) const
  2491. {
  2492. return new CCassandraWuGraphStats(this, creatorType, creator, graphName, subgraph);
  2493. }
  2494. virtual void _loadFilesRead() const
  2495. {
  2496. checkChildLoaded(wuFilesReadTable); // Lazy populate the FilesRead branch of p from Cassandra
  2497. CPersistedWorkUnit::_loadFilesRead();
  2498. }
  2499. virtual void _loadFilesWritten() const
  2500. {
  2501. checkChildLoaded(wuFilesWrittenTable); // Lazy populate the Files branch of p from Cassandra
  2502. CPersistedWorkUnit::_loadFilesWritten();
  2503. }
  2504. virtual void _loadResults() const
  2505. {
  2506. checkChildLoaded(wuResultsTable); // Lazy populate the Results branch of p from Cassandra
  2507. CPersistedWorkUnit::_loadResults();
  2508. }
  2509. virtual void _loadGraphs(bool heavy) const
  2510. {
  2511. // Lazy populate the Graphs branch of p from Cassandra
  2512. if (heavy)
  2513. {
  2514. // If we loaded light before, and are now loading heavy, we need to force the reload. Unlikely to happen in practice.
  2515. if (graphsCached==1)
  2516. {
  2517. p->removeProp("Graphs");
  2518. childLoaded[WuGraphsChild] = false;
  2519. }
  2520. checkChildLoaded(wuGraphsTable);
  2521. }
  2522. else
  2523. {
  2524. checkChildLoaded(wuGraphMetasTable);
  2525. }
  2526. CPersistedWorkUnit::_loadGraphs(heavy);
  2527. }
  2528. virtual void _loadVariables() const
  2529. {
  2530. checkChildLoaded(wuVariablesTable); // Lazy populate the Variables branch of p from Cassandra
  2531. CPersistedWorkUnit::_loadVariables();
  2532. }
  2533. virtual void _loadTemporaries() const
  2534. {
  2535. checkChildLoaded(wuTemporariesTable); // Lazy populate the Temporaries branch of p from Cassandra
  2536. CPersistedWorkUnit::_loadTemporaries();
  2537. }
  2538. virtual void _loadStatistics() const
  2539. {
  2540. checkChildLoaded(wuStatisticsTable); // Lazy populate the Statistics branch of p from Cassandra
  2541. CPersistedWorkUnit::_loadStatistics();
  2542. }
  2543. virtual void _loadExceptions() const
  2544. {
  2545. checkChildLoaded(wuExceptionsTable); // Lazy populate the Exceptions branch of p from Cassandra
  2546. CPersistedWorkUnit::_loadExceptions();
  2547. }
  2548. virtual void clearExceptions()
  2549. {
  2550. CriticalBlock b(crit);
  2551. noteDirty("*Exceptions/Exception", wuExceptionsMappings);
  2552. CPersistedWorkUnit::clearExceptions();
  2553. }
  2554. virtual IPropertyTree *getUnpackedTree(bool includeProgress) const
  2555. {
  2556. // If anyone wants the whole ptree, we'd better make sure we have fully loaded it...
  2557. CriticalBlock b(crit);
  2558. for (const ChildTableInfo * const * table = childTables; *table != NULL; table++)
  2559. checkChildLoaded(**table);
  2560. return CPersistedWorkUnit::getUnpackedTree(includeProgress);
  2561. }
  2562. virtual IPropertyTree *queryPTree() const
  2563. {
  2564. // If anyone wants the whole ptree, we'd better make sure we have fully loaded it...
  2565. CriticalBlock b(crit);
  2566. for (const ChildTableInfo * const * table = childTables; *table != NULL; table++)
  2567. checkChildLoaded(**table);
  2568. // And a hack for the fact that Dali stores state in both @state and <state>
  2569. const char *stateStr = p->queryProp("@state");
  2570. if (stateStr)
  2571. p->setProp("State", stateStr);
  2572. return p;
  2573. }
  2574. void setGraphProgress(IPropertyTree *progress, const char *gid, unsigned subid, const char *creator) const
  2575. {
  2576. const char *wuid=queryWuid();
  2577. CassandraStatement statement(sessionCache->prepareStatement("INSERT INTO wuGraphProgress (partition, wuid, graphID, subgraphID, creator, progress) values (?,?,?,?,?,?);"));
  2578. statement.bindInt32(0, rtlHash32VStr(wuid, 0) % NUM_PARTITIONS);
  2579. statement.bindString(1, wuid);
  2580. statement.bindString(2, gid);
  2581. statement.bindInt64(3, subid);
  2582. statement.bindString(4, creator);
  2583. StringBuffer tag;
  2584. tag.append("sg").append(subid);
  2585. IPTree *sq = progress->queryPropTree(tag);
  2586. assertex(sq);
  2587. StringBuffer xml;
  2588. toXML(sq, xml);
  2589. statement.bindString(5, xml);
  2590. CassandraFuture future(cass_session_execute(sessionCache->querySession(), statement));
  2591. future.wait("update stats");
  2592. }
  2593. virtual IPropertyTree *getGraphProgressTree() const
  2594. {
  2595. CassandraStatement graphQuery(sessionCache->prepareStatement("SELECT graphId, subgraphID, creator, progress FROM wuGraphProgress where partition=? and wuid=?;"));
  2596. const char *wuid = queryWuid();
  2597. graphQuery.bindInt32(0, rtlHash32VStr(wuid, 0) % NUM_PARTITIONS);
  2598. graphQuery.bindString(1, wuid);
  2599. CassandraFuture future(cass_session_execute(sessionCache->querySession(), graphQuery));
  2600. future.wait("getGraphProgress");
  2601. CassandraResult result(cass_future_get_result(future));
  2602. if (!cass_result_row_count(result))
  2603. return NULL;
  2604. Owned<IPTree> progress = createPTree("GraphProgress");
  2605. CassandraIterator rows(cass_iterator_from_result(result));
  2606. while (cass_iterator_next(rows))
  2607. {
  2608. const CassRow *row = cass_iterator_get_row(rows);
  2609. StringBuffer graphName, creator, xml;
  2610. getCassString(graphName, cass_row_get_column(row, 0));
  2611. WUGraphIDType subId = getUnsignedResult(NULL, cass_row_get_column(row, 1));
  2612. getCassString(creator, cass_row_get_column(row, 2));
  2613. getCassString(xml, cass_row_get_column(row, 3));
  2614. if (!progress->hasProp(graphName))
  2615. progress->setPropTree(graphName, createPTree(graphName));
  2616. IPTree *graph = progress->queryPropTree(graphName);
  2617. graph->setPropBool("@stats", true);
  2618. graph->setPropInt("@format", PROGRESS_FORMAT_V);
  2619. IPTree *stats = createPTreeFromXMLString(xml);
  2620. // We could check that atoi(stats->queryName()+2)==subgraphID, and that stats->queryProp(@creator)==creator)....
  2621. graph->addPropTree(stats->queryName(), stats);
  2622. }
  2623. // Now fill in the graph/node states
  2624. CassandraStatement stateQuery(sessionCache->prepareStatement("SELECT graphId, subgraphId, state FROM wuGraphState where partition=? and wuid=?;"));
  2625. stateQuery.bindInt32(0, rtlHash32VStr(wuid, 0) % NUM_PARTITIONS);
  2626. stateQuery.bindString(1, wuid);
  2627. CassandraFuture stateFuture(cass_session_execute(sessionCache->querySession(), stateQuery));
  2628. stateFuture.wait("getGraphStateProgress");
  2629. CassandraResult stateResult(cass_future_get_result(stateFuture));
  2630. CassandraIterator stateRows(cass_iterator_from_result(stateResult));
  2631. if (cass_result_row_count(stateResult))
  2632. {
  2633. CassandraIterator stateRows(cass_iterator_from_result(stateResult));
  2634. while (cass_iterator_next(stateRows))
  2635. {
  2636. const CassRow *row = cass_iterator_get_row(stateRows);
  2637. StringBuffer graphName;
  2638. getCassString(graphName, cass_row_get_column(row, 0));
  2639. WUGraphIDType subId = getUnsignedResult(NULL, cass_row_get_column(row, 1));
  2640. unsigned state = getUnsignedResult(NULL, cass_row_get_column(row, 2));
  2641. IPTree *node = progress->queryPropTree(graphName);
  2642. if (node)
  2643. {
  2644. if (subId)
  2645. {
  2646. // This is what you might expect it to say...
  2647. //StringBuffer sg("sg");
  2648. //sg.append(subId);
  2649. //node = node->queryPropTree(sg);
  2650. // but in fact the node states are stored in separate elements. I need to see if that is something I broke.
  2651. StringBuffer xpath("node[@id='");
  2652. xpath.append(subId).append("'])");
  2653. node->removeProp(xpath); // Shouldn't be one, just playing safe
  2654. node = node->addPropTree("node", createPTree("node"));
  2655. node->setPropInt("@id", subId);
  2656. node->setPropInt("@_state", state);
  2657. }
  2658. else
  2659. node->setPropInt("@_state", state);
  2660. }
  2661. }
  2662. }
  2663. return progress.getClear();
  2664. }
  2665. protected:
  2666. // Delete child table rows
  2667. void deleteChildren(const char *wuid, CassBatch *useBatch)
  2668. {
  2669. for (const ChildTableInfo * const * table = childTables; *table != NULL; table++)
  2670. sessionCache->deleteChildByWuid(table[0]->mappings, wuid, useBatch);
  2671. }
  2672. // Lazy-populate a portion of WU xml from a child table
  2673. void checkChildLoaded(const ChildTableInfo &childTable) const
  2674. {
  2675. // NOTE - should be called inside critsec
  2676. if (!childLoaded[childTable.index])
  2677. {
  2678. const CassResult* cassResult;
  2679. try
  2680. {
  2681. cassResult = sessionCache->fetchDataForWuid(childTable.mappings, queryWuid(), false);
  2682. }
  2683. catch (IException* e)
  2684. {
  2685. int errorCode = e->errorCode();
  2686. StringBuffer origErrorMsg;
  2687. e->errorMessage(origErrorMsg);
  2688. e->Release();
  2689. const char* tableName = queryTableName(childTable.mappings);
  2690. VStringBuffer newErrorMsg("Failed to read from cassandra table '%s' (Have you run wutool to initialize cassandra repository?), [%s]", tableName, origErrorMsg.str());
  2691. rtlFail(errorCode, newErrorMsg);
  2692. }
  2693. CassandraResult result(cassResult);
  2694. IPTree *results = p->queryPropTree(childTable.parentElement);
  2695. CassandraIterator rows(cass_iterator_from_result(result));
  2696. while (cass_iterator_next(rows))
  2697. {
  2698. CassandraIterator cols(cass_iterator_from_row(cass_iterator_get_row(rows)));
  2699. Owned<IPTree> child;
  2700. if (!results)
  2701. results = ensurePTree(p, childTable.parentElement);
  2702. if (childTable.childElement)
  2703. child.setown(createPTree(childTable.childElement));
  2704. else
  2705. child.set(results);
  2706. unsigned colidx = 2; // We did not fetch wuid or partition
  2707. while (cass_iterator_next(cols))
  2708. {
  2709. assertex(childTable.mappings[colidx].columnName);
  2710. const CassValue *value = cass_iterator_get_column(cols);
  2711. if (value && !cass_value_is_null(value))
  2712. childTable.mappings[colidx].mapper.toXML(child, childTable.mappings[colidx].xpath, value);
  2713. colidx++;
  2714. }
  2715. if (childTable.childElement)
  2716. {
  2717. const char *childName = child->queryName();
  2718. results->addPropTree(childName, child.getClear());
  2719. }
  2720. }
  2721. childLoaded[childTable.index] = true;
  2722. }
  2723. }
  2724. // Update secondary tables (used to search wuids by owner, state, jobname etc)
  2725. void updateSecondaryTable(const char *xpath, const char *prevKey, const char *wuid, CIArrayOf<CassandraStatement> &batch)
  2726. {
  2727. if (prevKey && *prevKey)
  2728. deleteSecondaryByKey(xpath, prevKey, wuid, sessionCache, batch);
  2729. const char *value = p->queryProp(xpath);
  2730. if (value && *value)
  2731. simpleXMLtoCassandra(sessionCache, batch, searchMappings, p, xpath);
  2732. }
  2733. void deleteAppSecondaries(IPTree &pt, const char *wuid, CIArrayOf<CassandraStatement> &batch)
  2734. {
  2735. Owned<IPTreeIterator> apps = pt.getElements("Application");
  2736. ForEach(*apps)
  2737. {
  2738. IPTree &app = apps->query();
  2739. Owned<IPTreeIterator> names = app.getElements("*");
  2740. ForEach(*names)
  2741. {
  2742. IPTree &name = names->query();
  2743. Owned<IPTreeIterator> values = name.getElements("*");
  2744. ForEach(*values)
  2745. {
  2746. IPTree &value = values->query();
  2747. const char *appValue = value.queryProp(".");
  2748. if (appValue && *appValue)
  2749. {
  2750. VStringBuffer xpath("%s/%s/%s", app.queryName(), name.queryName(), value.queryName());
  2751. deleteSecondaryByKey(xpath, appValue, wuid, sessionCache, batch);
  2752. }
  2753. }
  2754. }
  2755. }
  2756. }
  2757. void deleteSecondaries(const char *wuid, CIArrayOf<CassandraStatement> &batch)
  2758. {
  2759. for (const char * const *search = searchPaths; *search; search++)
  2760. deleteSecondaryByKey(*search, p->queryProp(*search), wuid, sessionCache, batch);
  2761. deleteAppSecondaries(*p, wuid, batch);
  2762. Owned<IPropertyTreeIterator> filesRead = &getFilesReadIterator();
  2763. ForEach(*filesRead)
  2764. {
  2765. deleteFileSearch(sessionCache, batch, filesRead->query().queryProp("@name"), true, wuid);
  2766. }
  2767. Owned<IPropertyTreeIterator> filesWritten = &getFileIterator();
  2768. ForEach(*filesWritten)
  2769. {
  2770. deleteFileSearch(sessionCache, batch, filesWritten->query().queryProp("@name"), false, wuid);
  2771. }
  2772. }
  2773. void updateSecondaries(const char *wuid, CIArrayOf<CassandraStatement> &batch)
  2774. {
  2775. const char * const *search;
  2776. for (search = searchPaths; *search; search++)
  2777. updateSecondaryTable(*search, prev->queryProp(*search), wuid, batch);
  2778. for (search = wildSearchPaths; *search; search++)
  2779. {
  2780. const char *value = p->queryProp(*search);
  2781. if (value && *value)
  2782. addUniqueValue(sessionCache, batch, *search, value);
  2783. }
  2784. deleteAppSecondaries(*prev, wuid, batch);
  2785. Owned<IConstWUAppValueIterator> appValues = &getApplicationValues();
  2786. ForEach(*appValues)
  2787. {
  2788. IConstWUAppValue& val=appValues->query();
  2789. addUniqueValue(sessionCache, batch, "Application", val.queryApplication()); // Used to populate droplists of applications
  2790. VStringBuffer key("@@%s", val.queryApplication());
  2791. addUniqueValue(sessionCache, batch, key, val.queryName()); // Used to populate droplists of value names for a given application
  2792. VStringBuffer xpath("Application/%s/%s", val.queryApplication(), val.queryName());
  2793. addUniqueValue(sessionCache, batch, xpath, val.queryValue()); // Used to get lists of values for a given app and name, and for filtering
  2794. simpleXMLtoCassandra(sessionCache, batch, searchMappings, p, xpath);
  2795. }
  2796. Owned<IPropertyTreeIterator> filesRead = &getFilesReadIterator();
  2797. ForEach(*filesRead)
  2798. {
  2799. addFileSearch(sessionCache, batch, filesRead->query().queryProp("@name"), true, wuid);
  2800. }
  2801. Owned<IPropertyTreeIterator> filesWritten = &getFileIterator();
  2802. ForEach(*filesWritten)
  2803. {
  2804. addFileSearch(sessionCache, batch, filesWritten->query().queryProp("@name"), false, wuid);
  2805. }
  2806. }
  2807. // Keep track of previously committed values for fields that we have a secondary table for, so that we can update them appropriately when we commit
  2808. bool trackSecondaryChange(const char *newval, const char *xpath)
  2809. {
  2810. if (!newval)
  2811. newval = "";
  2812. const char *oldval = p->queryProp(xpath);
  2813. if (!oldval)
  2814. oldval = "";
  2815. if (streq(newval, oldval))
  2816. return false; // No change
  2817. bool add;
  2818. if (!prev)
  2819. {
  2820. prev.setown(createPTree());
  2821. add = true;
  2822. }
  2823. else add = !prev->hasProp(xpath);
  2824. if (add)
  2825. {
  2826. const char *tailptr = strrchr(xpath, '/');
  2827. if (tailptr)
  2828. {
  2829. StringBuffer head(tailptr-xpath, xpath);
  2830. ensurePTree(prev, head)->setProp(tailptr+1, oldval);
  2831. }
  2832. else
  2833. prev->setProp(xpath, oldval);
  2834. }
  2835. return true;
  2836. }
  2837. IWUResult *noteDirty(IWUResult *result)
  2838. {
  2839. if (result)
  2840. dirtyResults.append(*LINK(result));
  2841. return result;
  2842. }
  2843. void noteDirty(const char *xpath, const CassandraXmlMapping *table)
  2844. {
  2845. dirtyPaths.setValue(xpath, table);
  2846. }
  // Session object handed to the Cassandra helper functions called by this class
  2847. Linked<const ICassandraSession> sessionCache;
  // One flag per child table, set to true once that table's rows have been loaded into the tree
  2848. mutable bool childLoaded[ChildTablesSize];
  // NOTE(review): the three flags below are not used in the visible part of the class - presumably
  // they record what needs writing at the next commit; confirm against commit().
  2849. bool allDirty;
  2850. bool stateChanged;
  2851. bool actionChanged;
  // Values that searchable fields had before they were changed (filled in by trackSecondaryChange)
  2852. Owned<IPTree> prev;
  // xpath -> mapping table, as recorded by noteDirty(xpath, table)
  2853. MapStringTo<const CassandraXmlMapping *> dirtyPaths;
  // Results recorded by noteDirty(result)
  2854. IArrayOf<IWUResult> dirtyResults;
  2855. Owned<IRemoteConnection> daliLock; // We still use dali for locking
  2856. };
  2857. class CCassandraWorkUnitWatcher : public CWorkUnitWatcher
  2858. {
  2859. public:
  2860. CCassandraWorkUnitWatcher(IWorkUnitSubscriber *_subscriber, WUSubscribeOptions flags, const char *wuid)
  2861. : CWorkUnitWatcher(_subscriber, (WUSubscribeOptions) (flags & SubscribeOptionAbort), wuid)
  2862. {
  2863. if (flags & SubscribeOptionState)
  2864. {
  2865. VStringBuffer xpath("/WorkUnitStates/%s/State", wuid);
  2866. stateId = querySDS().subscribe(xpath.str(), *this);
  2867. }
  2868. if (flags & SubscribeOptionAction)
  2869. {
  2870. VStringBuffer xpath("/WorkUnitStates/%s/Action", wuid);
  2871. actionId = querySDS().subscribe(xpath.str(), *this);
  2872. }
  2873. }
  2874. };
  2875. class CCasssandraWorkUnitFactory : public CWorkUnitFactory, implements ICassandraSession
  2876. {
  2877. IMPLEMENT_IINTERFACE;
  2878. public:
  2879. CCasssandraWorkUnitFactory(const SharedObject *_dll, const IPropertyTree *props) : cluster(cass_cluster_new()), randomizeSuffix(0), randState((unsigned) get_cycles_now()), cacheRetirer(*this)
  2880. {
  2881. StringArray options;
  2882. options.append("write_bytes_high_water_mark=1000000"); // Set the default HWM - workunits get big. This can be overridden by supplied options
  2883. Owned<IPTreeIterator> it = props->getElements("Option");
  2884. ForEach(*it)
  2885. {
  2886. IPTree &item = it->query();
  2887. const char *opt = item.queryProp("@name");
  2888. const char *val = item.queryProp("@value");
  2889. if (opt && val)
  2890. {
  2891. if (strieq(opt, "randomWuidSuffix"))
  2892. randomizeSuffix = atoi(val);
  2893. else if (strieq(opt, "traceLevel"))
  2894. traceLevel = atoi(val);
  2895. else
  2896. {
  2897. VStringBuffer optstr("%s=%s", opt, val);
  2898. options.append(optstr);
  2899. }
  2900. }
  2901. }
  2902. cluster.setOptions(options);
  2903. if (!cluster.queryKeySpace())
  2904. cluster.setKeySpace("hpcc");
  2905. try
  2906. {
  2907. cluster.connect();
  2908. Owned<IPTree> versionInfo = getVersionInfo();
  2909. if (versionInfo)
  2910. {
  2911. int major = versionInfo->getPropInt("@major", 0);
  2912. int minor = versionInfo->getPropInt("@minor", 0);
  2913. if (major && minor)
  2914. {
  2915. // Note that if there is no version info at all, we have to assume that the repository is not yet created. We don't fail, otherwise no-one can call createRepository the first time...
  2916. if (major != majorVersion)
  2917. throw makeStringExceptionV(WUERR_WorkunitVersionMismatch, "Incompatible workunit repository version (wanted %d.%d, found %d.%d)", majorVersion, minorVersion, major, minor);
  2918. if (minor != minorVersion)
  2919. {
  2920. if (minor < minorVersion)
  2921. {
  2922. DBGLOG("WARNING: repository version %d.%d is older than current version %d.%d - adding required columns", major, minor, majorVersion, minorVersion);
  2923. switch (minor)
  2924. {
  2925. case 1:
  2926. executeSimpleCommand(querySession(), "ALTER TABLE wuresults ADD graph text;");
  2927. executeSimpleCommand(querySession(), "ALTER TABLE wuresults ADD activity int;");
  2928. executeSimpleCommand(querySession(), "ALTER TABLE wuvariables ADD graph text;");
  2929. executeSimpleCommand(querySession(), "ALTER TABLE wuvariables ADD activity int;");
  2930. executeSimpleCommand(querySession(), "ALTER TABLE wutemporaries ADD graph text;");
  2931. executeSimpleCommand(querySession(), "ALTER TABLE wutemporaries ADD activity int;");
  2932. break;
  2933. }
  2934. createVersionTable(true);
  2935. }
  2936. else
  2937. DBGLOG("WARNING: repository version %d.%d is newer than current version %d.%d - some columns will not be updated", major, minor, majorVersion, minorVersion);
  2938. }
  2939. }
  2940. }
  2941. else
  2942. {
  2943. DBGLOG("WARNING: repository version could not be retrieved (repository not yet created?)");
  2944. cluster.disconnect();
  2945. }
  2946. }
  2947. catch (IException *E)
  2948. {
  2949. EXCLOG(E);
  2950. E->Release();
  2951. DBGLOG("WARNING: repository version could not be retrieved (repository not yet created?)");
  2952. }
  2953. cacheRetirer.start();
  2954. LINK(_dll); // Yes, this leaks. Not really sure how to avoid that.
  2955. }
  2956. ~CCasssandraWorkUnitFactory()
  2957. {
  2958. cacheRetirer.stop();
  2959. cacheRetirer.join();
  2960. if (traceLevel)
  2961. DBGLOG("CCasssandraWorkUnitFactory destroyed");
  2962. }
  2963. virtual bool initializeStore()
  2964. {
  2965. createRepository();
  2966. return true;
  2967. }
  2968. virtual IWorkUnitWatcher *getWatcher(IWorkUnitSubscriber *subscriber, WUSubscribeOptions options, const char *wuid) const
  2969. {
  2970. return new CCassandraWorkUnitWatcher(subscriber, options, wuid);
  2971. }
  2972. virtual CLocalWorkUnit* _createWorkUnit(const char *wuid, ISecManager *secmgr, ISecUser *secuser)
  2973. {
  2974. unsigned suffix;
  2975. unsigned suffixLength;
  2976. if (randomizeSuffix) // May need to enable this option if you are expecting to create hundreds of workunits / second
  2977. {
  2978. suffix = rand_r(&randState);
  2979. suffixLength = randomizeSuffix;
  2980. }
  2981. else
  2982. {
  2983. suffix = 0;
  2984. suffixLength = 0;
  2985. }
  2986. Owned<CassandraPrepared> prepared = prepareStatement("INSERT INTO workunits (partition, wuid) VALUES (?,?) IF NOT EXISTS;");
  2987. loop
  2988. {
  2989. // Create a unique WUID by adding suffixes until we managed to add a new value
  2990. StringBuffer useWuid(wuid);
  2991. if (suffix)
  2992. {
  2993. useWuid.append("-");
  2994. for (unsigned i = 0; i < suffixLength; i++)
  2995. {
  2996. useWuid.appendf("%c", '0'+suffix%10);
  2997. suffix /= 10;
  2998. }
  2999. }
  3000. CassandraStatement statement(prepared.getLink());
  3001. statement.bindInt32(0, rtlHash32VStr(useWuid.str(), 0) % NUM_PARTITIONS);
  3002. statement.bindString(1, useWuid.str());
  3003. if (traceLevel >= 2)
  3004. DBGLOG("Try creating %s", useWuid.str());
  3005. CassandraFuture future(cass_session_execute(querySession(), statement));
  3006. future.wait("execute");
  3007. CassandraResult result(cass_future_get_result(future));
  3008. if (cass_result_column_count(result)==1)
  3009. {
  3010. // A single column result indicates success, - the single column should be called '[applied]' and have the value 'true'
  3011. // If there are multiple columns it will be '[applied]' (value false) and the fields of the existing row
  3012. Owned<IPTree> wuXML = createPTree(useWuid);
  3013. wuXML->setProp("@xmlns:xsi", "http://www.w3.org/1999/XMLSchema-instance");
  3014. wuXML->setPropInt("@wuidVersion", WUID_VERSION); // we implement the latest version.
  3015. Owned<IRemoteConnection> daliLock;
  3016. lockWuid(daliLock, useWuid);
  3017. Owned<CLocalWorkUnit> wu = new CCassandraWorkUnit(this, wuXML.getClear(), secmgr, secuser, daliLock.getClear(), false);
  3018. return wu.getClear();
  3019. }
  3020. suffix = rand_r(&randState);
  3021. if (suffixLength<9)
  3022. suffixLength++;
  3023. }
  3024. }
  3025. virtual CLocalWorkUnit* _openWorkUnit(const char *wuid, ISecManager *secmgr, ISecUser *secuser)
  3026. {
  3027. Owned<IPTree> wuXML = cassandraToWorkunitXML(wuid);
  3028. if (wuXML)
  3029. return new CCassandraWorkUnit(this, wuXML.getClear(), secmgr, secuser, NULL, false);
  3030. else
  3031. return NULL;
  3032. }
  3033. virtual CLocalWorkUnit* _updateWorkUnit(const char *wuid, ISecManager *secmgr, ISecUser *secuser)
  3034. {
  3035. // We still use dali for the locks
  3036. Owned<IRemoteConnection> daliLock;
  3037. lockWuid(daliLock, wuid);
  3038. Owned<IPTree> wuXML = cassandraToWorkunitXML(wuid);
  3039. Owned<CLocalWorkUnit> wu = new CCassandraWorkUnit(this, wuXML.getClear(), secmgr, secuser, daliLock.getClear(), false);
  3040. return wu.getClear();
  3041. }
  3042. virtual bool _restoreWorkUnit(IPTree *_pt, const char *wuid)
  3043. {
  3044. Owned<IPTree> pt(_pt);
  3045. try
  3046. {
  3047. Owned<IRemoteConnection> daliLock;
  3048. lockWuid(daliLock, wuid);
  3049. Owned<IPropertyTree> gProgress = pruneBranch(pt, "GraphProgress[1]");
  3050. Owned<CCassandraWorkUnit> wu = new CCassandraWorkUnit(this, pt.getClear(), NULL, NULL, daliLock.getClear(), true);
  3051. if (gProgress)
  3052. {
  3053. Owned<IPTreeIterator> graphs = gProgress->getElements("*");
  3054. ForEach(*graphs)
  3055. {
  3056. IPTree &graph = graphs->query();
  3057. const char *graphName = graph.queryName();
  3058. Owned<IPTreeIterator> subs = graph.getElements("*");
  3059. ForEach(*subs)
  3060. {
  3061. IPTree &sub = subs->query();
  3062. const char *name=sub.queryName();
  3063. if (name[0]=='s' && name[1]=='g')
  3064. {
  3065. wu->setGraphProgress(&graph, graphName, atoi(name+2), sub.queryProp("@creator"));
  3066. }
  3067. else if (streq(name, "node"))
  3068. {
  3069. unsigned subid = sub.getPropInt("@id");
  3070. if (subid)
  3071. {
  3072. if (sub.hasChildren()) // Old format
  3073. wu->setGraphProgress(&sub, graphName, subid, sub.queryProp("@creator"));
  3074. if (sub.hasProp("@_state"))
  3075. wu->setNodeState(graphName, subid, (WUGraphState) sub.getPropInt("@_state"));
  3076. }
  3077. }
  3078. }
  3079. if (graph.hasProp("@_state"))
  3080. wu->setGraphState(graphName, (WUGraphState) graph.getPropInt("@_state"));
  3081. }
  3082. }
  3083. wu->commit();
  3084. return true;
  3085. }
  3086. catch (IException *E)
  3087. {
  3088. EXCLOG(E);
  3089. ::Release(E);
  3090. return false;
  3091. }
  3092. }
  3093. virtual IWorkUnit * getGlobalWorkUnit(ISecManager *secmgr = NULL, ISecUser *secuser = NULL)
  3094. {
  3095. // MORE - should it check security? Dali version never did...
  3096. Owned<IRemoteConnection> daliLock;
  3097. lockWuid(daliLock, GLOBAL_WORKUNIT);
  3098. Owned<IPTree> wuXML = createPTree(GLOBAL_WORKUNIT);
  3099. Owned<CLocalWorkUnit> wu = new CCassandraWorkUnit(this, wuXML.getClear(), NULL, NULL, daliLock.getClear(), false);
  3100. return &wu->lockRemote(false);
  3101. }
  3102. virtual IConstWorkUnitIterator * getWorkUnitsByOwner(const char * owner, ISecManager *secmgr, ISecUser *secuser)
  3103. {
  3104. return getWorkUnitsByXXX("@submitID", owner, secmgr, secuser);
  3105. }
  3106. virtual IConstWorkUnitIterator * getScheduledWorkUnits(ISecManager *secmgr, ISecUser *secuser)
  3107. {
  3108. return getWorkUnitsByXXX("@state", getWorkunitStateStr(WUStateScheduled), secmgr, secuser); // MORE - there may be more efficient ways to do this?
  3109. }
  3110. virtual IConstWorkUnitIterator * getWorkUnitsSorted(WUSortField sortorder, WUSortField * filters, const void * filterbuf,
  3111. unsigned startOffset, unsigned pageSize, __int64 * cachehint, unsigned *total,
  3112. ISecManager *secmgr, ISecUser *secuser)
  3113. {
  3114. // To assist in the efficient implementation of this function without requiring local sorting and filtering,
  3115. // we maintain a couple of additional search tables in addition to the main workunit table.
  3116. //
  3117. // The workunitsSearch table allows us to map from a given field's value to a workunit - to avoid the need
  3118. // for a second lookup this table contains a copy of all the 'lightweight' fields in the workunit. The table
  3119. // has a partition key of xpath, searchPrefix allowing it to be used for range lookups provided at least
  3120. // 2 characters are provided, while hopefully spreading the load a little between Cassandra partitions.
  3121. //
  3122. // The uniqueValues table is used to track what values are present for some wild-searchable fields, so we do
  3123. // two lookups - one to translate the wildcard to a set, then others to retrieve the wus matching each value
  3124. // in the set. These are done as N parallel reads rather than a single query (which might naively be expected
  3125. // to be more efficient) for two reasons. Firstly, we can get them back sorted that way and merge the results
  3126. // on the fly. Secondly, it is actually more efficient, at least in the case when there are multiple Cassandra
  3127. // partitions, since it in-effect cuts out the step of talking to a coordinator node which would talk to
  3128. // multiple other nodes to get the data.
  3129. //
  3130. // We go to some lengths to avoid post-sorting if we can, but any sort order other than by wuid or totalThorTime
  3131. // will post-sort it. If a post-sort is required, we will fetch up to WUID_LOCALSORT_LIMIT rows, - if there are
  3132. // more then we should fail, and the user should be invited to add filters.
  3133. //
  3134. // We can do at most one 'hard' filter, plus a filter on wuid range - anything else will require post-filtering.
  3135. // Most 'wild' searches can only be done with post-filtering, but some can be translated to multiple hard values
  3136. // using the unique values table. In such cases we merge results in the fly to avoid a post-sort if possible
  3137. //
  3138. // Note that Cassandra does not presently support filtering before returning the values except where a
  3139. // key or secondary index is available - even if ALLOW FILTERING is specified. If it did, some of the post-
  3140. // filtering would be better off done at the Cassandra side.
  3141. //
  3142. // We should encourage the UI to present drop-lists of users for filtering, to avoid the use of wildcard
  3143. // searches just because people can't remember the name.
  3144. //
  3145. // Searching by files probably needs to be done differently - a separate table mapping filenames to wuids.
  3146. // This can perhaps be join-merged if other filters are present. This is still TBD at the moment.
  3147. Owned<CCassandraWuUQueryCacheEntry> cached;
  3148. if (cachehint && *cachehint)
  3149. {
  3150. CriticalBlock b(cacheCrit);
  3151. cached.set(cacheIdMap.getValue(*cachehint));
  3152. }
  3153. if (cached)
  3154. cached->touch();
  3155. else
  3156. cached.setown(new CCassandraWuUQueryCacheEntry());
  3157. if (pageSize > INT_MAX)
  3158. pageSize = INT_MAX;
  3159. const WUSortField *thisFilter = filters;
  3160. IArrayOf<IPostFilter> goodFilters;
  3161. IArrayOf<IPostFilter> wuidFilters;
  3162. IArrayOf<IPostFilter> poorFilters;
  3163. IArrayOf<IPostFilter> fileFilters;
  3164. IArrayOf<IPostFilter> remoteWildFilters;
  3165. Owned<IConstWorkUnitIteratorEx> result;
  3166. WUSortField baseSort = (WUSortField) (sortorder & 0xff);
  3167. StringBuffer thorTimeThreshold;
  3168. bool sortByThorTime = (baseSort == WUSFtotalthortime);
  3169. bool needsPostSort = (baseSort != WUSFwuid && baseSort != WUSFtotalthortime);
  3170. bool sortDescending = (sortorder & WUSFreverse) || needsPostSort;
  3171. if (!result)
  3172. {
  3173. Owned<CassMultiIterator> merger = new CassMultiIterator(needsPostSort ? NULL : cached, 0, 0, sortDescending); // We always merge by wuid (except when we merge by thor time... we turn the compare off then to make it an appender)
  3174. if (startOffset)
  3175. {
  3176. StringBuffer startWuid;
  3177. unsigned found = cached->lookupStartRow(startWuid, thorTimeThreshold, startOffset);
  3178. if (found)
  3179. {
  3180. if (!sortByThorTime)
  3181. {
  3182. if (sortDescending)
  3183. startWuid.setCharAt(startWuid.length()-1, startWuid.charAt(startWuid.length()-1)-1); // we want to find the last wuid BEFORE
  3184. else
  3185. startWuid.append('\x21'); // we want to find the first wuid AFTER. This is printable but not going to be in any wuid
  3186. thorTimeThreshold.clear();
  3187. }
  3188. wuidFilters.append(*new PostFilter(sortorder==WUSFwuid ? WUSFwuid : WUSFwuidhigh, startWuid, true));
  3189. startOffset -= found;
  3190. merger->setStartOffset(found);
  3191. }
  3192. }
  3193. const char *fv = (const char *) filterbuf;
  3194. while (thisFilter && *thisFilter)
  3195. {
  3196. WUSortField field = (WUSortField) (*thisFilter & 0xff);
  3197. bool isWild = (*thisFilter & WUSFwild) != 0;
  3198. switch (field)
  3199. {
  3200. case WUSFappvalue:
  3201. {
  3202. assertex(fv);
  3203. const char *name = fv;
  3204. fv = fv + strlen(fv)+1;
  3205. if (isWild)
  3206. {
  3207. StringBuffer s(fv);
  3208. if (s.charAt(s.length()-1)== '*')
  3209. s.remove(s.length()-1, 1);
  3210. if (s.length())
  3211. remoteWildFilters.append(*new AppValuePostFilter(name, s, true)); // Should we allow wild on the app and/or name too? Not at the moment
  3212. }
  3213. else
  3214. goodFilters.append(*new AppValuePostFilter(name, fv, false));
  3215. break;
  3216. }
  3217. case WUSFuser:
  3218. case WUSFcluster:
  3219. case WUSFjob:
  3220. if (isWild)
  3221. {
  3222. StringBuffer s(fv);
  3223. if (s.charAt(s.length()-1)== '*')
  3224. s.remove(s.length()-1, 1);
  3225. if (s.length())
  3226. remoteWildFilters.append(*new PostFilter(field, s, true)); // Trailing-only wildcards can be done remotely
  3227. }
  3228. else
  3229. goodFilters.append(*new PostFilter(field, fv, false));
  3230. break;
  3231. case WUSFstate:
  3232. case WUSFpriority:
  3233. case WUSFprotected:
  3234. // These can't be wild, but are not very good filters
  3235. poorFilters.append(*new PostFilter(field, fv, false));
  3236. break;
  3237. case WUSFwuid: // Acts as wuidLo when specified as a filter
  3238. case WUSFwuidhigh:
  3239. // Wuid filters can be added to good and poor filters, and to remoteWild if they are done via merged sets rather than ranges...
  3240. if (sortByThorTime)
  3241. remoteWildFilters.append(*new PostFilter(field, fv, true));
  3242. else
  3243. mergeFilter(wuidFilters, field, fv);
  3244. break;
  3245. case WUSFfileread:
  3246. case WUSFfilewritten:
  3247. fileFilters.append(*new PostFilter(field, fv, true));
  3248. break;
  3249. case WUSFtotalthortime:
  3250. // This should be treated as a low value - i.e. return only wu's that took longer than the supplied value
  3251. if (thorTimeThreshold.isEmpty()) // If not a continuation
  3252. formatTimeCollatable(thorTimeThreshold, milliToNano(atoi(fv)), false);
  3253. break;
  3254. case WUSFwildwuid:
  3255. // Translate into a range - note that we only support trailing * wildcard.
  3256. if (fv && *fv)
  3257. {
  3258. StringBuffer s(fv);
  3259. if (s.charAt(s.length()-1)== '*')
  3260. s.remove(s.length()-1, 1);
  3261. if (s.length())
  3262. {
  3263. mergeFilter(wuidFilters, WUSFwuid, s);
  3264. s.append('\x7e'); // '~' - higher than anything that should occur in a wuid (but still printable)
  3265. mergeFilter(wuidFilters, WUSFwuidhigh, s);
  3266. }
  3267. }
  3268. break;
  3269. case WUSFecl: // This is different...
  3270. if (isWild)
  3271. merger->addPostFilter(*new PostFilter(field, fv, true)); // Wildcards on ECL are trailing and leading - no way to do remotely
  3272. else
  3273. goodFilters.append(*new PostFilter(field, fv, false)); // A hard filter on exact ecl match is possible but very unlikely
  3274. default:
  3275. UNSUPPORTED("Workunit filter criteria");
  3276. }
  3277. thisFilter++;
  3278. if (fv)
  3279. fv = fv + strlen(fv)+1;
  3280. }
  3281. if (fileFilters.length())
  3282. {
  3283. // We can't postfilter by these - we COULD in some cases do a join between these and some other filtered set
  3284. // but we will leave that as an exercise to the reader. So if there is a fileFilter, read it first, and turn it into a merge set of the resulting wus.
  3285. // MORE read and written are not the same
  3286. assertex(fileFilters.length()==1); // If we supported more there would be a join phase here
  3287. merger->addPostFilters(goodFilters, 0);
  3288. merger->addPostFilters(poorFilters, 0);
  3289. merger->addPostFilters(remoteWildFilters, 0);
  3290. const IPostFilter &fileFilter = fileFilters.item(0);
  3291. CassandraResult wuids(fetchDataForFiles(fileFilter.queryValue(), wuidFilters, fileFilter.queryField()==WUSFfileread));
  3292. CassandraIterator rows(cass_iterator_from_result(wuids));
  3293. StringBuffer value;
  3294. while (cass_iterator_next(rows))
  3295. {
  3296. const CassRow *row = cass_iterator_get_row(rows);
  3297. getCassString(value.clear(), cass_row_get_column(row, 0));
  3298. merger->addResult(*new CassandraResult(fetchDataForWuid(workunitInfoMappings, value, true)));
  3299. }
  3300. }
  3301. else if (sortByThorTime)
  3302. {
  3303. merger->addPostFilters(goodFilters, 0);
  3304. merger->addPostFilters(poorFilters, 0);
  3305. merger->addPostFilters(remoteWildFilters, 0);
  3306. if (wuidFilters.length())
  3307. {
  3308. // We are doing a continuation of a prior search that is sorted by a searchField, which may not be unique
  3309. // We need two queries - one where searchField==startSearchField and wuid > startWuid,
  3310. // and one where searchField > startSearchField. We know that there are no other filters in play (as Cassandra would not support them)
  3311. // though there may be postfilters
  3312. assertex(wuidFilters.length()==1);
  3313. merger->addResult(*new CassandraResult(fetchMoreDataByThorTime(thorTimeThreshold, wuidFilters.item(0).queryValue(), sortDescending, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
  3314. merger->addResult(*new CassandraResult(fetchMoreDataByThorTime(thorTimeThreshold, NULL, sortDescending, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
  3315. merger->setCompareColumn(-1); // we want to preserve the order of these two results
  3316. }
  3317. else
  3318. merger->addResult(*new CassandraResult(fetchDataByThorTime(thorTimeThreshold, sortDescending, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
  3319. }
  3320. else if (goodFilters.length())
  3321. {
  3322. merger->addPostFilters(goodFilters, 1);
  3323. merger->addPostFilters(poorFilters, 0);
  3324. merger->addPostFilters(remoteWildFilters, 0);
  3325. const IPostFilter &best = goodFilters.item(0);
  3326. merger->addResult(*new CassandraResult(fetchDataForKeyWithFilter(best.queryXPath(), best.queryValue(), wuidFilters, sortorder, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
  3327. }
  3328. else if (poorFilters.length())
  3329. {
  3330. merger->addPostFilters(poorFilters, 1);
  3331. merger->addPostFilters(remoteWildFilters, 0);
  3332. const IPostFilter &best= poorFilters.item(0);
  3333. merger->addResult(*new CassandraResult(fetchDataForKeyWithFilter(best.queryXPath(), best.queryValue(), wuidFilters, sortorder, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
  3334. }
  3335. else if (remoteWildFilters.length())
  3336. {
  3337. merger->addPostFilters(remoteWildFilters, 1); // Any other filters have to be done locally
  3338. // Convert into a value IN [] which we do via a merge
  3339. // NOTE - If we want sorted by filter (or don't care about sort order), we could do directly as a range - but the wuid range filters then don't work, and the merger would be invalid
  3340. StringArray fieldValues;
  3341. const IPostFilter &best= remoteWildFilters.item(0);
  3342. _getUniqueValues(best.queryXPath(), best.queryValue(), fieldValues);
  3343. ForEachItemIn(idx, fieldValues)
  3344. {
  3345. merger->addResult(*new CassandraResult(fetchDataForKeyWithFilter(best.queryXPath(), fieldValues.item(idx), wuidFilters, sortorder, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
  3346. }
  3347. }
  3348. else
  3349. {
  3350. // If all we have is a wuid range (or nothing), search the wuid table and/or return everything
  3351. for (int i = 0; i < NUM_PARTITIONS; i++)
  3352. {
  3353. merger->addResult(*new CassandraResult(fetchDataByPartition(workunitInfoMappings, i, wuidFilters, sortorder, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
  3354. }
  3355. }
  3356. // The result we have will be sorted by wuid (ascending or descending)
  3357. if (needsPostSort)
  3358. {
  3359. // A post-sort will be required.
  3360. // Result should be limited in (to CASS_WORKUNIT_POSTSORT_LIMIT * number of results being merged)
  3361. result.setown(new CassPostSortIterator(merger.getClear(), sortorder, pageSize > CASS_WORKUNIT_POSTSORT_LIMIT ? pageSize : CASS_WORKUNIT_POSTSORT_LIMIT));
  3362. cached->setResult(result);
  3363. }
  3364. else
  3365. result.setown(merger.getClear());
  3366. }
  3367. if (startOffset || needsPostSort || result->hasPostFilters() || result->isMerging()) // we need a subpage if we have fetched anything other than exactly the rows requested
  3368. result.setown(new SubPageIterator(result.getClear(), startOffset, pageSize));
  3369. if (cachehint)
  3370. {
  3371. *cachehint = cached->queryHint();
  3372. CriticalBlock b(cacheCrit);
  3373. cacheIdMap.setValue(*cachehint, cached); // Links its parameter
  3374. }
  3375. if (total)
  3376. *total = 0; // We don't know
  3377. return result.getClear();
  3378. }
  3379. virtual StringArray &getUniqueValues(WUSortField field, const char *prefix, StringArray &result) const
  3380. {
  3381. return _getUniqueValues(queryFilterXPath(field), prefix, result);
  3382. }
  3383. virtual unsigned numWorkUnits()
  3384. {
  3385. unsigned total = 0;
  3386. CIArrayOf<CassandraFuture> futures;
  3387. for (int i = 0; i < NUM_PARTITIONS; i++)
  3388. {
  3389. CassandraStatement statement(prepareStatement("SELECT COUNT(*) FROM workunits where partition=?;"));
  3390. statement.bindInt32(0, i);
  3391. futures.append(*new CassandraFuture(cass_session_execute(querySession(), statement)));
  3392. }
  3393. ForEachItemIn(idx, futures)
  3394. {
  3395. CassandraFuture &future = futures.item(idx);
  3396. future.wait("select count(*)");
  3397. CassandraResult result(cass_future_get_result(future));
  3398. total += getUnsignedResult(NULL, getSingleResult(result));
  3399. }
  3400. return total;
  3401. }
  3402. /*
  3403. virtual bool isAborting(const char *wuid) const - done in the base class using dali
  3404. virtual void clearAborting(const char *wuid) - done in the base class using dali
  3405. */
  3406. virtual WUState waitForWorkUnit(const char * wuid, unsigned timeout, bool compiled, bool returnOnWaitState)
  3407. {
  3408. Owned<WorkUnitWaiter> waiter = new WorkUnitWaiter(wuid, SubscribeOptionState);
  3409. LocalIAbortHandler abortHandler(*waiter);
  3410. CassandraStatement statement(prepareStatement("select state, agentSession from workunits where partition=? and wuid=?;"));
  3411. statement.bindInt32(0, rtlHash32VStr(wuid, 0) % NUM_PARTITIONS);
  3412. statement.bindString(1, wuid);
  3413. SessionId agent = 0;
  3414. bool agentSessionStopped = false;
  3415. unsigned start = msTick();
  3416. loop
  3417. {
  3418. CassandraFuture future(cass_session_execute(querySession(), statement));
  3419. future.wait("Lookup wu state");
  3420. CassandraResult result(cass_future_get_result(future));
  3421. const CassRow *row = cass_result_first_row(result);
  3422. if (!row)
  3423. return WUStateUnknown;
  3424. const CassValue *stateVal = cass_row_get_column(row, 0);
  3425. if (!stateVal)
  3426. return WUStateUnknown;
  3427. StringBuffer stateStr;
  3428. getCassString(stateStr, stateVal);
  3429. WUState state = getWorkUnitState(stateStr);
  3430. switch (state)
  3431. {
  3432. case WUStateCompiled:
  3433. case WUStateUploadingFiles:
  3434. if (compiled)
  3435. return state;
  3436. break;
  3437. case WUStateCompleted:
  3438. case WUStateFailed:
  3439. case WUStateAborted:
  3440. return state;
  3441. case WUStateWait:
  3442. if (returnOnWaitState)
  3443. return state;
  3444. break;
  3445. case WUStateCompiling:
  3446. case WUStateRunning:
  3447. case WUStateDebugPaused:
  3448. case WUStateDebugRunning:
  3449. case WUStateBlocked:
  3450. case WUStateAborting:
  3451. if (agentSessionStopped)
  3452. {
  3453. reportAbnormalTermination(wuid, state, agent);
  3454. return state;
  3455. }
  3456. if (queryDaliServerVersion().compare("2.1")>=0)
  3457. {
  3458. agent = getUnsignedResult(NULL, cass_row_get_column(row, 1));
  3459. if(agent && querySessionManager().sessionStopped(agent, 0))
  3460. {
  3461. agentSessionStopped = true;
  3462. continue;
  3463. }
  3464. }
  3465. break;
  3466. }
  3467. agentSessionStopped = false; // reset for state changes such as WUStateWait then WUStateRunning again
  3468. unsigned waited = msTick() - start;
  3469. if (timeout==-1 || waited + 20000 < timeout)
  3470. {
  3471. waiter->wait(20000); // recheck state every 20 seconds, in case eclagent has crashed.
  3472. if (waiter->aborted)
  3473. return WUStateUnknown; // MORE - throw an exception?
  3474. }
  3475. else if (waited > timeout || !waiter->wait(timeout-waited))
  3476. return WUStateUnknown; // MORE - throw an exception?
  3477. }
  3478. }
  3479. virtual WUAction waitForWorkUnitAction(const char * wuid, WUAction original)
  3480. {
  3481. StringAttr origStr(getWorkunitActionStr(original));
  3482. Owned<WorkUnitWaiter> waiter = new WorkUnitWaiter(wuid, SubscribeOptionAction);
  3483. LocalIAbortHandler abortHandler(*waiter);
  3484. CassandraStatement statement(prepareStatement("select action from workunits where partition=? and wuid=?;"));
  3485. statement.bindInt32(0, rtlHash32VStr(wuid, 0) % NUM_PARTITIONS);
  3486. statement.bindString(1, wuid);
  3487. WUAction ret = WUActionUnknown;
  3488. loop
  3489. {
  3490. CassandraFuture future(cass_session_execute(querySession(), statement));
  3491. future.wait("Lookup wu action");
  3492. CassandraResult result(cass_future_get_result(future));
  3493. const CassRow *row = cass_result_first_row(result);
  3494. if (!row)
  3495. {
  3496. PROGLOG("While waiting for job %s, WU no longer exists", wuid);
  3497. break;
  3498. }
  3499. const CassValue *actionVal = cass_row_get_column(row, 0);
  3500. if (!actionVal)
  3501. {
  3502. PROGLOG("While waiting for job %s, WU action cannot be read", wuid);
  3503. break;
  3504. }
  3505. StringBuffer actionStr;
  3506. getCassString(actionStr, actionVal);
  3507. if (!streq(actionStr, origStr))
  3508. {
  3509. ret = getWorkunitAction(actionStr);
  3510. break;
  3511. }
  3512. waiter->wait(10000); // recheck state every 20 seconds even if no notifications... just because we used to before
  3513. if (waiter->aborted)
  3514. break;
  3515. }
  3516. return ret;
  3517. }
  3518. unsigned validateRepository(bool fix)
  3519. {
  3520. unsigned errCount = 0;
  3521. // 1. Check that every entry in main wu table has matching entries in secondary tables
  3522. CassandraResult result(fetchData(workunitInfoMappings+1));
  3523. CassandraIterator rows(cass_iterator_from_result(result));
  3524. if (fix)
  3525. {
  3526. // Delete the unique values table - the validate process recreates it afresh
  3527. executeSimpleCommand(querySession(), "TRUNCATE uniqueSearchValues;");
  3528. }
  3529. while (cass_iterator_next(rows))
  3530. {
  3531. Owned<IPTree> wuXML = rowToPTree(NULL, NULL, workunitInfoMappings+1, cass_iterator_get_row(rows));
  3532. const char *wuid = wuXML->queryName();
  3533. // For each search entry, check that we get matching XML
  3534. for (const char * const *search = searchPaths; *search; search++)
  3535. errCount += validateSearch(*search, wuid, wuXML, fix);
  3536. }
  3537. // 2. Check that there are no orphaned entries in search or child tables
  3538. errCount += checkOrphans(searchMappings, 3, fix);
  3539. for (const ChildTableInfo * const * table = childTables; *table != NULL; table++)
  3540. errCount += checkOrphans(table[0]->mappings, 1, fix);
  3541. errCount += checkOrphans(wuGraphProgressMappings, 1, fix);
  3542. errCount += checkOrphans(wuGraphStateMappings, 1, fix);
  3543. errCount += checkOrphans(wuGraphRunningMappings, 1, fix);
  3544. return errCount;
  3545. }
  3546. virtual void deleteRepository(bool recreate)
  3547. {
  3548. // USE WITH CARE!
  3549. CassandraSession s(cass_session_new());
  3550. CassandraFuture future(cass_session_connect(s, cluster.queryCluster()));
  3551. future.wait("connect without keyspace to delete");
  3552. VStringBuffer deleteKeyspace("DROP KEYSPACE IF EXISTS %s;", cluster.queryKeySpace());
  3553. executeSimpleCommand(s, deleteKeyspace);
  3554. s.set(NULL);
  3555. cluster.disconnect();
  3556. if (recreate)
  3557. createRepository();
  3558. }
  3559. virtual void createRepository()
  3560. {
  3561. cluster.disconnect();
  3562. CassandraSession s(cass_session_new());
  3563. CassandraFuture future(cass_session_connect(s, cluster.queryCluster()));
  3564. future.wait("connect without keyspace");
  3565. VStringBuffer create("CREATE KEYSPACE IF NOT EXISTS %s WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '1' };", cluster.queryKeySpace()); // MORE - options from props? Not 100% sure if they are appropriate.
  3566. executeSimpleCommand(s, create);
  3567. s.set(NULL);
  3568. cluster.connect();
  3569. createVersionTable(false);
  3570. ensureTable(querySession(), workunitsMappings);
  3571. ensureTable(querySession(), searchMappings);
  3572. ensureTable(querySession(), uniqueSearchMappings);
  3573. ensureTable(querySession(), filesSearchMappings);
  3574. for (const ChildTableInfo * const * table = childTables; *table != NULL; table++)
  3575. ensureTable(querySession(), table[0]->mappings);
  3576. ensureTable(querySession(), wuGraphProgressMappings);
  3577. ensureTable(querySession(), wuGraphStateMappings);
  3578. ensureTable(querySession(), wuGraphRunningMappings);
  3579. }
  3580. virtual const char *queryStoreType() const
  3581. {
  3582. return "Cassandra";
  3583. }
  3584. // Interface ICassandraSession
  3585. virtual CassSession *querySession() const { return cluster.querySession(); };
  3586. virtual unsigned queryTraceLevel() const { return traceLevel; };
  3587. virtual CassandraPrepared *prepareStatement(const char *query) const
  3588. {
  3589. return cluster.prepareStatement(query, traceLevel>=2);
  3590. }
  3591. virtual void executeAsync(CIArrayOf<CassandraStatement> &batch, const char *what) const override
  3592. {
  3593. if (batch.ordinality())
  3594. {
  3595. if (queryTraceLevel() > 1)
  3596. DBGLOG("Executing async batch %s", what);
  3597. cluster.executeAsync(batch, what);
  3598. }
  3599. }
  3600. private:
  3601. virtual void executeBatch(CassandraBatch &batch, const char *what) const
  3602. {
  3603. if (queryTraceLevel() > 1)
  3604. DBGLOG("Executing batch %s", what);
  3605. CassandraFuture futureBatch(cass_session_execute_batch(querySession(), batch));
  3606. futureBatch.wait(what);
  3607. }
  3608. void createVersionTable(bool force)
  3609. {
  3610. StringBuffer schema;
  3611. executeSimpleCommand(querySession(), describeTable(versionMappings, schema));
  3612. Owned<IPTree> oldVersion = getVersionInfo();
  3613. if (force || !oldVersion)
  3614. {
  3615. VStringBuffer versionInfo("<Version major='%d' minor='%d'/>", majorVersion, minorVersion);
  3616. CassandraBatch versionBatch(CASS_BATCH_TYPE_LOGGED);
  3617. Owned<IPTree> pt = createPTreeFromXMLString(versionInfo);
  3618. for (int i = 0; i < NUM_PARTITIONS; i++)
  3619. {
  3620. pt->setPropInt("@partition", i);
  3621. simpleXMLtoCassandra(this, versionBatch, versionMappings, pt, NULL);
  3622. }
  3623. executeBatch(versionBatch, "createVersionTable");
  3624. }
  3625. }
  3626. IPTree *getVersionInfo()
  3627. {
  3628. try
  3629. {
  3630. StringBuffer names;
  3631. StringBuffer tableName;
  3632. getFieldNames(versionMappings, names, tableName);
  3633. VStringBuffer selectQuery("select %s from %s where partition=?;", names.str()+1, tableName.str());
  3634. CassandraStatement select(prepareStatement(selectQuery));
  3635. select.bindInt32(0, rand_r(&randState) % NUM_PARTITIONS);
  3636. CassandraFuture future(cass_session_execute(querySession(), select));
  3637. future.wait("read version");
  3638. CassandraResult result(cass_future_get_result(future));
  3639. const CassRow *row = cass_result_first_row(result);
  3640. if (row)
  3641. return rowToPTree(NULL, NULL, versionMappings, row);
  3642. }
  3643. catch (IException *E)
  3644. {
  3645. EXCLOG(E);
  3646. E->Release();
  3647. }
  3648. catch (...)
  3649. {
  3650. DBGLOG("WARNING: Unknown exception caught while trying to retrieve Cassandra repository version information");
  3651. }
  3652. return NULL;
  3653. }
  3654. bool checkWuExists(const char *wuid)
  3655. {
  3656. CassandraStatement statement(prepareStatement("SELECT COUNT(*) FROM workunits where partition=? and wuid=?;"));
  3657. statement.bindInt32(0, rtlHash32VStr(wuid, 0) % NUM_PARTITIONS);
  3658. statement.bindString(1, wuid);
  3659. CassandraFuture future(cass_session_execute(querySession(), statement));
  3660. future.wait("select count(*)");
  3661. CassandraResult result(cass_future_get_result(future));
  3662. return getUnsignedResult(NULL, getSingleResult(result)) != 0; // Shouldn't be more than 1, either
  3663. }
  3664. void mergeFilter(IArrayOf<IPostFilter> &filters, WUSortField field, const char *value)
  3665. {
  3666. // Combine multiple filters on wuid - Cassandra doesn't like seeing more than one.
  3667. ForEachItemIn(idx, filters)
  3668. {
  3669. PostFilter &filter = static_cast<PostFilter &>(filters.item(idx));
  3670. if (filter.queryField()==field)
  3671. {
  3672. const char *prevLimit = filter.queryValue();
  3673. int diff = strcmp(prevLimit, value);
  3674. if (diff && ((diff < 0) == (field==WUSFwuid)))
  3675. filter.setValue(value);
  3676. return;
  3677. }
  3678. }
  3679. // Not found - add new filter
  3680. filters.append(*new PostFilter(field, value, true));
  3681. }
  3682. IConstWorkUnitIterator * getWorkUnitsByXXX(const char *xpath, const char *key, ISecManager *secmgr, ISecUser *secuser)
  3683. {
  3684. Owned<CassMultiIterator> merger = new CassMultiIterator(NULL, 0, 0, true); // Merge by wuid
  3685. if (!key || !*key)
  3686. {
  3687. IArrayOf<IPostFilter> wuidFilters;
  3688. for (int i = 0; i < NUM_PARTITIONS; i++)
  3689. {
  3690. merger->addResult(*new CassandraResult(fetchDataByPartition(workunitInfoMappings, i, wuidFilters)));
  3691. }
  3692. }
  3693. else
  3694. merger->addResult(*new CassandraResult(fetchDataForKey(xpath, key)));
  3695. return createSecureConstWUIterator(merger.getClear(), secmgr, secuser);
  3696. }
  3697. StringArray &_getUniqueValues(const char *xpath, const char *prefix, StringArray &result) const
  3698. {
  3699. if (prefix && strlen(prefix) >= CASS_SEARCH_PREFIX_SIZE)
  3700. {
  3701. CassandraResult r(fetchDataForWildSearch(xpath, prefix, uniqueSearchMappings));
  3702. CassandraIterator rows(cass_iterator_from_result(r));
  3703. StringBuffer value;
  3704. while (cass_iterator_next(rows))
  3705. {
  3706. const CassRow *row = cass_iterator_get_row(rows);
  3707. getCassString(value.clear(), cass_row_get_column(row, 0));
  3708. result.append(value);
  3709. }
  3710. }
  3711. return result;
  3712. }
  3713. unsigned validateSearch(const char *xpath, const char *wuid, IPTree *wuXML, bool fix)
  3714. {
  3715. unsigned errCount = 0;
  3716. const char *childKey = wuXML->queryProp(xpath);
  3717. if (childKey && *childKey)
  3718. {
  3719. CIArrayOf<CassandraStatement> batch;
  3720. CIArrayOf<CassandraStatement> deletes;
  3721. CassandraResult result(fetchDataForKeyAndWuid(xpath, childKey, wuid));
  3722. if (fix)
  3723. simpleXMLtoCassandra(this, batch, uniqueSearchMappings, wuXML, xpath);
  3724. switch (cass_result_row_count(result))
  3725. {
  3726. case 0:
  3727. DBGLOG("Missing search data for %s for wuid=%s key=%s", xpath, wuid, childKey);
  3728. if (fix)
  3729. simpleXMLtoCassandra(this, batch, searchMappings, wuXML, xpath);
  3730. errCount++;
  3731. break;
  3732. case 1:
  3733. {
  3734. Owned<IPTree> secXML = rowToPTree(xpath, childKey, searchMappings+4, cass_result_first_row(result)); // type, prefix, key, and wuid are not returned
  3735. secXML->renameProp("/", wuid);
  3736. if (!areMatchingPTrees(wuXML, secXML))
  3737. {
  3738. DBGLOG("Mismatched search data for %s for wuid %s", xpath, wuid);
  3739. if (fix)
  3740. simpleXMLtoCassandra(this, batch, searchMappings, wuXML, xpath);
  3741. errCount++;
  3742. }
  3743. break;
  3744. }
  3745. default:
  3746. DBGLOG("Multiple secondary data %d for %s for wuid %s", (int) cass_result_row_count(result), xpath, wuid); // This should be impossible!
  3747. if (fix)
  3748. {
  3749. deleteSecondaryByKey(xpath, childKey, wuid, this, deletes);
  3750. simpleXMLtoCassandra(this, batch, searchMappings, wuXML, xpath);
  3751. }
  3752. break;
  3753. }
  3754. if (fix)
  3755. {
  3756. executeAsync(deletes, "delete search");
  3757. executeAsync(batch, "fix search");
  3758. }
  3759. }
  3760. return errCount;
  3761. }
  3762. unsigned checkOrphans(const CassandraXmlMapping *mappings, unsigned wuidIndex, bool fix)
  3763. {
  3764. unsigned errCount = 0;
  3765. CassandraResult result(fetchData(mappings));
  3766. CassandraIterator rows(cass_iterator_from_result(result));
  3767. while (cass_iterator_next(rows))
  3768. {
  3769. const CassRow *row = cass_iterator_get_row(rows);
  3770. StringBuffer wuid;
  3771. getCassString(wuid, cass_row_get_column(row, wuidIndex));
  3772. if (!streq(wuid, GLOBAL_WORKUNIT) && !checkWuExists(wuid))
  3773. {
  3774. DBGLOG("Orphaned data in %s for wuid=%s", queryTableName(mappings), wuid.str());
  3775. if (fix)
  3776. {
  3777. if (wuidIndex)
  3778. {
  3779. CIArrayOf<CassandraStatement> secondaryBatch;
  3780. StringBuffer xpath, fieldValue;
  3781. getCassString(xpath, cass_row_get_column(row, 0));
  3782. getCassString(fieldValue, cass_row_get_column(row, 2));
  3783. deleteSecondaryByKey(xpath, fieldValue, wuid, this, secondaryBatch);
  3784. executeAsync(secondaryBatch, "Delete orphans");
  3785. }
  3786. else
  3787. {
  3788. CassandraBatch batch(CASS_BATCH_TYPE_UNLOGGED);
  3789. deleteChildByWuid(mappings, wuid, batch);
  3790. executeBatch(batch, "Delete orphans");
  3791. }
  3792. }
  3793. errCount++;
  3794. }
  3795. }
  3796. return errCount;
  3797. }
  3798. IPTree *cassandraToWorkunitXML(const char *wuid) const
  3799. {
  3800. CassandraResult result(fetchDataForWuid(workunitsMappings, wuid, false));
  3801. CassandraIterator rows(cass_iterator_from_result(result));
  3802. if (cass_iterator_next(rows)) // should just be one
  3803. {
  3804. Owned<IPTree> wuXML = createPTree(wuid);
  3805. wuXML->setProp("@xmlns:xsi", "http://www.w3.org/1999/XMLSchema-instance");
  3806. CassandraIterator cols(cass_iterator_from_row(cass_iterator_get_row(rows)));
  3807. unsigned colidx = 2; // wuid and partition are not returned
  3808. while (cass_iterator_next(cols))
  3809. {
  3810. assertex(workunitsMappings[colidx].columnName);
  3811. const CassValue *value = cass_iterator_get_column(cols);
  3812. if (value && !cass_value_is_null(value))
  3813. workunitsMappings[colidx].mapper.toXML(wuXML, workunitsMappings[colidx].xpath, value);
  3814. colidx++;
  3815. }
  3816. return wuXML.getClear();
  3817. }
  3818. else
  3819. return NULL;
  3820. }
  3821. // Fetch all rows from a table
  3822. const CassResult *fetchData(const CassandraXmlMapping *mappings) const
  3823. {
  3824. StringBuffer names;
  3825. StringBuffer tableName;
  3826. getFieldNames(mappings, names, tableName);
  3827. VStringBuffer selectQuery("select %s from %s;", names.str()+1, tableName.str());
  3828. if (traceLevel >= 2)
  3829. DBGLOG("%s", selectQuery.str());
  3830. CassandraStatement statement(cass_statement_new(selectQuery.str(), 0));
  3831. return executeQuery(querySession(), statement);
  3832. }
  3833. // Fetch all rows from a single partition of a table
  3834. const CassResult *fetchDataByPartition(const CassandraXmlMapping *mappings, int partition, const IArrayOf<IPostFilter> &wuidFilters, unsigned sortOrder=WUSFwuid|WUSFreverse, unsigned limit=0) const
  3835. {
  3836. StringBuffer names;
  3837. StringBuffer tableName;
  3838. getFieldNames(mappings+1, names, tableName); // Don't fetch partition column
  3839. VStringBuffer selectQuery("select %s from %s where partition=?", names.str()+1, tableName.str());
  3840. ForEachItemIn(idx, wuidFilters)
  3841. {
  3842. const IPostFilter &wuidFilter = wuidFilters.item(idx);
  3843. selectQuery.appendf(" and wuid %s ?", wuidFilter.queryField()==WUSFwuidhigh ? "<=" : ">=");
  3844. }
  3845. switch (sortOrder)
  3846. {
  3847. case WUSFwuid:
  3848. selectQuery.append(" ORDER BY WUID ASC");
  3849. break;
  3850. case WUSFwuid|WUSFreverse:
  3851. // If not wuid, descending, we will have to post-sort
  3852. selectQuery.append(" ORDER BY WUID DESC");
  3853. break;
  3854. default:
  3855. // If not wuid, descending, we will have to post-sort. We still need in wuid desc order for the merge though.
  3856. selectQuery.append(" ORDER BY WUID DESC");
  3857. if (!limit)
  3858. limit = CASS_WORKUNIT_POSTSORT_LIMIT;
  3859. break;
  3860. }
  3861. if (limit)
  3862. selectQuery.appendf(" LIMIT %u", limit);
  3863. selectQuery.append(';');
  3864. CassandraStatement select(prepareStatement(selectQuery));
  3865. select.bindInt32(0, partition);
  3866. ForEachItemIn(idx2, wuidFilters)
  3867. {
  3868. const IPostFilter &wuidFilter = wuidFilters.item(idx2);
  3869. select.bindString(idx2+1, wuidFilter.queryValue());
  3870. }
  3871. return executeQuery(querySession(), select);
  3872. }
  3873. // Fetch matching rows from a child table, or the main wu table
  3874. const CassResult *fetchDataForWuid(const CassandraXmlMapping *mappings, const char *wuid, bool includeWuid) const
  3875. {
  3876. assertex(wuid && *wuid);
  3877. StringBuffer names;
  3878. StringBuffer tableName;
  3879. getFieldNames(mappings + (includeWuid ? 1 : 2), names, tableName); // mappings+2 means we don't return the partition or wuid columns
  3880. VStringBuffer selectQuery("select %s from %s where partition=? and wuid=?;", names.str()+1, tableName.str());
  3881. CassandraStatement select(prepareStatement(selectQuery));
  3882. select.bindInt32(0, rtlHash32VStr(wuid, 0) % NUM_PARTITIONS);
  3883. select.bindString(1, wuid);
  3884. return executeQuery(querySession(), select);
  3885. }
  3886. const CassResult *fetchDataForWuidAndKey(const CassandraXmlMapping *mappings, const char *wuid, const char *key) const
  3887. {
  3888. assertex(wuid && *wuid);
  3889. StringBuffer names;
  3890. StringBuffer tableName;
  3891. getFieldNames(mappings+2, names, tableName); // mappings+2 means we don't return the partition or wuid columns. We do return the key.
  3892. VStringBuffer selectQuery("select %s from %s where partition=? and wuid=? and %s=?;", names.str()+1, tableName.str(), mappings[2].columnName);
  3893. CassandraStatement select(prepareStatement(selectQuery));
  3894. select.bindInt32(0, rtlHash32VStr(wuid, 0) % NUM_PARTITIONS);
  3895. select.bindString(1, wuid);
  3896. select.bindString(2, key);
  3897. return executeQuery(querySession(), select);
  3898. }
  3899. // Fetch matching rows from the search table, for all wuids, sorted by wuid
  3900. const CassResult *fetchDataForKey(const char *xpath, const char *key) const
  3901. {
  3902. assertex(key);
  3903. StringBuffer names;
  3904. StringBuffer tableName;
  3905. StringBuffer ucKey(key);
  3906. ucKey.toUpperCase();
  3907. getFieldNames(searchMappings+3, names, tableName); // mappings+3 means we don't return the key columns (xpath, upper(keyPrefix), upper(key))
  3908. VStringBuffer selectQuery("select %s from %s where xpath=? and fieldPrefix=? and fieldValue=?", names.str()+1, tableName.str());
  3909. selectQuery.append(" ORDER BY fieldValue ASC, WUID desc;");
  3910. CassandraStatement select(prepareStatement(selectQuery));
  3911. select.bindString(0, xpath);
  3912. select.bindString_n(1, ucKey, CASS_SEARCH_PREFIX_SIZE);
  3913. select.bindString(2, ucKey);
  3914. return executeQuery(querySession(), select);
  3915. }
  3916. // Fetch matching rows from the search table, for all wuids, sorted by wuid
  3917. const CassResult *fetchDataForKeyWithFilter(const char *xpath, const char *key, const IArrayOf<IPostFilter> &wuidFilters, unsigned sortOrder, unsigned limit) const
  3918. {
  3919. StringBuffer names;
  3920. StringBuffer tableName;
  3921. StringBuffer ucKey(key);
  3922. ucKey.toUpperCase();
  3923. getFieldNames(searchMappings+3, names, tableName); // mappings+3 means we don't return the key columns (xpath, upper(keyPrefix), upper(key))
  3924. VStringBuffer selectQuery("select %s from %s where xpath=? and fieldPrefix=? and fieldValue=?", names.str()+1, tableName.str());
  3925. ForEachItemIn(idx, wuidFilters)
  3926. {
  3927. const IPostFilter &wuidFilter = wuidFilters.item(idx);
  3928. selectQuery.appendf(" and wuid %s ?", wuidFilter.queryField()==WUSFwuidhigh ? "<=" : ">=");
  3929. }
  3930. switch (sortOrder)
  3931. {
  3932. case WUSFwuid:
  3933. selectQuery.append(" ORDER BY fieldValue DESC, WUID ASC");
  3934. break;
  3935. case WUSFwuid|WUSFreverse:
  3936. selectQuery.append(" ORDER BY fieldValue ASC, WUID DESC");
  3937. break;
  3938. default:
  3939. // If not wuid, descending, we will have to post-sort. We still need in wuid desc order for the merge though.
  3940. selectQuery.appendf(" ORDER BY fieldvalue ASC, WUID DESC");
  3941. limit = CASS_WORKUNIT_POSTSORT_LIMIT;
  3942. break;
  3943. }
  3944. if (limit)
  3945. selectQuery.appendf(" LIMIT %u", limit);
  3946. CassandraStatement select(prepareStatement(selectQuery));
  3947. select.bindString(0, xpath);
  3948. select.bindString_n(1, ucKey, CASS_SEARCH_PREFIX_SIZE);
  3949. select.bindString(2, ucKey);
  3950. ForEachItemIn(idx2, wuidFilters)
  3951. {
  3952. const IPostFilter &wuidFilter = wuidFilters.item(idx2);
  3953. select.bindString(3+idx2, wuidFilter.queryValue());
  3954. }
  3955. return executeQuery(querySession(), select);
  3956. }
  3957. // Fetch matching rows from the search or uniqueSearch table, for a given prefix
  3958. const CassResult *fetchDataForWildSearch(const char *xpath, const char *prefix, const CassandraXmlMapping *mappings) const
  3959. {
  3960. assertex(prefix && *prefix);
  3961. StringBuffer names;
  3962. StringBuffer tableName;
  3963. StringBuffer ucKey(prefix);
  3964. ucKey.toUpperCase();
  3965. StringBuffer ucKeyEnd(ucKey);
  3966. size32_t len = ucKeyEnd.length();
  3967. assertex(len);
  3968. ucKeyEnd.setCharAt(len-1, ucKeyEnd.charAt(len-1)+1);
  3969. getFieldNames(mappings+3, names, tableName); // mappings+3 means we don't return the key columns (xpath, upper(keyPrefix), upper(key))
  3970. VStringBuffer selectQuery("select %s from %s where xpath=? and fieldPrefix=? and fieldValue>=? and fieldValue<?;", names.str()+1, tableName.str());
  3971. CassandraStatement select(prepareStatement(selectQuery));
  3972. select.bindString(0, xpath);
  3973. select.bindString_n(1, ucKey, CASS_SEARCH_PREFIX_SIZE);
  3974. select.bindString(2, ucKey);
  3975. select.bindString(3, ucKeyEnd);
  3976. return executeQuery(querySession(), select);
  3977. }
  3978. // Fetch rows from the search table, by thorTime, above a threshold
  3979. const CassResult *fetchDataByThorTime(const char *threshold, bool descending, unsigned limit) const
  3980. {
  3981. StringBuffer names;
  3982. StringBuffer tableName;
  3983. getFieldNames(searchMappings+3, names, tableName); // mappings+3 means we don't return the key columns (xpath, upper(keyPrefix), upper(key))
  3984. VStringBuffer selectQuery("select %s from %s where xpath=? and fieldPrefix=?", names.str()+1, tableName.str());
  3985. if (threshold && *threshold)
  3986. selectQuery.appendf(" where fieldValue >= ?");
  3987. if (descending)
  3988. selectQuery.append(" ORDER BY fieldValue DESC, wuid ASC");
  3989. else
  3990. selectQuery.append(" ORDER BY fieldValue ASC, wuid DESC");
  3991. if (limit)
  3992. selectQuery.appendf(" LIMIT %u", limit);
  3993. selectQuery.append(';');
  3994. CassandraStatement select(prepareStatement(selectQuery));
  3995. select.bindString(0, "@totalThorTime");
  3996. select.bindString_n(1, " ", CASS_SEARCH_PREFIX_SIZE); // This would stop working if we ever set the search prefix to > 8 chars. So don't.
  3997. if (threshold && *threshold)
  3998. select.bindString(2, threshold);
  3999. return executeQuery(querySession(), select);
  4000. }
  4001. // Fetch rows from the search table, continuing a previous query that was sorted by thor time - part one
  4002. // This technique only works for thor time where we have forced to a single partition. Otherwise it gets even more complicated, and not worth it.
  4003. const CassResult *fetchMoreDataByThorTime(const char *threshold, const char *wuid, bool descending, unsigned limit) const
  4004. {
  4005. StringBuffer names;
  4006. StringBuffer tableName;
  4007. getFieldNames(searchMappings+3, names, tableName); // mappings+3 means we don't return the key columns (xpath, upper(keyPrefix), upper(key))
  4008. const char *wuidTest;
  4009. const char *fieldTest;
  4010. if (descending)
  4011. {
  4012. wuidTest = ">";
  4013. fieldTest = wuid ? "=" : "<";
  4014. }
  4015. else
  4016. {
  4017. wuidTest = "<";
  4018. fieldTest = wuid ? "=" : ">";
  4019. }
  4020. VStringBuffer selectQuery("select %s from %s where xpath=? and fieldPrefix=? and fieldValue %s ?", names.str()+1, tableName.str(), fieldTest);
  4021. if (wuid)
  4022. selectQuery.appendf(" and wuid %s ?", wuidTest);
  4023. if (descending)
  4024. selectQuery.append(" ORDER BY fieldValue DESC, WUID ASC");
  4025. else
  4026. selectQuery.append(" ORDER BY fieldValue ASC, WUID DESC");
  4027. if (limit)
  4028. selectQuery.appendf(" LIMIT %u", limit);
  4029. selectQuery.append(';');
  4030. CassandraStatement select(prepareStatement(selectQuery));
  4031. select.bindString(0, "@totalThorTime");
  4032. select.bindString_n(1, threshold, CASS_SEARCH_PREFIX_SIZE);
  4033. select.bindString(2, threshold);
  4034. if (wuid)
  4035. select.bindString(3, wuid);
  4036. return executeQuery(querySession(), select);
  4037. }
  4038. // Fetch rows from the file search table (covers files read and files written)
  4039. const CassResult *fetchDataForFiles(const char *name, const IArrayOf<IPostFilter> &wuidFilters, bool read) const
  4040. {
  4041. StringBuffer names;
  4042. StringBuffer tableName;
  4043. getFieldNames(filesSearchMappings+2, names, tableName); // mappings+2 means we don't return the key columns (name and readmode)
  4044. VStringBuffer selectQuery("select %s from %s where name=? and read=?", names.str()+1, tableName.str());
  4045. ForEachItemIn(idx, wuidFilters)
  4046. {
  4047. const IPostFilter &wuidFilter = wuidFilters.item(idx);
  4048. selectQuery.appendf(" and wuid %s ?", wuidFilter.queryField()==WUSFwuidhigh ? "<=" : ">=");
  4049. }
  4050. CassandraStatement select(prepareStatement(selectQuery));
  4051. select.bindString(0, name);
  4052. select.bindBool(1, read ? cass_true : cass_false);
  4053. ForEachItemIn(idx2, wuidFilters)
  4054. {
  4055. const IPostFilter &wuidFilter = wuidFilters.item(idx2);
  4056. select.bindString(idx2+2, wuidFilter.queryValue());
  4057. }
  4058. return executeQuery(querySession(), select);
  4059. }
  4060. // Fetch matching rows from the search table, for a single wuid
  4061. const CassResult *fetchDataForKeyAndWuid(const char *xpath, const char *key, const char *wuid) const
  4062. {
  4063. assertex(key);
  4064. StringBuffer names;
  4065. StringBuffer tableName;
  4066. StringBuffer ucKey(key);
  4067. ucKey.toUpperCase();
  4068. getFieldNames(searchMappings+4, names, tableName); // mappings+4 means we don't return the key columns (xpath, upper(keyPrefix), upper(key), and wuid)
  4069. VStringBuffer selectQuery("select %s from %s where xpath=? and fieldPrefix=? and fieldValue =? and wuid=?;", names.str()+1, tableName.str());
  4070. CassandraStatement select(prepareStatement(selectQuery));
  4071. select.bindString(0, xpath);
  4072. select.bindString_n(1, ucKey, CASS_SEARCH_PREFIX_SIZE);
  4073. select.bindString(2, ucKey);
  4074. select.bindString(3, wuid);
  4075. return executeQuery(querySession(), select);
  4076. }
  4077. // Delete matching rows from a child table
  4078. virtual void deleteChildByWuid(const CassandraXmlMapping *mappings, const char *wuid, CassBatch *batch) const
  4079. {
  4080. StringBuffer names;
  4081. StringBuffer tableName;
  4082. getFieldNames(mappings, names, tableName);
  4083. VStringBuffer insertQuery("DELETE from %s where partition=? and wuid=?;", tableName.str());
  4084. CassandraStatement update(prepareStatement(insertQuery));
  4085. update.bindInt32(0, rtlHash32VStr(wuid, 0) % NUM_PARTITIONS);
  4086. update.bindString(1, wuid);
  4087. check(cass_batch_add_statement(batch, update));
  4088. }
  4089. unsigned retireCache()
  4090. {
  4091. CriticalBlock b(cacheCrit); // Is this too coarse-grained?
  4092. unsigned expires = CASS_WU_QUERY_EXPIRES;
  4093. unsigned now = msTick();
  4094. ICopyArrayOf<CCassandraWuUQueryCacheEntry> goers;
  4095. HashIterator iter(cacheIdMap);
  4096. ForEach(iter)
  4097. {
  4098. CCassandraWuUQueryCacheEntry *entry = cacheIdMap.mapToValue(&iter.query());
  4099. unsigned age = now - entry->queryLastAccess();
  4100. int ttl = CASS_WU_QUERY_EXPIRES-age;
  4101. if (ttl<= 0)
  4102. goers.append(*entry);
  4103. else if (ttl< expires)
  4104. expires = ttl;
  4105. }
  4106. ForEachItemIn(idx, goers)
  4107. {
  4108. DBGLOG("Expiring cache entry %p", &goers.item(idx));
  4109. cacheIdMap.remove(goers.item(idx).queryHint());
  4110. }
  4111. return expires;
  4112. }
  4113. class CacheRetirer : public Thread
  4114. {
  4115. public:
  4116. CacheRetirer(CCasssandraWorkUnitFactory &_parent) : Thread("WorkunitListCacheRetirer"), parent(_parent)
  4117. {
  4118. stopping = false;
  4119. }
  4120. virtual int run()
  4121. {
  4122. while (!stopping)
  4123. {
  4124. unsigned delay = parent.retireCache();
  4125. sem.wait(delay);
  4126. }
  4127. return 0;
  4128. }
  4129. void stop()
  4130. {
  4131. stopping = true;
  4132. sem.signal();
  4133. }
  4134. private:
  4135. Semaphore sem;
  4136. CCasssandraWorkUnitFactory &parent;
  4137. bool stopping;
  4138. } cacheRetirer;
  4139. unsigned randomizeSuffix;
  4140. unsigned traceLevel;
  4141. unsigned randState;
  4142. CassandraClusterSession cluster;
  4143. mutable CriticalSection cacheCrit;
  4144. mutable MapXToMyClass<__uint64, __uint64, CCassandraWuUQueryCacheEntry> cacheIdMap;
  4145. };
  4146. } // namespace
  4147. extern "C" EXPORT IWorkUnitFactory *createWorkUnitFactory(const SharedObject *dll, const IPropertyTree *props)
  4148. {
  4149. return new cassandraembed::CCasssandraWorkUnitFactory(dll, props);
  4150. }