ws_workunitsQuerySets.cpp 135 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
732783279328032813282328332843285328632873288328932903291329232933294329532963297329832993300330133023303330433053306330733083309331033113312331333143315331633173318331933203321332233233324332533263327332833293330333133323333333433353336333733383339334033413342334333443345334633473348334933503351335233533354335533563357335833593360336133623363336433653366336733683369337033713372337333743375337633773378337933803381338233833384338533863387338833893390339133923393339433953396339733983399
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "ws_workunitsService.hpp"
  14. #include "ws_fs.hpp"
  15. #include "jlib.hpp"
  16. #include "jflz.hpp"
  17. #include "daclient.hpp"
  18. #include "dalienv.hpp"
  19. #include "dadfs.hpp"
  20. #include "dfuwu.hpp"
  21. #include "eclhelper.hpp"
  22. #include "roxiecontrol.hpp"
  23. #include "dfuutil.hpp"
  24. #include "dautils.hpp"
  25. #include "httpclient.hpp"
  26. #include "portlist.h" //ROXIE_SERVER_PORT
  // Timeouts, all expressed in milliseconds.
  #define DALI_FILE_LOOKUP_TIMEOUT (15*1000)    // 15 seconds
  #define SDS_LOCK_TIMEOUT (5*60*1000)          // 5 minutes; 30 seconds proved a bit short

  const unsigned ROXIECONNECTIONTIMEOUT     =  1 * 1000;   // 1 second
  const unsigned ROXIECONTROLQUERYTIMEOUT   =  3 * 1000;   // 3 seconds
  const unsigned ROXIECONTROLQUERIESTIMEOUT = 30 * 1000;   // 30 seconds
  const unsigned ROXIELOCKCONNECTIONTIMEOUT = 60 * 1000;   // 60 seconds

  // Display names for query actions. The entries of QuerySetQueryActionTypes[] must stay in the
  // same order as the ESPenum QuerySetQueryActionTypes declared in the ecm file; the table is
  // NULL terminated and NumOfQuerySetQueryActionTypes is the count of real entries.
  static unsigned NumOfQuerySetQueryActionTypes = 7;
  static const char *QuerySetQueryActionTypes[] =
  {
      "Suspend",
      "Unsuspend",
      "ToggleSuspend",
      "Activate",
      "Delete",
      "RemoveAllAliases",
      "ResetQueryStats",
      NULL
  };

  // Display names for alias actions. The entries of QuerySetAliasActionTypes[] must stay in the
  // same order as the ESPenum QuerySetAliasActionTypes declared in the ecm file; NULL terminated.
  static unsigned NumOfQuerySetAliasActionTypes = 1;
  static const char *QuerySetAliasActionTypes[] =
  {
      "Deactivate",
      NULL
  };
  40. bool isRoxieProcess(const char *process)
  41. {
  42. if (!process)
  43. return false;
  44. Owned<IEnvironmentFactory> factory = getEnvironmentFactory(true);
  45. Owned<IConstEnvironment> env = factory->openEnvironment();
  46. Owned<IPropertyTree> root = &env->getPTree();
  47. VStringBuffer xpath("Software/RoxieCluster[@name=\"%s\"]", process);
  48. return root->hasProp(xpath.str());
  49. }
  50. void checkUseEspOrDaliIP(SocketEndpoint &ep, const char *ip, const char *esp)
  51. {
  52. if (!ip || !*ip)
  53. return;
  54. ep.set(ip, 7070);
  55. if (ep.isLoopBack() || *ip=='.' || (ip[0]=='0' && ip[1]=='.'))
  56. ep.ipset(esp);
  57. }
  58. void ensureInputString(const char* input, bool lowerCase, StringBuffer& inputStr, int code, const char* msg)
  59. {
  60. inputStr.set(input).trim();
  61. if (inputStr.isEmpty())
  62. throw MakeStringException(code, "%s", msg);
  63. if (lowerCase)
  64. inputStr.toLowerCase();
  65. }
  66. static IClientWsWorkunits *ensureWsWorkunitsClient(IClientWsWorkunits *ws, IEspContext *ctx, const char *netAddress)
  67. {
  68. if (ws)
  69. return LINK(ws);
  70. StringBuffer url;
  71. if (netAddress && *netAddress)
  72. url.appendf("http://%s%s/WsWorkunits", netAddress, (!strchr(netAddress, ':')) ? ":8010" : "");
  73. else
  74. {
  75. if (!ctx)
  76. throw MakeStringException(ECLWATCH_INVALID_IP, "Missing WsWorkunits service address");
  77. StringBuffer ip;
  78. short port = 0;
  79. ctx->getServAddress(ip, port);
  80. url.appendf("http://%s:%d/WsWorkunits", ip.str(), port);
  81. }
  82. Owned<IClientWsWorkunits> cws = createWsWorkunitsClient();
  83. cws->addServiceUrl(url);
  84. if (ctx && ctx->queryUserId() && *ctx->queryUserId())
  85. cws->setUsernameToken(ctx->queryUserId(), ctx->queryPassword(), NULL);
  86. return cws.getClear();
  87. }
  88. IClientWUQuerySetDetailsResponse *fetchQueryDetails(IClientWsWorkunits *_ws, IEspContext *ctx, const char *netAddress, const char *target, const char *queryid)
  89. {
  90. Owned<IClientWsWorkunits> ws = ensureWsWorkunitsClient(_ws, ctx, netAddress);
  91. //using existing WUQuerysetDetails rather than extending WUQueryDetails, to support copying query meta data from prior releases
  92. Owned<IClientWUQuerySetDetailsRequest> reqQueryInfo = ws->createWUQuerysetDetailsRequest();
  93. reqQueryInfo->setClusterName(target);
  94. reqQueryInfo->setQuerySetName(target);
  95. reqQueryInfo->setFilter(queryid);
  96. reqQueryInfo->setFilterType("Id");
  97. return ws->WUQuerysetDetails(reqQueryInfo);
  98. }
  99. void fetchRemoteWorkunit(IClientWsWorkunits *_ws, IEspContext *ctx, const char *netAddress, const char *queryset, const char *query, const char *wuid, StringBuffer &name, StringBuffer &xml, StringBuffer &dllname, MemoryBuffer &dll, StringBuffer &daliServer)
  100. {
  101. Owned<IClientWsWorkunits> ws = ensureWsWorkunitsClient(_ws, ctx, netAddress);
  102. Owned<IClientWULogFileRequest> req = ws->createWUFileRequest();
  103. if (queryset && *queryset)
  104. req->setQuerySet(queryset);
  105. if (query && *query)
  106. req->setQuery(query);
  107. if (wuid && *wuid)
  108. req->setWuid(wuid);
  109. req->setType("xml");
  110. Owned<IClientWULogFileResponse> resp = ws->WUFile(req);
  111. if (!resp || resp->getExceptions().ordinality() || !resp->getThefile().length())
  112. throw MakeStringException(ECLWATCH_CANNOT_GET_WORKUNIT, "Cannot retrieve remote workunit");
  113. xml.append(resp->getThefile().length(), resp->getThefile().toByteArray());
  114. req->setType("dll");
  115. resp.setown(ws->WUFile(req));
  116. if (!resp || resp->getExceptions().ordinality() || !resp->getThefile().length())
  117. throw MakeStringException(ECLWATCH_CANNOT_GET_WORKUNIT, "Cannot retrieve remote workunit shared object");
  118. dll.append(resp->getThefile());
  119. dllname.append(resp->getFileName());
  120. name.append(resp->getQueryName());
  121. SocketEndpoint ep;
  122. checkUseEspOrDaliIP(ep, resp->getDaliServer(), netAddress);
  123. if (!ep.isNull())
  124. ep.getUrlStr(daliServer);
  125. }
  126. void fetchRemoteWorkunitAndQueryDetails(IClientWsWorkunits *_ws, IEspContext *ctx, const char *netAddress, const char *queryset, const char *query, const char *wuid, StringBuffer &name, StringBuffer &xml, StringBuffer &dllname, MemoryBuffer &dll, StringBuffer &daliServer, Owned<IClientWUQuerySetDetailsResponse> &respQueryInfo)
  127. {
  128. Owned<IClientWsWorkunits> ws = ensureWsWorkunitsClient(_ws, ctx, netAddress);
  129. fetchRemoteWorkunit(ws, ctx, netAddress, queryset, query, wuid, name, xml, dllname, dll, daliServer);
  130. respQueryInfo.setown(fetchQueryDetails(ws, ctx, netAddress, queryset, query));
  131. }
  132. void doWuFileCopy(IClientFileSpray &fs, IEspWULogicalFileCopyInfo &info, const char *logicalname, const char *cluster, bool isRoxie, bool supercopy)
  133. {
  134. try
  135. {
  136. Owned<IClientCopy> req = fs.createCopyRequest();
  137. req->setSourceLogicalName(logicalname);
  138. req->setDestLogicalName(logicalname);
  139. req->setDestGroup(cluster);
  140. req->setSuperCopy(supercopy);
  141. if (isRoxie)
  142. req->setDestGroupRoxie("Yes");
  143. Owned<IClientCopyResponse> resp = fs.Copy(req);
  144. info.setDfuCopyWuid(resp->getResult());
  145. }
  146. catch (IException *e)
  147. {
  148. StringBuffer msg;
  149. info.setDfuCopyError(e->errorMessage(msg).str());
  150. e->Release();
  151. }
  152. }
  153. bool copyWULogicalFiles(IEspContext &context, IConstWorkUnit &cw, const char *cluster, bool copyLocal, IEspWUCopyLogicalClusterFileSections &lfinfo)
  154. {
  155. if (isEmpty(cluster))
  156. throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "copyWULogicalFiles Cluster parameter not set.");
  157. Owned<IUserDescriptor> udesc = createUserDescriptor();
  158. udesc->set(context.queryUserId(), context.queryPassword(), context.querySessionToken(), context.querySignature());
  159. IArrayOf<IEspWULogicalFileCopyInfo> foreign;
  160. IArrayOf<IEspWULogicalFileCopyInfo> onCluster;
  161. IArrayOf<IEspWULogicalFileCopyInfo> notOnCluster;
  162. IArrayOf<IEspWULogicalFileCopyInfo> notFound;
  163. Owned<IClientFileSpray> fs;
  164. if (copyLocal)
  165. {
  166. fs.setown(createFileSprayClient());
  167. VStringBuffer url("http://.:%d/FileSpray", 8010);
  168. fs->addServiceUrl(url.str());
  169. }
  170. bool isRoxie = isRoxieProcess(cluster);
  171. Owned<IConstWUGraphIterator> graphs = &cw.getGraphs(GraphTypeActivities);
  172. ForEach(*graphs)
  173. {
  174. Owned <IPropertyTree> xgmml = graphs->query().getXGMMLTree(false);
  175. Owned<IPropertyTreeIterator> iter = xgmml->getElements(".//node");
  176. ForEach(*iter)
  177. {
  178. try
  179. {
  180. IPropertyTree &node = iter->query();
  181. ThorActivityKind kind = (ThorActivityKind) node.getPropInt("att[@name='_kind']/@value", TAKnone);
  182. if(kind==TAKdiskwrite || kind==TAKspillwrite || kind==TAKindexwrite || kind==TAKcsvwrite || kind==TAKxmlwrite || kind==TAKjsonwrite)
  183. continue;
  184. if (node.getPropBool("att[@name='_isSpill']/@value") || node.getPropBool("att[@name='_isTransformSpill']/@value"))
  185. continue;
  186. Owned<IEspWULogicalFileCopyInfo> info = createWULogicalFileCopyInfo();
  187. const char *logicalname = node.queryProp("att[@name='_indexFileName']/@value");
  188. if (logicalname)
  189. info->setIsIndex(true);
  190. else
  191. logicalname = node.queryProp("att[@name='_fileName']/@value");
  192. info->setLogicalName(logicalname);
  193. if (logicalname)
  194. {
  195. if (!strnicmp("~foreign::", logicalname, 10))
  196. foreign.append(*info.getClear());
  197. else
  198. {
  199. Owned<IDistributedFile> df = queryDistributedFileDirectory().lookup(logicalname, udesc);
  200. if(!df)
  201. notFound.append(*info.getClear());
  202. else if (df->findCluster(cluster)!=NotFound)
  203. {
  204. onCluster.append(*info.getClear());
  205. }
  206. else
  207. {
  208. StringArray clusters;
  209. df->getClusterNames(clusters);
  210. info->setClusters(clusters);
  211. if (copyLocal)
  212. {
  213. StringBuffer wuid;
  214. bool supercopy = queryDistributedFileDirectory().isSuperFile(logicalname, udesc, NULL);
  215. doWuFileCopy(*fs, *info, logicalname, cluster, isRoxie, supercopy);
  216. }
  217. notOnCluster.append(*info.getClear());
  218. }
  219. }
  220. }
  221. }
  222. catch(IException *e)
  223. {
  224. e->Release();
  225. }
  226. }
  227. lfinfo.setClusterName(cluster);
  228. lfinfo.setNotOnCluster(notOnCluster);
  229. lfinfo.setOnCluster(onCluster);
  230. lfinfo.setForeign(foreign);
  231. lfinfo.setNotFound(notFound);
  232. }
  233. return true;
  234. }
  235. void copyWULogicalFilesToTarget(IEspContext &context, IConstWUClusterInfo &clusterInfo, IConstWorkUnit &cw, IArrayOf<IConstWUCopyLogicalClusterFileSections> &clusterfiles, bool doLocalCopy)
  236. {
  237. const StringArray &thors = clusterInfo.getThorProcesses();
  238. ForEachItemIn(i, thors)
  239. {
  240. Owned<IEspWUCopyLogicalClusterFileSections> files = createWUCopyLogicalClusterFileSections();
  241. copyWULogicalFiles(context, cw, thors.item(i), doLocalCopy, *files);
  242. clusterfiles.append(*files.getClear());
  243. }
  244. SCMStringBuffer roxie;
  245. clusterInfo.getRoxieProcess(roxie);
  246. if (roxie.length())
  247. {
  248. Owned<IEspWUCopyLogicalClusterFileSections> files = createWUCopyLogicalClusterFileSections();
  249. copyWULogicalFiles(context, cw, roxie.str(), doLocalCopy, *files);
  250. clusterfiles.append(*files.getClear());
  251. }
  252. }
  253. void QueryFilesInUse::loadTarget(IPropertyTree *t, const char *target, unsigned flags)
  254. {
  255. if (!target || !*target)
  256. return;
  257. Owned<IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(target);
  258. if (!clusterInfo || !(clusterInfo->getPlatform() == RoxieCluster))
  259. return;
  260. Owned<IPropertyTree> queryRegistry = getQueryRegistry(target, true);
  261. if (!queryRegistry)
  262. return;
  263. SCMStringBuffer process;
  264. clusterInfo->getRoxieProcess(process);
  265. if (!process.length())
  266. return;
  267. Owned<IHpccPackageSet> ps = createPackageSet(process.str());
  268. const IHpccPackageMap *pm = (ps) ? ps->queryActiveMap(target) : NULL;
  269. const char *pmid = (pm) ? pm->queryPackageId() : NULL;
  270. VStringBuffer xpath("%s/@pmid", target);
  271. const char *pmidPrev = t->queryProp(xpath);
  272. if ((flags & UFO_RELOAD_TARGETS_CHANGED_PMID) && (pmid || pmidPrev))
  273. {
  274. if (!(pmid && pmidPrev) || !streq(pmid, pmidPrev))
  275. t->removeProp(target);
  276. }
  277. IPropertyTree *targetTree = ensurePTree(t, target);
  278. if (pm)
  279. targetTree->setProp("@pmid", pmid);
  280. if (flags & UFO_REMOVE_QUERIES_NOT_IN_QUERYSET)
  281. {
  282. Owned<IPropertyTreeIterator> cachedQueries = targetTree->getElements("Query");
  283. ForEach(*cachedQueries)
  284. {
  285. IPropertyTree &cachedQuery = cachedQueries->query();
  286. VStringBuffer xpath("Query[@id='%s']", cachedQuery.queryProp("@id"));
  287. if (!queryRegistry->hasProp(xpath))
  288. targetTree->removeTree(&cachedQuery);
  289. }
  290. }
  291. Owned<IPropertyTreeIterator> queries = queryRegistry->getElements("Query");
  292. ForEach(*queries)
  293. {
  294. if (aborting)
  295. return;
  296. IPropertyTree &query = queries->query();
  297. const char *queryid = query.queryProp("@id");
  298. if (!queryid || !*queryid)
  299. continue;
  300. const char *wuid = query.queryProp("@wuid");
  301. if (!wuid || !*wuid)
  302. continue;
  303. const char *pkgid=NULL;
  304. if (pm)
  305. {
  306. const IHpccPackage *pkg = pm->matchPackage(queryid);
  307. if (pkg)
  308. pkgid = pkg->queryId();
  309. }
  310. VStringBuffer xpath("Query[@id='%s']", queryid);
  311. IPropertyTree *queryTree = targetTree->queryPropTree(xpath);
  312. if (queryTree)
  313. {
  314. const char *cachedPkgid = queryTree->queryProp("@pkgid");
  315. if (pkgid && *pkgid)
  316. {
  317. if (!(flags & UFO_RELOAD_MAPPED_QUERIES) && (cachedPkgid && streq(pkgid, cachedPkgid)))
  318. continue;
  319. }
  320. else if (!cachedPkgid || !*cachedPkgid)
  321. continue;
  322. targetTree->removeTree(queryTree);
  323. queryTree = NULL;
  324. }
  325. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  326. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid);
  327. if (!cw)
  328. continue;
  329. queryTree = targetTree->addPropTree("Query", createPTree("Query"));
  330. queryTree->setProp("@target", target); //for reference when searching across targets
  331. queryTree->setProp("@id", queryid);
  332. if (pkgid && *pkgid)
  333. queryTree->setProp("@pkgid", pkgid);
  334. IUserDescriptor **roxieUser = roxieUserMap.getValue(target);
  335. Owned<IReferencedFileList> wufiles = createReferencedFileList(roxieUser ? *roxieUser : NULL, true, true);
  336. wufiles->addFilesFromQuery(cw, pm, queryid);
  337. if (aborting)
  338. return;
  339. wufiles->resolveFiles(process.str(), NULL, NULL, NULL, true, true, false, false);
  340. Owned<IReferencedFileIterator> files = wufiles->getFiles();
  341. ForEach(*files)
  342. {
  343. if (aborting)
  344. return;
  345. IReferencedFile &rf = files->query();
  346. //if (!(rf.getFlags() & RefSubFile))
  347. // continue;
  348. const char *lfn = rf.getLogicalName();
  349. if (!lfn || !*lfn)
  350. continue;
  351. if (!queryTree->hasProp(xpath.setf("File[@lfn='%s']", lfn)))
  352. {
  353. IPropertyTree *fileTree = queryTree->addPropTree("File", createPTree("File"));
  354. fileTree->setProp("@lfn", lfn);
  355. if (rf.getFlags() & RefFileSuper)
  356. fileTree->setPropBool("@super", true);
  357. if (rf.getFlags() & RefFileNotFound)
  358. fileTree->setPropBool("@notFound", true);
  359. const char *fpkgid = rf.queryPackageId();
  360. if (fpkgid && *fpkgid)
  361. fileTree->setProp("@pkgid", fpkgid);
  362. if (rf.getFileSize())
  363. fileTree->setPropInt64("@size", rf.getFileSize());
  364. if (rf.getNumParts())
  365. fileTree->setPropInt("@numparts", rf.getNumParts());
  366. }
  367. }
  368. }
  369. }
  370. void QueryFilesInUse::loadTargets(IPropertyTree *t, unsigned flags)
  371. {
  372. Owned<IStringIterator> targets = getTargetClusters("RoxieCluster", NULL);
  373. SCMStringBuffer s;
  374. ForEach(*targets)
  375. {
  376. if (aborting)
  377. return;
  378. loadTarget(t, targets->str(s).str(), flags);
  379. }
  380. }
  381. IPropertyTreeIterator *QueryFilesInUse::findAllQueriesUsingFile(const char *lfn)
  382. {
  383. if (!lfn || !*lfn)
  384. return NULL;
  385. Owned<IPropertyTree> t = getTree();
  386. VStringBuffer xpath("*/Query[File/@lfn='%s']", lfn);
  387. return t->getElements(xpath);
  388. }
  389. IPropertyTreeIterator *QueryFilesInUse::findQueriesUsingFile(const char *target, const char *lfn, StringAttr &pmid)
  390. {
  391. if (!lfn || !*lfn)
  392. return NULL;
  393. if (!target || !*target)
  394. return findAllQueriesUsingFile(lfn);
  395. Owned<IPropertyTree> t = getTree();
  396. IPropertyTree *targetTree = t->queryPropTree(target);
  397. if (!targetTree)
  398. return NULL;
  399. pmid.set(targetTree->queryProp("@pmid"));
  400. VStringBuffer xpath("Query[File/@lfn='%s']", lfn);
  401. return targetTree->getElements(xpath);
  402. }
  403. bool CWsWorkunitsEx::onWUCopyLogicalFiles(IEspContext &context, IEspWUCopyLogicalFilesRequest &req, IEspWUCopyLogicalFilesResponse &resp)
  404. {
  405. StringBuffer wuid = req.getWuid();
  406. WsWuHelpers::checkAndTrimWorkunit("WUCopyLogicalFiles", wuid);
  407. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  408. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid.str());
  409. if (!cw)
  410. throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot open workunit %s", wuid.str());
  411. resp.setWuid(wuid.str());
  412. StringAttr cluster;
  413. if (notEmpty(req.getCluster()))
  414. cluster.set(req.getCluster());
  415. else
  416. cluster.set(cw->queryClusterName());
  417. if (!isValidCluster(cluster))
  418. throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Invalid cluster name: %s", cluster.str());
  419. Owned <IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(cluster.str());
  420. IArrayOf<IConstWUCopyLogicalClusterFileSections> clusterfiles;
  421. PROGLOG("WUCopyLogicalFiles: %s", wuid.str());
  422. copyWULogicalFilesToTarget(context, *clusterInfo, *cw, clusterfiles, req.getCopyLocal());
  423. resp.setClusterFiles(clusterfiles);
  424. return true;
  425. }
  426. static inline unsigned remainingMsWait(unsigned wait, unsigned start)
  427. {
  428. if (wait==0 || wait==(unsigned)-1)
  429. return wait;
  430. unsigned waited = msTick()-start;
  431. return (wait>waited) ? wait-waited : 0;
  432. }
  433. bool reloadCluster(IConstWUClusterInfo *clusterInfo, unsigned wait)
  434. {
  435. if (0==wait || !clusterInfo || clusterInfo->getPlatform()!=RoxieCluster)
  436. return true;
  437. const SocketEndpointArray &addrs = clusterInfo->getRoxieServers();
  438. if (addrs.length())
  439. {
  440. try
  441. {
  442. Owned<IPropertyTree> result = sendRoxieControlAllNodes(addrs.item(0), "<control:reload/>", false, wait);
  443. const char *status = result->queryProp("Endpoint[1]/Status");
  444. if (!status || !strieq(status, "ok"))
  445. return false;
  446. }
  447. catch(IMultiException *me)
  448. {
  449. StringBuffer err;
  450. DBGLOG("ERROR control:reloading roxie query info %s", me->errorMessage(err.append(me->errorCode()).append(' ')).str());
  451. me->Release();
  452. return false;
  453. }
  454. catch(IException *e)
  455. {
  456. StringBuffer err;
  457. DBGLOG("ERROR control:reloading roxie query info %s", e->errorMessage(err.append(e->errorCode()).append(' ')).str());
  458. e->Release();
  459. return false;
  460. }
  461. }
  462. return true;
  463. }
  464. bool reloadCluster(const char *cluster, unsigned wait)
  465. {
  466. Owned <IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(cluster);
  467. return (clusterInfo) ? reloadCluster(clusterInfo, wait) : true;
  468. }
  469. static inline void updateQuerySetting(bool ignore, IPropertyTree *queryTree, const char *xpath, int value)
  470. {
  471. if (ignore || !queryTree)
  472. return;
  473. if (value!=0)
  474. queryTree->setPropInt(xpath, value);
  475. else
  476. queryTree->removeProp(xpath);
  477. }
  478. static inline void updateTimeLimitSetting(IPropertyTree *queryTree, bool valueNotSet, int value, IConstQuerySetQuery *srcInfo=NULL)
  479. {
  480. if (valueNotSet && srcInfo && !srcInfo->getTimeLimit_isNull())
  481. {
  482. value = srcInfo->getTimeLimit();
  483. valueNotSet=false;
  484. }
  485. updateQuerySetting(valueNotSet, queryTree, "@timeLimit", value);
  486. }
  487. static inline void updateWarnTimeLimitSetting(IPropertyTree *queryTree, bool valueNotSet, int value, IConstQuerySetQuery *srcInfo=NULL)
  488. {
  489. if (valueNotSet && srcInfo && !srcInfo->getWarnTimeLimit_isNull())
  490. {
  491. value = srcInfo->getWarnTimeLimit();
  492. valueNotSet=false;
  493. }
  494. updateQuerySetting(valueNotSet, queryTree, "@warnTimeLimit", value);
  495. }
  496. static inline unsigned __int64 memoryLimitUInt64FromString(const char *value)
  497. {
  498. if (!value || !*value || !isdigit(*value))
  499. return 0;
  500. unsigned __int64 result = (*value - '0');
  501. const char *s = value+1;
  502. while (isdigit(*s))
  503. {
  504. result = 10 * result + ((*s) - '0');
  505. s++;
  506. }
  507. if (*s)
  508. {
  509. const char unit = toupper(*s++);
  510. if (*s && !strieq("B", s)) //more?
  511. return 0;
  512. switch (unit)
  513. {
  514. case 'E':
  515. result <<=60;
  516. break;
  517. case 'P':
  518. result <<=50;
  519. break;
  520. case 'T':
  521. result <<=40;
  522. break;
  523. case 'G':
  524. result <<=30;
  525. break;
  526. case 'M':
  527. result <<=20;
  528. break;
  529. case 'K':
  530. result <<=10;
  531. break;
  532. case 'B':
  533. break;
  534. default:
  535. return 0;
  536. }
  537. }
  538. return result;
  539. }
  540. const char memUnitAbbrev[] = {'B', 'K', 'M', 'G', 'T', 'P', 'E'};
  541. #define MAX_MEMUNIT_ABBREV 6
  542. static inline StringBuffer &memoryLimitStringFromUInt64(StringBuffer &s, unsigned __int64 in)
  543. {
  544. if (!in)
  545. return s;
  546. unsigned __int64 value = in;
  547. unsigned char unit = 0;
  548. while (!(value & 0x3FF) && unit < MAX_MEMUNIT_ABBREV)
  549. {
  550. value >>= 10;
  551. unit++;
  552. }
  553. return s.append(value).append(memUnitAbbrev[unit]);
  554. }
  555. static inline void updateMemoryLimitSetting(IPropertyTree *queryTree, const char *value, IConstQuerySetQuery *srcInfo=NULL)
  556. {
  557. if (!queryTree)
  558. return;
  559. if (!value && srcInfo)
  560. value = srcInfo->getMemoryLimit();
  561. if (!value)
  562. return;
  563. unsigned __int64 limit = memoryLimitUInt64FromString(value);
  564. if (0==limit)
  565. queryTree->removeProp("@memoryLimit");
  566. else
  567. queryTree->setPropInt64("@memoryLimit", limit);
  568. }
  569. enum QueryPriority {
  570. QueryPriorityNone = -1,
  571. QueryPriorityLow = 0,
  572. QueryPriorityHigh = 1,
  573. QueryPrioritySLA = 2,
  574. QueryPriorityInvalid = 3
  575. };
  576. static inline const char *getQueryPriorityName(int value)
  577. {
  578. switch (value)
  579. {
  580. case QueryPriorityLow:
  581. return "LOW";
  582. case QueryPriorityHigh:
  583. return "HIGH";
  584. case QueryPrioritySLA:
  585. return "SLA";
  586. case QueryPriorityNone:
  587. return "NONE";
  588. }
  589. return "INVALID";
  590. }
  591. static inline void updateQueryPriority(IPropertyTree *queryTree, const char *value, IConstQuerySetQuery *srcInfo=NULL)
  592. {
  593. if (!queryTree)
  594. return;
  595. if ((!value || !*value) && srcInfo)
  596. value = srcInfo->getPriority();
  597. if (!value || !*value)
  598. return;
  599. int priority = QueryPriorityInvalid;
  600. if (strieq("LOW", value))
  601. priority=QueryPriorityLow;
  602. else if (strieq("HIGH", value))
  603. priority=QueryPriorityHigh;
  604. else if (strieq("SLA", value))
  605. priority=QueryPrioritySLA;
  606. else if (strieq("NONE", value))
  607. priority=QueryPriorityNone;
  608. switch (priority)
  609. {
  610. case QueryPriorityInvalid:
  611. break;
  612. case QueryPriorityNone:
  613. queryTree->removeProp("@priority");
  614. break;
  615. default:
  616. queryTree->setPropInt("@priority", priority);
  617. break;
  618. }
  619. }
  620. void gatherFileErrors(IReferencedFileList *files, IArrayOf<IConstLogicalFileError> &errors)
  621. {
  622. Owned<IReferencedFileIterator> it = files->getFiles();
  623. ForEach(*it)
  624. {
  625. IReferencedFile &file = it->query();
  626. unsigned flags = file.getFlags();
  627. if (!(flags & (RefFileNotFound | RefFileCopyInfoFailed)))
  628. continue;
  629. StringBuffer msg;
  630. if (flags & RefFileOptional)
  631. msg.append("OPT ");
  632. if (flags & RefFileNotFound)
  633. msg.append("Not Found");
  634. else
  635. msg.append("Copy Failed");
  636. Owned<IEspLogicalFileError> error = createLogicalFileError();
  637. error->setLogicalName(file.getLogicalName());
  638. error->setError(msg);
  639. errors.append(*static_cast<IConstLogicalFileError*>(error.getClear()));
  640. }
  641. }
  642. class QueryFileCopier
  643. {
  644. public:
  645. QueryFileCopier(const char *target_) : target(target_) {}
  646. void init(IEspContext &context, bool allowForeignFiles)
  647. {
  648. files.setown(createReferencedFileList(context.queryUserId(), context.queryPassword(), allowForeignFiles, false));
  649. clusterInfo.setown(getTargetClusterInfo(target));
  650. StringBufferAdaptor sba(process);
  651. if (clusterInfo && clusterInfo->getPlatform()==RoxieCluster)
  652. clusterInfo->getRoxieProcess(sba);
  653. if (!process.length())
  654. return;
  655. ps.setown(createPackageSet(process.str()));
  656. if (ps)
  657. pm = ps->queryActiveMap(target);
  658. }
  659. void copy(IConstWorkUnit *cw, unsigned updateFlags)
  660. {
  661. StringBuffer queryid;
  662. if (queryname && *queryname)
  663. queryname = queryid.append(queryname).append(".0").str(); //prepublish dummy version number to support fuzzy match like queries="myquery.*" in package
  664. files->addFilesFromQuery(cw, pm, queryname);
  665. files->resolveFiles(process.str(), remoteIP, remotePrefix, srcCluster, !(updateFlags & (DALI_UPDATEF_REPLACE_FILE | DALI_UPDATEF_CLONE_FROM | DALI_UPDATEF_SUPERFILES)), true, false, true);
  666. StringBuffer defReplicateFolder;
  667. getConfigurationDirectory(NULL, "data2", "roxie", process.str(), defReplicateFolder);
  668. Owned<IDFUhelper> helper = createIDFUhelper();
  669. files->cloneAllInfo(updateFlags, helper, true, true, clusterInfo->getRoxieRedundancy(), clusterInfo->getChannelsPerNode(), clusterInfo->getRoxieReplicateOffset(), defReplicateFolder);
  670. }
  671. void gatherFileErrors(IArrayOf<IConstLogicalFileError> &errors)
  672. {
  673. ::gatherFileErrors(files, errors);
  674. }
  675. private:
  676. Owned <IConstWUClusterInfo> clusterInfo;
  677. Owned<IHpccPackageSet> ps;
  678. const IHpccPackageMap *pm = nullptr;
  679. StringAttr target;
  680. public:
  681. Owned<IReferencedFileList> files;
  682. StringBuffer process;
  683. StringAttr remoteIP;
  684. StringAttr remotePrefix;
  685. StringAttr srcCluster;
  686. StringAttr queryname;
  687. };
  688. bool CWsWorkunitsEx::isQuerySuspended(const char* query, IConstWUClusterInfo *clusterInfo, unsigned wait, StringBuffer& errorMessage)
  689. {
  690. try
  691. {
  692. if (0==wait || !clusterInfo || clusterInfo->getPlatform()!=RoxieCluster)
  693. return false;
  694. const SocketEndpointArray &addrs = clusterInfo->getRoxieServers();
  695. if (addrs.length() < 1)
  696. return false;
  697. StringBuffer control;
  698. control.appendf("<control:queries><Query id='%s'/></control:queries>", query);
  699. Owned<IPropertyTree> result = sendRoxieControlAllNodes(addrs.item(0), control.str(), false, wait);
  700. if (!result)
  701. return false;
  702. Owned<IPropertyTreeIterator> suspendedQueries = result->getElements("Endpoint/Queries/Query[@suspended='1']");
  703. if (!suspendedQueries->first())
  704. return false;
  705. errorMessage.set(suspendedQueries->query().queryProp("@error"));
  706. return true;
  707. }
  708. catch(IMultiException *me)
  709. {
  710. StringBuffer err;
  711. DBGLOG("ERROR control:queries roxie query info %s", me->errorMessage(err.append(me->errorCode()).append(' ')).str());
  712. me->Release();
  713. return false;
  714. }
  715. catch(IException *e)
  716. {
  717. StringBuffer err;
  718. DBGLOG("ERROR control:queries roxie query info %s", e->errorMessage(err.append(e->errorCode()).append(' ')).str());
  719. e->Release();
  720. return false;
  721. }
  722. }
  723. bool CWsWorkunitsEx::onWUPublishWorkunit(IEspContext &context, IEspWUPublishWorkunitRequest & req, IEspWUPublishWorkunitResponse & resp)
  724. {
  725. StringBuffer wuid = req.getWuid();
  726. WsWuHelpers::checkAndTrimWorkunit("WUPublishWorkunit", wuid);
  727. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  728. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid.str());
  729. if (!cw)
  730. throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot find the workunit %s", wuid.str());
  731. resp.setWuid(wuid.str());
  732. StringAttr queryName;
  733. if (notEmpty(req.getJobName()))
  734. queryName.set(req.getJobName());
  735. else
  736. queryName.set(cw->queryJobName());
  737. if (!queryName.length())
  738. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Query/Job name not defined for publishing workunit %s", wuid.str());
  739. StringAttr target;
  740. if (notEmpty(req.getCluster()))
  741. target.set(req.getCluster());
  742. else
  743. target.set(cw->queryClusterName());
  744. if (!target.length())
  745. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Cluster name not defined for publishing workunit %s", wuid.str());
  746. if (!isValidCluster(target.str()))
  747. throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Invalid cluster name: %s", target.str());
  748. DBGLOG("%s publishing wuid %s to target %s as query %s", context.queryUserId(), wuid.str(), target.str(), queryName.str());
  749. StringBuffer daliIP;
  750. StringBuffer srcCluster;
  751. StringBuffer srcPrefix;
  752. splitDerivedDfsLocation(req.getRemoteDali(), srcCluster, daliIP, srcPrefix, req.getSourceProcess(),req.getSourceProcess(), NULL, NULL);
  753. if (srcCluster.length())
  754. {
  755. if (!isProcessCluster(daliIP, srcCluster))
  756. throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Process cluster %s not found on %s DALI", srcCluster.str(), daliIP.length() ? daliIP.str() : "local");
  757. }
  758. unsigned updateFlags = 0;
  759. if (req.getUpdateDfs())
  760. updateFlags |= (DALI_UPDATEF_SUPERFILES | DALI_UPDATEF_REPLACE_FILE | DALI_UPDATEF_CLONE_FROM);
  761. if (req.getUpdateCloneFrom())
  762. updateFlags |= DALI_UPDATEF_CLONE_FROM;
  763. if (req.getUpdateSuperFiles())
  764. updateFlags |= DALI_UPDATEF_SUPERFILES;
  765. if (req.getAppendCluster())
  766. updateFlags |= DALI_UPDATEF_APPEND_CLUSTER;
  767. if (!req.getDontCopyFiles())
  768. {
  769. QueryFileCopier cpr(target);
  770. cpr.init(context, req.getAllowForeignFiles());
  771. cpr.remoteIP.set(daliIP);
  772. cpr.remotePrefix.set(srcPrefix);
  773. cpr.srcCluster.set(srcCluster);
  774. cpr.queryname.set(queryName);
  775. cpr.copy(cw, updateFlags);
  776. if (req.getIncludeFileErrors())
  777. cpr.gatherFileErrors(resp.getFileErrors());
  778. }
  779. WorkunitUpdate wu(&cw->lock());
  780. if (req.getUpdateWorkUnitName() && notEmpty(req.getJobName()))
  781. wu->setJobName(req.getJobName());
  782. StringBuffer queryId;
  783. WUQueryActivationOptions activate = (WUQueryActivationOptions)req.getActivate();
  784. addQueryToQuerySet(wu, target.str(), queryName.str(), activate, queryId, context.queryUserId());
  785. if (req.getMemoryLimit() || !req.getTimeLimit_isNull() || !req.getWarnTimeLimit_isNull() || req.getPriority() || req.getComment())
  786. {
  787. Owned<IPropertyTree> queryTree = getQueryById(target.str(), queryId, false);
  788. updateMemoryLimitSetting(queryTree, req.getMemoryLimit());
  789. updateQuerySetting(req.getTimeLimit_isNull(), queryTree, "@timeLimit", req.getTimeLimit());
  790. updateQuerySetting(req.getWarnTimeLimit_isNull(), queryTree, "@warnTimeLimit", req.getWarnTimeLimit());
  791. updateQueryPriority(queryTree, req.getPriority());
  792. if (req.getComment())
  793. queryTree->setProp("@comment", req.getComment());
  794. }
  795. wu->commit();
  796. wu.clear();
  797. if (queryId.length())
  798. resp.setQueryId(queryId.str());
  799. resp.setQueryName(queryName.str());
  800. resp.setQuerySet(target.str());
  801. Owned <IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(target.str());
  802. bool reloadFailed = false;
  803. if (0!=req.getWait() && !req.getNoReload())
  804. reloadFailed = !reloadCluster(clusterInfo, (unsigned)req.getWait());
  805. resp.setReloadFailed(reloadFailed);
  806. double version = context.getClientVersion();
  807. if (version > 1.38)
  808. {
  809. StringBuffer errorMessage;
  810. if (!reloadFailed && !req.getNoReload() && isQuerySuspended(queryName.str(), clusterInfo, (unsigned)req.getWait(), errorMessage))
  811. {
  812. resp.setSuspended(true);
  813. resp.setErrorMessage(errorMessage);
  814. }
  815. }
  816. return true;
  817. }
  818. bool CWsWorkunitsEx::onWUQuerysets(IEspContext &context, IEspWUQuerysetsRequest & req, IEspWUQuerysetsResponse & resp)
  819. {
  820. IArrayOf<IEspQuerySet> querySets;
  821. Owned<IStringIterator> targets = getTargetClusters(NULL, NULL);
  822. SCMStringBuffer target;
  823. ForEach(*targets)
  824. {
  825. Owned<IEspQuerySet> qs = createQuerySet();
  826. qs->setQuerySetName(targets->str(target).str());
  827. querySets.append(*qs.getClear());
  828. }
  829. resp.setQuerysets(querySets);
  830. return true;
  831. }
  832. void addClusterQueryStates(IPropertyTree* queriesOnCluster, const char *target, const char *id, IArrayOf<IEspClusterQueryState>& clusterStates, double version)
  833. {
  834. if (queriesOnCluster)
  835. queriesOnCluster = queriesOnCluster->queryPropTree("Endpoint[1]/Queries[1]");
  836. if (!queriesOnCluster)
  837. return;
  838. int reporting = queriesOnCluster->getPropInt("@reporting");
  839. Owned<IEspClusterQueryState> clusterState = createClusterQueryState();
  840. clusterState->setCluster(target);
  841. VStringBuffer xpath("Query[@id='%s']", id);
  842. IPropertyTree *query = queriesOnCluster->queryPropTree(xpath.str());
  843. if (!query)
  844. clusterState->setState("Not Found");
  845. else
  846. {
  847. int suspended = query->getPropInt("@suspended");
  848. const char* error = query->queryProp("@error");
  849. if (0==suspended)
  850. clusterState->setState("Available");
  851. else
  852. {
  853. clusterState->setState("Suspended");
  854. if (suspended<reporting)
  855. clusterState->setMixedNodeStates(true);
  856. }
  857. if (error && *error)
  858. clusterState->setErrors(error);
  859. }
  860. clusterStates.append(*clusterState.getClear());
  861. }
  862. void gatherQuerySetQueryDetails(IEspContext &context, IPropertyTree *query, IEspQuerySetQuery *queryInfo, const char *cluster, IPropertyTree *queriesOnCluster)
  863. {
  864. queryInfo->setId(query->queryProp("@id"));
  865. queryInfo->setName(query->queryProp("@name"));
  866. queryInfo->setDll(query->queryProp("@dll"));
  867. queryInfo->setWuid(query->queryProp("@wuid"));
  868. queryInfo->setSuspended(query->getPropBool("@suspended", false));
  869. if (query->hasProp("@memoryLimit"))
  870. {
  871. StringBuffer s;
  872. memoryLimitStringFromUInt64(s, query->getPropInt64("@memoryLimit"));
  873. queryInfo->setMemoryLimit(s);
  874. }
  875. if (query->hasProp("@timeLimit"))
  876. queryInfo->setTimeLimit(query->getPropInt("@timeLimit"));
  877. if (query->hasProp("@warnTimeLimit"))
  878. queryInfo->setWarnTimeLimit(query->getPropInt("@warnTimeLimit"));
  879. if (query->hasProp("@priority"))
  880. queryInfo->setPriority(getQueryPriorityName(query->getPropInt("@priority")));
  881. if (query->hasProp("@comment"))
  882. queryInfo->setComment(query->queryProp("@comment"));
  883. if (query->hasProp("@snapshot"))
  884. queryInfo->setSnapshot(query->queryProp("@snapshot"));
  885. double version = context.getClientVersion();
  886. if (version >= 1.46)
  887. {
  888. queryInfo->setPublishedBy(query->queryProp("@publishedBy"));
  889. queryInfo->setIsLibrary(query->getPropBool("@isLibrary"));
  890. }
  891. if (queriesOnCluster)
  892. {
  893. IArrayOf<IEspClusterQueryState> clusters;
  894. addClusterQueryStates(queriesOnCluster, cluster, query->queryProp("@id"), clusters, version);
  895. queryInfo->setClusters(clusters);
  896. }
  897. }
  898. void gatherQuerySetAliasDetails(IPropertyTree *alias, IEspQuerySetAlias *aliasInfo)
  899. {
  900. aliasInfo->setName(alias->queryProp("@name"));
  901. aliasInfo->setId(alias->queryProp("@id"));
  902. }
  903. void retrieveAllQuerysetDetails(IEspContext &context, IPropertyTree *registry, IArrayOf<IEspQuerySetQuery> &queries, IArrayOf<IEspQuerySetAlias> &aliases, const char *cluster=NULL, IPropertyTree *queriesOnCluster=NULL, const char *type=NULL, const char *value=NULL)
  904. {
  905. Owned<IPropertyTreeIterator> regQueries = registry->getElements("Query");
  906. ForEach(*regQueries)
  907. {
  908. IPropertyTree &query = regQueries->query();
  909. Owned<IEspQuerySetQuery> q = createQuerySetQuery();
  910. gatherQuerySetQueryDetails(context, &query, q, cluster, queriesOnCluster);
  911. if (isEmpty(cluster) || isEmpty(type) || isEmpty(value) || !strieq(type, "Status"))
  912. queries.append(*q.getClear());
  913. else
  914. {
  915. IArrayOf<IConstClusterQueryState>& cs = q->getClusters();
  916. ForEachItemIn(i, cs)
  917. {
  918. IConstClusterQueryState& c = cs.item(i);
  919. if (strieq(c.getCluster(), cluster) && (strieq(value, "All") || strieq(c.getState(), value)))
  920. {
  921. queries.append(*q.getClear());
  922. break;
  923. }
  924. }
  925. }
  926. }
  927. Owned<IPropertyTreeIterator> regAliases = registry->getElements("Alias");
  928. ForEach(*regAliases)
  929. {
  930. IPropertyTree &alias = regAliases->query();
  931. Owned<IEspQuerySetAlias> a = createQuerySetAlias();
  932. gatherQuerySetAliasDetails(&alias, a);
  933. aliases.append(*a.getClear());
  934. }
  935. }
  936. void retrieveQuerysetDetailsFromAlias(IEspContext &context, IPropertyTree *registry, const char *name, IArrayOf<IEspQuerySetQuery> &queries, IArrayOf<IEspQuerySetAlias> &aliases, const char *cluster, IPropertyTree *queriesOnCluster)
  937. {
  938. StringBuffer xpath;
  939. xpath.append("Alias[@name='").append(name).append("']");
  940. Owned<IPropertyTreeIterator> regAliases = registry->getElements(xpath.str());
  941. if (!regAliases->first())
  942. {
  943. DBGLOG("Alias %s not found", name);
  944. return;
  945. }
  946. ForEach(*regAliases)
  947. {
  948. IPropertyTree& alias = regAliases->query();
  949. Owned<IEspQuerySetAlias> a = createQuerySetAlias();
  950. gatherQuerySetAliasDetails(&alias, a);
  951. xpath.clear().append("Query[@id='").append(a->getId()).append("']");
  952. aliases.append(*a.getClear());
  953. IPropertyTree *query = registry->queryPropTree(xpath);
  954. if (!query)
  955. {
  956. DBGLOG("No matching Query %s found for Alias %s", a->getId(), name);
  957. return;
  958. }
  959. Owned<IEspQuerySetQuery> q = createQuerySetQuery();
  960. gatherQuerySetQueryDetails(context, query, q, cluster, queriesOnCluster);
  961. queries.append(*q.getClear());
  962. }
  963. }
  964. void retrieveQuerysetDetailsFromQuery(IEspContext &context, IPropertyTree *registry, const char *value, const char *type, IArrayOf<IEspQuerySetQuery> &queries, IArrayOf<IEspQuerySetAlias> &aliases, const char *cluster=NULL, IPropertyTree *queriesOnCluster=NULL)
  965. {
  966. if (!strieq(type, "Id") && !strieq(type, "Name"))
  967. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Unrecognized queryset filter type %s", type);
  968. StringBuffer attributeName(type);
  969. StringBuffer xpath;
  970. xpath.clear().append("Query[@").append(attributeName.toLowerCase()).append("='").append(value).append("']");
  971. Owned<IPropertyTreeIterator> regQueries = registry->getElements(xpath.str());
  972. if (!regQueries->first())
  973. {
  974. DBGLOG("No matching Query %s found for %s", value, type);
  975. return;
  976. }
  977. ForEach(*regQueries)
  978. {
  979. IPropertyTree& query = regQueries->query();
  980. Owned<IEspQuerySetQuery> q = createQuerySetQuery();
  981. gatherQuerySetQueryDetails(context, &query, q, cluster, queriesOnCluster);
  982. xpath.clear().append("Alias[@id='").append(q->getId()).append("']");
  983. queries.append(*q.getClear());
  984. Owned<IPropertyTreeIterator> regAliases = registry->getElements(xpath.str());
  985. ForEach(*regAliases)
  986. {
  987. IPropertyTree &alias = regAliases->query();
  988. Owned<IEspQuerySetAlias> a = createQuerySetAlias();
  989. gatherQuerySetAliasDetails(&alias, a);
  990. aliases.append(*a.getClear());
  991. }
  992. }
  993. }
  994. void retrieveQuerysetDetails(IEspContext &context, IPropertyTree *registry, const char *type, const char *value, IArrayOf<IEspQuerySetQuery> &queries, IArrayOf<IEspQuerySetAlias> &aliases, const char *cluster=NULL, IPropertyTree *queriesOnCluster=NULL)
  995. {
  996. if (strieq(type, "All"))
  997. return retrieveAllQuerysetDetails(context, registry, queries, aliases, cluster, queriesOnCluster);
  998. if (!value || !*value)
  999. return;
  1000. if (strieq(type, "Alias"))
  1001. return retrieveQuerysetDetailsFromAlias(context, registry, value, queries, aliases, cluster, queriesOnCluster);
  1002. if (strieq(type, "Status") && !isEmpty(cluster))
  1003. return retrieveAllQuerysetDetails(context, registry, queries, aliases, cluster, queriesOnCluster, type, value);
  1004. return retrieveQuerysetDetailsFromQuery(context, registry, value, type, queries, aliases, cluster, queriesOnCluster);
  1005. }
  1006. void retrieveQuerysetDetails(IEspContext &context, IArrayOf<IEspWUQuerySetDetail> &details, IPropertyTree *registry, const char *type, const char *value, const char *cluster=NULL, IPropertyTree *queriesOnCluster=NULL)
  1007. {
  1008. if (!registry)
  1009. return;
  1010. IArrayOf<IEspQuerySetQuery> queries;
  1011. IArrayOf<IEspQuerySetAlias> aliases;
  1012. retrieveQuerysetDetails(context, registry, type, value, queries, aliases, cluster, queriesOnCluster);
  1013. Owned<IEspWUQuerySetDetail> detail = createWUQuerySetDetail();
  1014. detail->setQuerySetName(registry->queryProp("@id"));
  1015. detail->setQueries(queries);
  1016. detail->setAliases(aliases);
  1017. details.append(*detail.getClear());
  1018. }
  1019. void retrieveQuerysetDetails(IEspContext &context, IArrayOf<IEspWUQuerySetDetail> &details, const char *queryset, const char *type, const char *value, const char *cluster=NULL, IPropertyTree *queriesOnCluster=NULL)
  1020. {
  1021. if (!queryset || !*queryset)
  1022. return;
  1023. Owned<IPropertyTree> registry = getQueryRegistry(queryset, true);
  1024. if (!registry)
  1025. return;
  1026. retrieveQuerysetDetails(context, details, registry, type, value, cluster, queriesOnCluster);
  1027. }
  1028. IPropertyTree* getQueriesOnCluster(const char *target, const char *queryset, bool checkAllNodes)
  1029. {
  1030. if (isEmpty(target))
  1031. target = queryset;
  1032. Owned<IConstWUClusterInfo> info = getTargetClusterInfo(target);
  1033. if (!info)
  1034. throw MakeStringException(ECLWATCH_CANNOT_RESOLVE_CLUSTER_NAME, "Cluster %s not found", target);
  1035. if (queryset && *queryset && !strieq(target, queryset))
  1036. throw MakeStringException(ECLWATCH_QUERYSET_NOT_ON_CLUSTER, "Target %s and QuerySet %s should match", target, queryset);
  1037. if (info->getPlatform()!=RoxieCluster)
  1038. return NULL;
  1039. const SocketEndpointArray &eps = info->getRoxieServers();
  1040. if (!eps.length())
  1041. return NULL;
  1042. try
  1043. {
  1044. Owned<ISocket> sock = ISocket::connect_timeout(eps.item(0), ROXIECONNECTIONTIMEOUT);
  1045. if (checkAllNodes)
  1046. return sendRoxieControlAllNodes(sock, "<control:queries/>", false, ROXIECONTROLQUERIESTIMEOUT);
  1047. else
  1048. return sendRoxieControlQuery(sock, "<control:queries/>", ROXIECONTROLQUERIESTIMEOUT);
  1049. }
  1050. catch(IException* e)
  1051. {
  1052. StringBuffer err;
  1053. DBGLOG("Get exception in control:queries: %s", e->errorMessage(err.append(e->errorCode()).append(' ')).str());
  1054. e->Release();
  1055. return NULL;
  1056. }
  1057. }
  1058. void retrieveQuerysetDetailsByCluster(IEspContext &context, IArrayOf<IEspWUQuerySetDetail> &details, const char *target, const char *queryset, const char *type, const char *value, bool checkAllNodes)
  1059. {
  1060. Owned<IPropertyTree> queriesOnCluster = getQueriesOnCluster(target, queryset, checkAllNodes);
  1061. retrieveQuerysetDetails(context, details, target, type, value, target, queriesOnCluster);
  1062. }
  1063. void retrieveAllQuerysetDetails(IEspContext &context, IArrayOf<IEspWUQuerySetDetail> &details, const char *type, const char *value)
  1064. {
  1065. Owned<IPropertyTree> root = getQueryRegistryRoot();
  1066. if (!root)
  1067. throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "QuerySet Registry not found");
  1068. Owned<IPropertyTreeIterator> querysets = root->getElements("QuerySet");
  1069. ForEach(*querysets)
  1070. retrieveQuerysetDetails(context, details, &querysets->query(), type, value);
  1071. }
  1072. bool CWsWorkunitsEx::onWUQuerysetDetails(IEspContext &context, IEspWUQuerySetDetailsRequest & req, IEspWUQuerySetDetailsResponse & resp)
  1073. {
  1074. resp.setQuerySetName(req.getQuerySetName());
  1075. double version = context.getClientVersion();
  1076. if (version > 1.36)
  1077. {
  1078. Owned<IPropertyTree> queryRegistry = getQueryRegistry(req.getQuerySetName(), false);
  1079. resp.setClusterName(req.getClusterName());
  1080. resp.setFilter(req.getFilter());
  1081. resp.setFilterType(req.getFilterType());
  1082. }
  1083. Owned<IPropertyTree> registry = getQueryRegistry(req.getQuerySetName(), true);
  1084. if (!registry)
  1085. return false;
  1086. PROGLOG("WUQuerysetDetails for queryset %s", req.getQuerySetName());
  1087. IArrayOf<IEspQuerySetQuery> respQueries;
  1088. IArrayOf<IEspQuerySetAlias> respAliases;
  1089. if (isEmpty(req.getClusterName()) || isEmpty(req.getFilterTypeAsString()) || !strieq(req.getFilterTypeAsString(), "Status") || isEmpty(req.getFilter()))
  1090. {
  1091. const char* cluster = req.getClusterName();
  1092. if (isEmpty(cluster))
  1093. cluster = req.getQuerySetName();
  1094. Owned<IPropertyTree> queriesOnCluster = getQueriesOnCluster(cluster, req.getQuerySetName(), req.getCheckAllNodes());
  1095. retrieveQuerysetDetails(context, registry, req.getFilterTypeAsString(), req.getFilter(), respQueries, respAliases, cluster, queriesOnCluster);
  1096. resp.setQuerysetQueries(respQueries);
  1097. resp.setQuerysetAliases(respAliases);
  1098. }
  1099. else
  1100. {
  1101. IArrayOf<IEspWUQuerySetDetail> respDetails;
  1102. retrieveQuerysetDetailsByCluster(context, respDetails, req.getClusterName(), req.getQuerySetName(), req.getFilterTypeAsString(), req.getFilter(), false);
  1103. if (respDetails.ordinality())
  1104. {
  1105. IEspWUQuerySetDetail& detail = respDetails.item(0);
  1106. resp.setQuerysetQueries(detail.getQueries());
  1107. resp.setQuerysetAliases(detail.getAliases());
  1108. }
  1109. }
  1110. return true;
  1111. }
  1112. bool CWsWorkunitsEx::onWUMultiQuerysetDetails(IEspContext &context, IEspWUMultiQuerySetDetailsRequest & req, IEspWUMultiQuerySetDetailsResponse & resp)
  1113. {
  1114. IArrayOf<IEspWUQuerySetDetail> respDetails;
  1115. if (notEmpty(req.getClusterName()))
  1116. {
  1117. PROGLOG("WUMultiQuerysetDetails for cluster %s", req.getClusterName());
  1118. retrieveQuerysetDetailsByCluster(context, respDetails, req.getClusterName(), req.getQuerySetName(), req.getFilterTypeAsString(), req.getFilter(), req.getCheckAllNodes());
  1119. }
  1120. else if (notEmpty(req.getQuerySetName()))
  1121. {
  1122. PROGLOG("WUMultiQuerysetDetails for queryset %s", req.getQuerySetName());
  1123. retrieveQuerysetDetails(context, respDetails, req.getQuerySetName(), req.getFilterTypeAsString(), req.getFilter());
  1124. }
  1125. else
  1126. {
  1127. VStringBuffer logMsg("WUMultiQuerysetDetails: FilterType %s", req.getFilterTypeAsString());
  1128. if (notEmpty(req.getFilter()))
  1129. logMsg.append(", Filter ").append(req.getFilter());
  1130. PROGLOG("%s", logMsg.str());
  1131. retrieveAllQuerysetDetails(context, respDetails, req.getFilterTypeAsString(), req.getFilter());
  1132. }
  1133. resp.setQuerysets(respDetails);
  1134. return true;
  1135. }
  1136. bool addWUQSQueryFilter(WUQuerySortField *filters, unsigned short &count, MemoryBuffer &buff, const char* value, WUQuerySortField name)
  1137. {
  1138. if (isEmpty(value))
  1139. return false;
  1140. filters[count++] = name;
  1141. buff.append(value);
  1142. return true;
  1143. }
  1144. bool addWUQSQueryFilterInt(WUQuerySortField *filters, unsigned short &count, MemoryBuffer &buff, int value, WUQuerySortField name)
  1145. {
  1146. VStringBuffer vBuf("%d", value);
  1147. filters[count++] = name;
  1148. buff.append(vBuf.str());
  1149. return true;
  1150. }
  1151. bool addWUQSQueryFilterInt64(WUQuerySortField *filters, unsigned short &count, MemoryBuffer &buff, __int64 value, WUQuerySortField name)
  1152. {
  1153. VStringBuffer vBuf("%" I64F "d", value);
  1154. filters[count++] = name;
  1155. buff.append(vBuf.str());
  1156. return true;
  1157. }
  1158. unsigned CWsWorkunitsEx::getGraphIdsByQueryId(const char *target, const char *queryId, StringArray& graphIds)
  1159. {
  1160. if (!target || !*target)
  1161. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Target name required");
  1162. if (!queryId || !*queryId)
  1163. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Query Id required");
  1164. Owned<IConstWUClusterInfo> info = getTargetClusterInfo(target);
  1165. if (!info || (info->getPlatform()!=RoxieCluster)) //Only roxie query has query graph.
  1166. return 0;
  1167. const SocketEndpointArray &eps = info->getRoxieServers();
  1168. if (eps.empty())
  1169. return 0;
  1170. VStringBuffer xpath("<control:querystats><Query id='%s'/></control:querystats>", queryId);
  1171. Owned<ISocket> sock = ISocket::connect_timeout(eps.item(0), ROXIECONNECTIONTIMEOUT);
  1172. Owned<IPropertyTree> querystats = sendRoxieControlQuery(sock, xpath.str(), ROXIECONTROLQUERYTIMEOUT);
  1173. if (!querystats)
  1174. return 0;
  1175. Owned<IPropertyTreeIterator> graphs = querystats->getElements("Endpoint/Query/Graph");
  1176. ForEach(*graphs)
  1177. {
  1178. IPropertyTree &graph = graphs->query();
  1179. const char* graphId = graph.queryProp("@id");
  1180. if (graphId && *graphId)
  1181. graphIds.appendUniq(graphId);
  1182. }
  1183. return graphIds.length();
  1184. }
  1185. //This method is thread safe because a query belongs to a single queryset. The method may be called by different threads.
  1186. //Since one thread is for one queryset and a query only belongs to a single queryset, it is impossible for different threads
  1187. //to update the same query object.
  1188. void CWsWorkunitsEx::checkAndSetClusterQueryState(IEspContext &context, const char* cluster, const char* querySetId, IArrayOf<IEspQuerySetQuery>& queries, bool checkAllNodes)
  1189. {
  1190. try
  1191. {
  1192. double version = context.getClientVersion();
  1193. if (isEmpty(cluster))
  1194. cluster = querySetId;
  1195. Owned<IPropertyTree> queriesOnCluster = getQueriesOnCluster(cluster, querySetId, checkAllNodes);
  1196. if (!queriesOnCluster)
  1197. {
  1198. DBGLOG("getQueriesOnCluster() returns NULL for cluster<%s> and querySetId<%s>", cluster, querySetId);
  1199. return;
  1200. }
  1201. ForEachItemIn(i, queries)
  1202. {
  1203. IEspQuerySetQuery& query = queries.item(i);
  1204. const char* queryId = query.getId();
  1205. const char* querySetId0 = query.getQuerySetId();
  1206. if (!queryId || !querySetId0 || !strieq(querySetId0, querySetId))
  1207. continue;
  1208. IArrayOf<IEspClusterQueryState> clusters;
  1209. addClusterQueryStates(queriesOnCluster, cluster, queryId, clusters, version);
  1210. query.setClusters(clusters);
  1211. }
  1212. }
  1213. catch(IException *e)
  1214. {
  1215. EXCLOG(e, "CWsWorkunitsEx::checkAndSetClusterQueryState: Failed to read Query State On Cluster");
  1216. e->Release();
  1217. }
  1218. }
  1219. void CWsWorkunitsEx::checkAndSetClusterQueryState(IEspContext &context, const char* cluster, StringArray& querySetIds, IArrayOf<IEspQuerySetQuery>& queries, bool checkAllNodes)
  1220. {
  1221. UnsignedArray threadHandles;
  1222. ForEachItemIn(i, querySetIds)
  1223. {
  1224. const char* querySetId = querySetIds.item(i);
  1225. if(!querySetId || !*querySetId)
  1226. continue;
  1227. Owned<CClusterQueryStateParam> threadReq = new CClusterQueryStateParam(this, context, cluster, querySetId, queries, checkAllNodes);
  1228. PooledThreadHandle handle = clusterQueryStatePool->start( threadReq.getClear() );
  1229. threadHandles.append(handle);
  1230. }
  1231. //block for worker threads to finish, if necessary and then collect results
  1232. //Not use joinAll() because multiple threads may call this method. Each call uses the pool to create
  1233. //its own threads of checking query state. Each call should only join the ones created by that call.
  1234. ForEachItemIn(ii, threadHandles)
  1235. clusterQueryStatePool->join(threadHandles.item(ii));
  1236. }
  1237. bool CWsWorkunitsEx::onWUListQueries(IEspContext &context, IEspWUListQueriesRequest & req, IEspWUListQueriesResponse & resp)
  1238. {
  1239. bool descending = req.getDescending();
  1240. const char *sortBy = req.getSortby();
  1241. WUQuerySortField sortOrder[2] = {WUQSFId, WUQSFterm};
  1242. if(notEmpty(sortBy))
  1243. {
  1244. if (strieq(sortBy, "Name"))
  1245. sortOrder[0] = WUQSFname;
  1246. else if (strieq(sortBy, "WUID"))
  1247. sortOrder[0] = WUQSFwuid;
  1248. else if (strieq(sortBy, "DLL"))
  1249. sortOrder[0] = WUQSFdll;
  1250. else if (strieq(sortBy, "Activated"))
  1251. sortOrder[0] = WUQSFActivited;
  1252. else if (strieq(sortBy, "MemoryLimit"))
  1253. sortOrder[0] = (WUQuerySortField) (WUQSFmemoryLimit | WUQSFnumeric);
  1254. else if (strieq(sortBy, "TimeLimit"))
  1255. sortOrder[0] = (WUQuerySortField) (WUQSFtimeLimit | WUQSFnumeric);
  1256. else if (strieq(sortBy, "WarnTimeLimit"))
  1257. sortOrder[0] = (WUQuerySortField) (WUQSFwarnTimeLimit | WUQSFnumeric);
  1258. else if (strieq(sortBy, "Priority"))
  1259. sortOrder[0] = (WUQuerySortField) (WUQSFpriority | WUQSFnumeric);
  1260. else if (strieq(sortBy, "PublishedBy"))
  1261. sortOrder[0] = WUQSFPublishedBy;
  1262. else if (strieq(sortBy, "QuerySetId"))
  1263. sortOrder[0] = WUQSFQuerySet;
  1264. else
  1265. sortOrder[0] = WUQSFId;
  1266. sortOrder[0] = (WUQuerySortField) (sortOrder[0] | WUQSFnocase);
  1267. if (descending)
  1268. sortOrder[0] = (WUQuerySortField) (sortOrder[0] | WUQSFreverse);
  1269. }
  1270. WUQuerySortField filters[16];
  1271. unsigned short filterCount = 0;
  1272. MemoryBuffer filterBuf;
  1273. const char* clusterReq = req.getClusterName();
  1274. addWUQSQueryFilter(filters, filterCount, filterBuf, req.getQuerySetName(), WUQSFQuerySet);
  1275. addWUQSQueryFilter(filters, filterCount, filterBuf, req.getQueryID(), (WUQuerySortField) (WUQSFId | WUQSFwild | WUSFnocase));
  1276. addWUQSQueryFilter(filters, filterCount, filterBuf, req.getQueryName(), (WUQuerySortField) (WUQSFname | WUQSFwild | WUSFnocase));
  1277. addWUQSQueryFilter(filters, filterCount, filterBuf, req.getWUID(), (WUQuerySortField) (WUQSFwuid | WUSFnocase));
  1278. addWUQSQueryFilter(filters, filterCount, filterBuf, req.getLibraryName(), (WUQuerySortField) (WUQSFLibrary | WUQSFnocase));
  1279. addWUQSQueryFilter(filters, filterCount, filterBuf, req.getPublishedBy(), (WUQuerySortField) (WUQSFPublishedBy | WUQSFwild | WUSFnocase));
  1280. if (!req.getMemoryLimitLow_isNull())
  1281. addWUQSQueryFilterInt64(filters, filterCount, filterBuf, req.getMemoryLimitLow(), (WUQuerySortField) (WUQSFmemoryLimit | WUQSFnumeric));
  1282. if (!req.getMemoryLimitHigh_isNull())
  1283. addWUQSQueryFilterInt64(filters, filterCount, filterBuf, req.getMemoryLimitHigh(), (WUQuerySortField) (WUQSFmemoryLimitHi | WUQSFnumeric));
  1284. if (!req.getTimeLimitLow_isNull())
  1285. addWUQSQueryFilterInt(filters, filterCount, filterBuf, req.getTimeLimitLow(), (WUQuerySortField) (WUQSFtimeLimit | WUQSFnumeric));
  1286. if (!req.getTimeLimitHigh_isNull())
  1287. addWUQSQueryFilterInt(filters, filterCount, filterBuf, req.getTimeLimitHigh(), (WUQuerySortField) (WUQSFtimeLimitHi | WUQSFnumeric));
  1288. if (!req.getWarnTimeLimitLow_isNull())
  1289. addWUQSQueryFilterInt(filters, filterCount, filterBuf, req.getWarnTimeLimitLow(), (WUQuerySortField) (WUQSFwarnTimeLimit | WUQSFnumeric));
  1290. if (!req.getWarnTimeLimitHigh_isNull())
  1291. addWUQSQueryFilterInt(filters, filterCount, filterBuf, req.getWarnTimeLimitHigh(), (WUQuerySortField) (WUQSFwarnTimeLimitHi | WUQSFnumeric));
  1292. if (!req.getPriorityLow_isNull())
  1293. addWUQSQueryFilterInt(filters, filterCount, filterBuf, req.getPriorityLow(), (WUQuerySortField) (WUQSFpriority | WUQSFnumeric));
  1294. if (!req.getPriorityHigh_isNull())
  1295. addWUQSQueryFilterInt(filters, filterCount, filterBuf, req.getPriorityHigh(), (WUQuerySortField) (WUQSFpriorityHi | WUQSFnumeric));
  1296. if (!req.getActivated_isNull())
  1297. addWUQSQueryFilterInt(filters, filterCount, filterBuf, req.getActivated(), (WUQuerySortField) (WUQSFActivited | WUQSFnumeric));
  1298. if (!req.getSuspendedByUser_isNull())
  1299. addWUQSQueryFilterInt(filters, filterCount, filterBuf, req.getSuspendedByUser(), (WUQuerySortField) (WUQSFSuspendedByUser | WUQSFnumeric));
  1300. filters[filterCount] = WUQSFterm;
  1301. unsigned numberOfQueries = 0;
  1302. unsigned pageSize = req.getPageSize();
  1303. unsigned pageStartFrom = req.getPageStartFrom();
  1304. if(pageSize < 1)
  1305. pageSize = 100;
  1306. __int64 cacheHint = 0;
  1307. if (!req.getCacheHint_isNull())
  1308. cacheHint = req.getCacheHint();
  1309. Owned<MapStringTo<bool> > queriesUsingFileMap;
  1310. const char *lfn = req.getFileName();
  1311. if (lfn && *lfn)
  1312. {
  1313. queriesUsingFileMap.setown(new MapStringTo<bool>());
  1314. StringAttr dummy;
  1315. Owned<IPropertyTreeIterator> queriesUsingFile = filesInUse.findQueriesUsingFile(clusterReq, lfn, dummy);
  1316. ForEach (*queriesUsingFile)
  1317. {
  1318. IPropertyTree &queryUsingFile = queriesUsingFile->query();
  1319. const char *queryTarget = queryUsingFile.queryProp("@target");
  1320. const char *queryId = queryUsingFile.queryProp("@id");
  1321. if (queryTarget && *queryTarget && queryId && *queryId)
  1322. {
  1323. VStringBuffer targetQuery("%s/%s", queryTarget, queryId);
  1324. queriesUsingFileMap->setValue(targetQuery, true);
  1325. }
  1326. }
  1327. }
  1328. PROGLOG("WUListQueries: getQuerySetQueriesSorted");
  1329. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  1330. Owned<IConstQuerySetQueryIterator> it = factory->getQuerySetQueriesSorted(sortOrder, filters, filterBuf.bufferBase(), pageStartFrom, pageSize, &cacheHint, &numberOfQueries, queriesUsingFileMap);
  1331. resp.setCacheHint(cacheHint);
  1332. PROGLOG("WUListQueries: getQuerySetQueriesSorted done");
  1333. StringArray querySetIds;
  1334. IArrayOf<IEspQuerySetQuery> queries;
  1335. double version = context.getClientVersion();
  1336. ForEach(*it)
  1337. {
  1338. IPropertyTree &query=it->query();
  1339. const char *queryId = query.queryProp("@id");
  1340. const char *queryTarget = query.queryProp("@querySetId");
  1341. Owned<IEspQuerySetQuery> q = createQuerySetQuery();
  1342. q->setId(queryId);
  1343. q->setQuerySetId(queryTarget);
  1344. q->setName(query.queryProp("@name"));
  1345. q->setDll(query.queryProp("@dll"));
  1346. q->setWuid(query.queryProp("@wuid"));
  1347. q->setActivated(query.getPropBool("@activated", false));
  1348. q->setSuspended(query.getPropBool("@suspended", false));
  1349. if (query.hasProp("@memoryLimit"))
  1350. {
  1351. StringBuffer s;
  1352. memoryLimitStringFromUInt64(s, query.getPropInt64("@memoryLimit"));
  1353. q->setMemoryLimit(s);
  1354. }
  1355. if (query.hasProp("@timeLimit"))
  1356. q->setTimeLimit(query.getPropInt("@timeLimit"));
  1357. if (query.hasProp("@warnTimeLimit"))
  1358. q->setWarnTimeLimit(query.getPropInt("@warnTimeLimit"));
  1359. if (query.hasProp("@priority"))
  1360. q->setPriority(getQueryPriorityName(query.getPropInt("@priority")));
  1361. if (query.hasProp("@comment"))
  1362. q->setComment(query.queryProp("@comment"));
  1363. if (version >= 1.46)
  1364. {
  1365. q->setPublishedBy(query.queryProp("@publishedBy"));
  1366. q->setIsLibrary(query.getPropBool("@isLibrary"));
  1367. }
  1368. if (!querySetIds.contains(queryTarget))
  1369. querySetIds.append(queryTarget);
  1370. queries.append(*q.getClear());
  1371. }
  1372. checkAndSetClusterQueryState(context, clusterReq, querySetIds, queries, req.getCheckAllNodes());
  1373. resp.setQuerysetQueries(queries);
  1374. resp.setNumberOfQueries(numberOfQueries);
  1375. return true;
  1376. }
  1377. bool CWsWorkunitsEx::onWUListQueriesUsingFile(IEspContext &context, IEspWUListQueriesUsingFileRequest &req, IEspWUListQueriesUsingFileResponse &resp)
  1378. {
  1379. const char *target = req.getTarget();
  1380. const char *process = req.getProcess();
  1381. StringBuffer lfn(req.getFileName());
  1382. resp.setFileName(lfn.toLowerCase());
  1383. resp.setProcess(process);
  1384. if (lfn.isEmpty())
  1385. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "FileName required");
  1386. VStringBuffer logMsg("WUListQueriesUsingFile: %s", lfn.str());
  1387. StringArray targets;
  1388. if (target && *target)
  1389. {
  1390. targets.append(target);
  1391. logMsg.append(", target ").append(target);
  1392. }
  1393. else // if (process && *process)
  1394. {
  1395. SCMStringBuffer targetStr;
  1396. Owned<IStringIterator> targetClusters = getTargetClusters("RoxieCluster", process);
  1397. ForEach(*targetClusters)
  1398. targets.append(targetClusters->str(targetStr).str());
  1399. logMsg.append(", process ").append(process);
  1400. }
  1401. PROGLOG("%s", logMsg.str());
  1402. IArrayOf<IEspTargetQueriesUsingFile> respTargets;
  1403. ForEachItemIn(i, targets)
  1404. {
  1405. target = targets.item(i);
  1406. Owned<IEspTargetQueriesUsingFile> respTarget = createTargetQueriesUsingFile();
  1407. respTarget->setTarget(target);
  1408. StringAttr pmid;
  1409. Owned<IPropertyTreeIterator> queries = filesInUse.findQueriesUsingFile(target, lfn, pmid);
  1410. if (!pmid.isEmpty())
  1411. respTarget->setPackageMap(pmid);
  1412. if (queries)
  1413. {
  1414. IArrayOf<IEspQueryUsingFile> respQueries;
  1415. ForEach(*queries)
  1416. {
  1417. IPropertyTree &query = queries->query();
  1418. Owned<IEspQueryUsingFile> q = createQueryUsingFile();
  1419. q->setId(query.queryProp("@id"));
  1420. VStringBuffer xpath("File[@lfn='%s']/@pkgid", lfn.str());
  1421. if (query.hasProp(xpath))
  1422. q->setPackage(query.queryProp(xpath));
  1423. respQueries.append(*q.getClear());
  1424. }
  1425. respTarget->setQueries(respQueries);
  1426. }
  1427. respTargets.append(*respTarget.getClear());
  1428. }
  1429. resp.setTargets(respTargets);
  1430. return true;
  1431. }
  1432. bool CWsWorkunitsEx::onWUQueryFiles(IEspContext &context, IEspWUQueryFilesRequest &req, IEspWUQueryFilesResponse &resp)
  1433. {
  1434. const char *target = req.getTarget();
  1435. const char *query = req.getQueryId();
  1436. if (!target || !*target)
  1437. throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Target not specified");
  1438. if (!isValidCluster(target))
  1439. throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Invalid target name: %s", target);
  1440. if (!query || !*query)
  1441. throw MakeStringException(ECLWATCH_QUERYID_NOT_FOUND, "Query not specified");
  1442. Owned<IPropertyTree> registeredQuery = resolveQueryAlias(target, query, true);
  1443. if (!registeredQuery)
  1444. throw MakeStringException(ECLWATCH_QUERYID_NOT_FOUND, "Query not found");
  1445. PROGLOG("WUQueryFiles: target %s, query %s", target, query);
  1446. StringAttr queryid(registeredQuery->queryProp("@id"));
  1447. registeredQuery.clear();
  1448. Owned<IPropertyTree> tree = filesInUse.getTree();
  1449. VStringBuffer xpath("%s/Query[@id='%s']", target, queryid.get());
  1450. IPropertyTree *queryTree = tree->queryPropTree(xpath);
  1451. if (!queryTree)
  1452. throw MakeStringException(ECLWATCH_QUERYID_NOT_FOUND, "Query not found in file cache (%s)", xpath.str());
  1453. IArrayOf<IEspFileUsedByQuery> referencedFiles;
  1454. Owned<IPropertyTreeIterator> files = queryTree->getElements("File");
  1455. ForEach(*files)
  1456. {
  1457. IPropertyTree &file = files->query();
  1458. if (file.getPropBool("@super", 0))
  1459. continue;
  1460. Owned<IEspFileUsedByQuery> respFile = createFileUsedByQuery();
  1461. respFile->setFileName(file.queryProp("@lfn"));
  1462. respFile->setFileSize(file.getPropInt64("@size"));
  1463. respFile->setNumberOfParts(file.getPropInt("@numparts"));
  1464. referencedFiles.append(*respFile.getClear());
  1465. }
  1466. resp.setFiles(referencedFiles);
  1467. return true;
  1468. }
  1469. void copyWorkunitForRecompile(IEspContext &context, IWorkUnitFactory *factory, const char *srcWuid, StringAttr &wuid, StringAttr &jobname)
  1470. {
  1471. Owned<IConstWorkUnit> src(factory->openWorkUnit(srcWuid));
  1472. if (!src)
  1473. throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot open workunit %s.", srcWuid);
  1474. WsWuInfo info(context, src);
  1475. StringBuffer archiveText;
  1476. info.getWorkunitArchiveQuery(archiveText); //archive required, fail otherwise
  1477. if (!isArchiveQuery(archiveText))
  1478. throw MakeStringException(ECLWATCH_RESOURCE_NOT_FOUND,"Cannot retrieve workunit ECL archive %s.", srcWuid);
  1479. SCMStringBuffer mainDefinition;
  1480. Owned <IConstWUQuery> query = src->getQuery();
  1481. if (query)
  1482. query->getQueryMainDefinition(mainDefinition);
  1483. NewWsWorkunit wu(factory, context);
  1484. wuid.set(wu->queryWuid());
  1485. wu->setAction(WUActionCompile);
  1486. SCMStringBuffer token;
  1487. wu->setSecurityToken(createToken(wuid.str(), context.queryUserId(), context.queryPassword(), token).str());
  1488. jobname.set(src->queryJobName());
  1489. if (jobname.length())
  1490. wu->setJobName(jobname);
  1491. wu.setQueryText(archiveText.str());
  1492. if (mainDefinition.length())
  1493. wu.setQueryMain(mainDefinition.str());
  1494. wu->setResultLimit(src->getResultLimit());
  1495. IStringIterator &names = src->getDebugValues();
  1496. ForEach(names)
  1497. {
  1498. SCMStringBuffer name, value;
  1499. names.str(name);
  1500. if (0==strncmp(name.str(), "eclcc", 5))
  1501. wu->setDebugValue(name.str(), src->getDebugValue(name.str(), value).str(), true);
  1502. }
  1503. }
  1504. bool CWsWorkunitsEx::onWURecreateQuery(IEspContext &context, IEspWURecreateQueryRequest &req, IEspWURecreateQueryResponse &resp)
  1505. {
  1506. try
  1507. {
  1508. const char* srcTarget = req.getTarget();
  1509. const char* queryIdOrAlias = req.getQueryId();
  1510. if (!srcTarget || !*srcTarget)
  1511. throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Target not specified");
  1512. if (!queryIdOrAlias || !*queryIdOrAlias)
  1513. throw MakeStringException(ECLWATCH_QUERYID_NOT_FOUND, "QueryId not specified");
  1514. const char *target = req.getDestTarget();
  1515. if (isEmptyString(target))
  1516. target = srcTarget;
  1517. Owned<IPropertyTree> queryRegistry = getQueryRegistry(srcTarget, false);
  1518. Owned<IPropertyTree> srcQueryTree = resolveQueryAlias(queryRegistry, queryIdOrAlias);
  1519. if (!srcQueryTree)
  1520. {
  1521. DBGLOG("WURecreateQuery - No matching Query");
  1522. throw MakeStringException(ECLWATCH_QUERYID_NOT_FOUND,"No matching query for given id or alias %s.", queryIdOrAlias);
  1523. }
  1524. resp.setPriority(isEmptyString(req.getPriority()) ? srcQueryTree->queryProp("@priority") : req.getPriority());
  1525. resp.setComment(isEmptyString(req.getComment()) ? srcQueryTree->queryProp("@comment") : req.getComment());
  1526. resp.setMemoryLimit(isEmptyString(req.getMemoryLimit()) ? srcQueryTree->queryProp("@memoryLimit") : req.getMemoryLimit());
  1527. resp.setTimeLimit(req.getTimeLimit_isNull() ? srcQueryTree->getPropInt("@timeLimit") : req.getTimeLimit());
  1528. resp.setWarnTimeLimit(req.getWarnTimeLimit_isNull() ? srcQueryTree->getPropInt("@warnTimeLimit") : req.getWarnTimeLimit());
  1529. StringAttr wuid;
  1530. StringAttr jobname;
  1531. const char* srcQueryId = srcQueryTree->queryProp("@id");
  1532. const char* srcQueryName = srcQueryTree->queryProp("@name");
  1533. const char *srcWuid = srcQueryTree->queryProp("@wuid");
  1534. PROGLOG("WURecreateQuery: QuerySet %s, query %s, wuid %s", srcTarget, srcQueryId, srcWuid);
  1535. ensureWsWorkunitAccess(context, srcWuid, SecAccess_Write);
  1536. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  1537. copyWorkunitForRecompile(context, factory, srcWuid, wuid, jobname);
  1538. resp.setWuid(wuid);
  1539. WsWuHelpers::submitWsWorkunit(context, wuid.str(), target, NULL, 0, true, false, false, NULL, NULL, &req.getDebugValues());
  1540. waitForWorkUnitToCompile(wuid.str(), req.getWait());
  1541. Owned<IConstWorkUnit> cw(factory->openWorkUnit(wuid.str()));
  1542. if (!cw)
  1543. throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot open recreated workunit %s.",wuid.str());
  1544. if (jobname.length())
  1545. {
  1546. StringBuffer name;
  1547. origValueChanged(jobname.str(), cw->queryJobName(), name, false);
  1548. if (name.length()) //non generated user specified name, so override #Workunit('name')
  1549. {
  1550. WorkunitUpdate wx(&cw->lock());
  1551. wx->setJobName(name.str());
  1552. }
  1553. }
  1554. PROGLOG("WURecreateQuery generated: %s", wuid.str());
  1555. AuditSystemAccess(context.queryUserId(), true, "Updated %s", wuid.str());
  1556. queryRegistry.clear();
  1557. srcQueryTree.clear();
  1558. if (req.getRepublish())
  1559. {
  1560. if (!req.getDontCopyFiles())
  1561. {
  1562. StringBuffer daliIP;
  1563. StringBuffer srcCluster;
  1564. StringBuffer srcPrefix;
  1565. splitDerivedDfsLocation(req.getRemoteDali(), srcCluster, daliIP, srcPrefix, req.getSourceProcess(),req.getSourceProcess(), NULL, NULL);
  1566. if (srcCluster.length())
  1567. {
  1568. if (!isProcessCluster(daliIP, srcCluster))
  1569. throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Process cluster %s not found on %s DALI", srcCluster.str(), daliIP.length() ? daliIP.str() : "local");
  1570. }
  1571. unsigned updateFlags = 0;
  1572. if (req.getUpdateDfs())
  1573. updateFlags |= (DALI_UPDATEF_SUPERFILES | DALI_UPDATEF_REPLACE_FILE | DALI_UPDATEF_CLONE_FROM);
  1574. if (req.getUpdateCloneFrom())
  1575. updateFlags |= DALI_UPDATEF_CLONE_FROM;
  1576. if (req.getUpdateSuperFiles())
  1577. updateFlags |= DALI_UPDATEF_SUPERFILES;
  1578. if (req.getAppendCluster())
  1579. updateFlags |= DALI_UPDATEF_APPEND_CLUSTER;
  1580. QueryFileCopier cpr(target);
  1581. cpr.init(context, req.getAllowForeignFiles());
  1582. cpr.remoteIP.set(daliIP);
  1583. cpr.remotePrefix.set(srcPrefix);
  1584. cpr.srcCluster.set(srcCluster);
  1585. cpr.queryname.set(srcQueryName);
  1586. cpr.copy(cw, updateFlags);
  1587. if (req.getIncludeFileErrors())
  1588. cpr.gatherFileErrors(resp.getFileErrors());
  1589. }
  1590. StringBuffer queryId;
  1591. WorkunitUpdate wu(&cw->lock());
  1592. WUQueryActivationOptions activate = (WUQueryActivationOptions)req.getActivate();
  1593. addQueryToQuerySet(wu, target, srcQueryName, activate, queryId, context.queryUserId());
  1594. {
  1595. Owned<IPropertyTree> queryTree = getQueryById(target, queryId, false);
  1596. if (queryTree)
  1597. {
  1598. queryTree->setProp("@priority", resp.getPriority());
  1599. updateMemoryLimitSetting(queryTree, resp.getMemoryLimit());
  1600. updateQuerySetting(resp.getTimeLimit_isNull(), queryTree, "@timeLimit", resp.getTimeLimit());
  1601. updateQuerySetting(resp.getWarnTimeLimit_isNull(), queryTree, "@warnTimeLimit", resp.getWarnTimeLimit());
  1602. updateQueryPriority(queryTree, resp.getPriority());
  1603. queryTree->setProp("@comment", resp.getComment());
  1604. }
  1605. }
  1606. wu->commit();
  1607. wu.clear();
  1608. PROGLOG("WURecreateQuery published: %s as %s/%s", wuid.str(), target, queryId.str());
  1609. resp.setQuerySet(target);
  1610. resp.setQueryName(srcQueryName);
  1611. resp.setQueryId(queryId.str());
  1612. Owned <IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(target);
  1613. bool reloadFailed = false;
  1614. if (0!=req.getWait() && !req.getNoReload())
  1615. reloadFailed = !reloadCluster(clusterInfo, (unsigned)req.getWait());
  1616. resp.setReloadFailed(reloadFailed);
  1617. StringBuffer errorMessage;
  1618. if (!reloadFailed && !req.getNoReload() && isQuerySuspended(queryId.str(), clusterInfo, (unsigned)req.getWait(), errorMessage))
  1619. {
  1620. resp.setSuspended(true);
  1621. resp.setErrorMessage(errorMessage);
  1622. }
  1623. }
  1624. }
  1625. catch(IException* e)
  1626. {
  1627. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1628. }
  1629. return true;
  1630. }
  1631. bool CWsWorkunitsEx::onWUQueryDetails(IEspContext &context, IEspWUQueryDetailsRequest & req, IEspWUQueryDetailsResponse & resp)
  1632. {
  1633. const char* querySet = req.getQuerySet();
  1634. const char* queryIdOrAlias = req.getQueryId();
  1635. bool includeStateOnClusters = req.getIncludeStateOnClusters();
  1636. if (!querySet || !*querySet)
  1637. throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "QuerySet not specified");
  1638. if (!queryIdOrAlias || !*queryIdOrAlias)
  1639. throw MakeStringException(ECLWATCH_QUERYID_NOT_FOUND, "QueryId not specified");
  1640. Owned<IPropertyTree> queryRegistry = getQueryRegistry(querySet, false);
  1641. Owned<IPropertyTree> query = resolveQueryAlias(queryRegistry, queryIdOrAlias);
  1642. if (!query)
  1643. {
  1644. DBGLOG("No matching Query");
  1645. throw MakeStringException(ECLWATCH_QUERYID_NOT_FOUND,"No matching query for given id or alias %s.", queryIdOrAlias);
  1646. }
  1647. const char* queryId = query->queryProp("@id");
  1648. resp.setQueryId(queryId);
  1649. resp.setQuerySet(querySet);
  1650. PROGLOG("WUQueryDetails: QuerySet %s, query %s", querySet, queryId);
  1651. const char* queryName = query->queryProp("@name");
  1652. const char* wuid = query->queryProp("@wuid");
  1653. resp.setQueryName(queryName);
  1654. resp.setWuid(wuid);
  1655. resp.setDll(query->queryProp("@dll"));
  1656. resp.setPublishedBy(query->queryProp("@publishedBy"));
  1657. resp.setSuspended(query->getPropBool("@suspended", false));
  1658. resp.setSuspendedBy(query->queryProp("@suspendedBy"));
  1659. resp.setComment(query->queryProp("@comment"));
  1660. double version = context.getClientVersion();
  1661. if (version >= 1.46)
  1662. {
  1663. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  1664. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid);
  1665. if(!cw)
  1666. throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT,"Cannot open workunit %s.",wuid);
  1667. if (query->hasProp("@priority"))
  1668. resp.setPriority(getQueryPriorityName(query->getPropInt("@priority")));
  1669. resp.setIsLibrary(query->getPropBool("@isLibrary"));
  1670. SCMStringBuffer s;
  1671. resp.setWUSnapShot(cw->getSnapshot(s).str()); //Label
  1672. stat_type whenCompiled;
  1673. if (cw->getStatistic(whenCompiled, "", StWhenCompiled))
  1674. {
  1675. formatStatistic(s.s.clear(), whenCompiled, StWhenCompiled);
  1676. resp.setCompileTime(s.str());
  1677. }
  1678. StringArray libUsed, graphIds;
  1679. Owned<IConstWULibraryIterator> libs = &cw->getLibraries();
  1680. ForEach(*libs)
  1681. libUsed.append(libs->query().getName(s).str());
  1682. if (libUsed.length())
  1683. resp.setLibrariesUsed(libUsed);
  1684. if (version < 1.64)
  1685. {
  1686. unsigned numGraphIds = getGraphIdsByQueryId(querySet, queryId, graphIds);
  1687. resp.setCountGraphs(numGraphIds);
  1688. if (numGraphIds > 0)
  1689. resp.setGraphIds(graphIds);
  1690. }
  1691. }
  1692. StringArray logicalFiles;
  1693. IArrayOf<IEspQuerySuperFile> superFiles;
  1694. getQueryFiles(context, wuid, queryId, querySet, logicalFiles, req.getIncludeSuperFiles() ? &superFiles : NULL);
  1695. if (logicalFiles.length())
  1696. resp.setLogicalFiles(logicalFiles);
  1697. if (superFiles.length())
  1698. resp.setSuperFiles(superFiles);
  1699. if (version >= 1.42)
  1700. {
  1701. VStringBuffer xpath("Alias[@id='%s']", queryId);
  1702. IPropertyTree *alias = queryRegistry->queryPropTree(xpath.str());
  1703. if (!alias)
  1704. resp.setActivated(false);
  1705. else
  1706. resp.setActivated(true);
  1707. }
  1708. if (includeStateOnClusters && (version >= 1.43))
  1709. {
  1710. Owned<IPropertyTree> queriesOnCluster = getQueriesOnCluster(querySet, querySet, req.getCheckAllNodes());
  1711. if (queriesOnCluster)
  1712. {
  1713. IArrayOf<IEspClusterQueryState> clusterStates;
  1714. addClusterQueryStates(queriesOnCluster, querySet, queryId, clusterStates, version);
  1715. resp.setClusters(clusterStates);
  1716. }
  1717. }
  1718. if (version >= 1.50)
  1719. {
  1720. WsWuInfo winfo(context, wuid);
  1721. resp.setResourceURLCount(winfo.getResourceURLCount());
  1722. if (version >= 1.64)
  1723. {
  1724. IArrayOf<IEspECLTimer> timers;
  1725. winfo.doGetTimers(timers); //Graph Duration
  1726. if (timers.length())
  1727. resp.setWUTimers(timers);
  1728. IArrayOf<IEspECLGraph> graphs;
  1729. winfo.doGetGraphs(graphs); //Graph Name, Label, Started, Finished, Type
  1730. unsigned numGraphIds = graphs.length();
  1731. resp.setCountGraphs(numGraphIds);
  1732. if (numGraphIds > 0)
  1733. resp.setWUGraphs(graphs);
  1734. }
  1735. }
  1736. if (req.getIncludeWsEclAddresses())
  1737. {
  1738. StringArray wseclAddresses;
  1739. Owned<IEnvironmentFactory> factory = getEnvironmentFactory(true);
  1740. Owned<IConstEnvironment> env = factory->openEnvironment();
  1741. Owned<IPropertyTree> root = &env->getPTree();
  1742. Owned<IPropertyTreeIterator> services = root->getElements("Software/EspService[Properties/@type='ws_ecl']");
  1743. StringArray serviceNames;
  1744. VStringBuffer xpath("Target[@name='%s']", querySet);
  1745. ForEach(*services)
  1746. {
  1747. IPropertyTree &service = services->query();
  1748. if (!service.hasProp("Target") || service.hasProp(xpath))
  1749. serviceNames.append(service.queryProp("@name"));
  1750. }
  1751. Owned<IPropertyTreeIterator> processes = root->getElements("Software/EspProcess");
  1752. ForEach(*processes)
  1753. {
  1754. StringArray netAddrs;
  1755. IPropertyTree &process = processes->query();
  1756. Owned<IPropertyTreeIterator> instances = process.getElements("Instance");
  1757. ForEach(*instances)
  1758. {
  1759. IPropertyTree &instance = instances->query();
  1760. const char *netAddr = instance.queryProp("@netAddress");
  1761. if (!netAddr || !*netAddr)
  1762. continue;
  1763. if (streq(netAddr, "."))
  1764. netAddrs.appendUniq(envLocalAddress); //not necessarily local to this server
  1765. else
  1766. netAddrs.appendUniq(netAddr);
  1767. }
  1768. Owned<IPropertyTreeIterator> bindings = process.getElements("EspBinding");
  1769. ForEach(*bindings)
  1770. {
  1771. IPropertyTree &binding = bindings->query();
  1772. const char *srvName = binding.queryProp("@service");
  1773. if (!serviceNames.contains(srvName))
  1774. continue;
  1775. const char *port = binding.queryProp("@port"); //should always be an integer, but we're just concatenating strings
  1776. if (!port || !*port)
  1777. continue;
  1778. ForEachItemIn(i, netAddrs)
  1779. {
  1780. VStringBuffer wseclAddr("%s:%s", netAddrs.item(i), port);
  1781. wseclAddresses.append(wseclAddr);
  1782. }
  1783. }
  1784. }
  1785. resp.setWsEclAddresses(wseclAddresses);
  1786. }
  1787. return true;
  1788. }
  1789. int EspQuerySuperFileCompareFunc(IInterface * const *i1, IInterface * const *i2)
  1790. {
  1791. if (!i1 || !*i1 || !i2 || !*i2)
  1792. return 0;
  1793. IEspQuerySuperFile *sf1 = QUERYINTERFACE(*i1, IEspQuerySuperFile);
  1794. IEspQuerySuperFile *sf2 = QUERYINTERFACE(*i2, IEspQuerySuperFile);
  1795. if (!sf1 || !sf2)
  1796. return 0;
  1797. const char *name1 = sf1->getName();
  1798. const char *name2 = sf2->getName();
  1799. if (!name1 || !name2)
  1800. return 0;
  1801. return strcmp(name1, name2);
  1802. }
  1803. IReferencedFile* CWsWorkunitsEx::getReferencedFileByName(const char* name, IReferencedFileList* wufiles)
  1804. {
  1805. Owned<IReferencedFileIterator> refFileItr = wufiles->getFiles();
  1806. ForEach(*refFileItr)
  1807. {
  1808. IReferencedFile& rf = refFileItr->query();
  1809. const char* lfn = rf.getLogicalName();
  1810. if (lfn && strieq(lfn, name))
  1811. return &rf;
  1812. }
  1813. return NULL;
  1814. }
  1815. void CWsWorkunitsEx::readSuperFiles(IEspContext &context, IReferencedFile* rf, const char* fileName, IReferencedFileList* wufiles, IArrayOf<IEspQuerySuperFile>* files)
  1816. {
  1817. double version = context.getClientVersion();
  1818. StringArray subFiles;
  1819. IArrayOf<IEspQuerySuperFile> superFiles;
  1820. const StringArray& subFileNames = rf->getSubFileNames();
  1821. ForEachItemIn(i, subFileNames)
  1822. {
  1823. const char* name = subFileNames.item(i);
  1824. if (!name || !*name)
  1825. continue;
  1826. IReferencedFile* pRF = getReferencedFileByName(name, wufiles);
  1827. if (!pRF)
  1828. continue;
  1829. if (!(pRF->getFlags() & RefFileSuper))
  1830. {
  1831. subFiles.append(name);
  1832. }
  1833. else if (version >= 1.57)
  1834. {
  1835. readSuperFiles(context, pRF, name, wufiles, &superFiles);
  1836. }
  1837. }
  1838. Owned<IEspQuerySuperFile> newSuperFile = createQuerySuperFile();
  1839. newSuperFile->setName(fileName);
  1840. if (subFiles.length())
  1841. {
  1842. subFiles.sortAscii();
  1843. newSuperFile->setSubFiles(subFiles);
  1844. }
  1845. if ((version >= 1.57) && superFiles.length())
  1846. {
  1847. superFiles.sort(EspQuerySuperFileCompareFunc);
  1848. newSuperFile->setSuperFiles(superFiles);
  1849. }
  1850. files->append(*newSuperFile.getClear());
  1851. }
  1852. bool CWsWorkunitsEx::getQueryFiles(IEspContext &context, const char* wuid, const char* query, const char* target, StringArray& logicalFiles, IArrayOf<IEspQuerySuperFile> *respSuperFiles)
  1853. {
  1854. try
  1855. {
  1856. Owned<IConstWUClusterInfo> info = getTargetClusterInfo(target);
  1857. if (!info || (info->getPlatform()!=RoxieCluster))
  1858. return false;
  1859. SCMStringBuffer process;
  1860. info->getRoxieProcess(process);
  1861. if (!process.length())
  1862. return false;
  1863. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  1864. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid);
  1865. if (!cw)
  1866. return false;
  1867. StringArray superFileNames;
  1868. Owned<IHpccPackageSet> ps = createPackageSet(process.str());
  1869. Owned<IReferencedFileList> wufiles = createReferencedFileList(context.queryUserId(),
  1870. context.queryPassword(), true, true);
  1871. wufiles->addFilesFromQuery(cw, (ps) ? ps->queryActiveMap(target) : NULL, query);
  1872. wufiles->resolveFiles(process.str(), NULL, NULL, NULL, true, true, true, true);
  1873. Owned<IReferencedFileIterator> refFileItr = wufiles->getFiles();
  1874. ForEach(*refFileItr)
  1875. {
  1876. IReferencedFile &rf = refFileItr->query();
  1877. const char *lfn = rf.getLogicalName();
  1878. if (lfn && *lfn)
  1879. {
  1880. logicalFiles.append(lfn);
  1881. if (respSuperFiles && (rf.getFlags() & RefFileSuper))
  1882. readSuperFiles(context, &rf, lfn, wufiles, respSuperFiles);
  1883. }
  1884. }
  1885. logicalFiles.sortAscii();
  1886. if (respSuperFiles)
  1887. respSuperFiles->sort(EspQuerySuperFileCompareFunc);
  1888. return true;
  1889. }
  1890. catch(IMultiException *me)
  1891. {
  1892. StringBuffer err;
  1893. DBGLOG("ERROR control:getQueryXrefInfo roxie query info %s", me->errorMessage(err.append(me->errorCode()).append(' ')).str());
  1894. me->Release();
  1895. return false;
  1896. }
  1897. catch(IException *e)
  1898. {
  1899. StringBuffer err;
  1900. DBGLOG("ERROR control:getQueryXrefInfo roxie query info %s", e->errorMessage(err.append(e->errorCode()).append(' ')).str());
  1901. e->Release();
  1902. return false;
  1903. }
  1904. }
  1905. inline void verifyQueryActionAllowsWild(bool &allowWildChecked, CQuerySetQueryActionTypes action)
  1906. {
  1907. if (allowWildChecked)
  1908. return;
  1909. switch (action)
  1910. {
  1911. case CQuerySetQueryActionTypes_ToggleSuspend:
  1912. throw MakeStringException(ECLWATCH_INVALID_ACTION, "Wildcards not supported for toggling suspended state");
  1913. case CQuerySetQueryActionTypes_Activate:
  1914. throw MakeStringException(ECLWATCH_INVALID_ACTION, "Wildcards not supported for Activating queries");
  1915. }
  1916. allowWildChecked=true;
  1917. }
  1918. inline bool verifyQueryActionAllowsAlias(CQuerySetQueryActionTypes action)
  1919. {
  1920. return (action!=CQuerySetQueryActionTypes_Activate && action!=CQuerySetQueryActionTypes_RemoveAllAliases);
  1921. }
  1922. void expandQueryActionTargetList(IProperties *queryIds, IPropertyTree *queryset, IArrayOf<IConstQuerySetQueryActionItem> &items, CQuerySetQueryActionTypes action)
  1923. {
  1924. bool allowWildChecked=false;
  1925. ForEachItemIn(i, items)
  1926. {
  1927. const char *itemId = items.item(i).getQueryId();
  1928. if (!isWildString(itemId))
  1929. {
  1930. bool suspendedByUser = false;
  1931. const char *itemSuspendState = items.item(i).getClientState().getSuspended();
  1932. if (itemSuspendState && (strieq(itemSuspendState, "By User") || strieq(itemSuspendState, "1")))
  1933. suspendedByUser = true;
  1934. if (!verifyQueryActionAllowsAlias(action))
  1935. queryIds->setProp(itemId, suspendedByUser);
  1936. else
  1937. {
  1938. Owned<IPropertyTree> query = resolveQueryAlias(queryset, itemId);
  1939. if (query)
  1940. {
  1941. const char *id = query->queryProp("@id");
  1942. if (id && *id)
  1943. queryIds->setProp(id, suspendedByUser);
  1944. }
  1945. }
  1946. }
  1947. else
  1948. {
  1949. verifyQueryActionAllowsWild(allowWildChecked, action);
  1950. if (verifyQueryActionAllowsAlias(action))
  1951. {
  1952. Owned<IPropertyTreeIterator> active = queryset->getElements("Alias");
  1953. ForEach(*active)
  1954. {
  1955. const char *name = active->query().queryProp("@name");
  1956. const char *id = active->query().queryProp("@id");
  1957. if (name && id && WildMatch(name, itemId))
  1958. queryIds->setProp(id, 0);
  1959. }
  1960. }
  1961. Owned<IPropertyTreeIterator> queries = queryset->getElements("Query");
  1962. ForEach(*queries)
  1963. {
  1964. const char *id = queries->query().queryProp("@id");
  1965. if (id && WildMatch(id, itemId))
  1966. queryIds->setProp(id, 0);
  1967. }
  1968. }
  1969. }
  1970. }
  1971. void expandQueryActionTargetList(IProperties *queryIds, IPropertyTree *queryset, const char *id, CQuerySetQueryActionTypes action)
  1972. {
  1973. IArrayOf<IConstQuerySetQueryActionItem> items;
  1974. Owned<IEspQuerySetQueryActionItem> item = createQuerySetQueryActionItem();
  1975. item->setQueryId(id);
  1976. items.append(*(IConstQuerySetQueryActionItem*)item.getClear());
  1977. expandQueryActionTargetList(queryIds, queryset, items, action);
  1978. }
  1979. bool CWsWorkunitsEx::onWUQueryConfig(IEspContext &context, IEspWUQueryConfigRequest & req, IEspWUQueryConfigResponse & resp)
  1980. {
  1981. StringAttr target(req.getTarget());
  1982. if (target.isEmpty())
  1983. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Target name required");
  1984. if (!isValidCluster(target))
  1985. throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Invalid target name: %s", target.get());
  1986. Owned<IPropertyTree> queryset = getQueryRegistry(target.get(), false);
  1987. if (!queryset)
  1988. throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Target Queryset %s not found", req.getTarget());
  1989. PROGLOG("WUQueryConfig: target %s", target.get());
  1990. Owned<IProperties> queryIds = createProperties();
  1991. expandQueryActionTargetList(queryIds, queryset, req.getQueryId(), QuerySetQueryActionTypes_Undefined);
  1992. IArrayOf<IEspWUQueryConfigResult> results;
  1993. Owned<IPropertyIterator> it = queryIds->getIterator();
  1994. ForEach(*it)
  1995. {
  1996. Owned<IEspWUQueryConfigResult> result = createWUQueryConfigResult();
  1997. result->setQueryId(it->getPropKey());
  1998. VStringBuffer xpath("Query[@id='%s']", it->getPropKey());
  1999. IPropertyTree *queryTree = queryset->queryPropTree(xpath);
  2000. if (queryTree)
  2001. {
  2002. updateMemoryLimitSetting(queryTree, req.getMemoryLimit());
  2003. updateQueryPriority(queryTree, req.getPriority());
  2004. updateQuerySetting(req.getTimeLimit_isNull(), queryTree, "@timeLimit", req.getTimeLimit());
  2005. updateQuerySetting(req.getWarnTimeLimit_isNull(), queryTree, "@warnTimeLimit", req.getWarnTimeLimit());
  2006. if (req.getComment())
  2007. queryTree->setProp("@comment", req.getComment());
  2008. }
  2009. results.append(*result.getClear());
  2010. }
  2011. resp.setResults(results);
  2012. bool reloadFailed = false;
  2013. if (0!=req.getWait() && !req.getNoReload())
  2014. reloadFailed = !reloadCluster(target.get(), (unsigned)req.getWait());
  2015. resp.setReloadFailed(reloadFailed);
  2016. return true;
  2017. }
  2018. bool CWsWorkunitsEx::onWUQuerysetQueryAction(IEspContext &context, IEspWUQuerySetQueryActionRequest & req, IEspWUQuerySetQueryActionResponse & resp)
  2019. {
  2020. resp.setQuerySetName(req.getQuerySetName());
  2021. resp.setAction(req.getAction());
  2022. if (isEmpty(req.getQuerySetName()))
  2023. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Queryset name required");
  2024. Owned<IPropertyTree> queryset = getQueryRegistry(req.getQuerySetName(), true);
  2025. if (!queryset)
  2026. throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Queryset %s not found", req.getQuerySetName());
  2027. Owned<IProperties> queryIds = createProperties();
  2028. expandQueryActionTargetList(queryIds, queryset, req.getQueries(), req.getAction());
  2029. if (req.getAction() == CQuerySetQueryActionTypes_ResetQueryStats)
  2030. return resetQueryStats(context, req.getQuerySetName(), queryIds, resp);
  2031. IArrayOf<IEspQuerySetQueryActionResult> results;
  2032. Owned<IPropertyIterator> it = queryIds->getIterator();
  2033. ForEach(*it)
  2034. {
  2035. const char *id = it->getPropKey();
  2036. Owned<IEspQuerySetQueryActionResult> result = createQuerySetQueryActionResult();
  2037. result->setQueryId(id);
  2038. try
  2039. {
  2040. Owned<IPropertyTree> query = getQueryById(queryset, id);
  2041. if (!query)
  2042. throw MakeStringException(ECLWATCH_QUERYID_NOT_FOUND, "Query %s/%s not found.", req.getQuerySetName(), id);
  2043. CQuerySetQueryActionTypes action = req.getAction();
  2044. const char* strAction = (action > -1) && (action < NumOfQuerySetQueryActionTypes) ? QuerySetQueryActionTypes[action] : "Undefined";
  2045. PROGLOG("%s: queryset %s, query %s", strAction, req.getQuerySetName(), id);
  2046. switch (action)
  2047. {
  2048. case CQuerySetQueryActionTypes_ToggleSuspend:
  2049. setQuerySuspendedState(queryset, id, !queryIds->getPropBool(id), context.queryUserId());
  2050. break;
  2051. case CQuerySetQueryActionTypes_Suspend:
  2052. setQuerySuspendedState(queryset, id, true, context.queryUserId());
  2053. break;
  2054. case CQuerySetQueryActionTypes_Unsuspend:
  2055. setQuerySuspendedState(queryset, id, false, NULL);
  2056. break;
  2057. case CQuerySetQueryActionTypes_Activate:
  2058. setQueryAlias(queryset, query->queryProp("@name"), id);
  2059. break;
  2060. case CQuerySetQueryActionTypes_Delete:
  2061. removeNamedQuery(queryset, id);
  2062. query.clear();
  2063. break;
  2064. case CQuerySetQueryActionTypes_RemoveAllAliases:
  2065. removeAliasesFromNamedQuery(queryset, id);
  2066. break;
  2067. }
  2068. result->setSuccess(true);
  2069. if (query)
  2070. result->setSuspended(query->getPropBool("@suspended"));
  2071. }
  2072. catch(IException *e)
  2073. {
  2074. StringBuffer msg;
  2075. result->setMessage(e->errorMessage(msg).str());
  2076. result->setCode(e->errorCode());
  2077. result->setSuccess(false);
  2078. e->Release();
  2079. }
  2080. results.append(*result.getClear());
  2081. }
  2082. resp.setResults(results);
  2083. return true;
  2084. }
  2085. bool CWsWorkunitsEx::onWUQuerysetAliasAction(IEspContext &context, IEspWUQuerySetAliasActionRequest &req, IEspWUQuerySetAliasActionResponse &resp)
  2086. {
  2087. resp.setQuerySetName(req.getQuerySetName());
  2088. resp.setAction(req.getAction());
  2089. if (isEmpty(req.getQuerySetName()))
  2090. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Queryset name required");
  2091. Owned<IPropertyTree> queryset = getQueryRegistry(req.getQuerySetName(), true);
  2092. if (!queryset)
  2093. throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Queryset %s not found", req.getQuerySetName());
  2094. IArrayOf<IEspQuerySetAliasActionResult> results;
  2095. ForEachItemIn(i, req.getAliases())
  2096. {
  2097. IConstQuerySetAliasActionItem& item=req.getAliases().item(i);
  2098. Owned<IEspQuerySetAliasActionResult> result = createQuerySetAliasActionResult();
  2099. try
  2100. {
  2101. VStringBuffer xpath("Alias[@name='%s']", item.getName());
  2102. IPropertyTree *alias = queryset->queryPropTree(xpath.str());
  2103. if (!alias)
  2104. throw MakeStringException(ECLWATCH_ALIAS_NOT_FOUND, "Alias %s/%s not found.", req.getQuerySetName(), item.getName());
  2105. CQuerySetAliasActionTypes action = req.getAction();
  2106. const char* strAction = (action > -1) && (action < NumOfQuerySetAliasActionTypes) ? QuerySetAliasActionTypes[action] : "Undefined";
  2107. PROGLOG("%s: queryset %s, alias %s", strAction, req.getQuerySetName(), item.getName());
  2108. switch (action)
  2109. {
  2110. case CQuerySetAliasActionTypes_Deactivate:
  2111. removeQuerySetAlias(req.getQuerySetName(), item.getName());
  2112. break;
  2113. }
  2114. result->setSuccess(true);
  2115. }
  2116. catch(IException *e)
  2117. {
  2118. StringBuffer msg;
  2119. result->setMessage(e->errorMessage(msg).str());
  2120. result->setCode(e->errorCode());
  2121. result->setSuccess(false);
  2122. e->Release();
  2123. }
  2124. results.append(*result.getClear());
  2125. }
  2126. resp.setResults(results);
  2127. return true;
  2128. }
  2129. #define QUERYPATH_SEP_CHAR '/'
  2130. bool nextQueryPathNode(const char *&path, StringBuffer &node)
  2131. {
  2132. if (*path==QUERYPATH_SEP_CHAR)
  2133. path++;
  2134. while (*path && *path!=QUERYPATH_SEP_CHAR)
  2135. node.append(*path++);
  2136. return (*path && *++path);
  2137. }
  2138. bool splitQueryPath(const char *path, StringBuffer &netAddress, StringBuffer &queryset, StringBuffer *query)
  2139. {
  2140. if (!path || !*path)
  2141. return false;
  2142. if (*path==QUERYPATH_SEP_CHAR && path[1]==QUERYPATH_SEP_CHAR)
  2143. {
  2144. path+=2;
  2145. if (!nextQueryPathNode(path, netAddress))
  2146. return false;
  2147. }
  2148. if (!nextQueryPathNode(path, queryset))
  2149. return (query==NULL);
  2150. if (!query)
  2151. return false;
  2152. if (nextQueryPathNode(path, *query))
  2153. return false; //query path too deep
  2154. return true;
  2155. }
  2156. IPropertyTree *fetchRemoteQuerySetInfo(IEspContext *context, const char *srcAddress, const char *srcTarget)
  2157. {
  2158. if (!srcAddress || !*srcAddress || !srcTarget || !*srcTarget)
  2159. return NULL;
  2160. VStringBuffer url("http://%s%s/WsWorkunits/WUQuerysetDetails.xml?ver_=1.51&QuerySetName=%s&FilterType=All", srcAddress, (!strchr(srcAddress, ':')) ? ":8010" : "", srcTarget);
  2161. Owned<IHttpClientContext> httpCtx = getHttpClientContext();
  2162. Owned<IHttpClient> httpclient = httpCtx->createHttpClient(NULL, url);
  2163. const char *user = context->queryUserId();
  2164. if (user && *user)
  2165. httpclient->setUserID(user);
  2166. const char *pw = context->queryPassword();
  2167. if (pw && *pw)
  2168. httpclient->setPassword(pw);
  2169. StringBuffer request; //empty
  2170. StringBuffer response;
  2171. StringBuffer status;
  2172. if (0 > httpclient->sendRequest("GET", NULL, request, response, status) || !response.length() || strncmp("200", status, 3))
  2173. throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Error fetching remote queryset information: %s %s %s", srcAddress, srcTarget, status.str());
  2174. return createPTreeFromXMLString(response);
  2175. }
  2176. class QueryCloner
  2177. {
  2178. public:
  2179. QueryCloner(IEspContext *_context, const char *address, const char *source, const char *_target) :
  2180. context(_context), target(_target), srcAddress(address)
  2181. {
  2182. if (srcAddress.length())
  2183. srcQuerySet.setown(fetchRemoteQuerySetInfo(context, srcAddress, source));
  2184. else
  2185. srcQuerySet.setown(getQueryRegistry(source, true));
  2186. if (!srcQuerySet)
  2187. throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Source Queryset %s %s not found", srcAddress.str(), source);
  2188. destQuerySet.setown(getQueryRegistry(target, false));
  2189. if (!destQuerySet) // getQueryRegistry should have created if not found
  2190. throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Destination Queryset %s could not be created, or found", target.str());
  2191. factory.setown(getWorkUnitFactory(context->querySecManager(), context->queryUser()));
  2192. }
  2193. QueryCloner(IEspContext *_context, IPropertyTree *srcTree, const char *_target) :
  2194. context(_context), target(_target)
  2195. {
  2196. srcQuerySet.set(srcTree);
  2197. destQuerySet.setown(getQueryRegistry(target, false));
  2198. if (!destQuerySet) // getQueryRegistry should have created if not found
  2199. throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Destination Queryset %s could not be created, or found", target.str());
  2200. factory.setown(getWorkUnitFactory(context->querySecManager(), context->queryUser()));
  2201. }
  2202. void setQueryDirectory(const char *dir)
  2203. {
  2204. queryDirectory.set(dir);
  2205. }
  2206. void cloneQueryRemote(IPropertyTree *query, bool makeActive)
  2207. {
  2208. StringBuffer wuid = query->queryProp("Wuid");
  2209. if (!wuid.length())
  2210. return;
  2211. const char *queryName = query->queryProp("Name");
  2212. if (!queryName || !*queryName)
  2213. return;
  2214. StringBuffer xml;
  2215. MemoryBuffer dll;
  2216. StringBuffer dllname;
  2217. StringBuffer fetchedName;
  2218. StringBuffer remoteDfs;
  2219. fetchRemoteWorkunit(NULL, context, srcAddress.str(), NULL, NULL, wuid, fetchedName, xml, dllname, dll, remoteDfs);
  2220. deploySharedObject(*context, wuid, dllname, target, queryName, dll, queryDirectory, xml.str());
  2221. SCMStringBuffer existingQueryId;
  2222. queryIdFromQuerySetWuid(destQuerySet, wuid, queryName, existingQueryId);
  2223. if (existingQueryId.length())
  2224. {
  2225. existingQueryIds.append(existingQueryId.str());
  2226. if (makeActive)
  2227. activateQuery(destQuerySet, ACTIVATE_SUSPEND_PREVIOUS, queryName, existingQueryId.str(), context->queryUserId());
  2228. return;
  2229. }
  2230. StringBuffer newQueryId;
  2231. Owned<IWorkUnit> workunit = factory->updateWorkUnit(wuid);
  2232. addQueryToQuerySet(workunit, destQuerySet, queryName, makeActive ? ACTIVATE_SUSPEND_PREVIOUS : DO_NOT_ACTIVATE, newQueryId, context->queryUserId());
  2233. copiedQueryIds.append(newQueryId);
  2234. Owned<IPropertyTree> destQuery = getQueryById(destQuerySet, newQueryId);
  2235. if (destQuery)
  2236. {
  2237. Owned<IAttributeIterator> aiter = query->getAttributes();
  2238. ForEach(*aiter)
  2239. {
  2240. const char *atname = aiter->queryName();
  2241. if (!destQuery->hasProp(atname))
  2242. destQuery->setProp(atname, aiter->queryValue());
  2243. }
  2244. if (cloneFilesEnabled && wufiles)
  2245. wufiles->addFilesFromQuery(workunit, pm, newQueryId);
  2246. }
  2247. }
  2248. void cloneQueryLocal(IPropertyTree *query, bool makeActive)
  2249. {
  2250. const char *wuid = query->queryProp("@wuid");
  2251. if (!wuid || !*wuid)
  2252. return;
  2253. const char *queryName = query->queryProp("@name");
  2254. if (!queryName || !*queryName)
  2255. return;
  2256. SCMStringBuffer existingQueryId;
  2257. queryIdFromQuerySetWuid(destQuerySet, wuid, queryName, existingQueryId);
  2258. if (existingQueryId.length())
  2259. {
  2260. existingQueryIds.append(existingQueryId.str());
  2261. if (makeActive)
  2262. activateQuery(destQuerySet, ACTIVATE_SUSPEND_PREVIOUS, queryName, existingQueryId.str(), context->queryUserId());
  2263. return;
  2264. }
  2265. StringBuffer newQueryId;
  2266. Owned<IWorkUnit> workunit = factory->updateWorkUnit(wuid);
  2267. if (!workunit)
  2268. {
  2269. StringBuffer msg(wuid);
  2270. msg.append(": ").append(query->queryProp("@id"));
  2271. missingWuids.append(msg);
  2272. return;
  2273. }
  2274. if (!newQueryId.length())
  2275. addQueryToQuerySet(workunit, destQuerySet, queryName, makeActive ? ACTIVATE_SUSPEND_PREVIOUS : DO_NOT_ACTIVATE, newQueryId, context->queryUserId());
  2276. copiedQueryIds.append(newQueryId);
  2277. Owned<IPropertyTree> destQuery = getQueryById(destQuerySet, newQueryId);
  2278. if (destQuery)
  2279. {
  2280. Owned<IAttributeIterator> aiter = query->getAttributes();
  2281. ForEach(*aiter)
  2282. {
  2283. const char *atname = aiter->queryName();
  2284. if (!destQuery->hasProp(atname))
  2285. destQuery->setProp(atname, aiter->queryValue());
  2286. }
  2287. Owned<IPropertyTreeIterator> children = query->getElements("*");
  2288. ForEach(*children)
  2289. {
  2290. IPropertyTree &child = children->query();
  2291. destQuery->addPropTree(child.queryName(), createPTreeFromIPT(&child));
  2292. }
  2293. if (cloneFilesEnabled && wufiles)
  2294. wufiles->addFilesFromQuery(workunit, pm, newQueryId);
  2295. }
  2296. }
  2297. void cloneActiveRemote(bool makeActive)
  2298. {
  2299. Owned<IPropertyTreeIterator> activeQueries = srcQuerySet->getElements("QuerysetAliases/QuerySetAlias");
  2300. ForEach(*activeQueries)
  2301. {
  2302. IPropertyTree &alias = activeQueries->query();
  2303. VStringBuffer xpath("QuerysetQueries/QuerySetQuery[Id='%s'][1]", alias.queryProp("Id"));
  2304. IPropertyTree *query = srcQuerySet->queryPropTree(xpath);
  2305. if (!query)
  2306. continue;
  2307. cloneQueryRemote(query, makeActive);
  2308. }
  2309. }
  2310. void cloneActiveLocal(bool makeActive, const char *mask)
  2311. {
  2312. StringBuffer xpath("Alias");
  2313. if (mask && *mask)
  2314. xpath.appendf("[@id='%s']", mask);
  2315. Owned<IPropertyTreeIterator> activeQueries = srcQuerySet->getElements(xpath);
  2316. ForEach(*activeQueries)
  2317. {
  2318. IPropertyTree &alias = activeQueries->query();
  2319. Owned<IPropertyTree> query = getQueryById(srcQuerySet, alias.queryProp("@id"));
  2320. if (!query)
  2321. return;
  2322. cloneQueryLocal(query, makeActive);
  2323. }
  2324. }
  2325. void cloneActive(bool makeActive)
  2326. {
  2327. if (srcAddress.length())
  2328. cloneActiveRemote(makeActive);
  2329. else
  2330. cloneActiveLocal(makeActive, nullptr);
  2331. }
  2332. void cloneAllRemote(bool cloneActiveState)
  2333. {
  2334. Owned<IPropertyTreeIterator> allQueries = srcQuerySet->getElements("QuerysetQueries/QuerySetQuery");
  2335. ForEach(*allQueries)
  2336. {
  2337. IPropertyTree &query = allQueries->query();
  2338. bool makeActive = false;
  2339. if (cloneActiveState)
  2340. {
  2341. VStringBuffer xpath("QuerysetAliases/QuerySetAlias[Id='%s']", query.queryProp("Id"));
  2342. makeActive = srcQuerySet->hasProp(xpath);
  2343. }
  2344. cloneQueryRemote(&query, makeActive);
  2345. }
  2346. }
  2347. void cloneAllLocal(bool cloneActiveState, const char *mask)
  2348. {
  2349. StringBuffer xpath("Query");
  2350. if (mask && *mask)
  2351. xpath.appendf("[@id='%s']", mask);
  2352. Owned<IPropertyTreeIterator> allQueries = srcQuerySet->getElements(xpath);
  2353. ForEach(*allQueries)
  2354. {
  2355. IPropertyTree &query = allQueries->query();
  2356. bool makeActive = false;
  2357. if (cloneActiveState)
  2358. {
  2359. VStringBuffer xpath("Alias[@id='%s']", query.queryProp("@id"));
  2360. makeActive = srcQuerySet->hasProp(xpath);
  2361. }
  2362. cloneQueryLocal(&query, makeActive);
  2363. }
  2364. }
  2365. void cloneAll(bool cloneActiveState)
  2366. {
  2367. if (srcAddress.length())
  2368. cloneAllRemote(cloneActiveState);
  2369. else
  2370. cloneAllLocal(cloneActiveState, nullptr);
  2371. }
  2372. void enableFileCloning(unsigned _updateFlags, const char *dfsServer, const char *destProcess, const char *sourceProcess, bool allowForeign)
  2373. {
  2374. cloneFilesEnabled = true;
  2375. updateFlags = _updateFlags;
  2376. splitDerivedDfsLocation(dfsServer, srcCluster, dfsIP, srcPrefix, sourceProcess, sourceProcess, NULL, NULL);
  2377. wufiles.setown(createReferencedFileList(context->queryUserId(), context->queryPassword(), allowForeign, false));
  2378. Owned<IHpccPackageSet> ps = createPackageSet(destProcess);
  2379. pm.set(ps->queryActiveMap(target));
  2380. process.set(destProcess);
  2381. }
  2382. void cloneFiles()
  2383. {
  2384. if (cloneFilesEnabled)
  2385. {
  2386. wufiles->resolveFiles(process, dfsIP, srcPrefix, srcCluster, !(updateFlags & (DALI_UPDATEF_REPLACE_FILE | DALI_UPDATEF_CLONE_FROM)), true, false, true);
  2387. Owned<IDFUhelper> helper = createIDFUhelper();
  2388. Owned <IConstWUClusterInfo> cl = getTargetClusterInfo(target);
  2389. if (cl)
  2390. {
  2391. SCMStringBuffer process;
  2392. StringBuffer defReplicateFolder;
  2393. getConfigurationDirectory(NULL, "data2", "roxie", cl->getRoxieProcess(process).str(), defReplicateFolder);
  2394. wufiles->cloneAllInfo(updateFlags, helper, true, true, cl->getRoxieRedundancy(), cl->getChannelsPerNode(), cl->getRoxieReplicateOffset(), defReplicateFolder);
  2395. }
  2396. }
  2397. }
  2398. void gatherFileErrors(IArrayOf<IConstLogicalFileError> &errors)
  2399. {
  2400. ::gatherFileErrors(wufiles, errors);
  2401. }
  2402. private:
  2403. Linked<IEspContext> context;
  2404. Linked<IWorkUnitFactory> factory;
  2405. Owned<IPropertyTree> destQuerySet;
  2406. Owned<IPropertyTree> srcQuerySet;
  2407. Owned<IReferencedFileList> wufiles;
  2408. Owned<const IHpccPackageMap> pm;
  2409. StringBuffer dfsIP;
  2410. StringBuffer srcAddress;
  2411. StringBuffer srcCluster;
  2412. StringBuffer srcPrefix;
  2413. StringAttr target;
  2414. StringAttr process;
  2415. StringAttr queryDirectory;
  2416. bool cloneFilesEnabled = false;
  2417. unsigned updateFlags = 0;
  2418. public:
  2419. StringArray existingQueryIds;
  2420. StringArray copiedQueryIds;
  2421. StringArray missingWuids;
  2422. };
  2423. bool CWsWorkunitsEx::onWUCopyQuerySet(IEspContext &context, IEspWUCopyQuerySetRequest &req, IEspWUCopyQuerySetResponse &resp)
  2424. {
  2425. const char *source = req.getSource();
  2426. if (!source || !*source)
  2427. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "No source target specified");
  2428. StringBuffer srcAddress;
  2429. StringBuffer srcTarget;
  2430. if (!splitQueryPath(source, srcAddress, srcTarget, NULL))
  2431. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid source target");
  2432. if (!srcAddress.length() && !isValidCluster(srcTarget))
  2433. throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Invalid source target name: %s", source);
  2434. const char *target = req.getTarget();
  2435. if (!target || !*target)
  2436. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "No destination target specified");
  2437. if (!isValidCluster(target))
  2438. throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Invalid destination target name: %s", target);
  2439. DBGLOG("%s copying queryset %s from %s target %s", context.queryUserId(), target, srcAddress.str(), srcTarget.str());
  2440. QueryCloner cloner(&context, srcAddress, srcTarget, target);
  2441. cloner.setQueryDirectory(queryDirectory);
  2442. SCMStringBuffer process;
  2443. if (req.getCopyFiles())
  2444. {
  2445. Owned <IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(target);
  2446. if (clusterInfo && clusterInfo->getPlatform()==RoxieCluster)
  2447. {
  2448. clusterInfo->getRoxieProcess(process);
  2449. if (!process.length())
  2450. throw MakeStringException(ECLWATCH_INVALID_CLUSTER_INFO, "DFS process cluster not found for destination target %s", target);
  2451. unsigned updateFlags = 0;
  2452. if (req.getOverwriteDfs())
  2453. updateFlags |= (DALI_UPDATEF_REPLACE_FILE | DALI_UPDATEF_CLONE_FROM | DALI_UPDATEF_SUPERFILES);
  2454. if (req.getUpdateCloneFrom())
  2455. updateFlags |= DALI_UPDATEF_CLONE_FROM;
  2456. if (req.getUpdateSuperFiles())
  2457. updateFlags |= DALI_UPDATEF_SUPERFILES;
  2458. if (req.getAppendCluster())
  2459. updateFlags |= DALI_UPDATEF_APPEND_CLUSTER;
  2460. cloner.enableFileCloning(updateFlags, req.getDfsServer(), process.str(), req.getSourceProcess(), req.getAllowForeignFiles());
  2461. }
  2462. }
  2463. if (req.getActiveOnly())
  2464. cloner.cloneActive(req.getCloneActiveState());
  2465. else
  2466. cloner.cloneAll(req.getCloneActiveState());
  2467. cloner.cloneFiles();
  2468. if (req.getIncludeFileErrors())
  2469. cloner.gatherFileErrors(resp.getFileErrors());
  2470. resp.setCopiedQueries(cloner.copiedQueryIds);
  2471. resp.setExistingQueries(cloner.existingQueryIds);
  2472. return true;
  2473. }
  2474. bool CWsWorkunitsEx::onWUQuerysetCopyQuery(IEspContext &context, IEspWUQuerySetCopyQueryRequest &req, IEspWUQuerySetCopyQueryResponse &resp)
  2475. {
  2476. unsigned start = msTick();
  2477. const char *source = req.getSource();
  2478. if (!source || !*source)
  2479. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "No source query specified");
  2480. const char *target = req.getTarget();
  2481. if (!target || !*target)
  2482. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "No destination specified");
  2483. if (strchr(target, '/')) //for future use
  2484. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid target queryset name");
  2485. if (req.getCluster() && *req.getCluster() && !strieq(req.getCluster(), target)) //backward compatability check
  2486. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid target cluster and queryset must match");
  2487. if (!isValidCluster(target))
  2488. throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Invalid target name: %s", target);
  2489. StringBuffer srcAddress, srcQuerySet, srcQuery;
  2490. if (!splitQueryPath(source, srcAddress, srcQuerySet, &srcQuery))
  2491. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid source query path");
  2492. StringAttr targetQueryName(req.getDestName());
  2493. Owned<IClientWUQuerySetDetailsResponse> sourceQueryInfoResp;
  2494. IConstQuerySetQuery *srcInfo=NULL;
  2495. DBGLOG("%s copying query %s to target %s from %s target %s", context.queryUserId(), srcQuery.str(), target, srcAddress.str(), srcQuerySet.str());
  2496. StringBuffer remoteIP;
  2497. StringBuffer wuid;
  2498. if (srcAddress.length())
  2499. {
  2500. StringBuffer xml;
  2501. MemoryBuffer dll;
  2502. StringBuffer dllname;
  2503. StringBuffer queryName;
  2504. fetchRemoteWorkunitAndQueryDetails(NULL, &context, srcAddress.str(), srcQuerySet.str(), srcQuery.str(), NULL, queryName, xml, dllname, dll, remoteIP, sourceQueryInfoResp);
  2505. if (sourceQueryInfoResp && sourceQueryInfoResp->getQuerysetQueries().ordinality())
  2506. srcInfo = &sourceQueryInfoResp->getQuerysetQueries().item(0);
  2507. if (srcInfo)
  2508. wuid.set(srcInfo->getWuid());
  2509. if (targetQueryName.isEmpty())
  2510. targetQueryName.set(queryName);
  2511. deploySharedObject(context, wuid, dllname.str(), target, targetQueryName.get(), dll, queryDirectory.str(), xml.str());
  2512. }
  2513. else
  2514. {
  2515. //Could get the atributes without soap call, but this creates a common data structure shared with fetching remote query info
  2516. //Get query attributes before resolveQueryAlias, to avoid deadlock
  2517. sourceQueryInfoResp.setown(fetchQueryDetails(NULL, &context, NULL, srcQuerySet, srcQuery));
  2518. if (sourceQueryInfoResp && sourceQueryInfoResp->getQuerysetQueries().ordinality())
  2519. srcInfo = &sourceQueryInfoResp->getQuerysetQueries().item(0);
  2520. Owned<IPropertyTree> queryset = getQueryRegistry(srcQuerySet.str(), true);
  2521. if (!queryset)
  2522. throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Source Queryset %s not found", srcQuery.str());
  2523. Owned<IPropertyTree> query = resolveQueryAlias(queryset, srcQuery.str());
  2524. if (!query)
  2525. throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Source query %s not found", source);
  2526. wuid.set(query->queryProp("@wuid"));
  2527. if (targetQueryName.isEmpty())
  2528. targetQueryName.set(query->queryProp("@name"));
  2529. }
  2530. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  2531. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid.str());
  2532. if (!req.getDontCopyFiles())
  2533. {
  2534. StringBuffer daliIP;
  2535. StringBuffer srcCluster;
  2536. StringBuffer srcPrefix;
  2537. splitDerivedDfsLocation(req.getDaliServer(), srcCluster, daliIP, srcPrefix, req.getSourceProcess(), req.getSourceProcess(), remoteIP.str(), NULL);
  2538. unsigned updateFlags = 0;
  2539. if (req.getOverwrite())
  2540. updateFlags |= (DALI_UPDATEF_REPLACE_FILE | DALI_UPDATEF_CLONE_FROM | DALI_UPDATEF_SUPERFILES);
  2541. if (req.getUpdateCloneFrom())
  2542. updateFlags |= DALI_UPDATEF_CLONE_FROM;
  2543. if (req.getUpdateSuperFiles())
  2544. updateFlags |= DALI_UPDATEF_SUPERFILES;
  2545. if (req.getAppendCluster())
  2546. updateFlags |= DALI_UPDATEF_APPEND_CLUSTER;
  2547. QueryFileCopier cpr(target);
  2548. cpr.init(context, req.getAllowForeignFiles());
  2549. cpr.remoteIP.set(daliIP);
  2550. cpr.remotePrefix.set(srcPrefix);
  2551. cpr.srcCluster.set(srcCluster);
  2552. cpr.queryname.set(targetQueryName);
  2553. cpr.copy(cw, updateFlags);
  2554. if (req.getIncludeFileErrors())
  2555. cpr.gatherFileErrors(resp.getFileErrors());
  2556. }
  2557. WorkunitUpdate wu(&cw->lock());
  2558. if (!wu)
  2559. throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT, "Error opening wuid %s for query %s", wuid.str(), source);
  2560. StringBuffer targetQueryId;
  2561. WUQueryActivationOptions activate = (WUQueryActivationOptions)req.getActivate();
  2562. addQueryToQuerySet(wu, target, targetQueryName.get(), activate, targetQueryId, context.queryUserId());
  2563. Owned<IPropertyTree> queryTree = getQueryById(target, targetQueryId, false);
  2564. if (queryTree)
  2565. {
  2566. updateMemoryLimitSetting(queryTree, req.getMemoryLimit(), srcInfo);
  2567. updateQueryPriority(queryTree, req.getPriority(), srcInfo);
  2568. updateTimeLimitSetting(queryTree, req.getTimeLimit_isNull(), req.getTimeLimit(), srcInfo);
  2569. updateWarnTimeLimitSetting(queryTree, req.getWarnTimeLimit_isNull(), req.getWarnTimeLimit(), srcInfo);
  2570. if (req.getComment())
  2571. queryTree->setProp("@comment", req.getComment());
  2572. else if (srcInfo && srcInfo->getComment())
  2573. queryTree->setProp("@comment", srcInfo->getComment());
  2574. if (srcInfo && srcInfo->getSnapshot())
  2575. queryTree->setProp("@snapshot", srcInfo->getSnapshot());
  2576. }
  2577. wu.clear();
  2578. resp.setQueryId(targetQueryId.str());
  2579. if (0!=req.getWait() && !req.getNoReload())
  2580. reloadCluster(target, remainingMsWait(req.getWait(), start));
  2581. return true;
  2582. }
  2583. bool CWsWorkunitsEx::onWUQuerysetImport(IEspContext &context, IEspWUQuerysetImportRequest &req, IEspWUQuerysetImportResponse &resp)
  2584. {
  2585. try
  2586. {
  2587. const char* target = req.getTarget();
  2588. if (!target || !*target)
  2589. throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Target not specified");
  2590. Owned <IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(target);
  2591. if (!clusterInfo)
  2592. throw MakeStringException(ECLWATCH_CANNOT_RESOLVE_CLUSTER_NAME, "Target not found");
  2593. if (req.getCopyFiles() && clusterInfo->getPlatform()!=RoxieCluster)
  2594. throw MakeStringException(ECLWATCH_INVALID_ACTION, "Copy files option only supported for Roxie");
  2595. MemoryBuffer &mb = const_cast<MemoryBuffer &>(req.getData()); //for efficiency, content of request shouldn't matter after
  2596. if (req.getCompressed())
  2597. {
  2598. MemoryBuffer decompressed;
  2599. fastLZDecompressToBuffer(decompressed, mb);
  2600. mb.swapWith(decompressed);
  2601. }
  2602. mb.append('\0');
  2603. Owned<IPropertyTree> srcTree = createPTreeFromXMLString(mb.toByteArray());
  2604. const char *archivedTarget = srcTree->queryProp("@target");
  2605. if (archivedTarget && *archivedTarget) //support simple queryset or with archived (exported) root format
  2606. {
  2607. VStringBuffer xpath("QuerySet[@id='%s']", archivedTarget);
  2608. IPropertyTree *qsTree = srcTree->queryPropTree(xpath);
  2609. if (qsTree)
  2610. srcTree.setown(LINK(qsTree));
  2611. }
  2612. if (req.getReplace())
  2613. {
  2614. Owned<IPropertyTree> queryRegistry = getQueryRegistry(target, false);
  2615. queryRegistry->removeProp("*");
  2616. resp.setClearedExisting(true);
  2617. }
  2618. const bool activate = CQuerysetImportActivation_ImportedActive == req.getActivation(); //only two options now but may evolve
  2619. QueryCloner cloner(&context, srcTree, target);
  2620. SCMStringBuffer process;
  2621. if (req.getCopyFiles())
  2622. {
  2623. clusterInfo->getRoxieProcess(process); //checked if roxie when copying files above
  2624. if (!process.length())
  2625. throw MakeStringException(ECLWATCH_INVALID_CLUSTER_INFO, "DFS process cluster not found for destination target %s", target);
  2626. unsigned updateFlags = 0;
  2627. if (req.getOverwriteDfs())
  2628. updateFlags |= (DALI_UPDATEF_REPLACE_FILE | DALI_UPDATEF_CLONE_FROM | DALI_UPDATEF_SUPERFILES);
  2629. if (req.getUpdateCloneFrom())
  2630. updateFlags |= DALI_UPDATEF_CLONE_FROM;
  2631. if (req.getUpdateSuperFiles())
  2632. updateFlags |= DALI_UPDATEF_SUPERFILES;
  2633. if (req.getAppendCluster())
  2634. updateFlags |= DALI_UPDATEF_APPEND_CLUSTER;
  2635. cloner.enableFileCloning(updateFlags, req.getDfsServer(), process.str(), req.getSourceProcess(), req.getAllowForeignFiles());
  2636. }
  2637. if (req.getActiveOnly())
  2638. cloner.cloneActiveLocal(activate, req.getQueryMask());
  2639. else
  2640. cloner.cloneAllLocal(activate, req.getQueryMask());
  2641. cloner.cloneFiles();
  2642. if (req.getIncludeFileErrors())
  2643. cloner.gatherFileErrors(resp.getFileErrors());
  2644. resp.setImportedQueries(cloner.copiedQueryIds);
  2645. resp.setExistingQueries(cloner.existingQueryIds);
  2646. resp.setMissingWuids(cloner.missingWuids);
  2647. }
  2648. catch(IException* e)
  2649. {
  2650. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  2651. }
  2652. return true;
  2653. }
  2654. bool CWsWorkunitsEx::onWUQuerysetExport(IEspContext &context, IEspWUQuerysetExportRequest &req, IEspWUQuerysetExportResponse &resp)
  2655. {
  2656. try
  2657. {
  2658. const char* target = req.getTarget();
  2659. if (!target || !*target)
  2660. throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Target not specified");
  2661. Owned<IPropertyTree> queryRegistry = getQueryRegistry(target, true);
  2662. if (req.getActiveOnly())
  2663. {
  2664. Owned<IPropertyTree> activeOnly = createPTree("QuerySet");
  2665. Owned<IAttributeIterator> attrs = queryRegistry->getAttributes();
  2666. ForEach(*attrs)
  2667. activeOnly->setProp(attrs->queryName(), attrs->queryValue());
  2668. Owned<IPropertyTreeIterator> aliases = queryRegistry->getElements("Alias");
  2669. ForEach(*aliases)
  2670. {
  2671. IPropertyTree &alias = aliases->query();
  2672. const char *id = alias.queryProp("@id");
  2673. if (id && *id)
  2674. {
  2675. VStringBuffer xpath("Query[@id='%s']", id);
  2676. IPropertyTree *query = queryRegistry->queryPropTree(xpath);
  2677. if (query)
  2678. {
  2679. activeOnly->addPropTree("Query", LINK(query));
  2680. activeOnly->addPropTree("Alias", LINK(&alias));
  2681. }
  2682. }
  2683. }
  2684. queryRegistry.setown(activeOnly.getClear());
  2685. }
  2686. if (req.getProtect())
  2687. {
  2688. StringArray wuids;
  2689. Owned<IPropertyTreeIterator> queries = queryRegistry->getElements("Query");
  2690. ForEach(*queries)
  2691. {
  2692. IPropertyTree &query = queries->query();
  2693. const char *wuid = query.queryProp("@wuid");
  2694. if (wuid && *wuid)
  2695. wuids.append(wuid);
  2696. }
  2697. if (wuids.length())
  2698. doProtectWorkunits(context, wuids, nullptr);
  2699. }
  2700. CDateTime dt;
  2701. dt.setNow();
  2702. StringBuffer dts;
  2703. VStringBuffer qs("<QuerySetArchive exported='%s' target='%s' activeOnly='%s'>\n", dt.getString(dts, true).str(), target, req.getActiveOnly() ? "true" : "false");
  2704. toXML(queryRegistry, qs);
  2705. qs.append("</QuerySetArchive>");
  2706. MemoryBuffer content;
  2707. if (req.getCompress())
  2708. fastLZCompressToBuffer(content, qs.length()+1, qs);
  2709. else
  2710. content.append(qs.str());
  2711. resp.setTarget(target);
  2712. resp.setCompressed(req.getCompress());
  2713. resp.setData(content);
  2714. }
  2715. catch(IException* e)
  2716. {
  2717. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  2718. }
  2719. return true;
  2720. }
  2721. void CWsWorkunitsEx::getGraphsByQueryId(const char *target, const char *queryId, const char *graphId, const char *subGraphId, IArrayOf<IEspECLGraphEx>& ECLGraphs)
  2722. {
  2723. if (!target || !*target)
  2724. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Target name required");
  2725. if (!queryId || !*queryId)
  2726. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Query Id required");
  2727. Owned<IConstWUClusterInfo> info = getTargetClusterInfo(target);
  2728. if (!info || (info->getPlatform()!=RoxieCluster)) //Only support roxie for now
  2729. throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Invalid Roxie name");
  2730. PROGLOG("getGraphsByQueryId: target %s, query %s", target, queryId);
  2731. const SocketEndpointArray &eps = info->getRoxieServers();
  2732. if (eps.empty())
  2733. return;
  2734. VStringBuffer control("<control:querystats><Query id='%s'/></control:querystats>", queryId);
  2735. Owned<IPropertyTree> querystats = sendRoxieControlAllNodes(eps.item(0), control.str(), false, ROXIELOCKCONNECTIONTIMEOUT);
  2736. if (!querystats)
  2737. return;
  2738. Owned<IPropertyTreeIterator> graphs = querystats->getElements("Endpoint/Query/Graph");
  2739. ForEach(*graphs)
  2740. {
  2741. IPropertyTree &graph = graphs->query();
  2742. const char* aGraphId = graph.queryProp("@id");
  2743. if (graphId && *graphId && !strieq(graphId, aGraphId))
  2744. continue;
  2745. IPropertyTree* xgmml = graph.getBranch("xgmml/graph");
  2746. if (!xgmml)
  2747. continue;
  2748. Owned<IEspECLGraphEx> g = createECLGraphEx("","");
  2749. g->setName(aGraphId);
  2750. StringBuffer xml;
  2751. if (!subGraphId || !*subGraphId)
  2752. toXML(xgmml, xml);
  2753. else
  2754. {
  2755. VStringBuffer xpath("//node[@id='%s']", subGraphId);
  2756. toXML(xgmml->queryPropTree(xpath.str()), xml);
  2757. }
  2758. g->setGraph(xml.str());
  2759. ECLGraphs.append(*g.getClear());
  2760. }
  2761. return;
  2762. }
  2763. bool CWsWorkunitsEx::onWUQueryGetGraph(IEspContext& context, IEspWUQueryGetGraphRequest& req, IEspWUQueryGetGraphResponse& resp)
  2764. {
  2765. try
  2766. {
  2767. IArrayOf<IEspECLGraphEx> graphs;
  2768. getGraphsByQueryId(req.getTarget(), req.getQueryId(), req.getGraphName(), req.getSubGraphId(), graphs);
  2769. resp.setGraphs(graphs);
  2770. }
  2771. catch(IException* e)
  2772. {
  2773. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  2774. }
  2775. return true;
  2776. }
  2777. bool CWsWorkunitsEx::resetQueryStats(IEspContext& context, const char* target, IProperties* queryIds, IEspWUQuerySetQueryActionResponse& resp)
  2778. {
  2779. IArrayOf<IEspQuerySetQueryActionResult> results;
  2780. Owned<IEspQuerySetQueryActionResult> result = createQuerySetQueryActionResult();
  2781. try
  2782. {
  2783. StringBuffer control;
  2784. Owned<IPropertyIterator> it = queryIds->getIterator();
  2785. ForEach(*it)
  2786. {
  2787. const char *queryId = it->getPropKey();
  2788. if (queryId && *queryId)
  2789. {
  2790. appendXMLOpenTag(control, "Query", NULL, false);
  2791. appendXMLAttr(control, "id", queryId);
  2792. if (target && *target)
  2793. appendXMLAttr(control, "target", target);
  2794. control.append("/>");
  2795. }
  2796. }
  2797. if (!control.length())
  2798. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "CWsWorkunitsEx::resetQueryStats: Query ID not specified");
  2799. control.insert(0, "<control:resetquerystats>");
  2800. control.append("</control:resetquerystats>");
  2801. if (!sendControlQuery(context, target, control.str(), ROXIECONNECTIONTIMEOUT))
  2802. throw MakeStringException(ECLWATCH_INTERNAL_ERROR, "CWsWorkunitsEx::resetQueryStats: Failed to send roxie control query");
  2803. result->setMessage("Query stats reset succeeded");
  2804. result->setSuccess(true);;
  2805. }
  2806. catch(IMultiException *me)
  2807. {
  2808. StringBuffer msg;
  2809. result->setMessage(me->errorMessage(msg).str());
  2810. result->setCode(me->errorCode());
  2811. result->setSuccess(false);
  2812. me->Release();
  2813. }
  2814. catch(IException *e)
  2815. {
  2816. StringBuffer msg;
  2817. result->setMessage(e->errorMessage(msg).str());
  2818. result->setCode(e->errorCode());
  2819. result->setSuccess(false);
  2820. e->Release();
  2821. }
  2822. results.append(*result.getClear());
  2823. resp.setResults(results);
  2824. return true;
  2825. }
  2826. IPropertyTree* CWsWorkunitsEx::sendControlQuery(IEspContext& context, const char* target, const char* query, unsigned timeout)
  2827. {
  2828. if (!target || !*target)
  2829. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "CWsWorkunitsEx::sendControlQuery: target not specified");
  2830. if (!query || !*query)
  2831. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "CWsWorkunitsEx::sendControlQuery: Control query not specified");
  2832. Owned<IConstWUClusterInfo> info = getTargetClusterInfo(target);
  2833. if (!info || (info->getPlatform()!=RoxieCluster)) //Only support roxie for now
  2834. throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "CWsWorkunitsEx::sendControlQuery: Invalid target name %s", target);
  2835. const SocketEndpointArray &eps = info->getRoxieServers();
  2836. if (eps.empty())
  2837. throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "CWsWorkunitsEx::sendControlQuery: Server not found for %s", target);
  2838. Owned<ISocket> sock = ISocket::connect_timeout(eps.item(0), timeout);
  2839. return sendRoxieControlQuery(sock, query, timeout);
  2840. }
  2841. bool CWsWorkunitsEx::onWUUpdateQueryEntry(IEspContext& context, IEspWUUpdateQueryEntryRequest& req, IEspWUUpdateQueryEntryResponse& resp)
  2842. {
  2843. try
  2844. {
  2845. StringBuffer querySetName, query;
  2846. ensureInputString(req.getQuerySet(), true, querySetName, ECLWATCH_QUERYSET_NOT_FOUND, "Query Set not specified");
  2847. ensureInputString(req.getQueryId(), true, query, ECLWATCH_QUERYID_NOT_FOUND, "Query not specified");
  2848. Owned<IPropertyTree> querySet = getQueryRegistry(querySetName.str(), true);
  2849. if (!querySet)
  2850. throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Queryset %s not found", querySetName.str());
  2851. VStringBuffer xpath("Query[@id=\"%s\"]", query.str());
  2852. IPropertyTree *tree = querySet->queryPropTree(xpath);
  2853. if (!tree)
  2854. throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Query %s not found", query.str());
  2855. StringBuffer comment = req.getComment();
  2856. if (comment.isEmpty())
  2857. tree->removeProp("@comment");
  2858. else
  2859. tree->setProp("@comment", comment.str());
  2860. }
  2861. catch(IException* e)
  2862. {
  2863. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  2864. }
  2865. return true;
  2866. }
  2867. bool CWsWorkunitsEx::onWUGetNumFileToCopy(IEspContext& context, IEspWUGetNumFileToCopyRequest& req, IEspWUGetNumFileToCopyResponse& resp)
  2868. {
  2869. class CWUGetNumFileToCopyPager : public CSimpleInterface, implements IElementsPager
  2870. {
  2871. StringAttr clusterName;
  2872. StringAttr sortOrder;
  2873. public:
  2874. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  2875. CWUGetNumFileToCopyPager(const char* _clusterName, const char *_sortOrder)
  2876. : clusterName(_clusterName), sortOrder(_sortOrder) { };
  2877. virtual IRemoteConnection* getElements(IArrayOf<IPropertyTree> &elements)
  2878. {
  2879. SocketEndpointArray servers;
  2880. getRoxieProcessServers(clusterName.get(), servers);
  2881. if (servers.length() < 1)
  2882. {
  2883. PROGLOG("WUGetNumFileToCopy: Process Server not found for %s", clusterName.get());
  2884. return NULL;
  2885. }
  2886. Owned<IPropertyTree> result = sendRoxieControlAllNodes(servers.item(0), "<control:numfilestoprocess/>", false, ROXIELOCKCONNECTIONTIMEOUT);
  2887. if (!result)
  2888. {
  2889. PROGLOG("WUGetNumFileToCopy: Empty result received for cluster %s", clusterName.get());
  2890. return NULL;
  2891. }
  2892. Owned<IPropertyTreeIterator> iter = result->getElements("*");
  2893. if (!iter)
  2894. return NULL;
  2895. StringArray unknownAttributes;
  2896. sortElements(iter, sortOrder.get(), NULL, NULL, unknownAttributes, elements);
  2897. return NULL;
  2898. }
  2899. virtual bool allMatchingElementsReceived() { return true; } //For now, roxie always returns all of matched items.
  2900. };
  2901. try
  2902. {
  2903. StringBuffer clusterName = req.getClusterName();
  2904. if (clusterName.isEmpty())
  2905. throw MakeStringException(ECLWATCH_CANNOT_RESOLVE_CLUSTER_NAME, "Cluster not specified");
  2906. StringBuffer so;
  2907. bool descending = req.getDescending();
  2908. if (descending)
  2909. so.set("-");
  2910. const char *sortBy = req.getSortby();
  2911. if (!isEmptyString(sortBy) && strieq(sortBy, "URL"))
  2912. so.append("?@ep");
  2913. else if (!isEmptyString(sortBy) && strieq(sortBy, "Status"))
  2914. so.append("?Status");
  2915. else
  2916. so.append("#FilesToProcess/@value");
  2917. unsigned pageSize = req.getPageSize();
  2918. unsigned pageStartFrom = req.getPageStartFrom();
  2919. if(pageSize < 1)
  2920. pageSize = 100;
  2921. __int64 cacheHint = 0;
  2922. if (!req.getCacheHint_isNull())
  2923. cacheHint = req.getCacheHint();
  2924. unsigned numberOfEndpoints = 0;
  2925. IArrayOf<IPropertyTree> results;
  2926. Owned<IElementsPager> elementsPager = new CWUGetNumFileToCopyPager(clusterName.str(), so.str());
  2927. getElementsPaged(elementsPager, pageStartFrom, pageSize, NULL, "", &cacheHint, results, &numberOfEndpoints, NULL, false);
  2928. IArrayOf<IEspClusterEndpoint> endpoints;
  2929. ForEachItemIn(i, results)
  2930. {
  2931. IPropertyTree &item = results.item(i);
  2932. Owned<IEspClusterEndpoint> endpoint = createClusterEndpoint();
  2933. endpoint->setURL(item.queryProp("@ep"));
  2934. endpoint->setStatus(item.queryProp("Status"));
  2935. endpoint->setNumQueryFileToCopy(item.getPropInt("FilesToProcess/@value", 0));
  2936. endpoints.append(*endpoint.getClear());
  2937. }
  2938. resp.setEndpoints(endpoints);
  2939. resp.setCacheHint(cacheHint);
  2940. resp.setTotal(numberOfEndpoints);
  2941. }
  2942. catch(IException* e)
  2943. {
  2944. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  2945. }
  2946. return true;
  2947. }
  2948. void getSummaryStatsByQueryId(const char *target, const char *queryId, const char *fromTime, const char *toTime, IArrayOf<IEspQuerySummaryStats>& querySummaryStatsList)
  2949. {
  2950. if (!target || !*target)
  2951. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Target name required");
  2952. if (!queryId || !*queryId)
  2953. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Query Id required");
  2954. Owned<IConstWUClusterInfo> info = getTargetClusterInfo(target);
  2955. if (!info || (info->getPlatform()!=RoxieCluster)) //Only support roxie for now
  2956. throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Invalid Roxie name");
  2957. PROGLOG("getSummaryStatsByQueryId: target %s, query %s", target, queryId);
  2958. const SocketEndpointArray &eps = info->getRoxieServers();
  2959. if (eps.empty())
  2960. return;
  2961. VStringBuffer control("<control:queryAggregates");
  2962. if (!isEmpty(fromTime))
  2963. control.appendf(" from='%s'", fromTime);
  2964. if (!isEmpty(toTime))
  2965. control.appendf(" to='%s'", toTime);
  2966. control.appendf("><Query id='%s'/></control:queryAggregates>", queryId);
  2967. Owned<IPropertyTree> queryAggregates = sendRoxieControlAllNodes(eps.item(0), control.str(), false, ROXIELOCKCONNECTIONTIMEOUT);
  2968. if (!queryAggregates)
  2969. return;
  2970. if (getEspLogLevel() >= LogMax)
  2971. {
  2972. StringBuffer sb;
  2973. toXML(queryAggregates, sb);
  2974. DBGLOG("getSummaryStatsByQueryId(): '%s' => '%s'", control.str(), sb.str());
  2975. }
  2976. //Parse queryAggregates and build querySummaryStatsList.
  2977. Owned<IPropertyTreeIterator> aggregates = queryAggregates->getElements("Endpoint");
  2978. ForEach(*aggregates)
  2979. {
  2980. IPropertyTree &aggregate = aggregates->query();
  2981. const char *status = aggregate.queryProp("Status");
  2982. const char *ep = aggregate.queryProp("@ep");
  2983. if (isEmptyString(ep))
  2984. continue;
  2985. IPropertyTree *query = aggregate.queryPropTree("Query");
  2986. Owned<IEspQuerySummaryStats> querySummaryStats = createQuerySummaryStats();
  2987. querySummaryStats->setEndpoint(ep);
  2988. if (query->hasProp("countFailed"))
  2989. querySummaryStats->setCountFailed(query->getPropInt("countFailed"));
  2990. if (query->hasProp("countTotal"))
  2991. querySummaryStats->setCountTotal(query->getPropInt("countTotal"));
  2992. if (query->hasProp("averageBytesOut"))
  2993. querySummaryStats->setAverageBytesOut(query->getPropInt64("averageBytesOut"));
  2994. if (query->hasProp("averageMemUsed"))
  2995. querySummaryStats->setSizeAvgPeakMemory(query->getPropInt64("averageMemUsed"));
  2996. if (query->hasProp("averageSlavesReplyLen"))
  2997. querySummaryStats->setAverageSlavesReplyLen(query->getPropInt("averageSlavesReplyLen"));
  2998. if (query->hasProp("averageTimeMs"))
  2999. querySummaryStats->setTimeAvgTotalExecuteMinutes(query->getPropInt64("averageTimeMs"));
  3000. if (query->hasProp("minTimeMs"))
  3001. querySummaryStats->setTimeMinTotalExecuteMinutes(query->getPropInt64("minTimeMs"));
  3002. if (query->hasProp("maxTimeMs"))
  3003. querySummaryStats->setTimeMaxTotalExecuteMinutes(query->getPropInt64("maxTimeMs"));
  3004. if (query->hasProp("percentile97"))
  3005. {
  3006. querySummaryStats->setPercentile97(query->getPropInt("percentile97"));
  3007. if (query->hasProp("percentile97/@estimate"))
  3008. querySummaryStats->setPercentile97Estimate(query->getPropBool("percentile97/@estimate"));
  3009. }
  3010. const char *startTime = query->queryProp("startTime");
  3011. const char *endTime = query->queryProp("endTime");
  3012. if (!isEmptyString(startTime))
  3013. querySummaryStats->setStartTime(startTime);
  3014. if (!isEmptyString(endTime))
  3015. querySummaryStats->setEndTime(endTime);
  3016. if (!isEmptyString(status))
  3017. querySummaryStats->setStatus(status);
  3018. querySummaryStatsList.append(*querySummaryStats.getLink());
  3019. }
  3020. return;
  3021. }
  3022. bool CWsWorkunitsEx::onWUQueryGetSummaryStats(IEspContext& context, IEspWUQueryGetSummaryStatsRequest& req, IEspWUQueryGetSummaryStatsResponse& resp)
  3023. {
  3024. try
  3025. {
  3026. IArrayOf<IEspQuerySummaryStats> querySummaryStatsList;
  3027. getSummaryStatsByQueryId(req.getTarget(), req.getQueryId(), req.getFromTime(), req.getToTime(), querySummaryStatsList);
  3028. resp.setStatsList(querySummaryStatsList);
  3029. }
  3030. catch(IException* e)
  3031. {
  3032. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  3033. }
  3034. return true;
  3035. }