12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618261926202621262226232624262526262627262826292630263126322633263426352636263726382639264026412642264326442645264626472648264926502651265226532654265526562657265826592660266126622663266426652666266726682669267026712672267326742675267626772678267926802681268226832684268526862687268826892690269126922693269426952696269726982699270027012702270327042705270627072708270927102711271227132714271527162717271827192720272127222723272427252726272727282729273027312732273327342735273627372738273927402741274227432744274527462747274827492750275127522753275427552756275727582759276027612762276327642765276627672768276927702771277227732774277527762777277827792780278127822783278427852786278727882789279027912792279327942795279627972798279928002801280228032804280528062807280828092810281128122813281428152816281728182819282028212822282328242825282628272828282928302831283228332834283528362837283828392840284128422843284428452846284728482849285028512852285328542855285628572858285928602861286228632864286528662867286828692870287128722873287428752876287728782879288028812882288328842885288628872888288928902891289228932894289528962897289828992900290129022903290429052906290729082909291029112912291329142915291629172918291929202921292229232924292529262927292829292930293129322933293429352936293729382939294029412942294329442945294629472948294929502951295229532954295529562957295829592960296129622963296429652966296729682969297029712972297329742975297629772978297929802981298229832984298529862987298829892990299129922993299429952996299729982999300030013002300330043005300630073008300930103011301230133014301530163017301830193020302130223023302430253026302730283029303030313032303330343035303630373038303930403041304230433044304530463047304830493050305130523053305430553056305730583059306030613062306330643065306630673068306930703071307230733074307530763077307830793080308130823083308430853086308730883089309030913092309330943095309630973098309931003101310231033104310531063107310831093110311131123113311431153116311731183119312031213122312331243125312631273128312931303131313231333134313531363137313831393140314131423143314431453146314731483149315031513152315331543155315631573158315931603161316231633164316531663167316831693170317131723173317431753176317731783179318031813182318331843185318631873188318931903191319231933194319531963197319831993200320132023203320432053206320732083209321032113212321332143215321632173218321932203221322232233224322532263227322832293230323132323233323432353236323732383239324032413242324332443245324632473248324932503251325232533254325532563257325832593260326132623263326432653266326732683269327032713272327332743275327632773278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458345934603461346234633464346534663467346834693470347134723473347434753476347734783479348034813482348334843485348634873488348934903491349234933494349534963497349834993500350135023503350435053506350735083509351035113512351335143515351635173518351935203521352235233524352535263527352835293530353135323533353435353536353735383539354035413542354335443545354635473548354935503551355235533554355535563557355835593560356135623563 |
- /*##############################################################################
- HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ############################################################################## */
- #include "ws_workunitsService.hpp"
- #include "ws_fs.hpp"
- #include "jlib.hpp"
- #include "jflz.hpp"
- #include "daclient.hpp"
- #include "dalienv.hpp"
- #include "dadfs.hpp"
- #include "dfuwu.hpp"
- #include "eclhelper.hpp"
- #include "roxiecontrol.hpp"
- #include "dfuutil.hpp"
- #include "dautils.hpp"
- #include "httpclient.hpp"
- #include "portlist.h" //ROXIE_SERVER_PORT
- #define DALI_FILE_LOOKUP_TIMEOUT (1000*15*1) // 15 seconds
- const unsigned ROXIECONNECTIONTIMEOUT = 1000; //1 second
- const unsigned ROXIECONTROLQUERYTIMEOUT = 3000; //3 second
- const unsigned ROXIECONTROLQUERIESTIMEOUT = 30000; //30 second
- const unsigned ROXIELOCKCONNECTIONTIMEOUT = 60000; //60 second
- #define SDS_LOCK_TIMEOUT (5*60*1000) // 5mins, 30s a bit short
- //The CQuerySetQueryActionTypes[] has to match with the ESPenum QuerySetQueryActionTypes in the ecm file.
- static unsigned NumOfQuerySetQueryActionTypes = 7;
- static const char *QuerySetQueryActionTypes[] = { "Suspend", "Unsuspend", "ToggleSuspend", "Activate",
- "Delete", "RemoveAllAliases", "ResetQueryStats", NULL };
- //The CQuerySetAliasActionTypes[] has to match with the ESPenum QuerySetAliasActionTypes in the ecm file.
- static unsigned NumOfQuerySetAliasActionTypes = 1;
- static const char *QuerySetAliasActionTypes[] = { "Deactivate", NULL };
- bool isRoxieProcess(const char *process)
- {
- if (!process)
- return false;
- Owned<IEnvironmentFactory> factory = getEnvironmentFactory(true);
- Owned<IConstEnvironment> env = factory->openEnvironment();
- Owned<IPropertyTree> root = &env->getPTree();
- VStringBuffer xpath("Software/RoxieCluster[@name=\"%s\"]", process);
- return root->hasProp(xpath.str());
- }
- void checkUseEspOrDaliIP(SocketEndpoint &ep, const char *ip, const char *esp)
- {
- if (!ip || !*ip)
- return;
- ep.set(ip, 7070);
- if (ep.isLoopBack() || *ip=='.' || (ip[0]=='0' && ip[1]=='.'))
- ep.ipset(esp);
- }
- void ensureInputString(const char* input, bool lowerCase, StringBuffer& inputStr, int code, const char* msg)
- {
- inputStr.set(input).trim();
- if (inputStr.isEmpty())
- throw MakeStringException(code, "%s", msg);
- if (lowerCase)
- inputStr.toLowerCase();
- }
- static IClientWsWorkunits *ensureWsWorkunitsClient(IClientWsWorkunits *ws, IEspContext *ctx, const char *netAddress)
- {
- if (ws)
- return LINK(ws);
- StringBuffer url;
- if (netAddress && *netAddress)
- url.appendf("http://%s%s/WsWorkunits", netAddress, (!strchr(netAddress, ':')) ? ":8010" : "");
- else
- {
- if (!ctx)
- throw MakeStringException(ECLWATCH_INVALID_IP, "Missing WsWorkunits service address");
- StringBuffer ip;
- short port = 0;
- ctx->getServAddress(ip, port);
- url.appendf("http://%s:%d/WsWorkunits", ip.str(), port);
- }
- Owned<IClientWsWorkunits> cws = createWsWorkunitsClient();
- cws->addServiceUrl(url);
- if (ctx && ctx->queryUserId() && *ctx->queryUserId())
- cws->setUsernameToken(ctx->queryUserId(), ctx->queryPassword(), NULL);
- return cws.getClear();
- }
- IClientWUQuerySetDetailsResponse *fetchQueryDetails(IClientWsWorkunits *_ws, IEspContext *ctx, const char *netAddress, const char *target, const char *queryid)
- {
- Owned<IClientWsWorkunits> ws = ensureWsWorkunitsClient(_ws, ctx, netAddress);
- //using existing WUQuerysetDetails rather than extending WUQueryDetails, to support copying query meta data from prior releases
- Owned<IClientWUQuerySetDetailsRequest> reqQueryInfo = ws->createWUQuerysetDetailsRequest();
- reqQueryInfo->setClusterName(target);
- reqQueryInfo->setQuerySetName(target);
- reqQueryInfo->setFilter(queryid);
- reqQueryInfo->setFilterType("Id");
- return ws->WUQuerysetDetails(reqQueryInfo);
- }
- void fetchRemoteWorkunit(IClientWsWorkunits *_ws, IEspContext *ctx, const char *netAddress, const char *queryset, const char *query, const char *wuid, StringBuffer &name, StringBuffer &xml, StringBuffer &dllname, MemoryBuffer &dll, StringBuffer &daliServer)
- {
- Owned<IClientWsWorkunits> ws = ensureWsWorkunitsClient(_ws, ctx, netAddress);
- Owned<IClientWULogFileRequest> req = ws->createWUFileRequest();
- if (queryset && *queryset)
- req->setQuerySet(queryset);
- if (query && *query)
- req->setQuery(query);
- if (wuid && *wuid)
- req->setWuid(wuid);
- req->setErrorMessageFormat(CErrorMessageFormat_XML);
- req->setType("xml");
- Owned<IClientWULogFileResponse> resp = ws->WUFile(req);
- if (!resp || resp->getExceptions().ordinality() || !resp->getThefile().length())
- throw MakeStringException(ECLWATCH_CANNOT_GET_WORKUNIT, "Cannot retrieve remote workunit");
- xml.append(resp->getThefile().length(), resp->getThefile().toByteArray());
- req->setType("dll");
- resp.setown(ws->WUFile(req));
- if (!resp || resp->getExceptions().ordinality() || !resp->getThefile().length())
- throw MakeStringException(ECLWATCH_CANNOT_GET_WORKUNIT, "Cannot retrieve remote workunit shared object");
- dll.append(resp->getThefile());
- dllname.append(resp->getFileName());
- name.append(resp->getQueryName());
- SocketEndpoint ep;
- checkUseEspOrDaliIP(ep, resp->getDaliServer(), netAddress);
- if (!ep.isNull())
- ep.getUrlStr(daliServer);
- }
- void fetchRemoteWorkunitAndQueryDetails(IClientWsWorkunits *_ws, IEspContext *ctx, const char *netAddress, const char *queryset, const char *query, const char *wuid, StringBuffer &name, StringBuffer &xml, StringBuffer &dllname, MemoryBuffer &dll, StringBuffer &daliServer, Owned<IClientWUQuerySetDetailsResponse> &respQueryInfo)
- {
- Owned<IClientWsWorkunits> ws = ensureWsWorkunitsClient(_ws, ctx, netAddress);
- fetchRemoteWorkunit(ws, ctx, netAddress, queryset, query, wuid, name, xml, dllname, dll, daliServer);
- respQueryInfo.setown(fetchQueryDetails(ws, ctx, netAddress, queryset, query));
- }
- void doWuFileCopy(IClientFileSpray &fs, IEspWULogicalFileCopyInfo &info, const char *logicalname, const char *cluster, bool isRoxie, bool supercopy)
- {
- try
- {
- Owned<IClientCopy> req = fs.createCopyRequest();
- req->setSourceLogicalName(logicalname);
- req->setDestLogicalName(logicalname);
- req->setDestGroup(cluster);
- req->setSuperCopy(supercopy);
- if (isRoxie)
- req->setDestGroupRoxie("Yes");
- Owned<IClientCopyResponse> resp = fs.Copy(req);
- info.setDfuCopyWuid(resp->getResult());
- }
- catch (IException *e)
- {
- StringBuffer msg;
- info.setDfuCopyError(e->errorMessage(msg).str());
- e->Release();
- }
- }
- bool copyWULogicalFiles(IEspContext &context, IConstWorkUnit &cw, const char *cluster, bool copyLocal, IEspWUCopyLogicalClusterFileSections &lfinfo)
- {
- if (isEmpty(cluster))
- throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "copyWULogicalFiles Cluster parameter not set.");
- Owned<IUserDescriptor> udesc = createUserDescriptor();
- udesc->set(context.queryUserId(), context.queryPassword(), context.querySignature());
- IArrayOf<IEspWULogicalFileCopyInfo> foreign;
- IArrayOf<IEspWULogicalFileCopyInfo> onCluster;
- IArrayOf<IEspWULogicalFileCopyInfo> notOnCluster;
- IArrayOf<IEspWULogicalFileCopyInfo> notFound;
- Owned<IClientFileSpray> fs;
- if (copyLocal)
- {
- fs.setown(createFileSprayClient());
- VStringBuffer url("http://.:%d/FileSpray", 8010);
- fs->addServiceUrl(url.str());
- }
- bool isRoxie = isRoxieProcess(cluster);
- Owned<IConstWUGraphIterator> graphs = &cw.getGraphs(GraphTypeActivities);
- ForEach(*graphs)
- {
- Owned <IPropertyTree> xgmml = graphs->query().getXGMMLTree(false);
- Owned<IPropertyTreeIterator> iter = xgmml->getElements(".//node");
- ForEach(*iter)
- {
- try
- {
- IPropertyTree &node = iter->query();
- ThorActivityKind kind = (ThorActivityKind) node.getPropInt("att[@name='_kind']/@value", TAKnone);
- if(kind==TAKdiskwrite || kind==TAKspillwrite || kind==TAKindexwrite || kind==TAKcsvwrite || kind==TAKxmlwrite || kind==TAKjsonwrite)
- continue;
- if (node.getPropBool("att[@name='_isSpill']/@value") || node.getPropBool("att[@name='_isTransformSpill']/@value"))
- continue;
- Owned<IEspWULogicalFileCopyInfo> info = createWULogicalFileCopyInfo();
- const char *logicalname = node.queryProp("att[@name='_indexFileName']/@value");
- if (logicalname)
- info->setIsIndex(true);
- else
- logicalname = node.queryProp("att[@name='_fileName']/@value");
- info->setLogicalName(logicalname);
- if (logicalname)
- {
- if (!strnicmp("~foreign::", logicalname, 10))
- foreign.append(*info.getClear());
- else
- {
- Owned<IDistributedFile> df = queryDistributedFileDirectory().lookup(logicalname, udesc, false, false, false, nullptr, defaultPrivilegedUser);
- if(!df)
- notFound.append(*info.getClear());
- else if (df->findCluster(cluster)!=NotFound)
- {
- onCluster.append(*info.getClear());
- }
- else
- {
- StringArray clusters;
- df->getClusterNames(clusters);
- info->setClusters(clusters);
- if (copyLocal)
- {
- StringBuffer wuid;
- bool supercopy = queryDistributedFileDirectory().isSuperFile(logicalname, udesc, NULL);
- doWuFileCopy(*fs, *info, logicalname, cluster, isRoxie, supercopy);
- }
- notOnCluster.append(*info.getClear());
- }
- }
- }
- }
- catch(IException *e)
- {
- e->Release();
- }
- }
- lfinfo.setClusterName(cluster);
- lfinfo.setNotOnCluster(notOnCluster);
- lfinfo.setOnCluster(onCluster);
- lfinfo.setForeign(foreign);
- lfinfo.setNotFound(notFound);
- }
- return true;
- }
- void copyWULogicalFilesToTarget(IEspContext &context, IConstWUClusterInfo &clusterInfo, IConstWorkUnit &cw, IArrayOf<IConstWUCopyLogicalClusterFileSections> &clusterfiles, bool doLocalCopy)
- {
- const StringArray &thors = clusterInfo.getThorProcesses();
- ForEachItemIn(i, thors)
- {
- Owned<IEspWUCopyLogicalClusterFileSections> files = createWUCopyLogicalClusterFileSections();
- copyWULogicalFiles(context, cw, thors.item(i), doLocalCopy, *files);
- clusterfiles.append(*files.getClear());
- }
- SCMStringBuffer roxie;
- clusterInfo.getRoxieProcess(roxie);
- if (roxie.length())
- {
- Owned<IEspWUCopyLogicalClusterFileSections> files = createWUCopyLogicalClusterFileSections();
- copyWULogicalFiles(context, cw, roxie.str(), doLocalCopy, *files);
- clusterfiles.append(*files.getClear());
- }
- }
- void QueryFilesInUse::loadTarget(IPropertyTree *t, const char *target, unsigned flags)
- {
- if (!target || !*target)
- return;
- Owned<IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(target);
- if (!clusterInfo || !(clusterInfo->getPlatform() == RoxieCluster))
- return;
- Owned<IPropertyTree> queryRegistry = getQueryRegistry(target, true);
- if (!queryRegistry)
- return;
- SCMStringBuffer process;
- clusterInfo->getRoxieProcess(process);
- if (!process.length())
- return;
- Owned<IHpccPackageSet> ps = createPackageSet(process.str());
- const IHpccPackageMap *pm = (ps) ? ps->queryActiveMap(target) : NULL;
- const char *pmid = (pm) ? pm->queryPackageId() : NULL;
- VStringBuffer xpath("%s/@pmid", target);
- const char *pmidPrev = t->queryProp(xpath);
- if ((flags & UFO_RELOAD_TARGETS_CHANGED_PMID) && (pmid || pmidPrev))
- {
- if (!(pmid && pmidPrev) || !streq(pmid, pmidPrev))
- t->removeProp(target);
- }
- IPropertyTree *targetTree = ensurePTree(t, target);
- if (pm)
- targetTree->setProp("@pmid", pmid);
- if (flags & UFO_REMOVE_QUERIES_NOT_IN_QUERYSET)
- {
- Owned<IPropertyTreeIterator> cachedQueries = targetTree->getElements("Query");
- ForEach(*cachedQueries)
- {
- IPropertyTree &cachedQuery = cachedQueries->query();
- VStringBuffer xpath("Query[@id='%s']", cachedQuery.queryProp("@id"));
- if (!queryRegistry->hasProp(xpath))
- targetTree->removeTree(&cachedQuery);
- }
- }
- Owned<IPropertyTreeIterator> queries = queryRegistry->getElements("Query");
- ForEach(*queries)
- {
- if (aborting)
- return;
- IPropertyTree &query = queries->query();
- const char *queryid = query.queryProp("@id");
- if (!queryid || !*queryid)
- continue;
- const char *wuid = query.queryProp("@wuid");
- if (!wuid || !*wuid)
- continue;
- const char *pkgid=NULL;
- if (pm)
- {
- const IHpccPackage *pkg = pm->matchPackage(queryid);
- if (pkg)
- pkgid = pkg->queryId();
- }
- VStringBuffer xpath("Query[@id='%s']", queryid);
- IPropertyTree *queryTree = targetTree->queryPropTree(xpath);
- if (queryTree)
- {
- const char *cachedPkgid = queryTree->queryProp("@pkgid");
- if (pkgid && *pkgid)
- {
- if (!(flags & UFO_RELOAD_MAPPED_QUERIES) && (cachedPkgid && streq(pkgid, cachedPkgid)))
- continue;
- }
- else if (!cachedPkgid || !*cachedPkgid)
- continue;
- targetTree->removeTree(queryTree);
- queryTree = NULL;
- }
- Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
- Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid);
- if (!cw)
- continue;
- queryTree = targetTree->addPropTree("Query", createPTree("Query"));
- queryTree->setProp("@target", target); //for reference when searching across targets
- queryTree->setProp("@id", queryid);
- if (pkgid && *pkgid)
- queryTree->setProp("@pkgid", pkgid);
- IUserDescriptor **roxieUser = roxieUserMap.getValue(target);
- Owned<IReferencedFileList> wufiles = createReferencedFileList(roxieUser ? *roxieUser : NULL, true, true);
- wufiles->addFilesFromQuery(cw, pm, queryid);
- if (aborting)
- return;
- wufiles->resolveFiles(process.str(), NULL, NULL, NULL, true, true, false, false);
- Owned<IReferencedFileIterator> files = wufiles->getFiles();
- ForEach(*files)
- {
- if (aborting)
- return;
- IReferencedFile &rf = files->query();
- //if (!(rf.getFlags() & RefSubFile))
- // continue;
- const char *lfn = rf.getLogicalName();
- if (!lfn || !*lfn)
- continue;
- if (!queryTree->hasProp(xpath.setf("File[@lfn='%s']", lfn)))
- {
- IPropertyTree *fileTree = queryTree->addPropTree("File", createPTree("File"));
- fileTree->setProp("@lfn", lfn);
- if (rf.getFlags() & RefFileSuper)
- fileTree->setPropBool("@super", true);
- if (rf.getFlags() & RefFileNotFound)
- fileTree->setPropBool("@notFound", true);
- const char *fpkgid = rf.queryPackageId();
- if (fpkgid && *fpkgid)
- fileTree->setProp("@pkgid", fpkgid);
- if (rf.getFileSize())
- fileTree->setPropInt64("@size", rf.getFileSize());
- if (rf.getNumParts())
- fileTree->setPropInt("@numparts", rf.getNumParts());
- }
- }
- }
- }
- void QueryFilesInUse::loadTargets(IPropertyTree *t, unsigned flags)
- {
- Owned<IStringIterator> targets = getTargetClusters("RoxieCluster", NULL);
- SCMStringBuffer s;
- ForEach(*targets)
- {
- if (aborting)
- return;
- loadTarget(t, targets->str(s).str(), flags);
- }
- }
- IPropertyTreeIterator *QueryFilesInUse::findAllQueriesUsingFile(const char *lfn)
- {
- if (!lfn || !*lfn)
- return NULL;
- Owned<IPropertyTree> t = getTree();
- VStringBuffer xpath("*/Query[File/@lfn='%s']", lfn);
- return t->getElements(xpath);
- }
- IPropertyTreeIterator *QueryFilesInUse::findQueriesUsingFile(const char *target, const char *lfn, StringAttr &pmid)
- {
- if (!lfn || !*lfn)
- return NULL;
- if (!target || !*target)
- return findAllQueriesUsingFile(lfn);
- Owned<IPropertyTree> t = getTree();
- IPropertyTree *targetTree = t->queryPropTree(target);
- if (!targetTree)
- return NULL;
- pmid.set(targetTree->queryProp("@pmid"));
- VStringBuffer xpath("Query[File/@lfn='%s']", lfn);
- return targetTree->getElements(xpath);
- }
- bool CWsWorkunitsEx::onWUCopyLogicalFiles(IEspContext &context, IEspWUCopyLogicalFilesRequest &req, IEspWUCopyLogicalFilesResponse &resp)
- {
- StringBuffer wuid(req.getWuid());
- WsWuHelpers::checkAndTrimWorkunit("WUCopyLogicalFiles", wuid);
- Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
- Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid.str());
- if (!cw)
- throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot open workunit %s", wuid.str());
- ensureWsWorkunitAccess(context, *cw, SecAccess_Full);
- resp.setWuid(wuid.str());
- StringAttr cluster;
- if (notEmpty(req.getCluster()))
- cluster.set(req.getCluster());
- else
- cluster.set(cw->queryClusterName());
- if (!isValidCluster(cluster))
- throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Invalid cluster name: %s", cluster.str());
- Owned <IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(cluster.str());
- IArrayOf<IConstWUCopyLogicalClusterFileSections> clusterfiles;
- PROGLOG("WUCopyLogicalFiles: %s", wuid.str());
- copyWULogicalFilesToTarget(context, *clusterInfo, *cw, clusterfiles, req.getCopyLocal());
- resp.setClusterFiles(clusterfiles);
- return true;
- }
- static inline unsigned remainingMsWait(unsigned wait, unsigned start)
- {
- if (wait==0 || wait==(unsigned)-1)
- return wait;
- unsigned waited = msTick()-start;
- return (wait>waited) ? wait-waited : 0;
- }
- bool reloadCluster(IConstWUClusterInfo *clusterInfo, unsigned wait)
- {
- if (0==wait || !clusterInfo || clusterInfo->getPlatform()!=RoxieCluster)
- return true;
- const SocketEndpointArray &addrs = clusterInfo->getRoxieServers();
- if (addrs.length())
- {
- try
- {
- Owned<IPropertyTree> result = sendRoxieControlAllNodes(addrs.item(0), "<control:reload/>", false, wait);
- const char *status = result->queryProp("Endpoint[1]/Status");
- if (!status || !strieq(status, "ok"))
- return false;
- }
- catch(IMultiException *me)
- {
- StringBuffer err;
- IERRLOG("ERROR control:reloading roxie query info %s", me->errorMessage(err.append(me->errorCode()).append(' ')).str());
- me->Release();
- return false;
- }
- catch(IException *e)
- {
- StringBuffer err;
- IERRLOG("ERROR control:reloading roxie query info %s", e->errorMessage(err.append(e->errorCode()).append(' ')).str());
- e->Release();
- return false;
- }
- }
- return true;
- }
- bool reloadCluster(const char *cluster, unsigned wait)
- {
- Owned <IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(cluster);
- return (clusterInfo) ? reloadCluster(clusterInfo, wait) : true;
- }
- static inline void updateQuerySetting(bool ignore, IPropertyTree *queryTree, const char *xpath, int value)
- {
- if (ignore || !queryTree)
- return;
- if (value!=0)
- queryTree->setPropInt(xpath, value);
- else
- queryTree->removeProp(xpath);
- }
- static inline void updateTimeLimitSetting(IPropertyTree *queryTree, bool valueNotSet, int value, IConstQuerySetQuery *srcInfo=NULL)
- {
- if (valueNotSet && srcInfo && !srcInfo->getTimeLimit_isNull())
- {
- value = srcInfo->getTimeLimit();
- valueNotSet=false;
- }
- updateQuerySetting(valueNotSet, queryTree, "@timeLimit", value);
- }
- static inline void updateWarnTimeLimitSetting(IPropertyTree *queryTree, bool valueNotSet, int value, IConstQuerySetQuery *srcInfo=NULL)
- {
- if (valueNotSet && srcInfo && !srcInfo->getWarnTimeLimit_isNull())
- {
- value = srcInfo->getWarnTimeLimit();
- valueNotSet=false;
- }
- updateQuerySetting(valueNotSet, queryTree, "@warnTimeLimit", value);
- }
- static inline unsigned __int64 memoryLimitUInt64FromString(const char *value)
- {
- if (!value || !*value || !isdigit(*value))
- return 0;
- unsigned __int64 result = (*value - '0');
- const char *s = value+1;
- while (isdigit(*s))
- {
- result = 10 * result + ((*s) - '0');
- s++;
- }
- if (*s)
- {
- const char unit = toupper(*s++);
- if (*s && !strieq("B", s)) //more?
- return 0;
- switch (unit)
- {
- case 'E':
- result <<=60;
- break;
- case 'P':
- result <<=50;
- break;
- case 'T':
- result <<=40;
- break;
- case 'G':
- result <<=30;
- break;
- case 'M':
- result <<=20;
- break;
- case 'K':
- result <<=10;
- break;
- case 'B':
- break;
- default:
- return 0;
- }
- }
- return result;
- }
- const char memUnitAbbrev[] = {'B', 'K', 'M', 'G', 'T', 'P', 'E'};
- #define MAX_MEMUNIT_ABBREV 6
- static inline StringBuffer &memoryLimitStringFromUInt64(StringBuffer &s, unsigned __int64 in)
- {
- if (!in)
- return s;
- unsigned __int64 value = in;
- unsigned char unit = 0;
- while (!(value & 0x3FF) && unit < MAX_MEMUNIT_ABBREV)
- {
- value >>= 10;
- unit++;
- }
- return s.append(value).append(memUnitAbbrev[unit]);
- }
- static inline void updateMemoryLimitSetting(IPropertyTree *queryTree, const char *value, IConstQuerySetQuery *srcInfo=NULL)
- {
- if (!queryTree)
- return;
- if (!value && srcInfo)
- value = srcInfo->getMemoryLimit();
- if (!value)
- return;
- unsigned __int64 limit = memoryLimitUInt64FromString(value);
- if (0==limit)
- queryTree->removeProp("@memoryLimit");
- else
- queryTree->setPropInt64("@memoryLimit", limit);
- }
- enum QueryPriority {
- QueryPriorityNone = -1,
- QueryPriorityLow = 0,
- QueryPriorityHigh = 1,
- QueryPrioritySLA = 2,
- QueryPriorityInvalid = 3
- };
- static inline const char *getQueryPriorityName(int value)
- {
- switch (value)
- {
- case QueryPriorityLow:
- return "LOW";
- case QueryPriorityHigh:
- return "HIGH";
- case QueryPrioritySLA:
- return "SLA";
- case QueryPriorityNone:
- return "NONE";
- }
- return "INVALID";
- }
- static inline void updateQueryPriority(IPropertyTree *queryTree, const char *value, IConstQuerySetQuery *srcInfo=NULL)
- {
- if (!queryTree)
- return;
- if ((!value || !*value) && srcInfo)
- value = srcInfo->getPriority();
- if (!value || !*value)
- return;
- int priority = QueryPriorityInvalid;
- if (strieq("LOW", value))
- priority=QueryPriorityLow;
- else if (strieq("HIGH", value))
- priority=QueryPriorityHigh;
- else if (strieq("SLA", value))
- priority=QueryPrioritySLA;
- else if (strieq("NONE", value))
- priority=QueryPriorityNone;
- switch (priority)
- {
- case QueryPriorityInvalid:
- break;
- case QueryPriorityNone:
- queryTree->removeProp("@priority");
- break;
- default:
- queryTree->setPropInt("@priority", priority);
- break;
- }
- }
- void gatherFileErrors(IReferencedFileList *files, IArrayOf<IConstLogicalFileError> &errors)
- {
- if (!files)
- return;
- Owned<IReferencedFileIterator> it = files->getFiles();
- ForEach(*it)
- {
- IReferencedFile &file = it->query();
- unsigned flags = file.getFlags();
- if (!(flags & (RefFileNotFound | RefFileCopyInfoFailed)))
- continue;
- StringBuffer msg;
- if (flags & RefFileOptional)
- msg.append("OPT ");
- if (flags & RefFileNotFound)
- msg.append("Not Found");
- else
- msg.append("Copy Failed");
- Owned<IEspLogicalFileError> error = createLogicalFileError();
- error->setLogicalName(file.getLogicalName());
- error->setError(msg);
- errors.append(*static_cast<IConstLogicalFileError*>(error.getClear()));
- }
- }
- class QueryFileCopier
- {
- public:
- QueryFileCopier(const char *target_) : target(target_) {}
- void init(IEspContext &context, bool allowForeignFiles)
- {
- files.setown(createReferencedFileList(context.queryUserId(), context.queryPassword(), allowForeignFiles, false));
- clusterInfo.setown(getTargetClusterInfo(target));
- StringBufferAdaptor sba(process);
- if (clusterInfo && clusterInfo->getPlatform()==RoxieCluster)
- clusterInfo->getRoxieProcess(sba);
- if (!process.length())
- return;
- ps.setown(createPackageSet(process.str()));
- if (ps)
- pm = ps->queryActiveMap(target);
- }
- void copy(IConstWorkUnit *cw, unsigned updateFlags)
- {
- StringBuffer queryid;
- if (queryname && *queryname)
- queryname = queryid.append(queryname).append(".0").str(); //prepublish dummy version number to support fuzzy match like queries="myquery.*" in package
- files->addFilesFromQuery(cw, pm, queryname);
- files->resolveFiles(process.str(), remoteIP, remotePrefix, srcCluster, !(updateFlags & (DALI_UPDATEF_REPLACE_FILE | DALI_UPDATEF_CLONE_FROM | DALI_UPDATEF_SUPERFILES)), true, false, true);
- StringBuffer defReplicateFolder;
- getConfigurationDirectory(NULL, "data2", "roxie", process.str(), defReplicateFolder);
- Owned<IDFUhelper> helper = createIDFUhelper();
- files->cloneAllInfo(updateFlags, helper, true, true, clusterInfo->getRoxieRedundancy(), clusterInfo->getChannelsPerNode(), clusterInfo->getRoxieReplicateOffset(), defReplicateFolder);
- }
- void gatherFileErrors(IArrayOf<IConstLogicalFileError> &errors)
- {
- ::gatherFileErrors(files, errors);
- }
- private:
- Owned <IConstWUClusterInfo> clusterInfo;
- Owned<IHpccPackageSet> ps;
- const IHpccPackageMap *pm = nullptr;
- StringAttr target;
- public:
- Owned<IReferencedFileList> files;
- StringBuffer process;
- StringAttr remoteIP;
- StringAttr remotePrefix;
- StringAttr srcCluster;
- StringAttr queryname;
- };
- bool CWsWorkunitsEx::isQuerySuspended(const char* query, IConstWUClusterInfo *clusterInfo, unsigned wait, StringBuffer& errorMessage)
- {
- try
- {
- if (0==wait || !clusterInfo || clusterInfo->getPlatform()!=RoxieCluster)
- return false;
- const SocketEndpointArray &addrs = clusterInfo->getRoxieServers();
- if (addrs.length() < 1)
- return false;
- StringBuffer control;
- control.appendf("<control:queries><Query id='%s'/></control:queries>", query);
- Owned<IPropertyTree> result = sendRoxieControlAllNodes(addrs.item(0), control.str(), false, wait);
- if (!result)
- return false;
- Owned<IPropertyTreeIterator> suspendedQueries = result->getElements("Endpoint/Queries/Query[@suspended='1']");
- if (!suspendedQueries->first())
- return false;
- errorMessage.set(suspendedQueries->query().queryProp("@error"));
- return true;
- }
- catch(IMultiException *me)
- {
- StringBuffer err;
- IERRLOG("ERROR control:queries roxie query info %s", me->errorMessage(err.append(me->errorCode()).append(' ')).str());
- me->Release();
- return false;
- }
- catch(IException *e)
- {
- StringBuffer err;
- IERRLOG("ERROR control:queries roxie query info %s", e->errorMessage(err.append(e->errorCode()).append(' ')).str());
- e->Release();
- return false;
- }
- }
- bool CWsWorkunitsEx::onWUPublishWorkunit(IEspContext &context, IEspWUPublishWorkunitRequest & req, IEspWUPublishWorkunitResponse & resp)
- {
- StringBuffer wuid(req.getWuid());
- WsWuHelpers::checkAndTrimWorkunit("WUPublishWorkunit", wuid);
- Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
- Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid.str());
- if (!cw)
- throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot find the workunit %s", wuid.str());
- ensureWsWorkunitAccess(context, *cw, SecAccess_Full);
- resp.setWuid(wuid.str());
- StringAttr queryName;
- if (notEmpty(req.getJobName()))
- queryName.set(req.getJobName());
- else
- queryName.set(cw->queryJobName());
- if (!queryName.length())
- throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Query/Job name not defined for publishing workunit %s", wuid.str());
- StringAttr target;
- if (notEmpty(req.getCluster()))
- target.set(req.getCluster());
- else
- target.set(cw->queryClusterName());
- if (!target.length())
- throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Cluster name not defined for publishing workunit %s", wuid.str());
- if (!isValidCluster(target.str()))
- throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Invalid cluster name: %s", target.str());
- DBGLOG("%s publishing wuid %s to target %s as query %s", context.queryUserId(), wuid.str(), target.str(), queryName.str());
- StringBuffer daliIP;
- StringBuffer srcCluster;
- StringBuffer srcPrefix;
- splitDerivedDfsLocation(req.getRemoteDali(), srcCluster, daliIP, srcPrefix, req.getSourceProcess(),req.getSourceProcess(), NULL, NULL);
- if (srcCluster.length())
- {
- if (!isProcessCluster(daliIP, srcCluster))
- throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Process cluster %s not found on %s DALI", srcCluster.str(), daliIP.length() ? daliIP.str() : "local");
- }
- unsigned updateFlags = 0;
- if (req.getUpdateDfs())
- updateFlags |= (DALI_UPDATEF_SUPERFILES | DALI_UPDATEF_REPLACE_FILE | DALI_UPDATEF_CLONE_FROM);
- if (req.getUpdateCloneFrom())
- updateFlags |= DALI_UPDATEF_CLONE_FROM;
- if (req.getUpdateSuperFiles())
- updateFlags |= DALI_UPDATEF_SUPERFILES;
- if (req.getAppendCluster())
- updateFlags |= DALI_UPDATEF_APPEND_CLUSTER;
- if (!req.getDontCopyFiles())
- {
- QueryFileCopier cpr(target);
- cpr.init(context, req.getAllowForeignFiles());
- cpr.remoteIP.set(daliIP);
- cpr.remotePrefix.set(srcPrefix);
- cpr.srcCluster.set(srcCluster);
- cpr.queryname.set(queryName);
- cpr.copy(cw, updateFlags);
- if (req.getIncludeFileErrors())
- cpr.gatherFileErrors(resp.getFileErrors());
- }
- WorkunitUpdate wu(&cw->lock());
- if (req.getUpdateWorkUnitName() && notEmpty(req.getJobName()))
- wu->setJobName(req.getJobName());
- StringBuffer queryId;
- WUQueryActivationOptions activate = (WUQueryActivationOptions)req.getActivate();
- addQueryToQuerySet(wu, target.str(), queryName.str(), activate, queryId, context.queryUserId());
- if (req.getMemoryLimit() || !req.getTimeLimit_isNull() || !req.getWarnTimeLimit_isNull() || req.getPriority() || req.getComment())
- {
- Owned<IPropertyTree> queryTree = getQueryById(target.str(), queryId, false);
- updateMemoryLimitSetting(queryTree, req.getMemoryLimit());
- updateQuerySetting(req.getTimeLimit_isNull(), queryTree, "@timeLimit", req.getTimeLimit());
- updateQuerySetting(req.getWarnTimeLimit_isNull(), queryTree, "@warnTimeLimit", req.getWarnTimeLimit());
- updateQueryPriority(queryTree, req.getPriority());
- if (req.getComment())
- queryTree->setProp("@comment", req.getComment());
- }
- wu->commit();
- wu.clear();
- if (queryId.length())
- resp.setQueryId(queryId.str());
- resp.setQueryName(queryName.str());
- resp.setQuerySet(target.str());
- Owned <IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(target.str());
- bool reloadFailed = false;
- if (0!=req.getWait() && !req.getNoReload())
- reloadFailed = !reloadCluster(clusterInfo, (unsigned)req.getWait());
-
- resp.setReloadFailed(reloadFailed);
- double version = context.getClientVersion();
- if (version > 1.38)
- {
- StringBuffer errorMessage;
- if (!reloadFailed && !req.getNoReload() && isQuerySuspended(queryName.str(), clusterInfo, (unsigned)req.getWait(), errorMessage))
- {
- resp.setSuspended(true);
- resp.setErrorMessage(errorMessage);
- }
- }
- return true;
- }
- bool CWsWorkunitsEx::onWUQuerysets(IEspContext &context, IEspWUQuerysetsRequest & req, IEspWUQuerysetsResponse & resp)
- {
- IArrayOf<IEspQuerySet> querySets;
- Owned<IStringIterator> targets = getTargetClusters(NULL, NULL);
- SCMStringBuffer target;
- ForEach(*targets)
- {
- Owned<IEspQuerySet> qs = createQuerySet();
- qs->setQuerySetName(targets->str(target).str());
- querySets.append(*qs.getClear());
- }
- resp.setQuerysets(querySets);
- return true;
- }
- void addClusterQueryStates(IPropertyTree* queriesOnCluster, const char *target, const char *id, IArrayOf<IEspClusterQueryState>& clusterStates, double version)
- {
- if (queriesOnCluster)
- queriesOnCluster = queriesOnCluster->queryPropTree("Endpoint[1]/Queries[1]");
- if (!queriesOnCluster)
- return;
- int reporting = queriesOnCluster->getPropInt("@reporting");
- Owned<IEspClusterQueryState> clusterState = createClusterQueryState();
- clusterState->setCluster(target);
- VStringBuffer xpath("Query[@id='%s']", id);
- IPropertyTree *query = queriesOnCluster->queryPropTree(xpath.str());
- if (!query)
- clusterState->setState("Not Found");
- else
- {
- int suspended = query->getPropInt("@suspended");
- const char* error = query->queryProp("@error");
- if (0==suspended)
- clusterState->setState("Available");
- else
- {
- clusterState->setState("Suspended");
- if (suspended<reporting)
- clusterState->setMixedNodeStates(true);
- }
- if (error && *error)
- clusterState->setErrors(error);
- }
- clusterStates.append(*clusterState.getClear());
- }
- void gatherQuerySetQueryDetails(IEspContext &context, IPropertyTree *query, IEspQuerySetQuery *queryInfo, const char *cluster, IPropertyTree *queriesOnCluster)
- {
- queryInfo->setId(query->queryProp("@id"));
- queryInfo->setName(query->queryProp("@name"));
- queryInfo->setDll(query->queryProp("@dll"));
- queryInfo->setWuid(query->queryProp("@wuid"));
- queryInfo->setSuspended(query->getPropBool("@suspended", false));
- if (query->hasProp("@memoryLimit"))
- {
- StringBuffer s;
- memoryLimitStringFromUInt64(s, query->getPropInt64("@memoryLimit"));
- queryInfo->setMemoryLimit(s);
- }
- if (query->hasProp("@timeLimit"))
- queryInfo->setTimeLimit(query->getPropInt("@timeLimit"));
- if (query->hasProp("@warnTimeLimit"))
- queryInfo->setWarnTimeLimit(query->getPropInt("@warnTimeLimit"));
- if (query->hasProp("@priority"))
- queryInfo->setPriority(getQueryPriorityName(query->getPropInt("@priority")));
- if (query->hasProp("@comment"))
- queryInfo->setComment(query->queryProp("@comment"));
- if (query->hasProp("@snapshot"))
- queryInfo->setSnapshot(query->queryProp("@snapshot"));
- double version = context.getClientVersion();
- if (version >= 1.46)
- {
- queryInfo->setPublishedBy(query->queryProp("@publishedBy"));
- queryInfo->setIsLibrary(query->getPropBool("@isLibrary"));
- }
- if (queriesOnCluster)
- {
- IArrayOf<IEspClusterQueryState> clusters;
- addClusterQueryStates(queriesOnCluster, cluster, query->queryProp("@id"), clusters, version);
- queryInfo->setClusters(clusters);
- }
- }
- void gatherQuerySetAliasDetails(IPropertyTree *alias, IEspQuerySetAlias *aliasInfo)
- {
- aliasInfo->setName(alias->queryProp("@name"));
- aliasInfo->setId(alias->queryProp("@id"));
- }
- void retrieveAllQuerysetDetails(IEspContext &context, IPropertyTree *registry, IArrayOf<IEspQuerySetQuery> &queries, IArrayOf<IEspQuerySetAlias> &aliases, const char *cluster=NULL, IPropertyTree *queriesOnCluster=NULL, const char *type=NULL, const char *value=NULL)
- {
- Owned<IPropertyTreeIterator> regQueries = registry->getElements("Query");
- ForEach(*regQueries)
- {
- IPropertyTree &query = regQueries->query();
- Owned<IEspQuerySetQuery> q = createQuerySetQuery();
- gatherQuerySetQueryDetails(context, &query, q, cluster, queriesOnCluster);
- if (isEmpty(cluster) || isEmpty(type) || isEmpty(value) || !strieq(type, "Status"))
- queries.append(*q.getClear());
- else
- {
- IArrayOf<IConstClusterQueryState>& cs = q->getClusters();
- ForEachItemIn(i, cs)
- {
- IConstClusterQueryState& c = cs.item(i);
- if (strieq(c.getCluster(), cluster) && (strieq(value, "All") || strieq(c.getState(), value)))
- {
- queries.append(*q.getClear());
- break;
- }
- }
- }
- }
- Owned<IPropertyTreeIterator> regAliases = registry->getElements("Alias");
- ForEach(*regAliases)
- {
- IPropertyTree &alias = regAliases->query();
- Owned<IEspQuerySetAlias> a = createQuerySetAlias();
- gatherQuerySetAliasDetails(&alias, a);
- aliases.append(*a.getClear());
- }
- }
- void retrieveQuerysetDetailsFromAlias(IEspContext &context, IPropertyTree *registry, const char *name, IArrayOf<IEspQuerySetQuery> &queries, IArrayOf<IEspQuerySetAlias> &aliases, const char *cluster, IPropertyTree *queriesOnCluster)
- {
- StringBuffer xpath;
- xpath.append("Alias[@name='").append(name).append("']");
- Owned<IPropertyTreeIterator> regAliases = registry->getElements(xpath.str());
- if (!regAliases->first())
- {
- UWARNLOG("Alias %s not found", name);
- return;
- }
- ForEach(*regAliases)
- {
- IPropertyTree& alias = regAliases->query();
- Owned<IEspQuerySetAlias> a = createQuerySetAlias();
- gatherQuerySetAliasDetails(&alias, a);
- xpath.clear().append("Query[@id='").append(a->getId()).append("']");
- aliases.append(*a.getClear());
- IPropertyTree *query = registry->queryPropTree(xpath);
- if (!query)
- {
- UWARNLOG("No matching Query %s found for Alias %s", a->getId(), name);
- return;
- }
- Owned<IEspQuerySetQuery> q = createQuerySetQuery();
- gatherQuerySetQueryDetails(context, query, q, cluster, queriesOnCluster);
- queries.append(*q.getClear());
- }
- }
- void retrieveQuerysetDetailsFromQuery(IEspContext &context, IPropertyTree *registry, const char *value, const char *type, IArrayOf<IEspQuerySetQuery> &queries, IArrayOf<IEspQuerySetAlias> &aliases, const char *cluster=NULL, IPropertyTree *queriesOnCluster=NULL)
- {
- if (!strieq(type, "Id") && !strieq(type, "Name"))
- throw MakeStringException(ECLWATCH_INVALID_INPUT, "Unrecognized queryset filter type %s", type);
- StringBuffer attributeName(type);
- StringBuffer xpath;
- xpath.clear().append("Query[@").append(attributeName.toLowerCase()).append("='").append(value).append("']");
- Owned<IPropertyTreeIterator> regQueries = registry->getElements(xpath.str());
- if (!regQueries->first())
- {
- UWARNLOG("No matching Query %s found for %s", value, type);
- return;
- }
- ForEach(*regQueries)
- {
- IPropertyTree& query = regQueries->query();
- Owned<IEspQuerySetQuery> q = createQuerySetQuery();
- gatherQuerySetQueryDetails(context, &query, q, cluster, queriesOnCluster);
- xpath.clear().append("Alias[@id='").append(q->getId()).append("']");
- queries.append(*q.getClear());
- Owned<IPropertyTreeIterator> regAliases = registry->getElements(xpath.str());
- ForEach(*regAliases)
- {
- IPropertyTree &alias = regAliases->query();
- Owned<IEspQuerySetAlias> a = createQuerySetAlias();
- gatherQuerySetAliasDetails(&alias, a);
- aliases.append(*a.getClear());
- }
- }
- }
- void retrieveQuerysetDetails(IEspContext &context, IPropertyTree *registry, const char *type, const char *value, IArrayOf<IEspQuerySetQuery> &queries, IArrayOf<IEspQuerySetAlias> &aliases, const char *cluster=NULL, IPropertyTree *queriesOnCluster=NULL)
- {
- if (strieq(type, "All"))
- return retrieveAllQuerysetDetails(context, registry, queries, aliases, cluster, queriesOnCluster);
- if (!value || !*value)
- return;
- if (strieq(type, "Alias"))
- return retrieveQuerysetDetailsFromAlias(context, registry, value, queries, aliases, cluster, queriesOnCluster);
- if (strieq(type, "Status") && !isEmpty(cluster))
- return retrieveAllQuerysetDetails(context, registry, queries, aliases, cluster, queriesOnCluster, type, value);
- return retrieveQuerysetDetailsFromQuery(context, registry, value, type, queries, aliases, cluster, queriesOnCluster);
- }
- void retrieveQuerysetDetails(IEspContext &context, IArrayOf<IEspWUQuerySetDetail> &details, IPropertyTree *registry, const char *type, const char *value, const char *cluster=NULL, IPropertyTree *queriesOnCluster=NULL)
- {
- if (!registry)
- return;
- IArrayOf<IEspQuerySetQuery> queries;
- IArrayOf<IEspQuerySetAlias> aliases;
- retrieveQuerysetDetails(context, registry, type, value, queries, aliases, cluster, queriesOnCluster);
- Owned<IEspWUQuerySetDetail> detail = createWUQuerySetDetail();
- detail->setQuerySetName(registry->queryProp("@id"));
- detail->setQueries(queries);
- detail->setAliases(aliases);
- details.append(*detail.getClear());
- }
- void retrieveQuerysetDetails(IEspContext &context, IArrayOf<IEspWUQuerySetDetail> &details, const char *queryset, const char *type, const char *value, const char *cluster=NULL, IPropertyTree *queriesOnCluster=NULL)
- {
- if (!queryset || !*queryset)
- return;
- Owned<IPropertyTree> registry = getQueryRegistry(queryset, true);
- if (!registry)
- return;
- retrieveQuerysetDetails(context, details, registry, type, value, cluster, queriesOnCluster);
- }
- IPropertyTree* getQueriesOnCluster(const char *target, const char *queryset, StringArray *queryIDs, bool checkAllNodes)
- {
- if (isEmpty(target))
- target = queryset;
- Owned<IConstWUClusterInfo> info = getTargetClusterInfo(target);
- if (!info)
- throw MakeStringException(ECLWATCH_CANNOT_RESOLVE_CLUSTER_NAME, "Cluster %s not found", target);
- if (queryset && *queryset && !strieq(target, queryset))
- throw MakeStringException(ECLWATCH_QUERYSET_NOT_ON_CLUSTER, "Target %s and QuerySet %s should match", target, queryset);
- if (info->getPlatform()!=RoxieCluster)
- return NULL;
- const SocketEndpointArray &eps = info->getRoxieServers();
- if (!eps.length())
- return NULL;
- try
- {
- StringBuffer control;
- if (!queryIDs || (queryIDs->ordinality() == 0))
- control.append("<control:queries/>");
- else
- {
- control.append("<control:queries>");
- ForEachItemIn(i, *queryIDs)
- control.appendf("<Query id='%s'/>", queryIDs->item(i));
- control.append("</control:queries>");
- }
- Owned<ISocket> sock = ISocket::connect_timeout(eps.item(0), ROXIECONNECTIONTIMEOUT);
- if (checkAllNodes)
- return sendRoxieControlAllNodes(sock, control, false, ROXIECONTROLQUERIESTIMEOUT);
- else
- return sendRoxieControlQuery(sock, control, ROXIECONTROLQUERIESTIMEOUT);
- }
- catch(IException* e)
- {
- StringBuffer err;
- DBGLOG("Get exception in control:queries: %s", e->errorMessage(err.append(e->errorCode()).append(' ')).str());
- e->Release();
- return NULL;
- }
- }
- void retrieveQuerysetDetailsByCluster(IEspContext &context, IArrayOf<IEspWUQuerySetDetail> &details, const char *target, const char *queryset, const char *type, const char *value, bool checkAllNodes)
- {
- Owned<IPropertyTree> queriesOnCluster = getQueriesOnCluster(target, queryset, nullptr, checkAllNodes);
- retrieveQuerysetDetails(context, details, target, type, value, target, queriesOnCluster);
- }
- void retrieveAllQuerysetDetails(IEspContext &context, IArrayOf<IEspWUQuerySetDetail> &details, const char *type, const char *value)
- {
- Owned<IPropertyTree> root = getQueryRegistryRoot();
- if (!root)
- throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "QuerySet Registry not found");
- Owned<IPropertyTreeIterator> querysets = root->getElements("QuerySet");
- ForEach(*querysets)
- retrieveQuerysetDetails(context, details, &querysets->query(), type, value);
- }
- bool CWsWorkunitsEx::onWUQuerysetDetails(IEspContext &context, IEspWUQuerySetDetailsRequest & req, IEspWUQuerySetDetailsResponse & resp)
- {
- resp.setQuerySetName(req.getQuerySetName());
- double version = context.getClientVersion();
- if (version > 1.36)
- {
- Owned<IPropertyTree> queryRegistry = getQueryRegistry(req.getQuerySetName(), false);
- resp.setClusterName(req.getClusterName());
- resp.setFilter(req.getFilter());
- resp.setFilterType(req.getFilterType());
- }
- Owned<IPropertyTree> registry = getQueryRegistry(req.getQuerySetName(), true);
- if (!registry)
- return false;
- PROGLOG("WUQuerysetDetails for queryset %s", req.getQuerySetName());
- IArrayOf<IEspQuerySetQuery> respQueries;
- IArrayOf<IEspQuerySetAlias> respAliases;
- if (isEmpty(req.getClusterName()) || isEmpty(req.getFilterTypeAsString()) || !strieq(req.getFilterTypeAsString(), "Status") || isEmpty(req.getFilter()))
- {
- const char* cluster = req.getClusterName();
- if (isEmpty(cluster))
- cluster = req.getQuerySetName();
- Owned<IPropertyTree> queriesOnCluster = getQueriesOnCluster(cluster, req.getQuerySetName(), nullptr, req.getCheckAllNodes());
- retrieveQuerysetDetails(context, registry, req.getFilterTypeAsString(), req.getFilter(), respQueries, respAliases, cluster, queriesOnCluster);
- resp.setQuerysetQueries(respQueries);
- resp.setQuerysetAliases(respAliases);
- }
- else
- {
- IArrayOf<IEspWUQuerySetDetail> respDetails;
- retrieveQuerysetDetailsByCluster(context, respDetails, req.getClusterName(), req.getQuerySetName(), req.getFilterTypeAsString(), req.getFilter(), false);
- if (respDetails.ordinality())
- {
- IEspWUQuerySetDetail& detail = respDetails.item(0);
- resp.setQuerysetQueries(detail.getQueries());
- resp.setQuerysetAliases(detail.getAliases());
- }
- }
- return true;
- }
- bool CWsWorkunitsEx::onWUMultiQuerysetDetails(IEspContext &context, IEspWUMultiQuerySetDetailsRequest & req, IEspWUMultiQuerySetDetailsResponse & resp)
- {
- IArrayOf<IEspWUQuerySetDetail> respDetails;
- if (notEmpty(req.getClusterName()))
- {
- PROGLOG("WUMultiQuerysetDetails for cluster %s", req.getClusterName());
- retrieveQuerysetDetailsByCluster(context, respDetails, req.getClusterName(), req.getQuerySetName(), req.getFilterTypeAsString(), req.getFilter(), req.getCheckAllNodes());
- }
- else if (notEmpty(req.getQuerySetName()))
- {
- PROGLOG("WUMultiQuerysetDetails for queryset %s", req.getQuerySetName());
- retrieveQuerysetDetails(context, respDetails, req.getQuerySetName(), req.getFilterTypeAsString(), req.getFilter());
- }
- else
- {
- VStringBuffer logMsg("WUMultiQuerysetDetails: FilterType %s", req.getFilterTypeAsString());
- if (notEmpty(req.getFilter()))
- logMsg.append(", Filter ").append(req.getFilter());
- PROGLOG("%s", logMsg.str());
- retrieveAllQuerysetDetails(context, respDetails, req.getFilterTypeAsString(), req.getFilter());
- }
- resp.setQuerysets(respDetails);
- return true;
- }
- bool addWUQSQueryFilter(WUQuerySortField *filters, unsigned short &count, MemoryBuffer &buff, const char* value, WUQuerySortField name)
- {
- if (isEmpty(value))
- return false;
- filters[count++] = name;
- buff.append(value);
- return true;
- }
- bool addWUQSQueryFilterInt(WUQuerySortField *filters, unsigned short &count, MemoryBuffer &buff, int value, WUQuerySortField name)
- {
- VStringBuffer vBuf("%d", value);
- filters[count++] = name;
- buff.append(vBuf.str());
- return true;
- }
- bool addWUQSQueryFilterInt64(WUQuerySortField *filters, unsigned short &count, MemoryBuffer &buff, __int64 value, WUQuerySortField name)
- {
- VStringBuffer vBuf("%" I64F "d", value);
- filters[count++] = name;
- buff.append(vBuf.str());
- return true;
- }
- unsigned CWsWorkunitsEx::getGraphIdsByQueryId(const char *target, const char *queryId, StringArray& graphIds)
- {
- if (!target || !*target)
- throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Target name required");
- if (!queryId || !*queryId)
- throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Query Id required");
- Owned<IConstWUClusterInfo> info = getTargetClusterInfo(target);
- if (!info || (info->getPlatform()!=RoxieCluster)) //Only roxie query has query graph.
- return 0;
- const SocketEndpointArray &eps = info->getRoxieServers();
- if (eps.empty())
- return 0;
- VStringBuffer xpath("<control:querystats><Query id='%s'/></control:querystats>", queryId);
- Owned<ISocket> sock = ISocket::connect_timeout(eps.item(0), ROXIECONNECTIONTIMEOUT);
- Owned<IPropertyTree> querystats = sendRoxieControlQuery(sock, xpath.str(), ROXIECONTROLQUERYTIMEOUT);
- if (!querystats)
- return 0;
- Owned<IPropertyTreeIterator> graphs = querystats->getElements("Endpoint/Query/Graph");
- ForEach(*graphs)
- {
- IPropertyTree &graph = graphs->query();
- const char* graphId = graph.queryProp("@id");
- if (graphId && *graphId)
- graphIds.appendUniq(graphId);
- }
- return graphIds.length();
- }
- //This method is thread safe because a query belongs to a single queryset. The method may be called by different threads.
- //Since one thread is for one queryset and a query only belongs to a single queryset, it is impossible for different threads
- //to update the same query object.
- void CWsWorkunitsEx::checkAndSetClusterQueryState(IEspContext &context, const char* cluster, const char* querySetId, IArrayOf<IEspQuerySetQuery>& queries, bool checkAllNodes)
- {
- try
- {
- double version = context.getClientVersion();
- if (isEmpty(cluster))
- cluster = querySetId;
- StringArray queryIDs;
- ForEachItemIn(j, queries)
- {
- IEspQuerySetQuery& query = queries.item(j);
- const char* queryId = query.getId();
- const char* querySetId0 = query.getQuerySetId();
- if (queryId && querySetId0 && strieq(querySetId0, querySetId))
- queryIDs.append(queryId);
- }
- if (queryIDs.ordinality() == 0)
- return;
- Owned<IPropertyTree> queriesOnCluster = getQueriesOnCluster(cluster, querySetId, &queryIDs, checkAllNodes);
- if (!queriesOnCluster)
- {
- UWARNLOG("getQueriesOnCluster() returns NULL for cluster<%s> and querySetId<%s>", cluster, querySetId);
- return;
- }
- ForEachItemIn(i, queries)
- {
- IEspQuerySetQuery& query = queries.item(i);
- const char* queryId = query.getId();
- const char* querySetId0 = query.getQuerySetId();
- if (!queryId || !querySetId0 || !strieq(querySetId0, querySetId))
- continue;
- IArrayOf<IEspClusterQueryState> clusters;
- addClusterQueryStates(queriesOnCluster, cluster, queryId, clusters, version);
- query.setClusters(clusters);
- }
- }
- catch(IException *e)
- {
- EXCLOG(e, "CWsWorkunitsEx::checkAndSetClusterQueryState: Failed to read Query State On Cluster");
- e->Release();
- }
- }
- void CWsWorkunitsEx::checkAndSetClusterQueryState(IEspContext &context, const char* cluster, StringArray& querySetIds, IArrayOf<IEspQuerySetQuery>& queries, bool checkAllNodes)
- {
- UnsignedArray threadHandles;
- ForEachItemIn(i, querySetIds)
- {
- const char* querySetId = querySetIds.item(i);
- if(!querySetId || !*querySetId)
- continue;
- Owned<CClusterQueryStateParam> threadReq = new CClusterQueryStateParam(this, context, cluster, querySetId, queries, checkAllNodes);
- PooledThreadHandle handle = clusterQueryStatePool->start( threadReq.getClear() );
- threadHandles.append(handle);
- }
- //block for worker threads to finish, if necessary and then collect results
- //Not use joinAll() because multiple threads may call this method. Each call uses the pool to create
- //its own threads of checking query state. Each call should only join the ones created by that call.
- ForEachItemIn(ii, threadHandles)
- clusterQueryStatePool->join(threadHandles.item(ii));
- }
- bool CWsWorkunitsEx::onWUListQueries(IEspContext &context, IEspWUListQueriesRequest & req, IEspWUListQueriesResponse & resp)
- {
- bool descending = req.getDescending();
- const char *sortBy = req.getSortby();
- WUQuerySortField sortOrder[2] = {WUQSFId, WUQSFterm};
- if(notEmpty(sortBy))
- {
- if (strieq(sortBy, "Name"))
- sortOrder[0] = WUQSFname;
- else if (strieq(sortBy, "WUID"))
- sortOrder[0] = WUQSFwuid;
- else if (strieq(sortBy, "DLL"))
- sortOrder[0] = WUQSFdll;
- else if (strieq(sortBy, "Activated"))
- sortOrder[0] = WUQSFActivited;
- else if (strieq(sortBy, "MemoryLimit"))
- sortOrder[0] = (WUQuerySortField) (WUQSFmemoryLimit | WUQSFnumeric);
- else if (strieq(sortBy, "TimeLimit"))
- sortOrder[0] = (WUQuerySortField) (WUQSFtimeLimit | WUQSFnumeric);
- else if (strieq(sortBy, "WarnTimeLimit"))
- sortOrder[0] = (WUQuerySortField) (WUQSFwarnTimeLimit | WUQSFnumeric);
- else if (strieq(sortBy, "Priority"))
- sortOrder[0] = (WUQuerySortField) (WUQSFpriority | WUQSFnumeric);
- else if (strieq(sortBy, "PublishedBy"))
- sortOrder[0] = WUQSFPublishedBy;
- else if (strieq(sortBy, "QuerySetId"))
- sortOrder[0] = WUQSFQuerySet;
- else
- sortOrder[0] = WUQSFId;
- sortOrder[0] = (WUQuerySortField) (sortOrder[0] | WUQSFnocase);
- if (descending)
- sortOrder[0] = (WUQuerySortField) (sortOrder[0] | WUQSFreverse);
- }
- WUQuerySortField filters[16];
- unsigned short filterCount = 0;
- MemoryBuffer filterBuf;
- const char* clusterReq = req.getClusterName();
- addWUQSQueryFilter(filters, filterCount, filterBuf, req.getQuerySetName(), WUQSFQuerySet);
- addWUQSQueryFilter(filters, filterCount, filterBuf, req.getQueryID(), (WUQuerySortField) (WUQSFId | WUQSFwild | WUSFnocase));
- addWUQSQueryFilter(filters, filterCount, filterBuf, req.getQueryName(), (WUQuerySortField) (WUQSFname | WUQSFwild | WUSFnocase));
- addWUQSQueryFilter(filters, filterCount, filterBuf, req.getWUID(), (WUQuerySortField) (WUQSFwuid | WUSFnocase));
- addWUQSQueryFilter(filters, filterCount, filterBuf, req.getLibraryName(), (WUQuerySortField) (WUQSFLibrary | WUQSFnocase));
- addWUQSQueryFilter(filters, filterCount, filterBuf, req.getPublishedBy(), (WUQuerySortField) (WUQSFPublishedBy | WUQSFwild | WUSFnocase));
- if (!req.getMemoryLimitLow_isNull())
- addWUQSQueryFilterInt64(filters, filterCount, filterBuf, req.getMemoryLimitLow(), (WUQuerySortField) (WUQSFmemoryLimit | WUQSFnumeric));
- if (!req.getMemoryLimitHigh_isNull())
- addWUQSQueryFilterInt64(filters, filterCount, filterBuf, req.getMemoryLimitHigh(), (WUQuerySortField) (WUQSFmemoryLimitHi | WUQSFnumeric));
- if (!req.getTimeLimitLow_isNull())
- addWUQSQueryFilterInt(filters, filterCount, filterBuf, req.getTimeLimitLow(), (WUQuerySortField) (WUQSFtimeLimit | WUQSFnumeric));
- if (!req.getTimeLimitHigh_isNull())
- addWUQSQueryFilterInt(filters, filterCount, filterBuf, req.getTimeLimitHigh(), (WUQuerySortField) (WUQSFtimeLimitHi | WUQSFnumeric));
- if (!req.getWarnTimeLimitLow_isNull())
- addWUQSQueryFilterInt(filters, filterCount, filterBuf, req.getWarnTimeLimitLow(), (WUQuerySortField) (WUQSFwarnTimeLimit | WUQSFnumeric));
- if (!req.getWarnTimeLimitHigh_isNull())
- addWUQSQueryFilterInt(filters, filterCount, filterBuf, req.getWarnTimeLimitHigh(), (WUQuerySortField) (WUQSFwarnTimeLimitHi | WUQSFnumeric));
- if (!req.getPriorityLow_isNull())
- addWUQSQueryFilterInt(filters, filterCount, filterBuf, req.getPriorityLow(), (WUQuerySortField) (WUQSFpriority | WUQSFnumeric));
- if (!req.getPriorityHigh_isNull())
- addWUQSQueryFilterInt(filters, filterCount, filterBuf, req.getPriorityHigh(), (WUQuerySortField) (WUQSFpriorityHi | WUQSFnumeric));
- if (!req.getActivated_isNull())
- addWUQSQueryFilterInt(filters, filterCount, filterBuf, req.getActivated(), (WUQuerySortField) (WUQSFActivited | WUQSFnumeric));
- if (!req.getSuspendedByUser_isNull())
- addWUQSQueryFilterInt(filters, filterCount, filterBuf, req.getSuspendedByUser(), (WUQuerySortField) (WUQSFSuspendedByUser | WUQSFnumeric));
- filters[filterCount] = WUQSFterm;
- unsigned numberOfQueries = 0;
- unsigned pageSize = req.getPageSize();
- unsigned pageStartFrom = req.getPageStartFrom();
- if(pageSize < 1)
- pageSize = 100;
- __int64 cacheHint = 0;
- if (!req.getCacheHint_isNull())
- cacheHint = req.getCacheHint();
- Owned<MapStringTo<bool> > queriesUsingFileMap;
- const char *lfn = req.getFileName();
- if (lfn && *lfn)
- {
- queriesUsingFileMap.setown(new MapStringTo<bool>());
- StringAttr dummy;
- Owned<IPropertyTreeIterator> queriesUsingFile = filesInUse.findQueriesUsingFile(clusterReq, lfn, dummy);
- ForEach (*queriesUsingFile)
- {
- IPropertyTree &queryUsingFile = queriesUsingFile->query();
- const char *queryTarget = queryUsingFile.queryProp("@target");
- const char *queryId = queryUsingFile.queryProp("@id");
- if (queryTarget && *queryTarget && queryId && *queryId)
- {
- VStringBuffer targetQuery("%s/%s", queryTarget, queryId);
- queriesUsingFileMap->setValue(targetQuery, true);
- }
- }
- }
- PROGLOG("WUListQueries: getQuerySetQueriesSorted");
- Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
- Owned<IConstQuerySetQueryIterator> it = factory->getQuerySetQueriesSorted(sortOrder, filters, filterBuf.bufferBase(), pageStartFrom, pageSize, &cacheHint, &numberOfQueries, queriesUsingFileMap);
- resp.setCacheHint(cacheHint);
- PROGLOG("WUListQueries: getQuerySetQueriesSorted done");
- StringArray querySetIds;
- IArrayOf<IEspQuerySetQuery> queries;
- double version = context.getClientVersion();
- ForEach(*it)
- {
- IPropertyTree &query=it->query();
- const char *queryId = query.queryProp("@id");
- const char *queryTarget = query.queryProp("@querySetId");
- Owned<IEspQuerySetQuery> q = createQuerySetQuery();
- q->setId(queryId);
- q->setQuerySetId(queryTarget);
- q->setName(query.queryProp("@name"));
- q->setDll(query.queryProp("@dll"));
- q->setWuid(query.queryProp("@wuid"));
- q->setActivated(query.getPropBool("@activated", false));
- q->setSuspended(query.getPropBool("@suspended", false));
- if (query.hasProp("@memoryLimit"))
- {
- StringBuffer s;
- memoryLimitStringFromUInt64(s, query.getPropInt64("@memoryLimit"));
- q->setMemoryLimit(s);
- }
- if (query.hasProp("@timeLimit"))
- q->setTimeLimit(query.getPropInt("@timeLimit"));
- if (query.hasProp("@warnTimeLimit"))
- q->setWarnTimeLimit(query.getPropInt("@warnTimeLimit"));
- if (query.hasProp("@priority"))
- q->setPriority(getQueryPriorityName(query.getPropInt("@priority")));
- if (query.hasProp("@comment"))
- q->setComment(query.queryProp("@comment"));
- if (version >= 1.46)
- {
- q->setPublishedBy(query.queryProp("@publishedBy"));
- q->setIsLibrary(query.getPropBool("@isLibrary"));
- }
- if (!querySetIds.contains(queryTarget))
- querySetIds.append(queryTarget);
- queries.append(*q.getClear());
- }
- if (queries.ordinality() > 0)
- checkAndSetClusterQueryState(context, clusterReq, querySetIds, queries, req.getCheckAllNodes());
- resp.setQuerysetQueries(queries);
- resp.setNumberOfQueries(numberOfQueries);
- return true;
- }
- bool CWsWorkunitsEx::onWUListQueriesUsingFile(IEspContext &context, IEspWUListQueriesUsingFileRequest &req, IEspWUListQueriesUsingFileResponse &resp)
- {
- const char *target = req.getTarget();
- const char *process = req.getProcess();
- StringBuffer lfn(req.getFileName());
- resp.setFileName(lfn.toLowerCase());
- resp.setProcess(process);
- if (lfn.isEmpty())
- throw MakeStringException(ECLWATCH_MISSING_PARAMS, "FileName required");
- VStringBuffer logMsg("WUListQueriesUsingFile: %s", lfn.str());
- StringArray targets;
- if (target && *target)
- {
- targets.append(target);
- logMsg.append(", target ").append(target);
- }
- else // if (process && *process)
- {
- SCMStringBuffer targetStr;
- Owned<IStringIterator> targetClusters = getTargetClusters("RoxieCluster", process);
- ForEach(*targetClusters)
- targets.append(targetClusters->str(targetStr).str());
- logMsg.append(", process ").append(process);
- }
- PROGLOG("%s", logMsg.str());
- IArrayOf<IEspTargetQueriesUsingFile> respTargets;
- ForEachItemIn(i, targets)
- {
- target = targets.item(i);
- Owned<IEspTargetQueriesUsingFile> respTarget = createTargetQueriesUsingFile();
- respTarget->setTarget(target);
- StringAttr pmid;
- Owned<IPropertyTreeIterator> queries = filesInUse.findQueriesUsingFile(target, lfn, pmid);
- if (!pmid.isEmpty())
- respTarget->setPackageMap(pmid);
- if (queries)
- {
- IArrayOf<IEspQueryUsingFile> respQueries;
- ForEach(*queries)
- {
- IPropertyTree &query = queries->query();
- Owned<IEspQueryUsingFile> q = createQueryUsingFile();
- q->setId(query.queryProp("@id"));
- VStringBuffer xpath("File[@lfn='%s']/@pkgid", lfn.str());
- if (query.hasProp(xpath))
- q->setPackage(query.queryProp(xpath));
- respQueries.append(*q.getClear());
- }
- respTarget->setQueries(respQueries);
- }
- respTargets.append(*respTarget.getClear());
- }
- resp.setTargets(respTargets);
- return true;
- }
- bool CWsWorkunitsEx::onWUQueryFiles(IEspContext &context, IEspWUQueryFilesRequest &req, IEspWUQueryFilesResponse &resp)
- {
- const char *target = req.getTarget();
- const char *query = req.getQueryId();
- if (!target || !*target)
- throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Target not specified");
- if (!isValidCluster(target))
- throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Invalid target name: %s", target);
- if (!query || !*query)
- throw MakeStringException(ECLWATCH_QUERYID_NOT_FOUND, "Query not specified");
- Owned<IPropertyTree> registeredQuery = resolveQueryAlias(target, query, true);
- if (!registeredQuery)
- throw MakeStringException(ECLWATCH_QUERYID_NOT_FOUND, "Query not found");
- PROGLOG("WUQueryFiles: target %s, query %s", target, query);
- StringAttr queryid(registeredQuery->queryProp("@id"));
- registeredQuery.clear();
- Owned<IPropertyTree> tree = filesInUse.getTree();
- VStringBuffer xpath("%s/Query[@id='%s']", target, queryid.get());
- IPropertyTree *queryTree = tree->queryPropTree(xpath);
- if (!queryTree)
- throw MakeStringException(ECLWATCH_QUERYID_NOT_FOUND, "Query not found in file cache (%s)", xpath.str());
- IArrayOf<IEspFileUsedByQuery> referencedFiles;
- Owned<IPropertyTreeIterator> files = queryTree->getElements("File");
- ForEach(*files)
- {
- IPropertyTree &file = files->query();
- if (file.getPropBool("@super", 0))
- continue;
- Owned<IEspFileUsedByQuery> respFile = createFileUsedByQuery();
- respFile->setFileName(file.queryProp("@lfn"));
- respFile->setFileSize(file.getPropInt64("@size"));
- respFile->setNumberOfParts(file.getPropInt("@numparts"));
- referencedFiles.append(*respFile.getClear());
- }
- resp.setFiles(referencedFiles);
- return true;
- }
- void copyWorkunitForRecompile(IEspContext &context, IWorkUnitFactory *factory, const char *srcWuid, StringAttr &wuid, StringAttr &jobname)
- {
- Owned<IConstWorkUnit> src(factory->openWorkUnit(srcWuid));
- if (!src)
- throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot open workunit %s.", srcWuid);
- WsWuInfo info(context, src);
- StringBuffer archiveText;
- info.getWorkunitArchiveQuery(archiveText); //archive required, fail otherwise
- if (!isArchiveQuery(archiveText))
- throw MakeStringException(ECLWATCH_RESOURCE_NOT_FOUND,"Cannot retrieve workunit ECL archive %s.", srcWuid);
- SCMStringBuffer mainDefinition;
- Owned <IConstWUQuery> query = src->getQuery();
- if (query)
- query->getQueryMainDefinition(mainDefinition);
- NewWsWorkunit wu(factory, context);
- wuid.set(wu->queryWuid());
- wu->setAction(WUActionCompile);
- jobname.set(src->queryJobName());
- if (jobname.length())
- wu->setJobName(jobname);
- wu.setQueryText(archiveText.str());
- if (mainDefinition.length())
- wu.setQueryMain(mainDefinition.str());
- wu->setResultLimit(src->getResultLimit());
- IStringIterator &names = src->getDebugValues();
- ForEach(names)
- {
- SCMStringBuffer name, value;
- names.str(name);
- if (0==strncmp(name.str(), "eclcc", 5))
- wu->setDebugValue(name.str(), src->getDebugValue(name.str(), value).str(), true);
- }
- }
- bool CWsWorkunitsEx::onWURecreateQuery(IEspContext &context, IEspWURecreateQueryRequest &req, IEspWURecreateQueryResponse &resp)
- {
- try
- {
- const char* srcTarget = req.getTarget();
- const char* queryIdOrAlias = req.getQueryId();
- if (!srcTarget || !*srcTarget)
- throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Target not specified");
- if (!queryIdOrAlias || !*queryIdOrAlias)
- throw MakeStringException(ECLWATCH_QUERYID_NOT_FOUND, "QueryId not specified");
- const char *target = req.getDestTarget();
- if (isEmptyString(target))
- target = srcTarget;
- Owned<IPropertyTree> queryRegistry = getQueryRegistry(srcTarget, false);
- Owned<IPropertyTree> srcQueryTree = resolveQueryAlias(queryRegistry, queryIdOrAlias);
- if (!srcQueryTree)
- {
- DBGLOG("WURecreateQuery - No matching Query");
- throw MakeStringException(ECLWATCH_QUERYID_NOT_FOUND,"No matching query for given id or alias %s.", queryIdOrAlias);
- }
- resp.setPriority(isEmptyString(req.getPriority()) ? srcQueryTree->queryProp("@priority") : req.getPriority());
- resp.setComment(isEmptyString(req.getComment()) ? srcQueryTree->queryProp("@comment") : req.getComment());
- resp.setMemoryLimit(isEmptyString(req.getMemoryLimit()) ? srcQueryTree->queryProp("@memoryLimit") : req.getMemoryLimit());
- resp.setTimeLimit(req.getTimeLimit_isNull() ? srcQueryTree->getPropInt("@timeLimit") : req.getTimeLimit());
- resp.setWarnTimeLimit(req.getWarnTimeLimit_isNull() ? srcQueryTree->getPropInt("@warnTimeLimit") : req.getWarnTimeLimit());
- StringAttr wuid;
- StringAttr jobname;
- const char* srcQueryId = srcQueryTree->queryProp("@id");
- const char* srcQueryName = srcQueryTree->queryProp("@name");
- const char *srcWuid = srcQueryTree->queryProp("@wuid");
- PROGLOG("WURecreateQuery: QuerySet %s, query %s, wuid %s", srcTarget, srcQueryId, srcWuid);
- ensureWsWorkunitAccess(context, srcWuid, SecAccess_Write);
- Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
- copyWorkunitForRecompile(context, factory, srcWuid, wuid, jobname);
- resp.setWuid(wuid);
- WsWuHelpers::submitWsWorkunit(context, wuid.str(), target, NULL, 0, true, false, false, NULL, NULL, &req.getDebugValues());
- waitForWorkUnitToCompile(wuid.str(), req.getWait());
- Owned<IConstWorkUnit> cw(factory->openWorkUnit(wuid.str()));
- if (!cw)
- throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot open recreated workunit %s.",wuid.str());
- if (jobname.length())
- {
- StringBuffer name;
- origValueChanged(jobname.str(), cw->queryJobName(), name, false);
- if (name.length()) //non generated user specified name, so override #Workunit('name')
- {
- WorkunitUpdate wx(&cw->lock());
- wx->setJobName(name.str());
- }
- }
- PROGLOG("WURecreateQuery generated: %s", wuid.str());
- AuditSystemAccess(context.queryUserId(), true, "Updated %s", wuid.str());
- queryRegistry.clear();
- srcQueryTree.clear();
- if (req.getRepublish())
- {
- if (!req.getDontCopyFiles())
- {
- StringBuffer daliIP;
- StringBuffer srcCluster;
- StringBuffer srcPrefix;
- splitDerivedDfsLocation(req.getRemoteDali(), srcCluster, daliIP, srcPrefix, req.getSourceProcess(),req.getSourceProcess(), NULL, NULL);
- if (srcCluster.length())
- {
- if (!isProcessCluster(daliIP, srcCluster))
- throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Process cluster %s not found on %s DALI", srcCluster.str(), daliIP.length() ? daliIP.str() : "local");
- }
- unsigned updateFlags = 0;
- if (req.getUpdateDfs())
- updateFlags |= (DALI_UPDATEF_SUPERFILES | DALI_UPDATEF_REPLACE_FILE | DALI_UPDATEF_CLONE_FROM);
- if (req.getUpdateCloneFrom())
- updateFlags |= DALI_UPDATEF_CLONE_FROM;
- if (req.getUpdateSuperFiles())
- updateFlags |= DALI_UPDATEF_SUPERFILES;
- if (req.getAppendCluster())
- updateFlags |= DALI_UPDATEF_APPEND_CLUSTER;
- QueryFileCopier cpr(target);
- cpr.init(context, req.getAllowForeignFiles());
- cpr.remoteIP.set(daliIP);
- cpr.remotePrefix.set(srcPrefix);
- cpr.srcCluster.set(srcCluster);
- cpr.queryname.set(srcQueryName);
- cpr.copy(cw, updateFlags);
- if (req.getIncludeFileErrors())
- cpr.gatherFileErrors(resp.getFileErrors());
- }
- StringBuffer queryId;
- WorkunitUpdate wu(&cw->lock());
- WUQueryActivationOptions activate = (WUQueryActivationOptions)req.getActivate();
- addQueryToQuerySet(wu, target, srcQueryName, activate, queryId, context.queryUserId());
- {
- Owned<IPropertyTree> queryTree = getQueryById(target, queryId, false);
- if (queryTree)
- {
- queryTree->setProp("@priority", resp.getPriority());
- updateMemoryLimitSetting(queryTree, resp.getMemoryLimit());
- updateQuerySetting(resp.getTimeLimit_isNull(), queryTree, "@timeLimit", resp.getTimeLimit());
- updateQuerySetting(resp.getWarnTimeLimit_isNull(), queryTree, "@warnTimeLimit", resp.getWarnTimeLimit());
- updateQueryPriority(queryTree, resp.getPriority());
- queryTree->setProp("@comment", resp.getComment());
- }
- }
- wu->commit();
- wu.clear();
- PROGLOG("WURecreateQuery published: %s as %s/%s", wuid.str(), target, queryId.str());
- resp.setQuerySet(target);
- resp.setQueryName(srcQueryName);
- resp.setQueryId(queryId.str());
- Owned <IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(target);
- bool reloadFailed = false;
- if (0!=req.getWait() && !req.getNoReload())
- reloadFailed = !reloadCluster(clusterInfo, (unsigned)req.getWait());
- resp.setReloadFailed(reloadFailed);
- StringBuffer errorMessage;
- if (!reloadFailed && !req.getNoReload() && isQuerySuspended(queryId.str(), clusterInfo, (unsigned)req.getWait(), errorMessage))
- {
- resp.setSuspended(true);
- resp.setErrorMessage(errorMessage);
- }
- }
- }
- catch(IException* e)
- {
- FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
- }
- return true;
- }
- bool CWsWorkunitsEx::onWUQueryDetails(IEspContext &context, IEspWUQueryDetailsRequest & req, IEspWUQueryDetailsResponse & resp)
- {
- const char* querySet = req.getQuerySet();
- const char* queryIdOrAlias = req.getQueryId();
- bool includeStateOnClusters = req.getIncludeStateOnClusters();
- if (!querySet || !*querySet)
- throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "QuerySet not specified");
- if (!queryIdOrAlias || !*queryIdOrAlias)
- throw MakeStringException(ECLWATCH_QUERYID_NOT_FOUND, "QueryId not specified");
- Owned<IPropertyTree> queryRegistry = getQueryRegistry(querySet, false);
- Owned<IPropertyTree> query = resolveQueryAlias(queryRegistry, queryIdOrAlias);
- if (!query)
- {
- DBGLOG("No matching Query");
- throw MakeStringException(ECLWATCH_QUERYID_NOT_FOUND,"No matching query for given id or alias %s.", queryIdOrAlias);
- }
- const char* queryId = query->queryProp("@id");
- resp.setQueryId(queryId);
- resp.setQuerySet(querySet);
- PROGLOG("WUQueryDetails: QuerySet %s, query %s", querySet, queryId);
- const char* queryName = query->queryProp("@name");
- const char* wuid = query->queryProp("@wuid");
- resp.setQueryName(queryName);
- resp.setWuid(wuid);
- resp.setDll(query->queryProp("@dll"));
- resp.setPublishedBy(query->queryProp("@publishedBy"));
- resp.setSuspended(query->getPropBool("@suspended", false));
- resp.setSuspendedBy(query->queryProp("@suspendedBy"));
- resp.setComment(query->queryProp("@comment"));
- double version = context.getClientVersion();
- if (version >= 1.46)
- {
- Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
- Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid);
- if(!cw)
- throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT,"Cannot open workunit %s.",wuid);
- ensureWsWorkunitAccess(context, *cw, SecAccess_Read);
- if (query->hasProp("@priority"))
- resp.setPriority(getQueryPriorityName(query->getPropInt("@priority")));
- resp.setIsLibrary(query->getPropBool("@isLibrary"));
- SCMStringBuffer s;
- resp.setWUSnapShot(cw->getSnapshot(s).str()); //Label
- stat_type whenCompiled;
- if (cw->getStatistic(whenCompiled, "", StWhenCompiled))
- {
- formatStatistic(s.s.clear(), whenCompiled, StWhenCompiled);
- resp.setCompileTime(s.str());
- }
- StringArray libUsed, graphIds;
- Owned<IConstWULibraryIterator> libs = &cw->getLibraries();
- ForEach(*libs)
- libUsed.append(libs->query().getName(s).str());
- if (libUsed.length())
- resp.setLibrariesUsed(libUsed);
- if (version < 1.64)
- {
- unsigned numGraphIds = getGraphIdsByQueryId(querySet, queryId, graphIds);
- resp.setCountGraphs(numGraphIds);
- if (numGraphIds > 0)
- resp.setGraphIds(graphIds);
- }
- }
- StringArray logicalFiles;
- IArrayOf<IEspQuerySuperFile> superFiles;
- getQueryFiles(context, wuid, queryId, querySet, logicalFiles, req.getIncludeSuperFiles() ? &superFiles : NULL);
- if (logicalFiles.length())
- resp.setLogicalFiles(logicalFiles);
- if (superFiles.length())
- resp.setSuperFiles(superFiles);
- if (version >= 1.42)
- {
- VStringBuffer xpath("Alias[@id='%s']", queryId);
- IPropertyTree *alias = queryRegistry->queryPropTree(xpath.str());
- if (!alias)
- resp.setActivated(false);
- else
- resp.setActivated(true);
- }
- if (includeStateOnClusters && (version >= 1.43))
- {
- StringArray queryIds;
- queryIds.append(queryId);
- Owned<IPropertyTree> queriesOnCluster = getQueriesOnCluster(querySet, querySet, &queryIds, req.getCheckAllNodes());
- if (queriesOnCluster)
- {
- IArrayOf<IEspClusterQueryState> clusterStates;
- addClusterQueryStates(queriesOnCluster, querySet, queryId, clusterStates, version);
- resp.setClusters(clusterStates);
- }
- }
- if (version >= 1.50)
- {
- WsWuInfo winfo(context, wuid);
- resp.setResourceURLCount(winfo.getResourceURLCount());
- if (version >= 1.64)
- {
- IArrayOf<IEspECLTimer> timers;
- winfo.doGetTimers(timers); //Graph Duration
- if (timers.length())
- resp.setWUTimers(timers);
- IArrayOf<IEspECLGraph> graphs;
- winfo.doGetGraphs(graphs); //Graph Name, Label, Started, Finished, Type
- unsigned numGraphIds = graphs.length();
- resp.setCountGraphs(numGraphIds);
- if (numGraphIds > 0)
- resp.setWUGraphs(graphs);
- }
- }
- if (req.getIncludeWsEclAddresses())
- {
- StringArray wseclAddresses;
- Owned<IEnvironmentFactory> factory = getEnvironmentFactory(true);
- Owned<IConstEnvironment> env = factory->openEnvironment();
- Owned<IPropertyTree> root = &env->getPTree();
- Owned<IPropertyTreeIterator> services = root->getElements("Software/EspService[Properties/@type='ws_ecl']");
- StringArray serviceNames;
- VStringBuffer xpath("Target[@name='%s']", querySet);
- ForEach(*services)
- {
- IPropertyTree &service = services->query();
- if (!service.hasProp("Target") || service.hasProp(xpath))
- serviceNames.append(service.queryProp("@name"));
- }
- Owned<IPropertyTreeIterator> processes = root->getElements("Software/EspProcess");
- ForEach(*processes)
- {
- StringArray netAddrs;
- IPropertyTree &process = processes->query();
- Owned<IPropertyTreeIterator> instances = process.getElements("Instance");
- ForEach(*instances)
- {
- IPropertyTree &instance = instances->query();
- const char *netAddr = instance.queryProp("@netAddress");
- if (!netAddr || !*netAddr)
- continue;
- if (streq(netAddr, "."))
- netAddrs.appendUniq(envLocalAddress); //not necessarily local to this server
- else
- netAddrs.appendUniq(netAddr);
- }
- Owned<IPropertyTreeIterator> bindings = process.getElements("EspBinding");
- ForEach(*bindings)
- {
- IPropertyTree &binding = bindings->query();
- const char *srvName = binding.queryProp("@service");
- if (!serviceNames.contains(srvName))
- continue;
- const char *port = binding.queryProp("@port"); //should always be an integer, but we're just concatenating strings
- if (!port || !*port)
- continue;
- ForEachItemIn(i, netAddrs)
- {
- VStringBuffer wseclAddr("%s:%s", netAddrs.item(i), port);
- wseclAddresses.append(wseclAddr);
- }
- }
- }
- resp.setWsEclAddresses(wseclAddresses);
- }
- return true;
- }
- int EspQuerySuperFileCompareFunc(IInterface * const *i1, IInterface * const *i2)
- {
- if (!i1 || !*i1 || !i2 || !*i2)
- return 0;
- IEspQuerySuperFile *sf1 = QUERYINTERFACE(*i1, IEspQuerySuperFile);
- IEspQuerySuperFile *sf2 = QUERYINTERFACE(*i2, IEspQuerySuperFile);
- if (!sf1 || !sf2)
- return 0;
- const char *name1 = sf1->getName();
- const char *name2 = sf2->getName();
- if (!name1 || !name2)
- return 0;
- return strcmp(name1, name2);
- }
- IReferencedFile* CWsWorkunitsEx::getReferencedFileByName(const char* name, IReferencedFileList* wufiles)
- {
- Owned<IReferencedFileIterator> refFileItr = wufiles->getFiles();
- ForEach(*refFileItr)
- {
- IReferencedFile& rf = refFileItr->query();
- const char* lfn = rf.getLogicalName();
- if (lfn && strieq(lfn, name))
- return &rf;
- }
- return NULL;
- }
- void CWsWorkunitsEx::readSuperFiles(IEspContext &context, IReferencedFile* rf, const char* fileName, IReferencedFileList* wufiles, IArrayOf<IEspQuerySuperFile>* files)
- {
- double version = context.getClientVersion();
- StringArray subFiles;
- IArrayOf<IEspQuerySuperFile> superFiles;
- const StringArray& subFileNames = rf->getSubFileNames();
- ForEachItemIn(i, subFileNames)
- {
- const char* name = subFileNames.item(i);
- if (!name || !*name)
- continue;
- IReferencedFile* pRF = getReferencedFileByName(name, wufiles);
- if (!pRF)
- continue;
- if (!(pRF->getFlags() & RefFileSuper))
- {
- subFiles.append(name);
- }
- else if (version >= 1.57)
- {
- readSuperFiles(context, pRF, name, wufiles, &superFiles);
- }
- }
- Owned<IEspQuerySuperFile> newSuperFile = createQuerySuperFile();
- newSuperFile->setName(fileName);
- if (subFiles.length())
- {
- subFiles.sortAscii();
- newSuperFile->setSubFiles(subFiles);
- }
- if ((version >= 1.57) && superFiles.length())
- {
- superFiles.sort(EspQuerySuperFileCompareFunc);
- newSuperFile->setSuperFiles(superFiles);
- }
- files->append(*newSuperFile.getClear());
- }
- bool CWsWorkunitsEx::getQueryFiles(IEspContext &context, const char* wuid, const char* query, const char* target, StringArray& logicalFiles, IArrayOf<IEspQuerySuperFile> *respSuperFiles)
- {
- try
- {
- Owned<IConstWUClusterInfo> info = getTargetClusterInfo(target);
- if (!info || (info->getPlatform()!=RoxieCluster))
- return false;
- SCMStringBuffer process;
- info->getRoxieProcess(process);
- if (!process.length())
- return false;
- Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
- Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid);
- if (!cw)
- return false;
- StringArray superFileNames;
- Owned<IHpccPackageSet> ps = createPackageSet(process.str());
- Owned<IReferencedFileList> wufiles = createReferencedFileList(context.queryUserId(),
- context.queryPassword(), true, true);
- wufiles->addFilesFromQuery(cw, (ps) ? ps->queryActiveMap(target) : NULL, query);
- wufiles->resolveFiles(process.str(), NULL, NULL, NULL, true, true, true, true);
- Owned<IReferencedFileIterator> refFileItr = wufiles->getFiles();
- ForEach(*refFileItr)
- {
- IReferencedFile &rf = refFileItr->query();
- const char *lfn = rf.getLogicalName();
- if (lfn && *lfn)
- {
- logicalFiles.append(lfn);
- if (respSuperFiles && (rf.getFlags() & RefFileSuper))
- readSuperFiles(context, &rf, lfn, wufiles, respSuperFiles);
- }
- }
- logicalFiles.sortAscii();
- if (respSuperFiles)
- respSuperFiles->sort(EspQuerySuperFileCompareFunc);
- return true;
- }
- catch(IMultiException *me)
- {
- StringBuffer err;
- IERRLOG("ERROR control:getQueryXrefInfo roxie query info %s", me->errorMessage(err.append(me->errorCode()).append(' ')).str());
- me->Release();
- return false;
- }
- catch(IException *e)
- {
- StringBuffer err;
- IERRLOG("ERROR control:getQueryXrefInfo roxie query info %s", e->errorMessage(err.append(e->errorCode()).append(' ')).str());
- e->Release();
- return false;
- }
- }
- inline void verifyQueryActionAllowsWild(bool &allowWildChecked, CQuerySetQueryActionTypes action)
- {
- if (allowWildChecked)
- return;
- switch (action)
- {
- case CQuerySetQueryActionTypes_ToggleSuspend:
- throw MakeStringException(ECLWATCH_INVALID_ACTION, "Wildcards not supported for toggling suspended state");
- case CQuerySetQueryActionTypes_Activate:
- throw MakeStringException(ECLWATCH_INVALID_ACTION, "Wildcards not supported for Activating queries");
- }
- allowWildChecked=true;
- }
- inline bool verifyQueryActionAllowsAlias(CQuerySetQueryActionTypes action)
- {
- return (action!=CQuerySetQueryActionTypes_Activate && action!=CQuerySetQueryActionTypes_RemoveAllAliases);
- }
- void expandQueryActionTargetList(IProperties *queryIds, IPropertyTree *queryset, IArrayOf<IConstQuerySetQueryActionItem> &items, CQuerySetQueryActionTypes action)
- {
- bool allowWildChecked=false;
- ForEachItemIn(i, items)
- {
- const char *itemId = items.item(i).getQueryId();
- if (!isWildString(itemId))
- {
- bool suspendedByUser = false;
- const char *itemSuspendState = items.item(i).getClientState().getSuspended();
- if (itemSuspendState && (strieq(itemSuspendState, "By User") || strieq(itemSuspendState, "1")))
- suspendedByUser = true;
- if (!verifyQueryActionAllowsAlias(action))
- queryIds->setProp(itemId, suspendedByUser);
- else
- {
- Owned<IPropertyTree> query = resolveQueryAlias(queryset, itemId);
- if (query)
- {
- const char *id = query->queryProp("@id");
- if (id && *id)
- queryIds->setProp(id, suspendedByUser);
- }
- }
- }
- else
- {
- verifyQueryActionAllowsWild(allowWildChecked, action);
- if (verifyQueryActionAllowsAlias(action))
- {
- Owned<IPropertyTreeIterator> active = queryset->getElements("Alias");
- ForEach(*active)
- {
- const char *name = active->query().queryProp("@name");
- const char *id = active->query().queryProp("@id");
- if (name && id && WildMatch(name, itemId))
- queryIds->setProp(id, 0);
- }
- }
- Owned<IPropertyTreeIterator> queries = queryset->getElements("Query");
- ForEach(*queries)
- {
- const char *id = queries->query().queryProp("@id");
- if (id && WildMatch(id, itemId))
- queryIds->setProp(id, 0);
- }
- }
- }
- }
- void expandQueryActionTargetList(IProperties *queryIds, IPropertyTree *queryset, const char *id, CQuerySetQueryActionTypes action)
- {
- IArrayOf<IConstQuerySetQueryActionItem> items;
- Owned<IEspQuerySetQueryActionItem> item = createQuerySetQueryActionItem();
- item->setQueryId(id);
- items.append(*(IConstQuerySetQueryActionItem*)item.getClear());
- expandQueryActionTargetList(queryIds, queryset, items, action);
- }
- bool CWsWorkunitsEx::onWUQueryConfig(IEspContext &context, IEspWUQueryConfigRequest & req, IEspWUQueryConfigResponse & resp)
- {
- StringAttr target(req.getTarget());
- if (target.isEmpty())
- throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Target name required");
- if (!isValidCluster(target))
- throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Invalid target name: %s", target.get());
- Owned<IPropertyTree> queryset = getQueryRegistry(target.get(), false);
- if (!queryset)
- throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Target Queryset %s not found", req.getTarget());
- PROGLOG("WUQueryConfig: target %s", target.get());
- Owned<IProperties> queryIds = createProperties();
- expandQueryActionTargetList(queryIds, queryset, req.getQueryId(), QuerySetQueryActionTypes_Undefined);
- IArrayOf<IEspWUQueryConfigResult> results;
- Owned<IPropertyIterator> it = queryIds->getIterator();
- ForEach(*it)
- {
- Owned<IEspWUQueryConfigResult> result = createWUQueryConfigResult();
- result->setQueryId(it->getPropKey());
- VStringBuffer xpath("Query[@id='%s']", it->getPropKey());
- IPropertyTree *queryTree = queryset->queryPropTree(xpath);
- if (queryTree)
- {
- updateMemoryLimitSetting(queryTree, req.getMemoryLimit());
- updateQueryPriority(queryTree, req.getPriority());
- updateQuerySetting(req.getTimeLimit_isNull(), queryTree, "@timeLimit", req.getTimeLimit());
- updateQuerySetting(req.getWarnTimeLimit_isNull(), queryTree, "@warnTimeLimit", req.getWarnTimeLimit());
- if (req.getComment())
- queryTree->setProp("@comment", req.getComment());
- }
- results.append(*result.getClear());
- }
- resp.setResults(results);
- bool reloadFailed = false;
- if (0!=req.getWait() && !req.getNoReload())
- reloadFailed = !reloadCluster(target.get(), (unsigned)req.getWait());
- resp.setReloadFailed(reloadFailed);
- return true;
- }
- bool CWsWorkunitsEx::onWUQuerysetQueryAction(IEspContext &context, IEspWUQuerySetQueryActionRequest & req, IEspWUQuerySetQueryActionResponse & resp)
- {
- resp.setQuerySetName(req.getQuerySetName());
- resp.setAction(req.getAction());
- if (isEmpty(req.getQuerySetName()))
- throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Queryset name required");
- Owned<IPropertyTree> queryset = getQueryRegistry(req.getQuerySetName(), true);
- if (!queryset)
- throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Queryset %s not found", req.getQuerySetName());
- Owned<IProperties> queryIds = createProperties();
- expandQueryActionTargetList(queryIds, queryset, req.getQueries(), req.getAction());
- if (req.getAction() == CQuerySetQueryActionTypes_ResetQueryStats)
- return resetQueryStats(context, req.getQuerySetName(), queryIds, resp);
- IArrayOf<IEspQuerySetQueryActionResult> results;
- Owned<IPropertyIterator> it = queryIds->getIterator();
- ForEach(*it)
- {
- const char *id = it->getPropKey();
- Owned<IEspQuerySetQueryActionResult> result = createQuerySetQueryActionResult();
- result->setQueryId(id);
- try
- {
- Owned<IPropertyTree> query = getQueryById(queryset, id);
- if (!query)
- throw MakeStringException(ECLWATCH_QUERYID_NOT_FOUND, "Query %s/%s not found.", req.getQuerySetName(), id);
- CQuerySetQueryActionTypes action = req.getAction();
- const char* strAction = (action > -1) && (action < NumOfQuerySetQueryActionTypes) ? QuerySetQueryActionTypes[action] : "Undefined";
- PROGLOG("%s: queryset %s, query %s", strAction, req.getQuerySetName(), id);
- switch (action)
- {
- case CQuerySetQueryActionTypes_ToggleSuspend:
- setQuerySuspendedState(queryset, id, !queryIds->getPropBool(id), context.queryUserId());
- break;
- case CQuerySetQueryActionTypes_Suspend:
- setQuerySuspendedState(queryset, id, true, context.queryUserId());
- break;
- case CQuerySetQueryActionTypes_Unsuspend:
- setQuerySuspendedState(queryset, id, false, NULL);
- break;
- case CQuerySetQueryActionTypes_Activate:
- setQueryAlias(queryset, query->queryProp("@name"), id);
- break;
- case CQuerySetQueryActionTypes_Delete:
- removeNamedQuery(queryset, id);
- query.clear();
- break;
- case CQuerySetQueryActionTypes_RemoveAllAliases:
- removeAliasesFromNamedQuery(queryset, id);
- break;
- }
- result->setSuccess(true);
- if (query)
- result->setSuspended(query->getPropBool("@suspended"));
- }
- catch(IException *e)
- {
- StringBuffer msg;
- result->setMessage(e->errorMessage(msg).str());
- result->setCode(e->errorCode());
- result->setSuccess(false);
- e->Release();
- }
- results.append(*result.getClear());
- }
- resp.setResults(results);
- return true;
- }
- bool CWsWorkunitsEx::onWUQuerysetAliasAction(IEspContext &context, IEspWUQuerySetAliasActionRequest &req, IEspWUQuerySetAliasActionResponse &resp)
- {
- resp.setQuerySetName(req.getQuerySetName());
- resp.setAction(req.getAction());
- if (isEmpty(req.getQuerySetName()))
- throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Queryset name required");
- Owned<IPropertyTree> queryset = getQueryRegistry(req.getQuerySetName(), true);
- if (!queryset)
- throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Queryset %s not found", req.getQuerySetName());
- IArrayOf<IEspQuerySetAliasActionResult> results;
- ForEachItemIn(i, req.getAliases())
- {
- IConstQuerySetAliasActionItem& item=req.getAliases().item(i);
- Owned<IEspQuerySetAliasActionResult> result = createQuerySetAliasActionResult();
- try
- {
- VStringBuffer xpath("Alias[@name='%s']", item.getName());
- IPropertyTree *alias = queryset->queryPropTree(xpath.str());
- if (!alias)
- throw MakeStringException(ECLWATCH_ALIAS_NOT_FOUND, "Alias %s/%s not found.", req.getQuerySetName(), item.getName());
- CQuerySetAliasActionTypes action = req.getAction();
- const char* strAction = (action > -1) && (action < NumOfQuerySetAliasActionTypes) ? QuerySetAliasActionTypes[action] : "Undefined";
- PROGLOG("%s: queryset %s, alias %s", strAction, req.getQuerySetName(), item.getName());
- switch (action)
- {
- case CQuerySetAliasActionTypes_Deactivate:
- removeQuerySetAlias(req.getQuerySetName(), item.getName());
- break;
- }
- result->setSuccess(true);
- }
- catch(IException *e)
- {
- StringBuffer msg;
- result->setMessage(e->errorMessage(msg).str());
- result->setCode(e->errorCode());
- result->setSuccess(false);
- e->Release();
- }
- results.append(*result.getClear());
- }
- resp.setResults(results);
- return true;
- }
- #define QUERYPATH_SEP_CHAR '/'
- bool nextQueryPathNode(const char *&path, StringBuffer &node)
- {
- if (*path==QUERYPATH_SEP_CHAR)
- path++;
- while (*path && *path!=QUERYPATH_SEP_CHAR)
- node.append(*path++);
- return (*path && *++path);
- }
- bool splitQueryPath(const char *path, StringBuffer &netAddress, StringBuffer &queryset, StringBuffer *query)
- {
- if (!path || !*path)
- return false;
- if (*path==QUERYPATH_SEP_CHAR && path[1]==QUERYPATH_SEP_CHAR)
- {
- path+=2;
- if (!nextQueryPathNode(path, netAddress))
- return false;
- }
- if (!nextQueryPathNode(path, queryset))
- return (query==NULL);
- if (!query)
- return false;
- if (nextQueryPathNode(path, *query))
- return false; //query path too deep
- return true;
- }
- IPropertyTree *fetchRemoteQuerySetInfo(IEspContext *context, const char *srcAddress, const char *srcTarget)
- {
- if (!srcAddress || !*srcAddress || !srcTarget || !*srcTarget)
- return NULL;
- VStringBuffer url("http://%s%s/WsWorkunits/WUQuerysetDetails.xml?ver_=1.51&QuerySetName=%s&FilterType=All", srcAddress, (!strchr(srcAddress, ':')) ? ":8010" : "", srcTarget);
- Owned<IHttpClientContext> httpCtx = getHttpClientContext();
- Owned<IHttpClient> httpclient = httpCtx->createHttpClient(NULL, url);
- const char *user = context->queryUserId();
- if (user && *user)
- httpclient->setUserID(user);
- const char *pw = context->queryPassword();
- if (pw && *pw)
- httpclient->setPassword(pw);
- StringBuffer request; //empty
- StringBuffer response;
- StringBuffer status;
- if (0 > httpclient->sendRequest("GET", NULL, request, response, status) || !response.length() || strncmp("200", status, 3))
- throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Error fetching remote queryset information: %s %s %s", srcAddress, srcTarget, status.str());
- return createPTreeFromXMLString(response);
- }
- class QueryCloner
- {
- public:
- QueryCloner(IEspContext *_context, const char *address, const char *source, const char *_target) :
- context(_context), target(_target), srcAddress(address)
- {
- if (srcAddress.length())
- srcQuerySet.setown(fetchRemoteQuerySetInfo(context, srcAddress, source));
- else
- srcQuerySet.setown(getQueryRegistry(source, true));
- if (!srcQuerySet)
- throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Source Queryset %s %s not found", srcAddress.str(), source);
- destQuerySet.setown(getQueryRegistry(target, false));
- if (!destQuerySet) // getQueryRegistry should have created if not found
- throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Destination Queryset %s could not be created, or found", target.str());
- factory.setown(getWorkUnitFactory(context->querySecManager(), context->queryUser()));
- }
- QueryCloner(IEspContext *_context, IPropertyTree *srcTree, const char *_target) :
- context(_context), target(_target)
- {
- srcQuerySet.set(srcTree);
- destQuerySet.setown(getQueryRegistry(target, false));
- if (!destQuerySet) // getQueryRegistry should have created if not found
- throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Destination Queryset %s could not be created, or found", target.str());
- factory.setown(getWorkUnitFactory(context->querySecManager(), context->queryUser()));
- }
- void setQueryDirectory(const char *dir)
- {
- queryDirectory.set(dir);
- }
- void cloneQueryRemote(IPropertyTree *query, bool makeActive)
- {
- StringBuffer wuid(query->queryProp("Wuid"));
- if (!wuid.length())
- return;
- const char *queryName = query->queryProp("Name");
- if (!queryName || !*queryName)
- return;
- StringBuffer xml;
- MemoryBuffer dll;
- StringBuffer dllname;
- StringBuffer fetchedName;
- StringBuffer remoteDfs;
- fetchRemoteWorkunit(NULL, context, srcAddress.str(), NULL, NULL, wuid, fetchedName, xml, dllname, dll, remoteDfs);
- deploySharedObject(*context, wuid, dllname, target, queryName, dll, queryDirectory, xml.str());
- SCMStringBuffer existingQueryId;
- queryIdFromQuerySetWuid(destQuerySet, wuid, queryName, existingQueryId);
- if (existingQueryId.length())
- {
- existingQueryIds.append(existingQueryId.str());
- if (makeActive)
- activateQuery(destQuerySet, ACTIVATE_SUSPEND_PREVIOUS, queryName, existingQueryId.str(), context->queryUserId());
- return;
- }
- StringBuffer newQueryId;
- Owned<IWorkUnit> workunit = factory->updateWorkUnit(wuid);
- addQueryToQuerySet(workunit, destQuerySet, queryName, makeActive ? ACTIVATE_SUSPEND_PREVIOUS : DO_NOT_ACTIVATE, newQueryId, context->queryUserId());
- copiedQueryIds.append(newQueryId);
- Owned<IPropertyTree> destQuery = getQueryById(destQuerySet, newQueryId);
- if (destQuery)
- {
- Owned<IAttributeIterator> aiter = query->getAttributes();
- ForEach(*aiter)
- {
- const char *atname = aiter->queryName();
- if (!destQuery->hasProp(atname))
- destQuery->setProp(atname, aiter->queryValue());
- }
- if (cloneFilesEnabled && wufiles)
- wufiles->addFilesFromQuery(workunit, pm, newQueryId);
- }
- }
- void cloneQueryLocal(IPropertyTree *query, bool makeActive)
- {
- const char *wuid = query->queryProp("@wuid");
- if (!wuid || !*wuid)
- return;
- const char *queryName = query->queryProp("@name");
- if (!queryName || !*queryName)
- return;
- SCMStringBuffer existingQueryId;
- queryIdFromQuerySetWuid(destQuerySet, wuid, queryName, existingQueryId);
- if (existingQueryId.length())
- {
- existingQueryIds.append(existingQueryId.str());
- if (makeActive)
- activateQuery(destQuerySet, ACTIVATE_SUSPEND_PREVIOUS, queryName, existingQueryId.str(), context->queryUserId());
- return;
- }
- StringBuffer newQueryId;
- Owned<IWorkUnit> workunit = factory->updateWorkUnit(wuid);
- if (!workunit)
- {
- StringBuffer msg(wuid);
- msg.append(": ").append(query->queryProp("@id"));
- missingWuids.append(msg);
- return;
- }
- if (!newQueryId.length())
- addQueryToQuerySet(workunit, destQuerySet, queryName, makeActive ? ACTIVATE_SUSPEND_PREVIOUS : DO_NOT_ACTIVATE, newQueryId, context->queryUserId());
- copiedQueryIds.append(newQueryId);
- Owned<IPropertyTree> destQuery = getQueryById(destQuerySet, newQueryId);
- if (destQuery)
- {
- Owned<IAttributeIterator> aiter = query->getAttributes();
- ForEach(*aiter)
- {
- const char *atname = aiter->queryName();
- if (!destQuery->hasProp(atname))
- destQuery->setProp(atname, aiter->queryValue());
- }
- Owned<IPropertyTreeIterator> children = query->getElements("*");
- ForEach(*children)
- {
- IPropertyTree &child = children->query();
- destQuery->addPropTree(child.queryName(), createPTreeFromIPT(&child));
- }
- if (cloneFilesEnabled && wufiles)
- wufiles->addFilesFromQuery(workunit, pm, newQueryId);
- }
- }
- void cloneActiveRemote(bool makeActive)
- {
- Owned<IPropertyTreeIterator> activeQueries = srcQuerySet->getElements("QuerysetAliases/QuerySetAlias");
- ForEach(*activeQueries)
- {
- IPropertyTree &alias = activeQueries->query();
- VStringBuffer xpath("QuerysetQueries/QuerySetQuery[Id='%s'][1]", alias.queryProp("Id"));
- IPropertyTree *query = srcQuerySet->queryPropTree(xpath);
- if (!query)
- continue;
- cloneQueryRemote(query, makeActive);
- }
- }
- void cloneActiveLocal(bool makeActive, const char *mask)
- {
- StringBuffer xpath("Alias");
- if (mask && *mask)
- xpath.appendf("[@id='%s']", mask);
- Owned<IPropertyTreeIterator> activeQueries = srcQuerySet->getElements(xpath);
- ForEach(*activeQueries)
- {
- IPropertyTree &alias = activeQueries->query();
- Owned<IPropertyTree> query = getQueryById(srcQuerySet, alias.queryProp("@id"));
- if (!query)
- return;
- cloneQueryLocal(query, makeActive);
- }
- }
- void cloneActive(bool makeActive)
- {
- if (srcAddress.length())
- cloneActiveRemote(makeActive);
- else
- cloneActiveLocal(makeActive, nullptr);
- }
- void cloneAllRemote(bool cloneActiveState)
- {
- Owned<IPropertyTreeIterator> allQueries = srcQuerySet->getElements("QuerysetQueries/QuerySetQuery");
- ForEach(*allQueries)
- {
- IPropertyTree &query = allQueries->query();
- bool makeActive = false;
- if (cloneActiveState)
- {
- VStringBuffer xpath("QuerysetAliases/QuerySetAlias[Id='%s']", query.queryProp("Id"));
- makeActive = srcQuerySet->hasProp(xpath);
- }
- cloneQueryRemote(&query, makeActive);
- }
- }
- void cloneAllLocal(bool cloneActiveState, const char *mask)
- {
- StringBuffer xpath("Query");
- if (mask && *mask)
- xpath.appendf("[@id='%s']", mask);
- Owned<IPropertyTreeIterator> allQueries = srcQuerySet->getElements(xpath);
- ForEach(*allQueries)
- {
- IPropertyTree &query = allQueries->query();
- bool makeActive = false;
- if (cloneActiveState)
- {
- VStringBuffer xpath("Alias[@id='%s']", query.queryProp("@id"));
- makeActive = srcQuerySet->hasProp(xpath);
- }
- cloneQueryLocal(&query, makeActive);
- }
- }
- void cloneAll(bool cloneActiveState)
- {
- if (srcAddress.length())
- cloneAllRemote(cloneActiveState);
- else
- cloneAllLocal(cloneActiveState, nullptr);
- }
- void enableFileCloning(unsigned _updateFlags, const char *dfsServer, const char *destProcess, const char *sourceProcess, bool allowForeign)
- {
- cloneFilesEnabled = true;
- updateFlags = _updateFlags;
- splitDerivedDfsLocation(dfsServer, srcCluster, dfsIP, srcPrefix, sourceProcess, sourceProcess, NULL, NULL);
- wufiles.setown(createReferencedFileList(context->queryUserId(), context->queryPassword(), allowForeign, false));
- Owned<IHpccPackageSet> ps = createPackageSet(destProcess);
- pm.set(ps->queryActiveMap(target));
- process.set(destProcess);
- }
- void cloneFiles()
- {
- if (cloneFilesEnabled)
- {
- wufiles->resolveFiles(process, dfsIP, srcPrefix, srcCluster, !(updateFlags & (DALI_UPDATEF_REPLACE_FILE | DALI_UPDATEF_CLONE_FROM)), true, false, true);
- Owned<IDFUhelper> helper = createIDFUhelper();
- Owned <IConstWUClusterInfo> cl = getTargetClusterInfo(target);
- if (cl)
- {
- SCMStringBuffer process;
- StringBuffer defReplicateFolder;
- getConfigurationDirectory(NULL, "data2", "roxie", cl->getRoxieProcess(process).str(), defReplicateFolder);
- wufiles->cloneAllInfo(updateFlags, helper, true, true, cl->getRoxieRedundancy(), cl->getChannelsPerNode(), cl->getRoxieReplicateOffset(), defReplicateFolder);
- }
- }
- }
- void gatherFileErrors(IArrayOf<IConstLogicalFileError> &errors)
- {
- if (wufiles)
- ::gatherFileErrors(wufiles, errors);
- }
- private:
- Linked<IEspContext> context;
- Linked<IWorkUnitFactory> factory;
- Owned<IPropertyTree> destQuerySet;
- Owned<IPropertyTree> srcQuerySet;
- Owned<IReferencedFileList> wufiles;
- Owned<const IHpccPackageMap> pm;
- StringBuffer dfsIP;
- StringBuffer srcAddress;
- StringBuffer srcCluster;
- StringBuffer srcPrefix;
- StringAttr target;
- StringAttr process;
- StringAttr queryDirectory;
- bool cloneFilesEnabled = false;
- unsigned updateFlags = 0;
- public:
- StringArray existingQueryIds;
- StringArray copiedQueryIds;
- StringArray missingWuids;
- };
- bool CWsWorkunitsEx::onWUCopyQuerySet(IEspContext &context, IEspWUCopyQuerySetRequest &req, IEspWUCopyQuerySetResponse &resp)
- {
- const char *source = req.getSource();
- if (!source || !*source)
- throw MakeStringException(ECLWATCH_MISSING_PARAMS, "No source target specified");
- StringBuffer srcAddress;
- StringBuffer srcTarget;
- if (!splitQueryPath(source, srcAddress, srcTarget, NULL))
- throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid source target");
- if (!srcAddress.length() && !isValidCluster(srcTarget))
- throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Invalid source target name: %s", source);
- const char *target = req.getTarget();
- if (!target || !*target)
- throw MakeStringException(ECLWATCH_MISSING_PARAMS, "No destination target specified");
- if (!isValidCluster(target))
- throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Invalid destination target name: %s", target);
- DBGLOG("%s copying queryset %s from %s target %s", context.queryUserId(), target, srcAddress.str(), srcTarget.str());
- QueryCloner cloner(&context, srcAddress, srcTarget, target);
- cloner.setQueryDirectory(queryDirectory);
- SCMStringBuffer process;
- if (req.getCopyFiles())
- {
- Owned <IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(target);
- if (clusterInfo && clusterInfo->getPlatform()==RoxieCluster)
- {
- clusterInfo->getRoxieProcess(process);
- if (!process.length())
- throw MakeStringException(ECLWATCH_INVALID_CLUSTER_INFO, "DFS process cluster not found for destination target %s", target);
- unsigned updateFlags = 0;
- if (req.getOverwriteDfs())
- updateFlags |= (DALI_UPDATEF_REPLACE_FILE | DALI_UPDATEF_CLONE_FROM | DALI_UPDATEF_SUPERFILES);
- if (req.getUpdateCloneFrom())
- updateFlags |= DALI_UPDATEF_CLONE_FROM;
- if (req.getUpdateSuperFiles())
- updateFlags |= DALI_UPDATEF_SUPERFILES;
- if (req.getAppendCluster())
- updateFlags |= DALI_UPDATEF_APPEND_CLUSTER;
- cloner.enableFileCloning(updateFlags, req.getDfsServer(), process.str(), req.getSourceProcess(), req.getAllowForeignFiles());
- }
- }
- if (req.getActiveOnly())
- cloner.cloneActive(req.getCloneActiveState());
- else
- cloner.cloneAll(req.getCloneActiveState());
- cloner.cloneFiles();
- if (req.getIncludeFileErrors())
- cloner.gatherFileErrors(resp.getFileErrors());
- resp.setCopiedQueries(cloner.copiedQueryIds);
- resp.setExistingQueries(cloner.existingQueryIds);
- return true;
- }
- bool CWsWorkunitsEx::onWUQuerysetCopyQuery(IEspContext &context, IEspWUQuerySetCopyQueryRequest &req, IEspWUQuerySetCopyQueryResponse &resp)
- {
- unsigned start = msTick();
- const char *source = req.getSource();
- if (!source || !*source)
- throw MakeStringException(ECLWATCH_MISSING_PARAMS, "No source query specified");
- const char *target = req.getTarget();
- if (!target || !*target)
- throw MakeStringException(ECLWATCH_MISSING_PARAMS, "No destination specified");
- if (strchr(target, '/')) //for future use
- throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid target queryset name");
- if (req.getCluster() && *req.getCluster() && !strieq(req.getCluster(), target)) //backward compatability check
- throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid target cluster and queryset must match");
- if (!isValidCluster(target))
- throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Invalid target name: %s", target);
- StringBuffer srcAddress, srcQuerySet, srcQuery;
- if (!splitQueryPath(source, srcAddress, srcQuerySet, &srcQuery))
- throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid source query path");
- StringAttr targetQueryName(req.getDestName());
- Owned<IClientWUQuerySetDetailsResponse> sourceQueryInfoResp;
- IConstQuerySetQuery *srcInfo=NULL;
- DBGLOG("%s copying query %s to target %s from %s target %s", context.queryUserId(), srcQuery.str(), target, srcAddress.str(), srcQuerySet.str());
- StringBuffer remoteIP;
- StringBuffer wuid;
- if (srcAddress.length())
- {
- StringBuffer xml;
- MemoryBuffer dll;
- StringBuffer dllname;
- StringBuffer queryName;
- fetchRemoteWorkunitAndQueryDetails(NULL, &context, srcAddress.str(), srcQuerySet.str(), srcQuery.str(), NULL, queryName, xml, dllname, dll, remoteIP, sourceQueryInfoResp);
- if (sourceQueryInfoResp && sourceQueryInfoResp->getQuerysetQueries().ordinality())
- srcInfo = &sourceQueryInfoResp->getQuerysetQueries().item(0);
- if (srcInfo)
- wuid.set(srcInfo->getWuid());
- if (targetQueryName.isEmpty())
- targetQueryName.set(queryName);
- deploySharedObject(context, wuid, dllname.str(), target, targetQueryName.get(), dll, queryDirectory.str(), xml.str());
- }
- else
- {
- //Could get the atributes without soap call, but this creates a common data structure shared with fetching remote query info
- //Get query attributes before resolveQueryAlias, to avoid deadlock
- sourceQueryInfoResp.setown(fetchQueryDetails(NULL, &context, NULL, srcQuerySet, srcQuery));
- if (sourceQueryInfoResp && sourceQueryInfoResp->getQuerysetQueries().ordinality())
- srcInfo = &sourceQueryInfoResp->getQuerysetQueries().item(0);
- Owned<IPropertyTree> queryset = getQueryRegistry(srcQuerySet.str(), true);
- if (!queryset)
- throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Source Queryset %s not found", srcQuery.str());
- Owned<IPropertyTree> query = resolveQueryAlias(queryset, srcQuery.str());
- if (!query)
- throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Source query %s not found", source);
- wuid.set(query->queryProp("@wuid"));
- if (targetQueryName.isEmpty())
- targetQueryName.set(query->queryProp("@name"));
- }
- Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
- Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid.str());
- if (!req.getDontCopyFiles())
- {
- StringBuffer daliIP;
- StringBuffer srcCluster;
- StringBuffer srcPrefix;
- splitDerivedDfsLocation(req.getDaliServer(), srcCluster, daliIP, srcPrefix, req.getSourceProcess(), req.getSourceProcess(), remoteIP.str(), NULL);
- unsigned updateFlags = 0;
- if (req.getOverwrite())
- updateFlags |= (DALI_UPDATEF_REPLACE_FILE | DALI_UPDATEF_CLONE_FROM | DALI_UPDATEF_SUPERFILES);
- if (req.getUpdateCloneFrom())
- updateFlags |= DALI_UPDATEF_CLONE_FROM;
- if (req.getUpdateSuperFiles())
- updateFlags |= DALI_UPDATEF_SUPERFILES;
- if (req.getAppendCluster())
- updateFlags |= DALI_UPDATEF_APPEND_CLUSTER;
- QueryFileCopier cpr(target);
- cpr.init(context, req.getAllowForeignFiles());
- cpr.remoteIP.set(daliIP);
- cpr.remotePrefix.set(srcPrefix);
- cpr.srcCluster.set(srcCluster);
- cpr.queryname.set(targetQueryName);
- cpr.copy(cw, updateFlags);
- if (req.getIncludeFileErrors())
- cpr.gatherFileErrors(resp.getFileErrors());
- }
- WorkunitUpdate wu(&cw->lock());
- if (!wu)
- throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT, "Error opening wuid %s for query %s", wuid.str(), source);
- StringBuffer targetQueryId;
- WUQueryActivationOptions activate = (WUQueryActivationOptions)req.getActivate();
- addQueryToQuerySet(wu, target, targetQueryName.get(), activate, targetQueryId, context.queryUserId());
- Owned<IPropertyTree> queryTree = getQueryById(target, targetQueryId, false);
- if (queryTree)
- {
- updateMemoryLimitSetting(queryTree, req.getMemoryLimit(), srcInfo);
- updateQueryPriority(queryTree, req.getPriority(), srcInfo);
- updateTimeLimitSetting(queryTree, req.getTimeLimit_isNull(), req.getTimeLimit(), srcInfo);
- updateWarnTimeLimitSetting(queryTree, req.getWarnTimeLimit_isNull(), req.getWarnTimeLimit(), srcInfo);
- if (req.getComment())
- queryTree->setProp("@comment", req.getComment());
- else if (srcInfo && srcInfo->getComment())
- queryTree->setProp("@comment", srcInfo->getComment());
- if (srcInfo && srcInfo->getSnapshot())
- queryTree->setProp("@snapshot", srcInfo->getSnapshot());
- }
- wu.clear();
- resp.setQueryId(targetQueryId.str());
- if (0!=req.getWait() && !req.getNoReload())
- reloadCluster(target, remainingMsWait(req.getWait(), start));
- return true;
- }
- bool CWsWorkunitsEx::onWUQuerysetImport(IEspContext &context, IEspWUQuerysetImportRequest &req, IEspWUQuerysetImportResponse &resp)
- {
- try
- {
- const char* target = req.getTarget();
- if (!target || !*target)
- throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Target not specified");
- Owned <IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(target);
- if (!clusterInfo)
- throw MakeStringException(ECLWATCH_CANNOT_RESOLVE_CLUSTER_NAME, "Target not found");
- if (req.getCopyFiles() && clusterInfo->getPlatform()!=RoxieCluster)
- throw MakeStringException(ECLWATCH_INVALID_ACTION, "Copy files option only supported for Roxie");
- MemoryBuffer &mb = const_cast<MemoryBuffer &>(req.getData()); //for efficiency, content of request shouldn't matter after
- if (req.getCompressed())
- {
- MemoryBuffer decompressed;
- fastLZDecompressToBuffer(decompressed, mb);
- mb.swapWith(decompressed);
- }
- mb.append('\0');
- Owned<IPropertyTree> srcTree = createPTreeFromXMLString(mb.toByteArray());
- const char *archivedTarget = srcTree->queryProp("@target");
- if (archivedTarget && *archivedTarget) //support simple queryset or with archived (exported) root format
- {
- VStringBuffer xpath("QuerySet[@id='%s']", archivedTarget);
- IPropertyTree *qsTree = srcTree->queryPropTree(xpath);
- if (qsTree)
- srcTree.setown(LINK(qsTree));
- }
- if (req.getReplace())
- {
- Owned<IPropertyTree> queryRegistry = getQueryRegistry(target, false);
- queryRegistry->removeProp("*");
- resp.setClearedExisting(true);
- }
- const bool activate = CQuerysetImportActivation_ImportedActive == req.getActivation(); //only two options now but may evolve
- QueryCloner cloner(&context, srcTree, target);
- SCMStringBuffer process;
- if (req.getCopyFiles())
- {
- clusterInfo->getRoxieProcess(process); //checked if roxie when copying files above
- if (!process.length())
- throw MakeStringException(ECLWATCH_INVALID_CLUSTER_INFO, "DFS process cluster not found for destination target %s", target);
- unsigned updateFlags = 0;
- if (req.getOverwriteDfs())
- updateFlags |= (DALI_UPDATEF_REPLACE_FILE | DALI_UPDATEF_CLONE_FROM | DALI_UPDATEF_SUPERFILES);
- if (req.getUpdateCloneFrom())
- updateFlags |= DALI_UPDATEF_CLONE_FROM;
- if (req.getUpdateSuperFiles())
- updateFlags |= DALI_UPDATEF_SUPERFILES;
- if (req.getAppendCluster())
- updateFlags |= DALI_UPDATEF_APPEND_CLUSTER;
- cloner.enableFileCloning(updateFlags, req.getDfsServer(), process.str(), req.getSourceProcess(), req.getAllowForeignFiles());
- }
- if (req.getActiveOnly())
- cloner.cloneActiveLocal(activate, req.getQueryMask());
- else
- cloner.cloneAllLocal(activate, req.getQueryMask());
- cloner.cloneFiles();
- if (req.getIncludeFileErrors())
- cloner.gatherFileErrors(resp.getFileErrors());
- resp.setImportedQueries(cloner.copiedQueryIds);
- resp.setExistingQueries(cloner.existingQueryIds);
- resp.setMissingWuids(cloner.missingWuids);
- }
- catch(IException* e)
- {
- FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
- }
- return true;
- }
- bool CWsWorkunitsEx::onWUQuerysetExport(IEspContext &context, IEspWUQuerysetExportRequest &req, IEspWUQuerysetExportResponse &resp)
- {
- try
- {
- const char* target = req.getTarget();
- if (!target || !*target)
- throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Target not specified");
- Owned<IPropertyTree> queryRegistry = getQueryRegistry(target, true);
- if (req.getActiveOnly())
- {
- Owned<IPropertyTree> activeOnly = createPTree("QuerySet");
- Owned<IAttributeIterator> attrs = queryRegistry->getAttributes();
- ForEach(*attrs)
- activeOnly->setProp(attrs->queryName(), attrs->queryValue());
- Owned<IPropertyTreeIterator> aliases = queryRegistry->getElements("Alias");
- ForEach(*aliases)
- {
- IPropertyTree &alias = aliases->query();
- const char *id = alias.queryProp("@id");
- if (id && *id)
- {
- VStringBuffer xpath("Query[@id='%s']", id);
- IPropertyTree *query = queryRegistry->queryPropTree(xpath);
- if (query)
- {
- activeOnly->addPropTree("Query", LINK(query));
- activeOnly->addPropTree("Alias", LINK(&alias));
- }
- }
- }
- queryRegistry.setown(activeOnly.getClear());
- }
- if (req.getProtect())
- {
- StringArray wuids;
- Owned<IPropertyTreeIterator> queries = queryRegistry->getElements("Query");
- ForEach(*queries)
- {
- IPropertyTree &query = queries->query();
- const char *wuid = query.queryProp("@wuid");
- if (wuid && *wuid)
- wuids.append(wuid);
- }
- if (wuids.length())
- doProtectWorkunits(context, wuids, nullptr);
- }
- CDateTime dt;
- dt.setNow();
- StringBuffer dts;
- VStringBuffer qs("<QuerySetArchive exported='%s' target='%s' activeOnly='%s'>\n", dt.getString(dts, true).str(), target, req.getActiveOnly() ? "true" : "false");
- toXML(queryRegistry, qs);
- qs.append("</QuerySetArchive>");
- MemoryBuffer content;
- if (req.getCompress())
- fastLZCompressToBuffer(content, qs.length()+1, qs);
- else
- content.append(qs.str());
- resp.setTarget(target);
- resp.setCompressed(req.getCompress());
- resp.setData(content);
- }
- catch(IException* e)
- {
- FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
- }
- return true;
- }
- void CWsWorkunitsEx::getGraphsByQueryId(const char *target, const char *queryId, const char *graphId, const char *subGraphId, IArrayOf<IEspECLGraphEx>& ECLGraphs)
- {
- if (!target || !*target)
- throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Target name required");
- if (!queryId || !*queryId)
- throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Query Id required");
- Owned<IConstWUClusterInfo> info = getTargetClusterInfo(target);
- if (!info || (info->getPlatform()!=RoxieCluster)) //Only support roxie for now
- throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Invalid Roxie name");
- PROGLOG("getGraphsByQueryId: target %s, query %s", target, queryId);
- const SocketEndpointArray &eps = info->getRoxieServers();
- if (eps.empty())
- return;
- VStringBuffer control("<control:querystats><Query id='%s'/></control:querystats>", queryId);
- Owned<IPropertyTree> querystats = sendRoxieControlAllNodes(eps.item(0), control.str(), false, ROXIELOCKCONNECTIONTIMEOUT);
- if (!querystats)
- return;
- Owned<IPropertyTreeIterator> graphs = querystats->getElements("Endpoint/Query/Graph");
- ForEach(*graphs)
- {
- IPropertyTree &graph = graphs->query();
- const char* aGraphId = graph.queryProp("@id");
- if (graphId && *graphId && !strieq(graphId, aGraphId))
- continue;
- IPropertyTree* xgmml = graph.getBranch("xgmml/graph");
- if (!xgmml)
- continue;
- Owned<IEspECLGraphEx> g = createECLGraphEx("","");
- g->setName(aGraphId);
- StringBuffer xml;
- if (!subGraphId || !*subGraphId)
- toXML(xgmml, xml);
- else
- {
- VStringBuffer xpath("//node[@id='%s']", subGraphId);
- toXML(xgmml->queryPropTree(xpath.str()), xml);
- }
- g->setGraph(xml.str());
- ECLGraphs.append(*g.getClear());
- }
- return;
- }
- bool CWsWorkunitsEx::onWUQueryGetGraph(IEspContext& context, IEspWUQueryGetGraphRequest& req, IEspWUQueryGetGraphResponse& resp)
- {
- try
- {
- IArrayOf<IEspECLGraphEx> graphs;
- getGraphsByQueryId(req.getTarget(), req.getQueryId(), req.getGraphName(), req.getSubGraphId(), graphs);
- resp.setGraphs(graphs);
- }
- catch(IException* e)
- {
- FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
- }
- return true;
- }
- bool CWsWorkunitsEx::resetQueryStats(IEspContext& context, const char* target, IProperties* queryIds, IEspWUQuerySetQueryActionResponse& resp)
- {
- IArrayOf<IEspQuerySetQueryActionResult> results;
- Owned<IEspQuerySetQueryActionResult> result = createQuerySetQueryActionResult();
- try
- {
- StringBuffer control;
- Owned<IPropertyIterator> it = queryIds->getIterator();
- ForEach(*it)
- {
- const char *queryId = it->getPropKey();
- if (queryId && *queryId)
- {
- appendXMLOpenTag(control, "Query", NULL, false);
- appendXMLAttr(control, "id", queryId);
- if (target && *target)
- appendXMLAttr(control, "target", target);
- control.append("/>");
- }
- }
- if (!control.length())
- throw MakeStringException(ECLWATCH_MISSING_PARAMS, "CWsWorkunitsEx::resetQueryStats: Query ID not specified");
- control.insert(0, "<control:resetquerystats>");
- control.append("</control:resetquerystats>");
- if (!sendControlQuery(context, target, control.str(), ROXIECONNECTIONTIMEOUT))
- throw MakeStringException(ECLWATCH_INTERNAL_ERROR, "CWsWorkunitsEx::resetQueryStats: Failed to send roxie control query");
- result->setMessage("Query stats reset succeeded");
- result->setSuccess(true);;
- }
- catch(IMultiException *me)
- {
- StringBuffer msg;
- result->setMessage(me->errorMessage(msg).str());
- result->setCode(me->errorCode());
- result->setSuccess(false);
- me->Release();
- }
- catch(IException *e)
- {
- StringBuffer msg;
- result->setMessage(e->errorMessage(msg).str());
- result->setCode(e->errorCode());
- result->setSuccess(false);
- e->Release();
- }
- results.append(*result.getClear());
- resp.setResults(results);
- return true;
- }
- IPropertyTree* CWsWorkunitsEx::sendControlQuery(IEspContext& context, const char* target, const char* query, unsigned timeout)
- {
- if (!target || !*target)
- throw MakeStringException(ECLWATCH_MISSING_PARAMS, "CWsWorkunitsEx::sendControlQuery: target not specified");
- if (!query || !*query)
- throw MakeStringException(ECLWATCH_MISSING_PARAMS, "CWsWorkunitsEx::sendControlQuery: Control query not specified");
- Owned<IConstWUClusterInfo> info = getTargetClusterInfo(target);
- if (!info || (info->getPlatform()!=RoxieCluster)) //Only support roxie for now
- throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "CWsWorkunitsEx::sendControlQuery: Invalid target name %s", target);
- const SocketEndpointArray &eps = info->getRoxieServers();
- if (eps.empty())
- throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "CWsWorkunitsEx::sendControlQuery: Server not found for %s", target);
- Owned<ISocket> sock = ISocket::connect_timeout(eps.item(0), timeout);
- return sendRoxieControlQuery(sock, query, timeout);
- }
- bool CWsWorkunitsEx::onWUUpdateQueryEntry(IEspContext& context, IEspWUUpdateQueryEntryRequest& req, IEspWUUpdateQueryEntryResponse& resp)
- {
- try
- {
- StringBuffer querySetName, query;
- ensureInputString(req.getQuerySet(), true, querySetName, ECLWATCH_QUERYSET_NOT_FOUND, "Query Set not specified");
- ensureInputString(req.getQueryId(), true, query, ECLWATCH_QUERYID_NOT_FOUND, "Query not specified");
- Owned<IPropertyTree> querySet = getQueryRegistry(querySetName.str(), true);
- if (!querySet)
- throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Queryset %s not found", querySetName.str());
- VStringBuffer xpath("Query[@id=\"%s\"]", query.str());
- IPropertyTree *tree = querySet->queryPropTree(xpath);
- if (!tree)
- throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Query %s not found", query.str());
- StringBuffer comment(req.getComment());
- if (comment.isEmpty())
- tree->removeProp("@comment");
- else
- tree->setProp("@comment", comment.str());
- }
- catch(IException* e)
- {
- FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
- }
- return true;
- }
- bool CWsWorkunitsEx::onWUGetNumFileToCopy(IEspContext& context, IEspWUGetNumFileToCopyRequest& req, IEspWUGetNumFileToCopyResponse& resp)
- {
- class CWUGetNumFileToCopyPager : public CSimpleInterface, implements IElementsPager
- {
- StringAttr clusterName;
- StringAttr sortOrder;
- public:
- IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
- CWUGetNumFileToCopyPager(const char* _clusterName, const char *_sortOrder)
- : clusterName(_clusterName), sortOrder(_sortOrder) { };
- virtual IRemoteConnection* getElements(IArrayOf<IPropertyTree> &elements)
- {
- SocketEndpointArray servers;
- getRoxieProcessServers(clusterName.get(), servers);
- if (servers.length() < 1)
- {
- PROGLOG("WUGetNumFileToCopy: Process Server not found for %s", clusterName.get());
- return NULL;
- }
- Owned<IPropertyTree> result = sendRoxieControlAllNodes(servers.item(0), "<control:numfilestoprocess/>", false, ROXIELOCKCONNECTIONTIMEOUT);
- if (!result)
- {
- PROGLOG("WUGetNumFileToCopy: Empty result received for cluster %s", clusterName.get());
- return NULL;
- }
- Owned<IPropertyTreeIterator> iter = result->getElements("*");
- if (!iter)
- return NULL;
- StringArray unknownAttributes;
- sortElements(iter, sortOrder.get(), NULL, NULL, unknownAttributes, elements);
- return NULL;
- }
- virtual bool allMatchingElementsReceived() { return true; } //For now, roxie always returns all of matched items.
- };
- try
- {
- StringBuffer clusterName(req.getClusterName());
- if (clusterName.isEmpty())
- throw MakeStringException(ECLWATCH_CANNOT_RESOLVE_CLUSTER_NAME, "Cluster not specified");
- StringBuffer so;
- bool descending = req.getDescending();
- if (descending)
- so.set("-");
- const char *sortBy = req.getSortby();
- if (!isEmptyString(sortBy) && strieq(sortBy, "URL"))
- so.append("?@ep");
- else if (!isEmptyString(sortBy) && strieq(sortBy, "Status"))
- so.append("?Status");
- else
- so.append("#FilesToProcess/@value");
- unsigned pageSize = req.getPageSize();
- unsigned pageStartFrom = req.getPageStartFrom();
- if(pageSize < 1)
- pageSize = 100;
- __int64 cacheHint = 0;
- if (!req.getCacheHint_isNull())
- cacheHint = req.getCacheHint();
- unsigned numberOfEndpoints = 0;
- IArrayOf<IPropertyTree> results;
- Owned<IElementsPager> elementsPager = new CWUGetNumFileToCopyPager(clusterName.str(), so.str());
- getElementsPaged(elementsPager, pageStartFrom, pageSize, NULL, "", &cacheHint, results, &numberOfEndpoints, NULL, false);
- IArrayOf<IEspClusterEndpoint> endpoints;
- ForEachItemIn(i, results)
- {
- IPropertyTree &item = results.item(i);
- Owned<IEspClusterEndpoint> endpoint = createClusterEndpoint();
- endpoint->setURL(item.queryProp("@ep"));
- endpoint->setStatus(item.queryProp("Status"));
- endpoint->setNumQueryFileToCopy(item.getPropInt("FilesToProcess/@value", 0));
- endpoints.append(*endpoint.getClear());
- }
- resp.setEndpoints(endpoints);
- resp.setCacheHint(cacheHint);
- resp.setTotal(numberOfEndpoints);
- }
- catch(IException* e)
- {
- FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
- }
- return true;
- }
- bool CWsWorkunitsEx::onWUQueryGetSummaryStats(IEspContext& context, IEspWUQueryGetSummaryStatsRequest& req, IEspWUQueryGetSummaryStatsResponse& resp)
- {
- try
- {
- const char *target = req.getTarget();
- if (isEmptyString(target))
- throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Target name required");
- Owned<IConstWUClusterInfo> info = getTargetClusterInfo(target);
- if (!info || (info->getPlatform()!=RoxieCluster)) //Only support roxie for now
- throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Roxie name not found");
- double version = context.getClientVersion();
- const char *queryId = req.getQueryId();
- if (!isEmptyString(queryId))
- PROGLOG("WUQueryGetSummaryStats: target %s, query %s", target, queryId);
- else
- PROGLOG("WUQueryGetSummaryStats: target %s", target);
- const SocketEndpointArray &eps = info->getRoxieServers();
- if (eps.empty())
- {
- IERRLOG("WUQueryGetSummaryStats: Failed to getRoxieServers for %s", target);
- return true;
- }
- bool includeRawStats = req.getIncludeRawStats();
- const char *fromTime = req.getFromTime();
- const char *toTime = req.getToTime();
- VStringBuffer control("<control:queryAggregates");
- if (!isEmpty(fromTime))
- control.appendf(" from='%s'", fromTime);
- if (!isEmpty(toTime))
- control.appendf(" to='%s'", toTime);
- if (includeRawStats)
- {
- if (!isEmptyString(queryId))
- control.appendf(" rawStats='1'><Query id='%s'/></control:queryAggregates>", queryId);
- else
- control.append(" all='1' rawStats='1' />");
- }
- else if (!isEmptyString(queryId))
- {
- control.appendf("><Query id='%s'/></control:queryAggregates>", queryId);
- }
- else
- control.append(" />");
- Owned<IPropertyTree> queryAggregates = sendRoxieControlAllNodes(eps.item(0), control.str(), false, ROXIELOCKCONNECTIONTIMEOUT);
- if (!queryAggregates)
- {
- PROGLOG("WUQueryGetSummaryStats: %s returns empty for %s", control.str(), target);
- return true;
- }
- if (getEspLogLevel() >= LogMax)
- {
- StringBuffer sb;
- toXML(queryAggregates, sb);
- DBGLOG("getQueryStats(): '%s' => '%s'", control.str(), sb.str());
- }
- IArrayOf<IEspQuerySummaryStats> querySummaryStatsList;
- IArrayOf<IEspEndpointQueryStats> queryStatsList;
- Owned<IPropertyTreeIterator> queryStatsOnEndpointItr = queryAggregates->getElements("Endpoint");
- ForEach(*queryStatsOnEndpointItr)
- {
- IPropertyTree &queryStatsOnEndpoint = queryStatsOnEndpointItr->query();
- const char *status = queryStatsOnEndpoint.queryProp("Status");
- const char *ep = queryStatsOnEndpoint.queryProp("@ep");
- if (isEmptyString(ep))
- continue;
- if (version >= 1.75)
- {
- if (includeRawStats)
- readQueryStatsList(&queryStatsOnEndpoint, status, ep, includeRawStats, queryStatsList);
- else
- readQueryAggregateStats(queryStatsOnEndpoint.queryPropTree("Query"), status, ep, querySummaryStatsList);
- }
- else if (!isEmptyString(queryId))
- readQueryAggregateStats(queryStatsOnEndpoint.queryPropTree("Query"), status, ep, querySummaryStatsList);
- }
- resp.setStatsList(querySummaryStatsList);
- resp.setQueryStatsList(queryStatsList);
- }
- catch(IException* e)
- {
- FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
- }
- return true;
- }
- void CWsWorkunitsEx::readQueryAggregateStats(IPropertyTree *queryStats, const char *status, const char *ep,
- IArrayOf<IEspQuerySummaryStats> &querySummaryStatsList)
- {
- if (!queryStats)
- return;
- Owned<IEspQuerySummaryStats> querySummaryStats = createQuerySummaryStats();
- if (!isEmptyString(ep))
- querySummaryStats->setEndpoint(ep);
- if (queryStats->hasProp("countFailed"))
- querySummaryStats->setCountFailed(queryStats->getPropInt("countFailed"));
- if (queryStats->hasProp("countTotal"))
- querySummaryStats->setCountTotal(queryStats->getPropInt("countTotal"));
- if (queryStats->hasProp("averageBytesOut"))
- querySummaryStats->setAverageBytesOut(queryStats->getPropInt64("averageBytesOut"));
- if (queryStats->hasProp("averageMemUsed"))
- querySummaryStats->setSizeAvgPeakMemory(queryStats->getPropInt64("averageMemUsed"));
- if (queryStats->hasProp("averageSlavesReplyLen"))
- querySummaryStats->setAverageSlavesReplyLen(queryStats->getPropInt("averageSlavesReplyLen"));
- if (queryStats->hasProp("averageTimeMs"))
- querySummaryStats->setTimeAvgTotalExecuteMinutes(queryStats->getPropInt64("averageTimeMs"));
- if (queryStats->hasProp("minTimeMs"))
- querySummaryStats->setTimeMinTotalExecuteMinutes(queryStats->getPropInt64("minTimeMs"));
- if (queryStats->hasProp("maxTimeMs"))
- querySummaryStats->setTimeMaxTotalExecuteMinutes(queryStats->getPropInt64("maxTimeMs"));
- if (queryStats->hasProp("percentile97"))
- {
- querySummaryStats->setPercentile97(queryStats->getPropInt("percentile97"));
- if (queryStats->hasProp("percentile97/@estimate"))
- querySummaryStats->setPercentile97Estimate(queryStats->getPropBool("percentile97/@estimate"));
- }
- const char *startTime = queryStats->queryProp("startTime");
- const char *endTime = queryStats->queryProp("endTime");
- if (!isEmptyString(startTime))
- querySummaryStats->setStartTime(startTime);
- if (!isEmptyString(endTime))
- querySummaryStats->setEndTime(endTime);
- if (!isEmptyString(status))
- querySummaryStats->setStatus(status);
- querySummaryStatsList.append(*querySummaryStats.getLink());
- }
- void CWsWorkunitsEx::readQueryStatsList(IPropertyTree *queryStatsTree, const char *status, const char *ep,
- bool includeRawStats, IArrayOf<IEspEndpointQueryStats> &endpointQueryStatsList)
- {
- if (!queryStatsTree)
- return;
- IArrayOf<IEspQueryStats> queryStatsList;
- Owned<IPropertyTreeIterator> queryItr = queryStatsTree->getElements("QueryStats/Query");
- ForEach(*queryItr)
- {
- IPropertyTree &query = queryItr->query();
- readQueryStats(&query, query.queryProp("@id"), includeRawStats, queryStatsList);
- }
- IPropertyTree *globalStats = queryStatsTree->queryPropTree("QueryStats/Global");
- if (globalStats)
- readQueryStats(globalStats, "Global", includeRawStats, queryStatsList);
- if (queryStatsList.ordinality() == 0)
- return;
- Owned<IEspEndpointQueryStats> endpointQueryStats = createEndpointQueryStats();
- endpointQueryStats->setEndpoint(ep);
- if (!isEmptyString(status))
- endpointQueryStats->setStatus(status);
- endpointQueryStats->setQueryStatsList(queryStatsList);
- endpointQueryStatsList.append(*endpointQueryStats.getLink());
- }
- void CWsWorkunitsEx::readQueryStats(IPropertyTree *queryStatsTree, const char *id, bool includeRawStats,
- IArrayOf<IEspQueryStats> &queryStatsList)
- {
- if (!queryStatsTree)
- return;
- Owned<IEspQueryStats> queryStats = createQueryStats();
- if (!isEmptyString(id))
- queryStats->setID(id);
- if (!includeRawStats)
- {
- IArrayOf<IEspQuerySummaryStats> aggregateQueryStatsList;
- readQueryAggregateStats(queryStatsTree, nullptr, nullptr, aggregateQueryStatsList);
- queryStats->setAggregateQueryStatsList(aggregateQueryStatsList);
- queryStatsList.append(*queryStats.getLink());
- return;
- }
- IArrayOf<IEspQuerySummaryStats> aggregateQueryStatsList;
- Owned<IPropertyTreeIterator> aggregateRecordItr = queryStatsTree->getElements("QueryStatsAggregateRecord");
- ForEach(*aggregateRecordItr)
- {
- IPropertyTree &query = aggregateRecordItr->query();
- readQueryAggregateStats(&query, nullptr, nullptr, aggregateQueryStatsList);
- }
- queryStats->setAggregateQueryStatsList(aggregateQueryStatsList);
- IArrayOf<IEspQueryStatsRecord> recordList;
- Owned<IPropertyTreeIterator> recordItr = queryStatsTree->getElements("QueryStatsRecord");
- ForEach(*recordItr)
- {
- IPropertyTree &query = recordItr->query();
- readQueryStatsRecord(&query, recordList);
- }
- queryStats->setQueryStatsRecordList(recordList);
- queryStatsList.append(*queryStats.getLink());
- }
- void CWsWorkunitsEx::readQueryStatsRecord(IPropertyTree *queryRecord, IArrayOf<IEspQueryStatsRecord> &recordList)
- {
- if (!queryRecord)
- return;
- Owned<IEspQueryStatsRecord> record = createQueryStatsRecord();
- const char *startTime = queryRecord->queryProp("@startTime");
- if (!isEmptyString(startTime))
- record->setStartTime(startTime);
- if (queryRecord->hasProp("elapsedTimeMs"))
- record->setElapsedTimeMs(queryRecord->getPropInt64("elapsedTimeMs"));
- if (queryRecord->hasProp("memUsed"))
- record->setMemoryUsed(queryRecord->getPropInt64("memUsed"));
- if (queryRecord->hasProp("bytesOut"))
- record->setBytesOut(queryRecord->getPropInt64("bytesOut"));
- if (queryRecord->hasProp("slavesReplyLen"))
- record->setSlavesReplyLen(queryRecord->getPropInt("slavesReplyLen"));
- recordList.append(*record.getLink());
- }
|