ws_workunitsService.cpp 204 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604260526062607260826092610261126122613261426152616261726182619262026212622262326242625262626272628262926302631263226332634263526362637263826392640264126422643264426452646264726482649265026512652265326542655265626572658265926602661266226632664266526662667266826692670267126722673267426752676267726782679268026812682268326842685268626872688268926902691269226932694269526962697269826992700270127022703270427052706270727082709271027112712271327142715271627172718271927202721272227232724272527262727272827292730273127322733273427352736273727382739274027412742274327442745274627472748274927502751275227532754275527562757275827592760276127622763276427652766276727682769277027712772277327742775277627772778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327732783279328032813282328332843285328632873288328932903291329232933294329532963297329832993300330133023303330433053306330733083309331033113312331333143315331633173318331933203321332233233324332533263327332833293330333133323333333433353336333733383339334033413342334333443345334633473348334933503351335233533354335533563357335833593360336133623363336433653366336733683369337033713372337333743375337633773378337933803381338233833384338533863387338833893390339133923393339433953396339733983399340034013402340334043405340634073408340934103411341234133414341534163417341834193420342134223423342434253426342734283429343034313432343334343435343634373438343934403441344234433444344534463447344834493450345134523453345434553456345734583459346034613462346334643465346634673468346934703471347234733474347534763477347834793480348134823483348434853486348734883489349034913492349334943495349634973498349935003501350235033504350535063507350835093510351135123513351435153516351735183519352035213522352335243525352635273528352935303531353235333534353535363537353835393540354135423543354435453546354735483549355035513552355335543555355635573558355935603561356235633564356535663567356835693570357135723573357435753576357735783579358035813582358335843585358635873588358935903591359235933594359535963597359835993600360136023603360436053606360736083609361036113612361336143615361636173618361936203621362236233624362536263627362836293630363136323633363436353636363736383639364036413642364336443645364636473648364936503651365236533654365536563657365836593660366136623663366436653666366736683669367036713672367336743675367636773678367936803681368236833684368536863687368836893690369136923693369436953696369736983699370037013702370337043705370637073708370937103711371237133714371537163717371837193720372137223723372437253726372737283729373037313732373337343735373637373738373937403741374237433744374537463747374837493750375137523753375437553756375737583759376037613762376337643765376637673768376937703771377237733774377537763777377837793780378137823783378437853786378737883789379037913792379337943795379637973798379938003801380238033804380538063807380838093810381138123813381438153816381738183819382038213822382338243825382638273828382938303831383238333834383538363837383838393840384138423843384438453846384738483849385038513852385338543855385638573858385938603861386238633864386538663867386838693870387138723873387438753876387738783879388038813882388338843885388638873888388938903891389238933894389538963897389838993900390139023903390439053906390739083909391039113912391339143915391639173918391939203921392239233924392539263927392839293930393139323933393439353936393739383939394039413942394339443945394639473948394939503951395239533954395539563957395839593960396139623963396439653966396739683969397039713972397339743975397639773978397939803981398239833984398539863987398839893990399139923993399439953996399739983999400040014002400340044005400640074008400940104011401240134014401540164017401840194020402140224023402440254026402740284029403040314032403340344035403640374038403940404041404240434044404540464047404840494050405140524053405440554056405740584059406040614062406340644065406640674068406940704071407240734074407540764077407840794080408140824083408440854086408740884089409040914092409340944095409640974098409941004101410241034104410541064107410841094110411141124113411441154116411741184119412041214122412341244125412641274128412941304131413241334134413541364137413841394140414141424143414441454146414741484149415041514152415341544155415641574158415941604161416241634164416541664167416841694170417141724173417441754176417741784179418041814182418341844185418641874188418941904191419241934194419541964197419841994200420142024203420442054206420742084209421042114212421342144215421642174218421942204221422242234224422542264227422842294230423142324233423442354236423742384239424042414242424342444245424642474248424942504251425242534254425542564257425842594260426142624263426442654266426742684269427042714272427342744275427642774278427942804281428242834284428542864287428842894290429142924293429442954296429742984299430043014302430343044305430643074308430943104311431243134314431543164317431843194320432143224323432443254326432743284329433043314332433343344335433643374338433943404341434243434344434543464347434843494350435143524353435443554356435743584359436043614362436343644365436643674368436943704371437243734374437543764377437843794380438143824383438443854386438743884389439043914392439343944395439643974398439944004401440244034404440544064407440844094410441144124413441444154416441744184419442044214422442344244425442644274428442944304431443244334434443544364437443844394440444144424443444444454446444744484449445044514452445344544455445644574458445944604461446244634464446544664467446844694470447144724473447444754476447744784479448044814482448344844485448644874488448944904491449244934494449544964497449844994500450145024503450445054506450745084509451045114512451345144515451645174518451945204521452245234524452545264527452845294530453145324533453445354536453745384539454045414542454345444545454645474548454945504551455245534554455545564557455845594560456145624563456445654566456745684569457045714572457345744575457645774578457945804581458245834584458545864587458845894590459145924593459445954596459745984599460046014602460346044605460646074608460946104611461246134614461546164617461846194620462146224623462446254626462746284629463046314632463346344635463646374638463946404641464246434644464546464647464846494650465146524653465446554656465746584659466046614662466346644665466646674668466946704671467246734674467546764677467846794680468146824683468446854686468746884689469046914692469346944695469646974698469947004701470247034704470547064707470847094710471147124713471447154716471747184719472047214722472347244725472647274728472947304731473247334734473547364737473847394740474147424743474447454746474747484749475047514752475347544755475647574758475947604761476247634764476547664767476847694770477147724773477447754776477747784779478047814782478347844785478647874788478947904791479247934794479547964797479847994800480148024803480448054806480748084809481048114812481348144815481648174818481948204821482248234824482548264827482848294830483148324833483448354836483748384839484048414842484348444845484648474848484948504851485248534854485548564857485848594860486148624863486448654866486748684869487048714872487348744875487648774878487948804881488248834884488548864887488848894890489148924893489448954896489748984899490049014902490349044905490649074908490949104911491249134914491549164917491849194920492149224923492449254926492749284929493049314932493349344935493649374938493949404941494249434944494549464947494849494950495149524953495449554956495749584959496049614962496349644965496649674968496949704971497249734974497549764977497849794980498149824983498449854986498749884989499049914992499349944995499649974998499950005001500250035004500550065007500850095010501150125013501450155016501750185019502050215022502350245025502650275028502950305031503250335034503550365037503850395040504150425043504450455046504750485049505050515052505350545055505650575058505950605061506250635064506550665067506850695070507150725073507450755076507750785079508050815082508350845085508650875088508950905091509250935094509550965097509850995100510151025103510451055106510751085109511051115112511351145115511651175118511951205121512251235124512551265127512851295130513151325133513451355136513751385139514051415142514351445145514651475148514951505151515251535154515551565157515851595160516151625163516451655166516751685169517051715172517351745175517651775178517951805181518251835184518551865187518851895190519151925193519451955196519751985199520052015202520352045205520652075208520952105211521252135214521552165217521852195220522152225223522452255226522752285229523052315232523352345235523652375238523952405241524252435244524552465247524852495250525152525253525452555256525752585259526052615262526352645265526652675268526952705271527252735274527552765277527852795280528152825283528452855286528752885289529052915292529352945295529652975298529953005301530253035304530553065307530853095310531153125313531453155316531753185319532053215322532353245325532653275328532953305331533253335334533553365337533853395340534153425343534453455346534753485349535053515352535353545355535653575358535953605361536253635364536553665367536853695370537153725373537453755376537753785379538053815382538353845385538653875388538953905391539253935394539553965397539853995400540154025403540454055406540754085409541054115412541354145415541654175418541954205421542254235424542554265427542854295430543154325433543454355436543754385439544054415442544354445445544654475448544954505451545254535454545554565457545854595460546154625463546454655466546754685469547054715472547354745475547654775478547954805481548254835484548554865487548854895490549154925493549454955496549754985499
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "ws_workunitsService.hpp"
  14. #include "ws_fs.hpp"
  15. #include "jlib.hpp"
  16. #include "jflz.hpp"
  17. #include "daclient.hpp"
  18. #include "dadfs.hpp"
  19. #include "daaudit.hpp"
  20. #include "dautils.hpp"
  21. #include "exception_util.hpp"
  22. #include "wujobq.hpp"
  23. #include "eventqueue.hpp"
  24. #include "fileview.hpp"
  25. #include "hqlerror.hpp"
  26. #include "sacmd.hpp"
  27. #include "wuwebview.hpp"
  28. #include "portlist.h"
  29. #include "dllserver.hpp"
  30. #include "schedulectrl.hpp"
  31. #include "scheduleread.hpp"
  32. #include "dadfs.hpp"
  33. #include "dfuwu.hpp"
  34. #include "thorplugin.hpp"
  35. #include "roxiecontrol.hpp"
  36. #include "deftype.hpp"
  37. #include "thorcommon.hpp"
  38. #include "thorxmlwrite.hpp"
  39. #include "fvdatasource.hpp"
  40. #include "fvresultset.ipp"
  41. #include "ws_wudetails.hpp"
  42. #include "wuerror.hpp"
  43. #include "TpWrapper.hpp"
  44. #include "rtlformat.hpp"
  45. #include "package.h"
  46. #include "build-config.h"
  47. #ifdef _USE_ZLIB
  48. #include "zcrypt.hpp"
  49. #endif
  50. #define ESP_WORKUNIT_DIR "workunits/"
  51. static constexpr const char* zipFolder = "tempzipfiles" PATHSEPSTR;
  52. #define WU_SDS_LOCK_TIMEOUT (5*60*1000) // 5 mins
  53. const unsigned CHECK_QUERY_STATUS_THREAD_POOL_SIZE = 25;
  54. const unsigned MAX_ZAP_BUFFER_SIZE = 10000000; //10M
  55. class ExecuteExistingQueryInfo
  56. {
  57. public:
  58. ExecuteExistingQueryInfo(IConstWorkUnit *cw)
  59. {
  60. const char *name = cw->queryJobName();
  61. const char *div = strchr(name, '.');
  62. if (div)
  63. {
  64. queryset.set(name, div-name);
  65. query.set(div+1);
  66. }
  67. }
  68. public:
  69. StringAttr queryset;
  70. StringAttr query;
  71. };
  72. //The ECLWUActionNames[] has to match with the ESPenum ECLWUActions in the ecm file.
  73. static unsigned NumOfECLWUActionNames = 12;
  74. static const char *ECLWUActionNames[] = { "Abort", "Delete", "Deschedule", "Reschedule", "Pause",
  75. "PauseNow", "Protect", "Unprotect", "Restore", "Resume", "SetToFailed", "Archive", nullptr };
  76. class CECLWUActionsEx : public SoapEnumParamNew<CECLWUActions>
  77. {
  78. public:
  79. CECLWUActionsEx() : SoapEnumParamNew<CECLWUActions>() { init("ECLWUActions","string", ECLWUActionNames); }
  80. };
  81. static CECLWUActionsEx eclWUActionType;
  82. void setActionResult(const char* wuid, CECLWUActions action, const char* result, const char* strAction, IArrayOf<IConstWUActionResult>* results)
  83. {
  84. if (!results || !wuid || !*wuid || !result || !*result)
  85. return;
  86. Owned<IEspWUActionResult> res = createWUActionResult("", "");
  87. res->setWuid(wuid);
  88. res->setAction(strAction);
  89. res->setResult(result);
  90. results->append(*res.getClear());
  91. }
  92. bool doAction(IEspContext& context, StringArray& wuids, CECLWUActions action, IProperties* params, IArrayOf<IConstWUActionResult>* results)
  93. {
  94. if (!wuids.length())
  95. return true;
  96. if ((action == CECLWUActions_Restore) || (action == CECLWUActions_Archive))
  97. {
  98. StringBuffer msg;
  99. ForEachItemIn(i, wuids)
  100. {
  101. StringBuffer wuidStr(wuids.item(i));
  102. const char* wuid = wuidStr.trim().str();
  103. if (isEmpty(wuid))
  104. {
  105. msg.appendf("Empty Workunit ID at %u. ", i);
  106. continue;
  107. }
  108. if ((action == CECLWUActions_Archive) && !validateWsWorkunitAccess(context, wuid, SecAccess_Full))
  109. msg.appendf("Access denied for Workunit %s. ", wuid);
  110. }
  111. if (!msg.isEmpty())
  112. throw makeStringException(ECLWATCH_INVALID_INPUT, msg);
  113. Owned<ISashaCommand> cmd = archiveOrRestoreWorkunits(wuids, params, action == CECLWUActions_Archive, false);
  114. ForEachItemIn(idx, wuids)
  115. {
  116. StringBuffer reply;
  117. cmd->getId(idx, reply);
  118. const char* wuid = wuids.item(idx);
  119. if ((action == CECLWUActions_Restore) && !validateWsWorkunitAccess(context, wuid, SecAccess_Full))
  120. reply.appendf("Access denied for Workunit %s. ", wuid);
  121. AuditSystemAccess(context.queryUserId(), true, "%s", reply.str());
  122. }
  123. return true;
  124. }
  125. bool bAllSuccess = true;
  126. const char* strAction = (action < NumOfECLWUActionNames) ? ECLWUActionNames[action] : "Unknown Action";
  127. for(aindex_t i=0; i<wuids.length();i++)
  128. {
  129. StringBuffer wuidStr(wuids.item(i));
  130. const char* wuid = wuidStr.trim().str();
  131. if (isEmpty(wuid))
  132. {
  133. UWARNLOG("Empty Workunit ID");
  134. continue;
  135. }
  136. try
  137. {
  138. if (!looksLikeAWuid(wuid, 'W'))
  139. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid Workunit ID: %s", wuid);
  140. PROGLOG("%s %s", strAction, wuid);
  141. if (action == CECLWUActions_EventDeschedule)
  142. {
  143. if (!context.validateFeatureAccess(OWN_WU_ACCESS, SecAccess_Full, false)
  144. || !context.validateFeatureAccess(OTHERS_WU_ACCESS, SecAccess_Full, false))
  145. ensureWsWorkunitAccess(context, wuid, SecAccess_Full);
  146. descheduleWorkunit(wuid);
  147. AuditSystemAccess(context.queryUserId(), true, "Updated %s", wuid);
  148. }
  149. else
  150. {
  151. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  152. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid);
  153. if(!cw)
  154. throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot open workunit %s.",wuid);
  155. if ((action == CECLWUActions_Delete) && (cw->getState() == WUStateWait))
  156. throw MakeStringException(ECLWATCH_CANNOT_DELETE_WORKUNIT,"Cannot delete a workunit which is in a 'Wait' status.");
  157. switch(action)
  158. {
  159. case CECLWUActions_Pause:
  160. {
  161. ensureWsWorkunitAccess(context, *cw, SecAccess_Full);
  162. WorkunitUpdate wu(&cw->lock());
  163. wu->setAction(WUActionPause);
  164. break;
  165. }
  166. case CECLWUActions_PauseNow:
  167. {
  168. ensureWsWorkunitAccess(context, *cw, SecAccess_Full);
  169. WorkunitUpdate wu(&cw->lock());
  170. wu->setAction(WUActionPauseNow);
  171. break;
  172. }
  173. case CECLWUActions_Resume:
  174. {
  175. ensureWsWorkunitAccess(context, *cw, SecAccess_Full);
  176. WorkunitUpdate wu(&cw->lock());
  177. wu->setAction(WUActionResume);
  178. break;
  179. }
  180. case CECLWUActions_Delete:
  181. ensureWsWorkunitAccess(context, *cw, SecAccess_Full);
  182. {
  183. cw.clear();
  184. factory->deleteWorkUnitEx(wuid, true),
  185. AuditSystemAccess(context.queryUserId(), true, "Deleted %s", wuid);
  186. }
  187. break;
  188. case CECLWUActions_Abort:
  189. ensureWsWorkunitAccess(context, *cw, SecAccess_Full);
  190. {
  191. if (cw->getState() == WUStateWait)
  192. {
  193. WorkunitUpdate wu(&cw->lock());
  194. wu->deschedule();
  195. wu->setState(WUStateAborted);
  196. }
  197. else
  198. abortWorkUnit(wuid, context.querySecManager(), context.queryUser());
  199. AuditSystemAccess(context.queryUserId(), true, "Aborted %s", wuid);
  200. }
  201. break;
  202. case CECLWUActions_Protect:
  203. case CECLWUActions_Unprotect:
  204. ensureWsWorkunitAccess(context, *cw, SecAccess_Write);
  205. cw->protect((action == CECLWUActions_Protect) ? true:false);
  206. AuditSystemAccess(context.queryUserId(), true, "Updated %s", wuid);
  207. break;
  208. case CECLWUActions_SetToFailed:
  209. {
  210. ensureWsWorkunitAccess(context, *cw, SecAccess_Write);
  211. WorkunitUpdate wu(&cw->lock());
  212. wu->setState(WUStateFailed);
  213. AuditSystemAccess(context.queryUserId(), true, "Updated %s", wuid);
  214. }
  215. break;
  216. case CECLWUActions_EventReschedule:
  217. {
  218. ensureWsWorkunitAccess(context, *cw, SecAccess_Full);
  219. WorkunitUpdate wu(&cw->lock());
  220. wu->schedule();
  221. AuditSystemAccess(context.queryUserId(), true, "Updated %s", wuid);
  222. }
  223. break;
  224. }
  225. }
  226. PROGLOG("%s %s done", strAction, wuid);
  227. setActionResult(wuid, action, "Success", strAction, results);
  228. }
  229. catch (IException *e)
  230. {
  231. bAllSuccess = false;
  232. StringBuffer eMsg;
  233. StringBuffer failedMsg("Failed: ");
  234. setActionResult(wuid, action, failedMsg.append(e->errorMessage(eMsg)).str(), strAction, results);
  235. OWARNLOG("Failed to %s for workunit: %s, %s", strAction, wuid, eMsg.str());
  236. AuditSystemAccess(context.queryUserId(), false, "Failed to %s %s", strAction, wuid);
  237. e->Release();
  238. continue;
  239. }
  240. catch (...)
  241. {
  242. bAllSuccess = false;
  243. StringBuffer failedMsg;
  244. failedMsg.appendf("Unknown exception");
  245. setActionResult(wuid, action, failedMsg.str(), strAction, results);
  246. IWARNLOG("Failed to %s for workunit: %s, %s", strAction, wuid, failedMsg.str());
  247. AuditSystemAccess(context.queryUserId(), false, "Failed to %s %s", strAction, wuid);
  248. continue;
  249. }
  250. }
  251. int timeToWait = 0;
  252. if (params)
  253. timeToWait = params->getPropInt("BlockTillFinishTimer");
  254. if (timeToWait != 0)
  255. {
  256. for(aindex_t i=0; i<wuids.length();i++)
  257. {
  258. const char* wuid=wuids.item(i);
  259. if (isEmpty(wuid))
  260. continue;
  261. waitForWorkUnitToComplete(wuid, timeToWait);
  262. }
  263. }
  264. return bAllSuccess;
  265. }
  266. bool doProtectWorkunits(IEspContext& context, StringArray& wuids, IArrayOf<IConstWUActionResult>* results)
  267. {
  268. Owned<IProperties> params(createProperties(true));
  269. params->setProp("BlockTillFinishTimer", 0);
  270. return doAction(context, wuids, CECLWUActions_Protect, params, results);
  271. }
  272. bool doUnProtectWorkunits(IEspContext& context, StringArray& wuids, IArrayOf<IConstWUActionResult>* results)
  273. {
  274. Owned<IProperties> params(createProperties(true));
  275. params->setProp("BlockTillFinishTimer", 0);
  276. return doAction(context, wuids, CECLWUActions_Unprotect, params, results);
  277. }
  278. static void checkUpdateQuerysetLibraries()
  279. {
  280. Owned<IRemoteConnection> globalLock = querySDS().connect("/QuerySets/", myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE_QUERY, WU_SDS_LOCK_TIMEOUT);
  281. if (!globalLock)
  282. return;
  283. IPropertyTree *root = globalLock->queryRoot();
  284. if (!root)
  285. return;
  286. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  287. Owned<IPropertyTreeIterator> querySets = root->getElements("QuerySet");
  288. ForEach(*querySets)
  289. {
  290. IPropertyTree &querySet = querySets->query();
  291. if (querySet.hasProp("@updatedLibraries")) //only need to do this once, then publish and copy will keep up to date
  292. continue;
  293. Owned<IPropertyTreeIterator> queries = querySet.getElements("Query");
  294. ForEach(*queries)
  295. {
  296. IPropertyTree &query = queries->query();
  297. if (query.hasProp("@libCount"))
  298. continue;
  299. const char *wuid = query.queryProp("@wuid");
  300. if (!wuid || !*wuid)
  301. continue;
  302. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid);
  303. if (!cw)
  304. continue;
  305. checkAddLibrariesToQueryEntry(&query, cw);
  306. }
  307. querySet.setPropBool("@updatedLibraries", true);
  308. }
  309. }
  310. void CWsWorkunitsEx::init(IPropertyTree *cfg, const char *process, const char *service)
  311. {
  312. if (!daliClientActive())
  313. {
  314. OERRLOG("No Dali Connection Active.");
  315. throw MakeStringException(-1, "No Dali Connection Active. Please Specify a Dali to connect to in you configuration file");
  316. }
  317. DBGLOG("Initializing %s service [process = %s]", service, process);
  318. checkUpdateQuerysetLibraries();
  319. refreshValidClusters();
  320. daliServers.set(cfg->queryProp("Software/EspProcess/@daliServers"));
  321. const char *computer = cfg->queryProp("Software/EspProcess/@computer");
  322. if (daliServers.isEmpty() || !computer || streq(computer, "localhost")) //otherwise can't assume environment "." netAddresses are the same as my address
  323. queryHostIP().getIpText(envLocalAddress);
  324. else
  325. {
  326. //a bit weird, but other netAddresses in the environment are not the same localhost as this server
  327. //use the address of the DALI
  328. const char *finger = daliServers.get();
  329. while (*finger && !strchr(":;,", *finger))
  330. envLocalAddress.append(*finger++);
  331. }
  332. awusCacheMinutes = AWUS_CACHE_MIN_DEFAULT;
  333. VStringBuffer xpath("Software/EspProcess[@name=\"%s\"]/EspService[@name=\"%s\"]/AWUsCacheMinutes", process, service);
  334. cfg->getPropInt(xpath.str(), awusCacheMinutes);
  335. xpath.setf("Software/EspProcess[@name=\"%s\"]/@PageCacheTimeoutSeconds", process);
  336. if (cfg->hasProp(xpath.str()))
  337. setPageCacheTimeoutMilliSeconds(cfg->getPropInt(xpath.str()));
  338. xpath.setf("Software/EspProcess[@name=\"%s\"]/@MaxPageCacheItems", process);
  339. if (cfg->hasProp(xpath.str()))
  340. setMaxPageCacheItems(cfg->getPropInt(xpath.str()));
  341. xpath.setf("Software/EspProcess[@name=\"%s\"]/EspService[@name=\"%s\"]/serverForArchivedECLWU/@netAddress", process, service);
  342. if (cfg->hasProp(xpath.str()))
  343. {
  344. sashaServerIp.set(cfg->queryProp(xpath.str()));
  345. xpath.setf("Software/EspProcess[@name=\"%s\"]/EspService[@name=\"%s\"]/serverForArchivedECLWU/@port", process, service);
  346. sashaServerPort = cfg->getPropInt(xpath.str(), DEFAULT_SASHA_PORT);
  347. }
  348. xpath.setf("Software/EspProcess[@name=\"%s\"]/EspService[@name=\"%s\"]/ThorSlaveLogThreadPoolSize", process, service);
  349. thorSlaveLogThreadPoolSize = cfg->getPropInt(xpath, THOR_SLAVE_LOG_THREAD_POOL_SIZE);
  350. xpath.setf("Software/EspProcess[@name=\"%s\"]/EspService[@name=\"%s\"]/WUResultMaxSizeMB", process, service);
  351. unsigned wuResultMaxSizeMB = cfg->getPropInt(xpath);
  352. if (wuResultMaxSizeMB > 0)
  353. wuResultMaxSize = wuResultMaxSizeMB * 1000000;
  354. xpath.setf("Software/EspProcess[@name=\"%s\"]/EspService[@name=\"%s\"]/ZAPEmail", process, service);
  355. IPropertyTree *zapEmail = cfg->queryPropTree(xpath.str());
  356. if (zapEmail)
  357. {
  358. zapEmailTo = zapEmail->queryProp("@to");
  359. if (zapEmailTo.isEmpty())
  360. throw MakeStringException(-1, "ZAPEmail: EmailTo not specified.");
  361. zapEmailFrom = zapEmail->queryProp("@from");
  362. if (zapEmailFrom.isEmpty())
  363. throw MakeStringException(-1, "ZAPEmail: EmailFrom not specified.");
  364. zapEmailServer = zapEmail->queryProp("@serverURL");
  365. if (zapEmailServer.isEmpty())
  366. throw MakeStringException(-1, "ZAPEmail: EmailServer not specified.");
  367. zapEmailServerPort = zapEmail->getPropInt("@serverPort", WUDEFAULT_ZAPEMAILSERVER_PORT);
  368. zapEmailMaxAttachmentSize = zapEmail->getPropInt("@maxAttachmentSize", MAX_ZAP_BUFFER_SIZE);
  369. }
  370. maxRequestEntityLength = cfg->getPropInt("Software[1]/EspProcess[1]/EspProtocol[@type='http_protocol'][1]/@maxRequestEntityLength");
  371. directories.set(cfg->queryPropTree("Software/Directories"));
  372. const char *name = cfg->queryProp("Software/EspProcess/@name");
  373. getConfigurationDirectory(directories, "query", "esp", name ? name : "esp", queryDirectory);
  374. recursiveCreateDirectory(queryDirectory.str());
  375. dataCache.setown(new DataCache(DATA_SIZE));
  376. archivedWuCache.setown(new ArchivedWuCache(AWUS_CACHE_SIZE));
  377. wuArchiveCache.setown(new WUArchiveCache(WUARCHIVE_CACHE_SIZE));
  378. //Create a folder for temporarily holding gzip files by WUResultBin()
  379. Owned<IFile> tmpdir = createIFile(TEMPZIPDIR);
  380. if(!tmpdir->exists())
  381. tmpdir->createDirectory();
  382. recursiveCreateDirectory(ESP_WORKUNIT_DIR);
  383. getConfigurationDirectory(directories, "data", "esp", process, dataDirectory);
  384. wuFactory.setown(getWorkUnitFactory());
  385. m_sched.start();
  386. filesInUse.subscribe();
  387. //Start thread pool
  388. xpath.setf("Software/EspProcess[@name=\"%s\"]/EspService[@name=\"%s\"]/ClusterQueryStateThreadPoolSize", process, service);
  389. Owned<CClusterQueryStateThreadFactory> threadFactory = new CClusterQueryStateThreadFactory();
  390. clusterQueryStatePool.setown(createThreadPool("CheckAndSetClusterQueryState Thread Pool", threadFactory, NULL,
  391. cfg->getPropInt(xpath.str(), CHECK_QUERY_STATUS_THREAD_POOL_SIZE)));
  392. }
  393. void CWsWorkunitsEx::refreshValidClusters()
  394. {
  395. validClusters.kill();
  396. #ifdef _CONTAINERIZED
  397. // discovered from generated cluster names
  398. Owned<IPropertyTreeIterator> iter = queryComponentConfig().getElements("queues");
  399. ForEach(*iter)
  400. {
  401. IPropertyTree &queue = iter->query();
  402. const char *qName = queue.queryProp("@name");
  403. bool* found = validClusters.getValue(qName);
  404. if (!found || !*found)
  405. {
  406. validClusters.setValue(qName, true);
  407. PROGLOG("adding valid cluster: %s", qName);
  408. }
  409. }
  410. #else
  411. Owned<IStringIterator> it = getTargetClusters(NULL, NULL);
  412. ForEach(*it)
  413. {
  414. SCMStringBuffer s;
  415. IStringVal &val = it->str(s);
  416. bool* found = validClusters.getValue(val.str());
  417. if (!found || !*found)
  418. validClusters.setValue(val.str(), true);
  419. }
  420. #endif
  421. }
  422. bool CWsWorkunitsEx::isValidCluster(const char *cluster)
  423. {
  424. if (!cluster || !*cluster)
  425. return false;
  426. CriticalBlock block(crit);
  427. bool* found = validClusters.getValue(cluster);
  428. if (found && *found)
  429. return true;
  430. if (validateTargetClusterName(cluster))
  431. {
  432. refreshValidClusters();
  433. return true;
  434. }
  435. return false;
  436. }
  437. bool CWsWorkunitsEx::onWUCreate(IEspContext &context, IEspWUCreateRequest &req, IEspWUCreateResponse &resp)
  438. {
  439. try
  440. {
  441. ensureWsCreateWorkunitAccess(context);
  442. NewWsWorkunit wu(context);
  443. resp.updateWorkunit().setWuid(wu->queryWuid());
  444. AuditSystemAccess(context.queryUserId(), true, "Updated %s", wu->queryWuid());
  445. }
  446. catch(IException* e)
  447. {
  448. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  449. }
  450. return true;
  451. }
  452. bool origValueChanged(const char *newValue, const char *origValue, StringBuffer &s, bool nillable)
  453. {
  454. if (!nillable && isEmpty(newValue))
  455. return false;
  456. if(newValue && origValue)
  457. {
  458. if (!streq(origValue, newValue))
  459. {
  460. s.append(newValue).trim();
  461. return true;
  462. }
  463. return false;
  464. }
  465. if (newValue)
  466. {
  467. s.append(newValue).trim();
  468. return true;
  469. }
  470. return (origValue!=NULL);
  471. }
  472. bool CWsWorkunitsEx::onWUUpdate(IEspContext &context, IEspWUUpdateRequest &req, IEspWUUpdateResponse &resp)
  473. {
  474. try
  475. {
  476. StringBuffer wuid(req.getWuid());
  477. WsWuHelpers::checkAndTrimWorkunit("WUUpdate", wuid);
  478. ensureWsWorkunitAccess(context, wuid.str(), SecAccess_Write);
  479. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  480. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid.str());
  481. if(!cw)
  482. throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT,"Cannot open workunit %s.",wuid.str());
  483. PROGLOG("WUUpdate: %s", wuid.str());
  484. if(req.getProtected() != req.getProtectedOrig())
  485. {
  486. cw->protect(req.getProtected());
  487. cw.clear();
  488. cw.setown(factory->openWorkUnit(wuid.str()));
  489. if(!cw)
  490. throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT,"Cannot open workunit %s.",wuid.str());
  491. }
  492. if ((req.getState() == WUStateRunning)||(req.getState() == WUStateDebugPaused)||(req.getState() == WUStateDebugRunning))
  493. {
  494. WsWuInfo winfo(context, cw);
  495. winfo.getInfo(resp.updateWorkunit(), WUINFO_All);
  496. resp.setRedirectUrl(StringBuffer("/WsWorkunits/WUInfo?Wuid=").append(wuid).str());
  497. AuditSystemAccess(context.queryUserId(), true, "Updated %s", wuid.str());
  498. return true;
  499. }
  500. WorkunitUpdate wu(&cw->lock());
  501. if(!req.getState_isNull() && (req.getStateOrig_isNull() || req.getState() != req.getStateOrig()))
  502. {
  503. if (!req.getStateOrig_isNull() && cw->getState() != (WUState) req.getStateOrig())
  504. throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT, "Cannot update workunit %s because its state has been changed internally. Please refresh the page and try again.", wuid.str());
  505. WUState state = (WUState) req.getState();
  506. if(state < WUStateSize)
  507. wu->setState(state);
  508. }
  509. StringBuffer s;
  510. if (origValueChanged(req.getJobname(), req.getJobnameOrig(), s))
  511. wu->setJobName(s.trim().str());
  512. if (origValueChanged(req.getDescription(), req.getDescriptionOrig(), s.clear()))
  513. wu->setDebugValue("description", (req.getDescription() && *req.getDescription()) ? s.trim().str() : NULL, true);
  514. double version = context.getClientVersion();
  515. if (version > 1.04)
  516. {
  517. if (origValueChanged(req.getClusterSelection(), req.getClusterOrig(), s.clear(), false))
  518. {
  519. if (!isValidCluster(s.str()))
  520. throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Invalid cluster name: %s", s.str());
  521. if (req.getState() == WUStateBlocked)
  522. switchWorkUnitQueue(wu.get(), s.str());
  523. else if ((req.getState() != WUStateSubmitted) && (req.getState() != WUStateRunning) && (req.getState() != WUStateDebugPaused) && (req.getState() != WUStateDebugRunning))
  524. wu->setClusterName(s.str());
  525. }
  526. }
  527. WsWuHelpers::setXmlParameters(wu, req.getXmlParams(), (req.getAction()==WUActionExecuteExisting));
  528. if (notEmpty(req.getQueryText()))
  529. {
  530. Owned<IWUQuery> query=wu->updateQuery();
  531. query->setQueryText(req.getQueryText());
  532. }
  533. if (version > 1.34 && notEmpty(req.getQueryMainDefinition()))
  534. {
  535. Owned<IWUQuery> query=wu->updateQuery();
  536. query->setQueryMainDefinition(req.getQueryMainDefinition());
  537. }
  538. if (!req.getResultLimit_isNull())
  539. wu->setResultLimit(req.getResultLimit());
  540. if (!req.getAction_isNull())
  541. {
  542. WUAction action = (WUAction) req.getAction();
  543. if(action < WUActionSize)
  544. wu->setAction(action);
  545. }
  546. if (!req.getPriorityClass_isNull())
  547. {
  548. WUPriorityClass priority = (WUPriorityClass) req.getPriorityClass();
  549. if(priority<PriorityClassSize)
  550. wu->setPriority(priority);
  551. }
  552. if (!req.getPriorityLevel_isNull())
  553. wu->setPriorityLevel(req.getPriorityLevel());
  554. if (origValueChanged(req.getScope(), req.getScopeOrig(), s.clear(), false))
  555. wu->setWuScope(s.str());
  556. ForEachItemIn(di, req.getDebugValues())
  557. {
  558. IConstDebugValue& item=req.getDebugValues().item(di);
  559. const char *debugName = item.getName();
  560. if (notEmpty(debugName))
  561. {
  562. StringBuffer expanded;
  563. if (*debugName=='-')
  564. debugName=expanded.append("eclcc").append(debugName).str();
  565. wu->setDebugValue(debugName, item.getValue(), true);
  566. }
  567. }
  568. ForEachItemIn(ai, req.getApplicationValues())
  569. {
  570. IConstApplicationValue& item=req.getApplicationValues().item(ai);
  571. if(notEmpty(item.getApplication()) && notEmpty(item.getName()))
  572. wu->setApplicationValue(item.getApplication(), item.getName(), item.getValue(), true);
  573. }
  574. wu->commit();
  575. wu.clear();
  576. WsWuInfo winfo(context, cw);
  577. winfo.getInfo(resp.updateWorkunit(), WUINFO_All);
  578. StringBuffer thorSlaveIP;
  579. if (version > 1.24 && notEmpty(req.getThorSlaveIP()))
  580. thorSlaveIP = req.getThorSlaveIP();
  581. if (thorSlaveIP.length() > 0)
  582. {
  583. StringBuffer url;
  584. url.appendf("/WsWorkunits/WUInfo?Wuid=%s&ThorSlaveIP=%s", wuid.str(), thorSlaveIP.str());
  585. resp.setRedirectUrl(url.str());
  586. }
  587. else
  588. resp.setRedirectUrl(StringBuffer("/WsWorkunits/WUInfo?Wuid=").append(wuid).str());
  589. AuditSystemAccess(context.queryUserId(), true, "Updated %s", wuid.str());
  590. }
  591. catch(IException* e)
  592. {
  593. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  594. }
  595. return true;
  596. }
  597. bool CWsWorkunitsEx::onWUCreateAndUpdate(IEspContext &context, IEspWUUpdateRequest &req, IEspWUUpdateResponse &resp)
  598. {
  599. try
  600. {
  601. const char* wuid = req.getWuid();
  602. if (!wuid || !*wuid)
  603. {
  604. ensureWsCreateWorkunitAccess(context);
  605. NewWsWorkunit wu(context);
  606. req.setWuid(wu->queryWuid());
  607. }
  608. }
  609. catch(IException* e)
  610. {
  611. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  612. }
  613. return onWUUpdate(context, req, resp);
  614. }
  615. static inline StringBuffer &appendUrlParameter(StringBuffer &url, const char *name, const char *value, bool &first)
  616. {
  617. if (notEmpty(value))
  618. {
  619. url.append(first ? '?' : '&').append(name).append('=').append(value);
  620. first=false;
  621. }
  622. return url;
  623. }
  624. bool CWsWorkunitsEx::onWUAction(IEspContext &context, IEspWUActionRequest &req, IEspWUActionResponse &resp)
  625. {
  626. try
  627. {
  628. CECLWUActions action;
  629. double version = context.getClientVersion();
  630. if (version >= 1.57)
  631. action = req.getWUActionType();
  632. else
  633. action = eclWUActionType.toEnum(req.getActionType());
  634. if (action == ECLWUActions_Undefined)
  635. throw MakeStringException(ECLWATCH_INVALID_INPUT,"Action not defined.");
  636. Owned<IProperties> params = createProperties(true);
  637. params->setProp("BlockTillFinishTimer", req.getBlockTillFinishTimer());
  638. if (((action == CECLWUActions_Restore) || (action == CECLWUActions_Archive)) && !sashaServerIp.isEmpty())
  639. {
  640. params->setProp("sashaServerIP", sashaServerIp.get());
  641. params->setProp("sashaServerPort", sashaServerPort);
  642. }
  643. IArrayOf<IConstWUActionResult> results;
  644. if (doAction(context, req.getWuids(), action, params, &results) && (action != CECLWUActions_Delete) && checkRedirect(context))
  645. {
  646. StringBuffer redirect;
  647. if(req.getPageFrom() && strieq(req.getPageFrom(), "wuid"))
  648. redirect.append("/WsWorkunits/WUInfo?Wuid=").append(req.getWuids().item(0));
  649. else if (req.getPageFrom() && strieq(req.getPageFrom(), "scheduler"))
  650. {
  651. redirect.set("/WsWorkunits/WUShowScheduled");
  652. bool first=true;
  653. appendUrlParameter(redirect, "Cluster", req.getEventServer(), first);
  654. appendUrlParameter(redirect, "EventName", req.getEventName(), first);
  655. }
  656. else
  657. {
  658. redirect.append("/WsWorkunits/WUQuery");
  659. bool first=true;
  660. appendUrlParameter(redirect, "PageSize", req.getPageSize(), first);
  661. appendUrlParameter(redirect, "PageStartFrom", req.getCurrentPage(), first);
  662. appendUrlParameter(redirect, "Sortby", req.getSortby(), first);
  663. appendUrlParameter(redirect, "Descending", req.getDescending() ? "1" : "0", first);
  664. appendUrlParameter(redirect, "State", req.getState(), first);
  665. appendUrlParameter(redirect, "Cluster", req.getCluster(), first);
  666. appendUrlParameter(redirect, "Owner", req.getOwner(), first);
  667. appendUrlParameter(redirect, "StartDate", req.getStartDate(), first);
  668. appendUrlParameter(redirect, "EndDate", req.getEndDate(), first);
  669. appendUrlParameter(redirect, "ECL", req.getECL(), first);
  670. appendUrlParameter(redirect, "Jobname", req.getJobname(), first);
  671. }
  672. resp.setRedirectUrl(redirect.str());
  673. }
  674. else
  675. resp.setActionResults(results);
  676. }
  677. catch(IException* e)
  678. {
  679. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  680. }
  681. return true;
  682. }
  683. bool CWsWorkunitsEx::onWUDelete(IEspContext &context, IEspWUDeleteRequest &req, IEspWUDeleteResponse &resp)
  684. {
  685. try
  686. {
  687. IArrayOf<IConstWUActionResult> results;
  688. Owned<IProperties> params = createProperties(true);
  689. params->setProp("BlockTillFinishTimer", req.getBlockTillFinishTimer());
  690. if (!doAction(context,req.getWuids(), CECLWUActions_Delete, params, &results))
  691. resp.setActionResults(results);
  692. }
  693. catch(IException* e)
  694. {
  695. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  696. }
  697. return true;
  698. }
  699. bool CWsWorkunitsEx::onWUAbort(IEspContext &context, IEspWUAbortRequest &req, IEspWUAbortResponse &resp)
  700. {
  701. try
  702. {
  703. IArrayOf<IConstWUActionResult> results;
  704. Owned<IProperties> params = createProperties(true);
  705. params->setProp("BlockTillFinishTimer", req.getBlockTillFinishTimer());
  706. if (!doAction(context,req.getWuids(), CECLWUActions_Abort, params, &results))
  707. resp.setActionResults(results);
  708. }
  709. catch(IException* e)
  710. {
  711. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  712. }
  713. return true;
  714. }
  715. bool CWsWorkunitsEx::onWUProtect(IEspContext &context, IEspWUProtectRequest &req, IEspWUProtectResponse &resp)\
  716. {
  717. try
  718. {
  719. IArrayOf<IConstWUActionResult> results;
  720. Owned<IProperties> params(createProperties(true));
  721. params->setProp("BlockTillFinishTimer", 0);
  722. CECLWUActions action = req.getProtect() ? CECLWUActions_Protect : CECLWUActions_Unprotect;
  723. if (!doAction(context,req.getWuids(), action, params, &results))
  724. resp.setActionResults(results);
  725. }
  726. catch(IException* e)
  727. {
  728. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  729. }
  730. return true;
  731. }
  732. bool CWsWorkunitsEx::onWUResubmit(IEspContext &context, IEspWUResubmitRequest &req, IEspWUResubmitResponse &resp)
  733. {
  734. try
  735. {
  736. Owned<IMultiException> me = MakeMultiException();
  737. StringAttr wuid;
  738. StringArray wuids;
  739. double version = context.getClientVersion();
  740. IArrayOf<IEspResubmittedWU> resubmittedWUs;
  741. for(aindex_t i=0; i<req.getWuids().length();i++)
  742. {
  743. StringBuffer requestWuid(req.getWuids().item(i));
  744. WsWuHelpers::checkAndTrimWorkunit("WUResubmit", requestWuid);
  745. ensureWsWorkunitAccess(context, requestWuid.str(), SecAccess_Write);
  746. PROGLOG("WUResubmit: %s", requestWuid.str());
  747. wuid.set(requestWuid.str());
  748. try
  749. {
  750. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  751. if(req.getCloneWorkunit() || req.getRecompile())
  752. {
  753. Owned<IConstWorkUnit> src(factory->openWorkUnit(wuid.str()));
  754. NewWsWorkunit wu(factory, context);
  755. wuid.set(wu->queryWuid());
  756. queryExtendedWU(wu)->copyWorkUnit(src, false, false);
  757. }
  758. wuids.append(wuid.str());
  759. Owned<IConstWorkUnit> cw(factory->openWorkUnit(wuid.str()));
  760. if(!cw)
  761. throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot open workunit %s.",wuid.str());
  762. WsWuHelpers::submitWsWorkunit(context, cw, NULL, NULL, 0, req.getRecompile(), req.getResetWorkflow(), false);
  763. if (version < 1.40)
  764. continue;
  765. Owned<IEspResubmittedWU> resubmittedWU = createResubmittedWU();
  766. resubmittedWU->setWUID(wuid.str());
  767. if (!streq(requestWuid.str(), wuid.str()))
  768. resubmittedWU->setParentWUID(requestWuid.str());
  769. resubmittedWUs.append(*resubmittedWU.getClear());
  770. }
  771. catch (IException *E)
  772. {
  773. me->append(*E);
  774. }
  775. catch (...)
  776. {
  777. me->append(*MakeStringException(0,"Unknown exception submitting %s",wuid.str()));
  778. }
  779. }
  780. if(me->ordinality())
  781. throw me.getLink();
  782. int timeToWait = req.getBlockTillFinishTimer();
  783. if (timeToWait != 0)
  784. {
  785. for(aindex_t i=0; i<wuids.length(); i++)
  786. waitForWorkUnitToComplete(wuids.item(i), timeToWait);
  787. }
  788. if (version >= 1.40)
  789. resp.setWUs(resubmittedWUs);
  790. if(wuids.length()==1)
  791. {
  792. resp.setRedirectUrl(StringBuffer("/WsWorkunits/WUInfo?Wuid=").append(wuids.item(0)));
  793. }
  794. }
  795. catch(IException* e)
  796. {
  797. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  798. }
  799. return true;
  800. }
  801. bool CWsWorkunitsEx::onWUPushEvent(IEspContext &context, IEspWUPushEventRequest &req, IEspWUPushEventResponse &resp)
  802. {
  803. try
  804. {
  805. const char *name = req.getEventName();
  806. const char *text = req.getEventText();
  807. const char *target = NULL;
  808. if (notEmpty(name) && notEmpty(text))
  809. {
  810. PROGLOG("WUPushEvent: EventName %s, EventText %s", name, text);
  811. Owned<IScheduleEventPusher> pusher(getScheduleEventPusher());
  812. pusher->push(name, text, target);
  813. StringBuffer redirect("/WsWorkunits/WUShowScheduled");
  814. bool first=true;
  815. appendUrlParameter(redirect, "PushEventName", name, first);
  816. appendUrlParameter(redirect, "PushEventText", text, first);
  817. resp.setRedirectUrl(redirect.str());
  818. return true;
  819. }
  820. }
  821. catch(IException* e)
  822. {
  823. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  824. }
  825. return false;
  826. }
  827. bool CWsWorkunitsEx::onWUSchedule(IEspContext &context, IEspWUScheduleRequest &req, IEspWUScheduleResponse &resp)
  828. {
  829. try
  830. {
  831. StringBuffer wuid(req.getWuid());
  832. WsWuHelpers::checkAndTrimWorkunit("WUSchedule", wuid);
  833. const char* cluster = req.getCluster();
  834. if (isEmpty(cluster))
  835. throw MakeStringException(ECLWATCH_INVALID_INPUT,"No Cluster defined.");
  836. if (!isValidCluster(cluster))
  837. throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Invalid cluster name: %s", cluster);
  838. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  839. WorkunitUpdate wu(factory->updateWorkUnit(wuid.str()));
  840. ensureWsWorkunitAccess(context, *wu.get(), SecAccess_Write);
  841. switch(wu->getState())
  842. {
  843. case WUStateDebugPaused:
  844. case WUStateDebugRunning:
  845. case WUStateRunning:
  846. case WUStateAborting:
  847. case WUStateBlocked:
  848. throw MakeStringException(ECLWATCH_CANNOT_SCHEDULE_WORKUNIT, "Cannot schedule the workunit. Workunit state is '%s'.", wu->queryStateDesc());
  849. }
  850. PROGLOG("WUSchedule: %s", wuid.str());
  851. wu->clearExceptions();
  852. wu->setClusterName(cluster);
  853. if (notEmpty(req.getWhen()))
  854. {
  855. WsWuDateTime dt;
  856. dt.setString(req.getWhen());
  857. wu->setTimeScheduled(dt);
  858. }
  859. if(notEmpty(req.getSnapshot()))
  860. wu->setSnapshot(req.getSnapshot());
  861. wu->setState(WUStateScheduled);
  862. if (req.getMaxRunTime())
  863. wu->setDebugValueInt("maxRunTime", req.getMaxRunTime(), true);
  864. AuditSystemAccess(context.queryUserId(), true, "Scheduled %s", wuid.str());
  865. }
  866. catch(IException* e)
  867. {
  868. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  869. }
  870. return true;
  871. }
  872. bool CWsWorkunitsEx::onWUSubmit(IEspContext &context, IEspWUSubmitRequest &req, IEspWUSubmitResponse &resp)
  873. {
  874. try
  875. {
  876. StringBuffer wuid(req.getWuid());
  877. WsWuHelpers::checkAndTrimWorkunit("WUSubmit", wuid);
  878. const char *cluster = req.getCluster();
  879. if (isEmpty(cluster))
  880. throw MakeStringException(ECLWATCH_INVALID_INPUT,"No Cluster defined.");
  881. if (!isValidCluster(cluster))
  882. throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Invalid cluster name: %s", cluster);
  883. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  884. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid.str());
  885. if(!cw)
  886. throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot open workunit %s.",wuid.str());
  887. ensureWsWorkunitAccess(context, *cw, SecAccess_Full);
  888. if (cw->getAction()==WUActionExecuteExisting)
  889. {
  890. ExecuteExistingQueryInfo info(cw);
  891. if (info.queryset.isEmpty() || info.query.isEmpty())
  892. {
  893. WorkunitUpdate wu(&cw->lock());
  894. throw WsWuHelpers::noteException(wu, MakeStringException(ECLWATCH_INVALID_INPUT,"Queryset and/or query not specified"));
  895. }
  896. WsWuHelpers::runWsWuQuery(context, cw, info.queryset.str(), info.query.str(), cluster, NULL);
  897. }
  898. else
  899. WsWuHelpers::submitWsWorkunit(context, cw, cluster, req.getSnapshot(), req.getMaxRunTime(), true, false, false);
  900. PROGLOG("WUSubmit: %s", wuid.str());
  901. if (req.getBlockTillFinishTimer() != 0)
  902. waitForWorkUnitToComplete(wuid.str(), req.getBlockTillFinishTimer());
  903. resp.setRedirectUrl(StringBuffer("/WsWorkunits/WUInfo?Wuid=").append(wuid).str());
  904. }
  905. catch(IException* e)
  906. {
  907. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  908. }
  909. return true;
  910. }
  911. ErrorSeverity checkGetExceptionSeverity(CWUExceptionSeverity severity)
  912. {
  913. switch (severity)
  914. {
  915. case CWUExceptionSeverity_INFO:
  916. return SeverityInformation;
  917. case CWUExceptionSeverity_WARNING:
  918. return SeverityWarning;
  919. case CWUExceptionSeverity_ERROR:
  920. return SeverityError;
  921. case CWUExceptionSeverity_ALERT:
  922. return SeverityAlert;
  923. }
  924. throw MakeStringExceptionDirect(ECLWATCH_INVALID_INPUT,"invalid exception severity");
  925. }
  926. bool CWsWorkunitsEx::onWURun(IEspContext &context, IEspWURunRequest &req, IEspWURunResponse &resp)
  927. {
  928. try
  929. {
  930. const char *cluster = req.getCluster();
  931. if (notEmpty(cluster) && !isValidCluster(cluster))
  932. throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Invalid cluster name: %s", cluster);
  933. StringBuffer wuidStr(req.getWuid());
  934. const char* runWuid = wuidStr.trim().str();
  935. StringBuffer wuid;
  936. ErrorSeverity severity = checkGetExceptionSeverity(req.getExceptionSeverity());
  937. if (runWuid && *runWuid)
  938. {
  939. if (!looksLikeAWuid(runWuid, 'W'))
  940. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid Workunit ID: %s", runWuid);
  941. ensureWsWorkunitAccess(context, runWuid, SecAccess_Full);
  942. PROGLOG("WURun: %s", runWuid);
  943. if (req.getCloneWorkunit())
  944. WsWuHelpers::runWsWorkunit(context, wuid, runWuid, cluster, req.getInput(), &req.getVariables(),
  945. &req.getDebugValues(), &req.getApplicationValues());
  946. else
  947. {
  948. WsWuHelpers::submitWsWorkunit(context, runWuid, cluster, NULL, 0, false, true, true, req.getInput(),
  949. &req.getVariables(), &req.getDebugValues(), &req.getApplicationValues());
  950. wuid.set(runWuid);
  951. }
  952. }
  953. else if (notEmpty(req.getQuerySet()) && notEmpty(req.getQuery()))
  954. {
  955. PROGLOG("WURun: QuerySet %s, Query %s", req.getQuerySet(), req.getQuery());
  956. WsWuHelpers::runWsWuQuery(context, wuid, req.getQuerySet(), req.getQuery(), cluster, req.getInput(),
  957. &req.getApplicationValues());
  958. }
  959. else
  960. throw MakeStringException(ECLWATCH_MISSING_PARAMS,"Workunit or Query required");
  961. int timeToWait = req.getWait();
  962. if (timeToWait != 0)
  963. waitForWorkUnitToComplete(wuid.str(), timeToWait);
  964. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  965. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid.str());
  966. if (!cw)
  967. throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT,"Cannot open workunit %s.", wuid.str());
  968. resp.setState(cw->queryStateDesc());
  969. resp.setWuid(wuid.str());
  970. switch (cw->getState())
  971. {
  972. case WUStateCompleted:
  973. case WUStateFailed:
  974. case WUStateUnknown:
  975. {
  976. SCMStringBuffer result;
  977. unsigned flags = WorkUnitXML_SeverityTags;
  978. if (req.getNoRootTag())
  979. flags |= WorkUnitXML_NoRoot;
  980. getFullWorkUnitResultsXML(context.queryUserId(), context.queryPassword(), cw.get(), result, flags, severity);
  981. resp.setResults(result.str());
  982. break;
  983. }
  984. default:
  985. break;
  986. }
  987. }
  988. catch(IException* e)
  989. {
  990. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  991. }
  992. return true;
  993. }
  994. bool CWsWorkunitsEx::onWUFullResult(IEspContext &context, IEspWUFullResultRequest &req, IEspWUFullResultResponse &resp)
  995. {
  996. try
  997. {
  998. StringBuffer wuid(req.getWuid());
  999. WsWuHelpers::checkAndTrimWorkunit("WUFullResult", wuid);
  1000. ErrorSeverity severity = checkGetExceptionSeverity(req.getExceptionSeverity());
  1001. if (!wuid.length())
  1002. throw MakeStringException(ECLWATCH_MISSING_PARAMS,"Workunit or Query required");
  1003. if (!looksLikeAWuid(wuid, 'W'))
  1004. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid Workunit ID: %s", wuid.str());
  1005. PROGLOG("WUFullResults: %s", wuid.str());
  1006. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  1007. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid.str());
  1008. if (!cw)
  1009. throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot open workunit %s.", wuid.str());
  1010. ensureWsWorkunitAccess(context, *cw, SecAccess_Read);
  1011. resp.setWuid(wuid.str());
  1012. switch (cw->getState())
  1013. {
  1014. case WUStateCompleted:
  1015. case WUStateFailed:
  1016. case WUStateUnknown:
  1017. {
  1018. SCMStringBuffer result;
  1019. unsigned flags = WorkUnitXML_SeverityTags;
  1020. if (req.getNoRootTag())
  1021. flags |= WorkUnitXML_NoRoot;
  1022. if (context.getResponseFormat()==ESPSerializationJSON)
  1023. getFullWorkUnitResultsJSON(context.queryUserId(), context.queryPassword(), cw.get(), result, flags, severity);
  1024. else
  1025. getFullWorkUnitResultsXML(context.queryUserId(), context.queryPassword(), cw.get(), result, flags, severity);
  1026. resp.setResults(result.str());
  1027. break;
  1028. }
  1029. default:
  1030. throw MakeStringException(ECLWATCH_CANNOT_GET_WU_RESULT, "Cannot get results Workunit %s %s.", wuid.str(), getWorkunitStateStr(cw->getState()));
  1031. }
  1032. }
  1033. catch(IException* e)
  1034. {
  1035. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1036. }
  1037. return true;
  1038. }
  1039. bool CWsWorkunitsEx::onWUWaitCompiled(IEspContext &context, IEspWUWaitRequest &req, IEspWUWaitResponse &resp)
  1040. {
  1041. try
  1042. {
  1043. StringBuffer wuid(req.getWuid());
  1044. WsWuHelpers::checkAndTrimWorkunit("WUWaitCompiled", wuid);
  1045. ensureWsWorkunitAccess(context, wuid.str(), SecAccess_Full);
  1046. PROGLOG("WUWaitCompiled: %s", wuid.str());
  1047. secWaitForWorkUnitToCompile(wuid.str(), *context.querySecManager(), *context.queryUser(), req.getWait());
  1048. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  1049. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid.str());
  1050. if(!cw)
  1051. throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot open workunit %s.",wuid.str());
  1052. resp.setStateID(cw->getState());
  1053. }
  1054. catch(IException* e)
  1055. {
  1056. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1057. }
  1058. return true;
  1059. }
  1060. bool CWsWorkunitsEx::onWUWaitComplete(IEspContext &context, IEspWUWaitRequest &req, IEspWUWaitResponse &resp)
  1061. {
  1062. try
  1063. {
  1064. StringBuffer wuid(req.getWuid());
  1065. WsWuHelpers::checkAndTrimWorkunit("WUWaitComplete", wuid);
  1066. ensureWsWorkunitAccess(context, wuid.str(), SecAccess_Full);
  1067. PROGLOG("WUWaitComplete: %s", wuid.str());
  1068. std::list<WUState> expectedStates;
  1069. if (req.getReturnOnWait())
  1070. expectedStates.push_back(WUStateWait);
  1071. resp.setStateID(secWaitForWorkUnitToComplete(wuid.str(), *context.querySecManager(), *context.queryUser(), req.getWait(), expectedStates));
  1072. }
  1073. catch(IException* e)
  1074. {
  1075. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1076. }
  1077. return true;
  1078. }
  1079. bool CWsWorkunitsEx::onWUCDebug(IEspContext &context, IEspWUDebugRequest &req, IEspWUDebugResponse &resp)
  1080. {
  1081. try
  1082. {
  1083. StringBuffer wuid(req.getWuid());
  1084. WsWuHelpers::checkAndTrimWorkunit("WUCDebug", wuid);
  1085. ensureWsWorkunitAccess(context, wuid.str(), SecAccess_Full);
  1086. PROGLOG("WUCDebug: %s", wuid.str());
  1087. StringBuffer result;
  1088. secDebugWorkunit(wuid.str(), *context.querySecManager(), *context.queryUser(), req.getCommand(), result);
  1089. resp.setResult(result);
  1090. }
  1091. catch(IException* e)
  1092. {
  1093. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1094. }
  1095. return true;
  1096. }
  1097. bool CWsWorkunitsEx::onWUSyntaxCheckECL(IEspContext &context, IEspWUSyntaxCheckRequest &req, IEspWUSyntaxCheckResponse &resp)
  1098. {
  1099. try
  1100. {
  1101. ensureWsCreateWorkunitAccess(context);
  1102. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  1103. NewWsWorkunit wu(factory, context);
  1104. wu->setAction(WUActionCheck);
  1105. if(notEmpty(req.getModuleName()) && notEmpty(req.getAttributeName()))
  1106. {
  1107. wu->setApplicationValue("SyntaxCheck", "ModuleName", req.getModuleName(), true);
  1108. wu->setApplicationValue("SyntaxCheck", "AttributeName", req.getAttributeName(), true);
  1109. }
  1110. ForEachItemIn(di, req.getDebugValues())
  1111. {
  1112. IConstDebugValue& item=req.getDebugValues().item(di);
  1113. const char *debugName = item.getName();
  1114. if (notEmpty(debugName))
  1115. {
  1116. StringBuffer expanded;
  1117. if (*debugName=='-')
  1118. debugName=expanded.append("eclcc").append(debugName).str();
  1119. wu->setDebugValue(debugName, item.getValue(), true);
  1120. }
  1121. }
  1122. wu.setQueryText(req.getECL());
  1123. StringAttr wuid(wu->queryWuid()); // NB queryWuid() not valid after workunit,clear()
  1124. wu->commit();
  1125. wu.clear();
  1126. WsWuHelpers::submitWsWorkunit(context, wuid.str(), req.getCluster(), req.getSnapshot(), 0, true, false, false);
  1127. waitForWorkUnitToComplete(wuid.str(), req.getTimeToWait());
  1128. Owned<IConstWorkUnit> cw(factory->openWorkUnit(wuid.str()));
  1129. WsWUExceptions errors(*cw);
  1130. resp.setErrors(errors);
  1131. StringBuffer msg;
  1132. WUState st = cw->getState();
  1133. cw.clear();
  1134. switch (st)
  1135. {
  1136. case WUStateAborted:
  1137. case WUStateCompleted:
  1138. case WUStateFailed:
  1139. factory->deleteWorkUnitEx(wuid.str(), true);
  1140. break;
  1141. default:
  1142. abortWorkUnit(wuid.str(), context.querySecManager(), context.queryUser());
  1143. if (!factory->deleteWorkUnit(wuid.str()))
  1144. {
  1145. throw MakeStringException(ECLWATCH_CANNOT_DELETE_WORKUNIT,
  1146. "WUSyntaxCheckECL has timed out. Workunit %s cannot be deleted now. You may delete it when its status changes.", wuid.str());
  1147. }
  1148. if (context.getClientVersion() < 1.57)
  1149. throw MakeStringException(ECLWATCH_CANNOT_DELETE_WORKUNIT, "WUSyntaxCheckECL has timed out.");
  1150. resp.setMessage("WUSyntaxCheckECL has timed out.");
  1151. break;
  1152. }
  1153. }
  1154. catch(IException* e)
  1155. {
  1156. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1157. }
  1158. return true;
  1159. }
  1160. bool CWsWorkunitsEx::onWUCompileECL(IEspContext &context, IEspWUCompileECLRequest &req, IEspWUCompileECLResponse &resp)
  1161. {
  1162. try
  1163. {
  1164. ensureWsCreateWorkunitAccess(context);
  1165. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  1166. NewWsWorkunit wu(factory, context);
  1167. if(req.getIncludeComplexity())
  1168. {
  1169. wu->setAction(WUActionCompile);
  1170. wu->setDebugValueInt("calculateComplexity",1,true);
  1171. }
  1172. else
  1173. wu->setAction(WUActionCheck);
  1174. if(req.getModuleName() && req.getAttributeName())
  1175. {
  1176. wu->setApplicationValue("SyntaxCheck","ModuleName",req.getModuleName(),true);
  1177. wu->setApplicationValue("SyntaxCheck","AttributeName",req.getAttributeName(),true);
  1178. }
  1179. if(req.getIncludeDependencies())
  1180. wu->setApplicationValueInt("SyntaxCheck","IncludeDependencies",1,true);
  1181. wu.setQueryText(req.getECL());
  1182. StringAttr wuid(wu->queryWuid()); // NB queryWuid() not valid after workunit,clear() StringAttr wuid(wu->queryWuid());
  1183. WsWuHelpers::submitWsWorkunit(context, wuid.str(), req.getCluster(), req.getSnapshot(), 0, true, false, false);
  1184. waitForWorkUnitToComplete(wuid.str(),req.getTimeToWait());
  1185. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid.str());
  1186. SCMStringBuffer s;
  1187. cw->getDebugValue("__Calculated__Complexity__",s);
  1188. if(s.length())
  1189. resp.setComplexity(s.str());
  1190. WsWUExceptions errors(*cw);
  1191. resp.setErrors(errors);
  1192. if(!errors.ErrCount())
  1193. {
  1194. IArrayOf<IEspWUECLAttribute> dependencies;
  1195. for(unsigned count=1;;count++)
  1196. {
  1197. SCMStringBuffer xml;
  1198. cw->getApplicationValue("SyntaxCheck",StringBuffer("Dependency").append(count).str(),xml);
  1199. if(!xml.length())
  1200. break;
  1201. Owned<IPropertyTree> dep=createPTreeFromXMLString(xml.str(), ipt_caseInsensitive);
  1202. if(!dep)
  1203. continue;
  1204. Owned<IEspWUECLAttribute> att = createWUECLAttribute("","");
  1205. att->setModuleName(dep->queryProp("@module"));
  1206. att->setAttributeName(dep->queryProp("@name"));
  1207. int flags = dep->getPropInt("@flags",0);
  1208. if(flags & ob_locked)
  1209. {
  1210. if(flags & ob_lockedself)
  1211. att->setIsCheckedOut(true);
  1212. else
  1213. att->setIsLocked(true);
  1214. }
  1215. if(flags & ob_sandbox)
  1216. att->setIsSandbox(true);
  1217. if(flags & ob_orphaned)
  1218. att->setIsOrphaned(true);
  1219. dependencies.append(*att.getLink());
  1220. }
  1221. resp.setDependencies(dependencies);
  1222. }
  1223. cw.clear();
  1224. factory->deleteWorkUnitEx(wuid.str(), true);
  1225. }
  1226. catch(IException* e)
  1227. {
  1228. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1229. }
  1230. return true;
  1231. }
  1232. bool CWsWorkunitsEx::onWUGetDependancyTrees(IEspContext& context, IEspWUGetDependancyTreesRequest& req, IEspWUGetDependancyTreesResponse& resp)
  1233. {
  1234. try
  1235. {
  1236. ensureWsCreateWorkunitAccess(context);
  1237. unsigned int timeMilliSec = 500;
  1238. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  1239. NewWsWorkunit wu(factory, context);
  1240. wu->setAction(WUActionCheck);
  1241. if (notEmpty(req.getCluster()))
  1242. wu->setClusterName(req.getCluster());
  1243. if (notEmpty(req.getSnapshot()))
  1244. wu->setSnapshot(req.getSnapshot());
  1245. wu->setDebugValue("gatherDependenciesSelection",notEmpty(req.getItems()) ? req.getItems() : NULL,true);
  1246. if (context.getClientVersion() > 1.12)
  1247. {
  1248. wu->setDebugValueInt("gatherDependencies", 1, true);
  1249. const char *timeout = req.getTimeoutMilliSec();
  1250. if (notEmpty(timeout))
  1251. {
  1252. const char *finger = timeout;
  1253. while (*finger)
  1254. {
  1255. if (!isdigit(*finger++))
  1256. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Incorrect timeout value");
  1257. }
  1258. timeMilliSec = atol(timeout);
  1259. }
  1260. }
  1261. StringAttr wuid(wu->queryWuid()); // NB queryWuid() not valid after workunit,clear()
  1262. wu->commit();
  1263. wu.clear();
  1264. WsWuHelpers::submitWsWorkunit(context, wuid.str(), req.getCluster(), req.getSnapshot(), 0, true, false, false);
  1265. int state = waitForWorkUnitToComplete(wuid.str(), timeMilliSec);
  1266. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid.str());
  1267. WsWUExceptions errors(*cw);
  1268. resp.setErrors(errors);
  1269. MemoryBuffer temp;
  1270. MemoryBuffer2IDataVal xmlresult(temp);
  1271. Owned<IConstWUResult> result = cw->getResultBySequence(0);
  1272. if (result)
  1273. {
  1274. result->getResultRaw(xmlresult, NULL, NULL);
  1275. resp.setDependancyTrees(temp);
  1276. }
  1277. wu.setown(&cw->lock());
  1278. wu->setState(WUStateAborted);
  1279. wu->commit();
  1280. wu.clear();
  1281. factory->deleteWorkUnitEx(wuid.str(), true);
  1282. }
  1283. catch(IException* e)
  1284. {
  1285. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1286. }
  1287. return true;
  1288. }
  1289. bool getWsWuInfoFromSasha(IEspContext &context, SocketEndpoint &ep, const char* wuid, IEspECLWorkunit *info)
  1290. {
  1291. Owned<INode> node = createINode(ep);
  1292. Owned<ISashaCommand> cmd = createSashaCommand();
  1293. cmd->addId(wuid);
  1294. cmd->setAction(SCA_GET);
  1295. if (!cmd->send(node, 1*60*1000))
  1296. {
  1297. StringBuffer url;
  1298. throw MakeStringException(ECLWATCH_CANNOT_CONNECT_ARCHIVE_SERVER,
  1299. "Sasha (%s) took too long to respond from: Get information for %s.",
  1300. ep.getUrlStr(url).str(), wuid);
  1301. }
  1302. if (cmd->numIds()==0)
  1303. {
  1304. DBGLOG("Could not read archived workunit %s",wuid);
  1305. throw MakeStringException(ECLWATCH_CANNOT_GET_WORKUNIT,"Cannot read workunit %s.",wuid);
  1306. }
  1307. unsigned num = cmd->numResults();
  1308. if (num < 1)
  1309. return false;
  1310. StringBuffer res;
  1311. cmd->getResult(0, res);
  1312. if(res.length() < 1)
  1313. return false;
  1314. Owned<IPropertyTree> wpt = createPTreeFromXMLString(res.str());
  1315. if (!wpt)
  1316. return false;
  1317. const char * owner = wpt->queryProp("@submitID");
  1318. ensureWsWorkunitAccessByOwnerId(context, owner, SecAccess_Read);
  1319. PROGLOG("GetArchivedWUInfo: %s", wuid);
  1320. info->setWuid(wuid);
  1321. info->setArchived(true);
  1322. if (notEmpty(owner))
  1323. info->setOwner(owner);
  1324. const char * state = wpt->queryProp("@state");
  1325. if (notEmpty(state))
  1326. info->setState(state);
  1327. const char * cluster = wpt->queryProp("@clusterName");
  1328. if (notEmpty(cluster))
  1329. info->setCluster(cluster);
  1330. if (context.querySecManager())
  1331. {
  1332. const char * scope = wpt->queryProp("@scope");
  1333. if (notEmpty(scope))
  1334. info->setScope(scope);
  1335. }
  1336. const char * jobName = wpt->queryProp("@jobName");
  1337. if (notEmpty(jobName))
  1338. info->setJobname(jobName);
  1339. const char * description = wpt->queryProp("Debug/description");
  1340. if (notEmpty(description))
  1341. info->setDescription(description);
  1342. const char * queryText = wpt->queryProp("Query/Text");
  1343. if (notEmpty(queryText))
  1344. info->updateQuery().setText(queryText);
  1345. const char * protectedWU = wpt->queryProp("@protected");
  1346. info->setProtected((protectedWU && *protectedWU!='0'));
  1347. return true;
  1348. }
  1349. #define WUDETAILS_REFRESH_MINS 1
  1350. void getArchivedWUInfo(IEspContext &context, const char* sashaServerIP, unsigned sashaServerPort, const char *wuid, IEspWUInfoResponse &resp)
  1351. {
  1352. SocketEndpoint ep;
  1353. if (sashaServerIP && *sashaServerIP)
  1354. ep.set(sashaServerIP, sashaServerPort);
  1355. else
  1356. getSashaNode(ep);
  1357. if (getWsWuInfoFromSasha(context, ep, wuid, &resp.updateWorkunit()))
  1358. {
  1359. resp.setCanCompile(false);
  1360. return;
  1361. }
  1362. throw MakeStringException(ECLWATCH_CANNOT_GET_WORKUNIT, "Cannot find workunit %s.", wuid);
  1363. return;
  1364. }
  1365. #define WUDETAILS_REFRESH_MINS 1
  1366. bool CWsWorkunitsEx::onWUInfo(IEspContext &context, IEspWUInfoRequest &req, IEspWUInfoResponse &resp)
  1367. {
  1368. try
  1369. {
  1370. StringBuffer wuid(req.getWuid());
  1371. WsWuHelpers::checkAndTrimWorkunit("WUInfo", wuid);
  1372. double version = context.getClientVersion();
  1373. if (req.getType() && strieq(req.getType(), "archived workunits"))
  1374. getArchivedWUInfo(context, sashaServerIp.get(), sashaServerPort, wuid.str(), resp);
  1375. else
  1376. {
  1377. try
  1378. {
  1379. //The access is checked here because getArchivedWUInfo() has its own access check.
  1380. ensureWsWorkunitAccess(context, wuid.str(), SecAccess_Read);
  1381. unsigned long flags=0;
  1382. if (req.getTruncateEclTo64k())
  1383. flags|=WUINFO_TruncateEclTo64k;
  1384. if (req.getIncludeExceptions())
  1385. flags|=WUINFO_IncludeExceptions;
  1386. if (req.getIncludeGraphs())
  1387. flags|=WUINFO_IncludeGraphs;
  1388. if (req.getIncludeSourceFiles())
  1389. flags|=WUINFO_IncludeSourceFiles;
  1390. if (req.getIncludeResults())
  1391. flags|=WUINFO_IncludeResults;
  1392. if (req.getIncludeVariables())
  1393. flags|=WUINFO_IncludeVariables;
  1394. if (req.getIncludeTimers())
  1395. flags|=WUINFO_IncludeTimers;
  1396. if (req.getIncludeDebugValues())
  1397. flags|=WUINFO_IncludeDebugValues;
  1398. if (req.getIncludeApplicationValues())
  1399. flags|=WUINFO_IncludeApplicationValues;
  1400. if (req.getIncludeWorkflows())
  1401. flags|=WUINFO_IncludeWorkflows;
  1402. if (!req.getSuppressResultSchemas())
  1403. flags|=WUINFO_IncludeEclSchemas;
  1404. if (req.getIncludeXmlSchemas())
  1405. flags|=WUINFO_IncludeXmlSchema;
  1406. if (req.getIncludeResultsViewNames())
  1407. flags|=WUINFO_IncludeResultsViewNames;
  1408. if (req.getIncludeResourceURLs())
  1409. flags|=WUINFO_IncludeResourceURLs;
  1410. if (req.getIncludeECL())
  1411. flags|=WUINFO_IncludeECL;
  1412. if (req.getIncludeHelpers())
  1413. flags|=WUINFO_IncludeHelpers;
  1414. if (req.getIncludeAllowedClusters())
  1415. flags|=WUINFO_IncludeAllowedClusters;
  1416. if (req.getIncludeTotalClusterTime())
  1417. flags|=WUINFO_IncludeTotalClusterTime;
  1418. if (req.getIncludeServiceNames())
  1419. flags|=WUINFO_IncludeServiceNames;
  1420. PROGLOG("WUInfo: %s %lx", wuid.str(), flags);
  1421. WsWuInfo winfo(context, wuid.str());
  1422. winfo.getInfo(resp.updateWorkunit(), flags);
  1423. if (req.getIncludeResultsViewNames()||req.getIncludeResourceURLs()||(version >= 1.50))
  1424. {
  1425. StringArray views, urls;
  1426. winfo.getResourceInfo(views, urls, WUINFO_IncludeResultsViewNames|WUINFO_IncludeResourceURLs);
  1427. IEspECLWorkunit& eclWU = resp.updateWorkunit();
  1428. if (req.getIncludeResultsViewNames())
  1429. resp.setResultViews(views);
  1430. if (req.getIncludeResourceURLs())
  1431. eclWU.setResourceURLs(urls);
  1432. if (version >= 1.50)
  1433. {
  1434. eclWU.setResultViewCount(views.length());
  1435. eclWU.setResourceURLCount(urls.length());
  1436. }
  1437. }
  1438. }
  1439. catch (IException *e)
  1440. {
  1441. if (e->errorCode() != ECLWATCH_CANNOT_OPEN_WORKUNIT)
  1442. throw e;
  1443. getArchivedWUInfo(context, sashaServerIp.get(), sashaServerPort, wuid.str(), resp);
  1444. e->Release();
  1445. }
  1446. switch (resp.getWorkunit().getStateID())
  1447. {
  1448. case WUStateCompiling:
  1449. case WUStateCompiled:
  1450. case WUStateScheduled:
  1451. case WUStateSubmitted:
  1452. case WUStateRunning:
  1453. case WUStateAborting:
  1454. case WUStateWait:
  1455. case WUStateUploadingFiles:
  1456. case WUStateDebugPaused:
  1457. case WUStateDebugRunning:
  1458. resp.setAutoRefresh(WUDETAILS_REFRESH_MINS);
  1459. break;
  1460. case WUStateBlocked:
  1461. resp.setAutoRefresh(WUDETAILS_REFRESH_MINS*5);
  1462. break;
  1463. }
  1464. resp.setCanCompile(notEmpty(context.queryUserId()));
  1465. if (version > 1.24 && notEmpty(req.getThorSlaveIP()))
  1466. resp.setThorSlaveIP(req.getThorSlaveIP());
  1467. ISecManager* secmgr = context.querySecManager();
  1468. if (!secmgr)
  1469. resp.setSecMethod(NULL);
  1470. else
  1471. resp.setSecMethod(secmgr->querySecMgrTypeName());
  1472. }
  1473. }
  1474. catch(IException* e)
  1475. {
  1476. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1477. }
  1478. return true;
  1479. }
  1480. bool CWsWorkunitsEx::onWUInfoDetails(IEspContext &context, IEspWUInfoRequest &req, IEspWUInfoResponse &resp)
  1481. {
  1482. return onWUInfo(context, req, resp);
  1483. }
  1484. bool CWsWorkunitsEx::onWUResultView(IEspContext &context, IEspWUResultViewRequest &req, IEspWUResultViewResponse &resp)
  1485. {
  1486. StringBuffer wuid(req.getWuid());
  1487. WsWuHelpers::checkAndTrimWorkunit("WUResultView", wuid);
  1488. ensureWsWorkunitAccess(context, wuid.str(), SecAccess_Read);
  1489. PROGLOG("WUResultView: %s", wuid.str());
  1490. Owned<IWuWebView> wv = createWuWebView(wuid.str(), NULL, NULL, getCFD(), true, nullptr);
  1491. StringBuffer html;
  1492. wv->renderSingleResult(req.getViewName(), req.getResultName(), html);
  1493. resp.setResult(html.str());
  1494. resp.setResult_mimetype("text/html");
  1495. return true;
  1496. }
  1497. void doWUQueryBySingleWuid(IEspContext &context, const char *wuid, IEspWUQueryResponse &resp)
  1498. {
  1499. Owned<IEspECLWorkunit> info= createECLWorkunit("","");
  1500. WsWuInfo winfo(context, wuid);
  1501. winfo.getCommon(*info, 0);
  1502. IArrayOf<IEspECLWorkunit> results;
  1503. results.append(*info.getClear());
  1504. resp.setWorkunits(results);
  1505. resp.setPageSize(1);
  1506. resp.setCount(1);
  1507. PROGLOG("getWUInfo: %s", wuid);
  1508. }
  1509. void doWUQueryByFile(IEspContext &context, const char *logicalFile, IEspWUQueryResponse &resp)
  1510. {
  1511. StringBuffer wuid;
  1512. getWuidFromLogicalFileName(context, logicalFile, wuid);
  1513. if (!wuid.length())
  1514. throw MakeStringException(ECLWATCH_CANNOT_GET_WORKUNIT,"Cannot find the workunit for file %s.", logicalFile);
  1515. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  1516. Owned<IConstWorkUnit> cw= factory->openWorkUnit(wuid.str());
  1517. if (!cw)
  1518. throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot find the workunit for file %s.", logicalFile);
  1519. if (getWsWorkunitAccess(context, *cw) < SecAccess_Read)
  1520. throw MakeStringException(ECLWATCH_ECL_WU_ACCESS_DENIED,"Cannot access the workunit for file %s.",logicalFile);
  1521. doWUQueryBySingleWuid(context, wuid.str(), resp);
  1522. resp.setFirst(false);
  1523. resp.setPageSize(1);
  1524. resp.setCount(1);
  1525. }
  1526. bool addWUQueryFilter(WUSortField *filters, unsigned short &count, MemoryBuffer &buff, const char *name, WUSortField value)
  1527. {
  1528. if (isEmpty(name))
  1529. return false;
  1530. filters[count++] = value;
  1531. if ((value & WUSFwild) != 0 && !containsWildcard(name))
  1532. {
  1533. VStringBuffer s("*%s*", name);
  1534. buff.append(s);
  1535. }
  1536. else
  1537. buff.append(name);
  1538. return true;
  1539. }
  1540. bool addWUQueryFilterTime(WUSortField *filters, unsigned short &count, MemoryBuffer &buff, const char *stime, WUSortField value)
  1541. {
  1542. if (isEmpty(stime))
  1543. return false;
  1544. CDateTime dt;
  1545. dt.setString(stime, NULL, true);
  1546. unsigned year, month, day, hour, minute, second, nano;
  1547. dt.getDate(year, month, day, true);
  1548. dt.getTime(hour, minute, second, nano, true);
  1549. VStringBuffer wuid("W%4d%02d%02d-%02d%02d%02d",year,month,day,hour,minute,second);
  1550. filters[count++] = value;
  1551. buff.append(wuid.str());
  1552. return true;
  1553. }
  1554. bool addWUQueryFilterTotalClusterTime(WUSortField *filters, unsigned short &count, MemoryBuffer &filterBuf, unsigned milliseconds, WUSortField value)
  1555. {
  1556. if (milliseconds == 0)
  1557. return false;
  1558. VStringBuffer vBuf("%u", milliseconds);
  1559. filters[count++] = value;
  1560. filterBuf.append(vBuf);
  1561. return true;
  1562. }
  1563. bool addWUQueryFilterApplication(WUSortField *filters, unsigned short &count, MemoryBuffer &buff, const char *appname, const char *appkey, const char *appdata)
  1564. {
  1565. if (isEmpty(appname))
  1566. return false; // appname must be specified
  1567. if (isEmpty(appkey) && isEmpty(appdata)) //one or other is required ( MORE - see if cassandra can relax that)
  1568. return false;
  1569. VStringBuffer path("%s/%s", appname, appkey && *appkey ? appkey : "*");
  1570. buff.append(path.str());
  1571. buff.append(appdata);
  1572. filters[count++] = WUSFappvalue;
  1573. return true;
  1574. }
  1575. void readWUQuerySortOrder(const char* sortBy, const bool descending, WUSortField& sortOrder)
  1576. {
  1577. if (isEmpty(sortBy))
  1578. {
  1579. sortOrder = (WUSortField) (WUSFwuid | WUSFreverse);
  1580. return;
  1581. }
  1582. if (strieq(sortBy, "Owner"))
  1583. sortOrder = WUSFuser;
  1584. else if (strieq(sortBy, "JobName"))
  1585. sortOrder = WUSFjob;
  1586. else if (strieq(sortBy, "Cluster"))
  1587. sortOrder = WUSFcluster;
  1588. else if (strieq(sortBy, "Protected"))
  1589. sortOrder = WUSFprotected;
  1590. else if (strieq(sortBy, "State"))
  1591. sortOrder = WUSFstate;
  1592. else if (strieq(sortBy, "ClusterTime"))
  1593. sortOrder = (WUSortField) (WUSFtotalthortime+WUSFnumeric);
  1594. else
  1595. sortOrder = WUSFwuid;
  1596. sortOrder = (WUSortField) (sortOrder | WUSFnocase);
  1597. if (descending)
  1598. sortOrder = (WUSortField) (sortOrder | WUSFreverse);
  1599. }
  1600. void doWUQueryWithSort(IEspContext &context, IEspWUQueryRequest & req, IEspWUQueryResponse & resp)
  1601. {
  1602. SecAccessFlags accessOwn;
  1603. SecAccessFlags accessOthers;
  1604. getUserWuAccessFlags(context, accessOwn, accessOthers, true);
  1605. double version = context.getClientVersion();
  1606. IArrayOf<IEspECLWorkunit> results;
  1607. int begin = 0;
  1608. unsigned int count = 100;
  1609. int pagesize = 100;
  1610. if (version > 1.01)
  1611. {
  1612. pagesize = (int)req.getPageSize();
  1613. if (!req.getCount_isNull())
  1614. pagesize = req.getCount();
  1615. if(pagesize < 1)
  1616. pagesize = 100;
  1617. begin = (int)req.getPageStartFrom();
  1618. }
  1619. else
  1620. {
  1621. count=(unsigned)req.getCount();
  1622. if(!count)
  1623. count=100;
  1624. if (notEmpty(req.getAfter()))
  1625. begin=atoi(req.getAfter());
  1626. else if (notEmpty(req.getBefore()))
  1627. begin=atoi(req.getBefore())-count;
  1628. if (begin < 0)
  1629. begin = 0;
  1630. pagesize = count;
  1631. }
  1632. WUSortField sortorder;
  1633. readWUQuerySortOrder(req.getSortby(), req.getDescending(), sortorder);
  1634. WUSortField filters[10];
  1635. unsigned short filterCount = 0;
  1636. MemoryBuffer filterbuf;
  1637. // Query filters should be added in order of expected power - add the most restrictive filters first
  1638. bool bDoubleCheckState = false;
  1639. if(req.getState() && *req.getState())
  1640. {
  1641. filters[filterCount++] = WUSFstate;
  1642. if (!strieq(req.getState(), "unknown"))
  1643. filterbuf.append(req.getState());
  1644. else
  1645. filterbuf.append("");
  1646. if (strieq(req.getState(), "submitted"))
  1647. bDoubleCheckState = true;
  1648. }
  1649. addWUQueryFilter(filters, filterCount, filterbuf, req.getWuid(), WUSFwildwuid);
  1650. addWUQueryFilter(filters, filterCount, filterbuf, req.getCluster(), WUSFcluster);
  1651. addWUQueryFilter(filters, filterCount, filterbuf, req.getLogicalFile(), (WUSortField) (WUSFfileread | WUSFnocase));
  1652. addWUQueryFilter(filters, filterCount, filterbuf, req.getOwner(), (WUSortField) (WUSFuser | WUSFnocase));
  1653. addWUQueryFilter(filters, filterCount, filterbuf, req.getJobname(), (WUSortField) (WUSFjob | WUSFnocase));
  1654. addWUQueryFilter(filters, filterCount, filterbuf, req.getECL(), (WUSortField) (WUSFecl | WUSFwild));
  1655. addWUQueryFilterTotalClusterTime(filters, filterCount, filterbuf, req.getTotalClusterTimeThresholdMilliSec(), WUSFtotalthortime);
  1656. addWUQueryFilterTime(filters, filterCount, filterbuf, req.getStartDate(), WUSFwuid);
  1657. addWUQueryFilterTime(filters, filterCount, filterbuf, req.getEndDate(), WUSFwuidhigh);
  1658. if (version < 1.55)
  1659. addWUQueryFilterApplication(filters, filterCount, filterbuf, req.getApplicationName(), req.getApplicationKey(), req.getApplicationData());
  1660. else
  1661. {
  1662. IArrayOf<IConstApplicationValue>& applicationFilters = req.getApplicationValues();
  1663. ForEachItemIn(i, applicationFilters)
  1664. {
  1665. IConstApplicationValue &item = applicationFilters.item(i);
  1666. addWUQueryFilterApplication(filters, filterCount, filterbuf, item.getApplication(), item.getName(), item.getValue());
  1667. }
  1668. }
  1669. filters[filterCount] = WUSFterm;
  1670. __int64 cacheHint = 0;
  1671. if (!req.getCacheHint_isNull())
  1672. cacheHint = req.getCacheHint();
  1673. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  1674. unsigned numWUs;
  1675. PROGLOG("WUQuery: getWorkUnitsSorted");
  1676. Owned<IConstWorkUnitIterator> it = factory->getWorkUnitsSorted(sortorder, filters, filterbuf.bufferBase(), begin, pagesize+1, &cacheHint, &numWUs); // MORE - need security flags here!
  1677. if (version >= 1.41)
  1678. resp.setCacheHint(cacheHint);
  1679. PROGLOG("WUQuery: getWorkUnitsSorted done");
  1680. unsigned actualCount = 0;
  1681. ForEach(*it)
  1682. {
  1683. IConstWorkUnitInfo& cw = it->query();
  1684. if (bDoubleCheckState && (cw.getState() != WUStateSubmitted))
  1685. {
  1686. numWUs--;
  1687. continue;
  1688. }
  1689. // This test is presumably trying to remove the global workunit, though it's not the right way to do so (since it will mess up page counts etc)
  1690. const char* wuid = cw.queryWuid();
  1691. if (!looksLikeAWuid(wuid, 'W'))
  1692. {
  1693. numWUs--;
  1694. continue;
  1695. }
  1696. actualCount++;
  1697. Owned<IEspECLWorkunit> info = createECLWorkunit("","");
  1698. info->setWuid(cw.queryWuid());
  1699. if (chooseWuAccessFlagsByOwnership(context.queryUserId(), cw, accessOwn, accessOthers) < SecAccess_Read)
  1700. {
  1701. info->setState("<Hidden>");
  1702. results.append(*info.getClear());
  1703. continue;
  1704. }
  1705. info->setProtected(cw.isProtected() ? 1 : 0);
  1706. info->setJobname(cw.queryJobName());
  1707. info->setOwner(cw.queryUser());
  1708. info->setCluster(cw.queryClusterName());
  1709. SCMStringBuffer s;
  1710. // info.setSnapshot(cw->getSnapshot(s).str());
  1711. info->setStateID(cw.getState());
  1712. info->setState(cw.queryStateDesc());
  1713. unsigned totalThorTimeMS = cw.getTotalThorTime();
  1714. StringBuffer totalThorTimeStr;
  1715. formatDuration(totalThorTimeStr, totalThorTimeMS);
  1716. if (version > 1.52)
  1717. info->setTotalClusterTime(totalThorTimeStr.str());
  1718. else
  1719. info->setTotalThorTime(totalThorTimeStr.str());
  1720. //if (cw->isPausing())
  1721. // info.setIsPausing(true);
  1722. // getEventScheduleFlag(info);
  1723. WsWuDateTime dt;
  1724. cw.getTimeScheduled(dt);
  1725. if(dt.isValid())
  1726. info->setDateTimeScheduled(dt.getString(s).str());
  1727. if (version >= 1.55)
  1728. {
  1729. IArrayOf<IEspApplicationValue> av;
  1730. Owned<IConstWUAppValueIterator> app(&cw.getApplicationValues());
  1731. ForEach(*app)
  1732. {
  1733. IConstWUAppValue& val=app->query();
  1734. Owned<IEspApplicationValue> t= createApplicationValue("","");
  1735. t->setApplication(val.queryApplication());
  1736. t->setName(val.queryName());
  1737. t->setValue(val.queryValue());
  1738. av.append(*t.getLink());
  1739. }
  1740. info->setApplicationValues(av);
  1741. }
  1742. results.append(*info.getClear());
  1743. }
  1744. if (version > 1.02)
  1745. {
  1746. resp.setPageStartFrom(begin+1);
  1747. resp.setNumWUs(numWUs);
  1748. if (results.length() > (aindex_t)pagesize)
  1749. results.pop();
  1750. if(unsigned (begin + pagesize) < numWUs)
  1751. {
  1752. resp.setNextPage(begin + pagesize);
  1753. resp.setPageEndAt(begin + pagesize);
  1754. int last = begin + pagesize;
  1755. while (numWUs > (unsigned) last + pagesize)
  1756. last += pagesize;
  1757. resp.setLastPage(last);
  1758. }
  1759. else
  1760. {
  1761. resp.setNextPage(-1);
  1762. resp.setPageEndAt(numWUs);
  1763. }
  1764. if(begin > 0)
  1765. {
  1766. resp.setFirst(false);
  1767. if (begin - pagesize > 0)
  1768. resp.setPrevPage(begin - pagesize);
  1769. else
  1770. resp.setPrevPage(0);
  1771. }
  1772. resp.setPageSize(pagesize);
  1773. }
  1774. else
  1775. {
  1776. if(begin>0 && actualCount > 0)
  1777. {
  1778. char buf[10];
  1779. itoa(begin, buf, 10);
  1780. resp.setCurrent(buf);
  1781. }
  1782. if(count<actualCount)
  1783. {
  1784. char buf[10];
  1785. itoa(begin+count, buf, 10);
  1786. resp.setNext(buf);
  1787. resp.setNumWUs(numWUs);
  1788. if (results.length() > count)
  1789. results.pop();
  1790. }
  1791. if(begin == 0 && actualCount <= count)
  1792. resp.setFirst(false);
  1793. resp.setCount(count);
  1794. }
  1795. resp.setWorkunits(results);
  1796. return;
  1797. }
  1798. void doWULightWeightQueryWithSort(IEspContext &context, IEspWULightWeightQueryRequest & req, IEspWULightWeightQueryResponse & resp)
  1799. {
  1800. SecAccessFlags accessOwn;
  1801. SecAccessFlags accessOthers;
  1802. getUserWuAccessFlags(context, accessOwn, accessOthers, true);
  1803. double version = context.getClientVersion();
  1804. int pageStartFrom = 0;
  1805. int pageSize = 100;
  1806. if (!req.getPageStartFrom_isNull())
  1807. pageStartFrom = req.getPageStartFrom();
  1808. if (!req.getPageSize_isNull())
  1809. pageSize = req.getPageSize();
  1810. WUSortField sortOrder;
  1811. readWUQuerySortOrder(req.getSortBy(), req.getDescending(), sortOrder);
  1812. WUSortField filters[10];
  1813. unsigned short filterCount = 0;
  1814. MemoryBuffer filterbuf;
  1815. // Query filters should be added in order of expected power - add the most restrictive filters first
  1816. bool bDoubleCheckState = false;
  1817. if(req.getState() && *req.getState())
  1818. {
  1819. filters[filterCount++] = WUSFstate;
  1820. if (!strieq(req.getState(), "unknown"))
  1821. filterbuf.append(req.getState());
  1822. else
  1823. filterbuf.append("");
  1824. if (strieq(req.getState(), "submitted"))
  1825. bDoubleCheckState = true;
  1826. }
  1827. addWUQueryFilter(filters, filterCount, filterbuf, req.getWuid(), WUSFwildwuid);
  1828. addWUQueryFilter(filters, filterCount, filterbuf, req.getCluster(), WUSFcluster);
  1829. addWUQueryFilter(filters, filterCount, filterbuf, req.getOwner(), (WUSortField) (WUSFuser | WUSFnocase));
  1830. addWUQueryFilter(filters, filterCount, filterbuf, req.getJobName(), (WUSortField) (WUSFjob | WUSFnocase));
  1831. //StartDate example: 2015-08-26T14:26:00
  1832. addWUQueryFilterTime(filters, filterCount, filterbuf, req.getStartDate(), WUSFwuid);
  1833. addWUQueryFilterTime(filters, filterCount, filterbuf, req.getEndDate(), WUSFwuidhigh);
  1834. IArrayOf<IConstApplicationValue>& applicationFilters = req.getApplicationValues();
  1835. ForEachItemIn(i, applicationFilters)
  1836. {
  1837. IConstApplicationValue &item = applicationFilters.item(i);
  1838. addWUQueryFilterApplication(filters, filterCount, filterbuf, item.getApplication(), item.getName(), item.getValue());
  1839. }
  1840. filters[filterCount] = WUSFterm;
  1841. __int64 cacheHint = 0;
  1842. if (!req.getCacheHint_isNull())
  1843. cacheHint = req.getCacheHint();
  1844. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  1845. unsigned numWUs;
  1846. PROGLOG("getWorkUnitsSorted(LightWeight)");
  1847. Owned<IConstWorkUnitIterator> it = factory->getWorkUnitsSorted(sortOrder, filters, filterbuf.bufferBase(), pageStartFrom, pageSize+1, &cacheHint, &numWUs); // MORE - need security flags here!
  1848. resp.setCacheHint(cacheHint);
  1849. PROGLOG("getWorkUnitsSorted(LightWeight) done");
  1850. IArrayOf<IEspECLWorkunitLW> results;
  1851. ForEach(*it)
  1852. {
  1853. IConstWorkUnitInfo& cw = it->query();
  1854. if (bDoubleCheckState && (cw.getState() != WUStateSubmitted))
  1855. {
  1856. numWUs--;
  1857. continue;
  1858. }
  1859. // This test is presumably trying to remove the global workunit, though it's not the right way to do so (since it will mess up page counts etc)
  1860. const char* wuid = cw.queryWuid();
  1861. if (!looksLikeAWuid(wuid, 'W'))
  1862. {
  1863. numWUs--;
  1864. continue;
  1865. }
  1866. Owned<IEspECLWorkunitLW> info = createECLWorkunitLW("","");
  1867. info->setWuid(cw.queryWuid());
  1868. if (chooseWuAccessFlagsByOwnership(context.queryUserId(), cw, accessOwn, accessOthers) < SecAccess_Read)
  1869. {
  1870. info->setStateDesc("<Hidden>");
  1871. results.append(*info.getClear());
  1872. continue;
  1873. }
  1874. SCMStringBuffer s;
  1875. info->setIsProtected(cw.isProtected() ? 1 : 0);
  1876. info->setJobName(cw.queryJobName());
  1877. info->setWuScope(cw.queryWuScope());
  1878. info->setOwner(cw.queryUser());
  1879. info->setClusterName(cw.queryClusterName());
  1880. info->setState(cw.getState());
  1881. info->setStateDesc(cw.queryStateDesc());
  1882. info->setAction(cw.getAction());
  1883. info->setActionDesc(cw.queryActionDesc());
  1884. info->setPriority(cw.getPriority());
  1885. info->setPriorityLevel(cw.getPriorityLevel());
  1886. info->setPriorityDesc(cw.queryPriorityDesc());
  1887. info->setTotalClusterTime(cw.getTotalThorTime());
  1888. WsWuDateTime dt;
  1889. cw.getTimeScheduled(dt);
  1890. if(dt.isValid())
  1891. info->setDateTimeScheduled(dt.getString(s).str());
  1892. IArrayOf<IEspApplicationValue> av;
  1893. Owned<IConstWUAppValueIterator> app(&cw.getApplicationValues());
  1894. ForEach(*app)
  1895. {
  1896. IConstWUAppValue& val=app->query();
  1897. Owned<IEspApplicationValue> t= createApplicationValue("","");
  1898. t->setApplication(val.queryApplication());
  1899. t->setName(val.queryName());
  1900. t->setValue(val.queryValue());
  1901. av.append(*t.getClear());
  1902. }
  1903. info->setApplicationValues(av);
  1904. results.append(*info.getClear());
  1905. }
  1906. resp.setNumWUs(numWUs);
  1907. if (results.length() > (aindex_t)pageSize)
  1908. results.pop();
  1909. resp.setWorkunits(results);
  1910. return;
  1911. }
  1912. class CArchivedWUsReader : public CInterface, implements IArchivedWUsReader
  1913. {
  1914. IEspContext& context;
  1915. unsigned pageSize;
  1916. StringAttr sashaServerIP;
  1917. unsigned sashaServerPort;
  1918. unsigned cacheMinutes;
  1919. StringBuffer filterStr;
  1920. ArchivedWuCache& archivedWuCache;
  1921. unsigned numberOfWUsReturned;
  1922. bool hasMoreWU;
  1923. void readDateFilters(const char* startDateReq, const char* endDateReq, StringBuffer& from, StringBuffer& to)
  1924. {
  1925. CDateTime timeFrom, timeTo;
  1926. if(notEmpty(endDateReq))
  1927. {//endDateReq example: 2015-08-26T14:26:00
  1928. unsigned year, month, day, hour, minute, second, nano;
  1929. timeTo.setString(endDateReq, NULL, true);
  1930. timeTo.getDate(year, month, day, true);
  1931. timeTo.getTime(hour, minute, second, nano, true);
  1932. to.setf("%4d%02d%02d-%02d%02d%02d", year, month, day, hour, minute, second);
  1933. }
  1934. if(isEmpty(startDateReq))
  1935. return;
  1936. timeFrom.setString(startDateReq, NULL, true);
  1937. if (timeFrom >= timeTo)
  1938. return;
  1939. unsigned year0, month0, day0, hour0, minute0, second0, nano0;
  1940. timeFrom.getDate(year0, month0, day0, true);
  1941. timeFrom.getTime(hour0, minute0, second0, nano0, true);
  1942. from.setf("%4d%02d%02d-%02d%02d%02d", year0, month0, day0, hour0, minute0, second0);
  1943. return;
  1944. }
  1945. bool addToFilterString(const char *name, const char *value)
  1946. {
  1947. if (isEmpty(name) || isEmpty(value))
  1948. return false;
  1949. filterStr.append(';').append(name).append("=").append(value);
  1950. return true;
  1951. }
  1952. bool addToFilterString(const char *name, unsigned value)
  1953. {
  1954. if (isEmpty(name))
  1955. return false;
  1956. filterStr.append(';').append(name).append("=").append(value);
  1957. return true;
  1958. }
  1959. void setFilterString(IEspWUQueryRequest& req)
  1960. {
  1961. filterStr.set("0");
  1962. addToFilterString("wuid", req.getWuid());
  1963. addToFilterString("cluster", req.getCluster());
  1964. addToFilterString("owner", req.getOwner());
  1965. addToFilterString("jobName", req.getJobname());
  1966. addToFilterString("state", req.getState());
  1967. addToFilterString("timeFrom", req.getStartDate());
  1968. addToFilterString("timeTo", req.getEndDate());
  1969. addToFilterString("beforeWU", req.getBeforeWU());
  1970. addToFilterString("afterWU", req.getAfterWU());
  1971. addToFilterString("descending", req.getDescending());
  1972. addToFilterString("pageSize", pageSize);
  1973. if (sashaServerIP && *sashaServerIP)
  1974. {
  1975. addToFilterString("sashaServerIP", sashaServerIP.get());
  1976. addToFilterString("sashaServerPort", sashaServerPort);
  1977. }
  1978. }
  1979. void setFilterStringLW(IEspWULightWeightQueryRequest& req)
  1980. {
  1981. filterStr.set("1");
  1982. addToFilterString("wuid", req.getWuid());
  1983. addToFilterString("cluster", req.getCluster());
  1984. addToFilterString("owner", req.getOwner());
  1985. addToFilterString("jobName", req.getJobName());
  1986. addToFilterString("state", req.getState());
  1987. addToFilterString("timeFrom", req.getStartDate());
  1988. addToFilterString("timeTo", req.getEndDate());
  1989. addToFilterString("beforeWU", req.getBeforeWU());
  1990. addToFilterString("afterWU", req.getAfterWU());
  1991. addToFilterString("descending", req.getDescending());
  1992. addToFilterString("pageSize", pageSize);
  1993. if (sashaServerIP && *sashaServerIP)
  1994. {
  1995. addToFilterString("sashaServerIP", sashaServerIP.get());
  1996. addToFilterString("sashaServerPort", sashaServerPort);
  1997. }
  1998. }
  1999. void initSashaCommand(ISashaCommand* cmd)
  2000. {
  2001. cmd->setAction(SCA_LIST);
  2002. cmd->setOutputFormat("owner,jobname,cluster,state");
  2003. cmd->setOnline(false);
  2004. cmd->setArchived(true);
  2005. cmd->setLimit(pageSize+1); //read an extra WU to check hasMoreWU
  2006. }
  2007. void setSashaCommand(IEspWUQueryRequest& req, ISashaCommand* cmd)
  2008. {
  2009. if (notEmpty(req.getWuid()))
  2010. cmd->addId(req.getWuid());
  2011. if (notEmpty(req.getCluster()))
  2012. cmd->setCluster(req.getCluster());
  2013. if (notEmpty(req.getOwner()))
  2014. cmd->setOwner(req.getOwner());
  2015. if (notEmpty(req.getJobname()))
  2016. cmd->setJobName(req.getJobname());
  2017. if (notEmpty(req.getState()))
  2018. cmd->setState(req.getState());
  2019. StringBuffer timeFrom, timeTo;
  2020. readDateFilters(req.getStartDate(), req.getEndDate(), timeFrom, timeTo);
  2021. if (timeFrom.length())
  2022. cmd->setAfter(timeFrom.str());
  2023. if (timeTo.length())
  2024. cmd->setBefore(timeTo.str());
  2025. if (notEmpty(req.getBeforeWU()))
  2026. cmd->setBeforeWU(req.getBeforeWU());
  2027. if (notEmpty(req.getAfterWU()))
  2028. cmd->setAfterWU(req.getAfterWU());
  2029. cmd->setSortDescending(req.getDescending());
  2030. return;
  2031. }
  2032. void setSashaCommandLW(IEspWULightWeightQueryRequest& req, ISashaCommand* cmd)
  2033. {
  2034. if (notEmpty(req.getWuid()))
  2035. cmd->addId(req.getWuid());
  2036. if (notEmpty(req.getCluster()))
  2037. cmd->setCluster(req.getCluster());
  2038. if (notEmpty(req.getOwner()))
  2039. cmd->setOwner(req.getOwner());
  2040. if (notEmpty(req.getJobName()))
  2041. cmd->setJobName(req.getJobName());
  2042. if (notEmpty(req.getState()))
  2043. cmd->setState(req.getState());
  2044. StringBuffer timeFrom, timeTo;
  2045. readDateFilters(req.getStartDate(), req.getEndDate(), timeFrom, timeTo);
  2046. if (timeFrom.length())
  2047. cmd->setAfter(timeFrom.str());
  2048. if (timeTo.length())
  2049. cmd->setBefore(timeTo.str());
  2050. if (notEmpty(req.getBeforeWU()))
  2051. cmd->setBeforeWU(req.getBeforeWU());
  2052. if (notEmpty(req.getAfterWU()))
  2053. cmd->setAfterWU(req.getAfterWU());
  2054. cmd->setSortDescending(req.getDescending());
  2055. return;
  2056. }
  2057. IEspECLWorkunit *createArchivedWUEntry(StringArray& wuDataArray, bool canAccess)
  2058. {
  2059. Owned<IEspECLWorkunit> info= createECLWorkunit();
  2060. info->setWuid(wuDataArray.item(0));
  2061. if (!canAccess)
  2062. {
  2063. info->setState("<Hidden>");
  2064. return info.getClear();
  2065. }
  2066. const char* owner = wuDataArray.item(1);
  2067. const char* jobName = wuDataArray.item(2);
  2068. const char* cluster = wuDataArray.item(3);
  2069. const char* state = wuDataArray.item(4);
  2070. if (notEmpty(owner))
  2071. info->setOwner(owner);
  2072. if (notEmpty(jobName))
  2073. info->setJobname(jobName);
  2074. if (notEmpty(cluster))
  2075. info->setCluster(cluster);
  2076. if (notEmpty(state))
  2077. info->setState(state);
  2078. return info.getClear();
  2079. }
  2080. IEspECLWorkunitLW *createArchivedLWWUEntry(StringArray& wuDataArray, bool canAccess)
  2081. {
  2082. Owned<IEspECLWorkunitLW> info= createECLWorkunitLW();
  2083. info->setWuid(wuDataArray.item(0));
  2084. if (!canAccess)
  2085. {
  2086. info->setStateDesc("<Hidden>");
  2087. return info.getClear();
  2088. }
  2089. const char* owner = wuDataArray.item(1);
  2090. const char* jobName = wuDataArray.item(2);
  2091. const char* cluster = wuDataArray.item(3);
  2092. const char* state = wuDataArray.item(4);
  2093. if (notEmpty(owner))
  2094. info->setOwner(owner);
  2095. if (notEmpty(jobName))
  2096. info->setJobName(jobName);
  2097. if (notEmpty(cluster))
  2098. info->setClusterName(cluster);
  2099. if (notEmpty(state))
  2100. info->setStateDesc(state);
  2101. return info.getClear();
  2102. }
  2103. static int compareWuids(IInterface * const *_a, IInterface * const *_b)
  2104. {
  2105. IEspECLWorkunit *a = *(IEspECLWorkunit **)_a;
  2106. IEspECLWorkunit *b = *(IEspECLWorkunit **)_b;
  2107. return strcmp(b->getWuid(), a->getWuid());
  2108. }
  2109. static int compareLWWuids(IInterface * const *_a, IInterface * const *_b)
  2110. {
  2111. IEspECLWorkunitLW *a = *(IEspECLWorkunitLW **)_a;
  2112. IEspECLWorkunitLW *b = *(IEspECLWorkunitLW **)_b;
  2113. return strcmp(b->getWuid(), a->getWuid());
  2114. }
  2115. public:
  2116. IMPLEMENT_IINTERFACE_USING(CInterface);
  2117. CArchivedWUsReader(IEspContext& _context, const char* _sashaServerIP, unsigned _sashaServerPort, ArchivedWuCache& _archivedWuCache,
  2118. unsigned _cacheMinutes, unsigned _pageSize)
  2119. : context(_context), sashaServerIP(_sashaServerIP), sashaServerPort(_sashaServerPort),
  2120. archivedWuCache(_archivedWuCache), cacheMinutes(_cacheMinutes), pageSize(_pageSize)
  2121. {
  2122. hasMoreWU = false;
  2123. numberOfWUsReturned = 0;
  2124. }
  2125. void getArchivedWUs(bool lightWeight, IEspWUQueryRequest& req, IEspWULightWeightQueryRequest& reqLW, IArrayOf<IEspECLWorkunit>& archivedWUs, IArrayOf<IEspECLWorkunitLW>& archivedLWWUs)
  2126. {
  2127. if (!lightWeight)
  2128. setFilterString(req);
  2129. else
  2130. setFilterStringLW(reqLW);
  2131. Owned<ArchivedWuCacheElement> cachedResults = archivedWuCache.lookup(context, filterStr, "AddWhenAvailable", cacheMinutes);
  2132. if (cachedResults)
  2133. {
  2134. hasMoreWU = cachedResults->m_hasNextPage;
  2135. numberOfWUsReturned = cachedResults->numWUsReturned;
  2136. if (!lightWeight && cachedResults->m_results.length())
  2137. {
  2138. ForEachItemIn(i, cachedResults->m_results)
  2139. archivedWUs.append(*LINK(&cachedResults->m_results.item(i)));
  2140. }
  2141. if (lightWeight && cachedResults->resultsLW.length())
  2142. {
  2143. ForEachItemIn(i, cachedResults->resultsLW)
  2144. archivedLWWUs.append(*LINK(&cachedResults->resultsLW.item(i)));
  2145. }
  2146. return;
  2147. }
  2148. SocketEndpoint ep;
  2149. if (sashaServerIP && *sashaServerIP)
  2150. ep.set(sashaServerIP, sashaServerPort);
  2151. else
  2152. getSashaNode(ep);
  2153. Owned<INode> sashaserver = createINode(ep);
  2154. Owned<ISashaCommand> cmd = createSashaCommand();
  2155. initSashaCommand(cmd);
  2156. if (!lightWeight)
  2157. setSashaCommand(req, cmd);
  2158. else
  2159. setSashaCommandLW(reqLW, cmd);
  2160. if (!cmd->send(sashaserver))
  2161. {
  2162. StringBuffer url;
  2163. throw MakeStringException(ECLWATCH_CANNOT_CONNECT_ARCHIVE_SERVER,
  2164. "Sasha (%s) took too long to respond from: Get archived workUnits.",
  2165. ep.getUrlStr(url).str());
  2166. }
  2167. numberOfWUsReturned = cmd->numIds();
  2168. hasMoreWU = (numberOfWUsReturned > pageSize);
  2169. if (hasMoreWU)
  2170. numberOfWUsReturned--;
  2171. if (numberOfWUsReturned == 0)
  2172. return;
  2173. SecAccessFlags accessOwn, accessOthers;
  2174. getUserWuAccessFlags(context, accessOwn, accessOthers, true);
  2175. for (unsigned i=0; i<numberOfWUsReturned; i++)
  2176. {
  2177. const char *csline = cmd->queryId(i);
  2178. if (!csline || !*csline)
  2179. continue;
  2180. StringArray wuDataArray;
  2181. wuDataArray.appendList(csline, ",");
  2182. const char* wuid = wuDataArray.item(0);
  2183. if (isEmpty(wuid))
  2184. {
  2185. IWARNLOG("Empty WUID in SCA_LIST response"); // JCS->KW - have u ever seen this happen?
  2186. continue;
  2187. }
  2188. const char* owner = wuDataArray.item(1);
  2189. bool canAccess = chooseWuAccessFlagsByOwnership(context.queryUserId(), owner, accessOwn, accessOthers) >= SecAccess_Read;
  2190. if (!lightWeight)
  2191. {
  2192. Owned<IEspECLWorkunit> info = createArchivedWUEntry(wuDataArray, canAccess);
  2193. archivedWUs.append(*info.getClear());
  2194. }
  2195. else
  2196. {
  2197. Owned<IEspECLWorkunitLW> info = createArchivedLWWUEntry(wuDataArray, canAccess);
  2198. archivedLWWUs.append(*info.getClear());
  2199. }
  2200. }
  2201. archivedWuCache.add(filterStr, "AddWhenAvailable", hasMoreWU, numberOfWUsReturned, archivedWUs, archivedLWWUs);
  2202. return;
  2203. };
  2204. bool getHasMoreWU() { return hasMoreWU; };
  2205. unsigned getNumberOfWUsReturned() { return numberOfWUsReturned; };
  2206. };
  2207. void doWUQueryFromArchive(IEspContext &context, const char* sashaServerIP, unsigned sashaServerPort,
  2208. ArchivedWuCache &archivedWuCache, unsigned cacheMinutes, IEspWUQueryRequest & req, IEspWUQueryResponse & resp)
  2209. {
  2210. //Sasha server does noy support the PageStartFrom due to inefficient access to archived workunits for pages>1.
  2211. unsigned pageSize = (unsigned) req.getPageSize();
  2212. if(pageSize < 1)
  2213. pageSize=500;
  2214. Owned<IArchivedWUsReader> archiveWUsReader = new CArchivedWUsReader(context, sashaServerIP, sashaServerPort, archivedWuCache,
  2215. cacheMinutes, pageSize);
  2216. IArrayOf<IEspECLWorkunit> archivedWUs;
  2217. IArrayOf<IEspECLWorkunitLW> dummyWUs;
  2218. Owned<CWULightWeightQueryRequest> dummyReq = new CWULightWeightQueryRequest("WsWorkunits");
  2219. PROGLOG("getWorkUnitsFromArchive");
  2220. archiveWUsReader->getArchivedWUs(false, req, *dummyReq, archivedWUs, dummyWUs);
  2221. PROGLOG("getWorkUnitsFromArchive done");
  2222. resp.setWorkunits(archivedWUs);
  2223. resp.setNumWUs(archiveWUsReader->getNumberOfWUsReturned());
  2224. resp.setType("archived only");
  2225. resp.setPageSize(pageSize);
  2226. return;
  2227. }
  2228. void doWULightWeightQueryFromArchive(IEspContext &context, const char* sashaServerIP, unsigned sashaServerPort,
  2229. ArchivedWuCache &archivedWuCache, unsigned cacheMinutes, IEspWULightWeightQueryRequest & req, IEspWULightWeightQueryResponse & resp)
  2230. {
  2231. int pageSize = req.getPageSize_isNull()? 500 : req.getPageSize();
  2232. Owned<IArchivedWUsReader> archiveWUsReader = new CArchivedWUsReader(context, sashaServerIP, sashaServerPort, archivedWuCache,
  2233. cacheMinutes, pageSize);
  2234. Owned<CWUQueryRequest> dummyReq = new CWUQueryRequest("WsWorkunits");
  2235. IArrayOf<IEspECLWorkunit> dummyWUs;
  2236. IArrayOf<IEspECLWorkunitLW> archivedWUs;
  2237. PROGLOG("getWorkUnitsFromArchive(LightWeight)");
  2238. archiveWUsReader->getArchivedWUs(true, *dummyReq, req, dummyWUs, archivedWUs);
  2239. PROGLOG("getWorkUnitsFromArchive(LightWeight) done");
  2240. resp.setWorkunits(archivedWUs);
  2241. resp.setNumWUs(archiveWUsReader->getNumberOfWUsReturned());
  2242. return;
  2243. }
  2244. bool CWsWorkunitsEx::onWUQuery(IEspContext &context, IEspWUQueryRequest & req, IEspWUQueryResponse & resp)
  2245. {
  2246. try
  2247. {
  2248. StringBuffer wuidStr(req.getWuid());
  2249. const char* wuid = wuidStr.trim().str();
  2250. if (req.getType() && strieq(req.getType(), "archived workunits"))
  2251. doWUQueryFromArchive(context, sashaServerIp.get(), sashaServerPort, *archivedWuCache, awusCacheMinutes, req, resp);
  2252. else if(notEmpty(wuid) && looksLikeAWuid(wuid, 'W'))
  2253. doWUQueryBySingleWuid(context, wuid, resp);
  2254. else if (notEmpty(req.getLogicalFile()) && req.getLogicalFileSearchType() && strieq(req.getLogicalFileSearchType(), "Created"))
  2255. doWUQueryByFile(context, req.getLogicalFile(), resp);
  2256. else
  2257. doWUQueryWithSort(context, req, resp);
  2258. resp.setState(req.getState());
  2259. resp.setCluster(req.getCluster());
  2260. resp.setRoxieCluster(req.getRoxieCluster());
  2261. resp.setOwner(req.getOwner());
  2262. resp.setStartDate(req.getStartDate());
  2263. resp.setEndDate(req.getEndDate());
  2264. double version = context.getClientVersion();
  2265. StringBuffer basicQuery;
  2266. addToQueryString(basicQuery, "State", req.getState());
  2267. addToQueryString(basicQuery, "Cluster", req.getCluster());
  2268. addToQueryString(basicQuery, "Owner", req.getOwner());
  2269. addToQueryString(basicQuery, "StartDate", req.getStartDate());
  2270. addToQueryString(basicQuery, "EndDate", req.getEndDate());
  2271. if (version >= 1.26 && version < 1.72 && req.getLastNDays() > -1)
  2272. addToQueryString(basicQuery, "LastNDays", StringBuffer().append(req.getLastNDays()).str());
  2273. addToQueryString(basicQuery, "ECL", req.getECL());
  2274. addToQueryString(basicQuery, "Jobname", req.getJobname());
  2275. addToQueryString(basicQuery, "Type", req.getType());
  2276. if (addToQueryString(basicQuery, "LogicalFile", req.getLogicalFile()))
  2277. addToQueryString(basicQuery, "LogicalFileSearchType", req.getLogicalFileSearchType());
  2278. resp.setFilters(basicQuery.str());
  2279. if (notEmpty(req.getSortby()) && !strstr(basicQuery.str(), StringBuffer(req.getSortby()).append('=').str()))
  2280. {
  2281. resp.setSortby(req.getSortby());
  2282. addToQueryString(basicQuery, "Sortby", req.getSortby());
  2283. if (req.getDescending())
  2284. {
  2285. resp.setDescending(req.getDescending());
  2286. addToQueryString(basicQuery, "Descending", "1");
  2287. }
  2288. }
  2289. resp.setBasicQuery(basicQuery.str());
  2290. StringBuffer s;
  2291. if(notEmpty(req.getECL()))
  2292. resp.setECL(Utils::url_encode(req.getECL(), s).str());
  2293. if(notEmpty(req.getJobname()))
  2294. resp.setJobname(Utils::url_encode(req.getJobname(), s.clear()).str());
  2295. }
  2296. catch(IException* e)
  2297. {
  2298. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  2299. }
  2300. return true;
  2301. }
  2302. bool CWsWorkunitsEx::onWULightWeightQuery(IEspContext &context, IEspWULightWeightQueryRequest & req, IEspWULightWeightQueryResponse & resp)
  2303. {
  2304. try
  2305. {
  2306. if (req.getType() && strieq(req.getType(), "archived workunits"))
  2307. doWULightWeightQueryFromArchive(context, sashaServerIp.get(), sashaServerPort, *archivedWuCache, awusCacheMinutes, req, resp);
  2308. else
  2309. doWULightWeightQueryWithSort(context, req, resp);
  2310. }
  2311. catch(IException* e)
  2312. {
  2313. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  2314. }
  2315. return true;
  2316. }
  2317. ITypeInfo * containsSingleSimpleFieldBlankXPath(IResultSetMetaData * meta)
  2318. {
  2319. if (meta->getColumnCount() != 1)
  2320. return NULL;
  2321. CResultSetMetaData * castMeta = static_cast<CResultSetMetaData *>(meta);
  2322. const char * xpath = castMeta->queryXPath(0);
  2323. if (xpath && (*xpath == 0))
  2324. {
  2325. return castMeta->queryType(0);
  2326. }
  2327. return NULL;
  2328. }
  2329. void csvSplitXPath(const char *xpath, StringBuffer &s, const char *&name, const char **childname=NULL)
  2330. {
  2331. if (!xpath)
  2332. return;
  2333. const char * slash = strchr(xpath, '/');
  2334. if (!slash)
  2335. {
  2336. name = xpath;
  2337. if (childname)
  2338. *childname = NULL;
  2339. }
  2340. else
  2341. {
  2342. if (!childname || strchr(slash+1, '/')) //output ignores xpaths that are too deep
  2343. return;
  2344. name = s.clear().append(slash-xpath, xpath).str();
  2345. *childname = slash+1;
  2346. }
  2347. }
  2348. void getCSVHeaders(const IResultSetMetaData& metaIn, CommonCSVWriter* writer, unsigned& layer)
  2349. {
  2350. StringBuffer xname;
  2351. const CResultSetMetaData& cMeta = static_cast<const CResultSetMetaData &>(metaIn);
  2352. IFvDataSourceMetaData* meta = cMeta.getMeta();
  2353. int columnCount = metaIn.getColumnCount();
  2354. for (unsigned idx = 0; idx < columnCount; idx++)
  2355. {
  2356. const CResultSetColumnInfo& column = cMeta.getColumn(idx);
  2357. unsigned flag = column.flag;
  2358. const char * name = meta->queryName(idx);
  2359. const char * childname = NULL;
  2360. switch (flag)
  2361. {
  2362. case FVFFbeginif:
  2363. case FVFFendif:
  2364. break;
  2365. case FVFFbeginrecord:
  2366. csvSplitXPath(meta->queryXPath(idx), xname, name);
  2367. writer->outputBeginNested(name, false, true);
  2368. break;
  2369. case FVFFendrecord:
  2370. csvSplitXPath(meta->queryXPath(idx), xname, name);
  2371. writer->outputEndNested(name, true);
  2372. break;
  2373. case FVFFdataset:
  2374. {
  2375. childname = "Row";
  2376. csvSplitXPath(meta->queryXPath(idx), xname, name, &childname);
  2377. ITypeInfo* singleFieldType = (name && *name && childname && *childname)
  2378. ? containsSingleSimpleFieldBlankXPath(column.childMeta.get()) : NULL;
  2379. if (!singleFieldType)
  2380. {
  2381. bool nameValid = (name && *name);
  2382. if (nameValid || (childname && *childname))
  2383. {
  2384. if (nameValid)
  2385. writer->outputBeginNested(name, false, true);
  2386. if (childname && *childname)
  2387. writer->outputBeginNested(childname, false, !nameValid);
  2388. const CResultSetMetaData *childMeta = static_cast<const CResultSetMetaData *>(column.childMeta.get());
  2389. getCSVHeaders(*childMeta, writer, ++layer);
  2390. layer--;
  2391. if (childname && *childname)
  2392. writer->outputEndNested(childname, !nameValid);
  2393. if (nameValid)
  2394. writer->outputEndNested(name, true);
  2395. }
  2396. }
  2397. break;
  2398. }
  2399. case FVFFblob: //for now FileViewer will output the string "[blob]"
  2400. {
  2401. Owned<ITypeInfo> stringType = makeStringType(UNKNOWN_LENGTH, NULL, NULL);
  2402. csvSplitXPath(meta->queryXPath(idx), xname, name);
  2403. StringBuffer eclTypeName;
  2404. stringType->getECLType(eclTypeName);
  2405. writer->outputCSVHeader(name, eclTypeName.str());
  2406. }
  2407. break;
  2408. default:
  2409. {
  2410. ITypeInfo & type = *column.type;
  2411. if (type.getTypeCode() == type_set)
  2412. {
  2413. childname = "Item";
  2414. csvSplitXPath(meta->queryXPath(idx), xname, name, &childname);
  2415. writer->outputBeginNested(name, true, true);
  2416. writer->outputEndNested(name, true);
  2417. }
  2418. else
  2419. {
  2420. csvSplitXPath(meta->queryXPath(idx), xname, name);
  2421. StringBuffer eclTypeName;
  2422. type.getECLType(eclTypeName);
  2423. writer->outputCSVHeader(name, eclTypeName.str());
  2424. }
  2425. break;
  2426. }
  2427. }
  2428. }
  2429. }
  2430. unsigned getResultCSV(IStringVal& ret, INewResultSet* result, const char* name, __int64 start, unsigned& count)
  2431. {
  2432. unsigned headerLayer = 0;
  2433. CSVOptions csvOptions;
  2434. csvOptions.delimiter.set(",");
  2435. csvOptions.terminator.set("\n");
  2436. csvOptions.includeHeader = true;
  2437. Owned<CommonCSVWriter> writer = new CommonCSVWriter(XWFtrim, csvOptions);
  2438. const IResultSetMetaData & meta = result->getMetaData();
  2439. getCSVHeaders(meta, writer, headerLayer);
  2440. writer->finishCSVHeaders();
  2441. Owned<IResultSetCursor> cursor = result->createCursor();
  2442. count = writeResultCursorXml(*writer, cursor, name, start, count, NULL);
  2443. ret.set(writer->str());
  2444. return count;
  2445. }
  2446. void appendResultSet(MemoryBuffer& mb, INewResultSet* result, const char *name, __int64 start, unsigned& count, __int64& total, bool bin, bool xsd, ESPSerializationFormat fmt, const IProperties *xmlns)
  2447. {
  2448. if (!result)
  2449. return;
  2450. total=result->getNumRows();
  2451. if(bin)
  2452. count = getResultBin(mb, result, (unsigned)start, count);
  2453. else
  2454. {
  2455. struct MemoryBuffer2IStringVal : public CInterface, implements IStringVal
  2456. {
  2457. MemoryBuffer2IStringVal(MemoryBuffer & _buffer) : buffer(_buffer) {}
  2458. IMPLEMENT_IINTERFACE;
  2459. virtual const char * str() const { UNIMPLEMENTED; }
  2460. virtual void set(const char *val) { buffer.append(strlen(val),val); }
  2461. virtual void clear() { } // support appending only
  2462. virtual void setLen(const char *val, unsigned length) { buffer.append(length, val); }
  2463. virtual unsigned length() const { return buffer.length(); };
  2464. MemoryBuffer & buffer;
  2465. } adaptor(mb);
  2466. if (fmt==ESPSerializationCSV)
  2467. count = getResultCSV(adaptor, result, name, (unsigned) start, count);
  2468. else if (fmt==ESPSerializationJSON)
  2469. count = getResultJSON(adaptor, result, name, (unsigned) start, count, (xsd) ? "myschema" : NULL);
  2470. else
  2471. count = getResultXml(adaptor, result, name, (unsigned) start, count, (xsd) ? "myschema" : NULL, xmlns);
  2472. }
  2473. }
  2474. INewResultSet* createFilteredResultSet(INewResultSet* result, IArrayOf<IConstNamedValue>* filterBy)
  2475. {
  2476. if (!result || !filterBy || !filterBy->length())
  2477. return NULL;
  2478. Owned<IFilteredResultSet> filter = result->createFiltered();
  2479. const IResultSetMetaData &meta = result->getMetaData();
  2480. unsigned columnCount = meta.getColumnCount();
  2481. ForEachItemIn(i, *filterBy)
  2482. {
  2483. IConstNamedValue &item = filterBy->item(i);
  2484. const char *name = item.getName();
  2485. const char *value = item.getValue();
  2486. if (!name || !*name || !value || !*value)
  2487. continue;
  2488. for(unsigned col = 0; col < columnCount; col++)
  2489. {
  2490. SCMStringBuffer scmbuf;
  2491. meta.getColumnLabel(scmbuf, col);
  2492. if (strieq(scmbuf.str(), name))
  2493. {
  2494. filter->addFilter(col, value);
  2495. break;
  2496. }
  2497. }
  2498. }
  2499. return filter->create();
  2500. }
  2501. void CWsWorkunitsEx::getWsWuResult(IEspContext &context, const char *wuid, const char *name, const char *logical, unsigned index, __int64 start,
  2502. unsigned &count, __int64 &total, IStringVal &resname, bool bin, IArrayOf<IConstNamedValue> *filterBy, MemoryBuffer &mb,
  2503. WUState &wuState, bool xsd)
  2504. {
  2505. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  2506. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid);
  2507. if(!cw)
  2508. throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot open workunit %s.",wuid);
  2509. Owned<IConstWUResult> result;
  2510. if (notEmpty(name))
  2511. result.setown(cw->getResultByName(name));
  2512. else if (notEmpty(logical))
  2513. {
  2514. Owned<IConstWUResultIterator> it = &cw->getResults();
  2515. ForEach(*it)
  2516. {
  2517. IConstWUResult &r = it->query();
  2518. SCMStringBuffer filename;
  2519. if(strieq(r.getResultLogicalName(filename).str(), logical))
  2520. {
  2521. result.setown(LINK(&r));
  2522. break;
  2523. }
  2524. }
  2525. }
  2526. else
  2527. result.setown(cw->getResultBySequence(index));
  2528. if (!result)
  2529. throw MakeStringException(ECLWATCH_CANNOT_GET_WU_RESULT,"Cannot open the workunit result.");
  2530. if (result->getResultRawSize(nullptr, nullptr) > wuResultMaxSize)
  2531. throw makeStringExceptionV(ECLWATCH_INVALID_ACTION, "Failed to get the result for %s. The size is bigger than %lld.",
  2532. wuid, wuResultMaxSize);
  2533. if (!resname.length())
  2534. result->getResultName(resname);
  2535. Owned<IResultSetFactory> resultSetFactory = getSecResultSetFactory(context.querySecManager(), context.queryUser(), context.queryUserId(), context.queryPassword());
  2536. SCMStringBuffer logicalName;
  2537. result->getResultLogicalName(logicalName);
  2538. Owned<INewResultSet> rs;
  2539. if (logicalName.length())
  2540. {
  2541. rs.setown(resultSetFactory->createNewFileResultSet(logicalName.str(), cw->queryClusterName())); //MORE is this wrong cluster?
  2542. }
  2543. else
  2544. rs.setown(resultSetFactory->createNewResultSet(result, wuid));
  2545. if (!filterBy || !filterBy->length())
  2546. appendResultSet(mb, rs, name, start, count, total, bin, xsd, context.getResponseFormat(), result->queryResultXmlns());
  2547. else
  2548. {
  2549. Owned<INewResultSet> filteredResult = createFilteredResultSet(rs, filterBy);
  2550. appendResultSet(mb, filteredResult, name, start, count, total, bin, xsd, context.getResponseFormat(), result->queryResultXmlns());
  2551. }
  2552. wuState = cw->getState();
  2553. }
  2554. void checkFileSizeLimit(unsigned long xmlSize, unsigned long sizeLimit)
  2555. {
  2556. if ((sizeLimit > 0) && (xmlSize > sizeLimit))
  2557. throw MakeStringException(ECLWATCH_CANNOT_OPEN_FILE,
  2558. "The file size (%ld bytes) exceeds the size limit (%ld bytes). You may set 'Option > 1' or use 'Download_XML' link to get compressed file.",
  2559. xmlSize, sizeLimit);
  2560. }
  2561. void openSaveFile(IEspContext &context, int opt, __int64 sizeLimit, const char* filename, const char* origMimeType, MemoryBuffer& buf, IEspWULogFileResponse &resp)
  2562. {
  2563. if (opt < 1)
  2564. {
  2565. checkFileSizeLimit(buf.length(), sizeLimit);
  2566. resp.setThefile(buf);
  2567. resp.setThefile_mimetype(origMimeType);
  2568. }
  2569. else if (opt < 2)
  2570. {
  2571. checkFileSizeLimit(buf.length(), sizeLimit);
  2572. StringBuffer headerStr("attachment;");
  2573. if (filename && *filename)
  2574. {
  2575. const char* pFileName = strrchr(filename, PATHSEPCHAR);
  2576. if (pFileName)
  2577. headerStr.appendf("filename=%s", pFileName+1);
  2578. else
  2579. headerStr.appendf("filename=%s", filename);
  2580. }
  2581. MemoryBuffer buf0;
  2582. unsigned i = 0;
  2583. char* p = (char*) buf.toByteArray();
  2584. while (i < buf.length())
  2585. {
  2586. if (p[0] != 10)
  2587. buf0.append(p[0]);
  2588. else
  2589. buf0.append(0x0d);
  2590. p++;
  2591. i++;
  2592. }
  2593. resp.setThefile(buf);
  2594. resp.setThefile_mimetype(origMimeType);
  2595. context.addCustomerHeader("Content-disposition", headerStr.str());
  2596. }
  2597. else
  2598. {
  2599. #ifndef _USE_ZLIB
  2600. throw MakeStringException(ECLWATCH_CANNOT_COMPRESS_DATA,"The data cannot be compressed.");
  2601. #else
  2602. StringBuffer fileNameStr, headerStr("attachment;");
  2603. if (notEmpty(filename))
  2604. {
  2605. fileNameStr.append(filename);
  2606. headerStr.append("filename=").append(filename).append((opt>2) ? ".gz" : ".zip");
  2607. }
  2608. else
  2609. fileNameStr.append("file");
  2610. StringBuffer ifname;
  2611. ifname.appendf("%s%sT%xAT%x", TEMPZIPDIR, PATHSEPSTR, (unsigned)(memsize_t)GetCurrentThreadId(), msTick()).append((opt>2)? "" : ".zip");
  2612. IZZIPor* Zipor = createZZIPor();
  2613. int ret = 0;
  2614. if (opt > 2)
  2615. ret = Zipor->gzipToFile(buf.length(), (void*)buf.toByteArray(), ifname.str());
  2616. else
  2617. ret = Zipor->zipToFile(buf.length(), (void*)buf.toByteArray(), fileNameStr.str(), ifname.str());
  2618. releaseIZ(Zipor);
  2619. if (ret < 0)
  2620. {
  2621. Owned<IFile> rFile = createIFile(ifname.str());
  2622. if (rFile->exists())
  2623. rFile->remove();
  2624. throw MakeStringException(ECLWATCH_CANNOT_COMPRESS_DATA,"The data cannot be compressed.");
  2625. }
  2626. Owned <IFile> rf = createIFile(ifname.str());
  2627. if (!rf->exists())
  2628. throw MakeStringException(ECLWATCH_CANNOT_COMPRESS_DATA,"The data cannot be compressed.");
  2629. MemoryBuffer out;
  2630. Owned <IFileIO> fio = rf->open(IFOread);
  2631. read(fio, 0, (size32_t) rf->size(), out);
  2632. resp.setThefile(out);
  2633. fio.clear();
  2634. rf->remove();
  2635. resp.setThefile_mimetype((opt > 2) ? "application/x-gzip" : "application/zip");
  2636. context.addCustomerHeader("Content-disposition", headerStr.str());
  2637. #endif
  2638. }
  2639. }
  2640. bool CWsWorkunitsEx::onWUFile(IEspContext &context,IEspWULogFileRequest &req, IEspWULogFileResponse &resp)
  2641. {
  2642. try
  2643. {
  2644. StringBuffer wuidStr(req.getWuid());
  2645. const char* wuidIn = wuidStr.trim().str();
  2646. if (wuidIn && *wuidIn)
  2647. {
  2648. if (!looksLikeAWuid(wuidIn, 'W'))
  2649. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid Workunit ID");
  2650. ensureWsWorkunitAccess(context, wuidIn, SecAccess_Read);
  2651. }
  2652. StringAttr wuid(wuidIn);
  2653. if (wuid.isEmpty() && notEmpty(req.getQuerySet()) && notEmpty(req.getQuery()))
  2654. {
  2655. Owned<IPropertyTree> registry = getQueryRegistry(req.getQuerySet(), false);
  2656. if (!registry)
  2657. throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Queryset %s not found", req.getQuerySet());
  2658. Owned<IPropertyTree> query = resolveQueryAlias(registry, req.getQuery());
  2659. if (!query)
  2660. throw MakeStringException(ECLWATCH_QUERYID_NOT_FOUND, "Query %s not found", req.getQuery());
  2661. resp.setQuerySet(req.getQuerySet());
  2662. resp.setQueryName(query->queryProp("@name"));
  2663. resp.setQueryId(query->queryProp("@id"));
  2664. wuid.set(query->queryProp("@wuid"));
  2665. }
  2666. int opt = req.getOption();
  2667. if (!wuid.isEmpty())
  2668. {
  2669. VStringBuffer logMsg("WUFile: %s", wuid.str());
  2670. if (notEmpty(req.getType()))
  2671. logMsg.append(", ").append(req.getType());
  2672. if (opt > 1)
  2673. logMsg.append(", download gzip");
  2674. else if (opt > 0)
  2675. logMsg.append(", download");
  2676. PROGLOG("%s", logMsg.str());
  2677. resp.setWuid(wuid.get());
  2678. MemoryBuffer mb;
  2679. WsWuInfo winfo(context, wuid);
  2680. if (strieq(File_ArchiveQuery, req.getType()))
  2681. {
  2682. winfo.getWorkunitArchiveQuery(mb);
  2683. openSaveFile(context, opt, req.getSizeLimit(), "ArchiveQuery.xml", HTTP_TYPE_APPLICATION_XML, mb, resp);
  2684. }
  2685. else if ((strieq(File_Cpp,req.getType()) || strieq(File_Log,req.getType())) && notEmpty(req.getName()))
  2686. {
  2687. winfo.getWorkunitCpp(req.getName(), req.getDescription(), req.getIPAddress(),mb, opt > 0, nullptr);
  2688. openSaveFile(context, opt, req.getSizeLimit(), req.getName(), HTTP_TYPE_TEXT_PLAIN, mb, resp);
  2689. }
  2690. else if (strieq(File_DLL,req.getType()))
  2691. {
  2692. StringBuffer name;
  2693. winfo.getWorkunitDll(name, mb);
  2694. resp.setFileName(name.str());
  2695. resp.setDaliServer(daliServers.get());
  2696. openSaveFile(context, opt, req.getSizeLimit(), req.getName(), HTTP_TYPE_OCTET_STREAM, mb, resp);
  2697. }
  2698. else if (strieq(File_Res,req.getType()))
  2699. {
  2700. winfo.getWorkunitResTxt(mb);
  2701. openSaveFile(context, opt, req.getSizeLimit(), "res.txt", HTTP_TYPE_TEXT_PLAIN, mb, resp);
  2702. }
  2703. else if (strncmp(req.getType(), File_ThorLog, 7) == 0)
  2704. {
  2705. winfo.getWorkunitThorLog(req.getName(), mb, nullptr);
  2706. openSaveFile(context, opt, req.getSizeLimit(), "thormaster.log", HTTP_TYPE_TEXT_PLAIN, mb, resp);
  2707. }
  2708. else if (strieq(File_ThorSlaveLog,req.getType()))
  2709. {
  2710. winfo.getWorkunitThorSlaveLog(directories, req.getProcess(), req.getClusterGroup(), req.getIPAddress(),
  2711. req.getLogDate(), req.getSlaveNumber(), mb, nullptr, false);
  2712. openSaveFile(context, opt, req.getSizeLimit(), "ThorSlave.log", HTTP_TYPE_TEXT_PLAIN, mb, resp);
  2713. }
  2714. else if (strieq(File_EclAgentLog,req.getType()))
  2715. {
  2716. winfo.getWorkunitEclAgentLog(req.getName(), req.getProcess(), mb, nullptr);
  2717. openSaveFile(context, opt, req.getSizeLimit(), "eclagent.log", HTTP_TYPE_TEXT_PLAIN, mb, resp);
  2718. }
  2719. else if (strieq(File_XML,req.getType()) && notEmpty(req.getName()))
  2720. {
  2721. const char* name = req.getName();
  2722. const char* ptr = strrchr(name, '/');
  2723. if (ptr)
  2724. ptr++;
  2725. else
  2726. ptr = name;
  2727. winfo.getWorkunitAssociatedXml(name, req.getIPAddress(), req.getPlainText(), req.getDescription(), opt > 0, true, mb, nullptr);
  2728. openSaveFile(context, opt, req.getSizeLimit(), ptr, HTTP_TYPE_APPLICATION_XML, mb, resp);
  2729. }
  2730. else if (strieq(File_XML,req.getType()) || strieq(File_WUECL,req.getType()))
  2731. {
  2732. StringBuffer mimeType, fileName;
  2733. if (strieq(File_WUECL,req.getType()))
  2734. {
  2735. fileName.setf("%s.ecl", wuid.get());
  2736. winfo.getWorkunitQueryShortText(mb, nullptr);
  2737. mimeType.set(HTTP_TYPE_TEXT_PLAIN);
  2738. }
  2739. else
  2740. {
  2741. fileName.setf("%s.xml", wuid.get());
  2742. winfo.getWorkunitXml(req.getPlainText(), mb);
  2743. if (opt < 2)
  2744. {
  2745. const char* plainText = req.getPlainText();
  2746. if (plainText && (!stricmp(plainText, "yes")))
  2747. mimeType.set(HTTP_TYPE_TEXT_PLAIN);
  2748. else
  2749. mimeType.set(HTTP_TYPE_APPLICATION_XML);
  2750. }
  2751. else
  2752. {
  2753. mimeType.set(HTTP_TYPE_APPLICATION_XML);
  2754. }
  2755. }
  2756. openSaveFile(context, opt, req.getSizeLimit(), fileName.str(), mimeType.str(), mb, resp);
  2757. }
  2758. PROGLOG("%s -- done", logMsg.str());
  2759. }
  2760. }
  2761. catch(IException* e)
  2762. {
  2763. CErrorMessageFormat errorMessageFormat = req.getErrorMessageFormat();
  2764. if (errorMessageFormat == CErrorMessageFormat_XML)
  2765. context.setResponseFormat(ESPSerializationXML);
  2766. else if (errorMessageFormat == CErrorMessageFormat_JSON)
  2767. context.setResponseFormat(ESPSerializationJSON);
  2768. else if (errorMessageFormat == CErrorMessageFormat_Text)
  2769. context.setResponseFormat(ESPSerializationTEXT);
  2770. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  2771. }
  2772. return true;
  2773. }
  2774. IPropertyTree *getArchivedWorkUnitProperties(const char *wuid, bool dfuWU)
  2775. {
  2776. SocketEndpoint ep;
  2777. getSashaNode(ep);
  2778. Owned<INode> node = createINode(ep);
  2779. if (!node)
  2780. throw MakeStringException(ECLWATCH_INODE_NOT_FOUND, "INode not found.");
  2781. StringBuffer tmp;
  2782. Owned<ISashaCommand> cmd = createSashaCommand();
  2783. cmd->addId(wuid);
  2784. cmd->setAction(SCA_GET);
  2785. cmd->setArchived(true);
  2786. if (dfuWU)
  2787. cmd->setDFU(true);
  2788. if (!cmd->send(node, 1*60*1000))
  2789. throw MakeStringException(ECLWATCH_CANNOT_CONNECT_ARCHIVE_SERVER,
  2790. "Sasha (%s) took too long to respond from: Get workUnit properties for %s.",
  2791. ep.getUrlStr(tmp).str(), wuid);
  2792. if ((cmd->numIds() < 1) || (cmd->numResults() < 1))
  2793. return nullptr;
  2794. cmd->getResult(0, tmp.clear());
  2795. if(tmp.length() < 1)
  2796. return nullptr;
  2797. Owned<IPropertyTree> wu = createPTreeFromXMLString(tmp.str());
  2798. if (!wu)
  2799. return nullptr;
  2800. return wu.getClear();
  2801. }
  2802. void getWorkunitCluster(IEspContext &context, const char *wuid, SCMStringBuffer &cluster, bool checkArchiveWUs)
  2803. {
  2804. if (isEmpty(wuid))
  2805. return;
  2806. if ('W' == wuid[0])
  2807. {
  2808. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  2809. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid);
  2810. if (cw)
  2811. cluster.set(cw->queryClusterName());
  2812. else if (checkArchiveWUs)
  2813. {
  2814. Owned<IPropertyTree> wuProps = getArchivedWorkUnitProperties(wuid, false);
  2815. if (wuProps)
  2816. cluster.set(wuProps->queryProp("@clusterName"));
  2817. }
  2818. }
  2819. else
  2820. {
  2821. Owned<IDFUWorkUnitFactory> factory = getDFUWorkUnitFactory();
  2822. Owned<IConstDFUWorkUnit> cw = factory->openWorkUnit(wuid, false);
  2823. if(cw)
  2824. {
  2825. StringBuffer tmp;
  2826. if (cw->getClusterName(tmp).length()!=0)
  2827. cluster.set(tmp.str());
  2828. }
  2829. else if (checkArchiveWUs)
  2830. {
  2831. Owned<IPropertyTree> wuProps = getArchivedWorkUnitProperties(wuid, true);
  2832. if (wuProps)
  2833. cluster.set(wuProps->queryProp("@clusterName"));
  2834. }
  2835. }
  2836. }
  2837. IDistributedFile *CWsWorkunitsEx::lookupLogicalName(IEspContext &context, const char *logicalName)
  2838. {
  2839. StringBuffer userID;
  2840. context.getUserID(userID);
  2841. Owned<IUserDescriptor> userDesc;
  2842. if (!userID.isEmpty())
  2843. {
  2844. userDesc.setown(createUserDescriptor());
  2845. userDesc->set(userID, context.queryPassword(), context.querySignature());
  2846. }
  2847. Owned<IDistributedFile> df = queryDistributedFileDirectory().lookup(logicalName, userDesc, false,
  2848. false, false, nullptr, defaultPrivilegedUser);
  2849. return df.getClear();
  2850. }
  2851. void CWsWorkunitsEx::getFileResults(IEspContext &context, const char *logicalName, const char *cluster, __int64 start, unsigned &count, __int64 &total,
  2852. IStringVal &resname, bool bin, IArrayOf<IConstNamedValue> *filterBy, MemoryBuffer &buf, bool xsd)
  2853. {
  2854. Owned<IDistributedFile> df = lookupLogicalName(context, logicalName);
  2855. if (!df)
  2856. throw makeStringExceptionV(ECLWATCH_FILE_NOT_EXIST, "Cannot find file %s.", logicalName);
  2857. if (df->getDiskSize(true, false) > wuResultMaxSize)
  2858. throw makeStringExceptionV(ECLWATCH_INVALID_ACTION, "Failed to get the result from file %s. The size is bigger than %lld.",
  2859. logicalName, wuResultMaxSize);
  2860. Owned<IResultSetFactory> resultSetFactory = getSecResultSetFactory(context.querySecManager(), context.queryUser(), context.queryUserId(), context.queryPassword());
  2861. Owned<INewResultSet> result(resultSetFactory->createNewFileResultSet(df, cluster));
  2862. if (!filterBy || !filterBy->length())
  2863. appendResultSet(buf, result, resname.str(), start, count, total, bin, xsd, context.getResponseFormat(), NULL);
  2864. else
  2865. {
  2866. Owned<INewResultSet> filteredResult = createFilteredResultSet(result, filterBy);
  2867. appendResultSet(buf, filteredResult, resname.str(), start, count, total, bin, xsd, context.getResponseFormat(), NULL);
  2868. }
  2869. }
  2870. bool CWsWorkunitsEx::onWUResultBin(IEspContext &context,IEspWUResultBinRequest &req, IEspWUResultBinResponse &resp)
  2871. {
  2872. try
  2873. {
  2874. StringBuffer wuidStr(req.getWuid());
  2875. const char* wuidIn = wuidStr.trim().str();
  2876. if (wuidIn && *wuidIn)
  2877. {
  2878. if (!looksLikeAWuid(wuidIn, 'W'))
  2879. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid Workunit ID: %s", wuidIn);
  2880. ensureWsWorkunitAccess(context, wuidIn, SecAccess_Read);
  2881. }
  2882. MemoryBuffer mb;
  2883. __int64 total=0;
  2884. __int64 start = req.getStart() > 0 ? req.getStart() : 0;
  2885. unsigned count = req.getCount(), requested=count;
  2886. IArrayOf<IConstNamedValue>* filterBy = &req.getFilterBy();
  2887. SCMStringBuffer name;
  2888. if(strieq(req.getFormat(),"csv"))
  2889. context.setResponseFormat(ESPSerializationCSV);
  2890. WUState wuState = WUStateUnknown;
  2891. bool bin = (req.getFormat() && strieq(req.getFormat(),"raw"));
  2892. if (notEmpty(wuidIn) && notEmpty(req.getResultName()))
  2893. {
  2894. PROGLOG("WUResultBin: wuid %s, ResultName %s", wuidIn, req.getResultName());
  2895. getWsWuResult(context, wuidIn, req.getResultName(), NULL, 0, start, count, total, name, bin, filterBy, mb, wuState);
  2896. }
  2897. else if (notEmpty(wuidIn) && (req.getSequence() >= 0))
  2898. {
  2899. PROGLOG("WUResultBin: wuid %s, Sequence %d", wuidIn, req.getSequence());
  2900. getWsWuResult(context, wuidIn, NULL, NULL, req.getSequence(), start, count, total, name, bin,filterBy, mb, wuState);
  2901. }
  2902. else if (notEmpty(req.getLogicalName()))
  2903. {
  2904. const char* logicalName = req.getLogicalName();
  2905. const char* clusterIn = req.getCluster();
  2906. if (!isEmptyString(clusterIn))
  2907. getFileResults(context, logicalName, clusterIn, start, count, total, name, false, filterBy, mb, true);
  2908. else
  2909. {
  2910. StringBuffer wuid;
  2911. getWuidFromLogicalFileName(context, logicalName, wuid);
  2912. if (!wuid.length())
  2913. throw MakeStringException(ECLWATCH_CANNOT_GET_WORKUNIT,"Cannot find the workunit for file %s.",logicalName);
  2914. SCMStringBuffer cluster;
  2915. getWorkunitCluster(context, wuid.str(), cluster, true);
  2916. if (cluster.length() > 0)
  2917. getFileResults(context, logicalName, cluster.str(), start, count, total, name, false, filterBy, mb, true);
  2918. else
  2919. getWsWuResult(context, wuid.str(), NULL, logicalName, 0, start, count, total, name, bin, filterBy, mb, wuState);
  2920. }
  2921. }
  2922. else
  2923. throw MakeStringException(ECLWATCH_CANNOT_GET_WU_RESULT,"Cannot open the workunit result.");
  2924. if(strieq(req.getFormat(),"csv"))
  2925. {
  2926. resp.setResult(mb);
  2927. resp.setResult_mimetype("text/csv");
  2928. context.addCustomerHeader("Content-disposition", "attachment;filename=WUResult.csv");
  2929. }
  2930. else if(stricmp(req.getFormat(),"xls")==0)
  2931. {
  2932. Owned<IProperties> params(createProperties());
  2933. params->setProp("showCount",0);
  2934. StringBuffer xml;
  2935. xml.append("<WUResultExcel><Result>").append(mb.length(), mb.toByteArray()).append("</Result></WUResultExcel>");
  2936. if (xml.length() > MAXXLSTRANSFER)
  2937. throw MakeStringException(ECLWATCH_TOO_BIG_DATA_SET, "The data set is too big to be converted to an Excel file. Please use the gzip link to download a compressed XML data file.");
  2938. StringBuffer xls;
  2939. xsltTransform(xml.str(), StringBuffer(getCFD()).append("./smc_xslt/result.xslt").str(), params, xls);
  2940. MemoryBuffer out;
  2941. out.setBuffer(xls.length(), (void*)xls.str());
  2942. resp.setResult(out);
  2943. resp.setResult_mimetype("application/vnd.ms-excel");
  2944. }
  2945. #ifdef _USE_ZLIB
  2946. else if(strieq(req.getFormat(),"zip") || strieq(req.getFormat(),"gzip"))
  2947. {
  2948. bool gzip = strieq(req.getFormat(),"gzip");
  2949. StringBuffer xml("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
  2950. xml.append("<Result>").append(mb.length(),mb.toByteArray()).append("</Result>");
  2951. VStringBuffer ifname("%s%sT%xAT%x%s", TEMPZIPDIR, PATHSEPSTR, (unsigned)(memsize_t)GetCurrentThreadId(), msTick(), gzip ? "" : ".zip");
  2952. IZZIPor* Zipor = createZZIPor();
  2953. int ret = 0;
  2954. if (gzip)
  2955. ret = Zipor->gzipToFile(xml.length(), (void*)xml.str(), ifname.str());
  2956. else
  2957. ret = Zipor->zipToFile(xml.length(), (void*)xml.str(), "WUResult.xml", ifname.str());
  2958. releaseIZ(Zipor);
  2959. if (ret < 0)
  2960. {
  2961. Owned<IFile> rFile = createIFile(ifname.str());
  2962. if (rFile->exists())
  2963. rFile->remove();
  2964. throw MakeStringException(ECLWATCH_CANNOT_COMPRESS_DATA, "The data cannot be compressed.");
  2965. }
  2966. MemoryBuffer out;
  2967. Owned <IFile> rf = createIFile(ifname.str());
  2968. if (rf->exists())
  2969. {
  2970. Owned <IFileIO> fio = rf->open(IFOread);
  2971. read(fio, 0, (size32_t) rf->size(), out);
  2972. resp.setResult(out);
  2973. }
  2974. if (gzip)
  2975. {
  2976. resp.setResult_mimetype("application/x-gzip");
  2977. context.addCustomerHeader("Content-disposition", "attachment;filename=WUResult.xml.gz");
  2978. }
  2979. else
  2980. {
  2981. resp.setResult_mimetype("application/zip");
  2982. context.addCustomerHeader("Content-disposition", "attachment;filename=WUResult.xml.zip");
  2983. }
  2984. Owned<IFile> rFile = createIFile(ifname.str());
  2985. if (rFile->exists())
  2986. rFile->remove();
  2987. }
  2988. #endif
  2989. else
  2990. {
  2991. resp.setResult(mb);
  2992. }
  2993. resp.setName(name.str());
  2994. resp.setWuid(wuidIn);
  2995. resp.setSequence(req.getSequence());
  2996. resp.setStart(start);
  2997. if (requested > total)
  2998. requested = (unsigned)total;
  2999. resp.setRequested(requested);
  3000. resp.setCount(count);
  3001. resp.setTotal(total);
  3002. resp.setFormat(req.getFormat());
  3003. }
  3004. catch(IException* e)
  3005. {
  3006. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  3007. }
  3008. return true;
  3009. }
  3010. bool CWsWorkunitsEx::onWUResultSummary(IEspContext &context, IEspWUResultSummaryRequest &req, IEspWUResultSummaryResponse &resp)
  3011. {
  3012. try
  3013. {
  3014. StringBuffer wuid(req.getWuid());
  3015. WsWuHelpers::checkAndTrimWorkunit("WUResultSummary", wuid);
  3016. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  3017. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid.str());
  3018. if(!cw)
  3019. throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot open workunit %s.",wuid.str());
  3020. ensureWsWorkunitAccess(context, *cw, SecAccess_Read);
  3021. PROGLOG("WUResultSummary: %s", wuid.str());
  3022. resp.setWuid(wuid.str());
  3023. resp.setSequence(req.getSequence());
  3024. IArrayOf<IEspECLResult> results;
  3025. Owned<IConstWUResult> r = cw->getResultBySequence(req.getSequence());
  3026. if (r)
  3027. {
  3028. WsWuInfo winfo(context, cw);
  3029. winfo.getResult(*r, results, 0);
  3030. resp.setFormat(r->getResultFormat());
  3031. resp.setResult(results.item(0));
  3032. }
  3033. }
  3034. catch(IException* e)
  3035. {
  3036. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  3037. }
  3038. return true;
  3039. }
  3040. bool CWsWorkunitsEx::onWUResult(IEspContext &context, IEspWUResultRequest &req, IEspWUResultResponse &resp)
  3041. {
  3042. try
  3043. {
  3044. StringBuffer wuidStr(req.getWuid());
  3045. const char* wuid = wuidStr.trim().str();
  3046. if (wuid && *wuid)
  3047. {
  3048. if (!looksLikeAWuid(wuid, 'W'))
  3049. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid Workunit ID: %s", wuid);
  3050. ensureWsWorkunitAccess(context, wuid, SecAccess_Read);
  3051. }
  3052. MemoryBuffer mb;
  3053. SCMStringBuffer name;
  3054. __int64 total=0;
  3055. __int64 start = req.getStart() > 0 ? req.getStart() : 0;
  3056. unsigned count=req.getCount() ? req.getCount() : 100, requested=count;
  3057. unsigned seq = req.getSequence();
  3058. bool inclXsd = !req.getSuppressXmlSchema();
  3059. VStringBuffer filter("start=%" I64F "d;count=%d", start, count);
  3060. addToQueryString(filter, "clusterName", req.getCluster(), ';');
  3061. addToQueryString(filter, "logicalName", req.getLogicalName(), ';');
  3062. if (wuid && *wuid)
  3063. addToQueryString(filter, "wuid", wuid, ';');
  3064. addToQueryString(filter, "resultName", req.getResultName(), ';');
  3065. filter.appendf(";seq=%d;", seq);
  3066. if (inclXsd)
  3067. filter.append("xsd;");
  3068. if (context.getResponseFormat()==ESPSerializationJSON)
  3069. filter.append("json;");
  3070. IArrayOf<IConstNamedValue>* filterBy = &req.getFilterBy();
  3071. ForEachItemIn(i, *filterBy)
  3072. {
  3073. IConstNamedValue &item = filterBy->item(i);
  3074. const char *name = item.getName();
  3075. const char *value = item.getValue();
  3076. if (name && *name && value && *value)
  3077. addToQueryString(filter, name, value, ';');
  3078. }
  3079. const char* logicalName = req.getLogicalName();
  3080. const char* clusterName = req.getCluster();
  3081. const char* resultName = req.getResultName();
  3082. Owned<DataCacheElement> data;
  3083. if (!req.getBypassCachedResult())
  3084. data.setown(dataCache->lookup(context, filter, awusCacheMinutes));
  3085. if (data)
  3086. {
  3087. PROGLOG("Retrieving Cached WUResult: %s", filter.str());
  3088. mb.append(data->m_data.c_str());
  3089. name.set(data->m_name.c_str());
  3090. logicalName = data->m_logicalName.c_str();
  3091. wuid = data->m_wuid.c_str();
  3092. resultName = data->m_resultName.c_str();
  3093. seq = data->m_seq;
  3094. start = data->m_start;
  3095. count = data->m_rowcount;
  3096. requested = (unsigned)data->m_requested;
  3097. total = data->m_total;
  3098. if (notEmpty(logicalName))
  3099. resp.setLogicalName(logicalName);
  3100. else
  3101. {
  3102. if (notEmpty(wuid))
  3103. resp.setWuid(wuid);
  3104. resp.setSequence(seq);
  3105. }
  3106. }
  3107. else
  3108. {
  3109. PROGLOG("Retrieving WUResult: %s", filter.str());
  3110. WUState wuState = WUStateUnknown;
  3111. if(logicalName && *logicalName)
  3112. {
  3113. StringBuffer lwuid;
  3114. getWuidFromLogicalFileName(context, logicalName, lwuid);
  3115. SCMStringBuffer cluster;
  3116. if (lwuid.length())
  3117. getWorkunitCluster(context, lwuid.str(), cluster, true);
  3118. if (cluster.length())
  3119. {
  3120. getFileResults(context, logicalName, cluster.str(), start, count, total, name, false, filterBy, mb, inclXsd);
  3121. resp.setLogicalName(logicalName);
  3122. }
  3123. else if (notEmpty(clusterName))
  3124. {
  3125. getFileResults(context, logicalName, clusterName, start, count, total, name, false, filterBy, mb, inclXsd);
  3126. resp.setLogicalName(logicalName);
  3127. }
  3128. else
  3129. throw MakeStringException(ECLWATCH_INVALID_INPUT,"Need valid target cluster to browse file %s.",logicalName);
  3130. Owned<IWorkUnitFactory> wf = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  3131. Owned<IConstWorkUnit> cw = wf->openWorkUnit(lwuid.str());
  3132. if (cw)
  3133. wuState = cw->getState();
  3134. }
  3135. else if (notEmpty(wuid) && notEmpty(resultName))
  3136. {
  3137. name.set(resultName);
  3138. getWsWuResult(context, wuid, resultName, NULL, 0, start, count, total, name, false, filterBy, mb, wuState, inclXsd);
  3139. resp.setWuid(wuid);
  3140. resp.setSequence(seq);
  3141. }
  3142. else
  3143. {
  3144. getWsWuResult(context, wuid, NULL, NULL, seq, start, count, total, name, false, filterBy, mb, wuState, inclXsd);
  3145. resp.setWuid(wuid);
  3146. resp.setSequence(seq);
  3147. }
  3148. mb.append(0);
  3149. if (requested > total)
  3150. requested = (unsigned)total;
  3151. switch (wuState)
  3152. {
  3153. case WUStateCompleted:
  3154. case WUStateAborted:
  3155. case WUStateFailed:
  3156. case WUStateArchived:
  3157. dataCache->add(filter, mb.toByteArray(), name.str(), logicalName, wuid, resultName, seq, start, count, requested, total);
  3158. break;
  3159. }
  3160. }
  3161. resp.setName(name.str());
  3162. resp.setStart(start);
  3163. if (clusterName && *clusterName)
  3164. resp.setCluster(clusterName);
  3165. resp.setRequested(requested);
  3166. resp.setCount(count);
  3167. resp.setTotal(total);
  3168. resp.setResult(mb.toByteArray());
  3169. context.queryXslParameters()->setProp("escapeResults","1");
  3170. }
  3171. catch(IException* e)
  3172. {
  3173. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  3174. }
  3175. return true;
  3176. }
  3177. void getScheduledWUs(IEspContext &context, WUShowScheduledFilters *filters, const char *serverName, IArrayOf<IEspScheduledWU> & results)
  3178. {
  3179. double version = context.getClientVersion();
  3180. if (notEmpty(serverName))
  3181. {
  3182. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  3183. Owned<IScheduleReader> reader = getScheduleReader(serverName, filters->eventName);
  3184. Owned<IScheduleReaderIterator> it(reader->getIterator());
  3185. while(it->isValidEventName())
  3186. {
  3187. StringBuffer ieventName;
  3188. it->getEventName(ieventName);
  3189. while(it->isValidEventText())
  3190. {
  3191. StringBuffer ieventText;
  3192. it->getEventText(ieventText);
  3193. while(it->isValidWuid())
  3194. {
  3195. StringBuffer wuid;
  3196. it->getWuid(wuid);
  3197. if (wuid.length())
  3198. {
  3199. bool match = true;
  3200. unsigned stateID = WUStateUnknown;
  3201. StringBuffer jobName, owner;
  3202. SCMStringBuffer state;
  3203. try
  3204. {
  3205. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid.str());
  3206. if (cw)
  3207. {
  3208. jobName.set(cw->queryJobName());
  3209. owner.set(cw->queryUser());
  3210. }
  3211. if (!filters->jobName.isEmpty() && (jobName.isEmpty() || !WildMatch(jobName.str(), filters->jobName, true)))
  3212. match = false;
  3213. else if (!filters->owner.isEmpty() && (owner.isEmpty() || !WildMatch(owner, filters->owner, true)))
  3214. match = false;
  3215. else if (!filters->eventText.isEmpty() && (ieventText.isEmpty() || !WildMatch(ieventText, filters->eventText, true)))
  3216. match = false;
  3217. else if (!filters->state.isEmpty())
  3218. {
  3219. if (!cw)
  3220. match = false;
  3221. else
  3222. {
  3223. if ((cw->getState() == WUStateScheduled) && cw->aborting())
  3224. {
  3225. stateID = WUStateAborting;
  3226. state.set("aborting");
  3227. }
  3228. else
  3229. {
  3230. stateID = cw->getState();
  3231. state.set(cw->queryStateDesc());
  3232. }
  3233. if (!strieq(filters->state, state.str()))
  3234. match = false;
  3235. }
  3236. }
  3237. }
  3238. catch (IException *e)
  3239. {
  3240. EXCLOG(e, "Get scheduled WUs");
  3241. e->Release();
  3242. match = false;
  3243. }
  3244. if (!match)
  3245. {
  3246. it->nextWuid();
  3247. continue;
  3248. }
  3249. Owned<IEspScheduledWU> scheduledWU = createScheduledWU("");
  3250. scheduledWU->setWuid(wuid.str());
  3251. scheduledWU->setCluster(serverName);
  3252. if (ieventName.length())
  3253. scheduledWU->setEventName(ieventName.str());
  3254. if (ieventText.str())
  3255. scheduledWU->setEventText(ieventText.str());
  3256. if (jobName.length())
  3257. scheduledWU->setJobName(jobName.str());
  3258. if (version >= 1.51)
  3259. {
  3260. if (owner.length())
  3261. scheduledWU->setOwner(owner.str());
  3262. if (state.length())
  3263. {
  3264. scheduledWU->setStateID(stateID);
  3265. scheduledWU->setState(state.str());
  3266. }
  3267. }
  3268. results.append(*scheduledWU.getLink());
  3269. }
  3270. it->nextWuid();
  3271. }
  3272. it->nextEventText();
  3273. }
  3274. it->nextEventName();
  3275. }
  3276. }
  3277. return;
  3278. }
  3279. bool CWsWorkunitsEx::onWUShowScheduled(IEspContext &context, IEspWUShowScheduledRequest & req, IEspWUShowScheduledResponse & resp)
  3280. {
  3281. try
  3282. {
  3283. WUShowScheduledFilters filters(req.getCluster(), req.getState(), req.getOwner(),
  3284. req.getJobName(), req.getEventName(), req.getEventText());
  3285. IArrayOf<IEspScheduledWU> results;
  3286. if(notEmpty(req.getPushEventName()))
  3287. resp.setPushEventName(req.getPushEventName());
  3288. if(notEmpty(req.getPushEventText()))
  3289. resp.setPushEventText(req.getPushEventText());
  3290. Owned<IEnvironmentFactory> factory = getEnvironmentFactory(true);
  3291. Owned<IConstEnvironment> environment = factory->openEnvironment();
  3292. Owned<IPropertyTree> root = &environment->getPTree();
  3293. unsigned i = 0;
  3294. Owned<IPropertyTreeIterator> ic = root->getElements("Software/Topology/Cluster");
  3295. IArrayOf<IEspServerInfo> servers;
  3296. ForEach(*ic)
  3297. {
  3298. IPropertyTree &cluster = ic->query();
  3299. const char *iclusterName = cluster.queryProp("@name");
  3300. if (isEmpty(iclusterName))
  3301. continue;
  3302. if (filters.cluster.isEmpty())
  3303. getScheduledWUs(context, &filters, iclusterName, results);
  3304. else if (strieq(filters.cluster, iclusterName))
  3305. {
  3306. getScheduledWUs(context, &filters, filters.cluster, results);
  3307. resp.setClusterSelected(i+1);
  3308. }
  3309. Owned<IEspServerInfo> server = createServerInfo("");
  3310. server->setName(iclusterName);
  3311. servers.append(*server.getLink());
  3312. i++;
  3313. }
  3314. if (servers.length())
  3315. resp.setClusters(servers);
  3316. if (results.length())
  3317. resp.setWorkunits(results);
  3318. bool first=false;
  3319. StringBuffer Query("PageFrom=Scheduler");
  3320. appendUrlParameter(Query, "EventName", filters.eventName, first);
  3321. appendUrlParameter(Query, "ECluster", filters.cluster, first);
  3322. resp.setQuery(Query.str());
  3323. }
  3324. catch(IException* e)
  3325. {
  3326. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  3327. }
  3328. return true;
  3329. }
  3330. bool CWsWorkunitsEx::onWUExport(IEspContext &context, IEspWUExportRequest &req, IEspWUExportResponse &resp)
  3331. {
  3332. try
  3333. {
  3334. if (req.getECL() && *req.getECL())
  3335. throw makeStringException(0, "WUExport no longer supports filtering by ECL text");
  3336. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  3337. WsWuSearch ws(context, req.getOwner(), req.getState(), req.getCluster(), req.getStartDate(), req.getEndDate(), req.getJobname());
  3338. StringBuffer xml("<?xml version=\"1.0\" encoding=\"UTF-8\"?><Workunits>");
  3339. for(WsWuSearch::iterator it=ws.begin(); it!=ws.end(); it++)
  3340. {
  3341. Owned<IConstWorkUnit> cw = factory->openWorkUnit(it->c_str());
  3342. if (cw)
  3343. exportWorkUnitToXML(cw, xml, true, false, true);
  3344. }
  3345. xml.append("</Workunits>");
  3346. MemoryBuffer mb;
  3347. mb.setBuffer(xml.length(),(void*)xml.str());
  3348. resp.setExportData(mb);
  3349. resp.setExportData_mimetype(HTTP_TYPE_APPLICATION_XML);
  3350. }
  3351. catch(IException* e)
  3352. {
  3353. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  3354. }
  3355. return true;
  3356. }
  3357. bool CWsWorkunitsEx::onWUListLocalFileRequired(IEspContext& context, IEspWUListLocalFileRequiredRequest& req, IEspWUListLocalFileRequiredResponse& resp)
  3358. {
  3359. try
  3360. {
  3361. StringBuffer wuid(req.getWuid());
  3362. WsWuHelpers::checkAndTrimWorkunit("WUListLocalFileRequired", wuid);
  3363. ensureWsWorkunitAccess(context, wuid.str(), SecAccess_Read);
  3364. PROGLOG("WUListLocalFileRequired: %s", wuid.str());
  3365. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  3366. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid.str());
  3367. if (!cw)
  3368. throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT, "Workunit %s not found.", wuid.str());
  3369. IArrayOf<IEspLogicalFileUpload> localFiles;
  3370. Owned<IConstLocalFileUploadIterator> it = cw->getLocalFileUploads();
  3371. ForEach(*it)
  3372. {
  3373. Owned<IConstLocalFileUpload> file = it->get();
  3374. if(!file)
  3375. continue;
  3376. Owned<IEspLogicalFileUpload> up = createLogicalFileUpload();
  3377. SCMStringBuffer s;
  3378. up->setType(file->queryType());
  3379. up->setSource(file->getSource(s).str());
  3380. up->setDestination(file->getDestination(s).str());
  3381. up->setEventTag(file->getEventTag(s).str());
  3382. localFiles.append(*up.getLink());
  3383. }
  3384. resp.setLocalFileUploads(localFiles);
  3385. }
  3386. catch(IException* e)
  3387. {
  3388. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  3389. }
  3390. return true;
  3391. }
  3392. typedef enum wsEclTypes_
  3393. {
  3394. wsEclTypeUnknown,
  3395. xsdString,
  3396. xsdBoolean,
  3397. xsdDecimal,
  3398. xsdFloat,
  3399. xsdDouble,
  3400. xsdDuration,
  3401. xsdDateTime,
  3402. xsdTime,
  3403. xsdDate,
  3404. xsdYearMonth,
  3405. xsdYear,
  3406. xsdMonthDay,
  3407. xsdDay,
  3408. xsdMonth,
  3409. xsdHexBinary,
  3410. xsdBase64Binary,
  3411. xsdAnyURI,
  3412. xsdQName,
  3413. xsdNOTATION,
  3414. xsdNormalizedString,
  3415. xsdToken,
  3416. xsdLanguage,
  3417. xsdNMTOKEN,
  3418. xsdNMTOKENS,
  3419. xsdName,
  3420. xsdNCName,
  3421. xsdID,
  3422. xsdIDREF,
  3423. xsdIDREFS,
  3424. xsdENTITY,
  3425. xsdENTITIES,
  3426. xsdInteger,
  3427. xsdNonPositiveInteger,
  3428. xsdNegativeInteger,
  3429. xsdLong,
  3430. xsdInt,
  3431. xsdShort,
  3432. xsdByte,
  3433. xsdNonNegativeInteger,
  3434. xsdUnsignedLong,
  3435. xsdUnsignedInt,
  3436. xsdUnsignedShort,
  3437. xsdUnsignedByte,
  3438. xsdPositiveInteger,
  3439. tnsRawDataFile,
  3440. tnsCsvDataFile,
  3441. tnsEspStringArray,
  3442. tnsEspIntArray,
  3443. tnsXmlDataSet,
  3444. maxWsEclType
  3445. } wsEclType;
  3446. bool CWsWorkunitsEx::onWUAddLocalFileToWorkunit(IEspContext& context, IEspWUAddLocalFileToWorkunitRequest& req, IEspWUAddLocalFileToWorkunitResponse& resp)
  3447. {
  3448. try
  3449. {
  3450. StringBuffer wuid(req.getWuid());
  3451. WsWuHelpers::checkAndTrimWorkunit("WUAddLocalFileToWorkunit", wuid);
  3452. ensureWsWorkunitAccess(context, wuid.str(), SecAccess_Write);
  3453. resp.setWuid(wuid.str());
  3454. const char* varname = req.getName();
  3455. if (isEmpty(varname))
  3456. {
  3457. resp.setResult("Name is not defined!");
  3458. return true;
  3459. }
  3460. resp.setName(varname);
  3461. PROGLOG("WUAddLocalFileToWorkunit: %s, name %s", wuid.str(), varname);
  3462. wsEclType type = (wsEclType) req.getType();
  3463. const char *val = req.getVal();
  3464. unsigned len = req.getLength();
  3465. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  3466. WorkunitUpdate wu(factory->updateWorkUnit(wuid.str()));
  3467. if (!wu)
  3468. {
  3469. resp.setResult("Workunit not found!");
  3470. return true;
  3471. }
  3472. Owned<IWUResult> wuRslt = wu->updateResultByName(varname);
  3473. if (isEmpty(val))
  3474. val=req.getDefVal();
  3475. if (notEmpty(val))
  3476. {
  3477. switch (type)
  3478. {
  3479. case xsdBoolean:
  3480. wuRslt->setResultBool((strieq(val, "1") || strieq(val, "true") || strieq(val, "on")));
  3481. wuRslt->setResultStatus(ResultStatusSupplied);
  3482. break;
  3483. case xsdDecimal:
  3484. case xsdFloat:
  3485. case xsdDouble:
  3486. wuRslt->setResultReal(atof(val));
  3487. wuRslt->setResultStatus(ResultStatusSupplied);
  3488. break;
  3489. case xsdInteger:
  3490. case xsdNonPositiveInteger:
  3491. case xsdNegativeInteger:
  3492. case xsdLong:
  3493. case xsdInt:
  3494. case xsdShort:
  3495. case xsdByte:
  3496. case xsdNonNegativeInteger:
  3497. case xsdUnsignedLong:
  3498. case xsdUnsignedInt:
  3499. case xsdUnsignedShort:
  3500. case xsdUnsignedByte:
  3501. case xsdPositiveInteger:
  3502. wuRslt->setResultInt(_atoi64(val));
  3503. wuRslt->setResultStatus(ResultStatusSupplied);
  3504. break;
  3505. case tnsEspIntArray:
  3506. case tnsEspStringArray:
  3507. wuRslt->setResultRaw(len, val, ResultFormatXmlSet);
  3508. break;
  3509. case tnsRawDataFile:
  3510. wuRslt->setResultRaw(len, val, ResultFormatRaw);
  3511. break;
  3512. case tnsXmlDataSet:
  3513. wuRslt->setResultRaw(len, val, ResultFormatXml);
  3514. break;
  3515. case tnsCsvDataFile:
  3516. case xsdBase64Binary: //tbd
  3517. case xsdHexBinary:
  3518. break;
  3519. default:
  3520. wuRslt->setResultString(val, len);
  3521. wuRslt->setResultStatus(ResultStatusSupplied);
  3522. break;
  3523. }
  3524. }
  3525. resp.setResult("Result has been set as required!");
  3526. }
  3527. catch(IException* e)
  3528. {
  3529. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  3530. }
  3531. return true;
  3532. }
  3533. bool CWsWorkunitsEx::onWUGetGraphNameAndTypes(IEspContext &context,IEspWUGetGraphNameAndTypesRequest &req, IEspWUGetGraphNameAndTypesResponse &resp)
  3534. {
  3535. try
  3536. {
  3537. StringBuffer wuid(req.getWuid());
  3538. WsWuHelpers::checkAndTrimWorkunit("WUGraphQuery", wuid);
  3539. ensureWsWorkunitAccess(context, wuid.str(), SecAccess_Read);
  3540. PROGLOG("WUGetGraphNameAndTypes: %s", wuid.str());
  3541. StringBuffer type(req.getType());
  3542. WUGraphType graphType = GraphTypeAny;
  3543. if (type.trim().length())
  3544. graphType = getGraphTypeFromString(type.str());
  3545. IArrayOf<IEspNameAndType> graphNameAndTypes;
  3546. WsWuInfo winfo(context, wuid.str());
  3547. winfo.getWUGraphNameAndTypes(graphType, graphNameAndTypes);
  3548. resp.setGraphNameAndTypes(graphNameAndTypes);
  3549. }
  3550. catch(IException* e)
  3551. {
  3552. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  3553. }
  3554. return true;
  3555. }
  3556. bool CWsWorkunitsEx::onWUProcessGraph(IEspContext &context,IEspWUProcessGraphRequest &req, IEspWUProcessGraphResponse &resp)
  3557. {
  3558. try
  3559. {
  3560. StringBuffer wuid(req.getWuid());
  3561. WsWuHelpers::checkAndTrimWorkunit("WUProcessGraph", wuid);
  3562. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  3563. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid.str());
  3564. if(!cw)
  3565. throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot open workunit %s.",wuid.str());
  3566. ensureWsWorkunitAccess(context, *cw, SecAccess_Read);
  3567. if (isEmpty(req.getName()))
  3568. throw MakeStringException(ECLWATCH_GRAPH_NOT_FOUND, "Please specify a graph name.");
  3569. Owned<IConstWUGraph> graph = cw->getGraph(req.getName());
  3570. if (!graph)
  3571. throw MakeStringException(ECLWATCH_GRAPH_NOT_FOUND, "Invalid graph name: %s for %s", req.getName(), wuid.str());
  3572. PROGLOG("WUProcessGraph: %s, Graph Name %s", wuid.str(), req.getName());
  3573. StringBuffer xml;
  3574. Owned<IPropertyTree> xgmml = graph->getXGMMLTree(true); // merge in graph progress information
  3575. toXML(xgmml.get(), xml);
  3576. resp.setTheGraph(xml.str());
  3577. }
  3578. catch(IException* e)
  3579. {
  3580. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  3581. }
  3582. return true;
  3583. }
  3584. bool isRunning(IConstWorkUnit &cw)
  3585. {
  3586. // MORE - move into workunit interface
  3587. switch (cw.getState())
  3588. {
  3589. case WUStateFailed:
  3590. case WUStateAborted:
  3591. case WUStateCompleted:
  3592. return false;
  3593. default:
  3594. return true;
  3595. }
  3596. }
  3597. void CWsWorkunitsEx::readGraph(IEspContext& context, const char* subGraphId, WUGraphIDType& id, bool running,
  3598. IConstWUGraph* graph, IArrayOf<IEspECLGraphEx>& graphs)
  3599. {
  3600. SCMStringBuffer name, label, type;
  3601. graph->getName(name);
  3602. graph->getLabel(label);
  3603. graph->getTypeName(type);
  3604. Owned<IEspECLGraphEx> g = createECLGraphEx("","");
  3605. g->setName(name.str());
  3606. g->setLabel(label.str());
  3607. g->setType(type.str());
  3608. WUGraphState graphState = graph->getState();
  3609. if (running && (WUGraphRunning == graphState))
  3610. {
  3611. g->setRunning(true);
  3612. g->setRunningId(id);
  3613. }
  3614. else if (context.getClientVersion() > 1.20)
  3615. {
  3616. if (WUGraphComplete == graphState)
  3617. g->setComplete(true);
  3618. else if (WUGraphFailed == graphState)
  3619. g->setFailed(true);
  3620. }
  3621. Owned<IPropertyTree> xgmml = graph->getXGMMLTree(true);
  3622. // New functionality, if a subgraph id is specified and we only want to load the xgmml for that subgraph
  3623. // then we need to conditionally pull a propertytree from the xgmml graph one and use that for the xgmml.
  3624. //JCSMORE this should be part of the API and therefore allow *only* the subtree to be pulled from the backend.
  3625. StringBuffer xml;
  3626. if (notEmpty(subGraphId))
  3627. {
  3628. VStringBuffer xpath("//node[@id='%s']", subGraphId);
  3629. toXML(xgmml->queryPropTree(xpath.str()), xml);
  3630. }
  3631. else
  3632. toXML(xgmml, xml);
  3633. g->setGraph(xml.str());
  3634. graphs.append(*g.getClear());
  3635. }
  3636. bool CWsWorkunitsEx::onWUGetGraph(IEspContext& context, IEspWUGetGraphRequest& req, IEspWUGetGraphResponse& resp)
  3637. {
  3638. try
  3639. {
  3640. StringBuffer wuid(req.getWuid());
  3641. WsWuHelpers::checkAndTrimWorkunit("WUGetGraph", wuid);
  3642. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  3643. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid.str());
  3644. if(!cw)
  3645. throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot open workunit %s.",wuid.str());
  3646. ensureWsWorkunitAccess(context, *cw, SecAccess_Read);
  3647. WUGraphIDType id;
  3648. SCMStringBuffer runningGraph;
  3649. bool running = (isRunning(*cw) && cw->getRunningGraph(runningGraph,id));
  3650. IArrayOf<IEspECLGraphEx> graphs;
  3651. if (isEmpty(req.getGraphName())) // JCS->GS - is this really required??
  3652. {
  3653. PROGLOG("WUGetGraph: %s", wuid.str());
  3654. Owned<IConstWUGraphIterator> it = &cw->getGraphs(GraphTypeAny);
  3655. ForEach(*it)
  3656. readGraph(context, req.getSubGraphId(), id, running, &it->query(), graphs);
  3657. }
  3658. else
  3659. {
  3660. PROGLOG("WUGetGraph: %s, Graph Name %s", wuid.str(), req.getGraphName());
  3661. Owned<IConstWUGraph> graph = cw->getGraph(req.getGraphName());
  3662. if (graph)
  3663. readGraph(context, req.getSubGraphId(), id, running, graph, graphs);
  3664. }
  3665. resp.setGraphs(graphs);
  3666. }
  3667. catch(IException* e)
  3668. {
  3669. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  3670. }
  3671. return true;
  3672. }
  3673. bool CWsWorkunitsEx::onGVCAjaxGraph(IEspContext &context, IEspGVCAjaxGraphRequest &req, IEspGVCAjaxGraphResponse &resp)
  3674. {
  3675. try
  3676. {
  3677. resp.setName(req.getName());
  3678. resp.setGraphName(req.getGraphName());
  3679. resp.setGraphType("eclwatch");
  3680. double version = context.getClientVersion();
  3681. if (version > 1.19)
  3682. resp.setSubGraphId(req.getSubGraphId());
  3683. if (version > 1.20)
  3684. resp.setSubGraphOnly(req.getSubGraphOnly());
  3685. }
  3686. catch(IException* e)
  3687. {
  3688. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  3689. }
  3690. return true;
  3691. }
  3692. bool CWsWorkunitsEx::onWUDetails(IEspContext &context, IEspWUDetailsRequest &req, IEspWUDetailsResponse &resp)
  3693. {
  3694. try
  3695. {
  3696. StringBuffer wuid(req.getWUID());
  3697. WsWuHelpers::checkAndTrimWorkunit("WUDetails", wuid);
  3698. PROGLOG("WUDetails: %s", wuid.str());
  3699. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  3700. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid.str());
  3701. if(!cw)
  3702. throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot open workunit %s.",wuid.str());
  3703. ensureWsWorkunitAccess(context, *cw, SecAccess_Read);
  3704. WUDetails wuDetails(cw, wuid);
  3705. wuDetails.processRequest(req, resp);
  3706. }
  3707. catch(IException* e)
  3708. {
  3709. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  3710. }
  3711. return true;
  3712. }
  3713. static void getWUDetailsMetaProperties(IArrayOf<IEspWUDetailsMetaProperty> & properties)
  3714. {
  3715. for (unsigned sk=StKindAll+1; sk<StMax;++sk)
  3716. {
  3717. const char * s = queryStatisticName((StatisticKind)sk);
  3718. if (s && *s)
  3719. {
  3720. Owned<IEspWUDetailsMetaProperty> property = createWUDetailsMetaProperty("","");
  3721. property->setName(s);
  3722. property->setValueType(CWUDetailsAttrValueType_Single);
  3723. properties.append(*property.getClear());
  3724. }
  3725. }
  3726. for (WuAttr attr=WaKind; attr<WaMax; ++attr)
  3727. {
  3728. Owned<IEspWUDetailsMetaProperty> property = createWUDetailsMetaProperty("","");
  3729. const char * s = queryWuAttributeName(attr);
  3730. assertex(s && *s);
  3731. property->setName(s);
  3732. if (isListAttribute(attr))
  3733. property->setValueType(CWUDetailsAttrValueType_List);
  3734. else if (isMultiAttribute(attr))
  3735. property->setValueType(CWUDetailsAttrValueType_Multi);
  3736. else
  3737. property->setValueType(CWUDetailsAttrValueType_Single);
  3738. properties.append(*property.getClear());
  3739. }
  3740. }
  3741. static void getWUDetailsMetaScopeTypes(StringArray & scopeTypes)
  3742. {
  3743. for (unsigned sst=SSTall+1; sst<SSTmax; ++sst)
  3744. {
  3745. const char * s = queryScopeTypeName((StatisticScopeType)sst);
  3746. if (s && *s)
  3747. scopeTypes.append(s);
  3748. }
  3749. }
  3750. static void getWUDetailsMetaMeasures(StringArray & measures)
  3751. {
  3752. for (unsigned measure=SMeasureAll+1; measure<SMeasureMax; ++measure)
  3753. {
  3754. const char *s = queryMeasureName((StatisticMeasure)measure);
  3755. if (s && *s)
  3756. measures.append(s);
  3757. }
  3758. }
  3759. static void getWUDetailsMetaActivities(IArrayOf<IConstWUDetailsActivityInfo> & activities)
  3760. {
  3761. for (unsigned kind=((unsigned)ThorActivityKind::TAKnone)+1; kind< TAKlast; ++kind)
  3762. {
  3763. Owned<IEspWUDetailsActivityInfo> activity = createWUDetailsActivityInfo("","");
  3764. const char * name = getActivityText(static_cast<ThorActivityKind>(kind));
  3765. assertex(name && *name);
  3766. activity->setKind(kind);
  3767. activity->setName(name);
  3768. activity->setIsSink(isActivitySink(static_cast<ThorActivityKind>(kind)));
  3769. activity->setIsSource(isActivitySource(static_cast<ThorActivityKind>(kind)));
  3770. activities.append(*activity.getClear());
  3771. }
  3772. }
  3773. bool CWsWorkunitsEx::onWUDetailsMeta(IEspContext &context, IEspWUDetailsMetaRequest &req, IEspWUDetailsMetaResponse &resp)
  3774. {
  3775. try
  3776. {
  3777. IArrayOf<IEspWUDetailsMetaProperty> properties;
  3778. getWUDetailsMetaProperties(properties);
  3779. resp.setProperties(properties);
  3780. StringArray scopeTypes;
  3781. getWUDetailsMetaScopeTypes(scopeTypes);
  3782. resp.setScopeTypes(scopeTypes);
  3783. StringArray measures;
  3784. getWUDetailsMetaMeasures(measures);
  3785. resp.setMeasures(measures);
  3786. IArrayOf<IConstWUDetailsActivityInfo> activities;
  3787. getWUDetailsMetaActivities(activities);
  3788. resp.setActivities(activities);
  3789. }
  3790. catch(IException* e)
  3791. {
  3792. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  3793. }
  3794. return true;
  3795. }
  3796. #ifdef _USE_CPPUNIT
  3797. #include "unittests.hpp"
  3798. class WUDetailsMetaTest : public CppUnit::TestFixture
  3799. {
  3800. CPPUNIT_TEST_SUITE( WUDetailsMetaTest );
  3801. CPPUNIT_TEST(testWUDetailsMeta);
  3802. CPPUNIT_TEST_SUITE_END();
  3803. void testWUDetailsMeta()
  3804. {
  3805. // These calls also check that all the calls required to build WUDetailsMeta
  3806. // are successful.
  3807. IArrayOf<IEspWUDetailsMetaProperty> properties;
  3808. getWUDetailsMetaProperties(properties);
  3809. unsigned expectedOrdinalityProps = StMax - (StKindAll + 1) + (WaMax-WaKind);
  3810. ASSERT(properties.ordinality()==expectedOrdinalityProps);
  3811. StringArray scopeTypes;
  3812. getWUDetailsMetaScopeTypes(scopeTypes);
  3813. unsigned expectedOrdinalityScopeTypes = SSTmax - (SSTall+1);
  3814. ASSERT(scopeTypes.ordinality()==expectedOrdinalityScopeTypes);
  3815. StringArray measures;
  3816. getWUDetailsMetaMeasures(measures);
  3817. unsigned expectedOrdinalityMeasures = SMeasureMax - (SMeasureAll+1);
  3818. ASSERT(measures.ordinality()==expectedOrdinalityMeasures);
  3819. IArrayOf<IConstWUDetailsActivityInfo> activities;
  3820. getWUDetailsMetaActivities(activities);
  3821. unsigned expectedOrdinalityActivities = TAKlast - (TAKnone+1);
  3822. ASSERT(activities.ordinality()==expectedOrdinalityActivities);
  3823. }
  3824. };
  3825. CPPUNIT_TEST_SUITE_REGISTRATION( WUDetailsMetaTest );
  3826. CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( WUDetailsMetaTest, "WUDetailsMetaTest" );
  3827. #endif // _USE_CPPUNIT
  3828. bool CWsWorkunitsEx::onWUGraphInfo(IEspContext &context,IEspWUGraphInfoRequest &req, IEspWUGraphInfoResponse &resp)
  3829. {
  3830. try
  3831. {
  3832. StringBuffer wuid(req.getWuid());
  3833. WsWuHelpers::checkAndTrimWorkunit("WUGraphInfo", wuid);
  3834. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  3835. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid.str());
  3836. if(!cw)
  3837. throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT,"Cannot open workunit %s.",wuid.str());
  3838. ensureWsWorkunitAccess(context, *cw, SecAccess_Write);
  3839. resp.setWuid(wuid.str());
  3840. resp.setName(req.getName());
  3841. resp.setRunning(isRunning(*cw));
  3842. if (notEmpty(req.getGID()))
  3843. resp.setGID(req.getGID());
  3844. if(!req.getBatchWU_isNull())
  3845. resp.setBatchWU(req.getBatchWU());
  3846. }
  3847. catch(IException* e)
  3848. {
  3849. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  3850. }
  3851. return true;
  3852. }
  3853. bool CWsWorkunitsEx::onWUGVCGraphInfo(IEspContext &context,IEspWUGVCGraphInfoRequest &req, IEspWUGVCGraphInfoResponse &resp)
  3854. {
  3855. try
  3856. {
  3857. StringBuffer wuid(req.getWuid());
  3858. WsWuHelpers::checkAndTrimWorkunit("WUGVCGraphInfo", wuid);
  3859. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  3860. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid.str());
  3861. if(!cw)
  3862. throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT,"Cannot open workunit %s.",wuid.str());
  3863. ensureWsWorkunitAccess(context, *cw, SecAccess_Read);
  3864. resp.setWuid(wuid.str());
  3865. resp.setName(req.getName());
  3866. resp.setRunning(isRunning(*cw));
  3867. if (notEmpty(req.getGID()))
  3868. resp.setGID(req.getGID());
  3869. if(!req.getBatchWU_isNull())
  3870. resp.setBatchWU(req.getBatchWU());
  3871. StringBuffer xml("<Control><Endpoint><Query id=\"Gordon.Extractor.0\">");
  3872. xml.appendf("<Graph id=\"%s\">", req.getName());
  3873. if (context.getClientVersion() > 1.17)
  3874. {
  3875. xml.append("<Subgraph>");
  3876. xml.append(req.getSubgraphId_isNull() ? 0 : req.getSubgraphId());
  3877. xml.append("</Subgraph>");
  3878. }
  3879. xml.append("</Graph></Query></Endpoint></Control>");
  3880. resp.setTheGraph(xml.str());
  3881. }
  3882. catch(IException* e)
  3883. {
  3884. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  3885. }
  3886. return true;
  3887. }
  3888. bool CWsWorkunitsEx::onWUGraphTiming(IEspContext &context, IEspWUGraphTimingRequest &req, IEspWUGraphTimingResponse &resp)
  3889. {
  3890. try
  3891. {
  3892. StringBuffer wuid(req.getWuid());
  3893. WsWuHelpers::checkAndTrimWorkunit("WUGraphTiming", wuid);
  3894. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  3895. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid.str());
  3896. if(!cw)
  3897. throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT,"Cannot open workunit %s.",wuid.str());
  3898. ensureWsWorkunitAccess(context, *cw, SecAccess_Read);
  3899. PROGLOG("WUGraphTiming: %s", wuid.str());
  3900. resp.updateWorkunit().setWuid(wuid.str());
  3901. WsWuInfo winfo(context, cw);
  3902. IArrayOf<IConstECLTimingData> timingData;
  3903. winfo.getGraphTimingData(timingData);
  3904. resp.updateWorkunit().setTimingData(timingData);
  3905. }
  3906. catch(IException* e)
  3907. {
  3908. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  3909. }
  3910. return true;
  3911. }
  3912. int CWsWorkunitsSoapBindingEx::onGetForm(IEspContext &context, CHttpRequest* request, CHttpResponse* response, const char *service, const char *method)
  3913. {
  3914. try
  3915. {
  3916. StringBuffer xml;
  3917. StringBuffer xslt;
  3918. if(strieq(method,"WUQuery") || strieq(method,"WUJobList"))
  3919. {
  3920. Owned<IEnvironmentFactory> factory = getEnvironmentFactory(true);
  3921. Owned<IConstEnvironment> environment = factory->openEnvironment();
  3922. Owned<IPropertyTree> root = &environment->getPTree();
  3923. if(strieq(method,"WUQuery"))
  3924. {
  3925. SecAccessFlags accessOwn;
  3926. SecAccessFlags accessOthers;
  3927. getUserWuAccessFlags(context, accessOwn, accessOthers, false);
  3928. xml.append("<WUQuery>");
  3929. if ((accessOwn == SecAccess_None) && (accessOthers == SecAccess_None))
  3930. {
  3931. context.setAuthStatus(AUTH_STATUS_NOACCESS);
  3932. xml.appendf("<ErrorMessage>Access to workunit is denied.</ErrorMessage>");
  3933. }
  3934. else
  3935. {
  3936. MapStringTo<bool> added;
  3937. Owned<IPropertyTreeIterator> it = root->getElements("Software/Topology/Cluster");
  3938. ForEach(*it)
  3939. {
  3940. const char *name = it->query().queryProp("@name");
  3941. if (notEmpty(name) && !added.getValue(name))
  3942. {
  3943. added.setValue(name, true);
  3944. appendXMLTag(xml, "Cluster", name);
  3945. }
  3946. }
  3947. }
  3948. xml.append("</WUQuery>");
  3949. xslt.append(getCFD()).append("./smc_xslt/wuid_search.xslt");
  3950. }
  3951. else if (strieq(method,"WUJobList"))
  3952. {
  3953. StringBuffer cluster, defaultProcess, range;
  3954. request->getParameter("Cluster", cluster);
  3955. request->getParameter("Process",defaultProcess);
  3956. request->getParameter("Range",range);
  3957. Owned<IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(cluster);
  3958. xml.append("<WUJobList>");
  3959. SecAccessFlags accessOwn;
  3960. SecAccessFlags accessOthers;
  3961. getUserWuAccessFlags(context, accessOwn, accessOthers, false);
  3962. if ((accessOwn == SecAccess_None) && (accessOthers == SecAccess_None))
  3963. {
  3964. context.setAuthStatus(AUTH_STATUS_NOACCESS);
  3965. xml.appendf("<ErrorMessage>Access to workunit is denied.</ErrorMessage>");
  3966. }
  3967. else
  3968. {
  3969. if (range.length())
  3970. appendXMLTag(xml, "Range", range.str());
  3971. if (clusterInfo)
  3972. {
  3973. const StringArray &thorInstances = clusterInfo->getThorProcesses();
  3974. ForEachItemIn(i, thorInstances)
  3975. {
  3976. const char* instance = thorInstances.item(i);
  3977. if (defaultProcess.length() && strieq(instance, defaultProcess.str()))
  3978. xml.append("<Cluster selected=\"1\">").append(instance).append("</Cluster>");
  3979. else
  3980. xml.append("<Cluster>").append(instance).append("</Cluster>");
  3981. }
  3982. }
  3983. xml.append("<TargetCluster>").append(cluster).append("</TargetCluster>");
  3984. }
  3985. xml.append("</WUJobList>");
  3986. xslt.append(getCFD()).append("./smc_xslt/jobs_search.xslt");
  3987. response->addHeader("Expires", "0");
  3988. }
  3989. }
  3990. if (xslt.length() && xml.length())
  3991. {
  3992. StringBuffer html;
  3993. xsltTransform(xml.str(), xslt.str(), NULL, html);
  3994. response->setContent(html.str());
  3995. response->setContentType(HTTP_TYPE_TEXT_HTML_UTF8);
  3996. response->send();
  3997. return 0;
  3998. }
  3999. }
  4000. catch(IException* e)
  4001. {
  4002. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  4003. }
  4004. return onGetNotFound(context, request, response, service);
  4005. }
  4006. int CWsWorkunitsSoapBindingEx::onStartUpload(IEspContext &ctx, CHttpRequest* request, CHttpResponse* response, const char *serv, const char *method)
  4007. {
  4008. StringArray fileNames, files;
  4009. StringBuffer source;
  4010. Owned<IMultiException> me = MakeMultiException(source.setf("WsWorkunits::%s()", method).str());
  4011. try
  4012. {
  4013. if (strieq(method, "ImportWUZAPFile"))
  4014. {
  4015. SecAccessFlags accessOwn, accessOthers;
  4016. getUserWuAccessFlags(ctx, accessOwn, accessOthers, false);
  4017. if ((accessOwn != SecAccess_Full) || (accessOthers != SecAccess_Full))
  4018. throw MakeStringException(-1, "Permission denied.");
  4019. StringBuffer password;
  4020. request->getParameter("Password", password);
  4021. request->readContentToFiles(nullptr, zipFolder, fileNames);
  4022. unsigned count = fileNames.ordinality();
  4023. if (count == 0)
  4024. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Failed to read upload content.");
  4025. //For now, we only support importing 1 ZAP report per ImportWUZAPFile request for a better response time.
  4026. //Some ZAP report could be very big. It may take a log time to import.
  4027. if (count > 1)
  4028. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Only one WU ZAP report is allowed.");
  4029. VStringBuffer fileName("%s%s", zipFolder, fileNames.item(0));
  4030. wswService->queryWUFactory()->importWorkUnit(fileName, password,
  4031. wswService->getDataDirectory(), "ws_workunits", ctx.queryUserId(), ctx.querySecManager(), ctx.queryUser());
  4032. }
  4033. else
  4034. throw MakeStringException(ECLWATCH_INVALID_INPUT, "WsWorkunits::%s does not support the upload_ option.", method);
  4035. }
  4036. catch (IException* e)
  4037. {
  4038. me->append(*e);
  4039. }
  4040. catch (...)
  4041. {
  4042. me->append(*MakeStringExceptionDirect(ECLWATCH_INTERNAL_ERROR, "Unknown Exception"));
  4043. }
  4044. return onFinishUpload(ctx, request, response, serv, method, fileNames, files, me);
  4045. }
  4046. bool isDeploymentTypeCompressed(const char *type)
  4047. {
  4048. if (type && *type)
  4049. return (0==strncmp(type, "compressed_", strlen("compressed_")));
  4050. return false;
  4051. }
  4052. const char *skipCompressedTypeQualifier(const char *type)
  4053. {
  4054. if (isDeploymentTypeCompressed(type))
  4055. type += strlen("compressed_");
  4056. return type;
  4057. }
  4058. void deployEclOrArchive(IEspContext &context, IEspWUDeployWorkunitRequest & req, IEspWUDeployWorkunitResponse & resp)
  4059. {
  4060. NewWsWorkunit wu(context);
  4061. StringAttr wuid(wu->queryWuid()); // NB queryWuid() not valid after workunit,clear()
  4062. wu->setAction(WUActionCompile);
  4063. StringBuffer name(req.getName());
  4064. if (!name.trim().length() && notEmpty(req.getFileName()))
  4065. splitFilename(req.getFileName(), NULL, NULL, &name, NULL);
  4066. if (name.length())
  4067. wu->setJobName(name.str());
  4068. if (req.getObject().length())
  4069. {
  4070. MemoryBuffer mb;
  4071. const MemoryBuffer *uncompressed = &req.getObject();
  4072. if (isDeploymentTypeCompressed(req.getObjType()))
  4073. {
  4074. fastLZDecompressToBuffer(mb, req.getObject().bufferBase());
  4075. uncompressed = &mb;
  4076. }
  4077. StringBuffer text(uncompressed->length(), uncompressed->toByteArray());
  4078. wu.setQueryText(text.str());
  4079. }
  4080. if (req.getQueryMainDefinition())
  4081. wu.setQueryMain(req.getQueryMainDefinition());
  4082. if (req.getSnapshot())
  4083. wu->setSnapshot(req.getSnapshot());
  4084. if (!req.getResultLimit_isNull())
  4085. wu->setResultLimit(req.getResultLimit());
  4086. wu->commit();
  4087. wu.clear();
  4088. WsWuHelpers::submitWsWorkunit(context, wuid.str(), req.getCluster(), NULL, 0, true, false, false, NULL, NULL, &req.getDebugValues());
  4089. waitForWorkUnitToCompile(wuid.str(), req.getWait());
  4090. WsWuInfo winfo(context, wuid.str());
  4091. winfo.getCommon(resp.updateWorkunit(), WUINFO_All);
  4092. winfo.getExceptions(resp.updateWorkunit(), WUINFO_All);
  4093. name.clear();
  4094. if (notEmpty(resp.updateWorkunit().getJobname()))
  4095. origValueChanged(req.getName(), resp.updateWorkunit().getJobname(), name, false);
  4096. if (name.length()) //non generated user specified name, so override #Workunit('name')
  4097. {
  4098. WorkunitUpdate wx(&winfo.cw->lock());
  4099. wx->setJobName(name.str());
  4100. resp.updateWorkunit().setJobname(name.str());
  4101. }
  4102. PROGLOG("WUDeploy generates: %s", wuid.str());
  4103. AuditSystemAccess(context.queryUserId(), true, "Updated %s", wuid.str());
  4104. }
  4105. StringBuffer &sharedObjectFileName(StringBuffer &filename, const char *name, const char *ext, unsigned copy)
  4106. {
  4107. filename.append((name && *name) ? name : "workunit");
  4108. if (copy)
  4109. filename.append('_').append(copy);
  4110. if (notEmpty(ext))
  4111. filename.append(ext);
  4112. return filename;
  4113. }
  4114. inline StringBuffer &buildFullDllPath(StringBuffer &dllpath, StringBuffer &dllname, const char *dir, const char *name, const char *ext, unsigned copy)
  4115. {
  4116. return addPathSepChar(dllpath.set(dir)).append(sharedObjectFileName(dllname, name, ext, copy));
  4117. }
  4118. void writeTempSharedObject(const MemoryBuffer &obj, const char *dir, StringBuffer &filename)
  4119. {
  4120. OwnedIFileIO io = createUniqueFile(dir, "query_copy_dll_", NULL, filename);
  4121. io->write(0, obj.length(), obj.toByteArray());
  4122. }
  4123. void writeSharedObject(const char *srcpath, const MemoryBuffer &obj, const char *dir, StringBuffer &dllpath, StringBuffer &dllname)
  4124. {
  4125. StringBuffer name, ext;
  4126. if (srcpath && *srcpath)
  4127. splitFilename(srcpath, NULL, NULL, &name, &ext);
  4128. unsigned copy=0;
  4129. buildFullDllPath(dllpath.clear(), dllname.clear(), dir, name.str(), ext.str(), copy);
  4130. unsigned crc=0;
  4131. StringBuffer tempDllName;
  4132. const unsigned attempts = 3; // max attempts
  4133. for (unsigned i=0; i<attempts; i++)
  4134. {
  4135. while (checkFileExists(dllpath.str()))
  4136. {
  4137. if (crc==0)
  4138. crc = crc32(obj.toByteArray(), obj.length(), 0);
  4139. if (crc == crc_file(dllpath.str()))
  4140. {
  4141. DBGLOG("Workunit dll already exists: %s", dllpath.str());
  4142. if (tempDllName.length())
  4143. removeFileTraceIfFail(tempDllName);
  4144. return;
  4145. }
  4146. buildFullDllPath(dllpath.clear(), dllname.clear(), dir, name.str(), ext.str(), ++copy);
  4147. }
  4148. if (!tempDllName.length())
  4149. writeTempSharedObject(obj, dir, tempDllName);
  4150. try
  4151. {
  4152. renameFile(dllpath, tempDllName, false);
  4153. return;
  4154. }
  4155. catch (IException *e)
  4156. {
  4157. EXCLOG(e, "writeSharedObject"); //pretty small window for another copy of this dll to sneak by
  4158. e->Release();
  4159. }
  4160. }
  4161. throw MakeStringException(ECLWATCH_CANNOT_COPY_DLL, "Failed copying shared object %s", srcpath);
  4162. }
  4163. void deploySharedObject(IEspContext &context, StringBuffer &wuid, const char *filename, const char *cluster, const char *name, const MemoryBuffer &obj, const char *dir, const char *xml)
  4164. {
  4165. StringBuffer dllpath, dllname;
  4166. StringBuffer srcname(filename);
  4167. unsigned crc = 0;
  4168. Owned<IPropertyTree> srcxml;
  4169. if (xml && *xml)
  4170. {
  4171. srcxml.setown(createPTreeFromXMLString(xml));
  4172. if (srcxml && wuid.length())
  4173. {
  4174. crc = srcxml->getPropInt("Query[1]/Associated[1]/File[@type='dll'][1]/@crc", 0);
  4175. if (crc)
  4176. {
  4177. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  4178. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid);
  4179. if (cw)
  4180. {
  4181. //is this a previous copy of same query, or a WUID collision?
  4182. if (cw->getHash() == (unsigned) srcxml->getPropInt64("@hash", 0))
  4183. {
  4184. Owned<IConstWUQuery> query = cw->getQuery();
  4185. if (query && crc == query->getQueryDllCrc())
  4186. return;
  4187. }
  4188. }
  4189. }
  4190. }
  4191. }
  4192. if (!srcname.length())
  4193. srcname.append(name).append(SharedObjectExtension);
  4194. writeSharedObject(srcname.str(), obj, dir, dllpath, dllname);
  4195. NewWsWorkunit wu(context, wuid); //duplicate wuid made unique
  4196. wuid.set(wu->queryWuid());
  4197. wu->setClusterName(cluster);
  4198. wu->commit();
  4199. StringBuffer dllXML;
  4200. if (getWorkunitXMLFromFile(dllpath.str(), dllXML))
  4201. {
  4202. Owned<ILocalWorkUnit> embeddedWU = createLocalWorkUnit(dllXML.str());
  4203. queryExtendedWU(wu)->copyWorkUnit(embeddedWU, true, true);
  4204. }
  4205. wu.associateDll(dllpath.str(), dllname.str());
  4206. if (name && *name)
  4207. wu->setJobName(name);
  4208. //clean slate, copy only select items from processed workunit xml
  4209. if (srcxml)
  4210. {
  4211. if (srcxml->hasProp("@jobName"))
  4212. wu->setJobName(srcxml->queryProp("@jobName"));
  4213. if (srcxml->hasProp("Query/Text"))
  4214. wu.setQueryText(srcxml->queryProp("Query/Text"));
  4215. }
  4216. wu->setState(WUStateCompiled);
  4217. wu->commit();
  4218. wu.clear();
  4219. AuditSystemAccess(context.queryUserId(), true, "Updated %s", wuid.str());
  4220. }
  4221. void CWsWorkunitsEx::deploySharedObjectReq(IEspContext &context, IEspWUDeployWorkunitRequest & req, IEspWUDeployWorkunitResponse & resp, const char *dir, const char *xml)
  4222. {
  4223. if (isEmpty(req.getFileName()))
  4224. throw MakeStringException(ECLWATCH_INVALID_INPUT, "File name required when deploying a shared object.");
  4225. const char *cluster = req.getCluster();
  4226. if (isEmpty(cluster))
  4227. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Cluster name required when deploying a shared object.");
  4228. const MemoryBuffer *uncompressed = &req.getObject();
  4229. MemoryBuffer mb;
  4230. if (isDeploymentTypeCompressed(req.getObjType()))
  4231. {
  4232. fastLZDecompressToBuffer(mb, req.getObject().bufferBase());
  4233. uncompressed = &mb;
  4234. }
  4235. StringBuffer wuid;
  4236. deploySharedObject(context, wuid, req.getFileName(), cluster, req.getName(), *uncompressed, dir, xml);
  4237. WsWuInfo winfo(context, wuid.str());
  4238. winfo.getCommon(resp.updateWorkunit(), WUINFO_All);
  4239. PROGLOG("WUDeploy generates: %s", wuid.str());
  4240. AuditSystemAccess(context.queryUserId(), true, "Updated %s", wuid.str());
  4241. }
  4242. bool CWsWorkunitsEx::onWUDeployWorkunit(IEspContext &context, IEspWUDeployWorkunitRequest & req, IEspWUDeployWorkunitResponse & resp)
  4243. {
  4244. const char *type = skipCompressedTypeQualifier(req.getObjType());
  4245. try
  4246. {
  4247. ensureWsCreateWorkunitAccess(context);
  4248. if (notEmpty(req.getCluster()) && !isValidCluster(req.getCluster()))
  4249. throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Invalid cluster name: %s", req.getCluster());
  4250. if (!type || !*type)
  4251. throw MakeStringExceptionDirect(ECLWATCH_INVALID_INPUT, "WUDeployWorkunit unspecified object type.");
  4252. if (strieq(type, "archive")|| strieq(type, "ecl_text"))
  4253. deployEclOrArchive(context, req, resp);
  4254. else if (strieq(type, "shared_object"))
  4255. deploySharedObjectReq(context, req, resp, queryDirectory.str());
  4256. else
  4257. throw MakeStringException(ECLWATCH_INVALID_INPUT, "WUDeployWorkunit '%s' unknown object type.", type);
  4258. }
  4259. catch(IException* e)
  4260. {
  4261. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  4262. }
  4263. return true;
  4264. }
  4265. bool CWsWorkunitsEx::onWUCreateZAPInfo(IEspContext &context, IEspWUCreateZAPInfoRequest &req, IEspWUCreateZAPInfoResponse &resp)
  4266. {
  4267. try
  4268. {
  4269. CWsWuZAPInfoReq zapInfoReq;
  4270. zapInfoReq.wuid = req.getWuid();
  4271. WsWuHelpers::checkAndTrimWorkunit("WUCreateZAPInfo", zapInfoReq.wuid);
  4272. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  4273. Owned<IConstWorkUnit> cwu = factory->openWorkUnit(zapInfoReq.wuid.str());
  4274. if(!cwu.get())
  4275. throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT, "Cannot open workunit %s.", zapInfoReq.wuid.str());
  4276. ensureWsWorkunitAccess(context, *cwu, SecAccess_Read);
  4277. PROGLOG("WUCreateZAPInfo(): %s", zapInfoReq.wuid.str());
  4278. zapInfoReq.espIP = req.getESPIPAddress();
  4279. zapInfoReq.thorIP = req.getThorIPAddress();
  4280. zapInfoReq.problemDesc = req.getProblemDescription();
  4281. zapInfoReq.whatChanged = req.getWhatChanged();
  4282. zapInfoReq.whereSlow = req.getWhereSlow();
  4283. zapInfoReq.includeThorSlaveLog = req.getIncludeThorSlaveLog();
  4284. zapInfoReq.zapFileName = req.getZAPFileName();
  4285. zapInfoReq.password = req.getZAPPassword();
  4286. StringBuffer zipFileName, zipFileNameWithPath;
  4287. //CWsWuFileHelper may need ESP's <Directories> settings to locate log files.
  4288. CWsWuFileHelper helper(directories);
  4289. helper.createWUZAPFile(context, cwu, zapInfoReq, zipFileName, zipFileNameWithPath, thorSlaveLogThreadPoolSize);
  4290. //Download ZIP file to user
  4291. Owned<IFile> f = createIFile(zipFileNameWithPath.str());
  4292. Owned<IFileIO> io = f->open(IFOread);
  4293. unsigned zapFileSize = (unsigned) io->size();
  4294. if (zapFileSize > MAX_ZAP_BUFFER_SIZE)
  4295. throw MakeStringException(ECLWATCH_INVALID_INPUT, "WUCreateZAPInfo: ZAP file size is too big (>10M) to be retrieved. Please call /WsWorkunits/WUCreateAndDownloadZAPInfo using HTTP GET.");
  4296. MemoryBuffer mb;
  4297. void * data = mb.reserve(zapFileSize);
  4298. size32_t read = io->read(0, zapFileSize, data);
  4299. mb.setLength(read);
  4300. resp.setThefile(mb);
  4301. resp.setThefile_mimetype(HTTP_TYPE_OCTET_STREAM);
  4302. resp.setZAPFileName(zipFileName.str());
  4303. StringBuffer headerStr("attachment;filename=");
  4304. headerStr.append(zipFileName.str());
  4305. context.addCustomerHeader("Content-disposition", headerStr.str());
  4306. io->close();
  4307. f->remove();
  4308. }
  4309. catch(IException* e)
  4310. {
  4311. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  4312. }
  4313. return true;
  4314. }
  4315. bool CWsWorkunitsEx::onWUGetZAPInfo(IEspContext &context, IEspWUGetZAPInfoRequest &req, IEspWUGetZAPInfoResponse &resp)
  4316. {
  4317. try
  4318. {
  4319. StringBuffer wuid(req.getWUID());
  4320. WsWuHelpers::checkAndTrimWorkunit("WUGetZAPInfo", wuid);
  4321. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  4322. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid);
  4323. if(!cw)
  4324. throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot open workunit %s.",wuid.str());
  4325. ensureWsWorkunitAccess(context, *cw, SecAccess_Read);
  4326. PROGLOG("WUGetZAPInfo: %s", wuid.str());
  4327. StringBuffer EspIP, ThorIP;
  4328. resp.setWUID(wuid.str());
  4329. resp.setBuildVersion(getBuildVersion());
  4330. IpAddress ipaddr = queryHostIP();
  4331. ipaddr.getIpText(EspIP);
  4332. resp.setESPIPAddress(EspIP.str());
  4333. //Get Archive
  4334. Owned<IConstWUQuery> query = cw->getQuery();
  4335. if(query)
  4336. {
  4337. SCMStringBuffer queryText;
  4338. query->getQueryText(queryText);
  4339. if (queryText.length() && isArchiveQuery(queryText.str()))
  4340. resp.setArchive(queryText.str());
  4341. }
  4342. //Get Thor IP
  4343. BoolHash uniqueProcesses;
  4344. Owned<IStringIterator> thorInstances = cw->getProcesses("Thor");
  4345. ForEach (*thorInstances)
  4346. {
  4347. SCMStringBuffer processName;
  4348. thorInstances->str(processName);
  4349. if (processName.length() < 1)
  4350. continue;
  4351. bool* found = uniqueProcesses.getValue(processName.str());
  4352. if (found && *found)
  4353. continue;
  4354. uniqueProcesses.setValue(processName.str(), true);
  4355. Owned<IStringIterator> thorLogs = cw->getLogs("Thor", processName.str());
  4356. ForEach (*thorLogs)
  4357. {
  4358. SCMStringBuffer logName;
  4359. thorLogs->str(logName);
  4360. if (!logName.length())
  4361. continue;
  4362. const char* thorIPPtr = NULL;
  4363. const char* ptr = logName.str();
  4364. while (ptr)
  4365. {
  4366. if (!thorIPPtr && (*ptr != '/'))
  4367. thorIPPtr = ptr;
  4368. else if (thorIPPtr && (*ptr == '/'))
  4369. break;
  4370. ptr++;
  4371. }
  4372. if (!thorIPPtr)
  4373. continue;
  4374. //Found a thor IP
  4375. if (ThorIP.length())
  4376. ThorIP.append(",");
  4377. if (!*ptr)
  4378. ThorIP.append(thorIPPtr);
  4379. else
  4380. ThorIP.append(ptr-thorIPPtr, thorIPPtr);
  4381. }
  4382. }
  4383. if (ThorIP.length())
  4384. resp.setThorIPAddress(ThorIP.str());
  4385. double version = context.getClientVersion();
  4386. if (version >= 1.73)
  4387. {
  4388. resp.setEmailTo(zapEmailTo.get());
  4389. resp.setEmailFrom(zapEmailFrom.get());
  4390. }
  4391. }
  4392. catch(IException* e)
  4393. {
  4394. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  4395. }
  4396. return true;
  4397. }
  4398. bool CWsWorkunitsEx::onWUCheckFeatures(IEspContext &context, IEspWUCheckFeaturesRequest &req, IEspWUCheckFeaturesResponse &resp)
  4399. {
  4400. resp.setBuildVersionMajor(BUILD_VERSION_MAJOR);
  4401. resp.setBuildVersionMinor(BUILD_VERSION_MINOR);
  4402. resp.setBuildVersionPoint(BUILD_VERSION_POINT);
  4403. resp.setMaxRequestEntityLength(maxRequestEntityLength);
  4404. resp.updateDeployment().setUseCompression(true);
  4405. return true;
  4406. }
  4407. static const char * checkGetStatsNullInput(const char * s)
  4408. {
  4409. if (!s || !*s)
  4410. return nullptr;
  4411. return s;
  4412. }
  4413. static const char * checkGetStatsInput(const char * s)
  4414. {
  4415. if (!s || !*s)
  4416. return "*";
  4417. return s;
  4418. }
  4419. bool CWsWorkunitsEx::onWUGetStats(IEspContext &context, IEspWUGetStatsRequest &req, IEspWUGetStatsResponse &resp)
  4420. {
  4421. //This function is deprecated for 7.x and will be removed shortly afterwards.
  4422. //Anything that cannot be implemented with the scope iterator is implemented as a post filter
  4423. try
  4424. {
  4425. const char* creatorType = checkGetStatsNullInput(req.getCreatorType());
  4426. const char* creator = checkGetStatsNullInput(req.getCreator());
  4427. const char* scopeType = checkGetStatsNullInput(req.getScopeType());
  4428. const char* scope = checkGetStatsNullInput(req.getScope());
  4429. const char* kind = checkGetStatsNullInput(req.getKind());
  4430. const char* measure = req.getMeasure();
  4431. WuScopeFilter filter;
  4432. StatisticsFilter statsFilter(creatorType, creator, "*", "*", "*", "*");
  4433. filter.addOutputProperties(PTstatistics);
  4434. if (scopeType)
  4435. filter.addScopeType(scopeType);
  4436. if (scope)
  4437. filter.addScope(scope);
  4438. if (kind)
  4439. filter.addOutputStatistic(kind);
  4440. if (measure)
  4441. filter.setMeasure(measure);
  4442. if (!req.getMinScopeDepth_isNull() && !req.getMaxScopeDepth_isNull())
  4443. filter.setDepth(req.getMinScopeDepth(), req.getMaxScopeDepth());
  4444. else if (!req.getMinScopeDepth_isNull())
  4445. filter.setDepth(req.getMinScopeDepth(), req.getMinScopeDepth());
  4446. if (!req.getMinValue_isNull() || !req.getMaxValue_isNull())
  4447. {
  4448. unsigned __int64 lowValue = 0;
  4449. unsigned __int64 highValue = MaxStatisticValue;
  4450. if (!req.getMinValue_isNull())
  4451. lowValue = (unsigned __int64)req.getMinValue();
  4452. if (!req.getMaxValue_isNull())
  4453. highValue = (unsigned __int64)req.getMaxValue();
  4454. statsFilter.setValueRange(lowValue, highValue);
  4455. }
  4456. const char * textFilter = req.getFilter();
  4457. if (textFilter)
  4458. statsFilter.setFilter(textFilter);
  4459. filter.setIncludeNesting(0).finishedFilter();
  4460. bool createDescriptions = false;
  4461. if (!req.getCreateDescriptions_isNull())
  4462. createDescriptions = req.getCreateDescriptions();
  4463. StringBuffer wuid(req.getWUID());
  4464. PROGLOG("WUGetStats: %s", wuid.str());
  4465. IArrayOf<IEspWUStatisticItem> statistics;
  4466. if (strchr(wuid, '*'))
  4467. {
  4468. WUSortField filters[2];
  4469. MemoryBuffer filterbuf;
  4470. filters[0] = WUSFwildwuid;
  4471. filterbuf.append(wuid.str());
  4472. filters[1] = WUSFterm;
  4473. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  4474. Owned<IConstWorkUnitIterator> iter = factory->getWorkUnitsSorted((WUSortField) (WUSFwuid), filters, filterbuf.bufferBase(), 0, INT_MAX, NULL, NULL);
  4475. ForEach(*iter)
  4476. {
  4477. Owned<IConstWorkUnit> workunit = factory->openWorkUnit(iter->query().queryWuid());
  4478. if (workunit)
  4479. {
  4480. //No need to check for access since the list is already filtered
  4481. WsWuInfo winfo(context, workunit->queryWuid());
  4482. winfo.getStats(filter, statsFilter, createDescriptions, statistics);
  4483. }
  4484. }
  4485. }
  4486. else
  4487. {
  4488. WsWuHelpers::checkAndTrimWorkunit("WUInfo", wuid);
  4489. ensureWsWorkunitAccess(context, wuid, SecAccess_Read);
  4490. WsWuInfo winfo(context, wuid);
  4491. winfo.getStats(filter, statsFilter, createDescriptions, statistics);
  4492. }
  4493. resp.setStatistics(statistics);
  4494. resp.setWUID(wuid.str());
  4495. }
  4496. catch(IException* e)
  4497. {
  4498. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  4499. }
  4500. return true;
  4501. }
  4502. IPropertyTree* CWsWorkunitsEx::getWorkunitArchive(IEspContext &context, WsWuInfo& winfo, const char* wuid, unsigned cacheMinutes)
  4503. {
  4504. Owned<WUArchiveCacheElement> wuArchive = wuArchiveCache->lookup(context, wuid, cacheMinutes);
  4505. if (wuArchive)
  4506. return wuArchive->archive.getLink();
  4507. Owned<IPropertyTree> archive = winfo.getWorkunitArchive();
  4508. if (!archive)
  4509. return NULL;
  4510. wuArchiveCache->add(wuid, archive.getLink());
  4511. return archive.getClear();
  4512. }
  4513. bool CWsWorkunitsEx::onWUListArchiveFiles(IEspContext &context, IEspWUListArchiveFilesRequest &req, IEspWUListArchiveFilesResponse &resp)
  4514. {
  4515. try
  4516. {
  4517. const char* wuid = req.getWUID();
  4518. if (isEmpty(wuid))
  4519. throw MakeStringException(ECLWATCH_NO_WUID_SPECIFIED, "No workunit defined.");
  4520. ensureWsWorkunitAccess(context, wuid, SecAccess_Read);
  4521. PROGLOG("WUListArchiveFiles: %s", wuid);
  4522. WsWuInfo winfo(context, wuid);
  4523. Owned<IPropertyTree> archive = getWorkunitArchive(context, winfo, wuid, WUARCHIVE_CACHE_MINITES);
  4524. if (!archive)
  4525. throw MakeStringException(ECLWATCH_INVALID_INPUT,"No workunit archive found for %s.", wuid);
  4526. IArrayOf<IEspWUArchiveModule> modules;
  4527. IArrayOf<IEspWUArchiveFile> files;
  4528. winfo.listArchiveFiles(archive, "", modules, files);
  4529. if (modules.length())
  4530. resp.setArchiveModules(modules);
  4531. if (files.length())
  4532. resp.setFiles(files);
  4533. if (!modules.length() && !files.length())
  4534. resp.setMessage("Files not found");
  4535. }
  4536. catch(IException* e)
  4537. {
  4538. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  4539. }
  4540. return true;
  4541. }
  4542. bool CWsWorkunitsEx::onWUGetArchiveFile(IEspContext &context, IEspWUGetArchiveFileRequest &req, IEspWUGetArchiveFileResponse &resp)
  4543. {
  4544. try
  4545. {
  4546. const char* wuid = req.getWUID();
  4547. const char* moduleName = req.getModuleName();
  4548. const char* attrName = req.getFileName();
  4549. if (isEmpty(wuid))
  4550. throw MakeStringException(ECLWATCH_NO_WUID_SPECIFIED, "No workunit defined.");
  4551. if (isEmpty(moduleName) && isEmpty(attrName))
  4552. throw MakeStringException(ECLWATCH_INVALID_INPUT, "No file name defined.");
  4553. ensureWsWorkunitAccess(context, wuid, SecAccess_Read);
  4554. PROGLOG("WUGetArchiveFile: %s", wuid);
  4555. WsWuInfo winfo(context, wuid);
  4556. Owned<IPropertyTree> archive = getWorkunitArchive(context, winfo, wuid, WUARCHIVE_CACHE_MINITES);
  4557. if (!archive)
  4558. throw MakeStringException(ECLWATCH_INVALID_INPUT,"No workunit archive found for %s.", wuid);
  4559. StringBuffer file;
  4560. winfo.getArchiveFile(archive, moduleName, attrName, req.getPath(), file);
  4561. if (file.length())
  4562. resp.setFile(file.str());
  4563. else
  4564. resp.setMessage("File not found");
  4565. }
  4566. catch(IException* e)
  4567. {
  4568. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  4569. }
  4570. return true;
  4571. }
  4572. const char *CWsWorkunitsEx::gatherQueryFileCopyErrors(IArrayOf<IConstLogicalFileError> &errors, StringBuffer &errorMsg)
  4573. {
  4574. if (!errors.ordinality())
  4575. return errorMsg.str();
  4576. errorMsg.append("Query File Copy Error(s):");
  4577. ForEachItemIn(i, errors)
  4578. {
  4579. IConstLogicalFileError &error = errors.item(i);
  4580. errorMsg.append(" ").append(error.getLogicalName()).append(": ");
  4581. errorMsg.append(error.getError()).append(";");
  4582. }
  4583. return errorMsg.str();
  4584. }
  4585. const char *CWsWorkunitsEx::gatherExceptionMessage(const IMultiException &me, StringBuffer &exceptionMsg)
  4586. {
  4587. exceptionMsg.append("Exception(s):");
  4588. aindex_t count = me.ordinality();
  4589. for (aindex_t i=0; i<count; i++)
  4590. {
  4591. IException& e = me.item(i);
  4592. StringBuffer errMsg;
  4593. exceptionMsg.append(" ").append(e.errorCode()).append(": ");
  4594. exceptionMsg.append(e.errorMessage(errMsg).str()).append(";");
  4595. }
  4596. exceptionMsg.append("\n");
  4597. return exceptionMsg.str();
  4598. }
  4599. const char *CWsWorkunitsEx::gatherWUException(IConstWUExceptionIterator &it, StringBuffer &exceptionMsg)
  4600. {
  4601. unsigned numErr = 0, numWRN = 0, numInf = 0, numAlert = 0;
  4602. ForEach(it)
  4603. {
  4604. IConstWUException & cur = it.query();
  4605. SCMStringBuffer src, msg, file;
  4606. exceptionMsg.append(" Exception: Code: ").append(cur.getExceptionCode());
  4607. exceptionMsg.append(" Source: ").append(cur.getExceptionSource(src).str());
  4608. exceptionMsg.append(" Message: ").append(cur.getExceptionMessage(msg).str());
  4609. exceptionMsg.append(" FileName: ").append(cur.getExceptionFileName(file).str());
  4610. exceptionMsg.append(" LineNo: ").append(cur.getExceptionLineNo());
  4611. exceptionMsg.append(" Column: ").append(cur.getExceptionColumn());
  4612. if (cur.getActivityId())
  4613. exceptionMsg.append(" ActivityId: ").append(cur.getActivityId());
  4614. if (cur.getPriority())
  4615. exceptionMsg.append(" Priority: ").append(cur.getPriority());
  4616. exceptionMsg.append(" Scope: ").append(cur.queryScope());
  4617. const char * label = "";
  4618. switch (cur.getSeverity())
  4619. {
  4620. default:
  4621. case SeverityError: label = "Error"; numErr++; break;
  4622. case SeverityWarning: label = "Warning"; numWRN++; break;
  4623. case SeverityInformation: label = "Info"; numInf++; break;
  4624. case SeverityAlert: label = "Alert"; numAlert++; break;
  4625. }
  4626. exceptionMsg.append(" Severity: ").append(label);
  4627. }
  4628. exceptionMsg.append(" Total error: ").append(numErr);
  4629. exceptionMsg.append(" warning: ").append(numWRN);
  4630. exceptionMsg.append(" info: ").append(numInf);
  4631. exceptionMsg.append(" alert: ").append(numAlert);
  4632. exceptionMsg.append("\n");
  4633. return exceptionMsg.str();
  4634. }
  4635. const char *CWsWorkunitsEx::gatherECLException(IArrayOf<IConstECLException> &exceptions, StringBuffer &exceptionMsg)
  4636. {
  4637. unsigned errorCount = 0, warningCount = 0;
  4638. ForEachItemIn(i, exceptions)
  4639. {
  4640. IConstECLException &e = exceptions.item(i);
  4641. if (strieq(e.getSeverity(), "warning"))
  4642. {
  4643. warningCount++;
  4644. exceptionMsg.append(" Warning: ");
  4645. }
  4646. else if (strieq(e.getSeverity(), "error"))
  4647. {
  4648. errorCount++;
  4649. exceptionMsg.append(" Error: ");
  4650. }
  4651. if (e.getSource())
  4652. exceptionMsg.append(e.getSource()).append(": ");
  4653. if (e.getFileName())
  4654. exceptionMsg.append(e.getFileName());
  4655. if (!e.getLineNo_isNull() && !e.getColumn_isNull())
  4656. exceptionMsg.appendf("(%d,%d): ", e.getLineNo(), e.getColumn());
  4657. exceptionMsg.appendf("%s C%d: %s;", e.getSeverity(), e.getCode(), e.getMessage());
  4658. }
  4659. exceptionMsg.append(" Total error: ").append(errorCount);
  4660. exceptionMsg.append(" warning: ").append(warningCount);
  4661. exceptionMsg.append("\n");
  4662. return exceptionMsg.str();
  4663. }
  4664. bool CWsWorkunitsEx::readDeployWUResponse(CWUDeployWorkunitResponse* deployResponse, StringBuffer &wuid, StringBuffer &result)
  4665. {
  4666. const IMultiException &me = deployResponse->getExceptions();
  4667. if (me.ordinality())
  4668. gatherExceptionMessage(me, result);
  4669. const char *w = deployResponse->getWorkunit().getWuid();
  4670. if (isEmptyString(w))
  4671. {
  4672. result.appendf("Error: no workunit ID!");
  4673. return false;
  4674. }
  4675. wuid.set(w);
  4676. const char *state = deployResponse->getWorkunit().getState();
  4677. bool isCompiled = (strieq(state, "compiled") || strieq(state, "completed"));
  4678. if (!isCompiled)
  4679. result.appendf("state: %s;", state);
  4680. gatherECLException(deployResponse->getWorkunit().getExceptions(), result);
  4681. return isCompiled;
  4682. }
  4683. void CWsWorkunitsEx::addEclDefinitionActionResult(const char *eclDefinition, const char *result, const char *wuid,
  4684. const char *queryID, const char* strAction, bool logResult, IArrayOf<IConstWUEclDefinitionActionResult> &results)
  4685. {
  4686. Owned<IEspWUEclDefinitionActionResult> res = createWUEclDefinitionActionResult();
  4687. if (!isEmptyString(eclDefinition))
  4688. res->setEclDefinition(eclDefinition);
  4689. res->setAction(strAction);
  4690. res->setResult(result);
  4691. if (!isEmptyString(wuid))
  4692. res->setWUID(wuid);
  4693. if (!isEmptyString(queryID))
  4694. res->setQueryID(queryID);
  4695. results.append(*res.getClear());
  4696. if (logResult)
  4697. PROGLOG("%s", result);
  4698. }
  4699. void CWsWorkunitsEx::checkEclDefinitionSyntax(IEspContext &context, const char *target, const char *eclDefinition,
  4700. int msToWait, IArrayOf<IConstWUEclDefinitionActionResult> &results)
  4701. {
  4702. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  4703. NewWsWorkunit wu(factory, context);
  4704. wu->setAction(WUActionCheck);
  4705. wu.setQueryMain(eclDefinition);
  4706. StringAttr wuid(wu->queryWuid()); // NB queryWuid() not valid after workunit.clear()
  4707. wu->commit();
  4708. wu.clear();
  4709. WsWuHelpers::submitWsWorkunit(context, wuid.str(), target, nullptr, 0, true, false, false);
  4710. waitForWorkUnitToComplete(wuid.str(), msToWait);
  4711. Owned<IConstWorkUnit> cw(factory->openWorkUnit(wuid.str()));
  4712. WUState st = cw->getState();
  4713. bool wuTimeout = (st != WUStateAborted) && (st != WUStateCompleted) && (st != WUStateFailed);
  4714. VStringBuffer result(" WUSyntaxCheckECL for %s:", eclDefinition);
  4715. if (wuTimeout)
  4716. result.append(" timed out.");
  4717. gatherWUException(cw->getExceptions(), result);
  4718. addEclDefinitionActionResult(eclDefinition, result.str(), wuid.str(), nullptr, "SyntaxCheck", true, results);
  4719. cw.clear();
  4720. if (wuTimeout)
  4721. abortWorkUnit(wuid.str(), context.querySecManager(), context.queryUser());
  4722. if (!factory->deleteWorkUnit(wuid.str()))
  4723. {
  4724. result.setf(" Workunit %s cannot be deleted now. You may delete it when its status changes.", wuid.str());
  4725. addEclDefinitionActionResult(eclDefinition, result.str(), wuid.str(), nullptr, "SyntaxCheck", true, results);
  4726. }
  4727. }
  4728. bool CWsWorkunitsEx::deployEclDefinition(IEspContext &context, const char *target, const char *eclDefinition,
  4729. int msToWait, StringBuffer &wuid, StringBuffer &result)
  4730. {
  4731. Owned<CWUDeployWorkunitRequest> deployReq = new CWUDeployWorkunitRequest("WsWorkunits");
  4732. deployReq->setName(eclDefinition);
  4733. deployReq->setQueryMainDefinition(eclDefinition);
  4734. deployReq->setObjType("compressed_ecl_text");
  4735. deployReq->setCluster(target);
  4736. deployReq->setWait(msToWait);
  4737. deployReq->setFileName("");
  4738. Owned<CWUDeployWorkunitResponse> deployResponse = new CWUDeployWorkunitResponse("WsWorkunits");
  4739. onWUDeployWorkunit(context, *deployReq, *deployResponse);
  4740. return readDeployWUResponse(deployResponse, wuid, result);
  4741. }
  4742. void CWsWorkunitsEx::deployEclDefinition(IEspContext &context, const char *target, const char *eclDefinition,
  4743. int msToWait, IArrayOf<IConstWUEclDefinitionActionResult> &results)
  4744. {
  4745. StringBuffer wuid, finalResult;
  4746. deployEclDefinition(context, target, eclDefinition, msToWait, wuid, finalResult);
  4747. addEclDefinitionActionResult(eclDefinition, finalResult.str(), wuid.str(), nullptr, "Deploy", true, results);
  4748. }
  4749. void CWsWorkunitsEx::publishEclDefinition(IEspContext &context, const char *target, const char *eclDefinition,
  4750. int msToWait, IEspWUEclDefinitionActionRequest &req, IArrayOf<IConstWUEclDefinitionActionResult> &results)
  4751. {
  4752. StringBuffer priorityReq(req.getPriority());
  4753. if (priorityReq.trim().length() && !isValidPriorityValue(priorityReq.str()))
  4754. {
  4755. VStringBuffer msg("Invalid Priority: %s", priorityReq.str());
  4756. addEclDefinitionActionResult(eclDefinition, msg.str(), nullptr, nullptr, "Publish", true, results);
  4757. return;
  4758. }
  4759. StringBuffer memoryLimitReq(req.getMemoryLimit());
  4760. if (memoryLimitReq.trim().length() && !isValidMemoryValue(memoryLimitReq.str()))
  4761. {
  4762. VStringBuffer msg("Invalid MemoryLimit: %s", memoryLimitReq.str());
  4763. addEclDefinitionActionResult(eclDefinition, msg.str(), nullptr, nullptr, "Publish", true, results);
  4764. return;
  4765. }
  4766. time_t timenow;
  4767. int startTime = time(&timenow);
  4768. //Do deploy first
  4769. StringBuffer wuid, finalResult;
  4770. if (!deployEclDefinition(context, target, eclDefinition, msToWait, wuid, finalResult))
  4771. {
  4772. addEclDefinitionActionResult(eclDefinition, finalResult.str(), wuid.str(), nullptr, "Publish", true, results);
  4773. return;
  4774. }
  4775. int timeLeft = msToWait - (time(&timenow) - startTime);
  4776. if (timeLeft <= 0)
  4777. {
  4778. addEclDefinitionActionResult(eclDefinition, "Timed out after deployment", wuid.str(), nullptr, "Publish", true, results);
  4779. return;
  4780. }
  4781. //Do publish now
  4782. StringBuffer comment(req.getComment());
  4783. StringBuffer remoteDali(req.getRemoteDali());
  4784. StringBuffer sourceProcess(req.getSourceProcess());
  4785. int timeLimit = req.getTimeLimit();
  4786. int warnTimeLimit = req.getWarnTimeLimit();
  4787. Owned<CWUPublishWorkunitRequest> publishReq = new CWUPublishWorkunitRequest("WsWorkunits");
  4788. publishReq->setWuid(wuid.str());
  4789. publishReq->setCluster(target);
  4790. publishReq->setJobName(eclDefinition);
  4791. if (!remoteDali.trim().isEmpty())
  4792. publishReq->setRemoteDali(remoteDali.str());
  4793. if (!sourceProcess.trim().isEmpty())
  4794. publishReq->setSourceProcess(sourceProcess.str());
  4795. if (!priorityReq.isEmpty())
  4796. publishReq->setPriority(priorityReq.str());
  4797. if (comment.str()) //allow empty
  4798. publishReq->setComment(comment.str());
  4799. if (req.getDeletePrevious())
  4800. publishReq->setActivate(CWUQueryActivationMode_ActivateDeletePrevious);
  4801. else if (req.getSuspendPrevious())
  4802. publishReq->setActivate(CWUQueryActivationMode_ActivateSuspendPrevious);
  4803. else
  4804. publishReq->setActivate(req.getNoActivate() ? CWUQueryActivationMode_NoActivate : CWUQueryActivationMode_Activate);
  4805. publishReq->setWait(timeLeft);
  4806. publishReq->setNoReload(req.getNoReload());
  4807. publishReq->setDontCopyFiles(req.getDontCopyFiles());
  4808. publishReq->setAllowForeignFiles(req.getAllowForeign());
  4809. publishReq->setUpdateDfs(req.getUpdateDfs());
  4810. publishReq->setUpdateSuperFiles(req.getUpdateSuperfiles());
  4811. publishReq->setUpdateCloneFrom(req.getUpdateCloneFrom());
  4812. publishReq->setAppendCluster(!req.getDontAppendCluster());
  4813. publishReq->setIncludeFileErrors(true);
  4814. if (timeLimit != -1)
  4815. publishReq->setTimeLimit(timeLimit);
  4816. if (warnTimeLimit != (unsigned) -1)
  4817. publishReq->setWarnTimeLimit(warnTimeLimit);
  4818. if (!memoryLimitReq.isEmpty())
  4819. publishReq->setMemoryLimit(memoryLimitReq.str());
  4820. Owned<CWUPublishWorkunitResponse> publishResponse = new CWUPublishWorkunitResponse("WsWorkunits");
  4821. onWUPublishWorkunit(context, *publishReq, *publishResponse);
  4822. const char *id = publishResponse->getQueryId();
  4823. if (!isEmptyString(id))
  4824. {
  4825. const char *qs = publishResponse->getQuerySet();
  4826. finalResult.append(" ").append(qs ? qs : "").append('/').append(id).append(" published. ");
  4827. }
  4828. if (publishResponse->getReloadFailed())
  4829. finalResult.append(" Added to target, but request to reload queries on cluster failed.");
  4830. const IMultiException &me = publishResponse->getExceptions();
  4831. if (me.ordinality())
  4832. gatherExceptionMessage(me, finalResult);
  4833. gatherQueryFileCopyErrors(publishResponse->getFileErrors(), finalResult);
  4834. addEclDefinitionActionResult(eclDefinition, finalResult.str(), wuid.str(), id, "Publish", true, results);
  4835. }
  4836. bool CWsWorkunitsEx::onWUEclDefinitionAction(IEspContext &context, IEspWUEclDefinitionActionRequest &req, IEspWUEclDefinitionActionResponse &resp)
  4837. {
  4838. try
  4839. {
  4840. CEclDefinitionActions action = req.getActionType();
  4841. if (action == EclDefinitionActions_Undefined)
  4842. throw MakeStringException(ECLWATCH_INVALID_INPUT,"Action not defined in onWUEclDefinitionAction.");
  4843. ensureWsCreateWorkunitAccess(context);
  4844. StringBuffer target(req.getTarget());
  4845. if (target.trim().isEmpty())
  4846. throw MakeStringException(ECLWATCH_INVALID_INPUT,"Target not defined in onWUEclDefinitionAction.");
  4847. IArrayOf<IConstWUEclDefinitionActionResult> results;
  4848. StringArray &eclDefinitions = req.getEclDefinitions();
  4849. int msToWait = req.getMsToWait();
  4850. for (aindex_t i = 0; i < eclDefinitions.length(); i++)
  4851. {
  4852. StringBuffer eclDefinitionName(eclDefinitions.item(i));
  4853. if (eclDefinitionName.trim().isEmpty())
  4854. UWARNLOG("Empty ECL Definition name in WUEclDefinitionAction request");
  4855. else if (action == CEclDefinitionActions_SyntaxCheck)
  4856. checkEclDefinitionSyntax(context, target.str(), eclDefinitionName.str(), msToWait, results);
  4857. else if (action == CEclDefinitionActions_Deploy)
  4858. deployEclDefinition(context, target.str(), eclDefinitionName.str(), msToWait, results);
  4859. else
  4860. publishEclDefinition(context, target.str(), eclDefinitionName.str(), msToWait, req, results);
  4861. }
  4862. resp.setActionResults(results);
  4863. }
  4864. catch(IException* e)
  4865. {
  4866. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  4867. }
  4868. return true;
  4869. }