12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
73278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458345934603461346234633464346534663467346834693470347134723473347434753476347734783479348034813482348334843485348634873488348934903491349234933494349534963497349834993500350135023503350435053506350735083509351035113512351335143515351635173518351935203521352235233524352535263527352835293530353135323533353435353536353735383539354035413542354335443545354635473548354935503551355235533554355535563557355835593560356135623563356435653566356735683569357035713572357335743575357635773578357935803581358235833584358535863587358835893590359135923593359435953596359735983599360036013602360336043605360636073608360936103611361236133614361536163617361836193620362136223623362436253626362736283629363036313632363336343635363636373638363936403641364236433644364536463647364836493650365136523653365436553656365736583659366036613662366336643665366636673668366936703671367236733674367536763677367836793680368136823683368436853686368736883689369036913692369336943695369636973698369937003701370237033704370537063707370837093710371137123713371437153716371737183719372037213722372337243725372637273728372937303731373237333734373537363737373837393740374137423743374437453746374737483749375037513752375337543755375637573758375937603761376237633764376537663767376837693770377137723773377437753776377
73778377937803781378237833784378537863787378837893790379137923793379437953796379737983799380038013802380338043805380638073808380938103811381238133814381538163817381838193820382138223823382438253826382738283829383038313832383338343835383638373838383938403841384238433844384538463847384838493850385138523853385438553856385738583859386038613862386338643865386638673868386938703871387238733874387538763877387838793880388138823883388438853886388738883889389038913892389338943895389638973898389939003901390239033904390539063907390839093910391139123913391439153916391739183919392039213922392339243925392639273928392939303931393239333934393539363937393839393940394139423943394439453946394739483949395039513952395339543955395639573958395939603961396239633964396539663967396839693970397139723973397439753976397739783979398039813982398339843985398639873988398939903991399239933994399539963997399839994000400140024003400440054006400740084009401040114012401340144015401640174018401940204021402240234024402540264027402840294030403140324033403440354036403740384039404040414042404340444045404640474048404940504051405240534054405540564057405840594060406140624063406440654066406740684069407040714072407340744075407640774078407940804081408240834084408540864087408840894090409140924093409440954096409740984099410041014102410341044105410641074108410941104111411241134114411541164117411841194120412141224123412441254126412741284129413041314132413341344135413641374138413941404141414241434144414541464147414841494150415141524153415441554156415741584159416041614162416341644165416641674168416941704171417241734174417541764177417841794180418141824183418441854186418741884189419041914192419341944195419641974198419942004201420242034204420542064207420842094210421142124213421442154216421742184219422042214222422342244225422642274228422942304231423242334234423542364237423842394240424142424243424442454246424742484249425042514252425342544255425642574258425942604261426242634264426542664267426842694270427142724273427442754276427
742784279428042814282428342844285428642874288428942904291429242934294429542964297429842994300430143024303430443054306430743084309431043114312431343144315431643174318431943204321432243234324432543264327432843294330433143324333433443354336433743384339434043414342434343444345434643474348434943504351435243534354435543564357435843594360436143624363436443654366436743684369437043714372437343744375437643774378437943804381438243834384438543864387438843894390439143924393439443954396439743984399440044014402440344044405440644074408440944104411441244134414441544164417441844194420442144224423442444254426442744284429443044314432443344344435443644374438443944404441444244434444444544464447444844494450445144524453445444554456445744584459446044614462446344644465446644674468446944704471447244734474447544764477447844794480448144824483448444854486448744884489449044914492449344944495449644974498449945004501450245034504450545064507450845094510451145124513451445154516451745184519452045214522452345244525452645274528452945304531453245334534453545364537453845394540454145424543454445454546454745484549455045514552455345544555455645574558455945604561456245634564456545664567456845694570457145724573457445754576457745784579458045814582458345844585458645874588458945904591459245934594459545964597459845994600460146024603460446054606460746084609461046114612461346144615461646174618461946204621462246234624462546264627 |
- /*##############################################################################
- HPCC SYSTEMS software Copyright (C) 2013 HPCC Systems.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ############################################################################## */
- #include "platform.h"
- #include "cassandra.h"
- #include "jexcept.hpp"
- #include "jthread.hpp"
- #include "hqlplugins.hpp"
- #include "deftype.hpp"
- #include "eclhelper.hpp"
- #include "eclrtl.hpp"
- #include "eclrtl_imp.hpp"
- #include "rtlds_imp.hpp"
- #include "rtlfield.hpp"
- #include "rtlembed.hpp"
- #include "roxiemem.hpp"
- #include "nbcd.hpp"
- #include "jsort.hpp"
- #include "jptree.hpp"
- #include "jlzw.hpp"
- #include "jregexp.hpp"
- #include "dadfs.hpp"
- #include "dasds.hpp"
- #include "wuerror.hpp"
- #include "workunit.hpp"
- #include "workunit.ipp"
- #include "cassandraembed.hpp"
- #include <list>
- #include <string>
- #include <algorithm>
- #define EXPORT DECL_EXPORT
- namespace cassandraembed {
// Limits and defaults for the Cassandra workunit code.
// NOTE(review): the meanings noted below are inferred from the macro names only -
// confirm against the code that uses them.
#define CASS_WU_QUERY_EXPIRES        (1000*60*5)   // evaluates to 300000 (presumably milliseconds - TODO confirm)
#define CASS_WORKUNIT_POSTSORT_LIMIT 10000

// Prefix size: default and permitted range
#define DEFAULT_PREFIX_SIZE          2
#define MIN_PREFIX_SIZE              2
#define MAX_PREFIX_SIZE              8

// Partition count: default and permitted range
#define DEFAULT_PARTITIONS           2
#define MIN_PARTITIONS               1
#define MAX_PARTITIONS               10
- static const CassValue *getSingleResult(const CassResult *result)
- {
- const CassRow *row = cass_result_first_row(result);
- if (row)
- return cass_row_get_column(row, 0);
- else
- return NULL;
- }
- static StringBuffer &getCassString(StringBuffer &str, const CassValue *value)
- {
- const char *output;
- size_t length;
- check(cass_value_get_string(value, &output, &length));
- return str.append(length, output);
- }
- struct CassandraXmlMapping;
- interface ICassandraSession : public IInterface // MORE - rename!
- {
- virtual CassSession *querySession() const = 0;
- virtual CassandraPrepared *prepareStatement(const char *query) const = 0;
- virtual void executeAsync(CIArrayOf<CassandraStatement> &batch, const char *what) const = 0;
- virtual unsigned queryTraceLevel() const = 0;
- virtual const CassResult *fetchDataForWuid(const CassandraXmlMapping *mappings, const char *wuid, bool includeWuid) const = 0;
- virtual const CassResult *fetchDataForWuidAndKey(const CassandraXmlMapping *mappings, const char *wuid, const char *key) const = 0;
- virtual void deleteChildByWuid(const CassandraXmlMapping *mappings, const char *wuid, CassBatch *batch) const = 0;
- virtual IPTree *cassandraToWorkunitXML(const char *wuid) const = 0;
- virtual unsigned queryPrefixSize() const = 0;
- virtual unsigned queryPartitions() const = 0;
- };
- struct CassandraColumnMapper
- {
- virtual ~CassandraColumnMapper() {}
- virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) = 0;
- virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) = 0;
- };
- static class StringColumnMapper : implements CassandraColumnMapper
- {
- public:
- virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) override
- {
- rtlDataAttr str;
- unsigned chars;
- getUTF8Result(NULL, value, chars, str.refstr());
- StringAttr s(str.getstr(), rtlUtf8Size(chars, str.getstr()));
- row->setProp(name, s);
- return row;
- }
- virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
- {
- const char *value = row->queryProp(name);
- if (!value)
- return false;
- if (statement)
- statement->bindString(idx, value);
- return true;
- }
- } stringColumnMapper;
- static class RequiredStringColumnMapper : public StringColumnMapper
- {
- public:
- virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
- {
- const char *value = row->queryProp(name);
- if (!value)
- value = "";
- if (statement)
- statement->bindString(idx, value);
- return true;
- }
- } requiredStringColumnMapper;
- static class SuppliedStringColumnMapper : public StringColumnMapper
- {
- public:
- virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
- {
- if (statement)
- statement->bindString(idx, userVal);
- return true;
- }
- } suppliedStringColumnMapper;
- static class BlobColumnMapper : implements CassandraColumnMapper
- {
- public:
- virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) override
- {
- rtlDataAttr str;
- unsigned chars;
- getDataResult(NULL, value, chars, str.refdata());
- row->setPropBin(name, chars, str.getbytes());
- return row;
- }
- virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
- {
- MemoryBuffer value;
- row->getPropBin(name, value);
- if (value.length())
- {
- if (statement)
- statement->bindBytes(idx, (const cass_byte_t *) value.toByteArray(), value.length());
- return true;
- }
- else
- return false;
- }
- } blobColumnMapper;
- static class compressTreeColumnMapper : implements CassandraColumnMapper
- {
- public:
- virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) override
- {
- rtlDataAttr str;
- unsigned chars;
- getDataResult(NULL, value, chars, str.refdata());
- if (chars)
- {
- MemoryBuffer compressed, decompressed;
- compressed.setBuffer(chars, str.getbytes(), false);
- decompressToBuffer(decompressed, compressed);
- Owned<IPTree> p = createPTree(decompressed);
- row->setPropTree(name, p.getClear());
- }
- return row;
- }
- virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
- {
- IPTree *child = row->queryPropTree(name);
- if (child && child->hasChildren())
- {
- if (statement)
- {
- MemoryBuffer decompressed, compressed;
- child->serialize(decompressed);
- compressToBuffer(compressed, decompressed.length(), decompressed.toByteArray());
- statement->bindBytes(idx, (const cass_byte_t *) compressed.toByteArray(), compressed.length());
- }
- return true;
- }
- else
- return false;
- }
- } compressTreeColumnMapper;
- static class TimeStampColumnMapper : implements CassandraColumnMapper
- {
- public:
- virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) override
- {
- // never fetched (that may change?)
- return row;
- }
- virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
- {
- // never bound, but does need to be included in the ?
- return true;
- }
- } timestampColumnMapper;
- static class HashRootNameColumnMapper : implements CassandraColumnMapper
- {
- public:
- virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) override
- {
- throwUnexpected(); // we never return the partition column
- }
- virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
- {
- if (statement)
- {
- int hash = rtlHash32VStr(row->queryName(), 0) % session->queryPartitions();
- statement->bindInt32(idx, hash);
- }
- return true;
- }
- } hashRootNameColumnMapper;
- static class RootNameColumnMapper : implements CassandraColumnMapper
- {
- public:
- virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) override
- {
- rtlDataAttr str;
- unsigned chars;
- getUTF8Result(NULL, value, chars, str.refstr());
- StringAttr s(str.getstr(), rtlUtf8Size(chars, str.getstr()));
- row->renameProp("/", s);
- return row;
- }
- virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
- {
- if (statement)
- {
- const char *value = row->queryName();
- statement->bindString(idx, value);
- }
- return true;
- }
- } rootNameColumnMapper;
- // WuidColumnMapper is used for columns containing a wuid that is NOT in the resulting XML - it
- // is an error to try to map such a column to/from the XML representation
- static class WuidColumnMapper : implements CassandraColumnMapper
- {
- public:
- virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) override
- {
- throwUnexpected();
- }
- virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
- {
- throwUnexpected();
- }
- } wuidColumnMapper;
- static class BoolColumnMapper : implements CassandraColumnMapper
- {
- public:
- virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) override
- {
- row->addPropBool(name, getBooleanResult(NULL, value));
- return row;
- }
- virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
- {
- if (row->hasProp(name))
- {
- if (statement)
- {
- bool value = row->getPropBool(name, false);
- statement->bindBool(idx, value ? cass_true : cass_false);
- }
- return true;
- }
- else
- return false;
- }
- } boolColumnMapper;
- static class PrefixSearchColumnMapper : implements CassandraColumnMapper
- {
- public:
- virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) override
- {
- return row;
- }
- virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
- {
- return _fromXML(statement, idx, row, userVal, session->queryPrefixSize(), true);
- }
- protected:
- static bool _fromXML(CassandraStatement *statement, unsigned idx, IPTree *row, const char *xpath, unsigned prefixLength, bool uc)
- {
- const char *columnVal = row->queryProp(xpath);
- if (columnVal)
- {
- if (statement)
- {
- StringBuffer buf(columnVal);
- if (uc)
- buf.toUpperCase();
- if (prefixLength)
- statement->bindString_n(idx, buf, prefixLength);
- else
- statement->bindString(idx, buf);
- }
- return true;
- }
- else
- return false;
- }
- } prefixSearchColumnMapper;
- static class SearchColumnMapper : public PrefixSearchColumnMapper
- {
- public:
- virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
- {
- return _fromXML(statement, idx, row, userVal, 0, true);
- }
- } searchColumnMapper;
- static class LCSearchColumnMapper : public PrefixSearchColumnMapper
- {
- public:
- virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
- {
- return _fromXML(statement, idx, row, userVal, 0, false);
- }
- } lcSearchColumnMapper;
- static class IntColumnMapper : implements CassandraColumnMapper
- {
- public:
- virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) override
- {
- if (name)
- row->addPropInt(name, getSignedResult(NULL, value));
- return row;
- }
- virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
- {
- if (row->hasProp(name))
- {
- if (statement)
- {
- int value = row->getPropInt(name);
- statement->bindInt32(idx, value);
- }
- return true;
- }
- else
- return false;
- }
- } intColumnMapper;
- static class DefaultedIntColumnMapper : public IntColumnMapper
- {
- public:
- virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
- {
- if (statement)
- {
- int value = row->getPropInt(name, atoi(userVal));
- statement->bindInt32(idx, value);
- }
- return true;
- }
- } defaultedIntColumnMapper;
- static class BigIntColumnMapper : implements CassandraColumnMapper
- {
- public:
- virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) override
- {
- row->addPropInt64(name, getSignedResult(NULL, value));
- return row;
- }
- virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
- {
- if (row->hasProp(name))
- {
- if (statement)
- {
- __int64 value = row->getPropInt64(name);
- statement->bindInt64(idx, value);
- }
- return true;
- }
- else
- return false;
- }
- } bigintColumnMapper;
- static class SimpleMapColumnMapper : implements CassandraColumnMapper
- {
- public:
- virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) override
- {
- Owned<IPTree> map = createPTree(name);
- CassandraIterator elems(cass_iterator_from_map(value));
- while (cass_iterator_next(elems))
- {
- rtlDataAttr str;
- unsigned chars;
- getStringResult(NULL, cass_iterator_get_map_key(elems), chars, str.refstr());
- StringAttr s(str.getstr(), chars);
- stringColumnMapper.toXML(map, s, cass_iterator_get_map_value(elems));
- }
- row->addPropTree(name, map.getClear());
- return row;
- }
- virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
- {
- Owned<IPTree> child = row->getPropTree(name);
- if (child)
- {
- unsigned numItems = child->numChildren();
- if (numItems)
- {
- if (statement)
- {
- CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_MAP, numItems));
- Owned<IPTreeIterator> items = child->getElements("*");
- ForEach(*items)
- {
- IPTree &item = items->query();
- const char *key = item.queryName();
- const char *value = item.queryProp(NULL);
- if (key && value)
- {
- check(cass_collection_append_string(collection, key));
- check(cass_collection_append_string(collection, value));
- }
- }
- statement->bindCollection(idx, collection);
- }
- return true;
- }
- }
- return false;
- }
- } simpleMapColumnMapper;
- // Maps a Cassandra map column to/from the attributes of the row element itself.
- // For fromXML, `name` is not an xpath but an '@'-delimited suppression list (e.g. "@a@b@"):
- // attributes named in it are NOT written to the map.
- static class AttributeMapColumnMapper : implements CassandraColumnMapper
- {
- public:
-     // Passes every map entry to stringColumnMapper with the key prefixed by '@', targeting `row` directly.
-     virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) override
-     {
-         CassandraIterator entries(cass_iterator_from_map(value));
-         while (cass_iterator_next(entries))
-         {
-             rtlDataAttr keyData;
-             unsigned keyLen;
-             getStringResult(NULL, cass_iterator_get_map_key(entries), keyLen, keyData.refstr());
-             StringBuffer attrPath("@");
-             attrPath.append(keyLen, keyData.getstr());
-             stringColumnMapper.toXML(row, attrPath, cass_iterator_get_map_value(entries));
-         }
-         return row;
-     }
-     // Returns true when `row` has at least one attribute that is not in the suppression list.
-     // If `statement` is supplied, those attributes are bound at `idx` as a map (attribute name without '@' -> value).
-     virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
-     {
-         // NOTE - name here provides a list of attributes that we should NOT be mapping
-         Owned<IAttributeIterator> attrs = row->getAttributes();
-         unsigned numItems = 0;
-         ForEach(*attrs)
-         {
-             if (isMapped(name, attrs->queryName()))
-                 numItems++;
-         }
-         if (!numItems)
-             return false;
-         if (statement)
-         {
-             CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_MAP, numItems));
-             ForEach(*attrs)
-             {
-                 if (isMapped(name, attrs->queryName()))
-                 {
-                     const char *attrValue = attrs->queryValue();
-                     check(cass_collection_append_string(collection, attrs->queryName()+1)); // skip the @
-                     check(cass_collection_append_string(collection, attrValue));
-                 }
-             }
-             statement->bindCollection(idx, collection);
-         }
-         return true;
-     }
- private:
-     // True when "<attrName>@" does not occur in the suppression list, i.e. the attribute should be stored.
-     static bool isMapped(const char *suppressed, const char *attrName)
-     {
-         StringBuffer probe(attrName);
-         probe.append('@');
-         return strstr(suppressed, probe) == NULL;
-     }
- } attributeMapColumnMapper;
- // Maps a Cassandra map column to/from the direct child elements of the row.
- // For fromXML, `name` is an '@'-delimited suppression list (e.g. "@a@b@"):
- // child elements named in it are NOT written to the map.
- static class ElementMapColumnMapper : implements CassandraColumnMapper
- {
- public:
-     // Passes every map entry to stringColumnMapper, using the key as the property name under `row`.
-     virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) override
-     {
-         CassandraIterator entries(cass_iterator_from_map(value));
-         while (cass_iterator_next(entries))
-         {
-             rtlDataAttr keyData;
-             unsigned keyLen;
-             getStringResult(NULL, cass_iterator_get_map_key(entries), keyLen, keyData.refstr());
-             StringBuffer childName(keyLen, keyData.getstr());
-             stringColumnMapper.toXML(row, childName, cass_iterator_get_map_value(entries));
-         }
-         return row;
-     }
-     // Returns true when `row` has at least one non-suppressed child element with a value.
-     // If `statement` is supplied, those children are bound at `idx` as a map (element name -> value).
-     virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
-     {
-         // NOTE - name here provides a list of elements that we should NOT be mapping
-         Owned<IPTreeIterator> children = row->getElements("*");
-         unsigned numItems = 0;
-         ForEach(*children)
-         {
-             if (queryMappedValue(children->query(), name))
-                 numItems++;
-         }
-         if (!numItems)
-             return false;
-         if (statement)
-         {
-             CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_MAP, numItems));
-             ForEach(*children)
-             {
-                 IPTree &child = children->query();
-                 const char *childValue = queryMappedValue(child, name);
-                 if (childValue)
-                 {
-                     check(cass_collection_append_string(collection, child.queryName()));
-                     check(cass_collection_append_string(collection, childValue));
-                 }
-             }
-             statement->bindCollection(idx, collection);
-         }
-         return true;
-     }
- private:
-     // Returns the value of `item` if "@<item name>@" is absent from the suppression list, otherwise NULL.
-     // The value is only looked up for elements that are not suppressed.
-     static const char *queryMappedValue(IPTree &item, const char *suppressed)
-     {
-         StringBuffer probe('@');
-         probe.append(item.queryName());
-         probe.append('@');
-         if (strstr(suppressed, probe) != NULL)
-             return NULL;
-         return item.queryProp(".");
-     }
- } elementMapColumnMapper;
- // Maps a Cassandra map column to/from whole child subtrees of the row, each stored as serialized XML text.
- // For fromXML, `name` is an '@'-delimited INCLUSION list (e.g. "@a@b@"): only the child elements named in it are stored.
- static class SubtreeMapColumnMapper : implements CassandraColumnMapper
- {
- public:
-     // For every map entry whose value starts with '<', parses the value as XML and sets it on `row` under the key.
-     // Entries whose value is empty or does not start with '<' are ignored.
-     virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) override
-     {
-         CassandraIterator entries(cass_iterator_from_map(value));
-         while (cass_iterator_next(entries))
-         {
-             rtlDataAttr keyData;
-             unsigned keyLen;
-             getStringResult(NULL, cass_iterator_get_map_key(entries), keyLen, keyData.refstr());
-             StringBuffer childName(keyLen, keyData.getstr());
-             const CassValue *entryValue = cass_iterator_get_map_value(entries);
-             StringBuffer xmlText;
-             getCassString(xmlText, entryValue);
-             if (xmlText.length() && xmlText.charAt(0)== '<')
-                 row->setPropTree(childName, createPTreeFromXMLString(xmlText));
-         }
-         return row;
-     }
-     // Returns true when `row` has at least one child element named in the inclusion list.
-     // If `statement` is supplied, those children are bound at `idx` as a map (element name -> serialized XML).
-     virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
-     {
-         // NOTE - name here provides a list of elements that we SHOULD be mapping
-         Owned<IPTreeIterator> children = row->getElements("*");
-         unsigned numItems = 0;
-         ForEach(*children)
-         {
-             if (isIncluded(name, children->query()))
-                 numItems++;
-         }
-         if (!numItems)
-             return false;
-         if (statement)
-         {
-             CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_MAP, numItems));
-             ForEach(*children)
-             {
-                 IPTree &child = children->query();
-                 if (isIncluded(name, child))
-                 {
-                     StringBuffer serialized;
-                     ::toXML(&child, serialized);
-                     check(cass_collection_append_string(collection, child.queryName()));
-                     check(cass_collection_append_string(collection, serialized));
-                 }
-             }
-             statement->bindCollection(idx, collection);
-         }
-         return true;
-     }
- private:
-     // True when "@<item name>@" occurs in the inclusion list.
-     static bool isIncluded(const char *included, IPTree &item)
-     {
-         StringBuffer probe("@");
-         probe.append(item.queryName());
-         probe.append('@');
-         return strstr(included, probe) != NULL;
-     }
- } subTreeMapColumnMapper;
- /*
- static class QueryTextColumnMapper : public StringColumnMapper
- {
- public:
- virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) override
- {
- // Name is "Query/Text ...
- IPTree *query = row->queryPropTree("Query");
- if (!query)
- {
- query = createPTree("Query");
- query = row->setPropTree("Query", query);
- row->setProp("Query/@fetchEntire", "1"); // Compatibility...
- }
- return StringColumnMapper::toXML(query, "Text", value);
- }
- } queryTextColumnMapper;
- */
- // Maps a Cassandra map column to/from a container element whose children are stored as serialized XML.
- // Each instance is configured with the child element name used when reading (`elemName`) and the
- // attribute of each child that supplies the map key when writing (`nameAttr`).
- static class GraphMapColumnMapper : implements CassandraColumnMapper
- {
- public:
-     GraphMapColumnMapper(const char *_elemName, const char *_nameAttr)
-     : elemName(_elemName), nameAttr(_nameAttr)
-     {
-     }
-     // Parses every map value as XML and adds it as an `elemName` child of a new `name` subtree, which is added to `row`.
-     // The map keys are not consulted here.
-     virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) override
-     {
-         Owned<IPTree> container = createPTree(name);
-         CassandraIterator entries(cass_iterator_from_map(value));
-         while (cass_iterator_next(entries))
-         {
-             rtlDataAttr xmlData;
-             unsigned xmlLen;
-             getStringResult(NULL, cass_iterator_get_map_value(entries), xmlLen, xmlData.refstr());
-             Owned<IPTree> parsed = createPTreeFromXMLString(xmlLen, xmlData.getstr());
-             container->addPropTree(elemName, parsed.getClear());
-         }
-         row->addPropTree(name, container.getClear());
-         return row;
-     }
-     // Returns true when `row` has a child tree at `name` with at least one child.
-     // If `statement` is supplied, each child with a `nameAttr` value is bound at `idx` as (nameAttr value -> serialized XML).
-     virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
-     {
-         Owned<IPTree> source = row->getPropTree(name);
-         if (!source)
-             return false;
-         unsigned numItems = source->numChildren();
-         if (!numItems)
-             return false;
-         if (statement)
-         {
-             CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_MAP, numItems));
-             Owned<IPTreeIterator> children = source->getElements("*");
-             ForEach(*children)
-             {
-                 IPTree &child = children->query();
-                 const char *mapKey = child.queryProp(nameAttr);
-                 // MORE - may need to read, and probably should write, compressed. At least for graphs
-                 StringBuffer serialized;
-                 ::toXML(&child, serialized, 0, 0);
-                 if (mapKey && serialized.length())
-                 {
-                     check(cass_collection_append_string(collection, mapKey));
-                     check(cass_collection_append_string(collection, serialized));
-                 }
-             }
-             statement->bindCollection(idx, collection);
-         }
-         return true;
-     }
- private:
-     const char *elemName;
-     const char *nameAttr;
- } graphMapColumnMapper("Graph", "@name"), workflowMapColumnMapper("Item", "@wfid"), associationsMapColumnMapper("File", "@filename"), usedFieldsMapColumnMapper("field", "@name");
- // Maps a Cassandra map<int, text> column to/from OnWarnings/OnWarning elements (warning code -> severity).
- static class WarningsMapColumnMapper : implements CassandraColumnMapper
- {
- public:
-     // For each (code, severity) entry, finds or creates OnWarnings/OnWarning[@code=<code>] under `row`
-     // and sets its @severity attribute.
-     virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) override
-     {
-         CassandraIterator entries(cass_iterator_from_map(value));
-         while (cass_iterator_next(entries))
-         {
-             unsigned code = getUnsignedResult(NULL, cass_iterator_get_map_key(entries));
-             VStringBuffer xpath("OnWarnings/OnWarning[@code='%u']", code);
-             IPropertyTree * target = row->queryPropTree(xpath);
-             if (!target)
-             {
-                 IPropertyTree * parent = ensurePTree(row, "OnWarnings");
-                 target = parent->addPropTree("OnWarning", createPTree());
-                 target->setPropInt("@code", code);
-             }
-             rtlDataAttr severityData;
-             unsigned severityLen;
-             getStringResult(NULL, cass_iterator_get_map_value(entries), severityLen, severityData.refstr());
-             StringBuffer severity(severityLen, severityData.getstr());
-             target->setProp("@severity", severity);
-         }
-         return row;
-     }
-     // Returns true when `row` contains any OnWarnings/OnWarning element.
-     // If `statement` is supplied, every such element with a @severity is bound at `idx` as (code -> severity);
-     // a missing @code is written as 0.
-     virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
-     {
-         if (!row->hasProp("OnWarnings/OnWarning"))
-             return false;
-         if (statement)
-         {
-             CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_MAP, 5));
-             Owned<IPTreeIterator> warnings = row->getElements("OnWarnings/OnWarning");
-             ForEach(*warnings)
-             {
-                 IPTree &warning = warnings->query();
-                 unsigned code = warning.getPropInt("@code", 0);
-                 const char *severity = warning.queryProp("@severity");
-                 if (severity)
-                 {
-                     check(cass_collection_append_int32(collection, code));
-                     check(cass_collection_append_string(collection, severity));
-                 }
-             }
-             statement->bindCollection(idx, collection);
-         }
-         return true;
-     }
- } warningsMapColumnMapper;
- // Maps a Cassandra list<text> column to/from a set of `elemName` child elements, where each list
- // item is the value of the `nameAttr` property of one child.
- static class PluginListColumnMapper : implements CassandraColumnMapper
- {
- public:
-     PluginListColumnMapper(const char *_elemName, const char *_nameAttr)
-     : elemName(_elemName), nameAttr(_nameAttr)
-     {
-     }
-     // Creates one `elemName` child per list item. When `name` is non-null the children are placed in a new
-     // `name` subtree that is added to `row`; when `name` is null they are added to `row` itself.
-     virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value) override
-     {
-         Owned<IPTree> target = name ? createPTree(name) : LINK(row);
-         CassandraIterator items(cass_iterator_from_collection(value));
-         while (cass_iterator_next(items))
-         {
-             Owned<IPTree> entry = createPTree(elemName);
-             stringColumnMapper.toXML(entry, nameAttr, cass_iterator_get_value(items));
-             target->addPropTree(elemName, entry.getClear());
-         }
-         if (name)
-             row->addPropTree(name, target.getClear());
-         return row;
-     }
-     // Returns true when the tree at `name` under `row` exists and has at least one child.
-     // If `statement` is supplied, the `nameAttr` value of each child that has one is bound at `idx` as a list.
-     virtual bool fromXML(const ICassandraSession *session, CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal) override
-     {
-         Owned<IPTree> source = row->getPropTree(name);
-         if (!source)
-             return false;
-         unsigned numItems = source->numChildren();
-         if (!numItems)
-             return false;
-         if (statement)
-         {
-             CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_LIST, numItems));
-             Owned<IPTreeIterator> children = source->getElements("*");
-             ForEach(*children)
-             {
-                 IPTree &entry = children->query();
-                 const char *listValue = entry.queryProp(nameAttr);
-                 if (listValue)
-                     check(cass_collection_append_string(collection, listValue));
-             }
-             statement->bindCollection(idx, collection);
-         }
-         return true;
-     }
- private:
-     const char *elemName;
-     const char *nameAttr;
- } pluginListColumnMapper("Plugin", "@dllname"), subfileListColumnMapper("Subfile", "@name");
- // One row of a table description: ties a Cassandra column to a location in the XML tree and the mapper that converts it.
- // Mapping arrays end with a terminator row whose columnName is NULL; in that row columnType holds the table name
- // and xpath holds the primary key followed by optional '|'-separated table options.
- struct CassandraXmlMapping
- {
- const char *columnName; // Cassandra column name; NULL marks the terminator row
- const char *columnType; // CQL column type (table name in the terminator row)
- const char *xpath; // Passed to the mapper as its `name` argument; some mappers treat it as an '@'-delimited list rather than an xpath
- CassandraColumnMapper &mapper; // Converts between the column value and the XML tree
- };
- // Pairs a string with a mapping array describing a table.
- // NOTE(review): nothing in this part of the file uses this struct, so the meaning of `x` cannot be confirmed here.
- struct CassandraTableInfo
- {
- const char *x;
- const CassandraXmlMapping *mappings; // NULL-columnName-terminated mapping array
- };
- // Schema version implemented by the mappings below; it is compared with the version recorded in the repository.
- static const int majorVersion = 1; // If this does not match the value in the repository, you cannot proceed - a conversion tool is needed
- static const int minorVersion = 2; // If this is less than the value in the repository, we should be fine (but there may be columns we don't know about and thus don't read - and will write as NULL in new rows)
- // If this is greater than the value in the repository, we need to update the repository (using add column) and its version before proceeding
- // Make sure to increment this if any column is ever added below
- // Column layout of the main "workunits" table. Each row is {column name, CQL type, xpath or list, mapper};
- // the final row (NULL column name) carries the table name and the primary key plus clustering options.
- static const CassandraXmlMapping workunitsMappings [] =
- {
- {"partition", "int", NULL, hashRootNameColumnMapper},
- {"wuid", "text", NULL, rootNameColumnMapper},
- {"clustername", "text", "@clusterName", stringColumnMapper},
- {"jobname", "text", "@jobName", stringColumnMapper},
- {"priorityclass", "text", "@priorityClass", stringColumnMapper},
- {"prioritylevel", "int", "@priorityLevel", intColumnMapper},
- {"wuScope", "text", "@scope", stringColumnMapper},
- {"submitID", "text", "@submitID", stringColumnMapper},
- {"state", "text", "@state", stringColumnMapper},
- {"action", "text", "Action", stringColumnMapper},
- {"protected", "boolean", "@protected", boolColumnMapper},
- {"scheduled", "text", "@timeScheduled", stringColumnMapper}, // Should store as a date?
- {"totalThorTime", "text", "@totalThorTime", stringColumnMapper}, // We store in the wu ptree as a collatable string (with leading spaces to force to one partition)
- {"appvalues", "map<text, text>", "@Application@", subTreeMapColumnMapper},
- {"agentSession", "bigint", "@agentSession", bigintColumnMapper},
- {"debug", "map<text, text>", "Debug", simpleMapColumnMapper},
- {"attributes", "map<text, text>", "@agentSession@wuid@clusterName@jobName@priorityClass@priorityLevel@protected@scope@submitID@state@timeScheduled@totalThorTime@", attributeMapColumnMapper}, // name is the suppression list, note trailing @
- {"plugins", "list<text>", "Plugins", pluginListColumnMapper},
- {"workflow", "map<text, text>", "Workflow", workflowMapColumnMapper},
- {"onWarnings", "map<int, text>", "OnWarnings/OnWarning", warningsMapColumnMapper},
- // These are catchalls for anything not processed above or in a child table
- {"elements", "map<text, text>", "@Action@Application@Debug@Exceptions@Files@FilesRead@Graphs@Results@Statistics@Plugins@Query@State@Variables@Temporaries@Workflow@", elementMapColumnMapper}, // name is the suppression list, note trailing @
- {"subtrees", "map<text, text>", "@Parameters@Process@Tracing@", subTreeMapColumnMapper}, // name is the INCLUSION list, note trailing @
- { NULL, "workunits", "((partition), wuid)|CLUSTERING ORDER BY (wuid DESC)", stringColumnMapper}
- };
- // Subset of the workunitsMappings columns; the terminator row names the same "workunits" table.
- static const CassandraXmlMapping workunitInfoMappings [] = // A cut down version of the workunit mappings - used when querying with no key
- {
- {"partition", "int", NULL, hashRootNameColumnMapper},
- {"wuid", "text", NULL, rootNameColumnMapper},
- {"clustername", "text", "@clusterName", stringColumnMapper},
- {"jobname", "text", "@jobName", stringColumnMapper},
- {"priorityclass", "text", "@priorityClass", stringColumnMapper},
- {"prioritylevel", "int", "@priorityLevel", intColumnMapper},
- {"wuScope", "text", "@scope", stringColumnMapper},
- {"submitID", "text", "@submitID", stringColumnMapper},
- {"state", "text", "@state", stringColumnMapper},
- {"action", "text", "Action", stringColumnMapper},
- {"protected", "boolean", "@protected", boolColumnMapper},
- {"scheduled", "text", "@timeScheduled", stringColumnMapper}, // Should store as a date?
- {"totalThorTime", "text", "@totalThorTime", stringColumnMapper}, // We store in the wu ptree as a collatable string. Need to force to one partition too
- {"appvalues", "map<text, text>", "@Application@", subTreeMapColumnMapper},
- { NULL, "workunits", "((partition), wuid)|CLUSTERING ORDER BY (wuid DESC)", stringColumnMapper}
- };
- // The following describes the search table - this contains copies of the basic wu information but keyed by different fields
- // The partition key is (xpath, fieldPrefix); rows cluster by fieldValue ascending, then wuid descending.
- static const CassandraXmlMapping searchMappings [] =
- {
- {"xpath", "text", NULL, suppliedStringColumnMapper},
- {"fieldPrefix", "text", NULL, prefixSearchColumnMapper},
- {"fieldValue", "text", NULL, searchColumnMapper},
- {"wuid", "text", NULL, rootNameColumnMapper},
- {"clustername", "text", "@clusterName", stringColumnMapper},
- {"jobname", "text", "@jobName", stringColumnMapper},
- {"priorityclass", "text", "@priorityClass", stringColumnMapper},
- {"prioritylevel", "int", "@priorityLevel", intColumnMapper},
- {"scope", "text", "@scope", stringColumnMapper},
- {"submitID", "text", "@submitID", stringColumnMapper},
- {"state", "text", "@state", stringColumnMapper},
- {"action", "text", "Action", stringColumnMapper},
- {"protected", "boolean", "@protected", boolColumnMapper},
- {"scheduled", "text", "@timeScheduled", stringColumnMapper}, // Should store as a date?
- {"totalThorTime", "text", "@totalThorTime", stringColumnMapper}, // We store in the wu ptree as a collatable string. Need to force to one partition too
- {"appvalues", "map<text, text>", "@Application@", subTreeMapColumnMapper},
- { NULL, "workunitsSearch", "((xpath, fieldPrefix), fieldValue, wuid)|CLUSTERING ORDER BY (fieldValue ASC, wuid DESC)", stringColumnMapper}
- };
- // The fields we can search by. These are a subset of the fields in the basic workunit info that is returned from a search. A row is created in the search table for each of these, for each workunit.
- // The list is NULL-terminated.
- const char * searchPaths[] = { "@submitID", "@clusterName", "@jobName", "@priorityClass", "@protected", "@scope", "@state", "@totalThorTime", NULL};
- // Layout of the "uniqueSearchValues" table: the distinct values seen for each searchable xpath,
- // stored alongside the original-case form of each value.
- static const CassandraXmlMapping uniqueSearchMappings [] =
- {
- {"xpath", "text", NULL, suppliedStringColumnMapper},
- {"fieldPrefix", "text", NULL, prefixSearchColumnMapper}, // Leading N chars, upper-cased
- {"fieldValue", "text", NULL, searchColumnMapper}, // upper-cased
- {"origFieldValue", "text", NULL, lcSearchColumnMapper}, // original case
- { NULL, "uniqueSearchValues", "((xpath, fieldPrefix), fieldValue, origFieldValue)|CLUSTERING ORDER BY (fieldValue ASC)", stringColumnMapper}
- };
- // The fields we can wild search by. We store these in the uniqueSearchMappings table so we can translate wildcards into sets
- // We also add application name/key combinations, but we have to special-case that
- // The list is NULL-terminated.
- const char * wildSearchPaths[] = { "@submitID", "@clusterName", "@jobName", NULL};
- // Layout of the "filesSearchValues" table, keyed by (name, read) with wuid as a descending clustering column.
- static const CassandraXmlMapping filesSearchMappings [] =
- {
- {"name", "text", "@name", stringColumnMapper},
- {"read", "boolean", "@read", boolColumnMapper},
- {"wuid", "text", NULL, suppliedStringColumnMapper},
- { NULL, "filesSearchValues", "((name, read), wuid)|CLUSTERING ORDER BY (wuid DESC)", stringColumnMapper}
- };
- // The version table is keyed by a partition value because (a) you need to key by something and (b) we can use it to spread the load of
- // version lookups (pick a partition at random).
- // Note that this table must have the same minimum layout on all versions.
- // Attributes other than partition/major/minor go into the catch-all "attributes" map.
- static const CassandraXmlMapping versionMappings [] =
- {
- {"partition", "int", "@partition", intColumnMapper},
- {"major", "int", "@major", intColumnMapper},
- {"minor", "int", "@minor", intColumnMapper},
- {"attributes", "map<text, text>", "@major@minor@partition@", attributeMapColumnMapper}, // name is the suppression list, note trailing @
- { NULL, "version", "((partition))", stringColumnMapper}
- };
- /*
- * Some thoughts on the secondary tables:
- * 1. To support (trailing) wildcards we will need to split the key into two - the leading N chars and the rest. Exactly what N is will depend on the installation size.
- * Too large and users will complain, but too small would hinder partitioning of the values across Cassandra nodes. 1 or 2 may be enough.
- * 2. I could combine all the secondary tables into 1 with a field indicating the type of the key. The key field would be repeated though... Would it help?
- * I'm not sure it really changes a lot - adds a bit of noise into the partitioner...
- * Actually, it does mean that the updates and deletes can all be done with a single Cassandra query, though whether that has any advantages over multiple in a batch I don't know
- * It MAY well make it easier to make sure that searches are case-insensitive, since we'll generally need to separate out the search field from the display field to achieve that
- * 3. Sort orders are tricky - I can use the secondary table to deliver sorted by one field as long as it is the one I am filtering by (but if it is I probably don't need it sorted!)
- *
- */
- // The following describe child tables - all keyed by wuid
- // ChildTablesEnum gives each child table an index; ChildTablesSize is the number of entries.
- enum ChildTablesEnum { WuQueryChild, WuExceptionsChild, WuStatisticsChild, WuGraphsChild, WuResultsChild, WuVariablesChild, WuTemporariesChild, WuFilesReadChild, WuFilesWrittenChild, WuFieldUsage, ChildTablesSize };
- // Describes one child table: the XML element names it relates to, its enum index and its column mappings.
- struct ChildTableInfo
- {
- const char *parentElement; // e.g. "Exceptions" - presumably the container element in the workunit tree (confirm against users of this struct)
- const char *childElement; // e.g. "Exception" - presumably the repeated element inside the container; NULL for the 1:1 wuQueries table
- ChildTablesEnum index;
- const CassandraXmlMapping *mappings;
- };
- // wuQueries table is slightly unusual among the child tables as it is 1:1 - it is split out for lazy load purposes.
- static const CassandraXmlMapping wuQueryMappings [] =
- {
- {"partition", "int", NULL, hashRootNameColumnMapper},
- {"wuid", "text", NULL, rootNameColumnMapper},
- {"associations", "map<text, text>", "Associated", associationsMapColumnMapper},
- {"attributes", "map<text, text>", "", attributeMapColumnMapper}, // empty suppression list
- {"query", "text", "Text", stringColumnMapper}, // May want to make this even lazier...
- {"shortQuery", "text", "ShortText", stringColumnMapper},
- { NULL, "wuQueries", "((partition), wuid)", stringColumnMapper}
- };
- // Child table descriptor for the Query element; childElement is NULL here.
- static const ChildTableInfo wuQueriesTable =
- {
- "Query", NULL,
- WuQueryChild,
- wuQueryMappings
- };
- // wuExceptions table holds the exceptions associated with a wuid
- // Rows are keyed by (partition), wuid, sequence.
- static const CassandraXmlMapping wuExceptionsMappings [] =
- {
- {"partition", "int", NULL, hashRootNameColumnMapper},
- {"wuid", "text", NULL, rootNameColumnMapper},
- {"sequence", "int", "@sequence", intColumnMapper},
- {"attributes", "map<text, text>", "", attributeMapColumnMapper}, // empty suppression list
- {"value", "text", ".", stringColumnMapper},
- { NULL, "wuExceptions", "((partition), wuid, sequence)", stringColumnMapper}
- };
- // Child table descriptor for Exceptions/Exception.
- static const ChildTableInfo wuExceptionsTable =
- {
- "Exceptions", "Exception",
- WuExceptionsChild,
- wuExceptionsMappings
- };
- // Layout of the "wuStatistics" table; ts, kind, creator and scope are all clustering columns.
- static const CassandraXmlMapping wuStatisticsMappings [] =
- {
- {"partition", "int", NULL, hashRootNameColumnMapper},
- {"wuid", "text", NULL, rootNameColumnMapper},
- {"ts", "bigint", "@ts", bigintColumnMapper}, // MORE - should change this to a timeuuid ?
- {"kind", "text", "@kind", stringColumnMapper},
- {"creator", "text", "@creator", stringColumnMapper},
- {"scope", "text", "@scope", stringColumnMapper},
- {"attributes", "map<text, text>", "@ts@kind@creator@scope@", attributeMapColumnMapper},
- { NULL, "wuStatistics", "((partition), wuid, ts, kind, creator, scope)", stringColumnMapper}
- };
- // Child table descriptor for Statistics/Statistic.
- static const ChildTableInfo wuStatisticsTable =
- {
- "Statistics", "Statistic",
- WuStatisticsChild,
- wuStatisticsMappings
- };
- // Layout of the "wuGraphs" table; the graph body is held in the xgmml blob column via compressTreeColumnMapper.
- static const CassandraXmlMapping wuGraphsMappings [] =
- {
- {"partition", "int", NULL, hashRootNameColumnMapper},
- {"wuid", "text", NULL, rootNameColumnMapper},
- {"name", "text", "@name", stringColumnMapper},
- {"attributes", "map<text, text>", "@name@", attributeMapColumnMapper},
- {"xgmml", "blob", "xgmml", compressTreeColumnMapper},
- { NULL, "wuGraphs", "((partition), wuid, name)", stringColumnMapper} // Note - we do occasionally search by type - but that is done in a postfilter having preloaded/cached all
- };
- // Child table descriptor for Graphs/Graph.
- static const ChildTableInfo wuGraphsTable =
- {
- "Graphs", "Graph",
- WuGraphsChild,
- wuGraphsMappings
- };
- // A cut down version of the above - note this does not represent a different table!
- // Same "wuGraphs" table and key as wuGraphsMappings, but without the xgmml blob column.
- static const CassandraXmlMapping wuGraphMetasMappings [] =
- {
- {"partition", "int", NULL, hashRootNameColumnMapper},
- {"wuid", "text", NULL, rootNameColumnMapper},
- {"name", "text", "@name", stringColumnMapper},
- {"attributes", "map<text, text>", "@name@", attributeMapColumnMapper},
- { NULL, "wuGraphs", "((partition), wuid, name)", stringColumnMapper}
- };
- // Shares the WuGraphsChild index with wuGraphsTable.
- static const ChildTableInfo wuGraphMetasTable =
- {
- "Graphs", "Graph",
- WuGraphsChild,
- wuGraphMetasMappings
- };
- // Column rows shared by the wuResults, wuVariables and wuTemporaries mapping arrays.
- // Expands to a comma-separated list of initializers with no trailing comma.
- #define resultTableFields \
- {"partition", "int", NULL, hashRootNameColumnMapper}, \
- {"wuid", "text", NULL, rootNameColumnMapper}, \
- {"sequence", "int", "@sequence", defaultedIntColumnMapper}, \
- {"name", "text", "@name", stringColumnMapper}, \
- {"attributes", "map<text, text>", "@sequence@name@", attributeMapColumnMapper}, /* name is the suppression list */ \
- {"rowcount", "int", "rowCount", intColumnMapper}, /* This is the number of rows in result (which may be stored in a file rather than in value) */ \
- {"totalrowcount", "bigint", "totalRowCount", bigintColumnMapper}, /* This is the number of rows in value */ \
- {"schemaRaw", "blob", "SchemaRaw", blobColumnMapper}, \
- {"logicalName", "text", "logicalName", stringColumnMapper}, /* either this or value will be present once result status is "calculated" */ \
- {"value", "blob", "Value", blobColumnMapper}, \
- {"graph", "text", "@graph", stringColumnMapper}, \
- {"activity", "int", "@activity", intColumnMapper}
- // Layout of the "wuResults" table: the shared result columns, keyed by (partition), wuid, sequence.
- static const CassandraXmlMapping wuResultsMappings [] =
- {
- resultTableFields,
- { NULL, "wuResults", "((partition), wuid, sequence)", stringColumnMapper}
- };
- // Child table descriptor for Results/Result.
- static const ChildTableInfo wuResultsTable =
- {
- "Results", "Result",
- WuResultsChild,
- wuResultsMappings
- };
- // This looks very similar to the above, but the key is different...
- // Adds an xmlValue column, and name is part of the primary key.
- static const CassandraXmlMapping wuVariablesMappings [] =
- {
- resultTableFields,
- {"xmlValue", "text", "xmlValue", stringColumnMapper},
- { NULL, "wuVariables", "((partition), wuid, sequence, name)", stringColumnMapper}
- };
- // Child table descriptor for Variables/Variable.
- static const ChildTableInfo wuVariablesTable =
- {
- "Variables", "Variable",
- WuVariablesChild,
- wuVariablesMappings
- };
- // Again, very similar, but mapped to a different area of the XML
- // Stored in the "wuTemporaries" table with the same key shape as wuVariables.
- static const CassandraXmlMapping wuTemporariesMappings [] =
- {
- resultTableFields,
- { NULL, "wuTemporaries", "((partition), wuid, sequence, name)", stringColumnMapper}
- };
- // Child table descriptor for Temporaries/Variable.
- static const ChildTableInfo wuTemporariesTable =
- {
- "Temporaries", "Variable",
- WuTemporariesChild,
- wuTemporariesMappings
- };
- // Layout of the "wuFilesRead" table. The subfiles list is mapped with a NULL xpath.
- static const CassandraXmlMapping wuFilesReadMappings [] =
- {
- {"partition", "int", NULL, hashRootNameColumnMapper},
- {"wuid", "text", NULL, rootNameColumnMapper},
- {"name", "text", "@name", stringColumnMapper},
- {"attributes", "map<text, text>", "@name@", attributeMapColumnMapper}, /* name is the suppression list */
- {"subfiles", "list<text>", NULL, subfileListColumnMapper},
- { NULL, "wuFilesRead", "((partition), wuid, name)", stringColumnMapper}
- };
- // Child table descriptor for FilesRead/File.
- static const ChildTableInfo wuFilesReadTable =
- {
- "FilesRead", "File",
- WuFilesReadChild,
- wuFilesReadMappings
- };
- // Layout of the "wuFilesWritten" table, keyed by (partition), wuid, name.
- static const CassandraXmlMapping wuFilesWrittenMappings [] =
- {
- {"partition", "int", NULL, hashRootNameColumnMapper},
- {"wuid", "text", NULL, rootNameColumnMapper},
- {"name", "text", "@name", stringColumnMapper},
- {"attributes", "map<text, text>", "@name@", attributeMapColumnMapper}, /* name is the suppression list */
- { NULL, "wuFilesWritten", "((partition), wuid, name)", stringColumnMapper}
- };
- // Child table descriptor for Files/File (note the parent element is "Files", not "FilesWritten").
- static const ChildTableInfo wuFilesWrittenTable =
- {
- "Files", "File",
- WuFilesWrittenChild,
- wuFilesWrittenMappings
- };
- // Layout of the "wuFieldUsage" table, keyed by (partition), wuid, name.
- static const CassandraXmlMapping wuFieldUsageMappings [] =
- {
- {"partition", "int", NULL, hashRootNameColumnMapper},
- {"wuid", "text", NULL, rootNameColumnMapper},
- {"name", "text", "@name", stringColumnMapper},
- {"type", "text", "@type", stringColumnMapper},
- {"numFields", "int", "@numFields", intColumnMapper},
- {"numFieldsUsed", "int", "@numFieldsUsed", intColumnMapper},
- {"fields", "map<text, text>", "fields", usedFieldsMapColumnMapper},
- { NULL, "wuFieldUsage", "((partition), wuid, name)", stringColumnMapper}
- };
- // Child table descriptor for usedsources/datasource.
- static const ChildTableInfo wuFieldUsageTable =
- {
- "usedsources", "datasource",
- WuFieldUsage,
- wuFieldUsageMappings
- };
- // Order should match the enum above
- // NULL-terminated list of the child table descriptors (wuGraphMetasTable is not included).
- static const ChildTableInfo * const childTables [] = { &wuQueriesTable, &wuExceptionsTable, &wuStatisticsTable, &wuGraphsTable, &wuResultsTable, &wuVariablesTable, &wuTemporariesTable, &wuFilesReadTable, &wuFilesWrittenTable, &wuFieldUsageTable, NULL };
- // Graph progress tables are read directly, XML mappers not used
- // All xpaths are NULL; the array supplies the column names/types and the table definition.
- static const CassandraXmlMapping wuGraphProgressMappings [] =
- {
- {"partition", "int", NULL, hashRootNameColumnMapper},
- {"wuid", "text", NULL, rootNameColumnMapper},
- {"graphID", "text", NULL, stringColumnMapper},
- {"subgraphID", "bigint", NULL, bigintColumnMapper},
- {"creator", "text", NULL, stringColumnMapper},
- {"progress", "blob", NULL, blobColumnMapper},
- { NULL, "wuGraphProgress", "((partition), wuid, graphID, subgraphID, creator)", stringColumnMapper}
- };
- // Layout of the "wuGraphState" table: one integer state per (wuid, graphID, subgraphID). All xpaths are NULL.
- static const CassandraXmlMapping wuGraphStateMappings [] =
- {
- {"partition", "int", NULL, hashRootNameColumnMapper},
- {"wuid", "text", NULL, rootNameColumnMapper},
- {"graphID", "text", NULL, stringColumnMapper},
- {"subgraphID", "bigint", NULL, bigintColumnMapper},
- {"state", "int", NULL, intColumnMapper},
- { NULL, "wuGraphState", "((partition), wuid, graphID, subgraphID)", stringColumnMapper}
- };
- // Layout of the "wuGraphRunning" table. The primary key is only (partition), wuid, so there is one row per workunit.
- static const CassandraXmlMapping wuGraphRunningMappings [] =
- {
- {"partition", "int", NULL, hashRootNameColumnMapper},
- {"wuid", "text", NULL, rootNameColumnMapper},
- {"graphID", "text", NULL, stringColumnMapper},
- {"subgraphID", "bigint", NULL, bigintColumnMapper},
- { NULL, "wuGraphRunning", "((partition), wuid)", stringColumnMapper}
- };
- // Collects the column list and bind placeholders for an INSERT on the table described by `mappings`.
- // A column is included when `inXML` is null, or when its mapper reports (with a null statement) that
- // `inXML` holds data for it. `names` receives ",col" per column and `bindings` receives ",?" per column
- // (",now()" for timeuuid columns), so both start with a comma. `tableName` gets the table name appended.
- void getBoundFieldNames(const ICassandraSession *session, const CassandraXmlMapping *mappings, StringBuffer &names, StringBuffer &bindings, IPTree *inXML, const char *userVal, StringBuffer &tableName)
- {
-     for (; mappings->columnName; mappings++)
-     {
-         if (inXML && !mappings->mapper.fromXML(session, NULL, 0, inXML, mappings->xpath, userVal))
-             continue;
-         names.appendf(",%s", mappings->columnName);
-         const bool isTimeUuid = (strcmp(mappings->columnType, "timeuuid")==0);
-         bindings.append(isTimeUuid ? ",now()" : ",?");
-     }
-     // The terminator row keeps the table name in its columnType field
-     tableName.append(mappings->columnType);
- }
- // Appends ",col" to `names` for every column in `mappings` (so the result starts with a comma)
- // and appends the table name, taken from the terminator row, to `tableName`.
- void getFieldNames(const CassandraXmlMapping *mappings, StringBuffer &names, StringBuffer &tableName)
- {
-     for (; mappings->columnName; mappings++)
-         names.appendf(",%s", mappings->columnName);
-     tableName.append(mappings->columnType);
- }
- // Returns the table name for a mapping array: the columnType field of its terminator row
- // (the first row whose columnName is NULL).
- const char *queryTableName(const CassandraXmlMapping *mappings)
- {
-     const CassandraXmlMapping *cursor = mappings;
-     for (; cursor->columnName; cursor++)
-     {
-     }
-     return cursor->columnType;
- }
- // Appends to `out` a "CREATE TABLE IF NOT EXISTS" statement for the table described by `mappings` and returns `out`.
- // The terminator row's xpath is split on '|': the first part is the primary key, any further parts
- // become table options introduced by WITH and separated by commas.
- StringBuffer & describeTable(const CassandraXmlMapping *mappings, StringBuffer &out)
- {
-     StringBuffer columnDefs;
-     for (; mappings->columnName; mappings++)
-         columnDefs.appendf("%s %s,", mappings->columnName, mappings->columnType);
-     StringArray parts;
-     parts.appendList(mappings->xpath, "|");
-     assertex(parts.length()); // Primary key at least should be present!
-     out.appendf("CREATE TABLE IF NOT EXISTS %s (%s PRIMARY KEY %s)", mappings->columnType, columnDefs.str(), parts.item(0));
-     for (unsigned optionIdx = 1; parts.isItem(optionIdx); optionIdx++)
-     {
-         out.append(optionIdx==1 ? " WITH " : ", ");
-         out.append(parts.item(optionIdx));
-     }
-     out.append(';');
-     return out;
- }
- // Executes `statement` on `session`, waits for it to complete, and returns the result
- // obtained from the future via cass_future_get_result.
- const CassResult *executeQuery(CassSession *session, CassStatement *statement)
- {
-     CassandraFuture pending(cass_session_execute(session, statement));
-     pending.wait("executeQuery");
-     const CassResult *result = cass_future_get_result(pending);
-     return result;
- }
- // Queues a DELETE on the search table for the row identified by (`xpath`, upper-cased `key`, `wuid`).
- // Does nothing when `key` is null. The field prefix is bound from the upper-cased key, limited to the
- // session's prefix size. The new statement is appended to `batch`.
- void deleteSecondaryByKey(const char * xpath, const char *key, const char *wuid, const ICassandraSession *sessionCache, CIArrayOf<CassandraStatement> &batch)
- {
-     if (!key)
-         return;
-     StringBuffer upperKey(key);
-     upperKey.toUpperCase();
-     StringBuffer names;
-     StringBuffer tableName;
-     getFieldNames(searchMappings, names, tableName);
-     VStringBuffer deleteQuery("DELETE from %s where xpath=? and fieldPrefix=? and fieldValue=? and wuid=?;", tableName.str());
-     CassandraStatement *statement = new CassandraStatement(sessionCache->prepareStatement(deleteQuery));
-     statement->bindString(0, xpath);
-     statement->bindString_n(1, upperKey, sessionCache->queryPrefixSize());
-     statement->bindString(2, upperKey);
-     statement->bindString(3, wuid);
-     batch.append(*statement);
- }
- // Runs a parameterless CQL command on `session` and waits for it to finish.
- void executeSimpleCommand(CassSession *session, const char *command)
- {
-     CassandraStatement simple(cass_statement_new(command, 0));
-     CassandraFuture pending(cass_session_execute(session, simple));
-     pending.wait("execute");
- }
- // Issues the CREATE TABLE IF NOT EXISTS statement generated from `mappings` on `session`.
- void ensureTable(CassSession *session, const CassandraXmlMapping *mappings)
- {
-     StringBuffer createStatement;
-     describeTable(mappings, createStatement);
-     executeSimpleCommand(session, createStatement);
- }
- // Builds an INSERT for the table described by `mappings`, containing only the columns whose mapper
- // finds data in `inXML`, binds those values in column order, and adds the statement to `batch`.
- // `userVal` is passed through to every mapper.
- extern void simpleXMLtoCassandra(const ICassandraSession *session, CassBatch *batch, const CassandraXmlMapping *mappings, IPTree *inXML, const char *userVal = NULL)
- {
-     StringBuffer columnList;
-     StringBuffer placeholders;
-     StringBuffer tableName;
-     getBoundFieldNames(session, mappings, columnList, placeholders, inXML, userVal, tableName);
-     // Both lists start with a comma, hence the +1
-     VStringBuffer insertQuery("INSERT into %s (%s) values (%s);", tableName.str(), columnList.str()+1, placeholders.str()+1);
-     CassandraStatement insert(session->prepareStatement(insertQuery));
-     unsigned nextBind = 0;
-     for (; mappings->columnName; mappings++)
-     {
-         // Only columns that actually bound a value consume a placeholder
-         if (mappings->mapper.fromXML(session, &insert, nextBind, inXML, mappings->xpath, userVal))
-             nextBind++;
-     }
-     check(cass_batch_add_statement(batch, insert));
- }
- /*
-  * Build an INSERT for one row from inXML, using the supplied column mappings, and append the
-  * bound statement to an array of statements for later execution.
-  *
-  * Each mapping is offered the chance to bind its value; the bind position only advances for
-  * mappings that report that they bound something.
-  */
- extern void simpleXMLtoCassandra(const ICassandraSession *session, CIArrayOf<CassandraStatement> &batch, const CassandraXmlMapping *mappings, IPTree *inXML, const char *userVal = NULL)
- {
-     StringBuffer names;
-     StringBuffer bindings;
-     StringBuffer tableName;
-     getBoundFieldNames(session, mappings, names, bindings, inXML, userVal, tableName);
-     VStringBuffer insertQuery("INSERT into %s (%s) values (%s);", tableName.str(), names.str()+1, bindings.str()+1);
-     // Keep the statement in an Owned<> while binding so that it is released, not leaked,
-     // should one of the mappers fail by throwing.
-     Owned<CassandraStatement> update = new CassandraStatement(session->prepareStatement(insertQuery));
-     unsigned bindidx = 0;
-     while (mappings->columnName)
-     {
-         if (mappings->mapper.fromXML(session, update.get(), bindidx, inXML, mappings->xpath, userVal))
-             bindidx++;
-         mappings++;
-     }
-     batch.append(*update.getClear());
- }
- /*
-  * Queue a DELETE removing the (name, read, wuid) entry from the files search table.
-  *
-  *  name  - bound to the name column
-  *  read  - bound to the read column
-  *  wuid  - bound to the wuid column
-  *  batch - the fully bound statement is appended here
-  */
- extern void deleteFileSearch(const ICassandraSession *session, CIArrayOf<CassandraStatement> &batch, const char *name, bool read, const char *wuid)
- {
-     StringBuffer names;
-     StringBuffer tableName;
-     getFieldNames(filesSearchMappings, names, tableName);
-     VStringBuffer deleteQuery("DELETE from %s where name=? and read=? and wuid=?", tableName.str());
-     // Owned<> ensures the statement is released if a bind fails by throwing before it reaches the batch
-     Owned<CassandraStatement> update = new CassandraStatement(session->prepareStatement(deleteQuery));
-     update->bindString(0, name);
-     update->bindBool(1, read ? cass_true : cass_false);
-     update->bindString(2, wuid);
-     batch.append(*update.getClear());
- }
- /*
-  * Queue an INSERT adding a (name, read, wuid) entry to the files search table.
-  *
-  *  name  - bound at position 0
-  *  read  - bound at position 1
-  *  wuid  - bound at position 2
-  *  batch - the fully bound statement is appended here
-  */
- extern void addFileSearch(const ICassandraSession *session, CIArrayOf<CassandraStatement> &batch, const char *name, bool read, const char *wuid)
- {
-     StringBuffer bindings;
-     StringBuffer names;
-     StringBuffer tableName;
-     getBoundFieldNames(session, filesSearchMappings, names, bindings, NULL, NULL, tableName);
-     VStringBuffer insertQuery("INSERT INTO %s (%s) values (%s)", tableName.str(), names.str()+1, bindings.str()+1);
-     // Owned<> ensures the statement is released if a bind fails by throwing before it reaches the batch
-     Owned<CassandraStatement> update = new CassandraStatement(session->prepareStatement(insertQuery));
-     update->bindString(0, name);
-     update->bindBool(1, read ? cass_true : cass_false);
-     update->bindString(2, wuid);
-     batch.append(*update.getClear());
- }
- /*
-  * Queue an INSERT recording a value seen for the given xpath in the unique-values search table.
-  *
-  * The value is stored three ways: an upper-cased prefix (length taken from the session's
-  * queryPrefixSize), the full upper-cased value, and the value exactly as supplied.
-  *
-  *  batch - the fully bound statement is appended here
-  */
- extern void addUniqueValue(const ICassandraSession *session, CIArrayOf<CassandraStatement> &batch, const char *xpath, const char *value)
- {
-     StringBuffer bindings;
-     StringBuffer names;
-     StringBuffer tableName;
-     getBoundFieldNames(session, uniqueSearchMappings, names, bindings, NULL, NULL, tableName);
-     VStringBuffer insertQuery("INSERT into %s (%s) values (%s);", tableName.str(), names.str()+1, bindings.str()+1);
-     // Owned<> ensures the statement is released if a bind fails by throwing before it reaches the batch
-     Owned<CassandraStatement> update = new CassandraStatement(session->prepareStatement(insertQuery));
-     update->bindString(0, xpath);
-     StringBuffer ucValue(value);
-     ucValue.toUpperCase();
-     update->bindString_n(1, ucValue, session->queryPrefixSize());
-     update->bindString(2, ucValue);
-     update->bindString(3, value);
-     batch.append(*update.getClear());
- }
- /*
-  * Add an INSERT for one child row (taken from 'row') of workunit 'wuid' to a driver batch.
-  *
-  * Every column is bound, with NULL used where the XML supplies no value. Binding everything
-  * means stale values get removed when necessary, and as a side benefit keeps down the number
-  * of distinct query shapes that have to be prepared and cached.
-  */
- extern void childXMLRowtoCassandra(const ICassandraSession *session, CassBatch *batch, const CassandraXmlMapping *mappings, const char *wuid, IPTree &row, const char *userVal)
- {
-     StringBuffer tableName;
-     StringBuffer names;
-     StringBuffer bindings;
-     getBoundFieldNames(session, mappings, names, bindings, NULL, userVal, tableName);
-     VStringBuffer insertQuery("INSERT into %s (%s) values (%s);", tableName.str(), names.str()+1, bindings.str()+1);
-     CassandraStatement insert(session->prepareStatement(insertQuery));
-     // Positions 0 and 1 are the partition (hash of the wuid) and the wuid itself
-     insert.bindInt32(0, rtlHash32VStr(wuid, 0) % session->queryPartitions());
-     insert.bindString(1, wuid);
-     // The remaining columns come from the mappings, starting after the two bound above
-     for (unsigned column = 2; mappings[column].columnName; column++)
-     {
-         const CassandraXmlMapping &mapping = mappings[column];
-         if (mapping.mapper.fromXML(session, &insert, column, &row, mapping.xpath, userVal))
-             continue;
-         insert.bindNull(column);
-     }
-     check(cass_batch_add_statement(batch, insert));
- }
- /*
-  * Return the number of rows held for workunit 'wuid' in the child table described by 'mappings'.
-  * Runs a synchronous "SELECT count(*)" restricted to the workunit's partition and wuid.
-  */
- extern unsigned childCount(const ICassandraSession *session, const CassandraXmlMapping *mappings, const char *wuid)
- {
-     VStringBuffer cql("SELECT count(*) FROM %s WHERE partition=? AND wuid=?;", queryTableName(mappings));
-     CassandraStatement select(session->prepareStatement(cql));
-     select.bindInt32(0, rtlHash32VStr(wuid, 0) % session->queryPartitions());
-     select.bindString(1, wuid);
-     CassandraFuture pending(cass_session_execute(session->querySession(), select));
-     pending.wait("select count(*)");
-     CassandraResult counted(cass_future_get_result(pending));
-     return getUnsignedResult(NULL, getSingleResult(counted));
- }
- /*
-  * Add one INSERT per element delivered by 'elements' to the batch, each written as a child row of 'wuid'.
-  */
- extern void childXMLtoCassandra(const ICassandraSession *session, CassBatch *batch, const CassandraXmlMapping *mappings, const char *wuid, IPTreeIterator *elements, const char *userVal)
- {
-     for (bool more = elements->first(); more; more = elements->next())
-         childXMLRowtoCassandra(session, batch, mappings, wuid, elements->query(), userVal);
- }
- /*
-  * Convenience form: select the elements of inXML matching 'xpath' and write each as a child row.
-  * The name of the inXML node is passed on as the wuid.
-  */
- extern void childXMLtoCassandra(const ICassandraSession *session, CassBatch *batch, const CassandraXmlMapping *mappings, IPTree *inXML, const char *xpath, const char *defaultValue)
- {
-     Owned<IPTreeIterator> children = inXML->getElements(xpath);
-     const char *wuid = inXML->queryName();
-     childXMLtoCassandra(session, batch, mappings, wuid, children.get(), defaultValue);
- }
- /*
-  * Convert a Cassandra row into a new property tree named "row".
-  *
-  * If both xpath and key are non-empty, key is first stored at xpath. The columns of the row are
-  * then walked in step with 'mappings' (one mapping per column, in order) and every non-null
-  * column is converted by its mapper. The caller receives the tree via getClear().
-  */
- static IPTree *rowToPTree(const char *xpath, const char *key, const CassandraXmlMapping *mappings, const CassRow *row)
- {
-     CassandraIterator columns(cass_iterator_from_row(row));
-     Owned<IPTree> tree = createPTree("row"); // May be overwritten below if wuid field is processed
-     const bool haveKey = xpath && *xpath && key && *key;
-     if (haveKey)
-         tree->setProp(xpath, key);
-     for (const CassandraXmlMapping *mapping = mappings; cass_iterator_next(columns); mapping++)
-     {
-         assertex(mapping->columnName); // the row must not have more columns than there are mappings
-         const CassValue *cell = cass_iterator_get_column(columns);
-         if (!cell || cass_value_is_null(cell))
-             continue;
-         mapping->mapper.toXML(tree, mapping->xpath, cell);
-     }
-     return tree.getClear();
- }
- /*
- * PostFilter represents a filter to be applied to a ConstWorkUnitInfo tree representation prior to returning it from an iterator
- *
- * The iterators in this file discard any row for which matches() returns false for one of their post-filters.
- */
- interface IPostFilter : public IInterface
- {
- virtual bool matches(IPTree &p) const = 0; // true if the workunit tree p passes this filter
- virtual const char *queryValue() const = 0; // the filter value as supplied (implementations here return it without wildcard decoration)
- virtual const char *queryXPath() const = 0; // the xpath within the tree that the filter examines
- virtual WUSortField queryField() const = 0; // the field identifier this filter corresponds to
- };
- /*
-  * Post-filter on a single workunit field. The xpath examined is derived from the field via
-  * queryFilterXPath. A wild filter matches when the supplied value occurs within the property
-  * (using WildMatch with a "*value*" pattern); otherwise the property must equal the value,
-  * compared with strieq. A tree in which the property is absent never matches.
-  */
- class PostFilter : public CInterfaceOf<IPostFilter>
- {
- public:
-     PostFilter(WUSortField _field, const char *_value, bool _wild)
-       : xpath(queryFilterXPath(_field)), field(_field), wild(_wild)
-     {
-         setValue(_value);
-     }
-     virtual bool matches(IPTree &p) const
-     {
-         const char *actual = p.queryProp(xpath);
-         if (!actual)
-             return false;
-         if (wild)
-             return WildMatch(actual, pattern);
-         return strieq(actual, pattern);
-     }
-     virtual const char *queryValue() const
-     {
-         return value.str();
-     }
-     // Record the value to filter on, wrapping it in '*' on both sides when this is a wild filter
-     void setValue(const char *_value)
-     {
-         if (!wild)
-             pattern.set(_value);
-         else
-         {
-             VStringBuffer wrapped("*%s*", _value);
-             pattern.set(wrapped);
-         }
-         value.set(_value);
-     }
-     virtual const char *queryXPath() const
-     {
-         return xpath;
-     }
-     virtual WUSortField queryField() const
-     {
-         return field;
-     }
- protected:
-     const char *xpath;   // where in the workunit tree to look
-     StringAttr pattern;  // what matches() actually compares against
-     StringAttr value;    // the value exactly as supplied
-     WUSortField field;
-     bool wild;
- };
- /*
-  * Post-filter that accepts a tree when the field equals (via strieq) any one of a
-  * '|'-separated list of values.
-  */
- class MultiValuePostFilter : public PostFilter
- {
- public:
-     MultiValuePostFilter(WUSortField _field, const char *_value)
-       : PostFilter(_field, _value, false)
-     {
-         setValue(_value);
-     }
-     virtual bool matches(IPTree &p) const
-     {
-         const char *actual = p.queryProp(xpath);
-         if (!actual)
-             return false;
-         ForEachItemIn(candidate, values)
-         {
-             if (strieq(actual, values.item(candidate)))
-                 return true;
-         }
-         return false;
-     }
-     // Split the supplied list at each '|' and add the pieces to the set of accepted values
-     void setValue(const char *_value)
-     {
-         values.appendList(_value, "|");
-     }
- private:
-     StringArray values;
- };
- /*
-  * Post-filter on an application value: examines the property at "Application/<name>".
-  * A wild filter matches when the supplied value occurs within the property (WildMatch with a
-  * "*value*" pattern); otherwise the property must equal the value, compared with strieq.
-  * A tree in which the property is absent never matches.
-  */
- class AppValuePostFilter : public CInterfaceOf<IPostFilter>
- {
- public:
-     AppValuePostFilter(const char *_name, const char *_value, bool _wild) : wild(_wild)
-     {
-         xpath.appendf("Application/%s", _name);
-         setValue(_value);
-     }
-     virtual bool matches(IPTree &p) const
-     {
-         const char *actual = p.queryProp(xpath);
-         if (!actual)
-             return false;
-         if (wild)
-             return WildMatch(actual, pattern);
-         return strieq(actual, pattern);
-     }
-     virtual const char *queryValue() const
-     {
-         return value.str();
-     }
-     // Record the value to filter on, wrapping it in '*' on both sides when this is a wild filter
-     void setValue(const char *_value)
-     {
-         if (!wild)
-             pattern.set(_value);
-         else
-         {
-             VStringBuffer wrapped("*%s*", _value);
-             pattern.set(wrapped);
-         }
-         value.set(_value);
-     }
-     virtual const char *queryXPath() const
-     {
-         return xpath;
-     }
-     virtual WUSortField queryField() const
-     {
-         return WUSFappvalue;
-     }
- private:
-     StringBuffer xpath;  // "Application/<name>"
-     StringAttr pattern;  // what matches() actually compares against
-     StringAttr value;    // the value exactly as supplied
-     bool wild;
- };
- /*
-  * A row iterator that remembers the text of one column of its current row so that several
-  * iterators can be ordered against each other. A compare column of -1 selects "concat mode",
-  * where iterators are ordered by their index instead of by column value.
-  */
- class CassSortableIterator : public CassandraIterator
- {
- public:
-     CassSortableIterator(CassIterator *_iterator, unsigned _idx, int _compareColumn, bool _descending)
-       : CassandraIterator(_iterator), idx(_idx), compareColumn(_compareColumn), descending(_descending)
-     {
-     }
-     // Advance to the next row. Returns this iterator if there is one, or NULL once exhausted (or stopped).
-     const CassSortableIterator *nextRow()
-     {
-         if (!iterator || !cass_iterator_next(iterator))
-             return NULL;
-         if (compareColumn != -1)
-         {
-             // Capture the sort key of the row we just moved onto
-             const CassRow *currentRow = cass_iterator_get_row(iterator);
-             getCassString(value.clear(), cass_row_get_column(currentRow, compareColumn));
-         }
-         return this;
-     }
-     // Drop the remembered key and the underlying iterator
-     void stop()
-     {
-         value.clear();
-         set(NULL);
-     }
-     // Order this iterator's current row against another's; the sign is flipped when descending
-     int compare(const CassSortableIterator *to) const
-     {
-         if (compareColumn==-1)
-             return idx - to->idx; // concat mode
-         const int order = strcmp(value, to->value); // Note - empty StringBuffer always returns ""
-         if (descending)
-             return -order;
-         return order;
-     }
- private:
-     StringBuffer value;   // text of the compare column for the current row
-     unsigned idx;         // position of this input among its siblings
-     int compareColumn;    // column used for ordering, or -1 for concat mode
-     bool descending;
- };
- interface IConstWorkUnitIteratorEx : public IConstWorkUnitIterator // workunit iterator extended with the queries the paging iterators in this file need
- {
- virtual bool hasPostFilters() const = 0; // true if rows may be discarded by post-filters after being fetched
- virtual bool isMerging() const = 0; // true if rows are being merged from more than one underlying result
- virtual void notePosition() const = 0; // give the iterator the chance to record its current position (e.g. in a query cache entry); may be a no-op
- };
- /*
- *
- * The cache entries serve two purposes:
- *
- * 1. They allow us to map row numbers to values for the end of each page returned, which can make forward paging efficient when not post-sorting
- * 2. They allow us to preserve post-sort results in order to avoid having to re-retrieve them.
- */
- class CCassandraWuUQueryCacheEntry : public CInterfaceOf<IInterface>
- {
- public:
-     CCassandraWuUQueryCacheEntry()
-     {
-         hint = get_cycles_now(); // MORE - should do better perhaps?
-         lastAccess = msTick();
-     }
-     // The identifying hint chosen when the entry was created
-     __int64 queryHint() const
-     {
-         return hint;
-     }
-     /*
-      * Remember that row number 'row' of the query corresponds to the given wuid and sort-field value.
-      * A NULL fieldValue is treated as the empty string. Noting a row that is already recorded is a no-op
-      * (debug builds check that the details agree).
-      */
-     void noteWuid(const char *wuid, const char *fieldValue, unsigned row)
-     {
-         CriticalBlock b(crit);
-         // Normalise once, so that the consistency check below compares against exactly what gets stored
-         // (and never hands a NULL pointer to streq).
-         const char *storedValue = fieldValue ? fieldValue : "";
-         // NOTE - we store one set of row information per page retrieved - and we normally traverse the pages
-         // in order so appending to the end is better than (for example) binchopping
-         ForEachItemInRev(idx, rows)
-         {
-             unsigned foundRow = rows.item(idx);
-             if (foundRow==row)
-             {
-                 assert(streq(wuids.item(idx), wuid));
-                 assert(streq(fieldValues.item(idx), storedValue));
-                 return;
-             }
-             if (foundRow < row)
-                 break;
-         }
-         // idx is now the position of the last entry with a smaller row number, so insert just after it
-         rows.add(row, idx+1);
-         wuids.add(wuid, idx+1);
-         fieldValues.add(storedValue, idx+1);
-     }
-     // Returns a new link to the preserved result iterator, if one has been set
-     IConstWorkUnitIteratorEx *getResult() const
-     {
-         CriticalBlock b(crit);
-         return result.getLink();
-     }
-     // Preserve a result iterator for later reuse
-     void setResult(IConstWorkUnitIteratorEx *_result)
-     {
-         CriticalBlock b(crit);
-         result.set(_result);
-     }
-     /*
-      * Find the highest recorded row number that does not exceed startOffset. If there is one, wuid and
-      * fieldValue are set to the details recorded for it and its row number is returned; otherwise
-      * returns 0 and leaves the buffers untouched.
-      */
-     unsigned lookupStartRow(StringBuffer &wuid, StringBuffer &fieldValue, unsigned startOffset) const
-     {
-         // See if we can provide a base wuid to search above/below
-         CriticalBlock b(crit);
-         ForEachItemInRev(idx, rows)
-         {
-             unsigned foundRow = rows.item(idx);
-             if (foundRow <= startOffset)
-             {
-                 wuid.set(wuids.item(idx));
-                 fieldValue.set(fieldValues.item(idx));
-                 return foundRow;
-             }
-         }
-         return 0;
-     }
-     // Mark the entry as used just now
-     void touch()
-     {
-         lastAccess = msTick();
-     }
-     inline unsigned queryLastAccess() const
-     {
-         return lastAccess;
-     }
- private:
-     mutable CriticalSection crit; // It's POSSIBLE that we could get two queries in hitting the cache at the same time, I think...
-     // rows, wuids and fieldValues are parallel arrays, kept in ascending row order
-     UnsignedArray rows;
-     StringArray wuids;
-     StringArray fieldValues;
-     Owned<IConstWorkUnitIteratorEx> result;
-     __uint64 hint;
-     unsigned lastAccess; // msTick() value at creation or at the last touch()
- };
- class CassMultiIterator : public CInterface, implements IRowProvider, implements ICompare, implements IConstWorkUnitIteratorEx // Iterates workunits drawn from one or more Cassandra results, combined via a row stream merger, with optional post-filtering and position caching
- {
- public:
- IMPLEMENT_IINTERFACE;
- CassMultiIterator(CCassandraWuUQueryCacheEntry *_cache, unsigned _startRowNum, int _compareColumn, bool _descending)
- : cache(_cache)
- {
- compareColumn = _compareColumn;
- descending = _descending;
- startRowNum = _startRowNum;
- rowNum = 0;
- }
- void setStartOffset(unsigned start)
- {
- startRowNum = start; // we managed to do a seek forward via a filter
- }
- void setCompareColumn(int _compareColumn)
- {
- assert(!inputs.length()); // only meaningful before first() has created the inputs
- compareColumn = _compareColumn;
- }
- void addResult(CassandraResult &result)
- {
- results.append(result);
- }
- void addPostFilters(IArrayOf<IPostFilter> &filters, unsigned start) // adds (linked) every filter from index 'start' onwards
- {
- unsigned len = filters.length();
- while (start<len)
- postFilters.append(OLINK(filters.item(start++)));
- }
- void addPostFilter(PostFilter &filter)
- {
- postFilters.append(filter);
- }
- virtual bool hasPostFilters() const
- {
- return postFilters.length() != 0;
- }
- virtual bool isMerging() const
- {
- return results.length() > 1;
- }
- virtual bool first() // (re)creates one sortable input per result and a fresh merger, then moves to the first matching row
- {
- inputs.kill();
- ForEachItemIn(idx, results)
- {
- inputs.append(*new CassSortableIterator(cass_iterator_from_result(results.item(idx)), idx, compareColumn, descending));
- }
- merger.setown(createRowStreamMerger(inputs.length(), *this, this, false)); // this object acts as both row provider and comparator for the merger
- rowNum = startRowNum;
- return next();
- }
- virtual void notePosition() const
- {
- if (cache && current)
- {
- cache->noteWuid(current->queryWuid(), lastThorTime, rowNum);
- }
- }
- virtual bool next() // advances to the next row that passes every post-filter; returns false at end of input
- {
- Owned<IConstWorkUnitInfo> last = current.getClear(); // keep the previous row so its position can be noted if we hit the end
- for (;;)
- {
- const CassandraIterator *nextSource = nextMergedSource();
- if (!nextSource)
- {
- if (cache && last)
- {
- cache->noteWuid(last->queryWuid(), lastThorTime, rowNum);
- }
- return false;
- }
- Owned<IPTree> wuXML = rowToPTree(NULL, NULL, workunitInfoMappings+1, cass_iterator_get_row(*nextSource)); // NOTE - this is relying on search mappings and wuInfoMappings being the same
- bool postFiltered = false;
- ForEachItemIn(pfIdx, postFilters)
- {
- if (!postFilters.item(pfIdx).matches(*wuXML))
- {
- postFiltered = true;
- break;
- }
- }
- if (!postFiltered)
- {
- current.setown(createConstWorkUnitInfo(*wuXML));
- lastThorTime.set(wuXML->queryProp("@totalThorTime"));
- rowNum++; // only rows that survive the post-filters are counted
- return true;
- }
- }
- }
- virtual bool isValid()
- {
- return current != NULL;
- }
- virtual IConstWorkUnitInfo & query()
- {
- assertex(current);
- return *current.get();
- }
- const CassandraIterator *nextMergedSource() // the "rows" delivered by the merger are really pointers to the input iterators
- {
- return (const CassSortableIterator *) merger->nextRow();
- }
- protected:
- virtual void linkRow(const void *row) { } // no-op
- virtual void releaseRow(const void *row) { } // no-op
- virtual const void *nextRow(unsigned idx) // presumably called back by the merger to advance input idx - confirm against IRowProvider
- {
- CassSortableIterator &it = inputs.item(idx);
- return it.nextRow(); // returns either a pointer to the iterator, or NULL
- }
- virtual void stop(unsigned idx)
- {
- inputs.item(idx).stop();
- }
- virtual int docompare(const void *a, const void *b) const
- {
- // a and b point to to CassSortableIterator objects
- const CassSortableIterator *aa = (const CassSortableIterator *) a;
- const CassSortableIterator *bb = (const CassSortableIterator *) b;
- return aa->compare(bb);
- }
- private:
- IArrayOf<CassandraResult> results;
- IArrayOf<CassSortableIterator> inputs;
- Owned<IRowStream> merger; // NOTE - must be destroyed before inputs is destroyed
- IArrayOf<IPostFilter> postFilters;
- Owned<IConstWorkUnitInfo> current;
- Linked<CCassandraWuUQueryCacheEntry> cache;
- StringAttr lastThorTime; // "@totalThorTime" of the most recently returned row
- int compareColumn;
- unsigned startRowNum;
- unsigned rowNum; // startRowNum plus the number of rows returned since first()
- bool descending;
- };
- /*
-  * Iterator that reads up to 'limit' workunits from its input the first time first() is called,
-  * sorts them in memory according to 'sortorder', and then iterates over the sorted copy.
-  * The input is released once it has been read.
-  */
- class CassPostSortIterator : public CInterfaceOf<IConstWorkUnitIteratorEx>, implements ICompare
- {
- public:
-     CassPostSortIterator(IConstWorkUnitIterator * _input, unsigned _sortorder, unsigned _limit)
-       : input(_input), sortorder(_sortorder), limit(_limit)
-     {
-         idx = 0;
-     }
-     virtual bool first()
-     {
-         if (input)
-         {
-             // First use: pull in and sort the rows, after which the input is no longer needed
-             readFirst();
-             input.clear();
-         }
-         idx = 0;
-         return sorted.isItem(idx);
-     }
-     virtual bool next()
-     {
-         idx++;
-         if (sorted.isItem(idx))
-             return true;
-         return false;
-     }
-     virtual void notePosition() const
-     {
-     }
-     virtual bool isValid()
-     {
-         return sorted.isItem(idx);
-     }
-     virtual IConstWorkUnitInfo & query()
-     {
-         return sorted.item(idx);
-     }
-     virtual bool hasPostFilters() const
-     {
-         return false;  // they are done by my input. But we may want to rename this function to indicate "may return more than asked" in which case would be true
-     }
-     virtual bool isMerging() const
-     {
-         return false;
-     }
- private:
-     // Read rows from the input until it is exhausted or 'limit' rows have been gathered, then sort them
-     void readFirst()
-     {
-         ForEach(*input)
-         {
-             sorted.append(OLINK(input->query()));
-             if (sorted.length()>=limit)
-                 break;
-         }
-         qsortvec((void **)sorted.getArray(0), sorted.length(), *this);
-     }
-     // Comparison used by the sort. The low byte of sortorder selects the field; WUSFreverse flips the result.
-     virtual int docompare(const void *a, const void *b) const
-     {
-         // a and b point to to IConstWorkUnitInfo objects
-         const IConstWorkUnitInfo *aa = (const IConstWorkUnitInfo *) a;
-         const IConstWorkUnitInfo *bb = (const IConstWorkUnitInfo *) b;
-         int diff;
-         switch (sortorder & 0xff)
-         {
-         case WUSFuser:
-             diff = stricmp(aa->queryUser(), bb->queryUser());
-             break;
-         case WUSFcluster:
-             diff = stricmp(aa->queryClusterName(), bb->queryClusterName());
-             break;
-         case WUSFjob:
-             diff = stricmp(aa->queryJobName(), bb->queryJobName());
-             break;
-         case WUSFstate:
-             diff = stricmp(aa->queryStateDesc(), bb->queryStateDesc());
-             break;
-         case WUSFprotected:
-             diff = (int) bb->isProtected() - (int) aa->isProtected();
-             break;
-         case WUSFtotalthortime:
-             // Previously this subtracted bb's time from itself, so the result was always 0 and sorting on
-             // thor time did nothing. NOTE(review): the direction (bb - aa) follows the WUSFprotected case
-             // above - confirm it is the intended default order.
-             diff = (int) (bb->getTotalThorTime() - aa->getTotalThorTime());
-             break;
-         case WUSFwuid:
-             diff = stricmp(aa->queryWuid(), bb->queryWuid()); // Should never happen, since we always fetch with a wuid sort
-             break;
-         default:
-             throwUnexpected();
-         }
-         if (sortorder & WUSFreverse)
-             return -diff;
-         else
-             return diff;
-     }
-     Owned<IConstWorkUnitIterator> input;  // cleared once the rows have been read
-     IArrayOf<IConstWorkUnitInfo> sorted;
-     unsigned sortorder;
-     unsigned idx;                         // current position within sorted
-     unsigned limit;                       // maximum number of rows to read from the input
- };
- /*
-  * Presents a window onto another iterator: skips the first 'startOffset' rows of the input and
-  * then returns at most 'pageSize' rows.
-  */
- class SubPageIterator : public CInterfaceOf<IConstWorkUnitIteratorEx>
- {
- public:
-     SubPageIterator(IConstWorkUnitIteratorEx *_input, unsigned _startOffset, unsigned _pageSize)
-       : input(_input), startOffset(_startOffset), pageSize(_pageSize), idx(0)
-     {
-     }
-     virtual bool first()
-     {
-         idx = 0;
-         // MORE - put a seek into the Ex interface
-         if (input->first())
-         {
-             // Unsigned index, matching startOffset: avoids a signed/unsigned comparison (and signed
-             // overflow if the offset were ever larger than INT_MAX)
-             for (unsigned i = 0; i < startOffset; i++)
-             {
-                 if (!input->next())
-                     return false;
-             }
-             return true;
-         }
-         else
-             return false;
-     }
-     virtual bool next()
-     {
-         idx++;
-         if (idx >= pageSize)
-         {
-             // End of the page: let the input record where we got to
-             input->notePosition();
-             return false;
-         }
-         return input->next();
-     }
-     virtual void notePosition() const
-     {
-         input->notePosition();
-     }
-     virtual bool isValid()
-     {
-         return idx < pageSize && input->isValid();
-     }
-     virtual IConstWorkUnitInfo & query()
-     {
-         return input->query();
-     }
-     virtual bool hasPostFilters() const
-     {
-         return false;
-     }
-     virtual bool isMerging() const
-     {
-         return false;
-     }
- private:
-     Owned<IConstWorkUnitIteratorEx> input;
-     unsigned startOffset;  // number of input rows to skip
-     unsigned pageSize;     // maximum number of rows to return
-     unsigned idx;          // number of rows returned so far on this pass
- };
- class CassJoinIterator : implements IConstWorkUnitIteratorEx, public CInterface // Returns only the rows whose compare-column value is present in every one of the added results (the results are stepped forwards in order)
- { // NOTE(review): hasPostFilters/isMerging/notePosition from IConstWorkUnitIteratorEx are not overridden in the visible class - confirm whether it can be instantiated
- public:
- IMPLEMENT_IINTERFACE;
- CassJoinIterator(unsigned _compareColumn, bool _descending)
- {
- compareColumn = _compareColumn;
- descending = _descending;
- }
- void addResult(CassandraResult &result)
- {
- results.append(result);
- }
- void addPostFilter(IPostFilter &post)
- {
- postFilters.append(post);
- }
- virtual bool first()
- {
- if (!results.length())
- return false;
- inputs.kill();
- ForEachItemIn(idx, results)
- {
- Owned <CassSortableIterator> input = new CassSortableIterator(cass_iterator_from_result(results.item(idx)), idx, compareColumn, descending);
- if (!input->nextRow()) // an empty input means the join can never produce anything
- return false;
- inputs.append(*input.getClear());
- }
- return next();
- }
- virtual bool next()
- {
- current.clear();
- for (;;)
- {
- unsigned idx = 0;
- unsigned target = 0; // the input whose current row is the candidate to match
- unsigned matches = 1; // I always match myself!
- unsigned sources = inputs.length();
- if (!sources)
- return false;
- while (matches < sources) // cycle round the inputs until all of them sit on the same value
- {
- idx++;
- if (idx==sources)
- idx = 0;
- int diff;
- for (;;) // advance this input until it is no longer behind the target
- {
- assert(idx != target);
- diff = inputs.item(idx).compare(&inputs.item(target));
- if (diff >= 0)
- break;
- if (!inputs.item(idx).nextRow())
- {
- inputs.kill(); // Once any reaches EOF, we are done
- return false;
- }
- }
- if (diff > 0) // overshot: this input becomes the new target and matching starts again
- {
- target = idx;
- matches = 1;
- }
- else
- matches++;
- }
- Owned<IPTree> wuXML = rowToPTree(NULL, NULL, workunitInfoMappings+1, cass_iterator_get_row(inputs.item(0))); // all inputs compare equal at this point; the row is taken from the first
- bool postFiltered = false;
- ForEachItemIn(pfIdx, postFilters)
- {
- if (!postFilters.item(pfIdx).matches(*wuXML))
- {
- postFiltered = true;
- break;
- }
- }
- if (!postFiltered)
- {
- current.setown(createConstWorkUnitInfo(*wuXML));
- ForEachItemIn(idx2, inputs) // step every input past the row just returned
- {
- if (!inputs.item(idx2).nextRow())
- {
- inputs.clear(); // Make sure next() fails next time it is called
- break;
- }
- }
- return true;
- }
- } // NOTE(review): when a joined row is rejected by a post-filter the inputs are not advanced before looping - confirm this cannot spin on the same row
- }
- virtual bool isValid()
- {
- return current != NULL;
- }
- virtual IConstWorkUnitInfo & query()
- {
- assertex(current);
- return *current.get();
- }
- private:
- IArrayOf<CassandraResult> results;
- IArrayOf<CassSortableIterator> inputs;
- IArrayOf<IPostFilter> postFilters;
- Owned<IConstWorkUnitInfo> current;
- unsigned compareColumn;
- bool descending;
- };
- /*
-  * Take the write lock for a workunit, under "/WorkUnitLocks/<wuid>" in the SDS.
-  *
-  * If a connection is supplied it is switched to write-lock mode; otherwise a new connection is
-  * made and stored in 'connection'. Throws WUERR_LockFailed if there is still no connection afterwards.
-  */
- static void lockWuid(Owned<IRemoteConnection> &connection, const char *wuid)
- {
-     VStringBuffer lockPath("/WorkUnitLocks/%s", wuid);
-     if (!connection)
-         connection.setown(querySDS().connect(lockPath.str(), myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT));
-     else
-         connection->changeMode(RTM_LOCK_WRITE, SDS_LOCK_TIMEOUT); // Would it ever be anything else?
-     if (connection)
-         return;
-     throw makeStringExceptionV(WUERR_LockFailed, "Failed to get connection for xpath %s", lockPath.str());
- }
- class CCassandraWorkUnit : public CPersistedWorkUnit
- {
- public:
- // Construct from workunit XML. Child tables start out marked as not loaded, and no state/action/abort changes are pending.
- CCassandraWorkUnit(ICassandraSession *_sessionCache, IPTree *wuXML, ISecManager *secmgr, ISecUser *secuser, IRemoteConnection *_daliLock, bool _allDirty)
-   : CPersistedWorkUnit(secmgr, secuser), sessionCache(_sessionCache), daliLock(_daliLock), allDirty(_allDirty)
- {
-     CPersistedWorkUnit::loadPTree(wuXML);
-     memset(childLoaded, 0, sizeof(childLoaded));
-     actionChanged = stateChanged = abortDirty = false;
- }
- // Nothing to do explicitly
- ~CCassandraWorkUnit() {}
- // Discard the in-memory state and re-read the workunit from Cassandra
- virtual void forceReload()
- {
-     synchronized sync(locked); // protect locked workunits (uncommitted writes) from reload
-     loadPTree(sessionCache->cassandraToWorkunitXML(queryWuid()));
-     // Nothing has been read from the child tables yet, and nothing is pending...
-     memset(childLoaded, 0, sizeof(childLoaded));
-     allDirty = actionChanged = stateChanged = false;
-     // ...but the abort flag is marked dirty
-     abortDirty = true;
- }
- // Execute a batch on this workunit's session, logging first when the trace level is above 1
- void executeBatch(CassandraBatch &batch, const char * what) const
- {
-     const bool tracing = sessionCache->queryTraceLevel() > 1;
-     if (tracing)
-         DBGLOG("Executing batch %s", what);
-     batch.execute(sessionCache->querySession(), what);
- }
- // Pass a set of statements to the session for asynchronous execution, logging first when the trace level is above 1
- void executeAsync(CIArrayOf<CassandraStatement> &batch, const char * what) const
- {
-     const bool tracing = sessionCache->queryTraceLevel() > 1;
-     if (tracing)
-         DBGLOG("Executing async batch %s (%d elements)", what, batch.length());
-     sessionCache->executeAsync(batch, what);
- }
- // Delete the workunit: the base-class cleanup, then its child rows and parent row (one batch), then its secondary search entries (async)
- virtual void cleanupAndDelete(bool deldll, bool deleteOwned, const StringArray *deleteExclusions)
- {
-     const char *wuid = queryWuid();
-     CPersistedWorkUnit::cleanupAndDelete(deldll, deleteOwned, deleteExclusions);
-     // The list of secondaries to delete has to be worked out while the parent/children still exist,
-     // even though the deletion itself is the last thing we do
-     CIArrayOf<CassandraStatement> secondaryDeletes;
-     deleteSecondaries(wuid, secondaryDeletes);
-     CassandraBatch primaryBatch(CASS_BATCH_TYPE_UNLOGGED);
-     deleteChildren(wuid, primaryBatch);
-     const CassandraXmlMapping *graphTables[] = { wuGraphProgressMappings, wuGraphStateMappings, wuGraphRunningMappings };
-     for (unsigned t = 0; t < 3; t++)
-         sessionCache->deleteChildByWuid(graphTables[t], wuid, primaryBatch);
-     // If the partitioning of the main workunits table does not match the partitioning of the other tables, then would be better to
-     // execute the deletes of the child tables and the main record as two separate batches.
-     CassandraStatement deleteParent(sessionCache->prepareStatement("DELETE from workunits where partition=? and wuid=?;"));
-     deleteParent.bindInt32(0, rtlHash32VStr(wuid, 0) % sessionCache->queryPartitions());
-     deleteParent.bindString(1, wuid);
-     check(cass_batch_add_statement(primaryBatch, deleteParent));
-     executeBatch(primaryBatch, "delete wu");
-     executeAsync(secondaryDeletes, "delete wu");
- }
- virtual void commit() // Write pending changes to Cassandra: parent row, then either every child table (allDirty) or just the dirty paths/results; then signal state/action changes via Dali and reset the dirty tracking
- {
- CPersistedWorkUnit::commit();
- if (sessionCache->queryTraceLevel() >= 8)
- {
- StringBuffer s; toXML(p, s); DBGLOG("CCassandraWorkUnit::commit\n%s", s.str());
- }
- CIArrayOf<CassandraStatement> secondaryBatch; // search-table updates, executed asynchronously after the main batch
- CassandraBatch batch(CASS_BATCH_TYPE_UNLOGGED); // parent and child row updates
- Owned<CassandraBatch> deletesBatch; // created only if a whole child table needs rewriting
- const char *wuid = queryWuid();
- bool isGlobal = streq(wuid, GLOBAL_WORKUNIT);
- if (!isGlobal) // Global workunit only has child rows, no parent
- {
- if (prev) // Holds the values of the "basic" info at the last commit
- updateSecondaries(wuid, secondaryBatch);
- simpleXMLtoCassandra(sessionCache, batch, workunitsMappings, p); // This just does the parent row
- }
- if (allDirty && !isGlobal)
- {
- // MORE - this delete is technically correct, but if we assert that the only place that copyWorkUnit is used is to populate an
- // empty newly-created WU, it is unnecessary.
- // deleteChildren(wuid, deletesBatch);
- // MORE can use the table?
- childXMLtoCassandra(sessionCache, batch, wuGraphsMappings, p, "Graphs/Graph", 0);
- childXMLtoCassandra(sessionCache, batch, wuResultsMappings, p, "Results/Result", "0");
- childXMLtoCassandra(sessionCache, batch, wuVariablesMappings, p, "Variables/Variable", "-1"); // ResultSequenceStored
- childXMLtoCassandra(sessionCache, batch, wuTemporariesMappings, p, "Temporaries/Variable", "-3"); // ResultSequenceInternal // NOTE - lookups may also request ResultSequenceOnce
- childXMLtoCassandra(sessionCache, batch, wuExceptionsMappings, p, "Exceptions/Exception", 0);
- childXMLtoCassandra(sessionCache, batch, wuStatisticsMappings, p, "Statistics/Statistic", 0);
- childXMLtoCassandra(sessionCache, batch, wuFilesReadMappings, p, "FilesRead/File", 0);
- childXMLtoCassandra(sessionCache, batch, wuFilesWrittenMappings, p, "Files/File", 0);
- childXMLtoCassandra(sessionCache, batch, wuFieldUsageMappings, p, "usedsources/datasource", 0);
- IPTree *query = p->queryPropTree("Query");
- if (query)
- childXMLRowtoCassandra(sessionCache, batch, wuQueryMappings, wuid, *query, 0);
- }
- else
- {
- HashIterator iter(dirtyPaths); // each entry maps a dirty xpath to the child table it belongs in
- ForEach (iter)
- {
- const char *path = (const char *) iter.query().getKey();
- const CassandraXmlMapping *table = *dirtyPaths.mapToValue(&iter.query());
- if (sessionCache->queryTraceLevel()>2)
- DBGLOG("Updating dirty path %s", path);
- if (*path == '*') // a leading '*' means rewrite the whole table: delete all rows for the wuid, then re-add everything under the rest of the path
- {
- if (!deletesBatch)
- deletesBatch.setown(new CassandraBatch(CASS_BATCH_TYPE_UNLOGGED));
- sessionCache->deleteChildByWuid(table, wuid, *deletesBatch);
- childXMLtoCassandra(sessionCache, batch, table, p, path+1, 0);
- }
- else
- {
- IPTree *dirty = p->queryPropTree(path);
- if (dirty)
- childXMLRowtoCassandra(sessionCache, batch, table, wuid, *dirty, 0);
- else if (sessionCache->queryTraceLevel())
- {
- StringBuffer xml;
- toXML(p, xml);
- DBGLOG("Missing dirty element %s in %s", path, xml.str());
- }
- }
- }
- ForEachItemIn(d, dirtyResults) // results are written to the table selected by their sequence number
- {
- IWUResult &result = dirtyResults.item(d);
- switch (result.getResultSequence())
- {
- case ResultSequenceStored:
- childXMLRowtoCassandra(sessionCache, batch, wuVariablesMappings, wuid, *result.queryPTree(), "-1");
- break;
- case ResultSequenceInternal:
- case ResultSequenceOnce:
- childXMLRowtoCassandra(sessionCache, batch, wuTemporariesMappings, wuid, *result.queryPTree(), "-3");
- break;
- default:
- childXMLRowtoCassandra(sessionCache, batch, wuResultsMappings, wuid, *result.queryPTree(), "0");
- break;
- }
- }
- }
- if (sessionCache->queryTraceLevel() > 1)
- DBGLOG("Executing commit batches");
- if (deletesBatch) // the deletes are executed, and waited for, before the updates are sent
- {
- CassandraFuture futureBatch(cass_session_execute_batch(sessionCache->querySession(), *deletesBatch));
- futureBatch.wait("commit deletes");
- }
- CassandraFuture futureBatch(cass_session_execute_batch(sessionCache->querySession(), batch));
- futureBatch.wait("commit updates");
- executeAsync(secondaryBatch, "commit");
- if (stateChanged)
- {
- // Signal changes to state to anyone that might be watching via Dali
- VStringBuffer xpath("/WorkUnitStates/%s/State", wuid);
- Owned<IRemoteConnection> conn = querySDS().connect(xpath.str(), myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE, SDS_LOCK_TIMEOUT);
- conn->queryRoot()->setProp(NULL, p->queryProp("@state")); // NOTE(review): conn is used without a NULL check - confirm connect cannot return NULL here
- }
- if (actionChanged)
- {
- // Signal changes to action to anyone that might be watching via Dali
- VStringBuffer xpath("/WorkUnitStates/%s/Action", wuid);
- Owned<IRemoteConnection> conn = querySDS().connect(xpath.str(), myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE, SDS_LOCK_TIMEOUT);
- conn->queryRoot()->setProp(NULL, p->queryProp("Action"));
- }
- prev.clear(); // everything is now committed, so reset all the dirty tracking
- allDirty = false;
- stateChanged = false;
- actionChanged = false;
- dirtyPaths.kill();
- dirtyResults.kill();
- }
- virtual void import(IPropertyTree *wuTree, IPropertyTree *graphProgressTree) // Load the workunit from wuTree and write its child tables to Cassandra; if graphProgressTree is supplied, also import graph progress and graph/node states from it
- {
- CPersistedWorkUnit::loadPTree(LINK(wuTree));
- if (sessionCache->queryTraceLevel() >= 8)
- {
- StringBuffer s; toXML(wuTree, s); DBGLOG("CCassandraWorkUnit::import\n%s", s.str());
- }
- CIArrayOf<CassandraStatement> secondaryBatch;
- CassandraBatch batch(CASS_BATCH_TYPE_UNLOGGED);
- updateSecondaries(secondaryBatch); // NOTE(review): commit() calls updateSecondaries(wuid, secondaryBatch) - confirm this one-argument form is the intended overload
- // MORE can use the table?
- childXMLtoCassandra(sessionCache, batch, wuGraphsMappings, wuTree, "Graphs/Graph", 0);
- childXMLtoCassandra(sessionCache, batch, wuResultsMappings, wuTree, "Results/Result", "0");
- childXMLtoCassandra(sessionCache, batch, wuVariablesMappings, wuTree, "Variables/Variable", "-1"); // ResultSequenceStored
- childXMLtoCassandra(sessionCache, batch, wuTemporariesMappings, wuTree, "Temporaries/Variable", "-3"); // ResultSequenceInternal // NOTE - lookups may also request ResultSequenceOnce
- childXMLtoCassandra(sessionCache, batch, wuExceptionsMappings, wuTree, "Exceptions/Exception", 0);
- childXMLtoCassandra(sessionCache, batch, wuStatisticsMappings, wuTree, "Statistics/Statistic", 0);
- childXMLtoCassandra(sessionCache, batch, wuFilesReadMappings, wuTree, "FilesRead/File", 0);
- childXMLtoCassandra(sessionCache, batch, wuFilesWrittenMappings, wuTree, "Files/File", 0);
- childXMLtoCassandra(sessionCache, batch, wuFieldUsageMappings, wuTree, "usedsources/datasource", 0);
- IPTree *query = wuTree->queryPropTree("Query");
- if (query)
- childXMLRowtoCassandra(sessionCache, batch, wuQueryMappings, queryWuid(), *query, 0);
- if (sessionCache->queryTraceLevel() > 1)
- DBGLOG("Executing commit batches");
- CassandraFuture futureBatch(cass_session_execute_batch(sessionCache->querySession(), batch));
- futureBatch.wait("commit updates");
- executeAsync(secondaryBatch, "commit");
- if (!graphProgressTree) // nothing further to import
- return;
- if (sessionCache->queryTraceLevel() >= 8)
- {
- StringBuffer s; toXML(graphProgressTree, s); DBGLOG("CCassandraWorkUnit::import\n%s", s.str());
- }
- Owned<IPTreeIterator> graphs = graphProgressTree->getElements("*"); // each child of the progress tree is treated as one graph, named by its element name
- ForEach(*graphs)
- {
- IPTree &graph = graphs->query();
- const char *graphName = graph.queryName();
- Owned<IPTreeIterator> subs = graph.getElements("*");
- ForEach(*subs)
- {
- IPTree &sub = subs->query();
- const char *name=sub.queryName();
- if (name[0]=='s' && name[1]=='g') // elements named "sg<N>": the number after the prefix is the subgraph id
- {
- setGraphProgress(&graph, graphName, atoi(name+2), sub.queryProp("@creator"), false);
- }
- else if (streq(name, "node"))
- {
- unsigned subid = sub.getPropInt("@id");
- if (subid)
- {
- if (sub.hasChildren()) // Old format
- setGraphProgress(&sub, graphName, subid, sub.queryProp("@creator"), false);
- if (sub.hasProp("@_state"))
- setNodeState(graphName, subid, (WUGraphState) sub.getPropInt("@_state"));
- }
- }
- }
- if (graph.hasProp("@_state"))
- setGraphState(graphName, graph.getPropInt("@wfid"), (WUGraphState) graph.getPropInt("@_state"));
- }
- }
- // Fetch a single graph row by name straight from Cassandra and wrap it in a CLocalWUGraph.
- // Returns NULL when no row is found for qname.
- virtual IConstWUGraph *getGraph(const char *qname) const
- {
-     // Just because we read one graph, does not mean we are likely to read more. So don't cache this result.
-     // Also note that graphs are generally read-only
-     CassandraResult result(sessionCache->fetchDataForWuidAndKey(wuGraphsMappings, queryWuid(), qname));
-     const CassRow *row = cass_result_first_row(result);
-     if (!row)
-         return NULL;
-     Owned<IPTree> graph = createPTree("Graph");
-     CassandraIterator cols(cass_iterator_from_row(row));
-     // Mapping index starts at 2: we did not fetch wuid or partition
-     for (unsigned colidx = 2; cass_iterator_next(cols); colidx++)
-     {
-         assertex(wuGraphsMappings[colidx].columnName);
-         const CassValue *value = cass_iterator_get_column(cols);
-         if (!value || cass_value_is_null(value))
-             continue;
-         wuGraphsMappings[colidx].mapper.toXML(graph, wuGraphsMappings[colidx].xpath, value);
-     }
-     return new CLocalWUGraph(*this, graph.getClear());
- }
- // Count of this workunit's rows according to childCount() on the results mappings.
- virtual unsigned getResultCount() const
- {
-     const char *wuid = queryWuid();
-     return childCount(sessionCache, wuResultsMappings, wuid);
- }
- // Count of this workunit's rows according to childCount() on the graphs mappings.
- virtual unsigned getGraphCount() const
- {
-     const char *wuid = queryWuid();
-     return childCount(sessionCache, wuGraphsMappings, wuid);
- }
- // Count of this workunit's rows according to childCount() on the files-read mappings.
- virtual unsigned getSourceFileCount() const
- {
-     const char *wuid = queryWuid();
-     return childCount(sessionCache, wuFilesReadMappings, wuid);
- }
- // Count of this workunit's rows according to childCount() on the variables mappings.
- virtual unsigned getVariableCount() const
- {
-     const char *wuid = queryWuid();
-     return childCount(sessionCache, wuVariablesMappings, wuid);
- }
- // Set the submitting user, recording the previous @submitID for the secondary tables.
- // Does nothing when the value is unchanged.
- virtual void setUser(const char *user)
- {
-     if (!trackSecondaryChange(user, "@submitID"))
-         return;
-     CPersistedWorkUnit::setUser(user);
- }
- // Set the cluster name, recording the previous @clusterName for the secondary tables.
- // Does nothing when the value is unchanged.
- virtual void setClusterName(const char *cluster)
- {
-     if (!trackSecondaryChange(cluster, "@clusterName"))
-         return;
-     CPersistedWorkUnit::setClusterName(cluster);
- }
- // Set the job name, recording the previous @jobName for the secondary tables.
- // Does nothing when the value is unchanged.
- virtual void setJobName(const char *jobname)
- {
-     if (!trackSecondaryChange(jobname, "@jobName"))
-         return;
-     CPersistedWorkUnit::setJobName(jobname);
- }
- // Set the workunit state. When the state string differs from the current @state the
- // stateChanged flag is raised before delegating to the base class.
- virtual void setState(WUState state)
- {
-     const char *stateText = getWorkunitStateStr(state);
-     if (!trackSecondaryChange(stateText, "@state"))
-         return;
-     stateChanged = true;
-     CPersistedWorkUnit::setState(state);
- }
- // Set the workunit action; actionChanged is always raised, whether or not the value differs.
- virtual void setAction(WUAction action)
- {
-     this->actionChanged = true;
-     CPersistedWorkUnit::setAction(action);
- }
- // Set an application value, tracking the previous value stored at Application/<app>/<propname>.
- // Does nothing when the value is unchanged.
- virtual void setApplicationValue(const char *app, const char *propname, const char *value, bool overwrite)
- {
-     VStringBuffer appPath("Application/%s/%s", app, propname);
-     if (!trackSecondaryChange(value, appPath))
-         return;
-     CPersistedWorkUnit::setApplicationValue(app, propname, value, overwrite);
- }
- // Take the lock for this wuid, storing the connection in daliLock.
- virtual void _lockRemote()
- {
-     const char *wuid = queryWuid();
-     lockWuid(daliLock, wuid);
- }
- // Commit pending changes, then close and drop the lock connection if one is held.
- virtual void _unlockRemote()
- {
-     commit();
-     if (!daliLock)
-         return;
-     daliLock->close(true);
-     daliLock.clear();
- }
- // Create a graph via the base class and mark its Graphs/Graph[@name=...] path dirty.
- virtual void createGraph(const char * name, const char *label, WUGraphType type, IPropertyTree *xgmml, unsigned wfid)
- {
-     CPersistedWorkUnit::createGraph(name, label, type, xgmml, wfid);
-     VStringBuffer graphPath("Graphs/Graph[@name='%s']", name);
-     noteDirty(graphPath, wuGraphsMappings);
- }
- // Obtain an updatable result by name and record it as dirty.
- virtual IWUResult * updateResultByName(const char * name)
- {
-     IWUResult *updated = CPersistedWorkUnit::updateResultByName(name);
-     return noteDirty(updated);
- }
- // Obtain an updatable result by sequence number and record it as dirty.
- virtual IWUResult * updateResultBySequence(unsigned seq)
- {
-     IWUResult *updated = CPersistedWorkUnit::updateResultBySequence(seq);
-     return noteDirty(updated);
- }
- // Obtain an updatable temporary by name and record it as dirty.
- virtual IWUResult * updateTemporaryByName(const char * name)
- {
-     IWUResult *updated = CPersistedWorkUnit::updateTemporaryByName(name);
-     return noteDirty(updated);
- }
- // Obtain an updatable variable by name and record it as dirty.
- virtual IWUResult * updateVariableByName(const char * name)
- {
-     IWUResult *updated = CPersistedWorkUnit::updateVariableByName(name);
-     return noteDirty(updated);
- }
- // Mark the Query branch dirty, then hand back the base class's updatable query.
- virtual IWUQuery * updateQuery()
- {
-     noteDirty("Query", wuQueryMappings);
-     IWUQuery *query = CPersistedWorkUnit::updateQuery();
-     return query;
- }
- // Make sure the queries child table has been loaded before delegating to the base class.
- virtual IConstWUQuery *getQuery() const
- {
-     checkChildLoaded(wuQueriesTable);
-     IConstWUQuery *query = CPersistedWorkUnit::getQuery();
-     return query;
- }
- // Make sure the field-usage child table has been loaded before delegating to the base class.
- virtual IConstWUFileUsageIterator * getFieldUsage() const
- {
-     checkChildLoaded(wuFieldUsageTable);
-     IConstWUFileUsageIterator *usage = CPersistedWorkUnit::getFieldUsage();
-     return usage;
- }
- // Create an exception via the base class and mark its Exceptions/Exception[@sequence=...] path dirty.
- virtual IWUException *createException()
- {
-     IWUException *created = CPersistedWorkUnit::createException();
-     VStringBuffer exceptionPath("Exceptions/Exception[@sequence='%d']", created->getSequence());
-     noteDirty(exceptionPath, wuExceptionsMappings);
-     return created;
- }
- // Copy another workunit into this one. Secondary-field changes are tracked first, every
- // child table is loaded, and afterwards everything is flagged as loaded and dirty.
- virtual void copyWorkUnit(IConstWorkUnit *cached, bool copyStats, bool all)
- {
-     // Make sure that any required updates to the secondary files happen
-     IPropertyTree *source = queryExtendedWU(cached)->queryPTree();
-     const char * const *search = searchPaths;
-     while (*search)
-     {
-         trackSecondaryChange(source->queryProp(*search), *search);
-         search++;
-     }
-     const ChildTableInfo * const * table = childTables;
-     while (*table != NULL)
-     {
-         checkChildLoaded(**table);
-         table++;
-     }
-     CPersistedWorkUnit::copyWorkUnit(cached, copyStats, all);
-     memset(childLoaded, 1, sizeof(childLoaded));
-     allDirty = true;
-     actionChanged = true;
-     stateChanged = true;
- }
- // Record that a file was read and mark its FilesRead/File entry dirty.
- // A null file instead marks every existing FilesRead/File entry dirty (testing hook).
- virtual void noteFileRead(IDistributedFile *file)
- {
-     if (!file)
-     {
-         // A hack for testing!
-         Owned<IPropertyTreeIterator> existing = p->getElements("FilesRead/File");
-         ForEach(*existing)
-         {
-             VStringBuffer filePath("FilesRead/File[@name='%s']", existing->query().queryProp("@name"));
-             noteDirty(filePath, wuFilesReadMappings);
-         }
-         return;
-     }
-     childLoaded[WuFilesReadChild] = true; // Prevent duplicates if someone tries to read back files read (unlikely)
-     CPersistedWorkUnit::noteFileRead(file);
-     VStringBuffer filePath("FilesRead/File[@name='%s']", file->queryLogicalName());
-     noteDirty(filePath, wuFilesReadMappings);
- }
- // Record a written file and mark its Files/File entry dirty. A null fileName is ignored.
- virtual void addFile(const char *fileName, StringArray *clusters, unsigned usageCount, WUFileKind fileKind, const char *graphOwner)
- {
-     if (!fileName)
-         return;
-     childLoaded[WuFilesWrittenChild] = true; // Prevent duplicates if someone tries to read back files written from same object (unlikely)
-     CPersistedWorkUnit::addFile(fileName, clusters, usageCount, fileKind, graphOwner);
-     VStringBuffer filePath("Files/File[@name='%s']", fileName);
-     noteDirty(filePath, wuFilesWrittenMappings);
- }
- // Delete this workunit's rows from the graph progress, graph state and graph running
- // tables in a single unlogged batch.
- virtual void clearGraphProgress() const
- {
-     CassandraBatch batch(CASS_BATCH_TYPE_UNLOGGED);
-     const char *wuid = queryWuid();
-     sessionCache->deleteChildByWuid(wuGraphProgressMappings, wuid, batch);
-     sessionCache->deleteChildByWuid(wuGraphStateMappings, wuid, batch);
-     sessionCache->deleteChildByWuid(wuGraphRunningMappings, wuid, batch);
-     executeBatch(batch, "clearGraphProgress");
- }
- // Look up the wuGraphRunning row for this workunit.
- // On success fills graphName and subId from the first row and returns true;
- // returns false (outputs untouched) when there is no row.
- virtual bool getRunningGraph(IStringVal &graphName, WUGraphIDType &subId) const
- {
-     CassandraStatement statement(sessionCache->prepareStatement("SELECT graphID, subgraphID FROM wuGraphRunning where partition=? and wuid=?;"));
-     const char *wuid = queryWuid();
-     statement.bindInt32(0, rtlHash32VStr(wuid, 0) % sessionCache->queryPartitions());
-     statement.bindString(1, wuid);
-     CassandraFuture future(cass_session_execute(sessionCache->querySession(), statement));
-     future.wait("getRunningGraph");
-     CassandraResult result(cass_future_get_result(future));
-     if (!cass_result_row_count(result))
-         return false;
-     const CassRow *row = cass_result_first_row(result);
-     assertex(row);
-     StringBuffer name;
-     getCassString(name, cass_row_get_column(row, 0));
-     graphName.set(name);
-     subId = getUnsignedResult(NULL, cass_row_get_column(row, 1));
-     return true;
- }
- // Read every wuGraphProgress row for the named graph and assemble them into a progress
- // tree named after the graph. Returns NULL when the graph has no progress rows.
- virtual IConstWUGraphProgress *getGraphProgress(const char *graphName) const
- {
-     CassandraStatement statement(sessionCache->prepareStatement("SELECT subgraphID, creator, progress FROM wuGraphProgress where partition=? and wuid=? and graphID=?;"));
-     const char *wuid = queryWuid();
-     statement.bindInt32(0, rtlHash32VStr(wuid, 0) % sessionCache->queryPartitions());
-     statement.bindString(1, wuid);
-     statement.bindString(2, graphName);
-     CassandraFuture future(cass_session_execute(sessionCache->querySession(), statement));
-     future.wait("getGraphProgress");
-     CassandraResult result(cass_future_get_result(future));
-     CassandraIterator rows(cass_iterator_from_result(result));
-     if (cass_result_row_count(result) == 0)
-         return NULL;
-     Owned<IPropertyTree> progress = createPTree(graphName);
-     progress->setPropBool("@stats", true);
-     progress->setPropInt("@format", PROGRESS_FORMAT_V);
-     while (cass_iterator_next(rows))
-     {
-         const CassRow *row = cass_iterator_get_row(rows);
-         WUGraphIDType subId = getUnsignedResult(NULL, cass_row_get_column(row, 0));
-         StringBuffer creator;
-         StringBuffer statsXml;
-         getCassString(creator, cass_row_get_column(row, 1));
-         getCassString(statsXml, cass_row_get_column(row, 2));
-         // We could check that atoi(stats->queryName()+2)==subgraphID, and that stats->queryProp(@creator)==creator)....
-         IPTree *subgraphStats = createPTreeFromXMLString(statsXml);
-         progress->addPropTree(subgraphStats->queryName(), subgraphStats);
-     }
-     return createConstGraphProgress(queryWuid(), graphName, progress); // Links progress
- }
- // A graph's own state is stored as node id 0 of that graph.
- WUGraphState queryGraphState(const char *graphName) const
- {
-     const WUGraphIDType wholeGraph = 0;
-     return queryNodeState(graphName, wholeGraph);
- }
- // Read the stored state for one node of a graph from wuGraphState.
- // Returns WUGraphUnknown when no row exists.
- WUGraphState queryNodeState(const char *graphName, WUGraphIDType nodeId) const
- {
-     CassandraStatement statement(sessionCache->prepareStatement("SELECT state FROM wuGraphState where partition=? and wuid=? and graphID=? and subgraphID=?;"));
-     const char *wuid = queryWuid();
-     statement.bindInt32(0, rtlHash32VStr(wuid, 0) % sessionCache->queryPartitions());
-     statement.bindString(1, wuid);
-     statement.bindString(2, graphName);
-     statement.bindInt64(3, nodeId);
-     CassandraFuture future(cass_session_execute(sessionCache->querySession(), statement));
-     future.wait("queryNodeState");
-     CassandraResult result(cass_future_get_result(future));
-     if (!cass_result_row_count(result))
-         return WUGraphUnknown;
-     return (WUGraphState) getUnsignedResult(NULL, getSingleResult(result));
- }
- // Store a graph's own state as node id 0 of that graph. wfid is not used here.
- void setGraphState(const char *graphName, unsigned wfid, WUGraphState state) const
- {
-     const WUGraphIDType wholeGraph = 0;
-     setNodeState(graphName, wholeGraph, state);
- }
- // Write the state of a graph node to wuGraphState. For a non-zero nodeId the
- // wuGraphRunning table is also maintained: a row is inserted when the node starts
- // running, and this workunit's running rows are deleted when it completes.
- void setNodeState(const char *graphName, WUGraphIDType nodeId, WUGraphState state) const
- {
-     CassandraStatement stateInsert(sessionCache->prepareStatement("INSERT INTO wuGraphState (partition, wuid, graphID, subgraphID, state) values (?,?,?,?,?);"));
-     const char *wuid = queryWuid();
-     stateInsert.bindInt32(0, rtlHash32VStr(wuid, 0) % sessionCache->queryPartitions());
-     stateInsert.bindString(1, wuid);
-     stateInsert.bindString(2, graphName);
-     stateInsert.bindInt64(3, nodeId);
-     stateInsert.bindInt32(4, (int) state);
-     CassandraFuture stateFuture(cass_session_execute(sessionCache->querySession(), stateInsert));
-     stateFuture.wait("setNodeState update state");
-     if (!nodeId)
-         return;
-     if (state == WUGraphRunning)
-     {
-         CassandraStatement runningInsert(sessionCache->prepareStatement("INSERT INTO wuGraphRunning (partition, wuid, graphID, subgraphID) values (?,?,?,?);"));
-         runningInsert.bindInt32(0, rtlHash32VStr(wuid, 0) % sessionCache->queryPartitions());
-         runningInsert.bindString(1, wuid);
-         runningInsert.bindString(2, graphName);
-         runningInsert.bindInt64(3, nodeId);
-         CassandraFuture runningFuture(cass_session_execute(sessionCache->querySession(), runningInsert));
-         runningFuture.wait("setNodeState update running");
-     }
-     else if (state == WUGraphComplete)
-     {
-         CassandraStatement runningDelete(sessionCache->prepareStatement("DELETE FROM wuGraphRunning where partition=? and wuid=?;"));
-         runningDelete.bindInt32(0, rtlHash32VStr(wuid, 0) % sessionCache->queryPartitions());
-         runningDelete.bindString(1, wuid);
-         CassandraFuture deleteFuture(cass_session_execute(sessionCache->querySession(), runningDelete));
-         deleteFuture.wait("setNodeState remove running");
-     }
- }
- // Graph statistics gatherer that keeps its own progress tree (named after the root scope)
- // and pushes it to the owning Cassandra workunit from beforeDispose().
- class CCassandraWuGraphStats : public CWuGraphStats
- {
- public:
-     CCassandraWuGraphStats(const CCassandraWorkUnit *_parent, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge)
-         : CWuGraphStats(_creatorType, _creator, _wfid, _rootScope, _id, _merge),
-           progress(createPTree(_rootScope)),
-           parent(_parent)
-     {
-     }
-     // Tree that the base class fills in with the gathered statistics
-     virtual IPropertyTree &queryProgressTree() override
-     {
-         IPropertyTree *tree = progress.get();
-         return *tree;
-     }
-     virtual void beforeDispose() override
-     {
-         // The base class call sets up progress - it should then hold a single child tree for subgraph id
-         CWuGraphStats::beforeDispose();
-         IPropertyTree *tree = progress.get();
-         parent->setGraphProgress(tree, tree->queryName(), id, creator, merge);
-     }
- protected:
-     Owned<IPropertyTree> progress;
-     Linked<const CCassandraWorkUnit> parent;
-     StringAttr wuid;
- };
- // Create a statistics gatherer for one subgraph; graphName becomes its root scope.
- IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned wfid, unsigned subgraph, bool merge) const override
- {
-     IWUGraphStats *gatherer = new CCassandraWuGraphStats(this, creatorType, creator, wfid, graphName, subgraph, merge);
-     return gatherer;
- }
- virtual void _loadFilesRead() const
- {
-     // Lazy populate the FilesRead branch of p from Cassandra, then let the base class load it
-     checkChildLoaded(wuFilesReadTable);
-     this->CPersistedWorkUnit::_loadFilesRead();
- }
- virtual void _loadFilesWritten() const
- {
-     // Lazy populate the Files branch of p from Cassandra, then let the base class load it
-     checkChildLoaded(wuFilesWrittenTable);
-     this->CPersistedWorkUnit::_loadFilesWritten();
- }
- virtual void _loadResults() const
- {
-     // Lazy populate the Results branch of p from Cassandra, then let the base class load it
-     checkChildLoaded(wuResultsTable);
-     this->CPersistedWorkUnit::_loadResults();
- }
- // Lazy populate the Graphs branch of p from Cassandra.
- // A light load uses the graph-metas table; a heavy load uses the full graphs table.
- virtual void _loadGraphs(bool heavy) const
- {
-     if (!heavy)
-     {
-         checkChildLoaded(wuGraphMetasTable);
-     }
-     else
-     {
-         // If we loaded light before, and are now loading heavy, we need to force the reload. Unlikely to happen in practice.
-         const bool loadedLight = (graphsCached==1);
-         if (loadedLight)
-         {
-             p->removeProp("Graphs");
-             childLoaded[WuGraphsChild] = false;
-         }
-         checkChildLoaded(wuGraphsTable);
-     }
-     CPersistedWorkUnit::_loadGraphs(heavy);
- }
- virtual void _loadVariables() const
- {
-     // Lazy populate the Variables branch of p from Cassandra, then let the base class load it
-     checkChildLoaded(wuVariablesTable);
-     this->CPersistedWorkUnit::_loadVariables();
- }
- virtual void _loadTemporaries() const
- {
-     // Lazy populate the Temporaries branch of p from Cassandra, then let the base class load it
-     checkChildLoaded(wuTemporariesTable);
-     this->CPersistedWorkUnit::_loadTemporaries();
- }
- virtual void _loadStatistics() const
- {
-     // Lazy populate the Statistics branch of p from Cassandra, then let the base class load it
-     checkChildLoaded(wuStatisticsTable);
-     this->CPersistedWorkUnit::_loadStatistics();
- }
- virtual void _loadExceptions() const
- {
-     // Lazy populate the Exceptions branch of p from Cassandra, then let the base class load it
-     checkChildLoaded(wuExceptionsTable);
-     this->CPersistedWorkUnit::_loadExceptions();
- }
- // Clear exceptions (optionally limited to one source) under the workunit's critical
- // section, first noting the "*Exceptions/Exception" path as dirty.
- virtual void clearExceptions(const char * source=nullptr)
- {
-     CriticalBlock block(crit);
-     noteDirty("*Exceptions/Exception", wuExceptionsMappings);
-     CPersistedWorkUnit::clearExceptions(source);
- }
- // Return the base class's unpacked tree after loading every child table.
- virtual IPropertyTree *getUnpackedTree(bool includeProgress) const
- {
-     // If anyone wants the whole ptree, we'd better make sure we have fully loaded it...
-     CriticalBlock block(crit);
-     const ChildTableInfo * const * table = childTables;
-     while (*table != NULL)
-     {
-         checkChildLoaded(**table);
-         table++;
-     }
-     return CPersistedWorkUnit::getUnpackedTree(includeProgress);
- }
- // Return the underlying tree p after loading every child table and mirroring
- // @state into the State element.
- virtual IPropertyTree *queryPTree() const
- {
-     // If anyone wants the whole ptree, we'd better make sure we have fully loaded it...
-     CriticalBlock block(crit);
-     const ChildTableInfo * const * table = childTables;
-     while (*table != NULL)
-     {
-         checkChildLoaded(**table);
-         table++;
-     }
-     // And a hack for the fact that Dali stores state in both @state and <state>
-     if (const char *stateStr = p->queryProp("@state"))
-         p->setProp("State", stateStr);
-     return p;
- }
- // Store the progress XML of one subgraph in wuGraphProgress.
- //   progress - tree expected to contain a child element named sg<subid> (asserted)
- //   gid      - graph id the row is stored under
- //   merge    - merging is not implemented; true hits UNIMPLEMENTED
- void setGraphProgress(IPropertyTree *progress, const char *gid, unsigned subid, const char *creator, bool merge) const
- {
-     if (merge)
-         UNIMPLEMENTED;
-     const char *wuid = queryWuid();
-     CassandraStatement insert(sessionCache->prepareStatement("INSERT INTO wuGraphProgress (partition, wuid, graphID, subgraphID, creator, progress) values (?,?,?,?,?,?);"));
-     insert.bindInt32(0, rtlHash32VStr(wuid, 0) % sessionCache->queryPartitions());
-     insert.bindString(1, wuid);
-     insert.bindString(2, gid);
-     insert.bindInt64(3, subid);
-     insert.bindString(4, creator);
-     StringBuffer subgraphTag("sg");
-     subgraphTag.append(subid);
-     IPTree *subgraphTree = progress->queryPropTree(subgraphTag);
-     assertex(subgraphTree);
-     StringBuffer subgraphXml;
-     toXML(subgraphTree, subgraphXml);
-     insert.bindString(5, subgraphXml);
-     CassandraFuture future(cass_session_execute(sessionCache->querySession(), insert));
-     future.wait("update stats");
- }
- // Build a "GraphProgress" tree for this workunit.
- // First every wuGraphProgress row is added as a subgraph element beneath an element named
- // after its graph; then wuGraphState rows are applied: node id 0 sets @_state on the graph
- // element, any other id adds a <node id=... _state=...> child to it.
- // Returns NULL when the workunit has no progress rows; otherwise the caller owns the tree.
- virtual IPropertyTree *getGraphProgressTree() const
- {
-     CassandraStatement graphQuery(sessionCache->prepareStatement("SELECT graphId, subgraphID, creator, progress FROM wuGraphProgress where partition=? and wuid=?;"));
-     const char *wuid = queryWuid();
-     graphQuery.bindInt32(0, rtlHash32VStr(wuid, 0) % sessionCache->queryPartitions());
-     graphQuery.bindString(1, wuid);
-     CassandraFuture future(cass_session_execute(sessionCache->querySession(), graphQuery));
-     future.wait("getGraphProgress");
-     CassandraResult result(cass_future_get_result(future));
-     if (!cass_result_row_count(result))
-         return NULL;
-     Owned<IPTree> progress = createPTree("GraphProgress");
-     CassandraIterator rows(cass_iterator_from_result(result));
-     while (cass_iterator_next(rows))
-     {
-         const CassRow *row = cass_iterator_get_row(rows);
-         StringBuffer graphName, creator, xml;
-         getCassString(graphName, cass_row_get_column(row, 0));
-         WUGraphIDType subId = getUnsignedResult(NULL, cass_row_get_column(row, 1));
-         getCassString(creator, cass_row_get_column(row, 2));
-         getCassString(xml, cass_row_get_column(row, 3));
-         if (!progress->hasProp(graphName))
-             progress->setPropTree(graphName, createPTree(graphName));
-         IPTree *graph = progress->queryPropTree(graphName);
-         graph->setPropBool("@stats", true);
-         graph->setPropInt("@format", PROGRESS_FORMAT_V);
-         IPTree *stats = createPTreeFromXMLString(xml);
-         // We could check that atoi(stats->queryName()+2)==subgraphID, and that stats->queryProp(@creator)==creator)....
-         graph->addPropTree(stats->queryName(), stats);
-     }
-     // Now fill in the graph/node states
-     CassandraStatement stateQuery(sessionCache->prepareStatement("SELECT graphId, subgraphId, state FROM wuGraphState where partition=? and wuid=?;"));
-     stateQuery.bindInt32(0, rtlHash32VStr(wuid, 0) % sessionCache->queryPartitions());
-     stateQuery.bindString(1, wuid);
-     CassandraFuture stateFuture(cass_session_execute(sessionCache->querySession(), stateQuery));
-     stateFuture.wait("getGraphStateProgress");
-     CassandraResult stateResult(cass_future_get_result(stateFuture));
-     if (cass_result_row_count(stateResult))
-     {
-         // Single iterator over the state rows (a second, shadowed iterator used to be created here)
-         CassandraIterator stateRows(cass_iterator_from_result(stateResult));
-         while (cass_iterator_next(stateRows))
-         {
-             const CassRow *row = cass_iterator_get_row(stateRows);
-             StringBuffer graphName;
-             getCassString(graphName, cass_row_get_column(row, 0));
-             WUGraphIDType subId = getUnsignedResult(NULL, cass_row_get_column(row, 1));
-             unsigned state = getUnsignedResult(NULL, cass_row_get_column(row, 2));
-             IPTree *node = progress->queryPropTree(graphName);
-             if (!node)
-                 continue; // State recorded for a graph that has no progress rows - nothing to attach it to
-             if (subId)
-             {
-                 // This is what you might expect it to say...
-                 //StringBuffer sg("sg");
-                 //sg.append(subId);
-                 //node = node->queryPropTree(sg);
-                 // but in fact the node states are stored in separate elements. I need to see if that is something I broke.
-                 StringBuffer xpath("node[@id='");
-                 xpath.append(subId).append("']"); // Well-formed qualifier: node[@id='N']
-                 node->removeProp(xpath); // Shouldn't be one, just playing safe
-                 node = node->addPropTree("node", createPTree("node"));
-                 node->setPropInt("@id", subId);
-                 node->setPropInt("@_state", state);
-             }
-             else
-                 node->setPropInt("@_state", state);
-         }
-     }
-     return progress.getClear();
- }
- protected:
- // Queue deletion of this wuid's rows from every child table onto useBatch
- void deleteChildren(const char *wuid, CassBatch *useBatch)
- {
-     const ChildTableInfo * const * table = childTables;
-     while (*table != NULL)
-     {
-         sessionCache->deleteChildByWuid((*table)->mappings, wuid, useBatch);
-         table++;
-     }
- }
- // Lazy-populate a portion of WU xml from a child table.
- // If the table has not been loaded yet, fetch this wuid's rows and merge them into p:
- // each row becomes a childElement beneath parentElement, or, when the table has no
- // childElement, the columns are written directly onto the parent element.
- // A failed fetch is re-raised through rtlFail with the table name added to the message.
- void checkChildLoaded(const ChildTableInfo &childTable) const
- {
-     // NOTE - should be called inside critsec
-     if (childLoaded[childTable.index])
-         return;
-     // Initialised so the pointer is never indeterminate on the catch path
-     const CassResult* cassResult = nullptr;
-     try
-     {
-         cassResult = sessionCache->fetchDataForWuid(childTable.mappings, queryWuid(), false);
-     }
-     catch (IException* e)
-     {
-         // Capture what we need before releasing the exception
-         int errorCode = e->errorCode();
-         StringBuffer origErrorMsg;
-         e->errorMessage(origErrorMsg);
-         e->Release();
-         const char* tableName = queryTableName(childTable.mappings);
-         VStringBuffer newErrorMsg("Failed to read from cassandra table '%s' (Have you run wutool to initialize cassandra repository?), [%s]", tableName, origErrorMsg.str());
-         rtlFail(errorCode, newErrorMsg);
-     }
-     CassandraResult result(cassResult);
-     IPTree *results = p->queryPropTree(childTable.parentElement);
-     CassandraIterator rows(cass_iterator_from_result(result));
-     while (cass_iterator_next(rows))
-     {
-         CassandraIterator cols(cass_iterator_from_row(cass_iterator_get_row(rows)));
-         Owned<IPTree> child;
-         // The parent element is only created once there is at least one row to put in it
-         if (!results)
-             results = ensurePTree(p, childTable.parentElement);
-         if (childTable.childElement)
-             child.setown(createPTree(childTable.childElement));
-         else
-             child.set(results);
-         unsigned colidx = 2; // We did not fetch wuid or partition
-         while (cass_iterator_next(cols))
-         {
-             assertex(childTable.mappings[colidx].columnName);
-             const CassValue *value = cass_iterator_get_column(cols);
-             if (value && !cass_value_is_null(value))
-                 childTable.mappings[colidx].mapper.toXML(child, childTable.mappings[colidx].xpath, value);
-             colidx++;
-         }
-         if (childTable.childElement)
-         {
-             const char *childName = child->queryName();
-             results->addPropTree(childName, child.getClear());
-         }
-     }
-     childLoaded[childTable.index] = true;
- }
- // Update secondary tables (used to search wuids by owner, state, jobname etc).
- // Queues deletion of the row keyed by prevKey (if non-empty), then queues the row for
- // the current value at xpath (if non-empty).
- void updateSecondaryTable(const char *xpath, const char *prevKey, const char *wuid, CIArrayOf<CassandraStatement> &batch)
- {
-     const bool hadPrevious = prevKey && *prevKey;
-     if (hadPrevious)
-         deleteSecondaryByKey(xpath, prevKey, wuid, sessionCache, batch);
-     const char *current = p->queryProp(xpath);
-     if (!current || !*current)
-         return;
-     simpleXMLtoCassandra(sessionCache, batch, searchMappings, p, xpath);
- }
- // Queue the secondary-table row for the current value at xpath, if it is non-empty.
- // No previous key is deleted by this overload; wuid is not used here.
- void updateSecondaryTable(const char *xpath, const char *wuid, CIArrayOf<CassandraStatement> &batch)
- {
-     const char *current = p->queryProp(xpath);
-     if (!current || !*current)
-         return;
-     simpleXMLtoCassandra(sessionCache, batch, searchMappings, p, xpath);
- }
- // Queue deletion of the secondary rows for every non-empty application value found in
- // pt, walking Application/<name>/<value> three levels deep.
- void deleteAppSecondaries(IPTree &pt, const char *wuid, CIArrayOf<CassandraStatement> &batch)
- {
-     Owned<IPTreeIterator> appIter = pt.getElements("Application");
-     ForEach(*appIter)
-     {
-         IPTree &app = appIter->query();
-         Owned<IPTreeIterator> nameIter = app.getElements("*");
-         ForEach(*nameIter)
-         {
-             IPTree &name = nameIter->query();
-             Owned<IPTreeIterator> valueIter = name.getElements("*");
-             ForEach(*valueIter)
-             {
-                 IPTree &value = valueIter->query();
-                 const char *appValue = value.queryProp(".");
-                 if (!appValue || !*appValue)
-                     continue;
-                 VStringBuffer keyPath("%s/%s/%s", app.queryName(), name.queryName(), value.queryName());
-                 deleteSecondaryByKey(keyPath, appValue, wuid, sessionCache, batch);
-             }
-         }
-     }
- }
- // Queue deletion of every secondary row for this workunit: one per search path,
- // the application values, and the file-search entries for files read and written.
- void deleteSecondaries(const char *wuid, CIArrayOf<CassandraStatement> &batch)
- {
-     const char * const *search = searchPaths;
-     while (*search)
-     {
-         deleteSecondaryByKey(*search, p->queryProp(*search), wuid, sessionCache, batch);
-         search++;
-     }
-     deleteAppSecondaries(*p, wuid, batch);
-     Owned<IPropertyTreeIterator> readIter = &getFilesReadIterator();
-     ForEach(*readIter)
-     {
-         const char *fileName = readIter->query().queryProp("@name");
-         deleteFileSearch(sessionCache, batch, fileName, true, wuid);
-     }
-     Owned<IPropertyTreeIterator> writtenIter = &getFileIterator();
-     ForEach(*writtenIter)
-     {
-         const char *fileName = writtenIter->query().queryProp("@name");
-         deleteFileSearch(sessionCache, batch, fileName, false, wuid);
-     }
- }
- // Queue all secondary-table updates for an existing workunit, using prev to find the
- // previous keys that need deleting.
- // NOTE(review): prev is dereferenced unconditionally here - presumably callers only use
- // this overload once prev has been created; confirm against the caller.
- void updateSecondaries(const char *wuid, CIArrayOf<CassandraStatement> &batch)
- {
-     const char * const *search = searchPaths;
-     while (*search)
-     {
-         updateSecondaryTable(*search, prev->queryProp(*search), wuid, batch);
-         search++;
-     }
-     search = wildSearchPaths;
-     while (*search)
-     {
-         const char *value = p->queryProp(*search);
-         if (value && *value)
-             addUniqueValue(sessionCache, batch, *search, value);
-         search++;
-     }
-     deleteAppSecondaries(*prev, wuid, batch);
-     Owned<IConstWUAppValueIterator> appValues = &getApplicationValues();
-     ForEach(*appValues)
-     {
-         IConstWUAppValue &appValue = appValues->query();
-         addUniqueValue(sessionCache, batch, "Application", appValue.queryApplication()); // Used to populate droplists of applications
-         VStringBuffer key("@@%s", appValue.queryApplication());
-         addUniqueValue(sessionCache, batch, key, appValue.queryName()); // Used to populate droplists of value names for a given application
-         VStringBuffer xpath("Application/%s/%s", appValue.queryApplication(), appValue.queryName());
-         addUniqueValue(sessionCache, batch, xpath, appValue.queryValue()); // Used to get lists of values for a given app and name, and for filtering
-         simpleXMLtoCassandra(sessionCache, batch, searchMappings, p, xpath);
-     }
-     Owned<IPropertyTreeIterator> readIter = &getFilesReadIterator();
-     ForEach(*readIter)
-     {
-         const char *fileName = readIter->query().queryProp("@name");
-         addFileSearch(sessionCache, batch, fileName, true, wuid);
-     }
-     Owned<IPropertyTreeIterator> writtenIter = &getFileIterator();
-     ForEach(*writtenIter)
-     {
-         const char *fileName = writtenIter->query().queryProp("@name");
-         addFileSearch(sessionCache, batch, fileName, false, wuid);
-     }
- }
- // Queue all secondary-table rows for this workunit without consulting prev: nothing is
- // deleted, rows are only added for the current values.
- void updateSecondaries(CIArrayOf<CassandraStatement> &batch)
- {
-     const char *wuid = queryWuid();
-     const char * const *search = searchPaths;
-     while (*search)
-     {
-         updateSecondaryTable(*search, wuid, batch);
-         search++;
-     }
-     search = wildSearchPaths;
-     while (*search)
-     {
-         const char *value = p->queryProp(*search);
-         if (value && *value)
-             addUniqueValue(sessionCache, batch, *search, value);
-         search++;
-     }
-     Owned<IConstWUAppValueIterator> appValues = &getApplicationValues();
-     ForEach(*appValues)
-     {
-         IConstWUAppValue &appValue = appValues->query();
-         addUniqueValue(sessionCache, batch, "Application", appValue.queryApplication()); // Used to populate droplists of applications
-         VStringBuffer key("@@%s", appValue.queryApplication());
-         addUniqueValue(sessionCache, batch, key, appValue.queryName()); // Used to populate droplists of value names for a given application
-         VStringBuffer xpath("Application/%s/%s", appValue.queryApplication(), appValue.queryName());
-         addUniqueValue(sessionCache, batch, xpath, appValue.queryValue()); // Used to get lists of values for a given app and name, and for filtering
-         simpleXMLtoCassandra(sessionCache, batch, searchMappings, p, xpath);
-     }
-     Owned<IPropertyTreeIterator> readIter = &getFilesReadIterator();
-     ForEach(*readIter)
-     {
-         const char *fileName = readIter->query().queryProp("@name");
-         addFileSearch(sessionCache, batch, fileName, true, wuid);
-     }
-     Owned<IPropertyTreeIterator> writtenIter = &getFileIterator();
-     ForEach(*writtenIter)
-     {
-         const char *fileName = writtenIter->query().queryProp("@name");
-         addFileSearch(sessionCache, batch, fileName, false, wuid);
-     }
- }
- // Keep track of previously committed values for fields that we have a secondary table for, so that we can update them appropriately when we commit.
- // Compares newval with the current value at xpath in p (null is treated as "").
- // Returns false when they match. Otherwise returns true, and - the first time a given
- // xpath changes - stores the old value at the same xpath in prev.
- bool trackSecondaryChange(const char *newval, const char *xpath)
- {
-     const char *incoming = newval ? newval : "";
-     const char *existing = p->queryProp(xpath);
-     if (!existing)
-         existing = "";
-     if (streq(incoming, existing))
-         return false; // No change
-     bool firstChange = true;
-     if (prev)
-         firstChange = !prev->hasProp(xpath);
-     else
-         prev.setown(createPTree());
-     if (!firstChange)
-         return true;
-     const char *tailptr = strrchr(xpath, '/');
-     if (!tailptr)
-     {
-         prev->setProp(xpath, existing);
-         return true;
-     }
-     // Split at the last '/': make sure the leading path exists, then set the final component
-     StringBuffer head(tailptr-xpath, xpath);
-     ensurePTree(prev, head)->setProp(tailptr+1, existing);
-     return true;
- }
- // Remember a result handed out for update (linked into dirtyResults); null is passed through.
- IWUResult *noteDirty(IWUResult *result)
- {
-     if (!result)
-         return result;
-     dirtyResults.append(*LINK(result));
-     return result;
- }
- // Record that the element(s) at xpath have changed, together with the mapping table for them.
- void noteDirty(const char *xpath, const CassandraXmlMapping *table)
- {
-     const CassandraXmlMapping *mappings = table;
-     dirtyPaths.setValue(xpath, mappings);
- }
- // Session used for every Cassandra statement this workunit prepares or executes
- Linked<const ICassandraSession> sessionCache;
- // One flag per child table, set once that table's rows have been merged into p (see checkChildLoaded)
- mutable bool childLoaded[ChildTablesSize];
- // Set by copyWorkUnit
- bool allDirty;
- // Set by setState (when the state string changes) and by copyWorkUnit
- bool stateChanged;
- // Set by setAction and by copyWorkUnit
- bool actionChanged;
- // Old values of secondary-table fields, recorded by trackSecondaryChange; created on first change
- Owned<IPTree> prev;
- // xpath -> mapping table for elements noted as changed via noteDirty(xpath, table)
- MapStringTo<const CassandraXmlMapping *> dirtyPaths;
- // Results handed out for update via noteDirty(IWUResult *)
- IArrayOf<IWUResult> dirtyResults;
- Owned<IRemoteConnection> daliLock; // We still use dali for locking
- };
- // Workunit watcher for the Cassandra store. Only the abort option is forwarded to the base
- // class; state and action notifications are subscribed to here, under /WorkUnitStates/<wuid>.
- class CCassandraWorkUnitWatcher : public CWorkUnitWatcher
- {
- public:
-     CCassandraWorkUnitWatcher(IWorkUnitSubscriber *_subscriber, WUSubscribeOptions flags, const char *wuid)
-         : CWorkUnitWatcher(_subscriber, (WUSubscribeOptions) (flags & SubscribeOptionAbort), wuid)
-     {
-         if (flags & SubscribeOptionState)
-         {
-             VStringBuffer statePath("/WorkUnitStates/%s/State", wuid);
-             stateId = querySDS().subscribe(statePath.str(), *this);
-         }
-         if (flags & SubscribeOptionAction)
-         {
-             VStringBuffer actionPath("/WorkUnitStates/%s/Action", wuid);
-             actionId = querySDS().subscribe(actionPath.str(), *this);
-         }
-     }
- };
- class CCasssandraWorkUnitFactory : public CWorkUnitFactory, implements ICassandraSession
- {
- IMPLEMENT_IINTERFACE;
- public:
- // Construct the factory from its configuration tree.
- // Each <Option name=... value=...> child of props is either consumed here (randomWuidSuffix,
- // traceLevel, partitions, prefixSize) or forwarded to the cluster as "name=value".
- // The constructor then connects, reads the repository version info, adds columns for an
- // older minor version, and finally starts the cache retirer.
- // NOTE(review): the version-mismatch exception below is thrown inside the try block, so it
- // appears to be caught, logged and released by the handler at the end rather than
- // propagating to the caller - confirm that is intended.
- CCasssandraWorkUnitFactory(const SharedObject *_dll, const IPropertyTree *props) : cluster(cass_cluster_new()), randomizeSuffix(0), randState((unsigned) get_cycles_now()), cacheRetirer(*this)
- {
- StringArray options;
- options.append("write_bytes_high_water_mark=1000000"); // Set the default HWM - workunits get big. This can be overridden by supplied options
- Owned<IPTreeIterator> it = props->getElements("Option");
- ForEach(*it)
- {
- IPTree &item = it->query();
- const char *opt = item.queryProp("@name");
- const char *val = item.queryProp("@value");
- // Options missing either a name or a value are silently skipped
- if (opt && val)
- {
- if (strieq(opt, "randomWuidSuffix"))
- randomizeSuffix = atoi(val);
- else if (strieq(opt, "traceLevel"))
- traceLevel = atoi(val);
- else if (strieq(opt, "partitions"))
- {
- partitions = atoi(val); // Note this value is only used when creating a new repo
- // Clamp to [MIN_PARTITIONS, MAX_PARTITIONS]
- if (partitions < MIN_PARTITIONS)
- partitions = MIN_PARTITIONS;
- else if (partitions > MAX_PARTITIONS)
- partitions = MAX_PARTITIONS;
- }
- else if (strieq(opt, "prefixSize"))
- {
- prefixSize = atoi(val); // Note this value is only used when creating a new repo
- // Clamp to [MIN_PREFIX_SIZE, MAX_PREFIX_SIZE]
- if (prefixSize < MIN_PREFIX_SIZE)
- prefixSize = MIN_PREFIX_SIZE;
- else if (prefixSize > MAX_PREFIX_SIZE)
- prefixSize = MAX_PREFIX_SIZE;
- }
- else
- {
- // Anything not recognised above is passed through to the cluster options
- VStringBuffer optstr("%s=%s", opt, val);
- options.append(optstr);
- }
- }
- }
- cluster.setOptions(options);
- // Default keyspace when the options did not set one
- if (!cluster.queryKeySpace())
- cluster.setKeySpace("hpcc");
- try
- {
- cluster.connect();
- Owned<IPTree> versionInfo = getVersionInfo();
- if (versionInfo)
- {
- int major = versionInfo->getPropInt("@major", 0);
- int minor = versionInfo->getPropInt("@minor", 0);
- // Values stored in the repository replace any partitions/prefixSize options read above
- partitions = versionInfo->getPropInt("@numPartitions", DEFAULT_PARTITIONS);
- prefixSize = versionInfo->getPropInt("@searchPrefixSize", DEFAULT_PREFIX_SIZE);
- // The version checks only run when both major and minor are non-zero
- if (major && minor)
- {
- // Note that if there is no version info at all, we have to assume that the repository is not yet created. We don't fail, otherwise no-one can call createRepository the first time...
- if (major != majorVersion)
- throw makeStringExceptionV(WUERR_WorkunitVersionMismatch, "Incompatible workunit repository version (wanted %d.%d, found %d.%d)", majorVersion, minorVersion, major, minor);
- if (minor != minorVersion)
- {
- if (minor < minorVersion)
- {
- DBGLOG("WARNING: repository version %d.%d is older than current version %d.%d - adding required columns", major, minor, majorVersion, minorVersion);
- // Add the columns introduced after the stored minor version
- switch (minor)
- {
- case 1:
- executeSimpleCommand(querySession(), "ALTER TABLE wuresults ADD graph text;");
- executeSimpleCommand(querySession(), "ALTER TABLE wuresults ADD activity int;");
- executeSimpleCommand(querySession(), "ALTER TABLE wuvariables ADD graph text;");
- executeSimpleCommand(querySession(), "ALTER TABLE wuvariables ADD activity int;");
- executeSimpleCommand(querySession(), "ALTER TABLE wutemporaries ADD graph text;");
- executeSimpleCommand(querySession(), "ALTER TABLE wutemporaries ADD activity int;");
- break;
- }
- createVersionTable(true);
- }
- else
- DBGLOG("WARNING: repository version %d.%d is newer than current version %d.%d - some columns will not be updated", major, minor, majorVersion, minorVersion);
- }
- }
- }
- else
- {
- DBGLOG("WARNING: repository version could not be retrieved (repository not yet created?)");
- cluster.disconnect();
- }
- }
- catch (IException *E)
- {
- // Any failure above is logged and swallowed so that the factory can still be constructed
- EXCLOG(E);
- E->Release();
- DBGLOG("WARNING: repository version could not be retrieved (repository not yet created?)");
- }
- cacheRetirer.start();
- LINK(_dll); // Yes, this leaks. Not really sure how to avoid that.
- }
- ~CCasssandraWorkUnitFactory()
- {
- cacheRetirer.stop();
- cacheRetirer.join();
- if (traceLevel)
- DBGLOG("CCasssandraWorkUnitFactory destroyed");
- }
- virtual bool initializeStore()
- {
- createRepository();
- return true;
- }
- virtual IWorkUnitWatcher *getWatcher(IWorkUnitSubscriber *subscriber, WUSubscribeOptions options, const char *wuid) const
- {
- return new CCassandraWorkUnitWatcher(subscriber, options, wuid);
- }
- virtual CLocalWorkUnit* _createWorkUnit(const char *wuid, ISecManager *secmgr, ISecUser *secuser)
- {
- unsigned suffix;
- unsigned suffixLength;
- if (randomizeSuffix) // May need to enable this option if you are expecting to create hundreds of workunits / second
- {
- suffix = rand_r(&randState);
- suffixLength = randomizeSuffix;
- }
- else
- {
- suffix = 0;
- suffixLength = 0;
- }
- Owned<CassandraPrepared> prepared = prepareStatement("INSERT INTO workunits (partition, wuid) VALUES (?,?) IF NOT EXISTS;");
- for (;;)
- {
- // Create a unique WUID by adding suffixes until we managed to add a new value
- StringBuffer useWuid(wuid);
- if (suffix)
- {
- useWuid.append("-");
- for (unsigned i = 0; i < suffixLength; i++)
- {
- useWuid.appendf("%c", '0'+suffix%10);
- suffix /= 10;
- }
- }
- CassandraStatement statement(prepared.getLink());
- statement.bindInt32(0, rtlHash32VStr(useWuid.str(), 0) % partitions);
- statement.bindString(1, useWuid.str());
- if (traceLevel >= 2)
- DBGLOG("Try creating %s", useWuid.str());
- CassandraFuture future(cass_session_execute(querySession(), statement));
- future.wait("execute");
- CassandraResult result(cass_future_get_result(future));
- if (cass_result_column_count(result)==1)
- {
- // A single column result indicates success, - the single column should be called '[applied]' and have the value 'true'
- // If there are multiple columns it will be '[applied]' (value false) and the fields of the existing row
- Owned<IPTree> wuXML = createPTree(useWuid);
- wuXML->setProp("@xmlns:xsi", "http://www.w3.org/1999/XMLSchema-instance");
- wuXML->setPropInt("@wuidVersion", WUID_VERSION); // we implement the latest version.
- wuXML->setProp("@totalThorTime", ""); // must be non null, otherwise sorting by thor time excludes the values
- Owned<IRemoteConnection> daliLock;
- lockWuid(daliLock, useWuid);
- Owned<CLocalWorkUnit> wu = new CCassandraWorkUnit(this, wuXML.getClear(), secmgr, secuser, daliLock.getClear(), false);
- return wu.getClear();
- }
- suffix = rand_r(&randState);
- if (suffixLength<9)
- suffixLength++;
- }
- }
- virtual CLocalWorkUnit* _openWorkUnit(const char *wuid, ISecManager *secmgr, ISecUser *secuser)
- {
- Owned<IPTree> wuXML = cassandraToWorkunitXML(wuid);
- if (wuXML)
- return new CCassandraWorkUnit(this, wuXML.getClear(), secmgr, secuser, NULL, false);
- else
- return NULL;
- }
- virtual CLocalWorkUnit* _updateWorkUnit(const char *wuid, ISecManager *secmgr, ISecUser *secuser)
- {
- // We still use dali for the locks
- Owned<IRemoteConnection> daliLock;
- lockWuid(daliLock, wuid);
- Owned<IPTree> wuXML = cassandraToWorkunitXML(wuid);
- Owned<CLocalWorkUnit> wu = new CCassandraWorkUnit(this, wuXML.getClear(), secmgr, secuser, daliLock.getClear(), false);
- return wu.getClear();
- }
- virtual bool _restoreWorkUnit(IPTree *_pt, const char *wuid)
- {
- Owned<IPTree> pt(_pt);
- try
- {
- Owned<IRemoteConnection> daliLock;
- lockWuid(daliLock, wuid);
- Owned<IPropertyTree> gProgress = pruneBranch(pt, "GraphProgress[1]");
- Owned<CCassandraWorkUnit> wu = new CCassandraWorkUnit(this, pt.getClear(), NULL, NULL, daliLock.getClear(), true);
- if (gProgress)
- {
- Owned<IPTreeIterator> graphs = gProgress->getElements("*");
- ForEach(*graphs)
- {
- IPTree &graph = graphs->query();
- const char *graphName = graph.queryName();
- Owned<IPTreeIterator> subs = graph.getElements("*");
- ForEach(*subs)
- {
- IPTree &sub = subs->query();
- const char *name=sub.queryName();
- if (name[0]=='s' && name[1]=='g')
- {
- wu->setGraphProgress(&graph, graphName, atoi(name+2), sub.queryProp("@creator"), false);
- }
- else if (streq(name, "node"))
- {
- unsigned subid = sub.getPropInt("@id");
- if (subid)
- {
- if (sub.hasChildren()) // Old format
- wu->setGraphProgress(&sub, graphName, subid, sub.queryProp("@creator"), false);
- if (sub.hasProp("@_state"))
- wu->setNodeState(graphName, subid, (WUGraphState) sub.getPropInt("@_state"));
- }
- }
- }
- if (graph.hasProp("@_state"))
- wu->setGraphState(graphName, graph.getPropInt("@wfid"), (WUGraphState) graph.getPropInt("@_state"));
- }
- }
- wu->commit();
- return true;
- }
- catch (IException *E)
- {
- EXCLOG(E);
- ::Release(E);
- return false;
- }
- }
- virtual IWorkUnit * getGlobalWorkUnit(ISecManager *secmgr = NULL, ISecUser *secuser = NULL)
- {
- // MORE - should it check security? Dali version never did...
- Owned<IRemoteConnection> daliLock;
- lockWuid(daliLock, GLOBAL_WORKUNIT);
- Owned<IPTree> wuXML = createPTree(GLOBAL_WORKUNIT);
- Owned<CLocalWorkUnit> wu = new CCassandraWorkUnit(this, wuXML.getClear(), NULL, NULL, daliLock.getClear(), false);
- return &wu->lockRemote(false);
- }
- virtual IConstWorkUnitIterator * getWorkUnitsByOwner(const char * owner, ISecManager *secmgr, ISecUser *secuser)
- {
- return getWorkUnitsByXXX("@submitID", owner, secmgr, secuser);
- }
- virtual IConstWorkUnitIterator * getScheduledWorkUnits(ISecManager *secmgr, ISecUser *secuser)
- {
- return getWorkUnitsByXXX("@state", getWorkunitStateStr(WUStateScheduled), secmgr, secuser); // MORE - there may be more efficient ways to do this?
- }
- virtual IConstWorkUnitIterator * getWorkUnitsSorted(WUSortField sortorder, WUSortField * filters, const void * filterbuf,
- unsigned startOffset, unsigned pageSize, __int64 * cachehint, unsigned *total,
- ISecManager *secmgr, ISecUser *secuser)
- {
- // To assist in the efficient implementation of this function without requiring local sorting and filtering,
- // we maintain a couple of additional search tables in addition to the main workunit table.
- //
- // The workunitsSearch table allows us to map from a given field's value to a workunit - to avoid the need
- // for a second lookup this table contains a copy of all the 'lightweight' fields in the workunit. The table
- // has a partition key of xpath, searchPrefix allowing it to be used for range lookups provided at least
- // 2 characters are provided, while hopefully spreading the load a little between Cassandra partitions.
- //
- // The uniqueValues table is used to track what values are present for some wild-searchable fields, so we do
- // two lookups - one to translate the wildcard to a set, then others to retrieve the wus matching each value
- // in the set. These are done as N parallel reads rather than a single query (which might naively be expected
- // to be more efficient) for two reasons. Firstly, we can get them back sorted that way and merge the results
- // on the fly. Secondly, it is actually more efficient, at least in the case when there are multiple Cassandra
- // partitions, since it in-effect cuts out the step of talking to a coordinator node which would talk to
- // multiple other nodes to get the data.
- //
- // We go to some lengths to avoid post-sorting if we can, but any sort order other than by wuid or totalThorTime
- // will post-sort it. If a post-sort is required, we will fetch up to WUID_LOCALSORT_LIMIT rows, - if there are
- // more then we should fail, and the user should be invited to add filters.
- //
- // We can do at most one 'hard' filter, plus a filter on wuid range - anything else will require post-filtering.
- // Most 'wild' searches can only be done with post-filtering, but some can be translated to multiple hard values
- // using the unique values table. In such cases we merge results in the fly to avoid a post-sort if possible
- //
- // Note that Cassandra does not presently support filtering before returning the values except where a
- // key or secondary index is available - even if ALLOW FILTERING is specified. If it did, some of the post-
- // filtering would be better off done at the Cassandra side.
- //
- // We should encourage the UI to present drop-lists of users for filtering, to avoid the use of wildcard
- // searches just because people can't remember the name.
- //
- // Searching by files probably needs to be done differently - a separate table mapping filenames to wuids.
- // This can perhaps be join-merged if other filters are present. This is still TBD at the moment.
- Owned<CCassandraWuUQueryCacheEntry> cached;
- if (cachehint && *cachehint)
- {
- CriticalBlock b(cacheCrit);
- cached.set(cacheIdMap.getValue(*cachehint));
- }
- if (cached)
- cached->touch();
- else
- cached.setown(new CCassandraWuUQueryCacheEntry());
- if (pageSize > INT_MAX)
- pageSize = INT_MAX;
- const WUSortField *thisFilter = filters;
- IArrayOf<IPostFilter> goodFilters;
- IArrayOf<IPostFilter> wuidFilters;
- IArrayOf<IPostFilter> poorFilters;
- IArrayOf<IPostFilter> fileFilters;
- IArrayOf<IPostFilter> remoteWildFilters;
- Owned<IConstWorkUnitIteratorEx> result;
- WUSortField baseSort = (WUSortField) (sortorder & 0xff);
- StringBuffer thorTimeThreshold;
- bool sortByThorTime = (baseSort == WUSFtotalthortime);
- bool needsPostSort = (baseSort != WUSFwuid && baseSort != WUSFtotalthortime);
- bool sortDescending = (sortorder & WUSFreverse) || needsPostSort;
- bool keepThorTimeFilter = sortByThorTime;
- if (!keepThorTimeFilter)
- {
- const WUSortField *filterPtr = filters;
- while (filterPtr && *filterPtr)
- {
- WUSortField field = (WUSortField) (*filterPtr & 0xff);
- if (field == WUSFtotalthortime)
- {
- keepThorTimeFilter = true;
- break;
- }
- filterPtr++;
- }
- }
- if (!result)
- {
- Owned<CassMultiIterator> merger = new CassMultiIterator(needsPostSort ? NULL : cached, 0, 0, sortDescending); // We always merge by wuid (except when we merge by thor time... we turn the compare off then to make it an appender)
- if (startOffset)
- {
- StringBuffer startWuid;
- unsigned found = cached->lookupStartRow(startWuid, thorTimeThreshold, startOffset);
- if (found)
- {
- if (!keepThorTimeFilter)
- {
- if (sortDescending)
- startWuid.setCharAt(startWuid.length()-1, startWuid.charAt(startWuid.length()-1)-1); // we want to find the last wuid BEFORE
- else
- startWuid.append('\x21'); // we want to find the first wuid AFTER. This is printable but not going to be in any wuid
- thorTimeThreshold.clear();
- }
- wuidFilters.append(*new PostFilter(sortorder==WUSFwuid ? WUSFwuid : WUSFwuidhigh, startWuid, true));
- startOffset -= found;
- merger->setStartOffset(found);
- }
- }
- const char *fv = (const char *) filterbuf;
- while (thisFilter && *thisFilter)
- {
- WUSortField field = (WUSortField) (*thisFilter & 0xff);
- bool isWild = (*thisFilter & WUSFwild) != 0;
- switch (field)
- {
- case WUSFappvalue:
- {
- assertex(fv);
- const char *name = fv;
- fv = fv + strlen(fv)+1;
- if (isWild)
- {
- StringBuffer s(fv);
- if (s.charAt(s.length()-1)== '*')
- s.remove(s.length()-1, 1);
- if (s.length())
- remoteWildFilters.append(*new AppValuePostFilter(name, s, true)); // Should we allow wild on the app and/or name too? Not at the moment
- }
- else
- goodFilters.append(*new AppValuePostFilter(name, fv, false));
- break;
- }
- case WUSFuser:
- case WUSFcluster:
- case WUSFjob:
- if (isWild)
- {
- StringBuffer s(fv);
- if (s.charAt(s.length()-1)== '*')
- s.remove(s.length()-1, 1);
- if (s.length())
- remoteWildFilters.append(*new PostFilter(field, s, true)); // Trailing-only wildcards can be done remotely
- }
- else if (strchr(fv, '|'))
- goodFilters.append(*new MultiValuePostFilter(field, fv));
- else
- goodFilters.append(*new PostFilter(field, fv, false));
- break;
- case WUSFstate:
- case WUSFpriority:
- case WUSFprotected:
- // These can't be wild, but are not very good filters
- if (strchr(fv, '|'))
- poorFilters.append(*new MultiValuePostFilter(field, fv));
- else
- poorFilters.append(*new PostFilter(field, fv, false));
- break;
- case WUSFwuid: // Acts as wuidLo when specified as a filter
- case WUSFwuidhigh:
- // Wuid filters can be added to good and poor filters, and to remoteWild if they are done via merged sets rather than ranges...
- if (keepThorTimeFilter)
- remoteWildFilters.append(*new PostFilter(field, fv, true));
- else
- mergeFilter(wuidFilters, field, fv);
- break;
- case WUSFfileread:
- case WUSFfilewritten:
- fileFilters.append(*new PostFilter(field, fv, true));
- break;
- case WUSFtotalthortime:
- // This should be treated as a low value - i.e. return only wu's that took longer than the supplied value
- if (thorTimeThreshold.isEmpty()) // If not a continuation
- formatTimeCollatable(thorTimeThreshold, milliToNano(atoi(fv)), false);
- break;
- case WUSFwildwuid:
- // Translate into a range - note that we only support trailing * wildcard.
- if (fv && *fv)
- {
- StringBuffer s(fv);
- if (s.charAt(s.length()-1)== '*')
- s.remove(s.length()-1, 1);
- if (s.length())
- {
- mergeFilter(wuidFilters, WUSFwuid, s);
- s.append('\x7e'); // '~' - higher than anything that should occur in a wuid (but still printable)
- mergeFilter(wuidFilters, WUSFwuidhigh, s);
- }
- }
- break;
- case WUSFecl: // This is different...
- if (isWild)
- merger->addPostFilter(*new PostFilter(field, fv, true)); // Wildcards on ECL are trailing and leading - no way to do remotely
- else
- goodFilters.append(*new PostFilter(field, fv, false)); // A hard filter on exact ecl match is possible but very unlikely
- break;
- default:
- UNSUPPORTED("Workunit filter criteria");
- }
- thisFilter++;
- if (fv)
- fv = fv + strlen(fv)+1;
- }
- if (fileFilters.length())
- {
- // We can't postfilter by these - we COULD in some cases do a join between these and some other filtered set
- // but we will leave that as an exercise to the reader. So if there is a fileFilter, read it first, and turn it into a merge set of the resulting wus.
- // MORE read and written are not the same
- assertex(fileFilters.length()==1); // If we supported more there would be a join phase here
- merger->addPostFilters(goodFilters, 0);
- merger->addPostFilters(poorFilters, 0);
- merger->addPostFilters(remoteWildFilters, 0);
- const IPostFilter &fileFilter = fileFilters.item(0);
- CassandraResult wuids(fetchDataForFiles(fileFilter.queryValue(), wuidFilters, fileFilter.queryField()==WUSFfileread));
- CassandraIterator rows(cass_iterator_from_result(wuids));
- StringBuffer value;
- while (cass_iterator_next(rows))
- {
- const CassRow *row = cass_iterator_get_row(rows);
- getCassString(value.clear(), cass_row_get_column(row, 0));
- merger->addResult(*new CassandraResult(fetchDataForWuid(workunitInfoMappings, value, true)));
- }
- }
- else if (sortByThorTime || !thorTimeThreshold.isEmpty())
- {
- merger->addPostFilters(goodFilters, 0);
- merger->addPostFilters(poorFilters, 0);
- merger->addPostFilters(remoteWildFilters, 0);
- if (wuidFilters.length())
- {
- // We are doing a continuation of a prior search that is sorted by a searchField, which may not be unique
- // We need two queries - one where searchField==startSearchField and wuid > startWuid,
- // and one where searchField > startSearchField. We know that there are no other filters in play (as Cassandra would not support them)
- // though there may be postfilters
- assertex(wuidFilters.length()==1);
- merger->addResult(*new CassandraResult(fetchMoreDataByThorTime(thorTimeThreshold, wuidFilters.item(0).queryValue(), sortDescending, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
- merger->addResult(*new CassandraResult(fetchMoreDataByThorTime(thorTimeThreshold, NULL, sortDescending, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
- merger->setCompareColumn(-1); // we want to preserve the order of these two results
- }
- else
- merger->addResult(*new CassandraResult(fetchDataByThorTime(thorTimeThreshold, sortDescending, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
- }
- else if (goodFilters.length())
- {
- merger->addPostFilters(goodFilters, 1);
- merger->addPostFilters(poorFilters, 0);
- merger->addPostFilters(remoteWildFilters, 0);
- const IPostFilter &best = goodFilters.item(0);
- const char *queryValue = best.queryValue();
- if (strchr(queryValue, '|'))
- {
- StringArray values;
- values.appendListUniq(queryValue, "|");
- ForEachItemIn(vidx, values)
- {
- const char *thisValue = values.item(vidx);
- if (!isEmptyString(thisValue))
- merger->addResult(*new CassandraResult(fetchDataForKeyWithFilter(best.queryXPath(), thisValue, wuidFilters, sortorder, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
- }
- }
- else
- merger->addResult(*new CassandraResult(fetchDataForKeyWithFilter(best.queryXPath(), best.queryValue(), wuidFilters, sortorder, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
- }
- else if (poorFilters.length())
- {
- merger->addPostFilters(poorFilters, 1);
- merger->addPostFilters(remoteWildFilters, 0);
- const IPostFilter &best= poorFilters.item(0);
- const char *queryValue =best.queryValue();
- if (strchr(queryValue, '|'))
- {
- StringArray values;
- values.appendListUniq(queryValue, "|");
- ForEachItemIn(vidx, values)
- {
- const char *thisValue = values.item(vidx);
- if (!isEmptyString(thisValue))
- merger->addResult(*new CassandraResult(fetchDataForKeyWithFilter(best.queryXPath(), thisValue, wuidFilters, sortorder, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
- }
- }
- else
- merger->addResult(*new CassandraResult(fetchDataForKeyWithFilter(best.queryXPath(), best.queryValue(), wuidFilters, sortorder, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
- }
- else if (remoteWildFilters.length())
- {
- merger->addPostFilters(remoteWildFilters, 1); // Any other filters have to be done locally
- // Convert into a value IN [] which we do via a merge
- // NOTE - If we want sorted by filter (or don't care about sort order), we could do directly as a range - but the wuid range filters then don't work, and the merger would be invalid
- StringArray fieldValues;
- const IPostFilter &best= remoteWildFilters.item(0);
- _getUniqueValues(best.queryXPath(), best.queryValue(), fieldValues);
- ForEachItemIn(idx, fieldValues)
- {
- merger->addResult(*new CassandraResult(fetchDataForKeyWithFilter(best.queryXPath(), fieldValues.item(idx), wuidFilters, sortorder, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
- }
- }
- else
- {
- // If all we have is a wuid range (or nothing), search the wuid table and/or return everything
- for (int i = 0; i < partitions; i++)
- {
- merger->addResult(*new CassandraResult(fetchDataByPartition(workunitInfoMappings, i, wuidFilters, sortorder, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
- }
- }
- // The result we have will be sorted by wuid (ascending or descending)
- if (needsPostSort)
- {
- // A post-sort will be required.
- // Result should be limited in (to CASS_WORKUNIT_POSTSORT_LIMIT * number of results being merged)
- result.setown(new CassPostSortIterator(merger.getClear(), sortorder, pageSize > CASS_WORKUNIT_POSTSORT_LIMIT ? pageSize : CASS_WORKUNIT_POSTSORT_LIMIT));
- cached->setResult(result);
- }
- else
- result.setown(merger.getClear());
- }
- if (startOffset || needsPostSort || result->hasPostFilters() || result->isMerging()) // we need a subpage if we have fetched anything other than exactly the rows requested
- result.setown(new SubPageIterator(result.getClear(), startOffset, pageSize));
- if (cachehint)
- {
- *cachehint = cached->queryHint();
- CriticalBlock b(cacheCrit);
- cacheIdMap.setValue(*cachehint, cached); // Links its parameter
- }
- if (total)
- *total = 0; // We don't know
- return result.getClear();
- }
- virtual StringArray &getUniqueValues(WUSortField field, const char *prefix, StringArray &result) const
- {
- return _getUniqueValues(queryFilterXPath(field), prefix, result);
- }
- virtual unsigned numWorkUnits()
- {
- unsigned total = 0;
- CIArrayOf<CassandraFuture> futures;
- for (int i = 0; i < partitions; i++)
- {
- CassandraStatement statement(prepareStatement("SELECT COUNT(*) FROM workunits where partition=?;"));
- statement.bindInt32(0, i);
- futures.append(*new CassandraFuture(cass_session_execute(querySession(), statement)));
- }
- ForEachItemIn(idx, futures)
- {
- CassandraFuture &future = futures.item(idx);
- future.wait("select count(*)");
- CassandraResult result(cass_future_get_result(future));
- total += getUnsignedResult(NULL, getSingleResult(result));
- }
- return total;
- }
- /*
- virtual bool isAborting(const char *wuid) const - done in the base class using dali
- virtual void clearAborting(const char *wuid) - done in the base class using dali
- */
- virtual WUState waitForWorkUnit(const char * wuid, unsigned timeout, bool compiled, std::list<WUState> expectedStates)
- {
- Owned<WorkUnitWaiter> waiter = new WorkUnitWaiter(wuid, SubscribeOptionState);
- LocalIAbortHandler abortHandler(*waiter);
- CassandraStatement statement(prepareStatement("select state, agentSession from workunits where partition=? and wuid=?;"));
- statement.bindInt32(0, rtlHash32VStr(wuid, 0) % partitions);
- statement.bindString(1, wuid);
- SessionId agent = 0;
- bool agentSessionStopped = false;
- unsigned start = msTick();
- for (;;)
- {
- CassandraFuture future(cass_session_execute(querySession(), statement));
- future.wait("Lookup wu state");
- CassandraResult result(cass_future_get_result(future));
- const CassRow *row = cass_result_first_row(result);
- if (!row)
- return WUStateUnknown;
- const CassValue *stateVal = cass_row_get_column(row, 0);
- if (!stateVal)
- return WUStateUnknown;
- StringBuffer stateStr;
- getCassString(stateStr, stateVal);
- WUState state = getWorkUnitState(stateStr);
- auto it = std::find(expectedStates.begin(), expectedStates.end(), state);
- if (it != expectedStates.end())
- return state;
- switch (state)
- {
- case WUStateCompiled:
- case WUStateUploadingFiles:
- if (compiled)
- return state;
- break;
- case WUStateCompleted:
- case WUStateFailed:
- case WUStateAborted:
- return state;
- case WUStateWait:
- break;
- case WUStateCompiling:
- case WUStateRunning:
- case WUStateDebugPaused:
- case WUStateDebugRunning:
- case WUStateBlocked:
- case WUStateAborting:
- if (agentSessionStopped)
- {
- reportAbnormalTermination(wuid, state, agent);
- return state;
- }
- if (queryDaliServerVersion().compare("2.1")>=0)
- {
- agent = getUnsignedResult(NULL, cass_row_get_column(row, 1));
- if(agent && querySessionManager().sessionStopped(agent, 0))
- {
- agentSessionStopped = true;
- continue;
- }
- }
- break;
- }
- agentSessionStopped = false; // reset for state changes such as WUStateWait then WUStateRunning again
- unsigned waited = msTick() - start;
- if (timeout==-1 || waited + 20000 < timeout)
- {
- waiter->wait(20000); // recheck state every 20 seconds, in case eclagent has crashed.
- if (waiter->isAborted())
- return WUStateUnknown; // MORE - throw an exception?
- }
- else if (waited > timeout || !waiter->wait(timeout-waited))
- return WUStateUnknown; // MORE - throw an exception?
- }
- }
- virtual WUAction waitForWorkUnitAction(const char * wuid, WUAction original)
- {
- StringAttr origStr(getWorkunitActionStr(original));
- Owned<WorkUnitWaiter> waiter = new WorkUnitWaiter(wuid, SubscribeOptionAction);
- LocalIAbortHandler abortHandler(*waiter);
- CassandraStatement statement(prepareStatement("select action from workunits where partition=? and wuid=?;"));
- statement.bindInt32(0, rtlHash32VStr(wuid, 0) % partitions);
- statement.bindString(1, wuid);
- WUAction ret = WUActionUnknown;
- for (;;)
- {
- CassandraFuture future(cass_session_execute(querySession(), statement));
- future.wait("Lookup wu action");
- CassandraResult result(cass_future_get_result(future));
- const CassRow *row = cass_result_first_row(result);
- if (!row)
- {
- PROGLOG("While waiting for job %s, WU no longer exists", wuid);
- break;
- }
- const CassValue *actionVal = cass_row_get_column(row, 0);
- if (!actionVal)
- {
- PROGLOG("While waiting for job %s, WU action cannot be read", wuid);
- break;
- }
- StringBuffer actionStr;
- getCassString(actionStr, actionVal);
- if (!streq(actionStr, origStr))
- {
- ret = getWorkunitAction(actionStr);
- break;
- }
- waiter->wait(10000); // recheck state every 20 seconds even if no notifications... just because we used to before
- if (waiter->isAborted())
- break;
- }
- return ret;
- }
- unsigned validateRepository(bool fix)
- {
- unsigned errCount = 0;
- // 1. Check that every entry in main wu table has matching entries in secondary tables
- CassandraResult result(fetchData(workunitInfoMappings+1));
- CassandraIterator rows(cass_iterator_from_result(result));
- if (fix)
- {
- // Delete the unique values table - the validate process recreates it afresh
- executeSimpleCommand(querySession(), "TRUNCATE uniqueSearchValues;");
- }
- while (cass_iterator_next(rows))
- {
- Owned<IPTree> wuXML = rowToPTree(NULL, NULL, workunitInfoMappings+1, cass_iterator_get_row(rows));
- const char *wuid = wuXML->queryName();
- // For each search entry, check that we get matching XML
- for (const char * const *search = searchPaths; *search; search++)
- errCount += validateSearch(*search, wuid, wuXML, fix);
- }
- // 2. Check that there are no orphaned entries in search or child tables
- errCount += checkOrphans(searchMappings, 3, fix);
- for (const ChildTableInfo * const * table = childTables; *table != NULL; table++)
- errCount += checkOrphans(table[0]->mappings, 1, fix);
- errCount += checkOrphans(wuGraphProgressMappings, 1, fix);
- errCount += checkOrphans(wuGraphStateMappings, 1, fix);
- errCount += checkOrphans(wuGraphRunningMappings, 1, fix);
- return errCount;
- }
- virtual void deleteRepository(bool recreate)
- {
- // USE WITH CARE!
- CassandraSession s(cass_session_new());
- CassandraFuture future(cass_session_connect(s, cluster.queryCluster()));
- future.wait("connect without keyspace to delete");
- VStringBuffer deleteKeyspace("DROP KEYSPACE IF EXISTS %s;", cluster.queryKeySpace());
- executeSimpleCommand(s, deleteKeyspace);
- s.set(NULL);
- cluster.disconnect();
- if (recreate)
- createRepository();
- }
- virtual void createRepository()
- {
- cluster.disconnect();
- CassandraSession s(cass_session_new());
- CassandraFuture future(cass_session_connect(s, cluster.queryCluster()));
- future.wait("connect without keyspace");
- VStringBuffer create("CREATE KEYSPACE IF NOT EXISTS %s WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '1' };", cluster.queryKeySpace()); // MORE - options from props? Not 100% sure if they are appropriate.
- executeSimpleCommand(s, create);
- s.set(NULL);
- cluster.connect();
- createVersionTable(false);
- ensureTable(querySession(), workunitsMappings);
- ensureTable(querySession(), searchMappings);
- ensureTable(querySession(), uniqueSearchMappings);
- ensureTable(querySession(), filesSearchMappings);
- for (const ChildTableInfo * const * table = childTables; *table != NULL; table++)
- ensureTable(querySession(), table[0]->mappings);
- ensureTable(querySession(), wuGraphProgressMappings);
- ensureTable(querySession(), wuGraphStateMappings);
- ensureTable(querySession(), wuGraphRunningMappings);
- }
- virtual const char *queryStoreType() const
- {
- return "Cassandra";
- }
- // Interface ICassandraSession
- virtual CassSession *querySession() const { return cluster.querySession(); };
- virtual unsigned queryTraceLevel() const { return traceLevel; };
- virtual CassandraPrepared *prepareStatement(const char *query) const
- {
- return cluster.prepareStatement(query, traceLevel>=2);
- }
- virtual void executeAsync(CIArrayOf<CassandraStatement> &batch, const char *what) const override
- {
- if (batch.ordinality())
- {
- if (queryTraceLevel() > 1)
- DBGLOG("Executing async batch %s", what);
- cluster.executeAsync(batch, what);
- }
- }
- virtual unsigned queryPartitions() const override
- {
- return partitions;
- }
- virtual unsigned queryPrefixSize() const override
- {
- return prefixSize;
- }
- private:
- virtual void executeBatch(CassandraBatch &batch, const char *what) const
- {
- if (queryTraceLevel() > 1)
- DBGLOG("Executing batch %s", what);
- CassandraFuture futureBatch(cass_session_execute_batch(querySession(), batch));
- futureBatch.wait(what);
- }
- void createVersionTable(bool force)
- {
- StringBuffer schema;
- executeSimpleCommand(querySession(), describeTable(versionMappings, schema));
- Owned<IPTree> oldVersion = getVersionInfo();
- if (force || !oldVersion)
- {
- VStringBuffer versionInfo("<Version major='%d' minor='%d' numPartitions='%d' searchPrefixSize='%d'/>", majorVersion, minorVersion, partitions, prefixSize);
- CassandraBatch versionBatch(CASS_BATCH_TYPE_LOGGED);
- Owned<IPTree> pt = createPTreeFromXMLString(versionInfo);
- for (int i = 0; i < DEFAULT_PARTITIONS; i++) // NOTE - version table always has DEFAULT_PARTITIONS partitions
- {
- pt->setPropInt("@partition", i);
- simpleXMLtoCassandra(this, versionBatch, versionMappings, pt, NULL);
- }
- executeBatch(versionBatch, "createVersionTable");
- }
- }
- IPTree *getVersionInfo()
- {
- try
- {
- StringBuffer names;
- StringBuffer tableName;
- getFieldNames(versionMappings, names, tableName);
- VStringBuffer selectQuery("select %s from %s where partition=?;", names.str()+1, tableName.str());
- CassandraStatement select(prepareStatement(selectQuery));
- select.bindInt32(0, rand_r(&randState) % DEFAULT_PARTITIONS); // NOTE - version table always has DEFAULT_PARTITIONS partitions
- CassandraFuture future(cass_session_execute(querySession(), select));
- future.wait("read version");
- CassandraResult result(cass_future_get_result(future));
- const CassRow *row = cass_result_first_row(result);
- if (row)
- return rowToPTree(NULL, NULL, versionMappings, row);
- }
- catch (IException *E)
- {
- EXCLOG(E);
- E->Release();
- }
- catch (...)
- {
- DBGLOG("WARNING: Unknown exception caught while trying to retrieve Cassandra repository version information");
- }
- return NULL;
- }
- bool checkWuExists(const char *wuid)
- {
- CassandraStatement statement(prepareStatement("SELECT COUNT(*) FROM workunits where partition=? and wuid=?;"));
- statement.bindInt32(0, rtlHash32VStr(wuid, 0) % partitions);
- statement.bindString(1, wuid);
- CassandraFuture future(cass_session_execute(querySession(), statement));
- future.wait("select count(*)");
- CassandraResult result(cass_future_get_result(future));
- return getUnsignedResult(NULL, getSingleResult(result)) != 0; // Shouldn't be more than 1, either
- }
- // Fold a new limit for 'field' into the filter list, keeping at most one filter per field.
- // Combine multiple filters on wuid - Cassandra doesn't like seeing more than one.
- // If a filter for the field already exists its value is replaced only when the new value is
- // tighter: the larger value wins for WUSFwuid, the smaller value wins for any other field.
- void mergeFilter(IArrayOf<IPostFilter> &filters, WUSortField field, const char *value)
- {
-     ForEachItemIn(pos, filters)
-     {
-         PostFilter &existing = static_cast<PostFilter &>(filters.item(pos));
-         if (existing.queryField() != field)
-             continue;
-         const int order = strcmp(existing.queryValue(), value);
-         const bool existingIsSmaller = order < 0;
-         const bool largerWins = (field==WUSFwuid);
-         if (order != 0 && existingIsSmaller == largerWins)
-             existing.setValue(value);
-         return;
-     }
-     // Not found - add new filter
-     filters.append(*new PostFilter(field, value, true));
- }
- // Build a secured workunit iterator over either one search key or the whole repository.
- // With a non-empty key the search table is queried for (xpath, key); otherwise every
- // partition of the workunit info table is read (unfiltered) and the results are merged.
- IConstWorkUnitIterator * getWorkUnitsByXXX(const char *xpath, const char *key, ISecManager *secmgr, ISecUser *secuser)
- {
-     Owned<CassMultiIterator> merged = new CassMultiIterator(NULL, 0, 0, true); // Merge by wuid
-     const bool haveKey = key && *key;
-     if (haveKey)
-         merged->addResult(*new CassandraResult(fetchDataForKey(xpath, key)));
-     else
-     {
-         IArrayOf<IPostFilter> noFilters;
-         int part = 0;
-         while (part < partitions)
-         {
-             merged->addResult(*new CassandraResult(fetchDataByPartition(workunitInfoMappings, part, noFilters)));
-             part++;
-         }
-     }
-     return createSecureConstWUIterator(merged.getClear(), secmgr, secuser);
- }
- // Append to 'result' the first column of every unique-search row matching 'prefix' for 'xpath'.
- // Nothing is fetched when prefix is NULL or shorter than prefixSize. Returns 'result'.
- StringArray &_getUniqueValues(const char *xpath, const char *prefix, StringArray &result) const
- {
-     if (!prefix || strlen(prefix) < prefixSize)
-         return result;
-     CassandraResult matches(fetchDataForWildSearch(xpath, prefix, uniqueSearchMappings));
-     CassandraIterator cursor(cass_iterator_from_result(matches));
-     StringBuffer text;
-     while (cass_iterator_next(cursor))
-     {
-         const CassRow *current = cass_iterator_get_row(cursor);
-         text.clear();
-         getCassString(text, cass_row_get_column(current, 0));
-         result.append(text);
-     }
-     return result;
- }
- // Check (and optionally repair) the search-table data for one searchable field of a workunit.
- //   xpath - path of the field within the workunit tree; its value is the search key
- //   wuid  - workunit whose search entry is being checked
- //   wuXML - workunit property tree that the search data is compared against
- //   fix   - when true, statements that rewrite (and, for duplicates, delete) the search
- //           data are queued and executed asynchronously
- // Returns the number of problems detected: a missing entry, a mismatched entry, or
- // duplicate entries each count as one. Nothing is checked when the field is absent or empty.
- unsigned validateSearch(const char *xpath, const char *wuid, IPTree *wuXML, bool fix)
- {
-     unsigned errCount = 0;
-     const char *childKey = wuXML->queryProp(xpath);
-     if (childKey && *childKey)
-     {
-         CIArrayOf<CassandraStatement> batch;
-         CIArrayOf<CassandraStatement> deletes;
-         CassandraResult result(fetchDataForKeyAndWuid(xpath, childKey, wuid));
-         // The unique-search entry is always rewritten when fixing, whatever the search table holds
-         if (fix)
-             simpleXMLtoCassandra(this, batch, uniqueSearchMappings, wuXML, xpath);
-         switch (cass_result_row_count(result))
-         {
-         case 0:
-             DBGLOG("Missing search data for %s for wuid=%s key=%s", xpath, wuid, childKey);
-             if (fix)
-                 simpleXMLtoCassandra(this, batch, searchMappings, wuXML, xpath);
-             errCount++;
-             break;
-         case 1:
-         {
-             Owned<IPTree> secXML = rowToPTree(xpath, childKey, searchMappings+4, cass_result_first_row(result)); // type, prefix, key, and wuid are not returned
-             secXML->renameProp("/", wuid);
-             if (!areMatchingPTrees(wuXML, secXML))
-             {
-                 DBGLOG("Mismatched search data for %s for wuid %s", xpath, wuid);
-                 if (fix)
-                     simpleXMLtoCassandra(this, batch, searchMappings, wuXML, xpath);
-                 errCount++;
-             }
-             break;
-         }
-         default:
-             DBGLOG("Multiple secondary data %d for %s for wuid %s", (int) cass_result_row_count(result), xpath, wuid); // This should be impossible!
-             if (fix)
-             {
-                 deleteSecondaryByKey(xpath, childKey, wuid, this, deletes);
-                 simpleXMLtoCassandra(this, batch, searchMappings, wuXML, xpath);
-             }
-             // Duplicates are an inconsistency as well, so report them like the other two cases
-             errCount++;
-             break;
-         }
-         if (fix)
-         {
-             executeAsync(deletes, "delete search");
-             executeAsync(batch, "fix search");
-         }
-     }
-     return errCount;
- }
- // Scan every row of a table and report rows whose wuid has no matching workunit.
- //   mappings  - column mappings of the table to scan
- //   wuidIndex - index of the wuid column in each fetched row; non-zero selects the
- //               secondary-key delete path, zero the delete-by-wuid path
- //   fix       - when true, orphaned data is deleted as it is found
- // Rows belonging to GLOBAL_WORKUNIT are never treated as orphans.
- // Returns the number of orphaned rows detected.
- unsigned checkOrphans(const CassandraXmlMapping *mappings, unsigned wuidIndex, bool fix)
- {
-     unsigned orphanCount = 0;
-     CassandraResult allRows(fetchData(mappings));
-     CassandraIterator cursor(cass_iterator_from_result(allRows));
-     while (cass_iterator_next(cursor))
-     {
-         const CassRow *current = cass_iterator_get_row(cursor);
-         StringBuffer rowWuid;
-         getCassString(rowWuid, cass_row_get_column(current, wuidIndex));
-         if (streq(rowWuid, GLOBAL_WORKUNIT) || checkWuExists(rowWuid))
-             continue;
-         DBGLOG("Orphaned data in %s for wuid=%s", queryTableName(mappings), rowWuid.str());
-         if (fix && wuidIndex)
-         {
-             // Columns 0 and 2 of the row supply the xpath and field value that key the entry
-             CIArrayOf<CassandraStatement> secondaryDeletes;
-             StringBuffer keyPath;
-             StringBuffer keyValue;
-             getCassString(keyPath, cass_row_get_column(current, 0));
-             getCassString(keyValue, cass_row_get_column(current, 2));
-             deleteSecondaryByKey(keyPath, keyValue, rowWuid, this, secondaryDeletes);
-             executeAsync(secondaryDeletes, "Delete orphans");
-         }
-         else if (fix)
-         {
-             CassandraBatch childDeletes(CASS_BATCH_TYPE_UNLOGGED);
-             deleteChildByWuid(mappings, rowWuid, childDeletes);
-             executeBatch(childDeletes, "Delete orphans");
-         }
-         orphanCount++;
-     }
-     return orphanCount;
- }
- // Rebuild the workunit property tree for 'wuid' from its row in the workunits table.
- // Each non-null column is written into the tree by the mapper of the corresponding
- // workunitsMappings entry. Returns NULL when no row exists for the wuid.
- IPTree *cassandraToWorkunitXML(const char *wuid) const
- {
-     CassandraResult wuRows(fetchDataForWuid(workunitsMappings, wuid, false));
-     CassandraIterator rowCursor(cass_iterator_from_result(wuRows));
-     if (!cass_iterator_next(rowCursor)) // should just be one
-         return NULL;
-     Owned<IPTree> tree = createPTree(wuid);
-     tree->setProp("@xmlns:xsi", "http://www.w3.org/1999/XMLSchema-instance");
-     CassandraIterator colCursor(cass_iterator_from_row(cass_iterator_get_row(rowCursor)));
-     // Mapping index starts at 2: wuid and partition are not returned
-     for (unsigned mapIdx = 2; cass_iterator_next(colCursor); mapIdx++)
-     {
-         assertex(workunitsMappings[mapIdx].columnName);
-         const CassValue *cell = cass_iterator_get_column(colCursor);
-         if (!cell || cass_value_is_null(cell))
-             continue;
-         workunitsMappings[mapIdx].mapper.toXML(tree, workunitsMappings[mapIdx].xpath, cell);
-     }
-     return tree.getClear();
- }
- // Fetch all rows from a table.
- // The query text is logged when traceLevel is 2 or higher. Unlike the other fetch helpers
- // in this class, the statement is created directly rather than via prepareStatement().
- const CassResult *fetchData(const CassandraXmlMapping *mappings) const
- {
-     StringBuffer columnList;
-     StringBuffer table;
-     getFieldNames(mappings, columnList, table);
-     VStringBuffer cql("select %s from %s;", columnList.str()+1, table.str());
-     const bool tracing = traceLevel >= 2;
-     if (tracing)
-         DBGLOG("%s", cql.str());
-     CassandraStatement wholeTable(cass_statement_new(cql.str(), 0));
-     return executeQuery(querySession(), wholeTable);
- }
- // Fetch all rows from a single partition of a table, optionally bounded by wuid filters.
- //   wuidFilters - each adds "wuid <= ?" (WUSFwuidhigh) or "wuid >= ?" (any other field)
- //   sortOrder   - WUSFwuid gives ascending wuid order; everything else is fetched in
- //                 descending wuid order
- //   limit       - row limit, 0 for none; when sortOrder is neither wuid ascending nor wuid
- //                 descending and no limit was given, CASS_WORKUNIT_POSTSORT_LIMIT is applied
- const CassResult *fetchDataByPartition(const CassandraXmlMapping *mappings, int partition, const IArrayOf<IPostFilter> &wuidFilters, unsigned sortOrder=WUSFwuid|WUSFreverse, unsigned limit=0) const
- {
-     StringBuffer columnList;
-     StringBuffer table;
-     getFieldNames(mappings+1, columnList, table); // Don't fetch partition column
-     VStringBuffer cql("select %s from %s where partition=?", columnList.str()+1, table.str());
-     ForEachItemIn(f, wuidFilters)
-     {
-         const char *comparison = (wuidFilters.item(f).queryField()==WUSFwuidhigh) ? "<=" : ">=";
-         cql.appendf(" and wuid %s ?", comparison);
-     }
-     const bool wuidAscending = (sortOrder == WUSFwuid);
-     const bool wuidDescending = (sortOrder == (WUSFwuid|WUSFreverse));
-     if (wuidAscending)
-         cql.append(" ORDER BY WUID ASC");
-     else
-     {
-         // If not wuid, descending, we will have to post-sort. We still need in wuid desc order for the merge though.
-         cql.append(" ORDER BY WUID DESC");
-         if (!wuidDescending && !limit)
-             limit = CASS_WORKUNIT_POSTSORT_LIMIT;
-     }
-     if (limit)
-         cql.appendf(" LIMIT %u", limit);
-     cql.append(';');
-     CassandraStatement partitionQuery(prepareStatement(cql));
-     partitionQuery.bindInt32(0, partition);
-     // Filter values follow the partition parameter, in the order their clauses were appended
-     ForEachItemIn(b, wuidFilters)
-         partitionQuery.bindString(b+1, wuidFilters.item(b).queryValue());
-     return executeQuery(querySession(), partitionQuery);
- }
- // Fetch matching rows from a child table, or the main wu table, for one wuid.
- // The partition column is never returned; the wuid column is returned only when
- // includeWuid is true. wuid must be a non-empty string.
- const CassResult *fetchDataForWuid(const CassandraXmlMapping *mappings, const char *wuid, bool includeWuid) const
- {
-     assertex(wuid && *wuid);
-     // mappings+2 means we don't return the partition or wuid columns
-     const CassandraXmlMapping *firstReturned = mappings + (includeWuid ? 1 : 2);
-     StringBuffer columnList;
-     StringBuffer table;
-     getFieldNames(firstReturned, columnList, table);
-     VStringBuffer cql("select %s from %s where partition=? and wuid=?;", columnList.str()+1, table.str());
-     CassandraStatement wuidQuery(prepareStatement(cql));
-     wuidQuery.bindInt32(0, rtlHash32VStr(wuid, 0) % partitions);
-     wuidQuery.bindString(1, wuid);
-     return executeQuery(querySession(), wuidQuery);
- }
- // Fetch rows for one wuid from a table, restricted to a single value of the table's
- // third mapped column (mappings[2]). wuid must be a non-empty string.
- const CassResult *fetchDataForWuidAndKey(const CassandraXmlMapping *mappings, const char *wuid, const char *key) const
- {
-     assertex(wuid && *wuid);
-     StringBuffer columnList;
-     StringBuffer table;
-     // mappings+2 means we don't return the partition or wuid columns. We do return the key.
-     getFieldNames(mappings+2, columnList, table);
-     const char *keyColumn = mappings[2].columnName;
-     VStringBuffer cql("select %s from %s where partition=? and wuid=? and %s=?;", columnList.str()+1, table.str(), keyColumn);
-     CassandraStatement keyedQuery(prepareStatement(cql));
-     keyedQuery.bindInt32(0, rtlHash32VStr(wuid, 0) % partitions);
-     keyedQuery.bindString(1, wuid);
-     keyedQuery.bindString(2, key);
-     return executeQuery(querySession(), keyedQuery);
- }
- // Fetch matching rows from the search table, for all wuids, sorted by wuid.
- // The key is upper-cased before use; its first prefixSize characters (at most) are bound
- // as the fieldPrefix and the whole upper-cased key as the fieldValue.
- const CassResult *fetchDataForKey(const char *xpath, const char *key) const
- {
-     assertex(key);
-     StringBuffer columnList;
-     StringBuffer table;
-     StringBuffer upperKey(key);
-     upperKey.toUpperCase();
-     // mappings+3 means we don't return the key columns (xpath, upper(keyPrefix), upper(key))
-     getFieldNames(searchMappings+3, columnList, table);
-     VStringBuffer cql("select %s from %s where xpath=? and fieldPrefix=? and fieldValue=?", columnList.str()+1, table.str());
-     cql.append(" ORDER BY fieldValue ASC, WUID desc;");
-     CassandraStatement keyQuery(prepareStatement(cql));
-     keyQuery.bindString(0, xpath);
-     keyQuery.bindString_n(1, upperKey, prefixSize);
-     keyQuery.bindString(2, upperKey);
-     return executeQuery(querySession(), keyQuery);
- }
- // Fetch matching rows from the search table, for all wuids, with optional wuid bounds.
- //   wuidFilters - each adds "wuid <= ?" (WUSFwuidhigh) or "wuid >= ?" (any other field)
- //   sortOrder   - selects the ORDER BY clause; for anything other than wuid ascending or
- //                 wuid descending the limit is replaced by CASS_WORKUNIT_POSTSORT_LIMIT
- //   limit       - row limit, 0 for none
- const CassResult *fetchDataForKeyWithFilter(const char *xpath, const char *key, const IArrayOf<IPostFilter> &wuidFilters, unsigned sortOrder, unsigned limit) const
- {
-     StringBuffer columnList;
-     StringBuffer table;
-     StringBuffer upperKey(key);
-     upperKey.toUpperCase();
-     // mappings+3 means we don't return the key columns (xpath, upper(keyPrefix), upper(key))
-     getFieldNames(searchMappings+3, columnList, table);
-     VStringBuffer cql("select %s from %s where xpath=? and fieldPrefix=? and fieldValue=?", columnList.str()+1, table.str());
-     ForEachItemIn(f, wuidFilters)
-     {
-         const char *comparison = (wuidFilters.item(f).queryField()==WUSFwuidhigh) ? "<=" : ">=";
-         cql.appendf(" and wuid %s ?", comparison);
-     }
-     const char *orderClause;
-     if (sortOrder == WUSFwuid)
-         orderClause = " ORDER BY fieldValue DESC, WUID ASC";
-     else if (sortOrder == (WUSFwuid|WUSFreverse))
-         orderClause = " ORDER BY fieldValue ASC, WUID DESC";
-     else
-     {
-         // If not wuid, descending, we will have to post-sort. We still need in wuid desc order for the merge though.
-         orderClause = " ORDER BY fieldvalue ASC, WUID DESC";
-         limit = CASS_WORKUNIT_POSTSORT_LIMIT;
-     }
-     cql.append(orderClause);
-     if (limit)
-         cql.appendf(" LIMIT %u", limit);
-     CassandraStatement filteredQuery(prepareStatement(cql));
-     filteredQuery.bindString(0, xpath);
-     filteredQuery.bindString_n(1, upperKey, prefixSize);
-     filteredQuery.bindString(2, upperKey);
-     // Filter values occupy the parameters after the three key parameters
-     ForEachItemIn(b, wuidFilters)
-         filteredQuery.bindString(3+b, wuidFilters.item(b).queryValue());
-     return executeQuery(querySession(), filteredQuery);
- }
- // Fetch matching rows from the search or uniqueSearch table, for a given prefix.
- // The prefix is upper-cased and used as the inclusive lower bound on fieldValue; the
- // exclusive upper bound is the same text with its final character incremented by one.
- // prefix must be a non-empty string.
- const CassResult *fetchDataForWildSearch(const char *xpath, const char *prefix, const CassandraXmlMapping *mappings) const
- {
-     assertex(prefix && *prefix);
-     StringBuffer columnList;
-     StringBuffer table;
-     StringBuffer lowerBound(prefix);
-     lowerBound.toUpperCase();
-     StringBuffer upperBound(lowerBound);
-     const size32_t boundLen = upperBound.length();
-     assertex(boundLen);
-     const size32_t lastPos = boundLen-1;
-     upperBound.setCharAt(lastPos, upperBound.charAt(lastPos)+1);
-     // mappings+3 means we don't return the key columns (xpath, upper(keyPrefix), upper(key))
-     getFieldNames(mappings+3, columnList, table);
-     VStringBuffer cql("select %s from %s where xpath=? and fieldPrefix=? and fieldValue>=? and fieldValue<?;", columnList.str()+1, table.str());
-     CassandraStatement rangeQuery(prepareStatement(cql));
-     rangeQuery.bindString(0, xpath);
-     rangeQuery.bindString_n(1, lowerBound, prefixSize);
-     rangeQuery.bindString(2, lowerBound);
-     rangeQuery.bindString(3, upperBound);
-     return executeQuery(querySession(), rangeQuery);
- }
- // Fetch rows from the search table, by thorTime, above a threshold.
- //   threshold  - when non-empty, only rows with fieldValue >= threshold are returned
- //   descending - true orders by fieldValue descending (wuid ascending), false the reverse
- //   limit      - row limit, 0 for none
- const CassResult *fetchDataByThorTime(const char *threshold, bool descending, unsigned limit) const
- {
-     const bool haveThreshold = threshold && *threshold;
-     StringBuffer columnList;
-     StringBuffer table;
-     // mappings+3 means we don't return the key columns (xpath, upper(keyPrefix), upper(key))
-     getFieldNames(searchMappings+3, columnList, table);
-     VStringBuffer cql("select %s from %s where xpath=? and fieldPrefix=?", columnList.str()+1, table.str());
-     if (haveThreshold)
-         cql.append(" and fieldValue >= ?");
-     cql.append(descending ? " ORDER BY fieldValue DESC, wuid ASC" : " ORDER BY fieldValue ASC, wuid DESC");
-     if (limit)
-         cql.appendf(" LIMIT %u", limit);
-     cql.append(';');
-     CassandraStatement thorTimeQuery(prepareStatement(cql));
-     thorTimeQuery.bindString(0, "@totalThorTime");
-     // This would stop working if we ever set the search prefix to > 8 chars. So don't.
-     // NOTE(review): that remark implies the padding literal should supply up to 8 characters - confirm its length.
-     thorTimeQuery.bindString_n(1, " ", prefixSize);
-     if (haveThreshold)
-         thorTimeQuery.bindString(2, threshold);
-     return executeQuery(querySession(), thorTimeQuery);
- }
- // Fetch rows from the search table, continuing a previous query that was sorted by thor time - part one
- // This technique only works for thor time where we have forced to a single partition. Otherwise it gets even more complicated, and not worth it.
- //   threshold  - fieldValue to continue from
- //   wuid       - when given, rows with fieldValue equal to threshold and wuid beyond this one
- //                are fetched; when NULL, rows with fieldValue strictly beyond threshold
- //   descending - direction of the original sort, which fixes what "beyond" means
- //   limit      - row limit, 0 for none
- const CassResult *fetchMoreDataByThorTime(const char *threshold, const char *wuid, bool descending, unsigned limit) const
- {
-     StringBuffer columnList;
-     StringBuffer table;
-     // mappings+3 means we don't return the key columns (xpath, upper(keyPrefix), upper(key))
-     getFieldNames(searchMappings+3, columnList, table);
-     const char *wuidComparison = descending ? ">" : "<";
-     const char *fieldComparison = wuid ? "=" : (descending ? "<" : ">");
-     VStringBuffer cql("select %s from %s where xpath=? and fieldPrefix=? and fieldValue %s ?", columnList.str()+1, table.str(), fieldComparison);
-     if (wuid)
-         cql.appendf(" and wuid %s ?", wuidComparison);
-     cql.append(descending ? " ORDER BY fieldValue DESC, WUID ASC" : " ORDER BY fieldValue ASC, WUID DESC");
-     if (limit)
-         cql.appendf(" LIMIT %u", limit);
-     cql.append(';');
-     CassandraStatement continuation(prepareStatement(cql));
-     continuation.bindString(0, "@totalThorTime");
-     continuation.bindString_n(1, threshold, prefixSize);
-     continuation.bindString(2, threshold);
-     if (wuid)
-         continuation.bindString(3, wuid);
-     return executeQuery(querySession(), continuation);
- }
- // Fetch rows from the file search table (covers files read and files written).
- //   name        - file name to look up
- //   wuidFilters - each adds "wuid <= ?" (WUSFwuidhigh) or "wuid >= ?" (any other field)
- //   read        - value matched against the table's 'read' column
- const CassResult *fetchDataForFiles(const char *name, const IArrayOf<IPostFilter> &wuidFilters, bool read) const
- {
-     StringBuffer columnList;
-     StringBuffer table;
-     // mappings+2 means we don't return the key columns (name and readmode)
-     getFieldNames(filesSearchMappings+2, columnList, table);
-     VStringBuffer cql("select %s from %s where name=? and read=?", columnList.str()+1, table.str());
-     ForEachItemIn(f, wuidFilters)
-     {
-         const char *comparison = (wuidFilters.item(f).queryField()==WUSFwuidhigh) ? "<=" : ">=";
-         cql.appendf(" and wuid %s ?", comparison);
-     }
-     CassandraStatement fileQuery(prepareStatement(cql));
-     fileQuery.bindString(0, name);
-     fileQuery.bindBool(1, read ? cass_true : cass_false);
-     // Filter values occupy the parameters after name and read
-     ForEachItemIn(b, wuidFilters)
-         fileQuery.bindString(b+2, wuidFilters.item(b).queryValue());
-     return executeQuery(querySession(), fileQuery);
- }
- // Fetch matching rows from the search table, for a single wuid.
- // The key is upper-cased before use; its first prefixSize characters (at most) are bound
- // as the fieldPrefix and the whole upper-cased key as the fieldValue.
- const CassResult *fetchDataForKeyAndWuid(const char *xpath, const char *key, const char *wuid) const
- {
-     assertex(key);
-     StringBuffer columnList;
-     StringBuffer table;
-     StringBuffer upperKey(key);
-     upperKey.toUpperCase();
-     // mappings+4 means we don't return the key columns (xpath, upper(keyPrefix), upper(key), and wuid)
-     getFieldNames(searchMappings+4, columnList, table);
-     VStringBuffer cql("select %s from %s where xpath=? and fieldPrefix=? and fieldValue =? and wuid=?;", columnList.str()+1, table.str());
-     CassandraStatement entryQuery(prepareStatement(cql));
-     entryQuery.bindString(0, xpath);
-     entryQuery.bindString_n(1, upperKey, prefixSize);
-     entryQuery.bindString(2, upperKey);
-     entryQuery.bindString(3, wuid);
-     return executeQuery(querySession(), entryQuery);
- }
- // Delete matching rows from a child table.
- // The DELETE statement for the wuid is added to 'batch'; nothing is executed here.
- virtual void deleteChildByWuid(const CassandraXmlMapping *mappings, const char *wuid, CassBatch *batch) const
- {
-     StringBuffer unusedColumns; // filled in by getFieldNames() but only the table name is needed
-     StringBuffer table;
-     getFieldNames(mappings, unusedColumns, table);
-     VStringBuffer deleteCql("DELETE from %s where partition=? and wuid=?;", table.str());
-     CassandraStatement removal(prepareStatement(deleteCql));
-     removal.bindInt32(0, rtlHash32VStr(wuid, 0) % partitions);
-     removal.bindString(1, wuid);
-     check(cass_batch_add_statement(batch, removal));
- }
- // Remove cache entries that have not been accessed for CASS_WU_QUERY_EXPIRES ticks.
- // Returns the delay until the next surviving entry is due to expire, or
- // CASS_WU_QUERY_EXPIRES when no entry is due sooner. Runs entirely under cacheCrit.
- unsigned retireCache()
- {
-     CriticalBlock b(cacheCrit); // Is this too coarse-grained?
-     unsigned nextExpiry = CASS_WU_QUERY_EXPIRES;
-     unsigned now = msTick();
-     // Expired entries are collected first and removed once the scan of the map has finished
-     ICopyArrayOf<CCassandraWuUQueryCacheEntry> expired;
-     HashIterator scan(cacheIdMap);
-     ForEach(scan)
-     {
-         CCassandraWuUQueryCacheEntry *candidate = cacheIdMap.mapToValue(&scan.query());
-         unsigned age = now - candidate->queryLastAccess();
-         int ttl = CASS_WU_QUERY_EXPIRES-age;
-         if (ttl > 0)
-         {
-             if (ttl< nextExpiry)
-                 nextExpiry = ttl;
-         }
-         else
-             expired.append(*candidate);
-     }
-     ForEachItemIn(e, expired)
-     {
-         DBGLOG("Expiring cache entry %p", &expired.item(e));
-         cacheIdMap.remove(expired.item(e).queryHint());
-     }
-     return nextExpiry;
- }
- // Thread that repeatedly asks the owning factory to expire stale cache entries, waiting on
- // a semaphore between passes for the delay that retireCache() returns.
- // NOTE(review): 'stopping' is a plain bool written by stop() and read by run(); confirm whether
- // those calls can happen on different threads and whether that needs synchronization.
- class CacheRetirer : public Thread
- {
- public:
-     CacheRetirer(CCasssandraWorkUnitFactory &_parent)
-         : Thread("WorkunitListCacheRetirer"), parent(_parent), stopping(false)
-     {
-     }
-     // Thread body: prune, then wait out the returned delay or an early wake-up from stop()
-     virtual int run()
-     {
-         while (!stopping)
-             sem.wait(parent.retireCache());
-         return 0;
-     }
-     // Ask run() to finish and wake it if it is currently waiting
-     void stop()
-     {
-         stopping = true;
-         sem.signal();
-     }
- private:
-     Semaphore sem;
-     CCasssandraWorkUnitFactory &parent;
-     bool stopping;
- } cacheRetirer;
- unsigned randomizeSuffix;
- unsigned traceLevel; // fetchData() logs its query text when this is >= 2
- unsigned randState; // State word handed to rand_r() when getVersionInfo() picks a partition
- int partitions = DEFAULT_PARTITIONS; // Modulus applied to the wuid hash to choose a partition
- int prefixSize = DEFAULT_PREFIX_SIZE; // Length passed to bindString_n() for fieldPrefix; also the minimum prefix length accepted by _getUniqueValues()
- CassandraClusterSession cluster;
- mutable CriticalSection cacheCrit; // Held by retireCache() while it scans and prunes cacheIdMap
- mutable MapXToMyClass<__uint64, __uint64, CCassandraWuUQueryCacheEntry> cacheIdMap; // Entries are removed by retireCache() once unused for CASS_WU_QUERY_EXPIRES
- };
- } // namespace
- // Exported C entry point: create the Cassandra-backed workunit factory for the given
- // shared object and configuration properties. The caller receives the new instance.
- extern "C" EXPORT IWorkUnitFactory *createWorkUnitFactory(const SharedObject *dll, const IPropertyTree *props)
- {
-     IWorkUnitFactory *factory = new cassandraembed::CCasssandraWorkUnitFactory(dll, props);
-     return factory;
- }
|