12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
732783279328032813282328332843285328632873288328932903291329232933294329532963297329832993300330133023303330433053306330733083309331033113312331333143315331633173318331933203321332233233324332533263327332833293330333133323333333433353336333733383339334033413342334333443345334633473348334933503351335233533354335533563357335833593360336133623363336433653366336733683369337033713372337333743375337633773378337933803381338233833384338533863387338833893390339133923393339433953396339733983399 |
- /*##############################################################################
- HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ############################################################################## */
- #include "ws_workunitsService.hpp"
- #include "ws_fs.hpp"
- #include "jlib.hpp"
- #include "jflz.hpp"
- #include "daclient.hpp"
- #include "dalienv.hpp"
- #include "dadfs.hpp"
- #include "dfuwu.hpp"
- #include "eclhelper.hpp"
- #include "roxiecontrol.hpp"
- #include "dfuutil.hpp"
- #include "dautils.hpp"
- #include "httpclient.hpp"
- #include "portlist.h" //ROXIE_SERVER_PORT
#define DALI_FILE_LOOKUP_TIMEOUT (1000*15*1)  // 15 seconds

// Roxie related timeouts (values are in milliseconds, per the trailing comments).
const unsigned ROXIECONNECTIONTIMEOUT = 1000;       //1 second
const unsigned ROXIECONTROLQUERYTIMEOUT = 3000;     //3 seconds
const unsigned ROXIECONTROLQUERIESTIMEOUT = 30000;  //30 seconds
const unsigned ROXIELOCKCONNECTIONTIMEOUT = 60000;  //60 seconds

#define SDS_LOCK_TIMEOUT (5*60*1000) // 5mins, 30s a bit short

//The QuerySetQueryActionTypes[] table has to match the ESPenum QuerySetQueryActionTypes in the ecm file.
//The table is NULL terminated; the count below excludes the terminator and is derived from the
//table itself so the two can never get out of step.
static const char *QuerySetQueryActionTypes[] = { "Suspend", "Unsuspend", "ToggleSuspend", "Activate",
    "Delete", "RemoveAllAliases", "ResetQueryStats", NULL };
static unsigned NumOfQuerySetQueryActionTypes =
    (unsigned)(sizeof(QuerySetQueryActionTypes) / sizeof(QuerySetQueryActionTypes[0])) - 1;

//The QuerySetAliasActionTypes[] table has to match the ESPenum QuerySetAliasActionTypes in the ecm file.
//NULL terminated, count derived as above.
static const char *QuerySetAliasActionTypes[] = { "Deactivate", NULL };
static unsigned NumOfQuerySetAliasActionTypes =
    (unsigned)(sizeof(QuerySetAliasActionTypes) / sizeof(QuerySetAliasActionTypes[0])) - 1;
- bool isRoxieProcess(const char *process)
- {
- if (!process)
- return false;
- Owned<IEnvironmentFactory> factory = getEnvironmentFactory(true);
- Owned<IConstEnvironment> env = factory->openEnvironment();
- Owned<IPropertyTree> root = &env->getPTree();
- VStringBuffer xpath("Software/RoxieCluster[@name=\"%s\"]", process);
- return root->hasProp(xpath.str());
- }
- void checkUseEspOrDaliIP(SocketEndpoint &ep, const char *ip, const char *esp)
- {
- if (!ip || !*ip)
- return;
- ep.set(ip, 7070);
- if (ep.isLoopBack() || *ip=='.' || (ip[0]=='0' && ip[1]=='.'))
- ep.ipset(esp);
- }
- void ensureInputString(const char* input, bool lowerCase, StringBuffer& inputStr, int code, const char* msg)
- {
- inputStr.set(input).trim();
- if (inputStr.isEmpty())
- throw MakeStringException(code, "%s", msg);
- if (lowerCase)
- inputStr.toLowerCase();
- }
- static IClientWsWorkunits *ensureWsWorkunitsClient(IClientWsWorkunits *ws, IEspContext *ctx, const char *netAddress)
- {
- if (ws)
- return LINK(ws);
- StringBuffer url;
- if (netAddress && *netAddress)
- url.appendf("http://%s%s/WsWorkunits", netAddress, (!strchr(netAddress, ':')) ? ":8010" : "");
- else
- {
- if (!ctx)
- throw MakeStringException(ECLWATCH_INVALID_IP, "Missing WsWorkunits service address");
- StringBuffer ip;
- short port = 0;
- ctx->getServAddress(ip, port);
- url.appendf("http://%s:%d/WsWorkunits", ip.str(), port);
- }
- Owned<IClientWsWorkunits> cws = createWsWorkunitsClient();
- cws->addServiceUrl(url);
- if (ctx && ctx->queryUserId() && *ctx->queryUserId())
- cws->setUsernameToken(ctx->queryUserId(), ctx->queryPassword(), NULL);
- return cws.getClear();
- }
- IClientWUQuerySetDetailsResponse *fetchQueryDetails(IClientWsWorkunits *_ws, IEspContext *ctx, const char *netAddress, const char *target, const char *queryid)
- {
- Owned<IClientWsWorkunits> ws = ensureWsWorkunitsClient(_ws, ctx, netAddress);
- //using existing WUQuerysetDetails rather than extending WUQueryDetails, to support copying query meta data from prior releases
- Owned<IClientWUQuerySetDetailsRequest> reqQueryInfo = ws->createWUQuerysetDetailsRequest();
- reqQueryInfo->setClusterName(target);
- reqQueryInfo->setQuerySetName(target);
- reqQueryInfo->setFilter(queryid);
- reqQueryInfo->setFilterType("Id");
- return ws->WUQuerysetDetails(reqQueryInfo);
- }
- void fetchRemoteWorkunit(IClientWsWorkunits *_ws, IEspContext *ctx, const char *netAddress, const char *queryset, const char *query, const char *wuid, StringBuffer &name, StringBuffer &xml, StringBuffer &dllname, MemoryBuffer &dll, StringBuffer &daliServer)
- {
- Owned<IClientWsWorkunits> ws = ensureWsWorkunitsClient(_ws, ctx, netAddress);
- Owned<IClientWULogFileRequest> req = ws->createWUFileRequest();
- if (queryset && *queryset)
- req->setQuerySet(queryset);
- if (query && *query)
- req->setQuery(query);
- if (wuid && *wuid)
- req->setWuid(wuid);
- req->setType("xml");
- Owned<IClientWULogFileResponse> resp = ws->WUFile(req);
- if (!resp || resp->getExceptions().ordinality() || !resp->getThefile().length())
- throw MakeStringException(ECLWATCH_CANNOT_GET_WORKUNIT, "Cannot retrieve remote workunit");
- xml.append(resp->getThefile().length(), resp->getThefile().toByteArray());
- req->setType("dll");
- resp.setown(ws->WUFile(req));
- if (!resp || resp->getExceptions().ordinality() || !resp->getThefile().length())
- throw MakeStringException(ECLWATCH_CANNOT_GET_WORKUNIT, "Cannot retrieve remote workunit shared object");
- dll.append(resp->getThefile());
- dllname.append(resp->getFileName());
- name.append(resp->getQueryName());
- SocketEndpoint ep;
- checkUseEspOrDaliIP(ep, resp->getDaliServer(), netAddress);
- if (!ep.isNull())
- ep.getUrlStr(daliServer);
- }
- void fetchRemoteWorkunitAndQueryDetails(IClientWsWorkunits *_ws, IEspContext *ctx, const char *netAddress, const char *queryset, const char *query, const char *wuid, StringBuffer &name, StringBuffer &xml, StringBuffer &dllname, MemoryBuffer &dll, StringBuffer &daliServer, Owned<IClientWUQuerySetDetailsResponse> &respQueryInfo)
- {
- Owned<IClientWsWorkunits> ws = ensureWsWorkunitsClient(_ws, ctx, netAddress);
- fetchRemoteWorkunit(ws, ctx, netAddress, queryset, query, wuid, name, xml, dllname, dll, daliServer);
- respQueryInfo.setown(fetchQueryDetails(ws, ctx, netAddress, queryset, query));
- }
- void doWuFileCopy(IClientFileSpray &fs, IEspWULogicalFileCopyInfo &info, const char *logicalname, const char *cluster, bool isRoxie, bool supercopy)
- {
- try
- {
- Owned<IClientCopy> req = fs.createCopyRequest();
- req->setSourceLogicalName(logicalname);
- req->setDestLogicalName(logicalname);
- req->setDestGroup(cluster);
- req->setSuperCopy(supercopy);
- if (isRoxie)
- req->setDestGroupRoxie("Yes");
- Owned<IClientCopyResponse> resp = fs.Copy(req);
- info.setDfuCopyWuid(resp->getResult());
- }
- catch (IException *e)
- {
- StringBuffer msg;
- info.setDfuCopyError(e->errorMessage(msg).str());
- e->Release();
- }
- }
- bool copyWULogicalFiles(IEspContext &context, IConstWorkUnit &cw, const char *cluster, bool copyLocal, IEspWUCopyLogicalClusterFileSections &lfinfo)
- {
- if (isEmpty(cluster))
- throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "copyWULogicalFiles Cluster parameter not set.");
- Owned<IUserDescriptor> udesc = createUserDescriptor();
- udesc->set(context.queryUserId(), context.queryPassword(), context.querySessionToken(), context.querySignature());
- IArrayOf<IEspWULogicalFileCopyInfo> foreign;
- IArrayOf<IEspWULogicalFileCopyInfo> onCluster;
- IArrayOf<IEspWULogicalFileCopyInfo> notOnCluster;
- IArrayOf<IEspWULogicalFileCopyInfo> notFound;
- Owned<IClientFileSpray> fs;
- if (copyLocal)
- {
- fs.setown(createFileSprayClient());
- VStringBuffer url("http://.:%d/FileSpray", 8010);
- fs->addServiceUrl(url.str());
- }
- bool isRoxie = isRoxieProcess(cluster);
- Owned<IConstWUGraphIterator> graphs = &cw.getGraphs(GraphTypeActivities);
- ForEach(*graphs)
- {
- Owned <IPropertyTree> xgmml = graphs->query().getXGMMLTree(false);
- Owned<IPropertyTreeIterator> iter = xgmml->getElements(".//node");
- ForEach(*iter)
- {
- try
- {
- IPropertyTree &node = iter->query();
- ThorActivityKind kind = (ThorActivityKind) node.getPropInt("att[@name='_kind']/@value", TAKnone);
- if(kind==TAKdiskwrite || kind==TAKspillwrite || kind==TAKindexwrite || kind==TAKcsvwrite || kind==TAKxmlwrite || kind==TAKjsonwrite)
- continue;
- if (node.getPropBool("att[@name='_isSpill']/@value") || node.getPropBool("att[@name='_isTransformSpill']/@value"))
- continue;
- Owned<IEspWULogicalFileCopyInfo> info = createWULogicalFileCopyInfo();
- const char *logicalname = node.queryProp("att[@name='_indexFileName']/@value");
- if (logicalname)
- info->setIsIndex(true);
- else
- logicalname = node.queryProp("att[@name='_fileName']/@value");
- info->setLogicalName(logicalname);
- if (logicalname)
- {
- if (!strnicmp("~foreign::", logicalname, 10))
- foreign.append(*info.getClear());
- else
- {
- Owned<IDistributedFile> df = queryDistributedFileDirectory().lookup(logicalname, udesc);
- if(!df)
- notFound.append(*info.getClear());
- else if (df->findCluster(cluster)!=NotFound)
- {
- onCluster.append(*info.getClear());
- }
- else
- {
- StringArray clusters;
- df->getClusterNames(clusters);
- info->setClusters(clusters);
- if (copyLocal)
- {
- StringBuffer wuid;
- bool supercopy = queryDistributedFileDirectory().isSuperFile(logicalname, udesc, NULL);
- doWuFileCopy(*fs, *info, logicalname, cluster, isRoxie, supercopy);
- }
- notOnCluster.append(*info.getClear());
- }
- }
- }
- }
- catch(IException *e)
- {
- e->Release();
- }
- }
- lfinfo.setClusterName(cluster);
- lfinfo.setNotOnCluster(notOnCluster);
- lfinfo.setOnCluster(onCluster);
- lfinfo.setForeign(foreign);
- lfinfo.setNotFound(notFound);
- }
- return true;
- }
- void copyWULogicalFilesToTarget(IEspContext &context, IConstWUClusterInfo &clusterInfo, IConstWorkUnit &cw, IArrayOf<IConstWUCopyLogicalClusterFileSections> &clusterfiles, bool doLocalCopy)
- {
- const StringArray &thors = clusterInfo.getThorProcesses();
- ForEachItemIn(i, thors)
- {
- Owned<IEspWUCopyLogicalClusterFileSections> files = createWUCopyLogicalClusterFileSections();
- copyWULogicalFiles(context, cw, thors.item(i), doLocalCopy, *files);
- clusterfiles.append(*files.getClear());
- }
- SCMStringBuffer roxie;
- clusterInfo.getRoxieProcess(roxie);
- if (roxie.length())
- {
- Owned<IEspWUCopyLogicalClusterFileSections> files = createWUCopyLogicalClusterFileSections();
- copyWULogicalFiles(context, cw, roxie.str(), doLocalCopy, *files);
- clusterfiles.append(*files.getClear());
- }
- }
- void QueryFilesInUse::loadTarget(IPropertyTree *t, const char *target, unsigned flags)
- {
- if (!target || !*target)
- return;
- Owned<IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(target);
- if (!clusterInfo || !(clusterInfo->getPlatform() == RoxieCluster))
- return;
- Owned<IPropertyTree> queryRegistry = getQueryRegistry(target, true);
- if (!queryRegistry)
- return;
- SCMStringBuffer process;
- clusterInfo->getRoxieProcess(process);
- if (!process.length())
- return;
- Owned<IHpccPackageSet> ps = createPackageSet(process.str());
- const IHpccPackageMap *pm = (ps) ? ps->queryActiveMap(target) : NULL;
- const char *pmid = (pm) ? pm->queryPackageId() : NULL;
- VStringBuffer xpath("%s/@pmid", target);
- const char *pmidPrev = t->queryProp(xpath);
- if ((flags & UFO_RELOAD_TARGETS_CHANGED_PMID) && (pmid || pmidPrev))
- {
- if (!(pmid && pmidPrev) || !streq(pmid, pmidPrev))
- t->removeProp(target);
- }
- IPropertyTree *targetTree = ensurePTree(t, target);
- if (pm)
- targetTree->setProp("@pmid", pmid);
- if (flags & UFO_REMOVE_QUERIES_NOT_IN_QUERYSET)
- {
- Owned<IPropertyTreeIterator> cachedQueries = targetTree->getElements("Query");
- ForEach(*cachedQueries)
- {
- IPropertyTree &cachedQuery = cachedQueries->query();
- VStringBuffer xpath("Query[@id='%s']", cachedQuery.queryProp("@id"));
- if (!queryRegistry->hasProp(xpath))
- targetTree->removeTree(&cachedQuery);
- }
- }
- Owned<IPropertyTreeIterator> queries = queryRegistry->getElements("Query");
- ForEach(*queries)
- {
- if (aborting)
- return;
- IPropertyTree &query = queries->query();
- const char *queryid = query.queryProp("@id");
- if (!queryid || !*queryid)
- continue;
- const char *wuid = query.queryProp("@wuid");
- if (!wuid || !*wuid)
- continue;
- const char *pkgid=NULL;
- if (pm)
- {
- const IHpccPackage *pkg = pm->matchPackage(queryid);
- if (pkg)
- pkgid = pkg->queryId();
- }
- VStringBuffer xpath("Query[@id='%s']", queryid);
- IPropertyTree *queryTree = targetTree->queryPropTree(xpath);
- if (queryTree)
- {
- const char *cachedPkgid = queryTree->queryProp("@pkgid");
- if (pkgid && *pkgid)
- {
- if (!(flags & UFO_RELOAD_MAPPED_QUERIES) && (cachedPkgid && streq(pkgid, cachedPkgid)))
- continue;
- }
- else if (!cachedPkgid || !*cachedPkgid)
- continue;
- targetTree->removeTree(queryTree);
- queryTree = NULL;
- }
- Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
- Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid);
- if (!cw)
- continue;
- queryTree = targetTree->addPropTree("Query", createPTree("Query"));
- queryTree->setProp("@target", target); //for reference when searching across targets
- queryTree->setProp("@id", queryid);
- if (pkgid && *pkgid)
- queryTree->setProp("@pkgid", pkgid);
- IUserDescriptor **roxieUser = roxieUserMap.getValue(target);
- Owned<IReferencedFileList> wufiles = createReferencedFileList(roxieUser ? *roxieUser : NULL, true, true);
- wufiles->addFilesFromQuery(cw, pm, queryid);
- if (aborting)
- return;
- wufiles->resolveFiles(process.str(), NULL, NULL, NULL, true, true, false, false);
- Owned<IReferencedFileIterator> files = wufiles->getFiles();
- ForEach(*files)
- {
- if (aborting)
- return;
- IReferencedFile &rf = files->query();
- //if (!(rf.getFlags() & RefSubFile))
- // continue;
- const char *lfn = rf.getLogicalName();
- if (!lfn || !*lfn)
- continue;
- if (!queryTree->hasProp(xpath.setf("File[@lfn='%s']", lfn)))
- {
- IPropertyTree *fileTree = queryTree->addPropTree("File", createPTree("File"));
- fileTree->setProp("@lfn", lfn);
- if (rf.getFlags() & RefFileSuper)
- fileTree->setPropBool("@super", true);
- if (rf.getFlags() & RefFileNotFound)
- fileTree->setPropBool("@notFound", true);
- const char *fpkgid = rf.queryPackageId();
- if (fpkgid && *fpkgid)
- fileTree->setProp("@pkgid", fpkgid);
- if (rf.getFileSize())
- fileTree->setPropInt64("@size", rf.getFileSize());
- if (rf.getNumParts())
- fileTree->setPropInt("@numparts", rf.getNumParts());
- }
- }
- }
- }
- void QueryFilesInUse::loadTargets(IPropertyTree *t, unsigned flags)
- {
- Owned<IStringIterator> targets = getTargetClusters("RoxieCluster", NULL);
- SCMStringBuffer s;
- ForEach(*targets)
- {
- if (aborting)
- return;
- loadTarget(t, targets->str(s).str(), flags);
- }
- }
- IPropertyTreeIterator *QueryFilesInUse::findAllQueriesUsingFile(const char *lfn)
- {
- if (!lfn || !*lfn)
- return NULL;
- Owned<IPropertyTree> t = getTree();
- VStringBuffer xpath("*/Query[File/@lfn='%s']", lfn);
- return t->getElements(xpath);
- }
- IPropertyTreeIterator *QueryFilesInUse::findQueriesUsingFile(const char *target, const char *lfn, StringAttr &pmid)
- {
- if (!lfn || !*lfn)
- return NULL;
- if (!target || !*target)
- return findAllQueriesUsingFile(lfn);
- Owned<IPropertyTree> t = getTree();
- IPropertyTree *targetTree = t->queryPropTree(target);
- if (!targetTree)
- return NULL;
- pmid.set(targetTree->queryProp("@pmid"));
- VStringBuffer xpath("Query[File/@lfn='%s']", lfn);
- return targetTree->getElements(xpath);
- }
- bool CWsWorkunitsEx::onWUCopyLogicalFiles(IEspContext &context, IEspWUCopyLogicalFilesRequest &req, IEspWUCopyLogicalFilesResponse &resp)
- {
- StringBuffer wuid = req.getWuid();
- WsWuHelpers::checkAndTrimWorkunit("WUCopyLogicalFiles", wuid);
- Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
- Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid.str());
- if (!cw)
- throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot open workunit %s", wuid.str());
- resp.setWuid(wuid.str());
- StringAttr cluster;
- if (notEmpty(req.getCluster()))
- cluster.set(req.getCluster());
- else
- cluster.set(cw->queryClusterName());
- if (!isValidCluster(cluster))
- throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Invalid cluster name: %s", cluster.str());
- Owned <IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(cluster.str());
- IArrayOf<IConstWUCopyLogicalClusterFileSections> clusterfiles;
- PROGLOG("WUCopyLogicalFiles: %s", wuid.str());
- copyWULogicalFilesToTarget(context, *clusterInfo, *cw, clusterfiles, req.getCopyLocal());
- resp.setClusterFiles(clusterfiles);
- return true;
- }
- static inline unsigned remainingMsWait(unsigned wait, unsigned start)
- {
- if (wait==0 || wait==(unsigned)-1)
- return wait;
- unsigned waited = msTick()-start;
- return (wait>waited) ? wait-waited : 0;
- }
- bool reloadCluster(IConstWUClusterInfo *clusterInfo, unsigned wait)
- {
- if (0==wait || !clusterInfo || clusterInfo->getPlatform()!=RoxieCluster)
- return true;
- const SocketEndpointArray &addrs = clusterInfo->getRoxieServers();
- if (addrs.length())
- {
- try
- {
- Owned<IPropertyTree> result = sendRoxieControlAllNodes(addrs.item(0), "<control:reload/>", false, wait);
- const char *status = result->queryProp("Endpoint[1]/Status");
- if (!status || !strieq(status, "ok"))
- return false;
- }
- catch(IMultiException *me)
- {
- StringBuffer err;
- DBGLOG("ERROR control:reloading roxie query info %s", me->errorMessage(err.append(me->errorCode()).append(' ')).str());
- me->Release();
- return false;
- }
- catch(IException *e)
- {
- StringBuffer err;
- DBGLOG("ERROR control:reloading roxie query info %s", e->errorMessage(err.append(e->errorCode()).append(' ')).str());
- e->Release();
- return false;
- }
- }
- return true;
- }
- bool reloadCluster(const char *cluster, unsigned wait)
- {
- Owned <IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(cluster);
- return (clusterInfo) ? reloadCluster(clusterInfo, wait) : true;
- }
- static inline void updateQuerySetting(bool ignore, IPropertyTree *queryTree, const char *xpath, int value)
- {
- if (ignore || !queryTree)
- return;
- if (value!=0)
- queryTree->setPropInt(xpath, value);
- else
- queryTree->removeProp(xpath);
- }
- static inline void updateTimeLimitSetting(IPropertyTree *queryTree, bool valueNotSet, int value, IConstQuerySetQuery *srcInfo=NULL)
- {
- if (valueNotSet && srcInfo && !srcInfo->getTimeLimit_isNull())
- {
- value = srcInfo->getTimeLimit();
- valueNotSet=false;
- }
- updateQuerySetting(valueNotSet, queryTree, "@timeLimit", value);
- }
- static inline void updateWarnTimeLimitSetting(IPropertyTree *queryTree, bool valueNotSet, int value, IConstQuerySetQuery *srcInfo=NULL)
- {
- if (valueNotSet && srcInfo && !srcInfo->getWarnTimeLimit_isNull())
- {
- value = srcInfo->getWarnTimeLimit();
- valueNotSet=false;
- }
- updateQuerySetting(valueNotSet, queryTree, "@warnTimeLimit", value);
- }
- static inline unsigned __int64 memoryLimitUInt64FromString(const char *value)
- {
- if (!value || !*value || !isdigit(*value))
- return 0;
- unsigned __int64 result = (*value - '0');
- const char *s = value+1;
- while (isdigit(*s))
- {
- result = 10 * result + ((*s) - '0');
- s++;
- }
- if (*s)
- {
- const char unit = toupper(*s++);
- if (*s && !strieq("B", s)) //more?
- return 0;
- switch (unit)
- {
- case 'E':
- result <<=60;
- break;
- case 'P':
- result <<=50;
- break;
- case 'T':
- result <<=40;
- break;
- case 'G':
- result <<=30;
- break;
- case 'M':
- result <<=20;
- break;
- case 'K':
- result <<=10;
- break;
- case 'B':
- break;
- default:
- return 0;
- }
- }
- return result;
- }
- const char memUnitAbbrev[] = {'B', 'K', 'M', 'G', 'T', 'P', 'E'};
- #define MAX_MEMUNIT_ABBREV 6
- static inline StringBuffer &memoryLimitStringFromUInt64(StringBuffer &s, unsigned __int64 in)
- {
- if (!in)
- return s;
- unsigned __int64 value = in;
- unsigned char unit = 0;
- while (!(value & 0x3FF) && unit < MAX_MEMUNIT_ABBREV)
- {
- value >>= 10;
- unit++;
- }
- return s.append(value).append(memUnitAbbrev[unit]);
- }
- static inline void updateMemoryLimitSetting(IPropertyTree *queryTree, const char *value, IConstQuerySetQuery *srcInfo=NULL)
- {
- if (!queryTree)
- return;
- if (!value && srcInfo)
- value = srcInfo->getMemoryLimit();
- if (!value)
- return;
- unsigned __int64 limit = memoryLimitUInt64FromString(value);
- if (0==limit)
- queryTree->removeProp("@memoryLimit");
- else
- queryTree->setPropInt64("@memoryLimit", limit);
- }
// Priority values as stored in a query's "@priority" attribute.
// QueryPriorityNone means "no priority set"; QueryPriorityInvalid marks unrecognised input.
enum QueryPriority
{
    QueryPriorityNone = -1,
    QueryPriorityLow = 0,
    QueryPriorityHigh = 1,
    QueryPrioritySLA = 2,
    QueryPriorityInvalid = 3
};

// Maps a QueryPriority value to its display name; anything unrecognised yields "INVALID".
static inline const char *getQueryPriorityName(int value)
{
    if (QueryPriorityNone == value)
        return "NONE";
    if (QueryPriorityLow == value)
        return "LOW";
    if (QueryPriorityHigh == value)
        return "HIGH";
    if (QueryPrioritySLA == value)
        return "SLA";
    return "INVALID";
}
- static inline void updateQueryPriority(IPropertyTree *queryTree, const char *value, IConstQuerySetQuery *srcInfo=NULL)
- {
- if (!queryTree)
- return;
- if ((!value || !*value) && srcInfo)
- value = srcInfo->getPriority();
- if (!value || !*value)
- return;
- int priority = QueryPriorityInvalid;
- if (strieq("LOW", value))
- priority=QueryPriorityLow;
- else if (strieq("HIGH", value))
- priority=QueryPriorityHigh;
- else if (strieq("SLA", value))
- priority=QueryPrioritySLA;
- else if (strieq("NONE", value))
- priority=QueryPriorityNone;
- switch (priority)
- {
- case QueryPriorityInvalid:
- break;
- case QueryPriorityNone:
- queryTree->removeProp("@priority");
- break;
- default:
- queryTree->setPropInt("@priority", priority);
- break;
- }
- }
- void gatherFileErrors(IReferencedFileList *files, IArrayOf<IConstLogicalFileError> &errors)
- {
- Owned<IReferencedFileIterator> it = files->getFiles();
- ForEach(*it)
- {
- IReferencedFile &file = it->query();
- unsigned flags = file.getFlags();
- if (!(flags & (RefFileNotFound | RefFileCopyInfoFailed)))
- continue;
- StringBuffer msg;
- if (flags & RefFileOptional)
- msg.append("OPT ");
- if (flags & RefFileNotFound)
- msg.append("Not Found");
- else
- msg.append("Copy Failed");
- Owned<IEspLogicalFileError> error = createLogicalFileError();
- error->setLogicalName(file.getLogicalName());
- error->setError(msg);
- errors.append(*static_cast<IConstLogicalFileError*>(error.getClear()));
- }
- }
- class QueryFileCopier
- {
- public:
- QueryFileCopier(const char *target_) : target(target_) {}
- void init(IEspContext &context, bool allowForeignFiles)
- {
- files.setown(createReferencedFileList(context.queryUserId(), context.queryPassword(), allowForeignFiles, false));
- clusterInfo.setown(getTargetClusterInfo(target));
- StringBufferAdaptor sba(process);
- if (clusterInfo && clusterInfo->getPlatform()==RoxieCluster)
- clusterInfo->getRoxieProcess(sba);
- if (!process.length())
- return;
- ps.setown(createPackageSet(process.str()));
- if (ps)
- pm = ps->queryActiveMap(target);
- }
- void copy(IConstWorkUnit *cw, unsigned updateFlags)
- {
- StringBuffer queryid;
- if (queryname && *queryname)
- queryname = queryid.append(queryname).append(".0").str(); //prepublish dummy version number to support fuzzy match like queries="myquery.*" in package
- files->addFilesFromQuery(cw, pm, queryname);
- files->resolveFiles(process.str(), remoteIP, remotePrefix, srcCluster, !(updateFlags & (DALI_UPDATEF_REPLACE_FILE | DALI_UPDATEF_CLONE_FROM | DALI_UPDATEF_SUPERFILES)), true, false, true);
- StringBuffer defReplicateFolder;
- getConfigurationDirectory(NULL, "data2", "roxie", process.str(), defReplicateFolder);
- Owned<IDFUhelper> helper = createIDFUhelper();
- files->cloneAllInfo(updateFlags, helper, true, true, clusterInfo->getRoxieRedundancy(), clusterInfo->getChannelsPerNode(), clusterInfo->getRoxieReplicateOffset(), defReplicateFolder);
- }
- void gatherFileErrors(IArrayOf<IConstLogicalFileError> &errors)
- {
- ::gatherFileErrors(files, errors);
- }
- private:
- Owned <IConstWUClusterInfo> clusterInfo;
- Owned<IHpccPackageSet> ps;
- const IHpccPackageMap *pm = nullptr;
- StringAttr target;
- public:
- Owned<IReferencedFileList> files;
- StringBuffer process;
- StringAttr remoteIP;
- StringAttr remotePrefix;
- StringAttr srcCluster;
- StringAttr queryname;
- };
- bool CWsWorkunitsEx::isQuerySuspended(const char* query, IConstWUClusterInfo *clusterInfo, unsigned wait, StringBuffer& errorMessage)
- {
- try
- {
- if (0==wait || !clusterInfo || clusterInfo->getPlatform()!=RoxieCluster)
- return false;
- const SocketEndpointArray &addrs = clusterInfo->getRoxieServers();
- if (addrs.length() < 1)
- return false;
- StringBuffer control;
- control.appendf("<control:queries><Query id='%s'/></control:queries>", query);
- Owned<IPropertyTree> result = sendRoxieControlAllNodes(addrs.item(0), control.str(), false, wait);
- if (!result)
- return false;
- Owned<IPropertyTreeIterator> suspendedQueries = result->getElements("Endpoint/Queries/Query[@suspended='1']");
- if (!suspendedQueries->first())
- return false;
- errorMessage.set(suspendedQueries->query().queryProp("@error"));
- return true;
- }
- catch(IMultiException *me)
- {
- StringBuffer err;
- DBGLOG("ERROR control:queries roxie query info %s", me->errorMessage(err.append(me->errorCode()).append(' ')).str());
- me->Release();
- return false;
- }
- catch(IException *e)
- {
- StringBuffer err;
- DBGLOG("ERROR control:queries roxie query info %s", e->errorMessage(err.append(e->errorCode()).append(' ')).str());
- e->Release();
- return false;
- }
- }
- bool CWsWorkunitsEx::onWUPublishWorkunit(IEspContext &context, IEspWUPublishWorkunitRequest & req, IEspWUPublishWorkunitResponse & resp)
- {
- StringBuffer wuid = req.getWuid();
- WsWuHelpers::checkAndTrimWorkunit("WUPublishWorkunit", wuid);
- Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
- Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid.str());
- if (!cw)
- throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot find the workunit %s", wuid.str());
- resp.setWuid(wuid.str());
- StringAttr queryName;
- if (notEmpty(req.getJobName()))
- queryName.set(req.getJobName());
- else
- queryName.set(cw->queryJobName());
- if (!queryName.length())
- throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Query/Job name not defined for publishing workunit %s", wuid.str());
- StringAttr target;
- if (notEmpty(req.getCluster()))
- target.set(req.getCluster());
- else
- target.set(cw->queryClusterName());
- if (!target.length())
- throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Cluster name not defined for publishing workunit %s", wuid.str());
- if (!isValidCluster(target.str()))
- throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Invalid cluster name: %s", target.str());
- DBGLOG("%s publishing wuid %s to target %s as query %s", context.queryUserId(), wuid.str(), target.str(), queryName.str());
- StringBuffer daliIP;
- StringBuffer srcCluster;
- StringBuffer srcPrefix;
- splitDerivedDfsLocation(req.getRemoteDali(), srcCluster, daliIP, srcPrefix, req.getSourceProcess(),req.getSourceProcess(), NULL, NULL);
- if (srcCluster.length())
- {
- if (!isProcessCluster(daliIP, srcCluster))
- throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Process cluster %s not found on %s DALI", srcCluster.str(), daliIP.length() ? daliIP.str() : "local");
- }
- unsigned updateFlags = 0;
- if (req.getUpdateDfs())
- updateFlags |= (DALI_UPDATEF_SUPERFILES | DALI_UPDATEF_REPLACE_FILE | DALI_UPDATEF_CLONE_FROM);
- if (req.getUpdateCloneFrom())
- updateFlags |= DALI_UPDATEF_CLONE_FROM;
- if (req.getUpdateSuperFiles())
- updateFlags |= DALI_UPDATEF_SUPERFILES;
- if (req.getAppendCluster())
- updateFlags |= DALI_UPDATEF_APPEND_CLUSTER;
- if (!req.getDontCopyFiles())
- {
- QueryFileCopier cpr(target);
- cpr.init(context, req.getAllowForeignFiles());
- cpr.remoteIP.set(daliIP);
- cpr.remotePrefix.set(srcPrefix);
- cpr.srcCluster.set(srcCluster);
- cpr.queryname.set(queryName);
- cpr.copy(cw, updateFlags);
- if (req.getIncludeFileErrors())
- cpr.gatherFileErrors(resp.getFileErrors());
- }
- WorkunitUpdate wu(&cw->lock());
- if (req.getUpdateWorkUnitName() && notEmpty(req.getJobName()))
- wu->setJobName(req.getJobName());
- StringBuffer queryId;
- WUQueryActivationOptions activate = (WUQueryActivationOptions)req.getActivate();
- addQueryToQuerySet(wu, target.str(), queryName.str(), activate, queryId, context.queryUserId());
- if (req.getMemoryLimit() || !req.getTimeLimit_isNull() || !req.getWarnTimeLimit_isNull() || req.getPriority() || req.getComment())
- {
- Owned<IPropertyTree> queryTree = getQueryById(target.str(), queryId, false);
- updateMemoryLimitSetting(queryTree, req.getMemoryLimit());
- updateQuerySetting(req.getTimeLimit_isNull(), queryTree, "@timeLimit", req.getTimeLimit());
- updateQuerySetting(req.getWarnTimeLimit_isNull(), queryTree, "@warnTimeLimit", req.getWarnTimeLimit());
- updateQueryPriority(queryTree, req.getPriority());
- if (req.getComment())
- queryTree->setProp("@comment", req.getComment());
- }
- wu->commit();
- wu.clear();
- if (queryId.length())
- resp.setQueryId(queryId.str());
- resp.setQueryName(queryName.str());
- resp.setQuerySet(target.str());
- Owned <IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(target.str());
- bool reloadFailed = false;
- if (0!=req.getWait() && !req.getNoReload())
- reloadFailed = !reloadCluster(clusterInfo, (unsigned)req.getWait());
-
- resp.setReloadFailed(reloadFailed);
- double version = context.getClientVersion();
- if (version > 1.38)
- {
- StringBuffer errorMessage;
- if (!reloadFailed && !req.getNoReload() && isQuerySuspended(queryName.str(), clusterInfo, (unsigned)req.getWait(), errorMessage))
- {
- resp.setSuspended(true);
- resp.setErrorMessage(errorMessage);
- }
- }
- return true;
- }
- bool CWsWorkunitsEx::onWUQuerysets(IEspContext &context, IEspWUQuerysetsRequest & req, IEspWUQuerysetsResponse & resp)
- {
- IArrayOf<IEspQuerySet> querySets;
- Owned<IStringIterator> targets = getTargetClusters(NULL, NULL);
- SCMStringBuffer target;
- ForEach(*targets)
- {
- Owned<IEspQuerySet> qs = createQuerySet();
- qs->setQuerySetName(targets->str(target).str());
- querySets.append(*qs.getClear());
- }
- resp.setQuerysets(querySets);
- return true;
- }
- void addClusterQueryStates(IPropertyTree* queriesOnCluster, const char *target, const char *id, IArrayOf<IEspClusterQueryState>& clusterStates, double version)
- {
- if (queriesOnCluster)
- queriesOnCluster = queriesOnCluster->queryPropTree("Endpoint[1]/Queries[1]");
- if (!queriesOnCluster)
- return;
- int reporting = queriesOnCluster->getPropInt("@reporting");
- Owned<IEspClusterQueryState> clusterState = createClusterQueryState();
- clusterState->setCluster(target);
- VStringBuffer xpath("Query[@id='%s']", id);
- IPropertyTree *query = queriesOnCluster->queryPropTree(xpath.str());
- if (!query)
- clusterState->setState("Not Found");
- else
- {
- int suspended = query->getPropInt("@suspended");
- const char* error = query->queryProp("@error");
- if (0==suspended)
- clusterState->setState("Available");
- else
- {
- clusterState->setState("Suspended");
- if (suspended<reporting)
- clusterState->setMixedNodeStates(true);
- }
- if (error && *error)
- clusterState->setErrors(error);
- }
- clusterStates.append(*clusterState.getClear());
- }
- void gatherQuerySetQueryDetails(IEspContext &context, IPropertyTree *query, IEspQuerySetQuery *queryInfo, const char *cluster, IPropertyTree *queriesOnCluster)
- {
- queryInfo->setId(query->queryProp("@id"));
- queryInfo->setName(query->queryProp("@name"));
- queryInfo->setDll(query->queryProp("@dll"));
- queryInfo->setWuid(query->queryProp("@wuid"));
- queryInfo->setSuspended(query->getPropBool("@suspended", false));
- if (query->hasProp("@memoryLimit"))
- {
- StringBuffer s;
- memoryLimitStringFromUInt64(s, query->getPropInt64("@memoryLimit"));
- queryInfo->setMemoryLimit(s);
- }
- if (query->hasProp("@timeLimit"))
- queryInfo->setTimeLimit(query->getPropInt("@timeLimit"));
- if (query->hasProp("@warnTimeLimit"))
- queryInfo->setWarnTimeLimit(query->getPropInt("@warnTimeLimit"));
- if (query->hasProp("@priority"))
- queryInfo->setPriority(getQueryPriorityName(query->getPropInt("@priority")));
- if (query->hasProp("@comment"))
- queryInfo->setComment(query->queryProp("@comment"));
- if (query->hasProp("@snapshot"))
- queryInfo->setSnapshot(query->queryProp("@snapshot"));
- double version = context.getClientVersion();
- if (version >= 1.46)
- {
- queryInfo->setPublishedBy(query->queryProp("@publishedBy"));
- queryInfo->setIsLibrary(query->getPropBool("@isLibrary"));
- }
- if (queriesOnCluster)
- {
- IArrayOf<IEspClusterQueryState> clusters;
- addClusterQueryStates(queriesOnCluster, cluster, query->queryProp("@id"), clusters, version);
- queryInfo->setClusters(clusters);
- }
- }
- void gatherQuerySetAliasDetails(IPropertyTree *alias, IEspQuerySetAlias *aliasInfo)
- {
- aliasInfo->setName(alias->queryProp("@name"));
- aliasInfo->setId(alias->queryProp("@id"));
- }
- void retrieveAllQuerysetDetails(IEspContext &context, IPropertyTree *registry, IArrayOf<IEspQuerySetQuery> &queries, IArrayOf<IEspQuerySetAlias> &aliases, const char *cluster=NULL, IPropertyTree *queriesOnCluster=NULL, const char *type=NULL, const char *value=NULL)
- {
- Owned<IPropertyTreeIterator> regQueries = registry->getElements("Query");
- ForEach(*regQueries)
- {
- IPropertyTree &query = regQueries->query();
- Owned<IEspQuerySetQuery> q = createQuerySetQuery();
- gatherQuerySetQueryDetails(context, &query, q, cluster, queriesOnCluster);
- if (isEmpty(cluster) || isEmpty(type) || isEmpty(value) || !strieq(type, "Status"))
- queries.append(*q.getClear());
- else
- {
- IArrayOf<IConstClusterQueryState>& cs = q->getClusters();
- ForEachItemIn(i, cs)
- {
- IConstClusterQueryState& c = cs.item(i);
- if (strieq(c.getCluster(), cluster) && (strieq(value, "All") || strieq(c.getState(), value)))
- {
- queries.append(*q.getClear());
- break;
- }
- }
- }
- }
- Owned<IPropertyTreeIterator> regAliases = registry->getElements("Alias");
- ForEach(*regAliases)
- {
- IPropertyTree &alias = regAliases->query();
- Owned<IEspQuerySetAlias> a = createQuerySetAlias();
- gatherQuerySetAliasDetails(&alias, a);
- aliases.append(*a.getClear());
- }
- }
- void retrieveQuerysetDetailsFromAlias(IEspContext &context, IPropertyTree *registry, const char *name, IArrayOf<IEspQuerySetQuery> &queries, IArrayOf<IEspQuerySetAlias> &aliases, const char *cluster, IPropertyTree *queriesOnCluster)
- {
- StringBuffer xpath;
- xpath.append("Alias[@name='").append(name).append("']");
- Owned<IPropertyTreeIterator> regAliases = registry->getElements(xpath.str());
- if (!regAliases->first())
- {
- DBGLOG("Alias %s not found", name);
- return;
- }
- ForEach(*regAliases)
- {
- IPropertyTree& alias = regAliases->query();
- Owned<IEspQuerySetAlias> a = createQuerySetAlias();
- gatherQuerySetAliasDetails(&alias, a);
- xpath.clear().append("Query[@id='").append(a->getId()).append("']");
- aliases.append(*a.getClear());
- IPropertyTree *query = registry->queryPropTree(xpath);
- if (!query)
- {
- DBGLOG("No matching Query %s found for Alias %s", a->getId(), name);
- return;
- }
- Owned<IEspQuerySetQuery> q = createQuerySetQuery();
- gatherQuerySetQueryDetails(context, query, q, cluster, queriesOnCluster);
- queries.append(*q.getClear());
- }
- }
- void retrieveQuerysetDetailsFromQuery(IEspContext &context, IPropertyTree *registry, const char *value, const char *type, IArrayOf<IEspQuerySetQuery> &queries, IArrayOf<IEspQuerySetAlias> &aliases, const char *cluster=NULL, IPropertyTree *queriesOnCluster=NULL)
- {
- if (!strieq(type, "Id") && !strieq(type, "Name"))
- throw MakeStringException(ECLWATCH_INVALID_INPUT, "Unrecognized queryset filter type %s", type);
- StringBuffer attributeName(type);
- StringBuffer xpath;
- xpath.clear().append("Query[@").append(attributeName.toLowerCase()).append("='").append(value).append("']");
- Owned<IPropertyTreeIterator> regQueries = registry->getElements(xpath.str());
- if (!regQueries->first())
- {
- DBGLOG("No matching Query %s found for %s", value, type);
- return;
- }
- ForEach(*regQueries)
- {
- IPropertyTree& query = regQueries->query();
- Owned<IEspQuerySetQuery> q = createQuerySetQuery();
- gatherQuerySetQueryDetails(context, &query, q, cluster, queriesOnCluster);
- xpath.clear().append("Alias[@id='").append(q->getId()).append("']");
- queries.append(*q.getClear());
- Owned<IPropertyTreeIterator> regAliases = registry->getElements(xpath.str());
- ForEach(*regAliases)
- {
- IPropertyTree &alias = regAliases->query();
- Owned<IEspQuerySetAlias> a = createQuerySetAlias();
- gatherQuerySetAliasDetails(&alias, a);
- aliases.append(*a.getClear());
- }
- }
- }
- void retrieveQuerysetDetails(IEspContext &context, IPropertyTree *registry, const char *type, const char *value, IArrayOf<IEspQuerySetQuery> &queries, IArrayOf<IEspQuerySetAlias> &aliases, const char *cluster=NULL, IPropertyTree *queriesOnCluster=NULL)
- {
- if (strieq(type, "All"))
- return retrieveAllQuerysetDetails(context, registry, queries, aliases, cluster, queriesOnCluster);
- if (!value || !*value)
- return;
- if (strieq(type, "Alias"))
- return retrieveQuerysetDetailsFromAlias(context, registry, value, queries, aliases, cluster, queriesOnCluster);
- if (strieq(type, "Status") && !isEmpty(cluster))
- return retrieveAllQuerysetDetails(context, registry, queries, aliases, cluster, queriesOnCluster, type, value);
- return retrieveQuerysetDetailsFromQuery(context, registry, value, type, queries, aliases, cluster, queriesOnCluster);
- }
- void retrieveQuerysetDetails(IEspContext &context, IArrayOf<IEspWUQuerySetDetail> &details, IPropertyTree *registry, const char *type, const char *value, const char *cluster=NULL, IPropertyTree *queriesOnCluster=NULL)
- {
- if (!registry)
- return;
- IArrayOf<IEspQuerySetQuery> queries;
- IArrayOf<IEspQuerySetAlias> aliases;
- retrieveQuerysetDetails(context, registry, type, value, queries, aliases, cluster, queriesOnCluster);
- Owned<IEspWUQuerySetDetail> detail = createWUQuerySetDetail();
- detail->setQuerySetName(registry->queryProp("@id"));
- detail->setQueries(queries);
- detail->setAliases(aliases);
- details.append(*detail.getClear());
- }
- void retrieveQuerysetDetails(IEspContext &context, IArrayOf<IEspWUQuerySetDetail> &details, const char *queryset, const char *type, const char *value, const char *cluster=NULL, IPropertyTree *queriesOnCluster=NULL)
- {
- if (!queryset || !*queryset)
- return;
- Owned<IPropertyTree> registry = getQueryRegistry(queryset, true);
- if (!registry)
- return;
- retrieveQuerysetDetails(context, details, registry, type, value, cluster, queriesOnCluster);
- }
- IPropertyTree* getQueriesOnCluster(const char *target, const char *queryset, bool checkAllNodes)
- {
- if (isEmpty(target))
- target = queryset;
- Owned<IConstWUClusterInfo> info = getTargetClusterInfo(target);
- if (!info)
- throw MakeStringException(ECLWATCH_CANNOT_RESOLVE_CLUSTER_NAME, "Cluster %s not found", target);
- if (queryset && *queryset && !strieq(target, queryset))
- throw MakeStringException(ECLWATCH_QUERYSET_NOT_ON_CLUSTER, "Target %s and QuerySet %s should match", target, queryset);
- if (info->getPlatform()!=RoxieCluster)
- return NULL;
- const SocketEndpointArray &eps = info->getRoxieServers();
- if (!eps.length())
- return NULL;
- try
- {
- Owned<ISocket> sock = ISocket::connect_timeout(eps.item(0), ROXIECONNECTIONTIMEOUT);
- if (checkAllNodes)
- return sendRoxieControlAllNodes(sock, "<control:queries/>", false, ROXIECONTROLQUERIESTIMEOUT);
- else
- return sendRoxieControlQuery(sock, "<control:queries/>", ROXIECONTROLQUERIESTIMEOUT);
- }
- catch(IException* e)
- {
- StringBuffer err;
- DBGLOG("Get exception in control:queries: %s", e->errorMessage(err.append(e->errorCode()).append(' ')).str());
- e->Release();
- return NULL;
- }
- }
- void retrieveQuerysetDetailsByCluster(IEspContext &context, IArrayOf<IEspWUQuerySetDetail> &details, const char *target, const char *queryset, const char *type, const char *value, bool checkAllNodes)
- {
- Owned<IPropertyTree> queriesOnCluster = getQueriesOnCluster(target, queryset, checkAllNodes);
- retrieveQuerysetDetails(context, details, target, type, value, target, queriesOnCluster);
- }
- void retrieveAllQuerysetDetails(IEspContext &context, IArrayOf<IEspWUQuerySetDetail> &details, const char *type, const char *value)
- {
- Owned<IPropertyTree> root = getQueryRegistryRoot();
- if (!root)
- throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "QuerySet Registry not found");
- Owned<IPropertyTreeIterator> querysets = root->getElements("QuerySet");
- ForEach(*querysets)
- retrieveQuerysetDetails(context, details, &querysets->query(), type, value);
- }
- bool CWsWorkunitsEx::onWUQuerysetDetails(IEspContext &context, IEspWUQuerySetDetailsRequest & req, IEspWUQuerySetDetailsResponse & resp)
- {
- resp.setQuerySetName(req.getQuerySetName());
- double version = context.getClientVersion();
- if (version > 1.36)
- {
- Owned<IPropertyTree> queryRegistry = getQueryRegistry(req.getQuerySetName(), false);
- resp.setClusterName(req.getClusterName());
- resp.setFilter(req.getFilter());
- resp.setFilterType(req.getFilterType());
- }
- Owned<IPropertyTree> registry = getQueryRegistry(req.getQuerySetName(), true);
- if (!registry)
- return false;
- PROGLOG("WUQuerysetDetails for queryset %s", req.getQuerySetName());
- IArrayOf<IEspQuerySetQuery> respQueries;
- IArrayOf<IEspQuerySetAlias> respAliases;
- if (isEmpty(req.getClusterName()) || isEmpty(req.getFilterTypeAsString()) || !strieq(req.getFilterTypeAsString(), "Status") || isEmpty(req.getFilter()))
- {
- const char* cluster = req.getClusterName();
- if (isEmpty(cluster))
- cluster = req.getQuerySetName();
- Owned<IPropertyTree> queriesOnCluster = getQueriesOnCluster(cluster, req.getQuerySetName(), req.getCheckAllNodes());
- retrieveQuerysetDetails(context, registry, req.getFilterTypeAsString(), req.getFilter(), respQueries, respAliases, cluster, queriesOnCluster);
- resp.setQuerysetQueries(respQueries);
- resp.setQuerysetAliases(respAliases);
- }
- else
- {
- IArrayOf<IEspWUQuerySetDetail> respDetails;
- retrieveQuerysetDetailsByCluster(context, respDetails, req.getClusterName(), req.getQuerySetName(), req.getFilterTypeAsString(), req.getFilter(), false);
- if (respDetails.ordinality())
- {
- IEspWUQuerySetDetail& detail = respDetails.item(0);
- resp.setQuerysetQueries(detail.getQueries());
- resp.setQuerysetAliases(detail.getAliases());
- }
- }
- return true;
- }
- bool CWsWorkunitsEx::onWUMultiQuerysetDetails(IEspContext &context, IEspWUMultiQuerySetDetailsRequest & req, IEspWUMultiQuerySetDetailsResponse & resp)
- {
- IArrayOf<IEspWUQuerySetDetail> respDetails;
- if (notEmpty(req.getClusterName()))
- {
- PROGLOG("WUMultiQuerysetDetails for cluster %s", req.getClusterName());
- retrieveQuerysetDetailsByCluster(context, respDetails, req.getClusterName(), req.getQuerySetName(), req.getFilterTypeAsString(), req.getFilter(), req.getCheckAllNodes());
- }
- else if (notEmpty(req.getQuerySetName()))
- {
- PROGLOG("WUMultiQuerysetDetails for queryset %s", req.getQuerySetName());
- retrieveQuerysetDetails(context, respDetails, req.getQuerySetName(), req.getFilterTypeAsString(), req.getFilter());
- }
- else
- {
- VStringBuffer logMsg("WUMultiQuerysetDetails: FilterType %s", req.getFilterTypeAsString());
- if (notEmpty(req.getFilter()))
- logMsg.append(", Filter ").append(req.getFilter());
- PROGLOG("%s", logMsg.str());
- retrieveAllQuerysetDetails(context, respDetails, req.getFilterTypeAsString(), req.getFilter());
- }
- resp.setQuerysets(respDetails);
- return true;
- }
- bool addWUQSQueryFilter(WUQuerySortField *filters, unsigned short &count, MemoryBuffer &buff, const char* value, WUQuerySortField name)
- {
- if (isEmpty(value))
- return false;
- filters[count++] = name;
- buff.append(value);
- return true;
- }
- bool addWUQSQueryFilterInt(WUQuerySortField *filters, unsigned short &count, MemoryBuffer &buff, int value, WUQuerySortField name)
- {
- VStringBuffer vBuf("%d", value);
- filters[count++] = name;
- buff.append(vBuf.str());
- return true;
- }
- bool addWUQSQueryFilterInt64(WUQuerySortField *filters, unsigned short &count, MemoryBuffer &buff, __int64 value, WUQuerySortField name)
- {
- VStringBuffer vBuf("%" I64F "d", value);
- filters[count++] = name;
- buff.append(vBuf.str());
- return true;
- }
- unsigned CWsWorkunitsEx::getGraphIdsByQueryId(const char *target, const char *queryId, StringArray& graphIds)
- {
- if (!target || !*target)
- throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Target name required");
- if (!queryId || !*queryId)
- throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Query Id required");
- Owned<IConstWUClusterInfo> info = getTargetClusterInfo(target);
- if (!info || (info->getPlatform()!=RoxieCluster)) //Only roxie query has query graph.
- return 0;
- const SocketEndpointArray &eps = info->getRoxieServers();
- if (eps.empty())
- return 0;
- VStringBuffer xpath("<control:querystats><Query id='%s'/></control:querystats>", queryId);
- Owned<ISocket> sock = ISocket::connect_timeout(eps.item(0), ROXIECONNECTIONTIMEOUT);
- Owned<IPropertyTree> querystats = sendRoxieControlQuery(sock, xpath.str(), ROXIECONTROLQUERYTIMEOUT);
- if (!querystats)
- return 0;
- Owned<IPropertyTreeIterator> graphs = querystats->getElements("Endpoint/Query/Graph");
- ForEach(*graphs)
- {
- IPropertyTree &graph = graphs->query();
- const char* graphId = graph.queryProp("@id");
- if (graphId && *graphId)
- graphIds.appendUniq(graphId);
- }
- return graphIds.length();
- }
- //This method is thread safe because a query belongs to a single queryset. The method may be called by different threads.
- //Since one thread is for one queryset and a query only belongs to a single queryset, it is impossible for different threads
- //to update the same query object.
- void CWsWorkunitsEx::checkAndSetClusterQueryState(IEspContext &context, const char* cluster, const char* querySetId, IArrayOf<IEspQuerySetQuery>& queries, bool checkAllNodes)
- {
- try
- {
- double version = context.getClientVersion();
- if (isEmpty(cluster))
- cluster = querySetId;
- Owned<IPropertyTree> queriesOnCluster = getQueriesOnCluster(cluster, querySetId, checkAllNodes);
- if (!queriesOnCluster)
- {
- DBGLOG("getQueriesOnCluster() returns NULL for cluster<%s> and querySetId<%s>", cluster, querySetId);
- return;
- }
- ForEachItemIn(i, queries)
- {
- IEspQuerySetQuery& query = queries.item(i);
- const char* queryId = query.getId();
- const char* querySetId0 = query.getQuerySetId();
- if (!queryId || !querySetId0 || !strieq(querySetId0, querySetId))
- continue;
- IArrayOf<IEspClusterQueryState> clusters;
- addClusterQueryStates(queriesOnCluster, cluster, queryId, clusters, version);
- query.setClusters(clusters);
- }
- }
- catch(IException *e)
- {
- EXCLOG(e, "CWsWorkunitsEx::checkAndSetClusterQueryState: Failed to read Query State On Cluster");
- e->Release();
- }
- }
- void CWsWorkunitsEx::checkAndSetClusterQueryState(IEspContext &context, const char* cluster, StringArray& querySetIds, IArrayOf<IEspQuerySetQuery>& queries, bool checkAllNodes)
- {
- UnsignedArray threadHandles;
- ForEachItemIn(i, querySetIds)
- {
- const char* querySetId = querySetIds.item(i);
- if(!querySetId || !*querySetId)
- continue;
- Owned<CClusterQueryStateParam> threadReq = new CClusterQueryStateParam(this, context, cluster, querySetId, queries, checkAllNodes);
- PooledThreadHandle handle = clusterQueryStatePool->start( threadReq.getClear() );
- threadHandles.append(handle);
- }
- //block for worker threads to finish, if necessary and then collect results
- //Not use joinAll() because multiple threads may call this method. Each call uses the pool to create
- //its own threads of checking query state. Each call should only join the ones created by that call.
- ForEachItemIn(ii, threadHandles)
- clusterQueryStatePool->join(threadHandles.item(ii));
- }
- bool CWsWorkunitsEx::onWUListQueries(IEspContext &context, IEspWUListQueriesRequest & req, IEspWUListQueriesResponse & resp)
- {
- bool descending = req.getDescending();
- const char *sortBy = req.getSortby();
- WUQuerySortField sortOrder[2] = {WUQSFId, WUQSFterm};
- if(notEmpty(sortBy))
- {
- if (strieq(sortBy, "Name"))
- sortOrder[0] = WUQSFname;
- else if (strieq(sortBy, "WUID"))
- sortOrder[0] = WUQSFwuid;
- else if (strieq(sortBy, "DLL"))
- sortOrder[0] = WUQSFdll;
- else if (strieq(sortBy, "Activated"))
- sortOrder[0] = WUQSFActivited;
- else if (strieq(sortBy, "MemoryLimit"))
- sortOrder[0] = (WUQuerySortField) (WUQSFmemoryLimit | WUQSFnumeric);
- else if (strieq(sortBy, "TimeLimit"))
- sortOrder[0] = (WUQuerySortField) (WUQSFtimeLimit | WUQSFnumeric);
- else if (strieq(sortBy, "WarnTimeLimit"))
- sortOrder[0] = (WUQuerySortField) (WUQSFwarnTimeLimit | WUQSFnumeric);
- else if (strieq(sortBy, "Priority"))
- sortOrder[0] = (WUQuerySortField) (WUQSFpriority | WUQSFnumeric);
- else if (strieq(sortBy, "PublishedBy"))
- sortOrder[0] = WUQSFPublishedBy;
- else if (strieq(sortBy, "QuerySetId"))
- sortOrder[0] = WUQSFQuerySet;
- else
- sortOrder[0] = WUQSFId;
- sortOrder[0] = (WUQuerySortField) (sortOrder[0] | WUQSFnocase);
- if (descending)
- sortOrder[0] = (WUQuerySortField) (sortOrder[0] | WUQSFreverse);
- }
- WUQuerySortField filters[16];
- unsigned short filterCount = 0;
- MemoryBuffer filterBuf;
- const char* clusterReq = req.getClusterName();
- addWUQSQueryFilter(filters, filterCount, filterBuf, req.getQuerySetName(), WUQSFQuerySet);
- addWUQSQueryFilter(filters, filterCount, filterBuf, req.getQueryID(), (WUQuerySortField) (WUQSFId | WUQSFwild | WUSFnocase));
- addWUQSQueryFilter(filters, filterCount, filterBuf, req.getQueryName(), (WUQuerySortField) (WUQSFname | WUQSFwild | WUSFnocase));
- addWUQSQueryFilter(filters, filterCount, filterBuf, req.getWUID(), (WUQuerySortField) (WUQSFwuid | WUSFnocase));
- addWUQSQueryFilter(filters, filterCount, filterBuf, req.getLibraryName(), (WUQuerySortField) (WUQSFLibrary | WUQSFnocase));
- addWUQSQueryFilter(filters, filterCount, filterBuf, req.getPublishedBy(), (WUQuerySortField) (WUQSFPublishedBy | WUQSFwild | WUSFnocase));
- if (!req.getMemoryLimitLow_isNull())
- addWUQSQueryFilterInt64(filters, filterCount, filterBuf, req.getMemoryLimitLow(), (WUQuerySortField) (WUQSFmemoryLimit | WUQSFnumeric));
- if (!req.getMemoryLimitHigh_isNull())
- addWUQSQueryFilterInt64(filters, filterCount, filterBuf, req.getMemoryLimitHigh(), (WUQuerySortField) (WUQSFmemoryLimitHi | WUQSFnumeric));
- if (!req.getTimeLimitLow_isNull())
- addWUQSQueryFilterInt(filters, filterCount, filterBuf, req.getTimeLimitLow(), (WUQuerySortField) (WUQSFtimeLimit | WUQSFnumeric));
- if (!req.getTimeLimitHigh_isNull())
- addWUQSQueryFilterInt(filters, filterCount, filterBuf, req.getTimeLimitHigh(), (WUQuerySortField) (WUQSFtimeLimitHi | WUQSFnumeric));
- if (!req.getWarnTimeLimitLow_isNull())
- addWUQSQueryFilterInt(filters, filterCount, filterBuf, req.getWarnTimeLimitLow(), (WUQuerySortField) (WUQSFwarnTimeLimit | WUQSFnumeric));
- if (!req.getWarnTimeLimitHigh_isNull())
- addWUQSQueryFilterInt(filters, filterCount, filterBuf, req.getWarnTimeLimitHigh(), (WUQuerySortField) (WUQSFwarnTimeLimitHi | WUQSFnumeric));
- if (!req.getPriorityLow_isNull())
- addWUQSQueryFilterInt(filters, filterCount, filterBuf, req.getPriorityLow(), (WUQuerySortField) (WUQSFpriority | WUQSFnumeric));
- if (!req.getPriorityHigh_isNull())
- addWUQSQueryFilterInt(filters, filterCount, filterBuf, req.getPriorityHigh(), (WUQuerySortField) (WUQSFpriorityHi | WUQSFnumeric));
- if (!req.getActivated_isNull())
- addWUQSQueryFilterInt(filters, filterCount, filterBuf, req.getActivated(), (WUQuerySortField) (WUQSFActivited | WUQSFnumeric));
- if (!req.getSuspendedByUser_isNull())
- addWUQSQueryFilterInt(filters, filterCount, filterBuf, req.getSuspendedByUser(), (WUQuerySortField) (WUQSFSuspendedByUser | WUQSFnumeric));
- filters[filterCount] = WUQSFterm;
- unsigned numberOfQueries = 0;
- unsigned pageSize = req.getPageSize();
- unsigned pageStartFrom = req.getPageStartFrom();
- if(pageSize < 1)
- pageSize = 100;
- __int64 cacheHint = 0;
- if (!req.getCacheHint_isNull())
- cacheHint = req.getCacheHint();
- Owned<MapStringTo<bool> > queriesUsingFileMap;
- const char *lfn = req.getFileName();
- if (lfn && *lfn)
- {
- queriesUsingFileMap.setown(new MapStringTo<bool>());
- StringAttr dummy;
- Owned<IPropertyTreeIterator> queriesUsingFile = filesInUse.findQueriesUsingFile(clusterReq, lfn, dummy);
- ForEach (*queriesUsingFile)
- {
- IPropertyTree &queryUsingFile = queriesUsingFile->query();
- const char *queryTarget = queryUsingFile.queryProp("@target");
- const char *queryId = queryUsingFile.queryProp("@id");
- if (queryTarget && *queryTarget && queryId && *queryId)
- {
- VStringBuffer targetQuery("%s/%s", queryTarget, queryId);
- queriesUsingFileMap->setValue(targetQuery, true);
- }
- }
- }
- PROGLOG("WUListQueries: getQuerySetQueriesSorted");
- Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
- Owned<IConstQuerySetQueryIterator> it = factory->getQuerySetQueriesSorted(sortOrder, filters, filterBuf.bufferBase(), pageStartFrom, pageSize, &cacheHint, &numberOfQueries, queriesUsingFileMap);
- resp.setCacheHint(cacheHint);
- PROGLOG("WUListQueries: getQuerySetQueriesSorted done");
- StringArray querySetIds;
- IArrayOf<IEspQuerySetQuery> queries;
- double version = context.getClientVersion();
- ForEach(*it)
- {
- IPropertyTree &query=it->query();
- const char *queryId = query.queryProp("@id");
- const char *queryTarget = query.queryProp("@querySetId");
- Owned<IEspQuerySetQuery> q = createQuerySetQuery();
- q->setId(queryId);
- q->setQuerySetId(queryTarget);
- q->setName(query.queryProp("@name"));
- q->setDll(query.queryProp("@dll"));
- q->setWuid(query.queryProp("@wuid"));
- q->setActivated(query.getPropBool("@activated", false));
- q->setSuspended(query.getPropBool("@suspended", false));
- if (query.hasProp("@memoryLimit"))
- {
- StringBuffer s;
- memoryLimitStringFromUInt64(s, query.getPropInt64("@memoryLimit"));
- q->setMemoryLimit(s);
- }
- if (query.hasProp("@timeLimit"))
- q->setTimeLimit(query.getPropInt("@timeLimit"));
- if (query.hasProp("@warnTimeLimit"))
- q->setWarnTimeLimit(query.getPropInt("@warnTimeLimit"));
- if (query.hasProp("@priority"))
- q->setPriority(getQueryPriorityName(query.getPropInt("@priority")));
- if (query.hasProp("@comment"))
- q->setComment(query.queryProp("@comment"));
- if (version >= 1.46)
- {
- q->setPublishedBy(query.queryProp("@publishedBy"));
- q->setIsLibrary(query.getPropBool("@isLibrary"));
- }
- if (!querySetIds.contains(queryTarget))
- querySetIds.append(queryTarget);
- queries.append(*q.getClear());
- }
- checkAndSetClusterQueryState(context, clusterReq, querySetIds, queries, req.getCheckAllNodes());
- resp.setQuerysetQueries(queries);
- resp.setNumberOfQueries(numberOfQueries);
- return true;
- }
- bool CWsWorkunitsEx::onWUListQueriesUsingFile(IEspContext &context, IEspWUListQueriesUsingFileRequest &req, IEspWUListQueriesUsingFileResponse &resp)
- {
- const char *target = req.getTarget();
- const char *process = req.getProcess();
- StringBuffer lfn(req.getFileName());
- resp.setFileName(lfn.toLowerCase());
- resp.setProcess(process);
- if (lfn.isEmpty())
- throw MakeStringException(ECLWATCH_MISSING_PARAMS, "FileName required");
- VStringBuffer logMsg("WUListQueriesUsingFile: %s", lfn.str());
- StringArray targets;
- if (target && *target)
- {
- targets.append(target);
- logMsg.append(", target ").append(target);
- }
- else // if (process && *process)
- {
- SCMStringBuffer targetStr;
- Owned<IStringIterator> targetClusters = getTargetClusters("RoxieCluster", process);
- ForEach(*targetClusters)
- targets.append(targetClusters->str(targetStr).str());
- logMsg.append(", process ").append(process);
- }
- PROGLOG("%s", logMsg.str());
- IArrayOf<IEspTargetQueriesUsingFile> respTargets;
- ForEachItemIn(i, targets)
- {
- target = targets.item(i);
- Owned<IEspTargetQueriesUsingFile> respTarget = createTargetQueriesUsingFile();
- respTarget->setTarget(target);
- StringAttr pmid;
- Owned<IPropertyTreeIterator> queries = filesInUse.findQueriesUsingFile(target, lfn, pmid);
- if (!pmid.isEmpty())
- respTarget->setPackageMap(pmid);
- if (queries)
- {
- IArrayOf<IEspQueryUsingFile> respQueries;
- ForEach(*queries)
- {
- IPropertyTree &query = queries->query();
- Owned<IEspQueryUsingFile> q = createQueryUsingFile();
- q->setId(query.queryProp("@id"));
- VStringBuffer xpath("File[@lfn='%s']/@pkgid", lfn.str());
- if (query.hasProp(xpath))
- q->setPackage(query.queryProp(xpath));
- respQueries.append(*q.getClear());
- }
- respTarget->setQueries(respQueries);
- }
- respTargets.append(*respTarget.getClear());
- }
- resp.setTargets(respTargets);
- return true;
- }
- bool CWsWorkunitsEx::onWUQueryFiles(IEspContext &context, IEspWUQueryFilesRequest &req, IEspWUQueryFilesResponse &resp)
- {
- const char *target = req.getTarget();
- const char *query = req.getQueryId();
- if (!target || !*target)
- throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Target not specified");
- if (!isValidCluster(target))
- throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Invalid target name: %s", target);
- if (!query || !*query)
- throw MakeStringException(ECLWATCH_QUERYID_NOT_FOUND, "Query not specified");
- Owned<IPropertyTree> registeredQuery = resolveQueryAlias(target, query, true);
- if (!registeredQuery)
- throw MakeStringException(ECLWATCH_QUERYID_NOT_FOUND, "Query not found");
- PROGLOG("WUQueryFiles: target %s, query %s", target, query);
- StringAttr queryid(registeredQuery->queryProp("@id"));
- registeredQuery.clear();
- Owned<IPropertyTree> tree = filesInUse.getTree();
- VStringBuffer xpath("%s/Query[@id='%s']", target, queryid.get());
- IPropertyTree *queryTree = tree->queryPropTree(xpath);
- if (!queryTree)
- throw MakeStringException(ECLWATCH_QUERYID_NOT_FOUND, "Query not found in file cache (%s)", xpath.str());
- IArrayOf<IEspFileUsedByQuery> referencedFiles;
- Owned<IPropertyTreeIterator> files = queryTree->getElements("File");
- ForEach(*files)
- {
- IPropertyTree &file = files->query();
- if (file.getPropBool("@super", 0))
- continue;
- Owned<IEspFileUsedByQuery> respFile = createFileUsedByQuery();
- respFile->setFileName(file.queryProp("@lfn"));
- respFile->setFileSize(file.getPropInt64("@size"));
- respFile->setNumberOfParts(file.getPropInt("@numparts"));
- referencedFiles.append(*respFile.getClear());
- }
- resp.setFiles(referencedFiles);
- return true;
- }
- void copyWorkunitForRecompile(IEspContext &context, IWorkUnitFactory *factory, const char *srcWuid, StringAttr &wuid, StringAttr &jobname)
- {
- Owned<IConstWorkUnit> src(factory->openWorkUnit(srcWuid));
- if (!src)
- throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot open workunit %s.", srcWuid);
- WsWuInfo info(context, src);
- StringBuffer archiveText;
- info.getWorkunitArchiveQuery(archiveText); //archive required, fail otherwise
- if (!isArchiveQuery(archiveText))
- throw MakeStringException(ECLWATCH_RESOURCE_NOT_FOUND,"Cannot retrieve workunit ECL archive %s.", srcWuid);
- SCMStringBuffer mainDefinition;
- Owned <IConstWUQuery> query = src->getQuery();
- if (query)
- query->getQueryMainDefinition(mainDefinition);
- NewWsWorkunit wu(factory, context);
- wuid.set(wu->queryWuid());
- wu->setAction(WUActionCompile);
- SCMStringBuffer token;
- wu->setSecurityToken(createToken(wuid.str(), context.queryUserId(), context.queryPassword(), token).str());
- jobname.set(src->queryJobName());
- if (jobname.length())
- wu->setJobName(jobname);
- wu.setQueryText(archiveText.str());
- if (mainDefinition.length())
- wu.setQueryMain(mainDefinition.str());
- wu->setResultLimit(src->getResultLimit());
- IStringIterator &names = src->getDebugValues();
- ForEach(names)
- {
- SCMStringBuffer name, value;
- names.str(name);
- if (0==strncmp(name.str(), "eclcc", 5))
- wu->setDebugValue(name.str(), src->getDebugValue(name.str(), value).str(), true);
- }
- }
- bool CWsWorkunitsEx::onWURecreateQuery(IEspContext &context, IEspWURecreateQueryRequest &req, IEspWURecreateQueryResponse &resp)
- {
- try
- {
- const char* srcTarget = req.getTarget();
- const char* queryIdOrAlias = req.getQueryId();
- if (!srcTarget || !*srcTarget)
- throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Target not specified");
- if (!queryIdOrAlias || !*queryIdOrAlias)
- throw MakeStringException(ECLWATCH_QUERYID_NOT_FOUND, "QueryId not specified");
- const char *target = req.getDestTarget();
- if (isEmptyString(target))
- target = srcTarget;
- Owned<IPropertyTree> queryRegistry = getQueryRegistry(srcTarget, false);
- Owned<IPropertyTree> srcQueryTree = resolveQueryAlias(queryRegistry, queryIdOrAlias);
- if (!srcQueryTree)
- {
- DBGLOG("WURecreateQuery - No matching Query");
- throw MakeStringException(ECLWATCH_QUERYID_NOT_FOUND,"No matching query for given id or alias %s.", queryIdOrAlias);
- }
- resp.setPriority(isEmptyString(req.getPriority()) ? srcQueryTree->queryProp("@priority") : req.getPriority());
- resp.setComment(isEmptyString(req.getComment()) ? srcQueryTree->queryProp("@comment") : req.getComment());
- resp.setMemoryLimit(isEmptyString(req.getMemoryLimit()) ? srcQueryTree->queryProp("@memoryLimit") : req.getMemoryLimit());
- resp.setTimeLimit(req.getTimeLimit_isNull() ? srcQueryTree->getPropInt("@timeLimit") : req.getTimeLimit());
- resp.setWarnTimeLimit(req.getWarnTimeLimit_isNull() ? srcQueryTree->getPropInt("@warnTimeLimit") : req.getWarnTimeLimit());
- StringAttr wuid;
- StringAttr jobname;
- const char* srcQueryId = srcQueryTree->queryProp("@id");
- const char* srcQueryName = srcQueryTree->queryProp("@name");
- const char *srcWuid = srcQueryTree->queryProp("@wuid");
- PROGLOG("WURecreateQuery: QuerySet %s, query %s, wuid %s", srcTarget, srcQueryId, srcWuid);
- ensureWsWorkunitAccess(context, srcWuid, SecAccess_Write);
- Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
- copyWorkunitForRecompile(context, factory, srcWuid, wuid, jobname);
- resp.setWuid(wuid);
- WsWuHelpers::submitWsWorkunit(context, wuid.str(), target, NULL, 0, true, false, false, NULL, NULL, &req.getDebugValues());
- waitForWorkUnitToCompile(wuid.str(), req.getWait());
- Owned<IConstWorkUnit> cw(factory->openWorkUnit(wuid.str()));
- if (!cw)
- throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot open recreated workunit %s.",wuid.str());
- if (jobname.length())
- {
- StringBuffer name;
- origValueChanged(jobname.str(), cw->queryJobName(), name, false);
- if (name.length()) //non generated user specified name, so override #Workunit('name')
- {
- WorkunitUpdate wx(&cw->lock());
- wx->setJobName(name.str());
- }
- }
- PROGLOG("WURecreateQuery generated: %s", wuid.str());
- AuditSystemAccess(context.queryUserId(), true, "Updated %s", wuid.str());
- queryRegistry.clear();
- srcQueryTree.clear();
- if (req.getRepublish())
- {
- if (!req.getDontCopyFiles())
- {
- StringBuffer daliIP;
- StringBuffer srcCluster;
- StringBuffer srcPrefix;
- splitDerivedDfsLocation(req.getRemoteDali(), srcCluster, daliIP, srcPrefix, req.getSourceProcess(),req.getSourceProcess(), NULL, NULL);
- if (srcCluster.length())
- {
- if (!isProcessCluster(daliIP, srcCluster))
- throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Process cluster %s not found on %s DALI", srcCluster.str(), daliIP.length() ? daliIP.str() : "local");
- }
- unsigned updateFlags = 0;
- if (req.getUpdateDfs())
- updateFlags |= (DALI_UPDATEF_SUPERFILES | DALI_UPDATEF_REPLACE_FILE | DALI_UPDATEF_CLONE_FROM);
- if (req.getUpdateCloneFrom())
- updateFlags |= DALI_UPDATEF_CLONE_FROM;
- if (req.getUpdateSuperFiles())
- updateFlags |= DALI_UPDATEF_SUPERFILES;
- if (req.getAppendCluster())
- updateFlags |= DALI_UPDATEF_APPEND_CLUSTER;
- QueryFileCopier cpr(target);
- cpr.init(context, req.getAllowForeignFiles());
- cpr.remoteIP.set(daliIP);
- cpr.remotePrefix.set(srcPrefix);
- cpr.srcCluster.set(srcCluster);
- cpr.queryname.set(srcQueryName);
- cpr.copy(cw, updateFlags);
- if (req.getIncludeFileErrors())
- cpr.gatherFileErrors(resp.getFileErrors());
- }
- StringBuffer queryId;
- WorkunitUpdate wu(&cw->lock());
- WUQueryActivationOptions activate = (WUQueryActivationOptions)req.getActivate();
- addQueryToQuerySet(wu, target, srcQueryName, activate, queryId, context.queryUserId());
- {
- Owned<IPropertyTree> queryTree = getQueryById(target, queryId, false);
- if (queryTree)
- {
- queryTree->setProp("@priority", resp.getPriority());
- updateMemoryLimitSetting(queryTree, resp.getMemoryLimit());
- updateQuerySetting(resp.getTimeLimit_isNull(), queryTree, "@timeLimit", resp.getTimeLimit());
- updateQuerySetting(resp.getWarnTimeLimit_isNull(), queryTree, "@warnTimeLimit", resp.getWarnTimeLimit());
- updateQueryPriority(queryTree, resp.getPriority());
- queryTree->setProp("@comment", resp.getComment());
- }
- }
- wu->commit();
- wu.clear();
- PROGLOG("WURecreateQuery published: %s as %s/%s", wuid.str(), target, queryId.str());
- resp.setQuerySet(target);
- resp.setQueryName(srcQueryName);
- resp.setQueryId(queryId.str());
- Owned <IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(target);
- bool reloadFailed = false;
- if (0!=req.getWait() && !req.getNoReload())
- reloadFailed = !reloadCluster(clusterInfo, (unsigned)req.getWait());
- resp.setReloadFailed(reloadFailed);
- StringBuffer errorMessage;
- if (!reloadFailed && !req.getNoReload() && isQuerySuspended(queryId.str(), clusterInfo, (unsigned)req.getWait(), errorMessage))
- {
- resp.setSuspended(true);
- resp.setErrorMessage(errorMessage);
- }
- }
- }
- catch(IException* e)
- {
- FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
- }
- return true;
- }
// Fills the response with the details of one published query.
//
// The query is looked up (by id or alias) in the registry of req.getQuerySet().
// Which sections are populated depends on the client version:
//   >= 1.46 : priority, library flag, snapshot, compile time, libraries used
//             (and, for clients < 1.64, graph ids from getGraphIdsByQueryId)
//   >= 1.42 : activated flag (an Alias element with the query's id exists)
//   >= 1.43 : per-cluster query state, when IncludeStateOnClusters is set
//   >= 1.50 : resource URL count
//   >= 1.64 : timers and graphs taken from WsWuInfo
// Logical files / super files are always gathered via getQueryFiles(), and
// ws_ecl addresses are added when req.getIncludeWsEclAddresses() is set.
//
// Throws ECLWATCH_QUERYSET_NOT_FOUND / ECLWATCH_QUERYID_NOT_FOUND for missing
// or unresolvable inputs. Always returns true when it does not throw.
- bool CWsWorkunitsEx::onWUQueryDetails(IEspContext &context, IEspWUQueryDetailsRequest & req, IEspWUQueryDetailsResponse & resp)
- {
- const char* querySet = req.getQuerySet();
- const char* queryIdOrAlias = req.getQueryId();
- bool includeStateOnClusters = req.getIncludeStateOnClusters();
- if (!querySet || !*querySet)
- throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "QuerySet not specified");
- if (!queryIdOrAlias || !*queryIdOrAlias)
- throw MakeStringException(ECLWATCH_QUERYID_NOT_FOUND, "QueryId not specified");
- Owned<IPropertyTree> queryRegistry = getQueryRegistry(querySet, false);
- Owned<IPropertyTree> query = resolveQueryAlias(queryRegistry, queryIdOrAlias);
- if (!query)
- {
- DBGLOG("No matching Query");
- throw MakeStringException(ECLWATCH_QUERYID_NOT_FOUND,"No matching query for given id or alias %s.", queryIdOrAlias);
- }
// Basic attributes copied straight from the registry entry.
- const char* queryId = query->queryProp("@id");
- resp.setQueryId(queryId);
- resp.setQuerySet(querySet);
- PROGLOG("WUQueryDetails: QuerySet %s, query %s", querySet, queryId);
- const char* queryName = query->queryProp("@name");
- const char* wuid = query->queryProp("@wuid");
- resp.setQueryName(queryName);
- resp.setWuid(wuid);
- resp.setDll(query->queryProp("@dll"));
- resp.setPublishedBy(query->queryProp("@publishedBy"));
- resp.setSuspended(query->getPropBool("@suspended", false));
- resp.setSuspendedBy(query->queryProp("@suspendedBy"));
- resp.setComment(query->queryProp("@comment"));
- double version = context.getClientVersion();
// Workunit-derived details (clients >= 1.46 only).
- if (version >= 1.46)
- {
- Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
- Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid);
// NOTE(review): this is an open failure but reports ECLWATCH_CANNOT_UPDATE_WORKUNIT;
// other open failures in this file use ECLWATCH_CANNOT_OPEN_WORKUNIT - confirm intent.
- if(!cw)
- throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT,"Cannot open workunit %s.",wuid);
- if (query->hasProp("@priority"))
- resp.setPriority(getQueryPriorityName(query->getPropInt("@priority")));
- resp.setIsLibrary(query->getPropBool("@isLibrary"));
- SCMStringBuffer s;
- resp.setWUSnapShot(cw->getSnapshot(s).str()); //Label
- stat_type whenCompiled;
- if (cw->getStatistic(whenCompiled, "", StWhenCompiled))
- {
// s is reused as a scratch buffer, so it is cleared before formatting.
- formatStatistic(s.s.clear(), whenCompiled, StWhenCompiled);
- resp.setCompileTime(s.str());
- }
- StringArray libUsed, graphIds;
- Owned<IConstWULibraryIterator> libs = &cw->getLibraries();
- ForEach(*libs)
- libUsed.append(libs->query().getName(s).str());
- if (libUsed.length())
- resp.setLibrariesUsed(libUsed);
// Clients before 1.64 get graph ids here; newer clients get full graph
// details in the version >= 1.64 section further down.
- if (version < 1.64)
- {
- unsigned numGraphIds = getGraphIdsByQueryId(querySet, queryId, graphIds);
- resp.setCountGraphs(numGraphIds);
- if (numGraphIds > 0)
- resp.setGraphIds(graphIds);
- }
- }
// Files referenced by the query; super files are only collected on request.
- StringArray logicalFiles;
- IArrayOf<IEspQuerySuperFile> superFiles;
- getQueryFiles(context, wuid, queryId, querySet, logicalFiles, req.getIncludeSuperFiles() ? &superFiles : NULL);
- if (logicalFiles.length())
- resp.setLogicalFiles(logicalFiles);
- if (superFiles.length())
- resp.setSuperFiles(superFiles);
// A query is reported as activated when some Alias element carries its id.
- if (version >= 1.42)
- {
- VStringBuffer xpath("Alias[@id='%s']", queryId);
- IPropertyTree *alias = queryRegistry->queryPropTree(xpath.str());
- if (!alias)
- resp.setActivated(false);
- else
- resp.setActivated(true);
- }
- if (includeStateOnClusters && (version >= 1.43))
- {
// querySet is passed as both of the first two arguments here.
- Owned<IPropertyTree> queriesOnCluster = getQueriesOnCluster(querySet, querySet, req.getCheckAllNodes());
- if (queriesOnCluster)
- {
- IArrayOf<IEspClusterQueryState> clusterStates;
- addClusterQueryStates(queriesOnCluster, querySet, queryId, clusterStates, version);
- resp.setClusters(clusterStates);
- }
- }
- if (version >= 1.50)
- {
- WsWuInfo winfo(context, wuid);
- resp.setResourceURLCount(winfo.getResourceURLCount());
- if (version >= 1.64)
- {
- IArrayOf<IEspECLTimer> timers;
- winfo.doGetTimers(timers); //Graph Duration
- if (timers.length())
- resp.setWUTimers(timers);
- IArrayOf<IEspECLGraph> graphs;
- winfo.doGetGraphs(graphs); //Graph Name, Label, Started, Finished, Type
- unsigned numGraphIds = graphs.length();
- resp.setCountGraphs(numGraphIds);
- if (numGraphIds > 0)
- resp.setWUGraphs(graphs);
- }
- }
// Optionally list "<address>:<port>" for every ESP binding of a ws_ecl
// service that serves this query set.
- if (req.getIncludeWsEclAddresses())
- {
- StringArray wseclAddresses;
- Owned<IEnvironmentFactory> factory = getEnvironmentFactory(true);
- Owned<IConstEnvironment> env = factory->openEnvironment();
- Owned<IPropertyTree> root = &env->getPTree();
- Owned<IPropertyTreeIterator> services = root->getElements("Software/EspService[Properties/@type='ws_ecl']");
- StringArray serviceNames;
- VStringBuffer xpath("Target[@name='%s']", querySet);
- ForEach(*services)
- {
- IPropertyTree &service = services->query();
// A service with no Target children is treated as serving every target.
- if (!service.hasProp("Target") || service.hasProp(xpath))
- serviceNames.append(service.queryProp("@name"));
- }
- Owned<IPropertyTreeIterator> processes = root->getElements("Software/EspProcess");
- ForEach(*processes)
- {
- StringArray netAddrs;
- IPropertyTree &process = processes->query();
- Owned<IPropertyTreeIterator> instances = process.getElements("Instance");
- ForEach(*instances)
- {
- IPropertyTree &instance = instances->query();
- const char *netAddr = instance.queryProp("@netAddress");
- if (!netAddr || !*netAddr)
- continue;
- if (streq(netAddr, "."))
- netAddrs.appendUniq(envLocalAddress); //not necessarily local to this server
- else
- netAddrs.appendUniq(netAddr);
- }
- Owned<IPropertyTreeIterator> bindings = process.getElements("EspBinding");
- ForEach(*bindings)
- {
- IPropertyTree &binding = bindings->query();
- const char *srvName = binding.queryProp("@service");
- if (!serviceNames.contains(srvName))
- continue;
- const char *port = binding.queryProp("@port"); //should always be an integer, but we're just concatenating strings
- if (!port || !*port)
- continue;
- ForEachItemIn(i, netAddrs)
- {
- VStringBuffer wseclAddr("%s:%s", netAddrs.item(i), port);
- wseclAddresses.append(wseclAddr);
- }
- }
- }
- resp.setWsEclAddresses(wseclAddresses);
- }
- return true;
- }
- int EspQuerySuperFileCompareFunc(IInterface * const *i1, IInterface * const *i2)
- {
- if (!i1 || !*i1 || !i2 || !*i2)
- return 0;
- IEspQuerySuperFile *sf1 = QUERYINTERFACE(*i1, IEspQuerySuperFile);
- IEspQuerySuperFile *sf2 = QUERYINTERFACE(*i2, IEspQuerySuperFile);
- if (!sf1 || !sf2)
- return 0;
- const char *name1 = sf1->getName();
- const char *name2 = sf2->getName();
- if (!name1 || !name2)
- return 0;
- return strcmp(name1, name2);
- }
- IReferencedFile* CWsWorkunitsEx::getReferencedFileByName(const char* name, IReferencedFileList* wufiles)
- {
- Owned<IReferencedFileIterator> refFileItr = wufiles->getFiles();
- ForEach(*refFileItr)
- {
- IReferencedFile& rf = refFileItr->query();
- const char* lfn = rf.getLogicalName();
- if (lfn && strieq(lfn, name))
- return &rf;
- }
- return NULL;
- }
- void CWsWorkunitsEx::readSuperFiles(IEspContext &context, IReferencedFile* rf, const char* fileName, IReferencedFileList* wufiles, IArrayOf<IEspQuerySuperFile>* files)
- {
- double version = context.getClientVersion();
- StringArray subFiles;
- IArrayOf<IEspQuerySuperFile> superFiles;
- const StringArray& subFileNames = rf->getSubFileNames();
- ForEachItemIn(i, subFileNames)
- {
- const char* name = subFileNames.item(i);
- if (!name || !*name)
- continue;
- IReferencedFile* pRF = getReferencedFileByName(name, wufiles);
- if (!pRF)
- continue;
- if (!(pRF->getFlags() & RefFileSuper))
- {
- subFiles.append(name);
- }
- else if (version >= 1.57)
- {
- readSuperFiles(context, pRF, name, wufiles, &superFiles);
- }
- }
- Owned<IEspQuerySuperFile> newSuperFile = createQuerySuperFile();
- newSuperFile->setName(fileName);
- if (subFiles.length())
- {
- subFiles.sortAscii();
- newSuperFile->setSubFiles(subFiles);
- }
- if ((version >= 1.57) && superFiles.length())
- {
- superFiles.sort(EspQuerySuperFileCompareFunc);
- newSuperFile->setSuperFiles(superFiles);
- }
- files->append(*newSuperFile.getClear());
- }
- bool CWsWorkunitsEx::getQueryFiles(IEspContext &context, const char* wuid, const char* query, const char* target, StringArray& logicalFiles, IArrayOf<IEspQuerySuperFile> *respSuperFiles)
- {
- try
- {
- Owned<IConstWUClusterInfo> info = getTargetClusterInfo(target);
- if (!info || (info->getPlatform()!=RoxieCluster))
- return false;
- SCMStringBuffer process;
- info->getRoxieProcess(process);
- if (!process.length())
- return false;
- Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
- Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid);
- if (!cw)
- return false;
- StringArray superFileNames;
- Owned<IHpccPackageSet> ps = createPackageSet(process.str());
- Owned<IReferencedFileList> wufiles = createReferencedFileList(context.queryUserId(),
- context.queryPassword(), true, true);
- wufiles->addFilesFromQuery(cw, (ps) ? ps->queryActiveMap(target) : NULL, query);
- wufiles->resolveFiles(process.str(), NULL, NULL, NULL, true, true, true, true);
- Owned<IReferencedFileIterator> refFileItr = wufiles->getFiles();
- ForEach(*refFileItr)
- {
- IReferencedFile &rf = refFileItr->query();
- const char *lfn = rf.getLogicalName();
- if (lfn && *lfn)
- {
- logicalFiles.append(lfn);
- if (respSuperFiles && (rf.getFlags() & RefFileSuper))
- readSuperFiles(context, &rf, lfn, wufiles, respSuperFiles);
- }
- }
- logicalFiles.sortAscii();
- if (respSuperFiles)
- respSuperFiles->sort(EspQuerySuperFileCompareFunc);
- return true;
- }
- catch(IMultiException *me)
- {
- StringBuffer err;
- DBGLOG("ERROR control:getQueryXrefInfo roxie query info %s", me->errorMessage(err.append(me->errorCode()).append(' ')).str());
- me->Release();
- return false;
- }
- catch(IException *e)
- {
- StringBuffer err;
- DBGLOG("ERROR control:getQueryXrefInfo roxie query info %s", e->errorMessage(err.append(e->errorCode()).append(' ')).str());
- e->Release();
- return false;
- }
- }
- inline void verifyQueryActionAllowsWild(bool &allowWildChecked, CQuerySetQueryActionTypes action)
- {
- if (allowWildChecked)
- return;
- switch (action)
- {
- case CQuerySetQueryActionTypes_ToggleSuspend:
- throw MakeStringException(ECLWATCH_INVALID_ACTION, "Wildcards not supported for toggling suspended state");
- case CQuerySetQueryActionTypes_Activate:
- throw MakeStringException(ECLWATCH_INVALID_ACTION, "Wildcards not supported for Activating queries");
- }
- allowWildChecked=true;
- }
- inline bool verifyQueryActionAllowsAlias(CQuerySetQueryActionTypes action)
- {
- return (action!=CQuerySetQueryActionTypes_Activate && action!=CQuerySetQueryActionTypes_RemoveAllAliases);
- }
- void expandQueryActionTargetList(IProperties *queryIds, IPropertyTree *queryset, IArrayOf<IConstQuerySetQueryActionItem> &items, CQuerySetQueryActionTypes action)
- {
- bool allowWildChecked=false;
- ForEachItemIn(i, items)
- {
- const char *itemId = items.item(i).getQueryId();
- if (!isWildString(itemId))
- {
- bool suspendedByUser = false;
- const char *itemSuspendState = items.item(i).getClientState().getSuspended();
- if (itemSuspendState && (strieq(itemSuspendState, "By User") || strieq(itemSuspendState, "1")))
- suspendedByUser = true;
- if (!verifyQueryActionAllowsAlias(action))
- queryIds->setProp(itemId, suspendedByUser);
- else
- {
- Owned<IPropertyTree> query = resolveQueryAlias(queryset, itemId);
- if (query)
- {
- const char *id = query->queryProp("@id");
- if (id && *id)
- queryIds->setProp(id, suspendedByUser);
- }
- }
- }
- else
- {
- verifyQueryActionAllowsWild(allowWildChecked, action);
- if (verifyQueryActionAllowsAlias(action))
- {
- Owned<IPropertyTreeIterator> active = queryset->getElements("Alias");
- ForEach(*active)
- {
- const char *name = active->query().queryProp("@name");
- const char *id = active->query().queryProp("@id");
- if (name && id && WildMatch(name, itemId))
- queryIds->setProp(id, 0);
- }
- }
- Owned<IPropertyTreeIterator> queries = queryset->getElements("Query");
- ForEach(*queries)
- {
- const char *id = queries->query().queryProp("@id");
- if (id && WildMatch(id, itemId))
- queryIds->setProp(id, 0);
- }
- }
- }
- }
- void expandQueryActionTargetList(IProperties *queryIds, IPropertyTree *queryset, const char *id, CQuerySetQueryActionTypes action)
- {
- IArrayOf<IConstQuerySetQueryActionItem> items;
- Owned<IEspQuerySetQueryActionItem> item = createQuerySetQueryActionItem();
- item->setQueryId(id);
- items.append(*(IConstQuerySetQueryActionItem*)item.getClear());
- expandQueryActionTargetList(queryIds, queryset, items, action);
- }
- bool CWsWorkunitsEx::onWUQueryConfig(IEspContext &context, IEspWUQueryConfigRequest & req, IEspWUQueryConfigResponse & resp)
- {
- StringAttr target(req.getTarget());
- if (target.isEmpty())
- throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Target name required");
- if (!isValidCluster(target))
- throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Invalid target name: %s", target.get());
- Owned<IPropertyTree> queryset = getQueryRegistry(target.get(), false);
- if (!queryset)
- throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Target Queryset %s not found", req.getTarget());
- PROGLOG("WUQueryConfig: target %s", target.get());
- Owned<IProperties> queryIds = createProperties();
- expandQueryActionTargetList(queryIds, queryset, req.getQueryId(), QuerySetQueryActionTypes_Undefined);
- IArrayOf<IEspWUQueryConfigResult> results;
- Owned<IPropertyIterator> it = queryIds->getIterator();
- ForEach(*it)
- {
- Owned<IEspWUQueryConfigResult> result = createWUQueryConfigResult();
- result->setQueryId(it->getPropKey());
- VStringBuffer xpath("Query[@id='%s']", it->getPropKey());
- IPropertyTree *queryTree = queryset->queryPropTree(xpath);
- if (queryTree)
- {
- updateMemoryLimitSetting(queryTree, req.getMemoryLimit());
- updateQueryPriority(queryTree, req.getPriority());
- updateQuerySetting(req.getTimeLimit_isNull(), queryTree, "@timeLimit", req.getTimeLimit());
- updateQuerySetting(req.getWarnTimeLimit_isNull(), queryTree, "@warnTimeLimit", req.getWarnTimeLimit());
- if (req.getComment())
- queryTree->setProp("@comment", req.getComment());
- }
- results.append(*result.getClear());
- }
- resp.setResults(results);
- bool reloadFailed = false;
- if (0!=req.getWait() && !req.getNoReload())
- reloadFailed = !reloadCluster(target.get(), (unsigned)req.getWait());
- resp.setReloadFailed(reloadFailed);
- return true;
- }
- bool CWsWorkunitsEx::onWUQuerysetQueryAction(IEspContext &context, IEspWUQuerySetQueryActionRequest & req, IEspWUQuerySetQueryActionResponse & resp)
- {
- resp.setQuerySetName(req.getQuerySetName());
- resp.setAction(req.getAction());
- if (isEmpty(req.getQuerySetName()))
- throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Queryset name required");
- Owned<IPropertyTree> queryset = getQueryRegistry(req.getQuerySetName(), true);
- if (!queryset)
- throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Queryset %s not found", req.getQuerySetName());
- Owned<IProperties> queryIds = createProperties();
- expandQueryActionTargetList(queryIds, queryset, req.getQueries(), req.getAction());
- if (req.getAction() == CQuerySetQueryActionTypes_ResetQueryStats)
- return resetQueryStats(context, req.getQuerySetName(), queryIds, resp);
- IArrayOf<IEspQuerySetQueryActionResult> results;
- Owned<IPropertyIterator> it = queryIds->getIterator();
- ForEach(*it)
- {
- const char *id = it->getPropKey();
- Owned<IEspQuerySetQueryActionResult> result = createQuerySetQueryActionResult();
- result->setQueryId(id);
- try
- {
- Owned<IPropertyTree> query = getQueryById(queryset, id);
- if (!query)
- throw MakeStringException(ECLWATCH_QUERYID_NOT_FOUND, "Query %s/%s not found.", req.getQuerySetName(), id);
- CQuerySetQueryActionTypes action = req.getAction();
- const char* strAction = (action > -1) && (action < NumOfQuerySetQueryActionTypes) ? QuerySetQueryActionTypes[action] : "Undefined";
- PROGLOG("%s: queryset %s, query %s", strAction, req.getQuerySetName(), id);
- switch (action)
- {
- case CQuerySetQueryActionTypes_ToggleSuspend:
- setQuerySuspendedState(queryset, id, !queryIds->getPropBool(id), context.queryUserId());
- break;
- case CQuerySetQueryActionTypes_Suspend:
- setQuerySuspendedState(queryset, id, true, context.queryUserId());
- break;
- case CQuerySetQueryActionTypes_Unsuspend:
- setQuerySuspendedState(queryset, id, false, NULL);
- break;
- case CQuerySetQueryActionTypes_Activate:
- setQueryAlias(queryset, query->queryProp("@name"), id);
- break;
- case CQuerySetQueryActionTypes_Delete:
- removeNamedQuery(queryset, id);
- query.clear();
- break;
- case CQuerySetQueryActionTypes_RemoveAllAliases:
- removeAliasesFromNamedQuery(queryset, id);
- break;
- }
- result->setSuccess(true);
- if (query)
- result->setSuspended(query->getPropBool("@suspended"));
- }
- catch(IException *e)
- {
- StringBuffer msg;
- result->setMessage(e->errorMessage(msg).str());
- result->setCode(e->errorCode());
- result->setSuccess(false);
- e->Release();
- }
- results.append(*result.getClear());
- }
- resp.setResults(results);
- return true;
- }
- bool CWsWorkunitsEx::onWUQuerysetAliasAction(IEspContext &context, IEspWUQuerySetAliasActionRequest &req, IEspWUQuerySetAliasActionResponse &resp)
- {
- resp.setQuerySetName(req.getQuerySetName());
- resp.setAction(req.getAction());
- if (isEmpty(req.getQuerySetName()))
- throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Queryset name required");
- Owned<IPropertyTree> queryset = getQueryRegistry(req.getQuerySetName(), true);
- if (!queryset)
- throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Queryset %s not found", req.getQuerySetName());
- IArrayOf<IEspQuerySetAliasActionResult> results;
- ForEachItemIn(i, req.getAliases())
- {
- IConstQuerySetAliasActionItem& item=req.getAliases().item(i);
- Owned<IEspQuerySetAliasActionResult> result = createQuerySetAliasActionResult();
- try
- {
- VStringBuffer xpath("Alias[@name='%s']", item.getName());
- IPropertyTree *alias = queryset->queryPropTree(xpath.str());
- if (!alias)
- throw MakeStringException(ECLWATCH_ALIAS_NOT_FOUND, "Alias %s/%s not found.", req.getQuerySetName(), item.getName());
- CQuerySetAliasActionTypes action = req.getAction();
- const char* strAction = (action > -1) && (action < NumOfQuerySetAliasActionTypes) ? QuerySetAliasActionTypes[action] : "Undefined";
- PROGLOG("%s: queryset %s, alias %s", strAction, req.getQuerySetName(), item.getName());
- switch (action)
- {
- case CQuerySetAliasActionTypes_Deactivate:
- removeQuerySetAlias(req.getQuerySetName(), item.getName());
- break;
- }
- result->setSuccess(true);
- }
- catch(IException *e)
- {
- StringBuffer msg;
- result->setMessage(e->errorMessage(msg).str());
- result->setCode(e->errorCode());
- result->setSuccess(false);
- e->Release();
- }
- results.append(*result.getClear());
- }
- resp.setResults(results);
- return true;
- }
- #define QUERYPATH_SEP_CHAR '/'
- bool nextQueryPathNode(const char *&path, StringBuffer &node)
- {
- if (*path==QUERYPATH_SEP_CHAR)
- path++;
- while (*path && *path!=QUERYPATH_SEP_CHAR)
- node.append(*path++);
- return (*path && *++path);
- }
- bool splitQueryPath(const char *path, StringBuffer &netAddress, StringBuffer &queryset, StringBuffer *query)
- {
- if (!path || !*path)
- return false;
- if (*path==QUERYPATH_SEP_CHAR && path[1]==QUERYPATH_SEP_CHAR)
- {
- path+=2;
- if (!nextQueryPathNode(path, netAddress))
- return false;
- }
- if (!nextQueryPathNode(path, queryset))
- return (query==NULL);
- if (!query)
- return false;
- if (nextQueryPathNode(path, *query))
- return false; //query path too deep
- return true;
- }
- IPropertyTree *fetchRemoteQuerySetInfo(IEspContext *context, const char *srcAddress, const char *srcTarget)
- {
- if (!srcAddress || !*srcAddress || !srcTarget || !*srcTarget)
- return NULL;
- VStringBuffer url("http://%s%s/WsWorkunits/WUQuerysetDetails.xml?ver_=1.51&QuerySetName=%s&FilterType=All", srcAddress, (!strchr(srcAddress, ':')) ? ":8010" : "", srcTarget);
- Owned<IHttpClientContext> httpCtx = getHttpClientContext();
- Owned<IHttpClient> httpclient = httpCtx->createHttpClient(NULL, url);
- const char *user = context->queryUserId();
- if (user && *user)
- httpclient->setUserID(user);
- const char *pw = context->queryPassword();
- if (pw && *pw)
- httpclient->setPassword(pw);
- StringBuffer request; //empty
- StringBuffer response;
- StringBuffer status;
- if (0 > httpclient->sendRequest("GET", NULL, request, response, status) || !response.length() || strncmp("200", status, 3))
- throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Error fetching remote queryset information: %s %s %s", srcAddress, srcTarget, status.str());
- return createPTreeFromXMLString(response);
- }
- class QueryCloner
- {
- public:
- QueryCloner(IEspContext *_context, const char *address, const char *source, const char *_target) :
- context(_context), target(_target), srcAddress(address)
- {
- if (srcAddress.length())
- srcQuerySet.setown(fetchRemoteQuerySetInfo(context, srcAddress, source));
- else
- srcQuerySet.setown(getQueryRegistry(source, true));
- if (!srcQuerySet)
- throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Source Queryset %s %s not found", srcAddress.str(), source);
- destQuerySet.setown(getQueryRegistry(target, false));
- if (!destQuerySet) // getQueryRegistry should have created if not found
- throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Destination Queryset %s could not be created, or found", target.str());
- factory.setown(getWorkUnitFactory(context->querySecManager(), context->queryUser()));
- }
- QueryCloner(IEspContext *_context, IPropertyTree *srcTree, const char *_target) :
- context(_context), target(_target)
- {
- srcQuerySet.set(srcTree);
- destQuerySet.setown(getQueryRegistry(target, false));
- if (!destQuerySet) // getQueryRegistry should have created if not found
- throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Destination Queryset %s could not be created, or found", target.str());
- factory.setown(getWorkUnitFactory(context->querySecManager(), context->queryUser()));
- }
- void setQueryDirectory(const char *dir)
- {
- queryDirectory.set(dir);
- }
- void cloneQueryRemote(IPropertyTree *query, bool makeActive)
- {
- StringBuffer wuid = query->queryProp("Wuid");
- if (!wuid.length())
- return;
- const char *queryName = query->queryProp("Name");
- if (!queryName || !*queryName)
- return;
- StringBuffer xml;
- MemoryBuffer dll;
- StringBuffer dllname;
- StringBuffer fetchedName;
- StringBuffer remoteDfs;
- fetchRemoteWorkunit(NULL, context, srcAddress.str(), NULL, NULL, wuid, fetchedName, xml, dllname, dll, remoteDfs);
- deploySharedObject(*context, wuid, dllname, target, queryName, dll, queryDirectory, xml.str());
- SCMStringBuffer existingQueryId;
- queryIdFromQuerySetWuid(destQuerySet, wuid, queryName, existingQueryId);
- if (existingQueryId.length())
- {
- existingQueryIds.append(existingQueryId.str());
- if (makeActive)
- activateQuery(destQuerySet, ACTIVATE_SUSPEND_PREVIOUS, queryName, existingQueryId.str(), context->queryUserId());
- return;
- }
- StringBuffer newQueryId;
- Owned<IWorkUnit> workunit = factory->updateWorkUnit(wuid);
- addQueryToQuerySet(workunit, destQuerySet, queryName, makeActive ? ACTIVATE_SUSPEND_PREVIOUS : DO_NOT_ACTIVATE, newQueryId, context->queryUserId());
- copiedQueryIds.append(newQueryId);
- Owned<IPropertyTree> destQuery = getQueryById(destQuerySet, newQueryId);
- if (destQuery)
- {
- Owned<IAttributeIterator> aiter = query->getAttributes();
- ForEach(*aiter)
- {
- const char *atname = aiter->queryName();
- if (!destQuery->hasProp(atname))
- destQuery->setProp(atname, aiter->queryValue());
- }
- if (cloneFilesEnabled && wufiles)
- wufiles->addFilesFromQuery(workunit, pm, newQueryId);
- }
- }
- void cloneQueryLocal(IPropertyTree *query, bool makeActive)
- {
- const char *wuid = query->queryProp("@wuid");
- if (!wuid || !*wuid)
- return;
- const char *queryName = query->queryProp("@name");
- if (!queryName || !*queryName)
- return;
- SCMStringBuffer existingQueryId;
- queryIdFromQuerySetWuid(destQuerySet, wuid, queryName, existingQueryId);
- if (existingQueryId.length())
- {
- existingQueryIds.append(existingQueryId.str());
- if (makeActive)
- activateQuery(destQuerySet, ACTIVATE_SUSPEND_PREVIOUS, queryName, existingQueryId.str(), context->queryUserId());
- return;
- }
- StringBuffer newQueryId;
- Owned<IWorkUnit> workunit = factory->updateWorkUnit(wuid);
- if (!workunit)
- {
- StringBuffer msg(wuid);
- msg.append(": ").append(query->queryProp("@id"));
- missingWuids.append(msg);
- return;
- }
- if (!newQueryId.length())
- addQueryToQuerySet(workunit, destQuerySet, queryName, makeActive ? ACTIVATE_SUSPEND_PREVIOUS : DO_NOT_ACTIVATE, newQueryId, context->queryUserId());
- copiedQueryIds.append(newQueryId);
- Owned<IPropertyTree> destQuery = getQueryById(destQuerySet, newQueryId);
- if (destQuery)
- {
- Owned<IAttributeIterator> aiter = query->getAttributes();
- ForEach(*aiter)
- {
- const char *atname = aiter->queryName();
- if (!destQuery->hasProp(atname))
- destQuery->setProp(atname, aiter->queryValue());
- }
- Owned<IPropertyTreeIterator> children = query->getElements("*");
- ForEach(*children)
- {
- IPropertyTree &child = children->query();
- destQuery->addPropTree(child.queryName(), createPTreeFromIPT(&child));
- }
- if (cloneFilesEnabled && wufiles)
- wufiles->addFilesFromQuery(workunit, pm, newQueryId);
- }
- }
- void cloneActiveRemote(bool makeActive)
- {
- Owned<IPropertyTreeIterator> activeQueries = srcQuerySet->getElements("QuerysetAliases/QuerySetAlias");
- ForEach(*activeQueries)
- {
- IPropertyTree &alias = activeQueries->query();
- VStringBuffer xpath("QuerysetQueries/QuerySetQuery[Id='%s'][1]", alias.queryProp("Id"));
- IPropertyTree *query = srcQuerySet->queryPropTree(xpath);
- if (!query)
- continue;
- cloneQueryRemote(query, makeActive);
- }
- }
- void cloneActiveLocal(bool makeActive, const char *mask)
- {
- StringBuffer xpath("Alias");
- if (mask && *mask)
- xpath.appendf("[@id='%s']", mask);
- Owned<IPropertyTreeIterator> activeQueries = srcQuerySet->getElements(xpath);
- ForEach(*activeQueries)
- {
- IPropertyTree &alias = activeQueries->query();
- Owned<IPropertyTree> query = getQueryById(srcQuerySet, alias.queryProp("@id"));
- if (!query)
- return;
- cloneQueryLocal(query, makeActive);
- }
- }
- void cloneActive(bool makeActive)
- {
- if (srcAddress.length())
- cloneActiveRemote(makeActive);
- else
- cloneActiveLocal(makeActive, nullptr);
- }
- void cloneAllRemote(bool cloneActiveState)
- {
- Owned<IPropertyTreeIterator> allQueries = srcQuerySet->getElements("QuerysetQueries/QuerySetQuery");
- ForEach(*allQueries)
- {
- IPropertyTree &query = allQueries->query();
- bool makeActive = false;
- if (cloneActiveState)
- {
- VStringBuffer xpath("QuerysetAliases/QuerySetAlias[Id='%s']", query.queryProp("Id"));
- makeActive = srcQuerySet->hasProp(xpath);
- }
- cloneQueryRemote(&query, makeActive);
- }
- }
- void cloneAllLocal(bool cloneActiveState, const char *mask)
- {
- StringBuffer xpath("Query");
- if (mask && *mask)
- xpath.appendf("[@id='%s']", mask);
- Owned<IPropertyTreeIterator> allQueries = srcQuerySet->getElements(xpath);
- ForEach(*allQueries)
- {
- IPropertyTree &query = allQueries->query();
- bool makeActive = false;
- if (cloneActiveState)
- {
- VStringBuffer xpath("Alias[@id='%s']", query.queryProp("@id"));
- makeActive = srcQuerySet->hasProp(xpath);
- }
- cloneQueryLocal(&query, makeActive);
- }
- }
- void cloneAll(bool cloneActiveState)
- {
- if (srcAddress.length())
- cloneAllRemote(cloneActiveState);
- else
- cloneAllLocal(cloneActiveState, nullptr);
- }
- void enableFileCloning(unsigned _updateFlags, const char *dfsServer, const char *destProcess, const char *sourceProcess, bool allowForeign)
- {
- cloneFilesEnabled = true;
- updateFlags = _updateFlags;
- splitDerivedDfsLocation(dfsServer, srcCluster, dfsIP, srcPrefix, sourceProcess, sourceProcess, NULL, NULL);
- wufiles.setown(createReferencedFileList(context->queryUserId(), context->queryPassword(), allowForeign, false));
- Owned<IHpccPackageSet> ps = createPackageSet(destProcess);
- pm.set(ps->queryActiveMap(target));
- process.set(destProcess);
- }
- void cloneFiles()
- {
- if (cloneFilesEnabled)
- {
- wufiles->resolveFiles(process, dfsIP, srcPrefix, srcCluster, !(updateFlags & (DALI_UPDATEF_REPLACE_FILE | DALI_UPDATEF_CLONE_FROM)), true, false, true);
- Owned<IDFUhelper> helper = createIDFUhelper();
- Owned <IConstWUClusterInfo> cl = getTargetClusterInfo(target);
- if (cl)
- {
- SCMStringBuffer process;
- StringBuffer defReplicateFolder;
- getConfigurationDirectory(NULL, "data2", "roxie", cl->getRoxieProcess(process).str(), defReplicateFolder);
- wufiles->cloneAllInfo(updateFlags, helper, true, true, cl->getRoxieRedundancy(), cl->getChannelsPerNode(), cl->getRoxieReplicateOffset(), defReplicateFolder);
- }
- }
- }
- void gatherFileErrors(IArrayOf<IConstLogicalFileError> &errors)
- {
- ::gatherFileErrors(wufiles, errors);
- }
- private:
- Linked<IEspContext> context;
- Linked<IWorkUnitFactory> factory;
- Owned<IPropertyTree> destQuerySet;
- Owned<IPropertyTree> srcQuerySet;
- Owned<IReferencedFileList> wufiles;
- Owned<const IHpccPackageMap> pm;
- StringBuffer dfsIP;
- StringBuffer srcAddress;
- StringBuffer srcCluster;
- StringBuffer srcPrefix;
- StringAttr target;
- StringAttr process;
- StringAttr queryDirectory;
- bool cloneFilesEnabled = false;
- unsigned updateFlags = 0;
- public:
- StringArray existingQueryIds;
- StringArray copiedQueryIds;
- StringArray missingWuids;
- };
- bool CWsWorkunitsEx::onWUCopyQuerySet(IEspContext &context, IEspWUCopyQuerySetRequest &req, IEspWUCopyQuerySetResponse &resp)
- {
- const char *source = req.getSource();
- if (!source || !*source)
- throw MakeStringException(ECLWATCH_MISSING_PARAMS, "No source target specified");
- StringBuffer srcAddress;
- StringBuffer srcTarget;
- if (!splitQueryPath(source, srcAddress, srcTarget, NULL))
- throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid source target");
- if (!srcAddress.length() && !isValidCluster(srcTarget))
- throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Invalid source target name: %s", source);
- const char *target = req.getTarget();
- if (!target || !*target)
- throw MakeStringException(ECLWATCH_MISSING_PARAMS, "No destination target specified");
- if (!isValidCluster(target))
- throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Invalid destination target name: %s", target);
- DBGLOG("%s copying queryset %s from %s target %s", context.queryUserId(), target, srcAddress.str(), srcTarget.str());
- QueryCloner cloner(&context, srcAddress, srcTarget, target);
- cloner.setQueryDirectory(queryDirectory);
- SCMStringBuffer process;
- if (req.getCopyFiles())
- {
- Owned <IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(target);
- if (clusterInfo && clusterInfo->getPlatform()==RoxieCluster)
- {
- clusterInfo->getRoxieProcess(process);
- if (!process.length())
- throw MakeStringException(ECLWATCH_INVALID_CLUSTER_INFO, "DFS process cluster not found for destination target %s", target);
- unsigned updateFlags = 0;
- if (req.getOverwriteDfs())
- updateFlags |= (DALI_UPDATEF_REPLACE_FILE | DALI_UPDATEF_CLONE_FROM | DALI_UPDATEF_SUPERFILES);
- if (req.getUpdateCloneFrom())
- updateFlags |= DALI_UPDATEF_CLONE_FROM;
- if (req.getUpdateSuperFiles())
- updateFlags |= DALI_UPDATEF_SUPERFILES;
- if (req.getAppendCluster())
- updateFlags |= DALI_UPDATEF_APPEND_CLUSTER;
- cloner.enableFileCloning(updateFlags, req.getDfsServer(), process.str(), req.getSourceProcess(), req.getAllowForeignFiles());
- }
- }
- if (req.getActiveOnly())
- cloner.cloneActive(req.getCloneActiveState());
- else
- cloner.cloneAll(req.getCloneActiveState());
- cloner.cloneFiles();
- if (req.getIncludeFileErrors())
- cloner.gatherFileErrors(resp.getFileErrors());
- resp.setCopiedQueries(cloner.copiedQueryIds);
- resp.setExistingQueries(cloner.existingQueryIds);
- return true;
- }
- bool CWsWorkunitsEx::onWUQuerysetCopyQuery(IEspContext &context, IEspWUQuerySetCopyQueryRequest &req, IEspWUQuerySetCopyQueryResponse &resp)
- {
- unsigned start = msTick();
- const char *source = req.getSource();
- if (!source || !*source)
- throw MakeStringException(ECLWATCH_MISSING_PARAMS, "No source query specified");
- const char *target = req.getTarget();
- if (!target || !*target)
- throw MakeStringException(ECLWATCH_MISSING_PARAMS, "No destination specified");
- if (strchr(target, '/')) //for future use
- throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid target queryset name");
- if (req.getCluster() && *req.getCluster() && !strieq(req.getCluster(), target)) //backward compatability check
- throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid target cluster and queryset must match");
- if (!isValidCluster(target))
- throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Invalid target name: %s", target);
- StringBuffer srcAddress, srcQuerySet, srcQuery;
- if (!splitQueryPath(source, srcAddress, srcQuerySet, &srcQuery))
- throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid source query path");
- StringAttr targetQueryName(req.getDestName());
- Owned<IClientWUQuerySetDetailsResponse> sourceQueryInfoResp;
- IConstQuerySetQuery *srcInfo=NULL;
- DBGLOG("%s copying query %s to target %s from %s target %s", context.queryUserId(), srcQuery.str(), target, srcAddress.str(), srcQuerySet.str());
- StringBuffer remoteIP;
- StringBuffer wuid;
- if (srcAddress.length())
- {
- StringBuffer xml;
- MemoryBuffer dll;
- StringBuffer dllname;
- StringBuffer queryName;
- fetchRemoteWorkunitAndQueryDetails(NULL, &context, srcAddress.str(), srcQuerySet.str(), srcQuery.str(), NULL, queryName, xml, dllname, dll, remoteIP, sourceQueryInfoResp);
- if (sourceQueryInfoResp && sourceQueryInfoResp->getQuerysetQueries().ordinality())
- srcInfo = &sourceQueryInfoResp->getQuerysetQueries().item(0);
- if (srcInfo)
- wuid.set(srcInfo->getWuid());
- if (targetQueryName.isEmpty())
- targetQueryName.set(queryName);
- deploySharedObject(context, wuid, dllname.str(), target, targetQueryName.get(), dll, queryDirectory.str(), xml.str());
- }
- else
- {
- //Could get the atributes without soap call, but this creates a common data structure shared with fetching remote query info
- //Get query attributes before resolveQueryAlias, to avoid deadlock
- sourceQueryInfoResp.setown(fetchQueryDetails(NULL, &context, NULL, srcQuerySet, srcQuery));
- if (sourceQueryInfoResp && sourceQueryInfoResp->getQuerysetQueries().ordinality())
- srcInfo = &sourceQueryInfoResp->getQuerysetQueries().item(0);
- Owned<IPropertyTree> queryset = getQueryRegistry(srcQuerySet.str(), true);
- if (!queryset)
- throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Source Queryset %s not found", srcQuery.str());
- Owned<IPropertyTree> query = resolveQueryAlias(queryset, srcQuery.str());
- if (!query)
- throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Source query %s not found", source);
- wuid.set(query->queryProp("@wuid"));
- if (targetQueryName.isEmpty())
- targetQueryName.set(query->queryProp("@name"));
- }
- Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
- Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid.str());
- if (!req.getDontCopyFiles())
- {
- StringBuffer daliIP;
- StringBuffer srcCluster;
- StringBuffer srcPrefix;
- splitDerivedDfsLocation(req.getDaliServer(), srcCluster, daliIP, srcPrefix, req.getSourceProcess(), req.getSourceProcess(), remoteIP.str(), NULL);
- unsigned updateFlags = 0;
- if (req.getOverwrite())
- updateFlags |= (DALI_UPDATEF_REPLACE_FILE | DALI_UPDATEF_CLONE_FROM | DALI_UPDATEF_SUPERFILES);
- if (req.getUpdateCloneFrom())
- updateFlags |= DALI_UPDATEF_CLONE_FROM;
- if (req.getUpdateSuperFiles())
- updateFlags |= DALI_UPDATEF_SUPERFILES;
- if (req.getAppendCluster())
- updateFlags |= DALI_UPDATEF_APPEND_CLUSTER;
- QueryFileCopier cpr(target);
- cpr.init(context, req.getAllowForeignFiles());
- cpr.remoteIP.set(daliIP);
- cpr.remotePrefix.set(srcPrefix);
- cpr.srcCluster.set(srcCluster);
- cpr.queryname.set(targetQueryName);
- cpr.copy(cw, updateFlags);
- if (req.getIncludeFileErrors())
- cpr.gatherFileErrors(resp.getFileErrors());
- }
- WorkunitUpdate wu(&cw->lock());
- if (!wu)
- throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT, "Error opening wuid %s for query %s", wuid.str(), source);
- StringBuffer targetQueryId;
- WUQueryActivationOptions activate = (WUQueryActivationOptions)req.getActivate();
- addQueryToQuerySet(wu, target, targetQueryName.get(), activate, targetQueryId, context.queryUserId());
- Owned<IPropertyTree> queryTree = getQueryById(target, targetQueryId, false);
- if (queryTree)
- {
- updateMemoryLimitSetting(queryTree, req.getMemoryLimit(), srcInfo);
- updateQueryPriority(queryTree, req.getPriority(), srcInfo);
- updateTimeLimitSetting(queryTree, req.getTimeLimit_isNull(), req.getTimeLimit(), srcInfo);
- updateWarnTimeLimitSetting(queryTree, req.getWarnTimeLimit_isNull(), req.getWarnTimeLimit(), srcInfo);
- if (req.getComment())
- queryTree->setProp("@comment", req.getComment());
- else if (srcInfo && srcInfo->getComment())
- queryTree->setProp("@comment", srcInfo->getComment());
- if (srcInfo && srcInfo->getSnapshot())
- queryTree->setProp("@snapshot", srcInfo->getSnapshot());
- }
- wu.clear();
- resp.setQueryId(targetQueryId.str());
- if (0!=req.getWait() && !req.getNoReload())
- reloadCluster(target, remainingMsWait(req.getWait(), start));
- return true;
- }
- bool CWsWorkunitsEx::onWUQuerysetImport(IEspContext &context, IEspWUQuerysetImportRequest &req, IEspWUQuerysetImportResponse &resp)
- {
- try
- {
- const char* target = req.getTarget();
- if (!target || !*target)
- throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Target not specified");
- Owned <IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(target);
- if (!clusterInfo)
- throw MakeStringException(ECLWATCH_CANNOT_RESOLVE_CLUSTER_NAME, "Target not found");
- if (req.getCopyFiles() && clusterInfo->getPlatform()!=RoxieCluster)
- throw MakeStringException(ECLWATCH_INVALID_ACTION, "Copy files option only supported for Roxie");
- MemoryBuffer &mb = const_cast<MemoryBuffer &>(req.getData()); //for efficiency, content of request shouldn't matter after
- if (req.getCompressed())
- {
- MemoryBuffer decompressed;
- fastLZDecompressToBuffer(decompressed, mb);
- mb.swapWith(decompressed);
- }
- mb.append('\0');
- Owned<IPropertyTree> srcTree = createPTreeFromXMLString(mb.toByteArray());
- const char *archivedTarget = srcTree->queryProp("@target");
- if (archivedTarget && *archivedTarget) //support simple queryset or with archived (exported) root format
- {
- VStringBuffer xpath("QuerySet[@id='%s']", archivedTarget);
- IPropertyTree *qsTree = srcTree->queryPropTree(xpath);
- if (qsTree)
- srcTree.setown(LINK(qsTree));
- }
- if (req.getReplace())
- {
- Owned<IPropertyTree> queryRegistry = getQueryRegistry(target, false);
- queryRegistry->removeProp("*");
- resp.setClearedExisting(true);
- }
- const bool activate = CQuerysetImportActivation_ImportedActive == req.getActivation(); //only two options now but may evolve
- QueryCloner cloner(&context, srcTree, target);
- SCMStringBuffer process;
- if (req.getCopyFiles())
- {
- clusterInfo->getRoxieProcess(process); //checked if roxie when copying files above
- if (!process.length())
- throw MakeStringException(ECLWATCH_INVALID_CLUSTER_INFO, "DFS process cluster not found for destination target %s", target);
- unsigned updateFlags = 0;
- if (req.getOverwriteDfs())
- updateFlags |= (DALI_UPDATEF_REPLACE_FILE | DALI_UPDATEF_CLONE_FROM | DALI_UPDATEF_SUPERFILES);
- if (req.getUpdateCloneFrom())
- updateFlags |= DALI_UPDATEF_CLONE_FROM;
- if (req.getUpdateSuperFiles())
- updateFlags |= DALI_UPDATEF_SUPERFILES;
- if (req.getAppendCluster())
- updateFlags |= DALI_UPDATEF_APPEND_CLUSTER;
- cloner.enableFileCloning(updateFlags, req.getDfsServer(), process.str(), req.getSourceProcess(), req.getAllowForeignFiles());
- }
- if (req.getActiveOnly())
- cloner.cloneActiveLocal(activate, req.getQueryMask());
- else
- cloner.cloneAllLocal(activate, req.getQueryMask());
- cloner.cloneFiles();
- if (req.getIncludeFileErrors())
- cloner.gatherFileErrors(resp.getFileErrors());
- resp.setImportedQueries(cloner.copiedQueryIds);
- resp.setExistingQueries(cloner.existingQueryIds);
- resp.setMissingWuids(cloner.missingWuids);
- }
- catch(IException* e)
- {
- FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
- }
- return true;
- }
- bool CWsWorkunitsEx::onWUQuerysetExport(IEspContext &context, IEspWUQuerysetExportRequest &req, IEspWUQuerysetExportResponse &resp)
- {
- try
- {
- const char* target = req.getTarget();
- if (!target || !*target)
- throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Target not specified");
- Owned<IPropertyTree> queryRegistry = getQueryRegistry(target, true);
- if (req.getActiveOnly())
- {
- Owned<IPropertyTree> activeOnly = createPTree("QuerySet");
- Owned<IAttributeIterator> attrs = queryRegistry->getAttributes();
- ForEach(*attrs)
- activeOnly->setProp(attrs->queryName(), attrs->queryValue());
- Owned<IPropertyTreeIterator> aliases = queryRegistry->getElements("Alias");
- ForEach(*aliases)
- {
- IPropertyTree &alias = aliases->query();
- const char *id = alias.queryProp("@id");
- if (id && *id)
- {
- VStringBuffer xpath("Query[@id='%s']", id);
- IPropertyTree *query = queryRegistry->queryPropTree(xpath);
- if (query)
- {
- activeOnly->addPropTree("Query", LINK(query));
- activeOnly->addPropTree("Alias", LINK(&alias));
- }
- }
- }
- queryRegistry.setown(activeOnly.getClear());
- }
- if (req.getProtect())
- {
- StringArray wuids;
- Owned<IPropertyTreeIterator> queries = queryRegistry->getElements("Query");
- ForEach(*queries)
- {
- IPropertyTree &query = queries->query();
- const char *wuid = query.queryProp("@wuid");
- if (wuid && *wuid)
- wuids.append(wuid);
- }
- if (wuids.length())
- doProtectWorkunits(context, wuids, nullptr);
- }
- CDateTime dt;
- dt.setNow();
- StringBuffer dts;
- VStringBuffer qs("<QuerySetArchive exported='%s' target='%s' activeOnly='%s'>\n", dt.getString(dts, true).str(), target, req.getActiveOnly() ? "true" : "false");
- toXML(queryRegistry, qs);
- qs.append("</QuerySetArchive>");
- MemoryBuffer content;
- if (req.getCompress())
- fastLZCompressToBuffer(content, qs.length()+1, qs);
- else
- content.append(qs.str());
- resp.setTarget(target);
- resp.setCompressed(req.getCompress());
- resp.setData(content);
- }
- catch(IException* e)
- {
- FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
- }
- return true;
- }
- void CWsWorkunitsEx::getGraphsByQueryId(const char *target, const char *queryId, const char *graphId, const char *subGraphId, IArrayOf<IEspECLGraphEx>& ECLGraphs)
- {
- if (!target || !*target)
- throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Target name required");
- if (!queryId || !*queryId)
- throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Query Id required");
- Owned<IConstWUClusterInfo> info = getTargetClusterInfo(target);
- if (!info || (info->getPlatform()!=RoxieCluster)) //Only support roxie for now
- throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Invalid Roxie name");
- PROGLOG("getGraphsByQueryId: target %s, query %s", target, queryId);
- const SocketEndpointArray &eps = info->getRoxieServers();
- if (eps.empty())
- return;
- VStringBuffer control("<control:querystats><Query id='%s'/></control:querystats>", queryId);
- Owned<IPropertyTree> querystats = sendRoxieControlAllNodes(eps.item(0), control.str(), false, ROXIELOCKCONNECTIONTIMEOUT);
- if (!querystats)
- return;
- Owned<IPropertyTreeIterator> graphs = querystats->getElements("Endpoint/Query/Graph");
- ForEach(*graphs)
- {
- IPropertyTree &graph = graphs->query();
- const char* aGraphId = graph.queryProp("@id");
- if (graphId && *graphId && !strieq(graphId, aGraphId))
- continue;
- IPropertyTree* xgmml = graph.getBranch("xgmml/graph");
- if (!xgmml)
- continue;
- Owned<IEspECLGraphEx> g = createECLGraphEx("","");
- g->setName(aGraphId);
- StringBuffer xml;
- if (!subGraphId || !*subGraphId)
- toXML(xgmml, xml);
- else
- {
- VStringBuffer xpath("//node[@id='%s']", subGraphId);
- toXML(xgmml->queryPropTree(xpath.str()), xml);
- }
- g->setGraph(xml.str());
- ECLGraphs.append(*g.getClear());
- }
- return;
- }
- bool CWsWorkunitsEx::onWUQueryGetGraph(IEspContext& context, IEspWUQueryGetGraphRequest& req, IEspWUQueryGetGraphResponse& resp)
- {
- try
- {
- IArrayOf<IEspECLGraphEx> graphs;
- getGraphsByQueryId(req.getTarget(), req.getQueryId(), req.getGraphName(), req.getSubGraphId(), graphs);
- resp.setGraphs(graphs);
- }
- catch(IException* e)
- {
- FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
- }
- return true;
- }
- bool CWsWorkunitsEx::resetQueryStats(IEspContext& context, const char* target, IProperties* queryIds, IEspWUQuerySetQueryActionResponse& resp)
- {
- IArrayOf<IEspQuerySetQueryActionResult> results;
- Owned<IEspQuerySetQueryActionResult> result = createQuerySetQueryActionResult();
- try
- {
- StringBuffer control;
- Owned<IPropertyIterator> it = queryIds->getIterator();
- ForEach(*it)
- {
- const char *queryId = it->getPropKey();
- if (queryId && *queryId)
- {
- appendXMLOpenTag(control, "Query", NULL, false);
- appendXMLAttr(control, "id", queryId);
- if (target && *target)
- appendXMLAttr(control, "target", target);
- control.append("/>");
- }
- }
- if (!control.length())
- throw MakeStringException(ECLWATCH_MISSING_PARAMS, "CWsWorkunitsEx::resetQueryStats: Query ID not specified");
- control.insert(0, "<control:resetquerystats>");
- control.append("</control:resetquerystats>");
- if (!sendControlQuery(context, target, control.str(), ROXIECONNECTIONTIMEOUT))
- throw MakeStringException(ECLWATCH_INTERNAL_ERROR, "CWsWorkunitsEx::resetQueryStats: Failed to send roxie control query");
- result->setMessage("Query stats reset succeeded");
- result->setSuccess(true);;
- }
- catch(IMultiException *me)
- {
- StringBuffer msg;
- result->setMessage(me->errorMessage(msg).str());
- result->setCode(me->errorCode());
- result->setSuccess(false);
- me->Release();
- }
- catch(IException *e)
- {
- StringBuffer msg;
- result->setMessage(e->errorMessage(msg).str());
- result->setCode(e->errorCode());
- result->setSuccess(false);
- e->Release();
- }
- results.append(*result.getClear());
- resp.setResults(results);
- return true;
- }
- IPropertyTree* CWsWorkunitsEx::sendControlQuery(IEspContext& context, const char* target, const char* query, unsigned timeout)
- {
- if (!target || !*target)
- throw MakeStringException(ECLWATCH_MISSING_PARAMS, "CWsWorkunitsEx::sendControlQuery: target not specified");
- if (!query || !*query)
- throw MakeStringException(ECLWATCH_MISSING_PARAMS, "CWsWorkunitsEx::sendControlQuery: Control query not specified");
- Owned<IConstWUClusterInfo> info = getTargetClusterInfo(target);
- if (!info || (info->getPlatform()!=RoxieCluster)) //Only support roxie for now
- throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "CWsWorkunitsEx::sendControlQuery: Invalid target name %s", target);
- const SocketEndpointArray &eps = info->getRoxieServers();
- if (eps.empty())
- throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "CWsWorkunitsEx::sendControlQuery: Server not found for %s", target);
- Owned<ISocket> sock = ISocket::connect_timeout(eps.item(0), timeout);
- return sendRoxieControlQuery(sock, query, timeout);
- }
- bool CWsWorkunitsEx::onWUUpdateQueryEntry(IEspContext& context, IEspWUUpdateQueryEntryRequest& req, IEspWUUpdateQueryEntryResponse& resp)
- {
- try
- {
- StringBuffer querySetName, query;
- ensureInputString(req.getQuerySet(), true, querySetName, ECLWATCH_QUERYSET_NOT_FOUND, "Query Set not specified");
- ensureInputString(req.getQueryId(), true, query, ECLWATCH_QUERYID_NOT_FOUND, "Query not specified");
- Owned<IPropertyTree> querySet = getQueryRegistry(querySetName.str(), true);
- if (!querySet)
- throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Queryset %s not found", querySetName.str());
- VStringBuffer xpath("Query[@id=\"%s\"]", query.str());
- IPropertyTree *tree = querySet->queryPropTree(xpath);
- if (!tree)
- throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Query %s not found", query.str());
- StringBuffer comment = req.getComment();
- if (comment.isEmpty())
- tree->removeProp("@comment");
- else
- tree->setProp("@comment", comment.str());
- }
- catch(IException* e)
- {
- FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
- }
- return true;
- }
- bool CWsWorkunitsEx::onWUGetNumFileToCopy(IEspContext& context, IEspWUGetNumFileToCopyRequest& req, IEspWUGetNumFileToCopyResponse& resp)
- {
- class CWUGetNumFileToCopyPager : public CSimpleInterface, implements IElementsPager
- {
- StringAttr clusterName;
- StringAttr sortOrder;
- public:
- IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
- CWUGetNumFileToCopyPager(const char* _clusterName, const char *_sortOrder)
- : clusterName(_clusterName), sortOrder(_sortOrder) { };
- virtual IRemoteConnection* getElements(IArrayOf<IPropertyTree> &elements)
- {
- SocketEndpointArray servers;
- getRoxieProcessServers(clusterName.get(), servers);
- if (servers.length() < 1)
- {
- PROGLOG("WUGetNumFileToCopy: Process Server not found for %s", clusterName.get());
- return NULL;
- }
- Owned<IPropertyTree> result = sendRoxieControlAllNodes(servers.item(0), "<control:numfilestoprocess/>", false, ROXIELOCKCONNECTIONTIMEOUT);
- if (!result)
- {
- PROGLOG("WUGetNumFileToCopy: Empty result received for cluster %s", clusterName.get());
- return NULL;
- }
- Owned<IPropertyTreeIterator> iter = result->getElements("*");
- if (!iter)
- return NULL;
- StringArray unknownAttributes;
- sortElements(iter, sortOrder.get(), NULL, NULL, unknownAttributes, elements);
- return NULL;
- }
- virtual bool allMatchingElementsReceived() { return true; } //For now, roxie always returns all of matched items.
- };
- try
- {
- StringBuffer clusterName = req.getClusterName();
- if (clusterName.isEmpty())
- throw MakeStringException(ECLWATCH_CANNOT_RESOLVE_CLUSTER_NAME, "Cluster not specified");
- StringBuffer so;
- bool descending = req.getDescending();
- if (descending)
- so.set("-");
- const char *sortBy = req.getSortby();
- if (!isEmptyString(sortBy) && strieq(sortBy, "URL"))
- so.append("?@ep");
- else if (!isEmptyString(sortBy) && strieq(sortBy, "Status"))
- so.append("?Status");
- else
- so.append("#FilesToProcess/@value");
- unsigned pageSize = req.getPageSize();
- unsigned pageStartFrom = req.getPageStartFrom();
- if(pageSize < 1)
- pageSize = 100;
- __int64 cacheHint = 0;
- if (!req.getCacheHint_isNull())
- cacheHint = req.getCacheHint();
- unsigned numberOfEndpoints = 0;
- IArrayOf<IPropertyTree> results;
- Owned<IElementsPager> elementsPager = new CWUGetNumFileToCopyPager(clusterName.str(), so.str());
- getElementsPaged(elementsPager, pageStartFrom, pageSize, NULL, "", &cacheHint, results, &numberOfEndpoints, NULL, false);
- IArrayOf<IEspClusterEndpoint> endpoints;
- ForEachItemIn(i, results)
- {
- IPropertyTree &item = results.item(i);
- Owned<IEspClusterEndpoint> endpoint = createClusterEndpoint();
- endpoint->setURL(item.queryProp("@ep"));
- endpoint->setStatus(item.queryProp("Status"));
- endpoint->setNumQueryFileToCopy(item.getPropInt("FilesToProcess/@value", 0));
- endpoints.append(*endpoint.getClear());
- }
- resp.setEndpoints(endpoints);
- resp.setCacheHint(cacheHint);
- resp.setTotal(numberOfEndpoints);
- }
- catch(IException* e)
- {
- FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
- }
- return true;
- }
- void getSummaryStatsByQueryId(const char *target, const char *queryId, const char *fromTime, const char *toTime, IArrayOf<IEspQuerySummaryStats>& querySummaryStatsList)
- {
- if (!target || !*target)
- throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Target name required");
- if (!queryId || !*queryId)
- throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Query Id required");
- Owned<IConstWUClusterInfo> info = getTargetClusterInfo(target);
- if (!info || (info->getPlatform()!=RoxieCluster)) //Only support roxie for now
- throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Invalid Roxie name");
- PROGLOG("getSummaryStatsByQueryId: target %s, query %s", target, queryId);
- const SocketEndpointArray &eps = info->getRoxieServers();
- if (eps.empty())
- return;
- VStringBuffer control("<control:queryAggregates");
- if (!isEmpty(fromTime))
- control.appendf(" from='%s'", fromTime);
- if (!isEmpty(toTime))
- control.appendf(" to='%s'", toTime);
- control.appendf("><Query id='%s'/></control:queryAggregates>", queryId);
- Owned<IPropertyTree> queryAggregates = sendRoxieControlAllNodes(eps.item(0), control.str(), false, ROXIELOCKCONNECTIONTIMEOUT);
- if (!queryAggregates)
- return;
- if (getEspLogLevel() >= LogMax)
- {
- StringBuffer sb;
- toXML(queryAggregates, sb);
- DBGLOG("getSummaryStatsByQueryId(): '%s' => '%s'", control.str(), sb.str());
- }
- //Parse queryAggregates and build querySummaryStatsList.
- Owned<IPropertyTreeIterator> aggregates = queryAggregates->getElements("Endpoint");
- ForEach(*aggregates)
- {
- IPropertyTree &aggregate = aggregates->query();
- const char *status = aggregate.queryProp("Status");
- const char *ep = aggregate.queryProp("@ep");
- if (isEmptyString(ep))
- continue;
- IPropertyTree *query = aggregate.queryPropTree("Query");
- Owned<IEspQuerySummaryStats> querySummaryStats = createQuerySummaryStats();
- querySummaryStats->setEndpoint(ep);
- if (query->hasProp("countFailed"))
- querySummaryStats->setCountFailed(query->getPropInt("countFailed"));
- if (query->hasProp("countTotal"))
- querySummaryStats->setCountTotal(query->getPropInt("countTotal"));
- if (query->hasProp("averageBytesOut"))
- querySummaryStats->setAverageBytesOut(query->getPropInt64("averageBytesOut"));
- if (query->hasProp("averageMemUsed"))
- querySummaryStats->setSizeAvgPeakMemory(query->getPropInt64("averageMemUsed"));
- if (query->hasProp("averageSlavesReplyLen"))
- querySummaryStats->setAverageSlavesReplyLen(query->getPropInt("averageSlavesReplyLen"));
- if (query->hasProp("averageTimeMs"))
- querySummaryStats->setTimeAvgTotalExecuteMinutes(query->getPropInt64("averageTimeMs"));
- if (query->hasProp("minTimeMs"))
- querySummaryStats->setTimeMinTotalExecuteMinutes(query->getPropInt64("minTimeMs"));
- if (query->hasProp("maxTimeMs"))
- querySummaryStats->setTimeMaxTotalExecuteMinutes(query->getPropInt64("maxTimeMs"));
- if (query->hasProp("percentile97"))
- {
- querySummaryStats->setPercentile97(query->getPropInt("percentile97"));
- if (query->hasProp("percentile97/@estimate"))
- querySummaryStats->setPercentile97Estimate(query->getPropBool("percentile97/@estimate"));
- }
- const char *startTime = query->queryProp("startTime");
- const char *endTime = query->queryProp("endTime");
- if (!isEmptyString(startTime))
- querySummaryStats->setStartTime(startTime);
- if (!isEmptyString(endTime))
- querySummaryStats->setEndTime(endTime);
- if (!isEmptyString(status))
- querySummaryStats->setStatus(status);
- querySummaryStatsList.append(*querySummaryStats.getLink());
- }
- return;
- }
- bool CWsWorkunitsEx::onWUQueryGetSummaryStats(IEspContext& context, IEspWUQueryGetSummaryStatsRequest& req, IEspWUQueryGetSummaryStatsResponse& resp)
- {
- try
- {
- IArrayOf<IEspQuerySummaryStats> querySummaryStatsList;
- getSummaryStatsByQueryId(req.getTarget(), req.getQueryId(), req.getFromTime(), req.getToTime(), querySummaryStatsList);
- resp.setStatsList(querySummaryStatsList);
- }
- catch(IException* e)
- {
- FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
- }
- return true;
- }
|