ws_smcService.cpp 105 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #pragma warning (disable : 4786)
  14. #include "build-config.h"
  15. #ifdef _USE_OPENLDAP
  16. #include "ldapsecurity.ipp"
  17. #endif
  18. #include "ws_smcService.hpp"
  19. #include "wshelpers.hpp"
  20. #include "dalienv.hpp"
  21. #include "WUWrapper.hpp"
  22. #include "wujobq.hpp"
  23. #include "dfuwu.hpp"
  24. #include "exception_util.hpp"
  25. #include "roxiecontrol.hpp"
  26. #include "workunit.hpp"
// Server-type labels for this service.
// NOTE(review): the visible code passes the equivalent string literals
// (e.g. "ThorMaster", "RoxieServer", "HThorServer") directly instead of these
// macros; confirm whether the macros are used elsewhere in the file.
#define STATUS_SERVER_THOR "ThorMaster"
#define STATUS_SERVER_HTHOR "HThorServer"
#define STATUS_SERVER_ROXIE "RoxieServer"
#define STATUS_SERVER_DFUSERVER "DFUserver"
#define STATUS_SERVER_ECLSERVER "ECLserver"
#define STATUS_SERVER_ECLCCSERVER "ECLCCserver"
#define STATUS_SERVER_ECLAGENT "ECLagent"
// Security feature names. THORQUEUE_FEATURE is checked via
// context.authorizeFeature() in getServersAndWUs(); the other two are
// presumably checked the same way elsewhere in the file -- TODO confirm.
static const char* FEATURE_URL = "SmcAccess";
const char* THORQUEUE_FEATURE = "ThorQueueAccess";
static const char* ROXIE_CONTROL_URL = "RoxieControlAccess";
// NOTE(review): the consumer of this file name is not visible in this chunk.
const char* PERMISSIONS_FILENAME = "espsmc_permissions.xml";
  38. void AccessSuccess(IEspContext& context, char const * msg,...) __attribute__((format(printf, 2, 3)));
  39. void AccessSuccess(IEspContext& context, char const * msg,...)
  40. {
  41. StringBuffer buf;
  42. buf.appendf("User %s: ",context.queryUserId());
  43. va_list args;
  44. va_start(args, msg);
  45. buf.valist_appendf(msg, args);
  46. va_end(args);
  47. AUDIT(AUDIT_TYPE_ACCESS_SUCCESS,buf.str());
  48. }
  49. void AccessFailure(IEspContext& context, char const * msg,...) __attribute__((format(printf, 2, 3)));
  50. void AccessFailure(IEspContext& context, char const * msg,...)
  51. {
  52. StringBuffer buf;
  53. buf.appendf("User %s: ",context.queryUserId());
  54. va_list args;
  55. va_start(args, msg);
  56. buf.valist_appendf(msg, args);
  57. va_end(args);
  58. AUDIT(AUDIT_TYPE_ACCESS_FAILURE,buf.str());
  59. }
  60. struct QueueWrapper
  61. {
  62. QueueWrapper(const char* targetName, const char* queueExt)
  63. {
  64. StringBuffer name;
  65. name.append(targetName).append('.').append(queueExt);
  66. queue.setown(createJobQueue(name.str()));
  67. }
  68. QueueWrapper(const char* queueName)
  69. {
  70. queue.setown(createJobQueue(queueName));
  71. }
  72. operator IJobQueue*() { return queue.get(); }
  73. IJobQueue* operator->() { return queue.get(); }
  74. Owned<IJobQueue> queue;
  75. };
  76. struct QueueLock
  77. {
  78. QueueLock(IJobQueue* q): queue(q) { queue->lock(); }
  79. ~QueueLock()
  80. {
  81. queue->unlock();
  82. }
  83. Linked<IJobQueue> queue;
  84. };
  85. static int sortTargetClustersByNameDescending(IInterface **L, IInterface **R)
  86. {
  87. IEspTargetCluster *left = (IEspTargetCluster *) *L;
  88. IEspTargetCluster *right = (IEspTargetCluster *) *R;
  89. return strcmp(right->getClusterName(), left->getClusterName());
  90. }
  91. static int sortTargetClustersByNameAscending(IInterface **L, IInterface **R)
  92. {
  93. IEspTargetCluster *left = (IEspTargetCluster *) *L;
  94. IEspTargetCluster *right = (IEspTargetCluster *) *R;
  95. return strcmp(left->getClusterName(), right->getClusterName());
  96. }
  97. static int sortTargetClustersBySizeDescending(IInterface **L, IInterface **R)
  98. {
  99. IEspTargetCluster *left = (IEspTargetCluster *) *L;
  100. IEspTargetCluster *right = (IEspTargetCluster *) *R;
  101. return right->getClusterSize() - left->getClusterSize();
  102. }
  103. static int sortTargetClustersBySizeAscending(IInterface **L, IInterface **R)
  104. {
  105. IEspTargetCluster *left = (IEspTargetCluster *) *L;
  106. IEspTargetCluster *right = (IEspTargetCluster *) *R;
  107. return left->getClusterSize() - right->getClusterSize();
  108. }
  109. void CWsSMCEx::init(IPropertyTree *cfg, const char *process, const char *service)
  110. {
  111. if (!daliClientActive())
  112. {
  113. ERRLOG("No Dali Connection Active.");
  114. throw MakeStringException(-1, "No Dali Connection Active. Please Specify a Dali to connect to in you configuration file");
  115. }
  116. m_BannerAction = 0;
  117. m_EnableChatURL = false;
  118. m_BannerSize = "4";
  119. m_BannerColor = "red";
  120. m_BannerScroll = "2";
  121. StringBuffer xpath;
  122. xpath.appendf("Software/EspProcess[@name='%s']/@portalurl", process);
  123. const char* portalURL = cfg->queryProp(xpath.str());
  124. if (portalURL && *portalURL)
  125. m_PortalURL.append(portalURL);
  126. }
  127. static void countProgress(IPropertyTree *t,unsigned &done,unsigned &total)
  128. {
  129. total = 0;
  130. done = 0;
  131. Owned<IPropertyTreeIterator> it = t->getElements("DFT/progress");
  132. ForEach(*it) {
  133. IPropertyTree &e=it->query();
  134. if (e.getPropInt("@done",0))
  135. done++;
  136. total++;
  137. }
  138. }
  139. struct CActiveWorkunitWrapper: public CActiveWorkunit
  140. {
  141. CActiveWorkunitWrapper(IEspContext &context, const char* wuid,const char* location = NULL,unsigned index=0): CActiveWorkunit("","")
  142. {
  143. double version = context.getClientVersion();
  144. CWUWrapper wu(wuid, context);
  145. StringBuffer stateStr;
  146. SCMStringBuffer state,owner,jobname;
  147. setWuid(wuid);
  148. wu->getStateDesc(state);
  149. if(index && location && *location)
  150. stateStr.appendf("queued(%d) [%s on %s]", index, state.str(), location);
  151. else if(index)
  152. stateStr.appendf("queued(%d) [%s]", index, state.str());
  153. else if(location && *location)
  154. stateStr.appendf("%s [%s]", state.str(), location);
  155. else
  156. stateStr.set(state.str());
  157. setStateID(wu->getState());
  158. if ((version > 1.00) && (wu->getState() == WUStateBlocked))
  159. {
  160. SCMStringBuffer stateEx;
  161. setExtra(wu->getStateEx(stateEx).str());
  162. stateStr.appendf(" %s", stateEx.str());
  163. }
  164. setState(stateStr.str());
  165. if ((version > 1.09) && (wu->getState() == WUStateFailed))
  166. setWarning("The job will ultimately not complete. Please check ECLAgent.");
  167. setOwner(wu->getUser(owner).str());
  168. setJobname(wu->getJobName(jobname).str());
  169. switch(wu->getPriority())
  170. {
  171. case PriorityClassHigh: setPriority("high"); break;
  172. default:
  173. case PriorityClassNormal: setPriority("normal"); break;
  174. case PriorityClassLow: setPriority("low"); break;
  175. }
  176. if (version > 1.08 && wu->isPausing())
  177. {
  178. setIsPausing(true);
  179. }
  180. if (version > 1.14)
  181. {
  182. SCMStringBuffer clusterName;
  183. setClusterName(wu->getClusterName(clusterName).str());
  184. }
  185. }
  186. CActiveWorkunitWrapper(const char* wuid,const char* owner, const char* jobname, const char* state, const char* priority): CActiveWorkunit("","")
  187. {
  188. setWuid(wuid);
  189. setState(state);
  190. setOwner(owner);
  191. setJobname(jobname);
  192. setPriority(priority);
  193. }
  194. };
  195. bool CWsSMCEx::onIndex(IEspContext &context, IEspSMCIndexRequest &req, IEspSMCIndexResponse &resp)
  196. {
  197. resp.setRedirectUrl("/");
  198. return true;
  199. }
  200. static int stringcmp(const char **a, const char **b)
  201. {
  202. return strcmp(*a, *b);
  203. }
  204. bool isInWuList(IArrayOf<IEspActiveWorkunit>& aws, const char* wuid)
  205. {
  206. bool bFound = false;
  207. if (wuid && *wuid && (aws.length() > 0))
  208. {
  209. ForEachItemIn(k, aws)
  210. {
  211. IEspActiveWorkunit& wu = aws.item(k);
  212. const char* wuid0 = wu.getWuid();
  213. const char* server0 = wu.getServer();
  214. if (wuid0 && !strcmp(wuid0, wuid) && (!server0 || strcmp(server0, "ECLagent")))
  215. {
  216. bFound = true;
  217. break;
  218. }
  219. }
  220. }
  221. return bFound;
  222. }
  223. //This function will only be called when client version < 1.16
  224. void addQueuedWorkUnits(const char *queueName, CJobQueueContents &contents, IArrayOf<IEspActiveWorkunit> &aws, IEspContext &context, const char *serverName, const char *instanceName)
  225. {
  226. Owned<IJobQueueIterator> iter = contents.getIterator();
  227. unsigned count=0;
  228. ForEach(*iter)
  229. {
  230. if (!isInWuList(aws, iter->query().queryWUID()))
  231. {
  232. try
  233. {
  234. Owned<IEspActiveWorkunit> wu(new CActiveWorkunitWrapper(context, iter->query().queryWUID(),NULL,++count));
  235. wu->setServer(serverName);
  236. wu->setInstance(instanceName); // JCSMORE In thor case at least, if queued it is unknown which instance it will run on..
  237. wu->setQueueName(queueName);
  238. aws.append(*wu.getLink());
  239. }
  240. catch (IException *e)
  241. {
  242. // JCSMORE->KWang what is this handling? Why would this succeeed and above fail?
  243. StringBuffer msg;
  244. Owned<IEspActiveWorkunit> wu(new CActiveWorkunitWrapper(iter->query().queryWUID(), "", "", e->errorMessage(msg).str(), "normal"));
  245. wu->setServer(serverName);
  246. wu->setInstance(instanceName);
  247. wu->setQueueName(queueName);
  248. aws.append(*wu.getLink());
  249. e->Release();
  250. }
  251. }
  252. }
  253. }
  254. void CWsSMCEx::getQueueState(int runningJobsInQueue, StringBuffer& queueState, BulletType& bulletType)
  255. {
  256. bool queuePausedOrStopped = false;
  257. if ((queueState.length() > 0) && (strieq(queueState.str(),"stopped") || strieq(queueState.str(),"paused")))
  258. queuePausedOrStopped = true;
  259. else
  260. queueState.set("running");
  261. bulletType = bulletGreen;
  262. if (NotFound == runningJobsInQueue)
  263. {
  264. if (queuePausedOrStopped)
  265. bulletType = bulletWhite;
  266. else
  267. bulletType = bulletError;
  268. }
  269. else if (runningJobsInQueue > 0)
  270. {
  271. if (queuePausedOrStopped)
  272. bulletType = bulletOrange;
  273. else
  274. bulletType = bulletGreen;
  275. }
  276. else if (queuePausedOrStopped)
  277. bulletType = bulletYellow;
  278. return;
  279. }
  280. void CWsSMCEx::readClusterTypeAndQueueName(CConstWUClusterInfoArray& clusters, const char* clusterName, StringBuffer& clusterType, SCMStringBuffer& clusterQueue)
  281. {
  282. if (!clusterName || !*clusterName)
  283. return;
  284. ForEachItemIn(cl, clusters)
  285. {
  286. IConstWUClusterInfo &cluster = clusters.item(cl);
  287. SCMStringBuffer str;
  288. cluster.getName(str);
  289. if (!streq(str.str(), clusterName))
  290. continue;
  291. if (cluster.getPlatform() == HThorCluster)
  292. {
  293. cluster.getAgentQueue(clusterQueue);
  294. clusterType.set("HThor");
  295. }
  296. else if (cluster.getPlatform() == RoxieCluster)
  297. {
  298. cluster.getAgentQueue(clusterQueue);
  299. clusterType.set("Roxie");
  300. }
  301. else
  302. {
  303. cluster.getThorQueue(clusterQueue);
  304. clusterType.set("Thor");
  305. }
  306. break;
  307. }
  308. return;
  309. }
  310. void CWsSMCEx::addRunningWUs(IEspContext &context, IPropertyTree& node, CConstWUClusterInfoArray& clusters,
  311. IArrayOf<IEspActiveWorkunit>& aws, BoolHash& uniqueWUIDs,
  312. StringArray& runningQueueNames, int* runningJobsInQueue)
  313. {
  314. StringBuffer instance;
  315. StringBuffer qname;
  316. int serverID = -1;
  317. const char* name = node.queryProp("@name");
  318. if (name && *name)
  319. {
  320. node.getProp("@queue", qname);
  321. if (0 == stricmp("ThorMaster", name))
  322. {
  323. node.getProp("@thorname",instance);
  324. }
  325. else if (0 == stricmp(name, "ECLAgent"))
  326. {
  327. qname.append(name);
  328. }
  329. if ((instance.length()==0))
  330. {
  331. instance.append( !strcmp(name, "ECLagent") ? "ECL agent" : name);
  332. instance.append(" on ").append(node.queryProp("@node"));
  333. }
  334. }
  335. if (qname.length() > 0)
  336. {
  337. StringArray qlist;
  338. qlist.appendListUniq(qname.str(), ",");
  339. ForEachItemIn(q, qlist)
  340. {
  341. const char *_qname = qlist.item(q);
  342. serverID = runningQueueNames.find(_qname);
  343. if (NotFound == serverID)
  344. {
  345. serverID = runningQueueNames.ordinality(); // i.e. last
  346. runningQueueNames.append(_qname);
  347. }
  348. }
  349. }
  350. Owned<IPropertyTreeIterator> wuids(node.getElements("WorkUnit"));
  351. ForEach(*wuids)
  352. {
  353. const char* wuid=wuids->query().queryProp(NULL);
  354. if (!wuid)
  355. continue;
  356. if (streq(qname.str(), "ECLagent") && uniqueWUIDs.getValue(wuid))
  357. continue;
  358. try
  359. {
  360. IEspActiveWorkunit* wu=new CActiveWorkunitWrapper(context,wuid);
  361. const char* servername = node.queryProp("@name");
  362. const char *cluster = node.queryProp("Cluster");
  363. wu->setServer(servername);
  364. wu->setInstance(instance.str());
  365. StringBuffer queueName;
  366. if (cluster) // backward compat check.
  367. getClusterThorQueueName(queueName, cluster);
  368. else
  369. queueName.append(qname);
  370. serverID = runningQueueNames.find(queueName.str());
  371. wu->setQueueName(queueName);
  372. double version = context.getClientVersion();
  373. if (version > 1.01)
  374. {
  375. if (wu->getStateID() == WUStateRunning)
  376. {
  377. int sg_duration = node.getPropInt("@sg_duration", -1);
  378. const char* graph = node.queryProp("@graph");
  379. int subgraph = node.getPropInt("@subgraph", -1);
  380. if (subgraph > -1 && sg_duration > -1)
  381. {
  382. StringBuffer durationStr;
  383. StringBuffer subgraphStr;
  384. durationStr.appendf("%d min", sg_duration);
  385. subgraphStr.appendf("%d", subgraph);
  386. wu->setGraphName(graph);
  387. wu->setDuration(durationStr.str());
  388. wu->setGID(subgraphStr.str());
  389. }
  390. int memoryBlocked = node.getPropInt("@memoryBlocked ", 0);
  391. if (memoryBlocked != 0)
  392. {
  393. wu->setMemoryBlocked(1);
  394. }
  395. if (serverID > -1)
  396. {
  397. runningJobsInQueue[serverID]++;
  398. }
  399. if ((version > 1.14) && streq(queueName.str(), "ECLagent"))
  400. {
  401. const char* clusterName = wu->getClusterName();
  402. if (clusterName && *clusterName)
  403. {
  404. StringBuffer clusterType;
  405. SCMStringBuffer clusterQueue;
  406. readClusterTypeAndQueueName(clusters, clusterName, clusterType, clusterQueue);
  407. wu->setClusterType(clusterType.str());
  408. wu->setClusterQueueName(clusterQueue.str());
  409. }
  410. }
  411. }
  412. }
  413. uniqueWUIDs.setValue(wuid, true);
  414. aws.append(*wu);
  415. }
  416. catch (IException *e)
  417. {
  418. StringBuffer msg;
  419. Owned<IEspActiveWorkunit> wu(new CActiveWorkunitWrapper(wuid, "", "", e->errorMessage(msg).str(), "normal"));
  420. wu->setServer(node.queryProp("@name"));
  421. wu->setInstance(instance.str());
  422. wu->setQueueName(qname.str());
  423. aws.append(*wu.getLink());
  424. e->Release();
  425. }
  426. }
  427. return;
  428. }
  429. void CWsSMCEx::readBannerAndChatRequest(IEspContext& context, IEspActivityRequest &req, IEspActivityResponse& resp)
  430. {
  431. StringBuffer chatURLStr, bannerStr;
  432. const char* chatURL = req.getChatURL();
  433. const char* banner = req.getBannerContent();
  434. //Filter out invalid chars
  435. if (chatURL && *chatURL)
  436. {
  437. const char* pStr = chatURL;
  438. unsigned len = strlen(chatURL);
  439. for (unsigned i = 0; i < len; i++)
  440. {
  441. if (isprint(*pStr))
  442. chatURLStr.append(*pStr);
  443. pStr++;
  444. }
  445. }
  446. if (banner && *banner)
  447. {
  448. const char* pStr = banner;
  449. unsigned len = strlen(banner);
  450. for (unsigned i = 0; i < len; i++)
  451. {
  452. bannerStr.append(isprint(*pStr) ? *pStr : '.');
  453. pStr++;
  454. }
  455. }
  456. chatURLStr.trim();
  457. bannerStr.trim();
  458. if (!req.getBannerAction_isNull() && req.getBannerAction() && (bannerStr.length() < 1))
  459. throw MakeStringException(ECLWATCH_MISSING_BANNER_CONTENT, "If a Banner is enabled, the Banner content must be specified.");
  460. if (!req.getEnableChatURL_isNull() && req.getEnableChatURL() && (chatURLStr.length() < 1))
  461. throw MakeStringException(ECLWATCH_MISSING_CHAT_URL, "If a Chat is enabled, the Chat URL must be specified.");
  462. //Now, store the strings since they are valid.
  463. m_ChatURL = chatURLStr;
  464. m_Banner = bannerStr;
  465. const char* bannerSize = req.getBannerSize();
  466. if (bannerSize && *bannerSize)
  467. m_BannerSize.set(bannerSize);
  468. const char* bannerColor = req.getBannerColor();
  469. if (bannerColor && *bannerColor)
  470. m_BannerColor.set(bannerColor);
  471. const char* bannerScroll = req.getBannerScroll();
  472. if (bannerScroll && *bannerScroll)
  473. m_BannerScroll.set(bannerScroll);
  474. m_BannerAction = req.getBannerAction();
  475. if(!req.getEnableChatURL_isNull())
  476. m_EnableChatURL = req.getEnableChatURL();
  477. }
  478. void CWsSMCEx::setBannerAndChatData(double version, IEspActivityResponse& resp)
  479. {
  480. resp.setShowBanner(m_BannerAction);
  481. resp.setShowChatURL(m_EnableChatURL);
  482. resp.setBannerContent(m_Banner.str());
  483. resp.setBannerSize(m_BannerSize.str());
  484. resp.setBannerColor(m_BannerColor.str());
  485. resp.setChatURL(m_ChatURL.str());
  486. if (version >= 1.08)
  487. resp.setBannerScroll(m_BannerScroll.str());
  488. }
  489. void CWsSMCEx::getServersAndWUs(IEspContext &context, IEspActivityRequest &req, IEspActivityResponse& resp, double version,
  490. IPropertyTree* envRoot, CConstWUClusterInfoArray& clusters)
  491. {
  492. BoolHash uniqueWUIDs;
  493. Owned<IRemoteConnection> conn = querySDS().connect("/Status/Servers",myProcessSession(),RTM_LOCK_READ,30000);
  494. StringArray runningQueueNames;
  495. int runningJobsInQueue[256];
  496. for (int i = 0; i < 256; i++)
  497. runningJobsInQueue[i] = 0;
  498. IArrayOf<IEspActiveWorkunit> aws;
  499. if (conn.get())
  500. {
  501. Owned<IPropertyTreeIterator> it(conn->queryRoot()->getElements("Server[@name!=\"ECLagent\"]"));
  502. ForEach(*it)
  503. addRunningWUs(context, it->query(), clusters, aws, uniqueWUIDs, runningQueueNames, runningJobsInQueue);
  504. Owned<IPropertyTreeIterator> it1(conn->queryRoot()->getElements("Server[@name=\"ECLagent\"]"));
  505. ForEach(*it1)
  506. addRunningWUs(context, it1->query(), clusters, aws, uniqueWUIDs, runningQueueNames, runningJobsInQueue);
  507. }
  508. SecAccessFlags access;
  509. bool fullAccess=(context.authorizeFeature(THORQUEUE_FEATURE, access) && access>=SecAccess_Full);
  510. IArrayOf<IEspThorCluster> ThorClusters;
  511. IArrayOf<IEspHThorCluster> HThorClusters;
  512. IArrayOf<IEspRoxieCluster> RoxieClusters;
  513. ForEachItemIn(c, clusters)
  514. {
  515. IConstWUClusterInfo &cluster = clusters.item(c);
  516. SCMStringBuffer str;
  517. if (cluster.getThorProcesses().ordinality())
  518. {
  519. IEspThorCluster* returnCluster = new CThorCluster("","");
  520. returnCluster->setThorLCR(ThorLCRCluster == cluster.getPlatform() ? "withLCR" : "noLCR");
  521. str.clear();
  522. returnCluster->setClusterName(cluster.getName(str).str());
  523. str.clear();
  524. const char *queueName = cluster.getThorQueue(str).str();
  525. returnCluster->setQueueName(queueName);
  526. StringBuffer queueState, queueStateDetails;
  527. CJobQueueContents contents;
  528. Owned<IJobQueue> queue = createJobQueue(queueName);
  529. queue->copyItemsAndState(contents, queueState, queueStateDetails);
  530. addQueuedWorkUnits(queueName, contents, aws, context, "ThorMaster", NULL);
  531. BulletType bulletType = bulletGreen;
  532. int serverID = runningQueueNames.find(queueName);
  533. int numRunningJobsInQueue = (NotFound != serverID) ? runningJobsInQueue[serverID] : -1;
  534. getQueueState(numRunningJobsInQueue, queueState, bulletType);
  535. StringBuffer agentQueueState, agentQueueStateDetails;
  536. CJobQueueContents agentContents;
  537. SCMStringBuffer str1;
  538. const char *agentQueueName = cluster.getAgentQueue(str1).str();
  539. Owned<IJobQueue> agentQueue = createJobQueue(agentQueueName);
  540. agentQueue->copyItemsAndState(agentContents, agentQueueState, agentQueueStateDetails);
  541. //Use the same 'queueName' because the job belongs to the same cluster
  542. addQueuedWorkUnits(queueName, agentContents, aws, context, "ThorMaster", NULL);
  543. if (bulletType == bulletGreen)
  544. {//If ThorQueue is normal, check the AgentQueue
  545. serverID = runningQueueNames.find(agentQueueName);
  546. numRunningJobsInQueue = (NotFound != serverID) ? runningJobsInQueue[serverID] : -1;
  547. getQueueState(numRunningJobsInQueue, queueState, bulletType);
  548. }
  549. returnCluster->setQueueStatus(queueState.str());
  550. if (version > 1.06)
  551. returnCluster->setQueueStatus2(bulletType);
  552. if (version > 1.10)
  553. returnCluster->setClusterSize(cluster.getSize());
  554. addToThorClusterList(ThorClusters, returnCluster, req.getSortBy(), req.getDescending());
  555. }
  556. if (version > 1.06)
  557. {
  558. str.clear();
  559. if (cluster.getRoxieProcess(str).length())
  560. {
  561. IEspRoxieCluster* returnCluster = new CRoxieCluster("","");
  562. str.clear();
  563. returnCluster->setClusterName(cluster.getName(str).str());
  564. str.clear();
  565. returnCluster->setQueueName(cluster.getAgentQueue(str).str());
  566. str.clear();
  567. const char *queueName = cluster.getAgentQueue(str).str();
  568. StringBuffer queueState, queueStateDetails;
  569. CJobQueueContents contents;
  570. Owned<IJobQueue> queue = createJobQueue(queueName);
  571. queue->copyItemsAndState(contents, queueState, queueStateDetails);
  572. addQueuedWorkUnits(queueName, contents, aws, context, "RoxieServer", NULL);
  573. BulletType bulletType = bulletGreen;
  574. int serverID = runningQueueNames.find(queueName);
  575. int numRunningJobsInQueue = (NotFound != serverID) ? runningJobsInQueue[serverID] : -1;
  576. getQueueState(numRunningJobsInQueue, queueState, bulletType);
  577. returnCluster->setQueueStatus(queueState.str());
  578. returnCluster->setQueueStatus2(bulletType);
  579. if (version > 1.10)
  580. returnCluster->setClusterSize(cluster.getSize());
  581. addToRoxieClusterList(RoxieClusters, returnCluster, req.getSortBy(), req.getDescending());
  582. }
  583. }
  584. if (version > 1.11 && (cluster.getPlatform() == HThorCluster))
  585. {
  586. IEspHThorCluster* returnCluster = new CHThorCluster("","");
  587. str.clear();
  588. returnCluster->setClusterName(cluster.getName(str).str());
  589. str.clear();
  590. returnCluster->setQueueName(cluster.getAgentQueue(str).str());
  591. str.clear();
  592. const char *queueName = cluster.getAgentQueue(str).str();
  593. StringBuffer queueState, queueStateDetails;
  594. CJobQueueContents contents;
  595. Owned<IJobQueue> queue = createJobQueue(queueName);
  596. queue->copyItemsAndState(contents, queueState, queueStateDetails);
  597. addQueuedWorkUnits(queueName, contents, aws, context, "HThorServer", NULL);
  598. BulletType bulletType = bulletGreen;
  599. int serverID = runningQueueNames.find(queueName);
  600. int numRunningJobsInQueue = (NotFound != serverID) ? runningJobsInQueue[serverID] : -1;
  601. getQueueState(numRunningJobsInQueue, queueState, bulletType);
  602. returnCluster->setQueueStatus(queueState.str());
  603. returnCluster->setQueueStatus2(bulletType);
  604. HThorClusters.append(*returnCluster);
  605. }
  606. }
  607. resp.setThorClusters(ThorClusters);
  608. if (version > 1.06)
  609. resp.setRoxieClusters(RoxieClusters);
  610. if (version > 1.10)
  611. {
  612. resp.setSortBy(req.getSortBy());
  613. resp.setDescending(req.getDescending());
  614. }
  615. if (version > 1.11)
  616. {
  617. resp.setHThorClusters(HThorClusters);
  618. if (fullAccess)
  619. resp.setAccessRight("Access_Full");
  620. }
  621. IArrayOf<IEspServerJobQueue> serverJobQueues;
  622. IArrayOf<IConstTpEclServer> eclccservers;
  623. CTpWrapper dummy;
  624. dummy.getTpEclCCServers(envRoot->queryBranch("Software"), eclccservers);
  625. ForEachItemIn(x1, eclccservers)
  626. {
  627. IConstTpEclServer& eclccserver = eclccservers.item(x1);
  628. const char* serverName = eclccserver.getName();
  629. if (!serverName || !*serverName)
  630. continue;
  631. Owned <IStringIterator> targetClusters = getTargetClusters(eqEclCCServer, serverName);
  632. if (!targetClusters->first())
  633. continue;
  634. ForEach (*targetClusters)
  635. {
  636. SCMStringBuffer targetCluster;
  637. targetClusters->str(targetCluster);
  638. StringBuffer queueName;
  639. StringBuffer queueState, queueStateDetails;
  640. CJobQueueContents contents;
  641. getClusterEclCCServerQueueName(queueName, targetCluster.str());
  642. Owned<IJobQueue> queue = createJobQueue(queueName);
  643. queue->copyItemsAndState(contents, queueState, queueStateDetails);
  644. unsigned count=0;
  645. Owned<IJobQueueIterator> iter = contents.getIterator();
  646. ForEach(*iter)
  647. {
  648. if (isInWuList(aws, iter->query().queryWUID()))
  649. continue;
  650. Owned<IEspActiveWorkunit> wu(new CActiveWorkunitWrapper(context, iter->query().queryWUID(),NULL, ++count));
  651. wu->setServer("ECLCCserver");
  652. wu->setInstance(serverName);
  653. wu->setQueueName(queueName);
  654. aws.append(*wu.getLink());
  655. }
  656. addServerJobQueue(version, serverJobQueues, queueName, serverName, "ECLCCserver", NULL, 0, queueState.str(), queueStateDetails.str());
  657. }
  658. }
  659. StringBuffer dirxpath;
  660. dirxpath.appendf("Software/%s", eqDfu);
  661. Owned<IPropertyTreeIterator> services = envRoot->getElements(dirxpath);
  662. if (services->first())
  663. {
  664. do
  665. {
  666. IPropertyTree &serviceTree = services->query();
  667. const char *queuename = serviceTree.queryProp("@queue");
  668. const char *serverName = serviceTree.queryProp("@name");
  669. if (queuename && *queuename)
  670. {
  671. StringArray queues;
  672. loop
  673. {
  674. StringAttr subq;
  675. const char *comma = strchr(queuename,',');
  676. if (comma)
  677. subq.set(queuename,comma-queuename);
  678. else
  679. subq.set(queuename);
  680. bool added;
  681. const char *s = strdup(subq.get());
  682. queues.bAdd(s, stringcmp, added);
  683. if (!added)
  684. free((void *)s);
  685. if (!comma)
  686. break;
  687. queuename = comma+1;
  688. if (!*queuename)
  689. break;
  690. }
  691. ForEachItemIn(q, queues)
  692. {
  693. const char *queueName = queues.item(q);
  694. StringAttrArray wulist;
  695. unsigned running = queuedJobs(queueName, wulist);
  696. ForEachItemIn(i, wulist)
  697. {
  698. const char *wuid = wulist.item(i).text.get();
  699. try
  700. {
  701. StringBuffer jname, uname, state;
  702. Owned<IConstDFUWorkUnit> wu = getDFUWorkUnitFactory()->openWorkUnit(wuid, false);
  703. if (wu)
  704. {
  705. wu->getUser(uname);
  706. wu->getJobName(jname);
  707. if (i<running)
  708. state.append("running");
  709. else
  710. state.append("queued");
  711. Owned<IEspActiveWorkunit> wu1(new CActiveWorkunitWrapper(wuid, uname.str(), jname.str(), state.str(), "normal"));
  712. wu1->setServer("DFUserver");
  713. wu1->setInstance(serverName);
  714. wu1->setQueueName(queueName);
  715. aws.append(*wu1.getLink());
  716. }
  717. }
  718. catch (IException *e)
  719. {
  720. StringBuffer msg;
  721. Owned<IEspActiveWorkunit> wu1(new CActiveWorkunitWrapper(wuid, "", "", e->errorMessage(msg).str(), "normal"));
  722. wu1->setServer("DFUserver");
  723. wu1->setInstance(serverName);
  724. wu1->setQueueName(queueName);
  725. aws.append(*wu1.getLink());
  726. e->Release();
  727. }
  728. }
  729. addServerJobQueue(version, serverJobQueues, queueName, serverName, "DFUserver", NULL, 0);
  730. }
  731. }
  732. } while (services->next());
  733. }
  734. resp.setRunning(aws);
  735. if (version > 1.03)
  736. resp.setServerJobQueues(serverJobQueues);
  737. IArrayOf<IEspDFUJob> jobs;
  738. conn.setown(querySDS().connect("DFU/RECOVERY",myProcessSession(),0, INFINITE));
  739. if (conn)
  740. {
  741. Owned<IPropertyTreeIterator> it(conn->queryRoot()->getElements("job"));
  742. ForEach(*it)
  743. {
  744. IPropertyTree &e=it->query();
  745. if (e.getPropBool("Running",false))
  746. {
  747. unsigned done;
  748. unsigned total;
  749. countProgress(&e,done,total);
  750. Owned<IEspDFUJob> job = new CDFUJob("","");
  751. job->setTimeStarted(e.queryProp("@time_started"));
  752. job->setDone(done);
  753. job->setTotal(total);
  754. StringBuffer cmd;
  755. cmd.append(e.queryProp("@command")).append(" ").append(e.queryProp("@command_parameters"));
  756. job->setCommand(cmd.str());
  757. jobs.append(*job.getLink());
  758. }
  759. }
  760. }
  761. resp.setDFUJobs(jobs);
  762. }
  763. void CWsSMCEx::createActiveWorkUnit(Owned<IEspActiveWorkunit>& ownedWU, IEspContext &context, const char* wuid, const char* location,
  764. unsigned index, const char* serverName, const char* queueName, const char* instanceName, const char* targetClusterName)
  765. {
  766. try
  767. {
  768. ownedWU.setown(new CActiveWorkunitWrapper(context, wuid, location, index));
  769. }
  770. catch (IException *e)
  771. { //if the wu cannot be opened for some reason, the openWorkUnit() inside the CActiveWorkunitWrapper() may throw an exception.
  772. //We do not want the exception stops this process of retrieving/showing all active WUs. And that WU should still be displayed
  773. //with the exception.
  774. StringBuffer msg;
  775. ownedWU.setown(new CActiveWorkunitWrapper(wuid, "", "", e->errorMessage(msg).str(), "normal"));
  776. ownedWU->setStateID(WUStateUnknown);
  777. e->Release();
  778. }
  779. ownedWU->setServer(serverName);
  780. ownedWU->setQueueName(queueName);
  781. if (instanceName && *instanceName)
  782. ownedWU->setInstance(instanceName); // JCSMORE In thor case at least, if queued it is unknown which instance it will run on..
  783. if (targetClusterName && *targetClusterName)
  784. ownedWU->setTargetClusterName(targetClusterName);
  785. }
  786. void CWsSMCEx::readWUsAndStateFromJobQueue(IEspContext& context, CWsSMCTargetCluster& targetCluster,
  787. CWsSMCQueue& jobQueue, const char* queueName, BoolHash& uniqueWUIDs, IArrayOf<IEspActiveWorkunit>& aws)
  788. {
  789. CJobQueueContents contents;
  790. Owned<IJobQueue> queue = createJobQueue(jobQueue.queueName.str());
  791. queue->copyItemsAndState(contents, jobQueue.queueState, jobQueue.queueStateDetails);
  792. Owned<IJobQueueIterator> iter = contents.getIterator();
  793. jobQueue.countQueuedJobs=0;
  794. ForEach(*iter)
  795. {
  796. const char* wuid = iter->query().queryWUID();
  797. if (!wuid || !*wuid || uniqueWUIDs.getValue(wuid))
  798. continue;
  799. uniqueWUIDs.setValue(wuid, true);
  800. const char* queue = NULL;
  801. if (queueName && *queueName)
  802. queue = queueName;
  803. else
  804. queue = targetCluster.clusterName.get();
  805. Owned<IEspActiveWorkunit> wu;
  806. createActiveWorkUnit(wu, context, wuid, jobQueue.queueName.str(), ++jobQueue.countQueuedJobs, targetCluster.statusServerName.str(),
  807. queue, NULL, targetCluster.clusterName.get());
  808. aws.append(*wu.getLink());
  809. }
  810. }
  811. void CWsSMCEx::readRunningWUsOnServerNode(IEspContext& context, IPropertyTree& serverStatusNode, const char* targetClusterName,
  812. unsigned& runningJobsInQueue, BoolHash& uniqueWUIDs, IArrayOf<IEspActiveWorkunit>& aws)
  813. {
  814. StringBuffer instance, qname, durationStr, subgraphStr;
  815. serverStatusNode.getProp("@queue", qname);
  816. const char* serverName = serverStatusNode.queryProp("@name");
  817. if (serverName && *serverName)
  818. {
  819. if (strieq("ThorMaster", serverName))
  820. serverStatusNode.getProp("@thorname",instance);
  821. else if (strieq("RoxieServer", serverName))
  822. serverStatusNode.getProp("@cluster",instance);
  823. else
  824. {
  825. if (strieq(serverName, "ECLAgent"))
  826. qname.append(serverName);//use set()??
  827. instance.appendf("%s on %s", serverName, serverStatusNode.queryProp("@node"));
  828. }
  829. }
  830. int sg_duration = serverStatusNode.getPropInt("@sg_duration", -1);
  831. const char* graph = serverStatusNode.queryProp("@graph");
  832. int subgraph = serverStatusNode.getPropInt("@subgraph", -1);
  833. durationStr.appendf("%d min", sg_duration);
  834. subgraphStr.appendf("%d", subgraph);
  835. //get all WUs
  836. Owned<IPropertyTreeIterator> wuids(serverStatusNode.getElements("WorkUnit"));
  837. ForEach(*wuids)
  838. {
  839. const char* wuid=wuids->query().queryProp(NULL);
  840. if (!wuid || !*wuid)
  841. continue;
  842. uniqueWUIDs.setValue(wuid, true);
  843. runningJobsInQueue++;
  844. StringBuffer queueName;
  845. const char* processName = NULL;
  846. if (!strieq(targetClusterName, instance.str()))
  847. processName = instance.str();
  848. const char *cluster = serverStatusNode.queryProp("Cluster");
  849. if (cluster) // backward compat check.
  850. getClusterThorQueueName(queueName, cluster);
  851. else
  852. queueName.append(qname);
  853. Owned<IEspActiveWorkunit> wu;
  854. createActiveWorkUnit(wu, context, wuid, processName, 0, serverName, queueName, instance.str(), targetClusterName);
  855. if (wu->getStateID() != WUStateRunning)
  856. {
  857. aws.append(*wu.getLink());
  858. continue;
  859. }
  860. if (subgraph > -1 && sg_duration > -1)
  861. {
  862. wu->setGraphName(graph);
  863. wu->setDuration(durationStr.str());
  864. wu->setGID(subgraphStr.str());
  865. }
  866. if (serverStatusNode.getPropInt("@memoryBlocked ", 0) != 0)
  867. wu->setMemoryBlocked(1);
  868. aws.append(*wu.getLink());
  869. }
  870. }
  871. bool CWsSMCEx::findQueueInStatusServer(IEspContext& context, IPropertyTree* serverStatusRoot, const char* serverName, const char* queueName)
  872. {
  873. bool foundServer = false;
  874. VStringBuffer path("Server[@name=\"%s\"]", serverName);
  875. Owned<IPropertyTreeIterator> it(serverStatusRoot->getElements(path.str()));
  876. ForEach(*it)
  877. {
  878. IPropertyTree& serverStatusNode = it->query();
  879. const char* queue = serverStatusNode.queryProp("@queue");
  880. if (!queue || !*queue)
  881. continue;
  882. StringArray qlist;
  883. qlist.appendListUniq(queue, ",");
  884. ForEachItemIn(q, qlist)
  885. {
  886. if (strieq(qlist.item(q), queueName))
  887. {
  888. foundServer = true;
  889. break;
  890. }
  891. }
  892. if (foundServer)
  893. break;
  894. }
  895. return foundServer;
  896. }
  897. void CWsSMCEx::sortTargetClusters(IArrayOf<IEspTargetCluster>& clusters, const char* sortBy, bool descending)
  898. {
  899. if (!sortBy || !*sortBy || strieq(sortBy, "name"))
  900. clusters.sort(descending ? sortTargetClustersByNameDescending : sortTargetClustersByNameAscending);
  901. else
  902. clusters.sort(descending ? sortTargetClustersBySizeDescending : sortTargetClustersBySizeAscending);
  903. }
  904. void CWsSMCEx::setClusterQueueStatus(CWsSMCTargetCluster& targetCluster)
  905. {
  906. CWsSMCQueue& jobQueue = targetCluster.clusterQueue;
  907. if (targetCluster.clusterType != ThorLCRCluster)
  908. jobQueue = targetCluster.agentQueue;
  909. if (!jobQueue.queueName.length())
  910. return;
  911. targetCluster.clusterStatusDetails.appendf("%s: ", jobQueue.queueName.str());
  912. bool queuePausedOrStopped = false;
  913. unsigned countRunningJobs = jobQueue.countRunningJobs;
  914. unsigned countQueuedJobs = jobQueue.countQueuedJobs;
  915. if (targetCluster.clusterType == ThorLCRCluster)
  916. {
  917. countRunningJobs += targetCluster.agentQueue.countRunningJobs;
  918. countQueuedJobs += targetCluster.agentQueue.countQueuedJobs;
  919. }
  920. if (jobQueue.queueState.length())
  921. {
  922. const char* queueState = jobQueue.queueState.str();
  923. const char* queueStateDetails = jobQueue.queueStateDetails.str();
  924. if (queueStateDetails && *queueStateDetails)
  925. targetCluster.clusterStatusDetails.appendf("queue %s; %s;", queueState, queueStateDetails);
  926. else
  927. targetCluster.clusterStatusDetails.appendf("queue %s; ", queueState);
  928. if (strieq(queueState,"stopped") || strieq(queueState,"paused"))
  929. queuePausedOrStopped = true;
  930. }
  931. if (!jobQueue.foundQueueInStatusServer)
  932. {
  933. if (queuePausedOrStopped)
  934. jobQueue.statusType = QueuePausedOrStoppedNotFound;
  935. else
  936. jobQueue.statusType = QueueRunningNotFound;
  937. }
  938. else
  939. {
  940. if (queuePausedOrStopped)
  941. {
  942. if (jobQueue.countRunningJobs > 0)
  943. jobQueue.statusType = QueuePausedOrStoppedWithJobs;
  944. else
  945. jobQueue.statusType = QueuePausedOrStoppedWithNoJob;
  946. }
  947. }
  948. }
  949. void CWsSMCEx::setClusterStatus(IEspContext& context, CWsSMCTargetCluster& targetCluster, IEspTargetCluster* returnCluster)
  950. {
  951. setClusterQueueStatus(targetCluster);
  952. int statusType = (targetCluster.clusterQueue.statusType > targetCluster.agentQueue.statusType) ? targetCluster.clusterQueue.statusType
  953. : targetCluster.agentQueue.statusType;
  954. returnCluster->setClusterStatus(statusType);
  955. //Set 'Warning' which may be displayed beside cluster name
  956. if (statusType == QueueRunningNotFound)
  957. returnCluster->setWarning("Cluster not attached");
  958. else if (statusType == QueuePausedOrStoppedNotFound)
  959. returnCluster->setWarning("Queue paused or stopped - Cluster not attached");
  960. else if (statusType != RunningNormal)
  961. returnCluster->setWarning("Queue paused or stopped");
  962. //Set 'StatusDetails' which may be displayed when a mouse is moved over cluster icon
  963. if (targetCluster.clusterStatusDetails.length())
  964. returnCluster->setStatusDetails(targetCluster.clusterStatusDetails.str());
  965. }
  966. void CWsSMCEx::getWUsNotOnTargetCluster(IEspContext &context, IPropertyTree* serverStatusRoot, IArrayOf<IEspServerJobQueue>& serverJobQueues,
  967. IArrayOf<IEspActiveWorkunit>& aws)
  968. {
  969. double version = context.getClientVersion();
  970. BoolHash uniqueServers;
  971. Owned<IPropertyTreeIterator> it(serverStatusRoot->getElements("Server"));
  972. ForEach(*it)
  973. {
  974. IPropertyTree& serverNode = it->query();
  975. const char* serverName = serverNode.queryProp("@name");
  976. const char* instance = serverNode.queryProp("@node");
  977. const char* queueName = serverNode.queryProp("@queue");
  978. unsigned port = serverNode.getPropInt("@mpport", 0);
  979. if (!serverName || !*serverName || !instance || !*instance || strieq(serverName, "DFUserver"))//DFUServer already handled separately
  980. continue;
  981. VStringBuffer instanceName("%s_on_%s:%d", serverName, instance, port);
  982. Owned<IPropertyTreeIterator> wuids(serverNode.getElements("WorkUnit"));
  983. ForEach(*wuids)
  984. {
  985. const char* wuid=wuids->query().queryProp(NULL);
  986. if (!wuid || !*wuid)
  987. continue;
  988. if (isInWuList(aws, wuid))
  989. continue;
  990. Owned<IEspActiveWorkunit> wu;
  991. createActiveWorkUnit(wu, context, wuid, NULL, 0, serverName, queueName, instance, NULL);
  992. aws.append(*wu.getLink());
  993. }
  994. if (!uniqueServers.getValue(instanceName))
  995. {
  996. uniqueServers.setValue(instanceName, true);
  997. addServerJobQueue(version, serverJobQueues, queueName, instanceName, serverName, instance, port);
  998. }
  999. }
  1000. return;
  1001. }
  1002. void CWsSMCEx::readDFUWUs(IEspContext &context, const char* queueName, const char* serverName, IArrayOf<IEspActiveWorkunit>& aws)
  1003. {
  1004. StringAttrArray wulist;
  1005. unsigned running = queuedJobs(queueName, wulist);
  1006. ForEachItemIn(i, wulist)
  1007. {
  1008. StringBuffer jname, uname, state, error;
  1009. const char *wuid = wulist.item(i).text.get();
  1010. if (i<running)
  1011. state.set("running");
  1012. else
  1013. state.set("queued");
  1014. try
  1015. {
  1016. Owned<IConstDFUWorkUnit> dfuwu = getDFUWorkUnitFactory()->openWorkUnit(wuid, false);
  1017. dfuwu->getUser(uname);
  1018. dfuwu->getJobName(jname);
  1019. }
  1020. catch (IException *e)
  1021. {
  1022. e->errorMessage(error);
  1023. state.appendf(" (%s)", error.str());
  1024. e->Release();
  1025. }
  1026. Owned<IEspActiveWorkunit> wu(new CActiveWorkunitWrapper(wuid, uname.str(), jname.str(), state.str(), "normal"));
  1027. wu->setServer("DFUserver");
  1028. wu->setInstance(serverName);
  1029. wu->setQueueName(queueName);
  1030. aws.append(*wu.getLink());
  1031. }
  1032. }
  1033. void CWsSMCEx::getDFUServersAndWUs(IEspContext &context, IPropertyTree* envRoot, IArrayOf<IEspServerJobQueue>& serverJobQueues, IArrayOf<IEspActiveWorkunit>& aws)
  1034. {
  1035. if (!envRoot)
  1036. return;
  1037. double version = context.getClientVersion();
  1038. VStringBuffer path("Software/%s", eqDfu);
  1039. Owned<IPropertyTreeIterator> services = envRoot->getElements(path);
  1040. ForEach(*services)
  1041. {
  1042. IPropertyTree &serviceTree = services->query();
  1043. const char *qname = serviceTree.queryProp("@queue");
  1044. const char *serverName = serviceTree.queryProp("@name");
  1045. if (!qname || !*qname)
  1046. continue;
  1047. StringArray queues;
  1048. queues.appendListUniq(qname, ",");
  1049. ForEachItemIn(q, queues)
  1050. {
  1051. const char *queueName = queues.item(q);
  1052. readDFUWUs(context, queueName, serverName, aws);
  1053. addServerJobQueue(version, serverJobQueues, queueName, serverName, "DFUserver", NULL, 0);
  1054. }
  1055. }
  1056. }
  1057. void CWsSMCEx::getDFURecoveryJobs(IEspContext &context, IArrayOf<IEspDFUJob>& jobs)
  1058. {
  1059. Owned<IRemoteConnection> conn = querySDS().connect("DFU/RECOVERY",myProcessSession(),0, INFINITE);
  1060. if (!conn)
  1061. return;
  1062. Owned<IPropertyTreeIterator> it(conn->queryRoot()->getElements("job"));
  1063. ForEach(*it)
  1064. {
  1065. IPropertyTree &e=it->query();
  1066. if (!e.getPropBool("Running",false))
  1067. continue;
  1068. StringBuffer cmd;
  1069. unsigned done, total;
  1070. countProgress(&e,done,total);
  1071. cmd.append(e.queryProp("@command")).append(" ").append(e.queryProp("@command_parameters"));
  1072. Owned<IEspDFUJob> job = new CDFUJob("","");
  1073. job->setTimeStarted(e.queryProp("@time_started"));
  1074. job->setDone(done);
  1075. job->setTotal(total);
  1076. job->setCommand(cmd.str());
  1077. jobs.append(*job.getLink());
  1078. }
  1079. }
  1080. // This method reads job information from both /Status/Servers and IJobQueue.
  1081. //
  1082. // Each server component (a thor cluster, a dfuserver, or an eclagent) is one 'Server' branch under
  1083. // /Status/Servers. A 'Server' branch has a @queue which indicates the queue name of the server.
  1084. // A 'Server' branch also contains the information about running WUs on that 'Server'. This
  1085. // method reads the information. Those WUs are displays under that server (identified by its queue name)
  1086. // on Activity page.
  1087. //
  1088. // For the WUs list inside /Status/Servers/Server[@name=ECLagent] but not list under other 'Server', the
  1089. // existing code has to find out WUID and @clusterName of the WU. Then, uses @clusterName to find out the
  1090. // queue name in IConstWUClusterInfo. Those WUs list under that server (identified by its queue name) with
  1091. // a note 'on ECLagent'. TBD: the logic here will be simpler if the /Status/Servers/Server is named the
  1092. // instance and/or cluster.
  1093. //
  1094. // In order to get information about queued WUs, this method gets queue names from both IConstWUClusterInfo
  1095. // and other environment functions. Each of those queue names is linked to one IJobQueues. From the
  1096. // IJobQueues, this method reads queued jobs for each server component and list them under the server
  1097. // component (identified by its queue name).
  1098. bool CWsSMCEx::onActivity(IEspContext &context, IEspActivityRequest &req, IEspActivityResponse& resp)
  1099. {
  1100. context.validateFeatureAccess(FEATURE_URL, SecAccess_Read, true);
  1101. try
  1102. {
  1103. const char* build_ver = getBuildVersion();
  1104. resp.setBuild(build_ver);
  1105. double version = context.getClientVersion();
  1106. bool isSuperUser = true;
  1107. #ifdef _USE_OPENLDAP
  1108. CLdapSecManager* secmgr = dynamic_cast<CLdapSecManager*>(context.querySecManager());
  1109. if(secmgr && !secmgr->isSuperUser(context.queryUser()))
  1110. isSuperUser = false;
  1111. #endif
  1112. if(isSuperUser && req.getFromSubmitBtn())
  1113. readBannerAndChatRequest(context, req, resp);
  1114. if (version >= 1.12)
  1115. resp.setSuperUser(isSuperUser);
  1116. if (version >= 1.06)
  1117. setBannerAndChatData(version, resp);
  1118. Owned<IRemoteConnection> connEnv = querySDS().connect("Environment", myProcessSession(), RTM_LOCK_READ, SDS_LOCK_TIMEOUT);
  1119. IPropertyTree* envRoot = connEnv->queryRoot();
  1120. if (!envRoot)
  1121. throw MakeStringException(ECLWATCH_CANNOT_GET_ENV_INFO,"Failed to get environment information.");
  1122. CConstWUClusterInfoArray clusters;
  1123. getEnvironmentClusterInfo(envRoot, clusters);
  1124. if (version >= 1.16)
  1125. {
  1126. CIArrayOf<CWsSMCTargetCluster> thorTargetClusters;
  1127. CIArrayOf<CWsSMCTargetCluster> roxieTargetClusters;
  1128. CIArrayOf<CWsSMCTargetCluster> hthorTargetClusters;
  1129. IArrayOf<IEspServerJobQueue> serverJobQueues;
  1130. IArrayOf<IEspActiveWorkunit> aws;
  1131. IArrayOf<IEspDFUJob> DFURecoveryJobs;
  1132. Owned<IRemoteConnection> connStatusServers = querySDS().connect("/Status/Servers",myProcessSession(),RTM_LOCK_READ,30000);
  1133. IPropertyTree* serverStatusRoot = connStatusServers->queryRoot();
  1134. if (!serverStatusRoot)
  1135. throw MakeStringException(ECLWATCH_CANNOT_GET_STATUS_INFO, "Failed to get status server information.");
  1136. readTargetClusterInfo(context, clusters, serverStatusRoot, thorTargetClusters, roxieTargetClusters, hthorTargetClusters);
  1137. readRunningWUsAndQueuedWUs(context, envRoot, serverStatusRoot, thorTargetClusters, roxieTargetClusters, hthorTargetClusters,
  1138. aws, serverJobQueues, DFURecoveryJobs);
  1139. updateActivityResponse(context, req, resp, thorTargetClusters, roxieTargetClusters, hthorTargetClusters, aws, serverJobQueues, DFURecoveryJobs);
  1140. }
  1141. else
  1142. {//for backward compatible
  1143. getServersAndWUs(context, req, resp, version, envRoot, clusters);
  1144. }
  1145. }
  1146. catch(IException* e)
  1147. {
  1148. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1149. }
  1150. return true;
  1151. }
  1152. const char *CWsSMCEx::getStatusServerTypeName(WsSMCStatusServerType type)
  1153. {
  1154. return (type < WsSMCSSTterm) ? WsSMCStatusServerTypeNames[type] : NULL;
  1155. }
  1156. void CWsSMCEx::readTargetClusterInfo(IEspContext &context, CConstWUClusterInfoArray& clusters, IPropertyTree* serverStatusRoot,
  1157. CIArrayOf<CWsSMCTargetCluster>& thorTargetClusters, CIArrayOf<CWsSMCTargetCluster>& roxieTargetClusters, CIArrayOf<CWsSMCTargetCluster>& hthorTargetClusters)
  1158. {
  1159. ForEachItemIn(c, clusters)
  1160. {
  1161. IConstWUClusterInfo &cluster = clusters.item(c);
  1162. Owned<CWsSMCTargetCluster> targetCluster = new CWsSMCTargetCluster();
  1163. readTargetClusterInfo(context, cluster, serverStatusRoot, targetCluster);
  1164. if (cluster.getPlatform() == ThorLCRCluster)
  1165. thorTargetClusters.append(*targetCluster.getClear());
  1166. else if (cluster.getPlatform() == RoxieCluster)
  1167. roxieTargetClusters.append(*targetCluster.getClear());
  1168. else
  1169. hthorTargetClusters.append(*targetCluster.getClear());
  1170. }
  1171. }
  1172. void CWsSMCEx::readTargetClusterInfo(IEspContext& context, IConstWUClusterInfo& cluster, IPropertyTree* serverStatusRoot, CWsSMCTargetCluster* targetCluster)
  1173. {
  1174. SCMStringBuffer clusterName;
  1175. cluster.getName(clusterName);
  1176. targetCluster->clusterName.set(clusterName.str());
  1177. targetCluster->clusterType = cluster.getPlatform();
  1178. targetCluster->clusterSize = cluster.getSize();
  1179. cluster.getServerQueue(targetCluster->serverQueue.queueName);
  1180. cluster.getAgentQueue(targetCluster->agentQueue.queueName);
  1181. StringBuffer statusServerName;
  1182. CWsSMCQueue* jobQueue = NULL;
  1183. if (targetCluster->clusterType == ThorLCRCluster)
  1184. {
  1185. statusServerName.set(getStatusServerTypeName(WsSMCSSTThorLCRCluster));
  1186. jobQueue = &targetCluster->clusterQueue;
  1187. cluster.getThorQueue(jobQueue->queueName);
  1188. }
  1189. else if (targetCluster->clusterType == RoxieCluster)
  1190. {
  1191. statusServerName.set(getStatusServerTypeName(WsSMCSSTRoxieCluster));
  1192. jobQueue = &targetCluster->agentQueue;
  1193. }
  1194. else
  1195. {
  1196. statusServerName.set(getStatusServerTypeName(WsSMCSSTHThorCluster));
  1197. jobQueue = &targetCluster->agentQueue;
  1198. }
  1199. targetCluster->statusServerName.set(statusServerName.str());
  1200. targetCluster->queueName.set(jobQueue->queueName.str());
  1201. if (serverStatusRoot)
  1202. {
  1203. jobQueue->foundQueueInStatusServer = findQueueInStatusServer(context, serverStatusRoot, statusServerName.str(), targetCluster->queueName.get());
  1204. if (!jobQueue->foundQueueInStatusServer)
  1205. targetCluster->clusterStatusDetails.appendf("Cluster %s not attached; ", clusterName.str());
  1206. }
  1207. return;
  1208. }
  1209. void CWsSMCEx::readRunningWUsAndQueuedWUs(IEspContext &context, IPropertyTree* envRoot, IPropertyTree* serverStatusRoot,
  1210. CIArrayOf<CWsSMCTargetCluster>& thorTargetClusters, CIArrayOf<CWsSMCTargetCluster>& roxieTargetClusters, CIArrayOf<CWsSMCTargetCluster>& hthorTargetClusters,
  1211. IArrayOf<IEspActiveWorkunit>& aws, IArrayOf<IEspServerJobQueue>& serverJobQueues, IArrayOf<IEspDFUJob>& DFURecoveryJobs)
  1212. {
  1213. BoolHash uniqueWUIDs;
  1214. readRunningWUsOnStatusServer(context, serverStatusRoot, WsSMCSSTThorLCRCluster, thorTargetClusters, roxieTargetClusters, hthorTargetClusters, uniqueWUIDs, aws);
  1215. readWUsAndStateFromJobQueue(context, thorTargetClusters, uniqueWUIDs, aws);
  1216. readRunningWUsOnStatusServer(context, serverStatusRoot, WsSMCSSTRoxieCluster, roxieTargetClusters, thorTargetClusters, hthorTargetClusters, uniqueWUIDs, aws);
  1217. readWUsAndStateFromJobQueue(context, roxieTargetClusters, uniqueWUIDs, aws);
  1218. readRunningWUsOnStatusServer(context, serverStatusRoot, WsSMCSSTHThorCluster, hthorTargetClusters, thorTargetClusters, roxieTargetClusters, uniqueWUIDs, aws);
  1219. readWUsAndStateFromJobQueue(context, hthorTargetClusters, uniqueWUIDs, aws);
  1220. readRunningWUsOnStatusServer(context, serverStatusRoot, WsSMCSSTECLagent, thorTargetClusters, roxieTargetClusters, hthorTargetClusters, uniqueWUIDs, aws);
  1221. getWUsNotOnTargetCluster(context, serverStatusRoot, serverJobQueues, aws);
  1222. getDFUServersAndWUs(context, envRoot, serverJobQueues, aws);
  1223. getDFURecoveryJobs(context, DFURecoveryJobs);
  1224. }
  1225. void CWsSMCEx::readRunningWUsOnStatusServer(IEspContext& context, IPropertyTree* serverStatusRoot, WsSMCStatusServerType statusServerType,
  1226. CIArrayOf<CWsSMCTargetCluster>& targetClusters, CIArrayOf<CWsSMCTargetCluster>& targetClusters1, CIArrayOf<CWsSMCTargetCluster>& targetClusters2,
  1227. BoolHash& uniqueWUIDs, IArrayOf<IEspActiveWorkunit>& aws)
  1228. {
  1229. const char* serverName = getStatusServerTypeName(statusServerType);
  1230. if (!serverName || !*serverName)
  1231. return;
  1232. bool isECLAgent = (statusServerType == WsSMCSSTECLagent);
  1233. VStringBuffer path("Server[@name=\"%s\"]", serverName);
  1234. Owned<IPropertyTreeIterator> itrStatusServer(serverStatusRoot->getElements(path.str()));
  1235. ForEach(*itrStatusServer)
  1236. {
  1237. IPropertyTree& serverStatusNode = itrStatusServer->query();
  1238. StringBuffer instance;
  1239. if ((statusServerType == WsSMCSSTThorLCRCluster) || (statusServerType == WsSMCSSTRoxieCluster))
  1240. serverStatusNode.getProp("@cluster", instance);
  1241. else
  1242. instance.appendf("%s on %s", serverName, serverStatusNode.queryProp("@node"));
  1243. const char* graph = NULL;
  1244. int sgDuration = -1;
  1245. int subgraph = -1;
  1246. StringBuffer durationStr, subgraphStr;
  1247. if (!isECLAgent)
  1248. {
  1249. sgDuration = serverStatusNode.getPropInt("@sg_duration", -1);
  1250. subgraph = serverStatusNode.getPropInt("@subgraph", -1);
  1251. graph = serverStatusNode.queryProp("@graph");
  1252. durationStr.appendf("%d min", sgDuration);
  1253. subgraphStr.appendf("%d", subgraph);
  1254. }
  1255. Owned<IPropertyTreeIterator> wuids(serverStatusNode.getElements("WorkUnit"));
  1256. ForEach(*wuids)
  1257. {
  1258. const char* wuid=wuids->query().queryProp(NULL);
  1259. if (!wuid || !*wuid || (isECLAgent && uniqueWUIDs.getValue(wuid)))
  1260. continue;
  1261. CWsSMCTargetCluster* targetCluster = findWUClusterInfo(context, wuid, isECLAgent, targetClusters, targetClusters1, targetClusters2);
  1262. if (!targetCluster)
  1263. continue;
  1264. const char* targetClusterName = targetCluster->clusterName.get();
  1265. CWsSMCQueue* jobQueue;
  1266. if (statusServerType == WsSMCSSTThorLCRCluster)
  1267. jobQueue = &targetCluster->clusterQueue;
  1268. else
  1269. jobQueue = &targetCluster->agentQueue;
  1270. Owned<IEspActiveWorkunit> wu;
  1271. if (!isECLAgent)
  1272. {
  1273. uniqueWUIDs.setValue(wuid, true);
  1274. const char *cluster = serverStatusNode.queryProp("Cluster");
  1275. StringBuffer queueName;
  1276. if (cluster) // backward compat check.
  1277. getClusterThorQueueName(queueName, cluster);
  1278. else
  1279. queueName.append(targetCluster->queueName.get());
  1280. createActiveWorkUnit(wu, context, wuid, !strieq(targetClusterName, instance.str()) ? instance.str() : NULL, 0, serverName, queueName, instance.str(), targetClusterName);
  1281. if (wu->getStateID() == WUStateRunning) //'aborting' may be another possible status
  1282. {
  1283. if (subgraph > -1 && sgDuration > -1)
  1284. {
  1285. wu->setGraphName(graph);
  1286. wu->setDuration(durationStr.str());
  1287. wu->setGID(subgraphStr.str());
  1288. }
  1289. if (serverStatusNode.getPropInt("@memoryBlocked ", 0) != 0)
  1290. wu->setMemoryBlocked(1);
  1291. }
  1292. }
  1293. else
  1294. {
  1295. createActiveWorkUnit(wu, context, wuid, instance.str(), 0, serverName, serverName, instance.str(), targetClusterName);
  1296. if (targetCluster->clusterType == ThorLCRCluster)
  1297. wu->setClusterType("Thor");
  1298. else if (targetCluster->clusterType == RoxieCluster)
  1299. wu->setClusterType("Roxie");
  1300. else
  1301. wu->setClusterType("HThor");
  1302. wu->setClusterQueueName(targetCluster->queueName.get());
  1303. if (wu->getStateID() != WUStateRunning)
  1304. {
  1305. const char *extra = wu->getExtra();
  1306. if (wu->getStateID() != WUStateBlocked || !extra || !*extra) // Blocked on persist treated as running here
  1307. {
  1308. aws.append(*wu.getLink());
  1309. jobQueue->countQueuedJobs++;
  1310. continue;
  1311. }
  1312. }
  1313. if (serverStatusNode.getPropInt("@memoryBlocked ", 0) != 0)
  1314. wu->setMemoryBlocked(1);
  1315. }
  1316. aws.append(*wu.getLink());
  1317. jobQueue->countRunningJobs++;
  1318. }
  1319. }
  1320. }
  1321. void CWsSMCEx::readWUsAndStateFromJobQueue(IEspContext& context, CIArrayOf<CWsSMCTargetCluster>& targetClusters, BoolHash& uniqueWUIDs, IArrayOf<IEspActiveWorkunit>& aws)
  1322. {
  1323. ForEachItemIn(i, targetClusters)
  1324. readWUsAndStateFromJobQueue(context, targetClusters.item(i), uniqueWUIDs, aws);
  1325. }
  1326. void CWsSMCEx::readWUsAndStateFromJobQueue(IEspContext& context, CWsSMCTargetCluster& targetCluster, BoolHash& uniqueWUIDs, IArrayOf<IEspActiveWorkunit>& aws)
  1327. {
  1328. if (targetCluster.clusterType == ThorLCRCluster)
  1329. {
  1330. readWUsAndStateFromJobQueue(context, targetCluster, targetCluster.clusterQueue, NULL, uniqueWUIDs, aws);
  1331. targetCluster.queueStatus.set(targetCluster.clusterQueue.queueState);
  1332. }
  1333. if (targetCluster.agentQueue.queueName.length())
  1334. {
  1335. readWUsAndStateFromJobQueue(context, targetCluster, targetCluster.agentQueue, targetCluster.agentQueue.queueName.str(), uniqueWUIDs, aws);
  1336. if (targetCluster.clusterType != ThorLCRCluster)
  1337. targetCluster.queueStatus.set(targetCluster.agentQueue.queueState);
  1338. }
  1339. if (targetCluster.serverQueue.queueName.length())
  1340. readWUsAndStateFromJobQueue(context, targetCluster, targetCluster.serverQueue, targetCluster.serverQueue.queueName.str(), uniqueWUIDs, aws);
  1341. }
  1342. CWsSMCTargetCluster* CWsSMCEx::findTargetCluster(const char* clusterName, CIArrayOf<CWsSMCTargetCluster>& targetClusters)
  1343. {
  1344. ForEachItemIn(i, targetClusters)
  1345. {
  1346. CWsSMCTargetCluster& targetCluster = targetClusters.item(i);
  1347. if (strieq(targetCluster.clusterName.get(), clusterName))
  1348. return &targetCluster;
  1349. }
  1350. return NULL;
  1351. }
  1352. CWsSMCTargetCluster* CWsSMCEx::findWUClusterInfo(IEspContext& context, const char* wuid, bool isOnECLAgent, CIArrayOf<CWsSMCTargetCluster>& targetClusters,
  1353. CIArrayOf<CWsSMCTargetCluster>& targetClusters1, CIArrayOf<CWsSMCTargetCluster>& targetClusters2)
  1354. {
  1355. SCMStringBuffer clusterName;
  1356. try
  1357. {
  1358. CWUWrapper cwu(wuid, context);
  1359. cwu->getClusterName(clusterName);
  1360. if (!clusterName.length())
  1361. return NULL;
  1362. }
  1363. catch (IException *e)
  1364. {//Exception may be thrown when the openWorkUnit() is called inside the CWUWrapper
  1365. StringBuffer msg;
  1366. WARNLOG("Failed to open workunit %s: %s", wuid, e->errorMessage(msg).str());
  1367. e->Release();
  1368. return NULL;
  1369. }
  1370. const char* cluster = clusterName.str();
  1371. CWsSMCTargetCluster* targetCluster = findTargetCluster(cluster, targetClusters);
  1372. if (targetCluster || !isOnECLAgent)
  1373. return targetCluster;
  1374. targetCluster = findTargetCluster(cluster, targetClusters1);
  1375. if (targetCluster)
  1376. return targetCluster;
  1377. return findTargetCluster(cluster, targetClusters2);
  1378. }
  1379. void CWsSMCEx::updateActivityResponse(IEspContext &context, IEspActivityRequest &req, IEspActivityResponse& resp,
  1380. CIArrayOf<CWsSMCTargetCluster>& thorTargetClusters, CIArrayOf<CWsSMCTargetCluster>& roxieTargetClusters, CIArrayOf<CWsSMCTargetCluster>& hthorTargetClusters,
  1381. IArrayOf<IEspActiveWorkunit>& aws, IArrayOf<IEspServerJobQueue>& serverJobQueues, IArrayOf<IEspDFUJob>& DFURecoveryJobs)
  1382. {
  1383. IArrayOf<IEspTargetCluster> thorClusters;
  1384. IArrayOf<IEspTargetCluster> hthorClusters;
  1385. IArrayOf<IEspTargetCluster> roxieClusters;
  1386. //Now transfer required data items from CWsSMCTargetCluster to IEspTargetCluster which is needed for Activity response.
  1387. setESPTargetClusters(context, thorTargetClusters, thorClusters);
  1388. setESPTargetClusters(context, roxieTargetClusters, roxieClusters);
  1389. setESPTargetClusters(context, hthorTargetClusters, hthorClusters);
  1390. const char* sortBy = req.getSortBy();
  1391. bool descending = req.getDescending();
  1392. sortTargetClusters(thorClusters, sortBy, descending);
  1393. sortTargetClusters(roxieClusters, sortBy, descending);
  1394. SecAccessFlags access;
  1395. if (context.authorizeFeature(THORQUEUE_FEATURE, access) && access>=SecAccess_Full)
  1396. resp.setAccessRight("Access_Full");
  1397. resp.setSortBy(sortBy);
  1398. resp.setDescending(descending);
  1399. resp.setThorClusterList(thorClusters);
  1400. resp.setRoxieClusterList(roxieClusters);
  1401. resp.setHThorClusterList(hthorClusters);
  1402. resp.setServerJobQueues(serverJobQueues);
  1403. resp.setRunning(aws);
  1404. resp.setDFUJobs(DFURecoveryJobs);
  1405. }
  1406. void CWsSMCEx::setESPTargetClusters(IEspContext& context, CIArrayOf<CWsSMCTargetCluster>& targetClusters, IArrayOf<IEspTargetCluster>& respTargetClusters)
  1407. {
  1408. ForEachItemIn(i, targetClusters)
  1409. {
  1410. CWsSMCTargetCluster& targetCluster = targetClusters.item(i);
  1411. Owned<IEspTargetCluster> respTargetCluster = new CTargetCluster("", "");
  1412. respTargetCluster->setClusterName(targetCluster.clusterName.get());
  1413. respTargetCluster->setClusterSize(targetCluster.clusterSize);
  1414. respTargetCluster->setClusterType(targetCluster.clusterType);
  1415. respTargetCluster->setQueueName(targetCluster.queueName.get());
  1416. respTargetCluster->setQueueStatus(targetCluster.queueStatus.get());
  1417. setClusterStatus(context, targetCluster, respTargetCluster);
  1418. respTargetClusters.append(*respTargetCluster.getClear());
  1419. }
  1420. }
  1421. void CWsSMCEx::addServerJobQueue(double version, IArrayOf<IEspServerJobQueue>& jobQueues, const char* queueName, const char* serverName,
  1422. const char* serverType, const char* networkAddress, unsigned port)
  1423. {
  1424. if (!queueName || !*queueName || !serverName || !*serverName || !serverType || !*serverType)
  1425. return;
  1426. StringBuffer queueState;
  1427. StringBuffer queueStateDetails;
  1428. Owned<IJobQueue> queue = createJobQueue(queueName);
  1429. if (queue->stopped(queueStateDetails))
  1430. queueState.set("stopped");
  1431. else if (queue->paused(queueStateDetails))
  1432. queueState.set("paused");
  1433. else
  1434. queueState.set("running");
  1435. addServerJobQueue(version, jobQueues, queueName, serverName, serverType, networkAddress, port, queueState.str(), queueStateDetails.str());
  1436. }
  1437. void CWsSMCEx::addServerJobQueue(double version, IArrayOf<IEspServerJobQueue>& jobQueues, const char* queueName, const char* serverName,
  1438. const char* serverType, const char* networkAddress, unsigned port, const char* queueState, const char* queueStateDetails)
  1439. {
  1440. if (!queueName || !*queueName || !serverName || !*serverName || !serverType || !*serverType)
  1441. return;
  1442. if (!queueState || !*queueState)
  1443. queueState = "running";
  1444. Owned<IEspServerJobQueue> jobQueue = createServerJobQueue("", "");
  1445. jobQueue->setQueueName(queueName);
  1446. jobQueue->setServerName(serverName);
  1447. jobQueue->setServerType(serverType);
  1448. if ((version >= 1.19) && networkAddress && *networkAddress)
  1449. {
  1450. jobQueue->setNetworkAddress(networkAddress);
  1451. jobQueue->setPort(port);
  1452. }
  1453. setServerJobQueueStatus(version, jobQueue, queueState, queueStateDetails);
  1454. jobQueues.append(*jobQueue.getClear());
  1455. }
  1456. void CWsSMCEx::setServerJobQueueStatus(double version, IEspServerJobQueue* jobQueue, const char* status, const char* details)
  1457. {
  1458. if (!status || !*status)
  1459. return;
  1460. StringBuffer queueState;
  1461. if (details && *details)
  1462. queueState.appendf("queue %s; %s;", status, details);
  1463. else
  1464. queueState.appendf("queue %s;", status);
  1465. jobQueue->setQueueStatus(status);
  1466. if (version >= 1.17)
  1467. jobQueue->setStatusDetails(queueState.str());
  1468. }
  1469. void CWsSMCEx::addToThorClusterList(IArrayOf<IEspThorCluster>& clusters, IEspThorCluster* cluster, const char* sortBy, bool descending)
  1470. {
  1471. if (clusters.length() < 1)
  1472. {
  1473. clusters.append(*cluster);
  1474. return;
  1475. }
  1476. const char* clusterName = cluster->getClusterName();
  1477. unsigned clusterSize = cluster->getClusterSize();
  1478. bool clusterAdded = false;
  1479. ForEachItemIn(i, clusters)
  1480. {
  1481. int strCmp = 0;
  1482. IEspThorCluster& cluster1 = clusters.item(i);
  1483. if (!sortBy || !*sortBy || strieq(sortBy, "name"))
  1484. {
  1485. strCmp = strcmp(cluster1.getClusterName(), clusterName);
  1486. }
  1487. else
  1488. {//size
  1489. //strCmp = cluster1.getClusterSize() - clusterSize;
  1490. int si = cluster1.getClusterSize();
  1491. strCmp = si - clusterSize;
  1492. }
  1493. if ((descending && (strCmp < 0)) || (!descending && (strCmp > 0)))
  1494. {
  1495. clusters.add(*cluster, i);
  1496. clusterAdded = true;
  1497. break;
  1498. }
  1499. }
  1500. if (!clusterAdded)
  1501. clusters.append(*cluster);
  1502. return;
  1503. }
  1504. void CWsSMCEx::addToRoxieClusterList(IArrayOf<IEspRoxieCluster>& clusters, IEspRoxieCluster* cluster, const char* sortBy, bool descending)
  1505. {
  1506. if (clusters.length() < 1)
  1507. {
  1508. clusters.append(*cluster);
  1509. return;
  1510. }
  1511. const char* clusterName = cluster->getClusterName();
  1512. unsigned clusterSize = cluster->getClusterSize();
  1513. bool clusterAdded = false;
  1514. ForEachItemIn(i, clusters)
  1515. {
  1516. int strCmp = 0;
  1517. IEspRoxieCluster& cluster1 = clusters.item(i);
  1518. if (!sortBy || !*sortBy || strieq(sortBy, "name"))
  1519. {
  1520. strCmp = strcmp(cluster1.getClusterName(), clusterName);
  1521. }
  1522. else
  1523. {//size
  1524. strCmp = cluster1.getClusterSize() - clusterSize;
  1525. }
  1526. if ((descending && (strCmp < 0)) || (!descending && (strCmp > 0)))
  1527. {
  1528. clusters.add(*cluster, i);
  1529. clusterAdded = true;
  1530. break;
  1531. }
  1532. }
  1533. if (!clusterAdded)
  1534. clusters.append(*cluster);
  1535. return;
  1536. }
  1537. void CWsSMCEx::addCapabilities(IPropertyTree* pFeatureNode, const char* access,
  1538. IArrayOf<IEspCapability>& capabilities)
  1539. {
  1540. StringBuffer xpath(access);
  1541. xpath.append("/Capability");
  1542. Owned<IPropertyTreeIterator> it = pFeatureNode->getElements(xpath.str());
  1543. ForEach(*it)
  1544. {
  1545. IPropertyTree* pCapabilityNode = &it->query();
  1546. IEspCapability* pCapability = new CCapability("ws_smc");
  1547. pCapability->setName( pCapabilityNode->queryProp("@name") );
  1548. pCapability->setDescription( pCapabilityNode->queryProp("@description") );
  1549. capabilities.append(*pCapability);
  1550. }
  1551. }
  1552. static void checkAccess(IEspContext &context, const char* feature,int level)
  1553. {
  1554. if (!context.validateFeatureAccess(feature, level, false))
  1555. throw MakeStringException(ECLWATCH_THOR_QUEUE_ACCESS_DENIED, "Failed to access the queue functions. Permission denied.");
  1556. }
  1557. bool CWsSMCEx::onMoveJobDown(IEspContext &context, IEspSMCJobRequest &req, IEspSMCJobResponse &resp)
  1558. {
  1559. try
  1560. {
  1561. checkAccess(context,THORQUEUE_FEATURE,SecAccess_Full);
  1562. Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
  1563. QueueLock lock(queue);
  1564. unsigned index=queue->findRank(req.getWuid());
  1565. if(index<queue->ordinality())
  1566. {
  1567. IJobQueueItem * item0 = queue->getItem(index);
  1568. IJobQueueItem * item = queue->getItem(index+1);
  1569. if(item && item0 && (item0->getPriority() == item->getPriority()))
  1570. queue->moveAfter(req.getWuid(),item->queryWUID());
  1571. }
  1572. AccessSuccess(context, "Changed job priority %s",req.getWuid());
  1573. resp.setRedirectUrl("/WsSMC/");
  1574. }
  1575. catch(IException* e)
  1576. {
  1577. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1578. }
  1579. return true;
  1580. }
  1581. bool CWsSMCEx::onMoveJobUp(IEspContext &context, IEspSMCJobRequest &req, IEspSMCJobResponse &resp)
  1582. {
  1583. try
  1584. {
  1585. checkAccess(context,THORQUEUE_FEATURE,SecAccess_Full);
  1586. Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
  1587. QueueLock lock(queue);
  1588. unsigned index=queue->findRank(req.getWuid());
  1589. if(index>0 && index<queue->ordinality())
  1590. {
  1591. IJobQueueItem * item0 = queue->getItem(index);
  1592. IJobQueueItem * item = queue->getItem(index-1);
  1593. if(item && item0 && (item0->getPriority() == item->getPriority()))
  1594. queue->moveBefore(req.getWuid(),item->queryWUID());
  1595. }
  1596. AccessSuccess(context, "Changed job priority %s",req.getWuid());
  1597. resp.setRedirectUrl("/WsSMC/");
  1598. }
  1599. catch(IException* e)
  1600. {
  1601. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1602. }
  1603. return true;
  1604. }
  1605. bool CWsSMCEx::onMoveJobBack(IEspContext &context, IEspSMCJobRequest &req, IEspSMCJobResponse &resp)
  1606. {
  1607. try
  1608. {
  1609. checkAccess(context,THORQUEUE_FEATURE,SecAccess_Full);
  1610. Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
  1611. QueueLock lock(queue);
  1612. unsigned index=queue->findRank(req.getWuid());
  1613. if(index<queue->ordinality())
  1614. {
  1615. int priority0 = queue->getItem(index)->getPriority();
  1616. unsigned biggestIndoxInSamePriority = index;
  1617. unsigned nextIndex = biggestIndoxInSamePriority + 1;
  1618. while (nextIndex<queue->ordinality())
  1619. {
  1620. IJobQueueItem * item = queue->getItem(nextIndex);
  1621. if (priority0 != item->getPriority())
  1622. {
  1623. break;
  1624. }
  1625. biggestIndoxInSamePriority = nextIndex;
  1626. nextIndex++;
  1627. }
  1628. if (biggestIndoxInSamePriority != index)
  1629. {
  1630. IJobQueueItem * item = queue->getItem(biggestIndoxInSamePriority);
  1631. queue->moveAfter(req.getWuid(),item->queryWUID());
  1632. }
  1633. }
  1634. AccessSuccess(context, "Changed job priority %s",req.getWuid());
  1635. resp.setRedirectUrl("/WsSMC/");
  1636. }
  1637. catch(IException* e)
  1638. {
  1639. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1640. }
  1641. return true;
  1642. }
  1643. bool CWsSMCEx::onMoveJobFront(IEspContext &context, IEspSMCJobRequest &req, IEspSMCJobResponse &resp)
  1644. {
  1645. try
  1646. {
  1647. checkAccess(context,THORQUEUE_FEATURE,SecAccess_Full);
  1648. Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
  1649. QueueLock lock(queue);
  1650. unsigned index=queue->findRank(req.getWuid());
  1651. if(index>0 && index<queue->ordinality())
  1652. {
  1653. int priority0 = queue->getItem(index)->getPriority();
  1654. unsigned smallestIndoxInSamePriority = index;
  1655. int nextIndex = smallestIndoxInSamePriority - 1;
  1656. while (nextIndex >= 0)
  1657. {
  1658. IJobQueueItem * item = queue->getItem(nextIndex);
  1659. if (priority0 != item->getPriority())
  1660. {
  1661. break;
  1662. }
  1663. smallestIndoxInSamePriority = nextIndex;
  1664. nextIndex--;
  1665. }
  1666. if (smallestIndoxInSamePriority != index)
  1667. {
  1668. IJobQueueItem * item = queue->getItem(smallestIndoxInSamePriority);
  1669. queue->moveBefore(req.getWuid(),item->queryWUID());
  1670. }
  1671. }
  1672. AccessSuccess(context, "Changed job priority %s",req.getWuid());
  1673. resp.setRedirectUrl("/WsSMC/");
  1674. }
  1675. catch(IException* e)
  1676. {
  1677. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1678. }
  1679. return true;
  1680. }
  1681. bool CWsSMCEx::onRemoveJob(IEspContext &context, IEspSMCJobRequest &req, IEspSMCJobResponse &resp)
  1682. {
  1683. try
  1684. {
  1685. checkAccess(context,THORQUEUE_FEATURE,SecAccess_Full);
  1686. secAbortWorkUnit(req.getWuid(), *context.querySecManager(), *context.queryUser());
  1687. Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
  1688. QueueLock lock(queue);
  1689. unsigned index=queue->findRank(req.getWuid());
  1690. if(index<queue->ordinality())
  1691. {
  1692. if(!queue->cancelInitiateConversation(req.getWuid()))
  1693. throw MakeStringException(ECLWATCH_CANNOT_DELETE_WORKUNIT,"Failed to remove the workunit %s",req.getWuid());
  1694. }
  1695. AccessSuccess(context, "Removed job %s",req.getWuid());
  1696. resp.setRedirectUrl("/WsSMC/");
  1697. }
  1698. catch(IException* e)
  1699. {
  1700. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1701. }
  1702. return true;
  1703. }
  1704. bool CWsSMCEx::onStopQueue(IEspContext &context, IEspSMCQueueRequest &req, IEspSMCQueueResponse &resp)
  1705. {
  1706. try
  1707. {
  1708. checkAccess(context,THORQUEUE_FEATURE,SecAccess_Full);
  1709. StringBuffer info;
  1710. Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
  1711. queue->stop(createQueueActionInfo(context, "stopped", req, info));
  1712. AccessSuccess(context, "Stopped queue %s",req.getCluster());
  1713. double version = context.getClientVersion();
  1714. if (version >= 1.19)
  1715. getStatusServerInfo(context, req.getServerType(), req.getCluster(), req.getNetworkAddress(), req.getPort(), resp.updateStatusServerInfo());
  1716. resp.setRedirectUrl("/WsSMC/");
  1717. }
  1718. catch(IException* e)
  1719. {
  1720. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1721. }
  1722. return true;
  1723. }
  1724. bool CWsSMCEx::onResumeQueue(IEspContext &context, IEspSMCQueueRequest &req, IEspSMCQueueResponse &resp)
  1725. {
  1726. try
  1727. {
  1728. checkAccess(context,THORQUEUE_FEATURE,SecAccess_Full);
  1729. StringBuffer info;
  1730. Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
  1731. queue->resume(createQueueActionInfo(context, "resumed", req, info));
  1732. AccessSuccess(context, "Resumed queue %s",req.getCluster());
  1733. double version = context.getClientVersion();
  1734. if (version >= 1.19)
  1735. getStatusServerInfo(context, req.getServerType(), req.getCluster(), req.getNetworkAddress(), req.getPort(), resp.updateStatusServerInfo());
  1736. resp.setRedirectUrl("/WsSMC/");
  1737. }
  1738. catch(IException* e)
  1739. {
  1740. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1741. }
  1742. return true;
  1743. }
  1744. const char* CWsSMCEx::createQueueActionInfo(IEspContext &context, const char* state, IEspSMCQueueRequest &req, StringBuffer& info)
  1745. {
  1746. StringBuffer peer, currentTime;
  1747. context.getPeer(peer);
  1748. const char* userId = context.queryUserId();
  1749. if (!userId || !*userId)
  1750. userId = "Unknown user";
  1751. CDateTime now;
  1752. now.setNow();
  1753. now.getString(currentTime);
  1754. info.appendf("%s by <%s> at <%s> from <%s>", state, userId, currentTime.str(), peer.str());
  1755. const char* comment = req.getComment();
  1756. if (comment && *comment)
  1757. info.append(": ' ").append(comment).append("'");
  1758. return info.str();
  1759. }
  1760. bool CWsSMCEx::onPauseQueue(IEspContext &context, IEspSMCQueueRequest &req, IEspSMCQueueResponse &resp)
  1761. {
  1762. try
  1763. {
  1764. checkAccess(context,THORQUEUE_FEATURE,SecAccess_Full);
  1765. StringBuffer info;
  1766. Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
  1767. queue->pause(createQueueActionInfo(context, "paused", req, info));
  1768. AccessSuccess(context, "Paused queue %s",req.getCluster());
  1769. double version = context.getClientVersion();
  1770. if (version >= 1.19)
  1771. getStatusServerInfo(context, req.getServerType(), req.getCluster(), req.getNetworkAddress(), req.getPort(), resp.updateStatusServerInfo());
  1772. resp.setRedirectUrl("/WsSMC/");
  1773. }
  1774. catch(IException* e)
  1775. {
  1776. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1777. }
  1778. return true;
  1779. }
  1780. bool CWsSMCEx::onClearQueue(IEspContext &context, IEspSMCQueueRequest &req, IEspSMCQueueResponse &resp)
  1781. {
  1782. try
  1783. {
  1784. checkAccess(context,THORQUEUE_FEATURE,SecAccess_Full);
  1785. Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
  1786. {
  1787. QueueLock lock(queue);
  1788. for(unsigned i=0;i<queue->ordinality();i++)
  1789. secAbortWorkUnit(queue->getItem(i)->queryWUID(), *context.querySecManager(), *context.queryUser());
  1790. queue->clear();
  1791. }
  1792. AccessSuccess(context, "Cleared queue %s",req.getCluster());
  1793. double version = context.getClientVersion();
  1794. if (version >= 1.19)
  1795. getStatusServerInfo(context, req.getServerType(), req.getCluster(), req.getNetworkAddress(), req.getPort(), resp.updateStatusServerInfo());
  1796. resp.setRedirectUrl("/WsSMC/");
  1797. }
  1798. catch(IException* e)
  1799. {
  1800. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1801. }
  1802. return true;
  1803. }
  1804. void CWsSMCEx::setJobPriority(IWorkUnitFactory* factory, const char* wuid, const char* queueName, WUPriorityClass& priority)
  1805. {
  1806. if (!wuid || !*wuid)
  1807. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Workunit ID not specified.");
  1808. if (!queueName || !*queueName)
  1809. throw MakeStringException(ECLWATCH_INVALID_INPUT, "queue not specified.");
  1810. Owned<IWorkUnit> lw = factory->updateWorkUnit(wuid);
  1811. if (!lw)
  1812. throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT, "Cannot update Workunit %s", wuid);
  1813. lw->setPriority(priority);
  1814. // set job priority to queue
  1815. int priorityValue = lw->getPriorityValue();
  1816. {
  1817. CriticalBlock b(crit);
  1818. Owned<IJobQueue> queue = createJobQueue(queueName);
  1819. QueueLock lock(queue);
  1820. queue->changePriority(wuid,priorityValue);
  1821. }
  1822. return;
  1823. }
  1824. bool CWsSMCEx::onSetJobPriority(IEspContext &context, IEspSMCPriorityRequest &req, IEspSMCPriorityResponse &resp)
  1825. {
  1826. try
  1827. {
  1828. WUPriorityClass priority = PriorityClassNormal;
  1829. if(strieq(req.getPriority(),"high"))
  1830. priority = PriorityClassHigh;
  1831. else if(strieq(req.getPriority(),"low"))
  1832. priority = PriorityClassLow;
  1833. Owned<IWorkUnitFactory> factory = getSecWorkUnitFactory(*context.querySecManager(), *context.queryUser());
  1834. IArrayOf<IConstSMCJob>& jobs = req.getSMCJobs();
  1835. if (!jobs.length())
  1836. setJobPriority(factory, req.getWuid(), req.getQueueName(), priority);
  1837. else
  1838. {
  1839. ForEachItemIn(i, jobs)
  1840. {
  1841. IConstSMCJob &item = jobs.item(i);
  1842. const char *wuid = item.getWuid();
  1843. const char *queueName = item.getQueueName();
  1844. if (wuid && *wuid && queueName && *queueName)
  1845. setJobPriority(factory, wuid, queueName, priority);
  1846. }
  1847. }
  1848. resp.setRedirectUrl("/WsSMC/");
  1849. }
  1850. catch(IException* e)
  1851. {
  1852. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1853. }
  1854. return true;
  1855. }
  1856. bool CWsSMCEx::onGetThorQueueAvailability(IEspContext &context, IEspGetThorQueueAvailabilityRequest &req, IEspGetThorQueueAvailabilityResponse& resp)
  1857. {
  1858. try
  1859. {
  1860. if (!context.validateFeatureAccess(FEATURE_URL, SecAccess_Read, false))
  1861. throw MakeStringException(ECLWATCH_SMC_ACCESS_DENIED, "Failed to get Thor Queue availability. Permission denied.");
  1862. StringArray thorNames, groupNames, targetNames, queueNames;
  1863. getEnvironmentThorClusterNames(thorNames, groupNames, targetNames, queueNames);
  1864. IArrayOf<IEspThorCluster> ThorClusters;
  1865. ForEachItemIn(x, thorNames)
  1866. {
  1867. const char* targetName = targetNames.item(x);
  1868. const char* queueName = queueNames.item(x);
  1869. IEspThorCluster* returnCluster = new CThorCluster("","");
  1870. returnCluster->setClusterName(targetName);
  1871. returnCluster->setQueueName(queueName);
  1872. StringBuffer info;
  1873. Owned<IJobQueue> queue = createJobQueue(queueName);
  1874. if(queue->stopped(info))
  1875. returnCluster->setQueueStatus("stopped");
  1876. else if (queue->paused(info))
  1877. returnCluster->setQueueStatus("paused");
  1878. else
  1879. returnCluster->setQueueStatus("running");
  1880. unsigned enqueued=0;
  1881. unsigned connected=0;
  1882. unsigned waiting=0;
  1883. queue->getStats(connected,waiting,enqueued);
  1884. returnCluster->setQueueAvailable(waiting);
  1885. returnCluster->setJobsRunning(connected - waiting);
  1886. returnCluster->setJobsInQueue(enqueued);
  1887. ThorClusters.append(*returnCluster);
  1888. }
  1889. resp.setThorClusters(ThorClusters);
  1890. }
  1891. catch(IException* e)
  1892. {
  1893. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1894. }
  1895. return true;
  1896. }
  1897. bool CWsSMCEx::onSetBanner(IEspContext &context, IEspSetBannerRequest &req, IEspSetBannerResponse& resp)
  1898. {
  1899. try
  1900. {
  1901. #ifdef _USE_OPENLDAP
  1902. CLdapSecManager* secmgr = dynamic_cast<CLdapSecManager*>(context.querySecManager());
  1903. if(!secmgr || !secmgr->isSuperUser(context.queryUser()))
  1904. throw MakeStringException(ECLWATCH_SUPER_USER_ACCESS_DENIED, "access denied, administrators only.");
  1905. #endif
  1906. StringBuffer chatURLStr, bannerStr;
  1907. const char* chatURL = req.getChatURL();
  1908. const char* banner = req.getBannerContent();
  1909. //Only display valid strings
  1910. if (chatURL)
  1911. {
  1912. const char* pStr = chatURL;
  1913. for (unsigned i = 0; i < strlen(chatURL); i++)
  1914. {
  1915. if ((pStr[0] > 31) && (pStr[0] < 127))
  1916. chatURLStr.append(pStr[0]);
  1917. pStr++;
  1918. }
  1919. }
  1920. if (banner)
  1921. {
  1922. const char* pStr = banner;
  1923. for (unsigned i = 0; i < strlen(banner); i++)
  1924. {
  1925. if ((pStr[0] > 31) && (pStr[0] < 127))
  1926. bannerStr.append(pStr[0]);
  1927. pStr++;
  1928. }
  1929. }
  1930. chatURLStr.trim();
  1931. bannerStr.trim();
  1932. if (!req.getBannerAction_isNull() && req.getBannerAction() && (bannerStr.length() < 1))
  1933. {
  1934. throw MakeStringException(ECLWATCH_MISSING_BANNER_CONTENT, "If a Banner is enabled, the Banner content must be specified.");
  1935. }
  1936. if (!req.getEnableChatURL_isNull() && req.getEnableChatURL() && (!req.getChatURL() || !*req.getChatURL()))
  1937. {
  1938. throw MakeStringException(ECLWATCH_MISSING_CHAT_URL, "If a Chat is enabled, the Chat URL must be specified.");
  1939. }
  1940. m_ChatURL = chatURLStr;
  1941. m_Banner = bannerStr;
  1942. const char* bannerSize = req.getBannerSize();
  1943. if (bannerSize && *bannerSize)
  1944. m_BannerSize.clear().append(bannerSize);
  1945. const char* bannerColor = req.getBannerColor();
  1946. if (bannerColor && *bannerColor)
  1947. m_BannerColor.clear().append(bannerColor);
  1948. const char* bannerScroll = req.getBannerScroll();
  1949. if (bannerScroll && *bannerScroll)
  1950. m_BannerScroll.clear().append(bannerScroll);
  1951. m_BannerAction = 0;
  1952. if(!req.getBannerAction_isNull())
  1953. m_BannerAction = req.getBannerAction();
  1954. m_EnableChatURL = 0;
  1955. if(!req.getEnableChatURL_isNull())
  1956. m_EnableChatURL = req.getEnableChatURL();
  1957. resp.setRedirectUrl("/WsSMC/Activity");
  1958. }
  1959. catch(IException* e)
  1960. {
  1961. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1962. }
  1963. return true;
  1964. }
  1965. bool CWsSMCEx::onNotInCommunityEdition(IEspContext &context, IEspNotInCommunityEditionRequest &req, IEspNotInCommunityEditionResponse &resp)
  1966. {
  1967. return true;
  1968. }
  1969. bool CWsSMCEx::onBrowseResources(IEspContext &context, IEspBrowseResourcesRequest & req, IEspBrowseResourcesResponse & resp)
  1970. {
  1971. try
  1972. {
  1973. if (!context.validateFeatureAccess(FEATURE_URL, SecAccess_Read, false))
  1974. throw MakeStringException(ECLWATCH_SMC_ACCESS_DENIED, "Failed to Browse Resources. Permission denied.");
  1975. double version = context.getClientVersion();
  1976. Owned<IEnvironmentFactory> factory = getEnvironmentFactory();
  1977. Owned<IConstEnvironment> constEnv = factory->openEnvironmentByFile();
  1978. //The resource files will be downloaded from the same box of ESP (not dali)
  1979. StringBuffer ipStr;
  1980. IpAddress ipaddr = queryHostIP();
  1981. ipaddr.getIpText(ipStr);
  1982. if (ipStr.length() > 0)
  1983. {
  1984. resp.setNetAddress(ipStr.str());
  1985. Owned<IConstMachineInfo> machine = constEnv->getMachineByAddress(ipStr.str());
  1986. if (machine)
  1987. {
  1988. int os = machine->getOS();
  1989. resp.setOS(os);
  1990. }
  1991. }
  1992. if (m_PortalURL.length() > 0)
  1993. resp.setPortalURL(m_PortalURL.str());
  1994. #ifndef USE_RESOURCE
  1995. if (version > 1.12)
  1996. resp.setUseResource(false);
  1997. #else
  1998. if (version > 1.12)
  1999. resp.setUseResource(true);
  2000. //Now, get a list of resources stored inside the ESP box
  2001. IArrayOf<IEspHPCCResourceRepository> resourceRepositories;
  2002. Owned<IPropertyTree> pEnvRoot = &constEnv->getPTree();
  2003. const char* ossInstall = pEnvRoot->queryProp("EnvSettings/path");
  2004. if (!ossInstall || !*ossInstall)
  2005. {
  2006. WARNLOG("Failed to get EnvSettings/Path in environment settings.");
  2007. return true;
  2008. }
  2009. StringBuffer path;
  2010. path.appendf("%s/componentfiles/files/downloads", ossInstall);
  2011. Owned<IFile> f = createIFile(path.str());
  2012. if(!f->exists() || !f->isDirectory())
  2013. {
  2014. WARNLOG("Invalid resource folder");
  2015. return true;
  2016. }
  2017. Owned<IDirectoryIterator> di = f->directoryFiles(NULL, false, true);
  2018. if(di.get() == NULL)
  2019. {
  2020. WARNLOG("Resource folder is empty.");
  2021. return true;
  2022. }
  2023. ForEach(*di)
  2024. {
  2025. if (!di->isDir())
  2026. continue;
  2027. StringBuffer folder, path0, tmpBuf;
  2028. di->getName(folder);
  2029. if (folder.length() == 0)
  2030. continue;
  2031. path0.appendf("%s/%s/description.xml", path.str(), folder.str());
  2032. Owned<IFile> f0 = createIFile(path0.str());
  2033. if(!f0->exists())
  2034. {
  2035. WARNLOG("Description file not found for %s", folder.str());
  2036. continue;
  2037. }
  2038. OwnedIFileIO rIO = f0->openShared(IFOread,IFSHfull);
  2039. if(!rIO)
  2040. {
  2041. WARNLOG("Failed to open the description file for %s", folder.str());
  2042. continue;
  2043. }
  2044. offset_t fileSize = f0->size();
  2045. tmpBuf.ensureCapacity((unsigned)fileSize);
  2046. tmpBuf.setLength((unsigned)fileSize);
  2047. size32_t nRead = rIO->read(0, (size32_t) fileSize, (char*)tmpBuf.str());
  2048. if (nRead != fileSize)
  2049. {
  2050. WARNLOG("Failed to read the description file for %s", folder.str());
  2051. continue;
  2052. }
  2053. Owned<IPropertyTree> desc = createPTreeFromXMLString(tmpBuf.str());
  2054. if (!desc)
  2055. {
  2056. WARNLOG("Invalid description file for %s", folder.str());
  2057. continue;
  2058. }
  2059. Owned<IPropertyTreeIterator> fileIterator = desc->getElements("file");
  2060. if (!fileIterator->first())
  2061. {
  2062. WARNLOG("Invalid description file for %s", folder.str());
  2063. continue;
  2064. }
  2065. IArrayOf<IEspHPCCResource> resourcs;
  2066. do {
  2067. IPropertyTree &fileItem = fileIterator->query();
  2068. const char* filename = fileItem.queryProp("filename");
  2069. if (!filename || !*filename)
  2070. continue;
  2071. const char* name0 = fileItem.queryProp("name");
  2072. const char* description0 = fileItem.queryProp("description");
  2073. const char* version0 = fileItem.queryProp("version");
  2074. Owned<IEspHPCCResource> onefile = createHPCCResource();
  2075. onefile->setFileName(filename);
  2076. if (name0 && *name0)
  2077. onefile->setName(name0);
  2078. if (description0 && *description0)
  2079. onefile->setDescription(description0);
  2080. if (version0 && *version0)
  2081. onefile->setVersion(version0);
  2082. resourcs.append(*onefile.getLink());
  2083. } while (fileIterator->next());
  2084. if (resourcs.ordinality())
  2085. {
  2086. StringBuffer path1;
  2087. path1.appendf("%s/%s", path.str(), folder.str());
  2088. Owned<IEspHPCCResourceRepository> oneRepository = createHPCCResourceRepository();
  2089. oneRepository->setName(folder.str());
  2090. oneRepository->setPath(path1.str());
  2091. oneRepository->setHPCCResources(resourcs);
  2092. resourceRepositories.append(*oneRepository.getLink());
  2093. }
  2094. }
  2095. if (resourceRepositories.ordinality())
  2096. resp.setHPCCResourceRepositories(resourceRepositories);
  2097. #endif
  2098. }
  2099. catch(IException* e)
  2100. {
  2101. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  2102. }
  2103. return true;
  2104. }
  2105. int CWsSMCSoapBindingEx::onGetForm(IEspContext &context, CHttpRequest* request, CHttpResponse* response, const char *service, const char *method)
  2106. {
  2107. try
  2108. {
  2109. if(stricmp(method,"NotInCommunityEdition")==0)
  2110. {
  2111. StringBuffer page, url, link;
  2112. request->getParameter("EEPortal", url);
  2113. if (url.length() > 0)
  2114. link.appendf("Further information can be found at <a href=\"%s\" target=\"_blank\">%s</a>.", url.str(), url.str());
  2115. page.append(
  2116. "<html>"
  2117. "<head>"
  2118. "<meta http-equiv=\"Content-Type\" content=\"text/html; charset=utf-8\" />"
  2119. "<link rel=\"stylesheet\" type=\"text/css\" href=\"/esp/files/default.css\"/>"
  2120. "<link rel=\"stylesheet\" type=\"text/css\" href=\"/esp/files/yui/build/fonts/fonts-min.css\" />"
  2121. "<title>Advanced feature in Enterprise Edition</title>"
  2122. "</head>"
  2123. "<body>"
  2124. "<h3 style=\"text-align:centre;\">Advanced feature in the Enterprise Edition</h4>"
  2125. "<p style=\"text-align:centre;\">Support for this feature is coming soon. ");
  2126. if (link.length() > 0)
  2127. page.append(link.str());
  2128. page.append("</p></body>"
  2129. "</html>");
  2130. response->setContent(page.str());
  2131. response->setContentType("text/html");
  2132. response->send();
  2133. return 0;
  2134. }
  2135. else if(stricmp(method,"DisabledInThisVersion")==0)
  2136. {
  2137. StringBuffer page;
  2138. page.append(
  2139. "<html>"
  2140. "<head>"
  2141. "<meta http-equiv=\"Content-Type\" content=\"text/html; charset=utf-8\" />"
  2142. "<link rel=\"stylesheet\" type=\"text/css\" href=\"/esp/files/default.css\"/>"
  2143. "<link rel=\"stylesheet\" type=\"text/css\" href=\"/esp/files/yui/build/fonts/fonts-min.css\" />"
  2144. "<title>Disabled Feature in This Version</title>"
  2145. "</head>"
  2146. "<body>"
  2147. "<h3 style=\"text-align:centre;\">Disabled Feature in This Version</h4>"
  2148. "<p style=\"text-align:centre;\">This feature is disabled in this version. ");
  2149. page.append("</p></body>"
  2150. "</html>");
  2151. response->setContent(page.str());
  2152. response->setContentType("text/html");
  2153. response->send();
  2154. return 0;
  2155. }
  2156. }
  2157. catch(IException* e)
  2158. {
  2159. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  2160. }
  2161. return onGetForm(context, request, response, service, method);
  2162. }
  2163. inline const char *controlCmdMessage(int cmd)
  2164. {
  2165. switch (cmd)
  2166. {
  2167. case CRoxieControlCmd_ATTACH:
  2168. return "<control:unlockDali/>";
  2169. case CRoxieControlCmd_DETACH:
  2170. return "<control:lockDali/>";
  2171. case CRoxieControlCmd_RELOAD:
  2172. return "<control:reload/>";
  2173. case CRoxieControlCmd_STATE:
  2174. return "<control:state/>";
  2175. default:
  2176. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Unknown Roxie Control Command.");
  2177. }
  2178. return NULL;
  2179. }
  2180. bool CWsSMCEx::onRoxieControlCmd(IEspContext &context, IEspRoxieControlCmdRequest &req, IEspRoxieControlCmdResponse &resp)
  2181. {
  2182. if (!context.validateFeatureAccess(ROXIE_CONTROL_URL, SecAccess_Full, false))
  2183. throw MakeStringException(ECLWATCH_SMC_ACCESS_DENIED, "Cannot Access Roxie Control. Permission denied.");
  2184. const char *process = req.getProcessCluster();
  2185. if (!process || !*process)
  2186. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Process cluster not specified.");
  2187. const char *controlReq = controlCmdMessage(req.getCommand());
  2188. SocketEndpointArray addrs;
  2189. getRoxieProcessServers(process, addrs);
  2190. if (!addrs.length())
  2191. throw MakeStringException(ECLWATCH_CANNOT_GET_ENV_INFO, "Process cluster not found.");
  2192. Owned<IPropertyTree> controlResp = sendRoxieControlAllNodes(addrs.item(0), controlReq, true, req.getWait());
  2193. if (!controlResp)
  2194. throw MakeStringException(ECLWATCH_INTERNAL_ERROR, "Failed to get control response from roxie.");
  2195. IArrayOf<IEspRoxieControlEndpointInfo> respEndpoints;
  2196. Owned<IPropertyTreeIterator> roxieEndpoints = controlResp->getElements("Endpoint");
  2197. ForEach(*roxieEndpoints)
  2198. {
  2199. IPropertyTree &roxieEndpoint = roxieEndpoints->query();
  2200. Owned<IEspRoxieControlEndpointInfo> respEndpoint = createRoxieControlEndpointInfo();
  2201. respEndpoint->setAddress(roxieEndpoint.queryProp("@ep"));
  2202. respEndpoint->setStatus(roxieEndpoint.queryProp("Status"));
  2203. if (roxieEndpoint.hasProp("Dali/@connected"))
  2204. respEndpoint->setAttached(roxieEndpoint.getPropBool("Dali/@connected"));
  2205. if (roxieEndpoint.hasProp("State/@hash"))
  2206. respEndpoint->setStateHash(roxieEndpoint.queryProp("State/@hash"));
  2207. respEndpoints.append(*respEndpoint.getClear());
  2208. }
  2209. resp.setEndpoints(respEndpoints);
  2210. return true;
  2211. }
  2212. bool CWsSMCEx::onGetStatusServerInfo(IEspContext &context, IEspGetStatusServerInfoRequest &req, IEspGetStatusServerInfoResponse &resp)
  2213. {
  2214. getStatusServerInfo(context, req.getServerType(), req.getServerName(), req.getNetworkAddress(), req.getPort(), resp.updateStatusServerInfo());
  2215. return true;
  2216. }
  2217. void CWsSMCEx::getStatusServerInfo(IEspContext &context, const char *serverType, const char *server, const char *networkAddress, unsigned port,
  2218. IEspStatusServerInfo& statusServerInfo)
  2219. {
  2220. if (!serverType || !*serverType)
  2221. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Server type not specified.");
  2222. if (strieq(serverType,STATUS_SERVER_THOR) || strieq(serverType,STATUS_SERVER_HTHOR) || strieq(serverType,STATUS_SERVER_ROXIE))
  2223. {
  2224. if (!server || !*server)
  2225. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "cluster not specified.");
  2226. getStatusServerInfo(context, server, statusServerInfo);
  2227. }
  2228. else if (!strieq(serverType,STATUS_SERVER_DFUSERVER))
  2229. {
  2230. if (!networkAddress || !*networkAddress)
  2231. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Server network address not specified.");
  2232. getStatusServerInfo(context, serverType, networkAddress, port, statusServerInfo);
  2233. }
  2234. else
  2235. {
  2236. if (!server || !*server)
  2237. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Server not specified.");
  2238. getDFUServerInfo(context, server, statusServerInfo);
  2239. }
  2240. return;
  2241. }
  2242. void CWsSMCEx::getStatusServerInfo(IEspContext &context, const char* clusteName, IEspStatusServerInfo& statusServerInfo)
  2243. {
  2244. double version = context.getClientVersion();
  2245. Owned<IConstWUClusterInfo> info = getTargetClusterInfo(clusteName);
  2246. if (!info)
  2247. throw MakeStringException(ECLWATCH_CANNOT_GET_ENV_INFO,"Failed to get target cluster information.");
  2248. CWsSMCTargetCluster targetCluster;
  2249. readTargetClusterInfo(context, *info, NULL, &targetCluster);
  2250. bool foundQueueInStatusServer = false;
  2251. Owned<IPropertyTree> statusServerTree = getStatusServerTree(info);
  2252. if (statusServerTree)
  2253. {
  2254. foundQueueInStatusServer = true;
  2255. BoolHash uniqueWUIDs;
  2256. IArrayOf<IEspActiveWorkunit> aws;
  2257. StringBuffer networkAddress;
  2258. statusServerTree->getProp("@node", networkAddress);
  2259. unsigned port = statusServerTree->getPropInt("@mpport");
  2260. readRunningWUsOnCluster(context, clusteName, networkAddress.str(), port, targetCluster, statusServerTree, uniqueWUIDs, aws);
  2261. readWUsAndStateFromJobQueue(context, targetCluster, uniqueWUIDs, aws);
  2262. statusServerInfo.setWorkunits(aws);
  2263. }
  2264. IEspTargetCluster& clusterInfo = statusServerInfo.updateTargetClusterInfo();
  2265. clusterInfo.setClusterName(targetCluster.clusterName.get());
  2266. clusterInfo.setClusterSize(targetCluster.clusterSize);
  2267. clusterInfo.setClusterType(targetCluster.clusterType);
  2268. clusterInfo.setQueueName(targetCluster.queueName.get());
  2269. clusterInfo.setQueueStatus(targetCluster.queueStatus.get());
  2270. if (targetCluster.clusterType != ThorLCRCluster)
  2271. targetCluster.agentQueue.foundQueueInStatusServer = foundQueueInStatusServer;
  2272. else
  2273. targetCluster.clusterQueue.foundQueueInStatusServer = foundQueueInStatusServer;
  2274. setClusterStatus(context, targetCluster, &clusterInfo);
  2275. }
  2276. void CWsSMCEx::getStatusServerInfo(IEspContext &context, const char* type, const char *networkAddress, unsigned port, IEspStatusServerInfo& statusServerInfo)
  2277. {
  2278. double version = context.getClientVersion();
  2279. Owned<IPropertyTree> statusServerTree = getStatusServerTree(networkAddress, port);
  2280. if (!statusServerTree)
  2281. throw MakeStringException(ECLWATCH_CANNOT_GET_STATUS_INFO, "Server not attached");
  2282. IEspServerJobQueue& serverInfo = statusServerInfo.updateServerInfo();
  2283. serverInfo.setNetworkAddress(networkAddress);
  2284. serverInfo.setPort(port);
  2285. StringBuffer queueName, instance;
  2286. statusServerTree->getProp("@queue", queueName);
  2287. setServerJobQueue(version, type, NULL, queueName.str(), serverInfo);
  2288. instance.appendf("%s on %s:%d", type, networkAddress, port);
  2289. IArrayOf<IEspActiveWorkunit> aws;
  2290. Owned<IPropertyTreeIterator> wuids(statusServerTree->getElements("WorkUnit"));
  2291. ForEach(*wuids)
  2292. {
  2293. const char* wuid=wuids->query().queryProp(NULL);
  2294. if (!wuid || !*wuid)
  2295. continue;
  2296. Owned<IEspActiveWorkunit> wu;
  2297. createActiveWorkUnit(wu, context, wuid, NULL, 0, type, queueName.str(), instance, NULL);
  2298. aws.append(*wu.getLink());
  2299. }
  2300. statusServerInfo.setWorkunits(aws);
  2301. }
  2302. void CWsSMCEx::getDFUServerInfo(IEspContext &context, const char* serverName, IEspStatusServerInfo& statusServerInfo)
  2303. {
  2304. double version = context.getClientVersion();
  2305. VStringBuffer xpath("/Environment/Software/%s[@name=\"%s\"]", eqDfu, serverName);
  2306. Owned<IRemoteConnection> connEnv = querySDS().connect(xpath.str(), myProcessSession(), RTM_LOCK_READ, SDS_LOCK_TIMEOUT);
  2307. if (!connEnv)
  2308. throw MakeStringException(ECLWATCH_CANNOT_GET_ENV_INFO,"Failed to get environment information.");
  2309. IPropertyTree* serviceTree = connEnv->queryRoot();
  2310. if (!serviceTree)
  2311. throw MakeStringException(ECLWATCH_CANNOT_GET_ENV_INFO,"Failed to get environment information.");
  2312. const char *queueName = serviceTree->queryProp("@queue");
  2313. if (!queueName || !*queueName)
  2314. throw MakeStringException(ECLWATCH_CANNOT_GET_ENV_INFO, "Server queue not found.");
  2315. setServerJobQueue(version, "DFUserver", serverName, queueName, statusServerInfo.updateServerInfo());
  2316. IArrayOf<IEspActiveWorkunit> aws;
  2317. readDFUWUs(context, queueName, serverName, aws);
  2318. statusServerInfo.setWorkunits(aws);
  2319. }
  2320. IPropertyTree* CWsSMCEx::getStatusServerTree(IConstWUClusterInfo* info)
  2321. {
  2322. SCMStringBuffer str;
  2323. StringBuffer xpath;
  2324. if (info->getPlatform() != HThorCluster)
  2325. {
  2326. if (info->getPlatform() == ThorLCRCluster)
  2327. xpath.setf("/Status/Servers/Server[@name=\"%s\"][@cluster=\"%s\"]", getStatusServerTypeName(WsSMCSSTThorLCRCluster), info->getThorProcesses().item(0));
  2328. else
  2329. xpath.setf("/Status/Servers/Server[@name=\"%s\"][@cluster=\"%s\"]", getStatusServerTypeName(WsSMCSSTRoxieCluster), info->getRoxieProcess(str).str());
  2330. Owned<IRemoteConnection> connStatusServer = querySDS().connect(xpath.str(),myProcessSession(),RTM_LOCK_READ,SDS_LOCK_TIMEOUT);
  2331. if (!connStatusServer)
  2332. return NULL;
  2333. Owned<IPropertyTree> retServerTree = connStatusServer->queryRoot()->getBranch(NULL);
  2334. return retServerTree.getClear();
  2335. }
  2336. else
  2337. {
  2338. Owned<IRemoteConnection> connStatusServer = querySDS().connect("/Status/Servers",myProcessSession(),RTM_LOCK_READ,SDS_LOCK_TIMEOUT);
  2339. if (!connStatusServer)
  2340. throw MakeStringException(ECLWATCH_CANNOT_GET_STATUS_INFO, "Status servers not found");
  2341. info->getAgentQueue(str);
  2342. xpath.setf("Server[@name=\"%s\"]", getStatusServerTypeName(WsSMCSSTHThorCluster));
  2343. Owned<IPropertyTreeIterator> it(connStatusServer->queryRoot()->getElements(xpath));
  2344. ForEach(*it)
  2345. {
  2346. IPropertyTree &serverTree = it->query();
  2347. const char *queueNames = serverTree.queryProp("@queue");
  2348. if (!queueNames || !*queueNames)
  2349. continue;
  2350. StringArray qlist;
  2351. qlist.appendListUniq(queueNames, ",");
  2352. ForEachItemIn(q, qlist)
  2353. {
  2354. if (!strieq(qlist.item(q), str.str()))
  2355. continue;
  2356. Owned<IPropertyTree> retServerTree = serverTree.getBranch(NULL);
  2357. return retServerTree.getClear();
  2358. }
  2359. }
  2360. }
  2361. return NULL;
  2362. }
  2363. IPropertyTree* CWsSMCEx::getStatusServerTree(const char *networkAddress, unsigned port)
  2364. {
  2365. if (!networkAddress || !*networkAddress)
  2366. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Network Address not specified");
  2367. VStringBuffer xpath("/Status/Servers/Server[@node=\"%s\"][@mpport=\"%d\"]", networkAddress, port);
  2368. Owned<IRemoteConnection> connStatusServer = querySDS().connect(xpath.str(),myProcessSession(),RTM_LOCK_READ,SDS_LOCK_TIMEOUT);
  2369. if (!connStatusServer)
  2370. return NULL;
  2371. Owned<IPropertyTree> retServerTree = connStatusServer->queryRoot()->getBranch(NULL);
  2372. return retServerTree.getClear();
  2373. }
  2374. void CWsSMCEx::readRunningWUsOnCluster(IEspContext& context, const char* serverName, const char* node, unsigned port,
  2375. CWsSMCTargetCluster& targetCluster, IPropertyTree* statusServerNode, BoolHash& uniqueWUIDs, IArrayOf<IEspActiveWorkunit>& aws)
  2376. {
  2377. const char *cluster = statusServerNode->queryProp("Cluster");
  2378. StringBuffer queueName;
  2379. if (cluster) // backward compat check.
  2380. getClusterThorQueueName(queueName, cluster);
  2381. else
  2382. queueName.append(targetCluster.queueName.get());
  2383. CWsSMCQueue* jobQueue;
  2384. if (targetCluster.clusterType == ThorLCRCluster)
  2385. jobQueue = &targetCluster.clusterQueue;
  2386. else
  2387. jobQueue = &targetCluster.agentQueue;
  2388. StringBuffer instance;
  2389. if ((targetCluster.clusterType == ThorLCRCluster) || (targetCluster.clusterType == RoxieCluster))
  2390. statusServerNode->getProp("@cluster", instance);
  2391. else
  2392. instance.appendf("%s on %s:%d", serverName, node, port);
  2393. const char* targetClusterName = targetCluster.clusterName.get();
  2394. Owned<IPropertyTreeIterator> wuids(statusServerNode->getElements("WorkUnit"));
  2395. ForEach(*wuids)
  2396. {
  2397. const char* wuid=wuids->query().queryProp(NULL);
  2398. if (!wuid || !*wuid)
  2399. continue;
  2400. Owned<IEspActiveWorkunit> wu;
  2401. createActiveWorkUnit(wu, context, wuid, !strieq(targetClusterName, instance.str()) ? instance.str() : NULL, 0, serverName, queueName, instance.str(), targetClusterName);
  2402. if (wu->getStateID() == WUStateRunning) //'aborting' may be another possible status
  2403. {
  2404. StringBuffer durationStr, subgraphStr;
  2405. int sgDuration = statusServerNode->getPropInt("@sg_duration", -1);
  2406. int subgraph = statusServerNode->getPropInt("@subgraph", -1);
  2407. const char* graph = statusServerNode->queryProp("@graph");
  2408. durationStr.appendf("%d min", sgDuration);
  2409. subgraphStr.appendf("%d", subgraph);
  2410. if (subgraph > -1 && sgDuration > -1)
  2411. {
  2412. wu->setGraphName(graph);
  2413. wu->setDuration(durationStr.str());
  2414. wu->setGID(subgraphStr.str());
  2415. }
  2416. if (statusServerNode->getPropInt("@memoryBlocked ", 0) != 0)
  2417. wu->setMemoryBlocked(1);
  2418. }
  2419. aws.append(*wu.getLink());
  2420. jobQueue->countRunningJobs++;
  2421. }
  2422. }
  2423. void CWsSMCEx::setServerJobQueue(double version, const char* serverType, const char* serverName, const char* queueName, IEspServerJobQueue& serverInfo)
  2424. {
  2425. StringBuffer queueState, queueStateDetails;
  2426. Owned<IJobQueue> queue = createJobQueue(queueName);
  2427. if (queue->stopped(queueStateDetails))
  2428. queueState.set("stopped");
  2429. else if (queue->paused(queueStateDetails))
  2430. queueState.set("paused");
  2431. else
  2432. queueState.set("running");
  2433. serverInfo.setQueueName(queueName);
  2434. serverInfo.setServerType(serverType);
  2435. if (serverName && *serverName)
  2436. serverInfo.setServerName(serverName);
  2437. setServerJobQueueStatus(version, &serverInfo, queueState, queueStateDetails);
  2438. }