ws_smcService.cpp 94 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #pragma warning (disable : 4786)
  14. #include "build-config.h"
  15. #ifdef _USE_OPENLDAP
  16. #include "ldapsecurity.ipp"
  17. #endif
  18. #include "ws_smcService.hpp"
  19. #include "wshelpers.hpp"
  20. #include "dalienv.hpp"
  21. #include "dasds.hpp"
  22. #include "WUWrapper.hpp"
  23. #include "dfuwu.hpp"
  24. #include "exception_util.hpp"
  25. #include "roxiecontrol.hpp"
  26. #include "workunit.hpp"
  27. #define STATUS_SERVER_THOR "ThorMaster"
  28. #define STATUS_SERVER_HTHOR "HThorServer"
  29. #define STATUS_SERVER_ROXIE "RoxieServer"
  30. #define STATUS_SERVER_DFUSERVER "DFUserver"
  31. #define STATUS_SERVER_ECLSERVER "ECLserver"
  32. #define STATUS_SERVER_ECLCCSERVER "ECLCCserver"
  33. #define STATUS_SERVER_ECLAGENT "ECLagent"
  34. #define CLUSTER_TYPE_THOR "Thor"
  35. #define CLUSTER_TYPE_HTHOR "HThor"
  36. #define CLUSTER_TYPE_ROXIE "Roxie"
  37. static const char* FEATURE_URL = "SmcAccess";
  38. const char* THORQUEUE_FEATURE = "ThorQueueAccess";
  39. static const char* ROXIE_CONTROL_URL = "RoxieControlAccess";
  40. static const char* OWN_WU_ACCESS = "OwnWorkunitsAccess";
  41. static const char* OTHERS_WU_ACCESS = "OthersWorkunitsAccess";
  42. const char* PERMISSIONS_FILENAME = "espsmc_permissions.xml";
  43. const unsigned DEFAULTACTIVITYINFOCACHETIMEOUTSECOND = 10;
  44. void AccessSuccess(IEspContext& context, char const * msg,...) __attribute__((format(printf, 2, 3)));
  45. void AccessSuccess(IEspContext& context, char const * msg,...)
  46. {
  47. StringBuffer buf;
  48. buf.appendf("User %s: ",context.queryUserId());
  49. va_list args;
  50. va_start(args, msg);
  51. buf.valist_appendf(msg, args);
  52. va_end(args);
  53. AUDIT(AUDIT_TYPE_ACCESS_SUCCESS,buf.str());
  54. }
  55. void AccessFailure(IEspContext& context, char const * msg,...) __attribute__((format(printf, 2, 3)));
  56. void AccessFailure(IEspContext& context, char const * msg,...)
  57. {
  58. StringBuffer buf;
  59. buf.appendf("User %s: ",context.queryUserId());
  60. va_list args;
  61. va_start(args, msg);
  62. buf.valist_appendf(msg, args);
  63. va_end(args);
  64. AUDIT(AUDIT_TYPE_ACCESS_FAILURE,buf.str());
  65. }
  66. struct QueueLock
  67. {
  68. QueueLock(IJobQueue* q): queue(q) { queue->lock(); }
  69. ~QueueLock()
  70. {
  71. queue->unlock();
  72. }
  73. Linked<IJobQueue> queue;
  74. };
  75. static int sortTargetClustersByNameDescending(IInterface * const *L, IInterface * const *R)
  76. {
  77. IEspTargetCluster *left = (IEspTargetCluster *) *L;
  78. IEspTargetCluster *right = (IEspTargetCluster *) *R;
  79. return strcmp(right->getClusterName(), left->getClusterName());
  80. }
  81. static int sortTargetClustersByNameAscending(IInterface * const *L, IInterface * const *R)
  82. {
  83. IEspTargetCluster *left = (IEspTargetCluster *) *L;
  84. IEspTargetCluster *right = (IEspTargetCluster *) *R;
  85. return strcmp(left->getClusterName(), right->getClusterName());
  86. }
  87. static int sortTargetClustersBySizeDescending(IInterface * const *L, IInterface * const *R)
  88. {
  89. IEspTargetCluster *left = (IEspTargetCluster *) *L;
  90. IEspTargetCluster *right = (IEspTargetCluster *) *R;
  91. return right->getClusterSize() - left->getClusterSize();
  92. }
  93. static int sortTargetClustersBySizeAscending(IInterface * const *L, IInterface * const *R)
  94. {
  95. IEspTargetCluster *left = (IEspTargetCluster *) *L;
  96. IEspTargetCluster *right = (IEspTargetCluster *) *R;
  97. return left->getClusterSize() - right->getClusterSize();
  98. }
  99. void CWsSMCEx::init(IPropertyTree *cfg, const char *process, const char *service)
  100. {
  101. if (!daliClientActive())
  102. {
  103. ERRLOG("No Dali Connection Active.");
  104. throw MakeStringException(-1, "No Dali Connection Active. Please Specify a Dali to connect to in you configuration file");
  105. }
  106. m_BannerAction = 0;
  107. m_EnableChatURL = false;
  108. m_BannerSize = "4";
  109. m_BannerColor = "red";
  110. m_BannerScroll = "2";
  111. StringBuffer xpath;
  112. xpath.appendf("Software/EspProcess[@name='%s']/@portalurl", process);
  113. const char* portalURL = cfg->queryProp(xpath.str());
  114. if (portalURL && *portalURL)
  115. m_PortalURL.append(portalURL);
  116. xpath.setf("Software/EspProcess[@name=\"%s\"]/EspService[@name=\"%s\"]/ActivityInfoCacheSeconds", process, service);
  117. activityInfoCacheSeconds = cfg->getPropInt(xpath.str(), DEFAULTACTIVITYINFOCACHETIMEOUTSECOND);
  118. }
  119. struct CActiveWorkunitWrapper: public CActiveWorkunit
  120. {
  121. CActiveWorkunitWrapper(IEspContext &context, const char* wuid,const char* location = NULL,unsigned index=0): CActiveWorkunit("","")
  122. {
  123. CWUWrapper wu(wuid, context);
  124. setActiveWorkunit(wu, wuid, location, index, context.getClientVersion(), false);
  125. }
  126. CActiveWorkunitWrapper(const char* wuid, const char* location = NULL, unsigned index=0): CActiveWorkunit("","")
  127. {
  128. CWUWrapper wu(wuid);
  129. setActiveWorkunit(wu, wuid, location, index, 0.0, true);
  130. }
  131. CActiveWorkunitWrapper(const char* wuid,const char* owner, const char* jobname, const char* state, const char* priority): CActiveWorkunit("","")
  132. {
  133. setWuid(wuid);
  134. setState(state);
  135. setOwner(owner);
  136. setJobname(jobname);
  137. setPriority(priority);
  138. }
  139. void setActiveWorkunit(CWUWrapper& wu, const char* wuid, const char* location, unsigned index, double version, bool notCheckVersion)
  140. {
  141. SCMStringBuffer stateEx;
  142. setWuid(wuid);
  143. const char *state = wu->queryStateDesc();
  144. setStateID(wu->getState());
  145. if (wu->getState() == WUStateBlocked)
  146. {
  147. wu->getStateEx(stateEx);
  148. if (notCheckVersion || (version > 1.00))
  149. setExtra(stateEx.str());
  150. }
  151. buildAndSetState(state, stateEx.str(), location, index);
  152. if ((notCheckVersion || (version > 1.09)) && (wu->getState() == WUStateFailed))
  153. setWarning("The job will ultimately not complete. Please check ECLAgent.");
  154. setOwner(wu->queryUser());
  155. setJobname(wu->queryJobName());
  156. setPriorityStr(wu->getPriority());
  157. if ((notCheckVersion || (version > 1.08)) && wu->isPausing())
  158. {
  159. setIsPausing(true);
  160. }
  161. if (notCheckVersion || (version > 1.14))
  162. {
  163. setClusterName(wu->queryClusterName());
  164. }
  165. }
  166. void buildAndSetState(const char* state, const char* stateEx, const char* location, unsigned index)
  167. {
  168. if (!state || !*state)
  169. return;
  170. StringBuffer stateStr;
  171. if(index && location && *location)
  172. stateStr.setf("queued(%d) [%s on %s]", index, state, location);
  173. else if(index)
  174. stateStr.setf("queued(%d) [%s]", index, state);
  175. else if(location && *location)
  176. stateStr.setf("%s [%s]", state, location);
  177. else
  178. stateStr.set(state);
  179. if (stateEx && *stateEx)
  180. stateStr.appendf(" %s", stateEx);
  181. setState(stateStr.str());
  182. }
  183. void setPriorityStr(unsigned priorityType)
  184. {
  185. switch(priorityType)
  186. {
  187. case PriorityClassHigh: setPriority("high"); break;
  188. default:
  189. case PriorityClassNormal: setPriority("normal"); break;
  190. case PriorityClassLow: setPriority("low"); break;
  191. }
  192. return;
  193. }
  194. };
  195. bool CActivityInfo::isCachedActivityInfoValid(unsigned timeOutSeconds)
  196. {
  197. CDateTime timeNow;
  198. timeNow.setNow();
  199. return timeNow.getSimple() <= timeCached.getSimple() + timeOutSeconds;;
  200. }
  201. void CActivityInfo::createActivityInfo(IEspContext& context)
  202. {
  203. Owned<IEnvironmentFactory> factory = getEnvironmentFactory();
  204. Owned<IConstEnvironment> env = factory->openEnvironment();
  205. if (!env)
  206. throw MakeStringException(ECLWATCH_CANNOT_GET_ENV_INFO,"Failed to get environment information.");
  207. CConstWUClusterInfoArray clusters;
  208. Owned<IPropertyTree> envRoot= &env->getPTree();
  209. getEnvironmentClusterInfo(envRoot, clusters);
  210. try
  211. {
  212. jobQueueSnapshot.setown(createJQSnapshot());
  213. }
  214. catch(IException* e)
  215. {
  216. EXCLOG(e, "CActivityInfo::createActivityInfo");
  217. e->Release();
  218. }
  219. Owned<IRemoteConnection> connStatusServers = querySDS().connect("/Status/Servers",myProcessSession(),RTM_LOCK_READ,30000);
  220. if (!connStatusServers)
  221. throw MakeStringException(ECLWATCH_CANNOT_GET_STATUS_INFO, "Failed to get status server information.");
  222. IPropertyTree* serverStatusRoot = connStatusServers->queryRoot();
  223. readTargetClusterInfo(clusters, serverStatusRoot);
  224. readActiveWUsAndQueuedWUs(context, envRoot, serverStatusRoot);
  225. timeCached.setNow();
  226. }
  227. void CActivityInfo::readTargetClusterInfo(CConstWUClusterInfoArray& clusters, IPropertyTree* serverStatusRoot)
  228. {
  229. ForEachItemIn(c, clusters)
  230. {
  231. IConstWUClusterInfo &cluster = clusters.item(c);
  232. Owned<CWsSMCTargetCluster> targetCluster = new CWsSMCTargetCluster();
  233. readTargetClusterInfo(cluster, serverStatusRoot, targetCluster);
  234. if (cluster.getPlatform() == ThorLCRCluster)
  235. thorTargetClusters.append(*targetCluster.getClear());
  236. else if (cluster.getPlatform() == RoxieCluster)
  237. roxieTargetClusters.append(*targetCluster.getClear());
  238. else
  239. hthorTargetClusters.append(*targetCluster.getClear());
  240. }
  241. }
  242. void CActivityInfo::readTargetClusterInfo(IConstWUClusterInfo& cluster, IPropertyTree* serverStatusRoot, CWsSMCTargetCluster* targetCluster)
  243. {
  244. SCMStringBuffer clusterName;
  245. cluster.getName(clusterName);
  246. targetCluster->clusterName.set(clusterName.str());
  247. targetCluster->clusterType = cluster.getPlatform();
  248. targetCluster->clusterSize = cluster.getSize();
  249. cluster.getServerQueue(targetCluster->serverQueue.queueName);
  250. cluster.getAgentQueue(targetCluster->agentQueue.queueName);
  251. StringBuffer statusServerName;
  252. CWsSMCQueue* smcQueue = NULL;
  253. if (targetCluster->clusterType == ThorLCRCluster)
  254. {
  255. statusServerName.set(getStatusServerTypeName(WsSMCSSTThorLCRCluster));
  256. smcQueue = &targetCluster->clusterQueue;
  257. cluster.getThorQueue(smcQueue->queueName);
  258. }
  259. else if (targetCluster->clusterType == RoxieCluster)
  260. {
  261. statusServerName.set(getStatusServerTypeName(WsSMCSSTRoxieCluster));
  262. smcQueue = &targetCluster->agentQueue;
  263. }
  264. else
  265. {
  266. statusServerName.set(getStatusServerTypeName(WsSMCSSTHThorCluster));
  267. smcQueue = &targetCluster->agentQueue;
  268. }
  269. targetCluster->statusServerName.set(statusServerName.str());
  270. targetCluster->queueName.set(smcQueue->queueName.str());
  271. bool validQueue = readJobQueue(smcQueue->queueName.str(), targetCluster->queuedWUIDs, smcQueue->queueState, smcQueue->queueStateDetails);
  272. if (validQueue && smcQueue->queueState.length())
  273. targetCluster->queueStatus.set(smcQueue->queueState.str());
  274. if (serverStatusRoot)
  275. {
  276. smcQueue->foundQueueInStatusServer = findQueueInStatusServer(serverStatusRoot, statusServerName.str(), targetCluster->queueName.get());
  277. if (!smcQueue->foundQueueInStatusServer)
  278. targetCluster->clusterStatusDetails.appendf("Cluster %s not attached; ", clusterName.str());
  279. }
  280. readJobQueue(targetCluster->serverQueue.queueName.str(), targetCluster->wuidsOnServerQueue, targetCluster->serverQueue.queueState, targetCluster->serverQueue.queueStateDetails);
  281. }
  282. bool CActivityInfo::readJobQueue(const char* queueName, StringArray& wuids, StringBuffer& state, StringBuffer& stateDetails)
  283. {
  284. if (!queueName || !*queueName)
  285. return false;
  286. if (!jobQueueSnapshot)
  287. {
  288. WARNLOG("CActivityInfo::readJobQueue: jobQueueSnapshot not found.");
  289. return false;
  290. }
  291. Owned<IJobQueueConst> jobQueue = jobQueueSnapshot->getJobQueue(queueName);
  292. if (!jobQueue)
  293. {
  294. WARNLOG("CActivityInfo::readJobQueue: failed to get info for job queue %s", queueName);
  295. return false;
  296. }
  297. CJobQueueContents queuedJobs;
  298. jobQueue->copyItemsAndState(queuedJobs, state, stateDetails);
  299. Owned<IJobQueueIterator> iter = queuedJobs.getIterator();
  300. ForEach(*iter)
  301. {
  302. const char* wuid = iter->query().queryWUID();
  303. if (wuid && *wuid)
  304. wuids.append(wuid);
  305. }
  306. return true;
  307. }
  308. const char *CActivityInfo::getStatusServerTypeName(WsSMCStatusServerType type)
  309. {
  310. return (type < WsSMCSSTterm) ? WsSMCStatusServerTypeNames[type] : NULL;
  311. }
  312. bool CActivityInfo::findQueueInStatusServer(IPropertyTree* serverStatusRoot, const char* serverName, const char* queueName)
  313. {
  314. VStringBuffer path("Server[@name=\"%s\"]", serverName);
  315. Owned<IPropertyTreeIterator> it(serverStatusRoot->getElements(path.str()));
  316. ForEach(*it)
  317. {
  318. IPropertyTree& serverStatusNode = it->query();
  319. const char* queue = serverStatusNode.queryProp("@queue");
  320. if (!queue || !*queue)
  321. continue;
  322. StringArray qlist;
  323. qlist.appendListUniq(queue, ",");
  324. ForEachItemIn(q, qlist)
  325. {
  326. if (strieq(qlist.item(q), queueName))
  327. return true;
  328. }
  329. }
  330. return false;
  331. }
  332. void CActivityInfo::readActiveWUsAndQueuedWUs(IEspContext& context, IPropertyTree* envRoot, IPropertyTree* serverStatusRoot)
  333. {
  334. readRunningWUsOnStatusServer(context, serverStatusRoot, WsSMCSSTThorLCRCluster);
  335. readWUsInTargetClusterJobQueues(context, thorTargetClusters);
  336. readRunningWUsOnStatusServer(context, serverStatusRoot, WsSMCSSTRoxieCluster);
  337. readWUsInTargetClusterJobQueues(context, roxieTargetClusters);
  338. readRunningWUsOnStatusServer(context, serverStatusRoot, WsSMCSSTHThorCluster);
  339. readWUsInTargetClusterJobQueues(context, hthorTargetClusters);
  340. readRunningWUsOnStatusServer(context, serverStatusRoot, WsSMCSSTECLagent);
  341. readRunningWUsAndJobQueueforOtherStatusServers(context, serverStatusRoot);
  342. //TODO: add queued WUs for ECLCCServer/ECLServer here. Right now, they are under target clusters.
  343. getDFUServersAndWUs(envRoot, serverStatusRoot);
  344. getDFURecoveryJobs();
  345. }
  346. void CActivityInfo::readRunningWUsOnStatusServer(IEspContext& context, IPropertyTree* serverStatusRoot, WsSMCStatusServerType statusServerType)
  347. {
  348. const char* serverName = getStatusServerTypeName(statusServerType);
  349. if (!serverName || !*serverName)
  350. return;
  351. bool isECLAgent = (statusServerType == WsSMCSSTECLagent);
  352. VStringBuffer path("Server[@name=\"%s\"]", serverName);
  353. Owned<IPropertyTreeIterator> itrStatusServer(serverStatusRoot->getElements(path.str()));
  354. ForEach(*itrStatusServer)
  355. {
  356. IPropertyTree& serverStatusNode = itrStatusServer->query();
  357. StringBuffer serverInstance;
  358. if ((statusServerType == WsSMCSSTThorLCRCluster) || (statusServerType == WsSMCSSTRoxieCluster))
  359. serverStatusNode.getProp("@cluster", serverInstance);
  360. else
  361. serverInstance.appendf("%s on %s", serverName, serverStatusNode.queryProp("@node"));
  362. Owned<IPropertyTreeIterator> wuids(serverStatusNode.getElements("WorkUnit"));
  363. ForEach(*wuids)
  364. {
  365. const char* wuid=wuids->query().queryProp(NULL);
  366. if (!wuid || !*wuid || checkSetUniqueECLWUID(wuid))
  367. continue;
  368. CWsSMCTargetCluster* targetCluster;
  369. if (statusServerType == WsSMCSSTRoxieCluster)
  370. targetCluster = findWUClusterInfo(wuid, isECLAgent, roxieTargetClusters, thorTargetClusters, hthorTargetClusters);
  371. else if (statusServerType == WsSMCSSTHThorCluster)
  372. targetCluster = findWUClusterInfo(wuid, isECLAgent, hthorTargetClusters, thorTargetClusters, roxieTargetClusters);
  373. else
  374. targetCluster = findWUClusterInfo(wuid, isECLAgent, thorTargetClusters, roxieTargetClusters, hthorTargetClusters);
  375. if (!targetCluster)
  376. continue;
  377. const char* targetClusterName = targetCluster->clusterName.get();
  378. CWsSMCQueue* jobQueue;
  379. if (statusServerType == WsSMCSSTThorLCRCluster)
  380. jobQueue = &targetCluster->clusterQueue;
  381. else
  382. jobQueue = &targetCluster->agentQueue;
  383. Owned<IEspActiveWorkunit> wu;
  384. if (!isECLAgent)
  385. {
  386. const char *cluster = serverStatusNode.queryProp("Cluster");
  387. StringBuffer queueName;
  388. if (cluster) // backward compat check.
  389. getClusterThorQueueName(queueName, cluster);
  390. else
  391. queueName.append(targetCluster->queueName.get());
  392. createActiveWorkUnit(context, wu, wuid, !strieq(targetClusterName, serverInstance.str()) ? serverInstance.str() : NULL, 0, serverName,
  393. queueName, serverInstance.str(), targetClusterName, false);
  394. if (wu->getStateID() == WUStateRunning) //'aborting' may be another possible status
  395. {
  396. int sgDuration = serverStatusNode.getPropInt("@sg_duration", -1);
  397. int subgraph = serverStatusNode.getPropInt("@subgraph", -1);
  398. if (subgraph > -1 && sgDuration > -1)
  399. {
  400. const char* graph = serverStatusNode.queryProp("@graph");
  401. VStringBuffer durationStr("%d min", sgDuration);
  402. VStringBuffer subgraphStr("%d", subgraph);
  403. wu->setGraphName(graph);
  404. wu->setDuration(durationStr.str());
  405. wu->setGID(subgraphStr.str());
  406. }
  407. if (serverStatusNode.getPropInt("@memoryBlocked ", 0) != 0)
  408. wu->setMemoryBlocked(1);
  409. }
  410. }
  411. else
  412. {
  413. createActiveWorkUnit(context, wu, wuid, serverInstance.str(), 0, serverName, serverName, serverInstance.str(), targetClusterName, false);
  414. if (targetCluster->clusterType == ThorLCRCluster)
  415. wu->setClusterType(CLUSTER_TYPE_THOR);
  416. else if (targetCluster->clusterType == RoxieCluster)
  417. wu->setClusterType(CLUSTER_TYPE_ROXIE);
  418. else
  419. wu->setClusterType(CLUSTER_TYPE_HTHOR);
  420. wu->setClusterQueueName(targetCluster->queueName.get());
  421. if (wu->getStateID() != WUStateRunning)
  422. {
  423. const char *extra = wu->getExtra();
  424. if (wu->getStateID() != WUStateBlocked || !extra || !*extra) // Blocked on persist treated as running here
  425. {
  426. aws.append(*wu.getClear());
  427. jobQueue->countQueuedJobs++;
  428. continue;
  429. }
  430. }
  431. //Should this be set only if wu->getStateID() == WUStateRunning?
  432. if (serverStatusNode.getPropInt("@memoryBlocked ", 0) != 0)
  433. wu->setMemoryBlocked(1);
  434. }
  435. aws.append(*wu.getClear());
  436. jobQueue->countRunningJobs++;
  437. }
  438. }
  439. }
  440. bool CActivityInfo::checkSetUniqueECLWUID(const char* wuid)
  441. {
  442. bool* idFound = uniqueECLWUIDs.getValue(wuid);
  443. if (!idFound || !*idFound)
  444. uniqueECLWUIDs.setValue(wuid, true);
  445. return idFound && *idFound;
  446. }
  447. CWsSMCTargetCluster* CActivityInfo::findWUClusterInfo(const char* wuid, bool isOnECLAgent, CIArrayOf<CWsSMCTargetCluster>& targetClusters,
  448. CIArrayOf<CWsSMCTargetCluster>& targetClusters1, CIArrayOf<CWsSMCTargetCluster>& targetClusters2)
  449. {
  450. StringAttr clusterName;
  451. try
  452. {
  453. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  454. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid);
  455. if (!cw)
  456. return NULL;
  457. clusterName.set(cw->queryClusterName());
  458. if (!clusterName.length())
  459. return NULL;
  460. }
  461. catch (IException *e)
  462. {//Exception may be thrown when the openWorkUnit() is called inside the CWUWrapper
  463. StringBuffer msg;
  464. WARNLOG("Failed to open workunit %s: %s", wuid, e->errorMessage(msg).str());
  465. e->Release();
  466. return NULL;
  467. }
  468. const char* cluster = clusterName.str();
  469. CWsSMCTargetCluster* targetCluster = findTargetCluster(cluster, targetClusters);
  470. if (targetCluster || !isOnECLAgent)
  471. return targetCluster;
  472. targetCluster = findTargetCluster(cluster, targetClusters1);
  473. if (targetCluster)
  474. return targetCluster;
  475. return findTargetCluster(cluster, targetClusters2);
  476. }
  477. CWsSMCTargetCluster* CActivityInfo::findTargetCluster(const char* clusterName, CIArrayOf<CWsSMCTargetCluster>& targetClusters)
  478. {
  479. ForEachItemIn(i, targetClusters)
  480. {
  481. CWsSMCTargetCluster& targetCluster = targetClusters.item(i);
  482. if (strieq(targetCluster.clusterName.get(), clusterName))
  483. return &targetCluster;
  484. }
  485. return NULL;
  486. }
  487. void CActivityInfo::createActiveWorkUnit(IEspContext& context, Owned<IEspActiveWorkunit>& ownedWU, const char* wuid, const char* location,
  488. unsigned index, const char* serverName, const char* queueName, const char* instanceName, const char* targetClusterName, bool useContext)
  489. {
  490. try
  491. {
  492. if (useContext)
  493. ownedWU.setown(new CActiveWorkunitWrapper(context, wuid, location, index));
  494. else
  495. ownedWU.setown(new CActiveWorkunitWrapper(wuid, location, index));
  496. }
  497. catch (IException *e)
  498. { //if the wu cannot be opened for some reason, the openWorkUnit() inside the CActiveWorkunitWrapper() may throw an exception.
  499. //We do not want the exception stops this process of retrieving/showing all active WUs. And that WU should still be displayed
  500. //with the exception.
  501. StringBuffer msg;
  502. ownedWU.setown(new CActiveWorkunitWrapper(wuid, "", "", e->errorMessage(msg).str(), "normal"));
  503. ownedWU->setStateID(WUStateUnknown);
  504. e->Release();
  505. }
  506. ownedWU->setServer(serverName);
  507. ownedWU->setQueueName(queueName);
  508. if (instanceName && *instanceName)
  509. ownedWU->setInstance(instanceName); // JCSMORE In thor case at least, if queued it is unknown which instance it will run on..
  510. if (targetClusterName && *targetClusterName)
  511. ownedWU->setTargetClusterName(targetClusterName);
  512. }
  513. void CActivityInfo::readWUsInTargetClusterJobQueues(IEspContext& context, CIArrayOf<CWsSMCTargetCluster>& targetClusters)
  514. {
  515. ForEachItemIn(i, targetClusters)
  516. {
  517. CWsSMCTargetCluster& targetCluster = targetClusters.item(i);
  518. if (targetCluster.clusterType == ThorLCRCluster)
  519. readWUsInTargetClusterJobQueue(context, targetCluster, targetCluster.clusterQueue, targetCluster.clusterName.get());
  520. if (targetCluster.agentQueue.queueName.length())
  521. readWUsInTargetClusterJobQueue(context, targetCluster, targetCluster.agentQueue, targetCluster.agentQueue.queueName.str());
  522. if (targetCluster.serverQueue.queueName.length()) //TODO: queued WUs for ECLCCServer/ECLServer should not be here.
  523. readWUsInTargetClusterJobQueue(context, targetCluster, targetCluster.serverQueue, targetCluster.serverQueue.queueName.str());
  524. }
  525. }
  526. void CActivityInfo::readWUsInTargetClusterJobQueue(IEspContext& context, CWsSMCTargetCluster& targetCluster, CWsSMCQueue& jobQueue, const char* queueName)
  527. {
  528. ForEachItemIn(i, targetCluster.queuedWUIDs)
  529. {
  530. const char* wuid = targetCluster.queuedWUIDs.item(i);
  531. if (!wuid || !*wuid || checkSetUniqueECLWUID(wuid))
  532. continue;
  533. Owned<IEspActiveWorkunit> wu;
  534. createActiveWorkUnit(context, wu, wuid, jobQueue.queueName.str(), ++jobQueue.countQueuedJobs, targetCluster.statusServerName.str(),
  535. queueName, NULL, targetCluster.clusterName.get(), false);
  536. aws.append(*wu.getClear());
  537. }
  538. }
  539. void CActivityInfo::addQueuedServerQueueJob(IEspContext& context, const char* serverName, const char* queueName, const char* instanceName, CIArrayOf<CWsSMCTargetCluster>& targetClusters)
  540. {
  541. ForEachItemIn(i, targetClusters)
  542. {
  543. CWsSMCTargetCluster& targetCluster = targetClusters.item(i);
  544. if (!targetCluster.wuidsOnServerQueue.length() || !strieq(queueName, targetCluster.serverQueue.queueName.str()))
  545. continue;
  546. ForEachItemIn(i1, targetCluster.wuidsOnServerQueue)
  547. {
  548. const char* wuid = targetCluster.wuidsOnServerQueue.item(i1);
  549. if (!wuid || !*wuid) //Multiple servers may monitor one queue. The WU may be shown under the multiple servers.
  550. continue;
  551. Owned<IEspActiveWorkunit> wu;
  552. createActiveWorkUnit(context, wu, wuid, NULL, 0, serverName, queueName, instanceName, NULL, false);
  553. aws.append(*wu.getClear());
  554. }
  555. }
  556. }
  557. void CActivityInfo::readRunningWUsAndJobQueueforOtherStatusServers(IEspContext& context, IPropertyTree* serverStatusRoot)
  558. {
  559. BoolHash uniqueServers;
  560. Owned<IPropertyTreeIterator> it(serverStatusRoot->getElements("Server"));
  561. ForEach(*it)
  562. {
  563. IPropertyTree& serverNode = it->query();
  564. const char* serverName = serverNode.queryProp("@name");
  565. const char* node = serverNode.queryProp("@node");
  566. const char* queueName = serverNode.queryProp("@queue");
  567. unsigned port = serverNode.getPropInt("@mpport", 0);
  568. if (!serverName || !*serverName || !node || !*node || strieq(serverName, STATUS_SERVER_DFUSERVER)
  569. || strieq(serverName, getStatusServerTypeName(WsSMCSSTThorLCRCluster)) || strieq(serverName, getStatusServerTypeName(WsSMCSSTRoxieCluster))
  570. || strieq(serverName, getStatusServerTypeName(WsSMCSSTHThorCluster)) || strieq(serverName, getStatusServerTypeName(WsSMCSSTECLagent)))
  571. continue; //target clusters, ECLAgent, DFUServer already handled separately
  572. VStringBuffer instanceName("%s_on_%s:%d", serverName, node, port); //where to get a better instance name?
  573. Owned<IPropertyTreeIterator> wuids(serverNode.getElements("WorkUnit"));
  574. ForEach(*wuids)
  575. {
  576. const char* wuid=wuids->query().queryProp(NULL);
  577. if (!wuid || !*wuid || checkSetUniqueECLWUID(wuid))
  578. continue;
  579. Owned<IEspActiveWorkunit> wu;
  580. createActiveWorkUnit(context, wu, wuid, NULL, 0, serverName, queueName, instanceName.str(), NULL, false);
  581. aws.append(*wu.getClear());
  582. }
  583. bool* found = uniqueServers.getValue(instanceName);
  584. if (!found || !*found)
  585. {
  586. uniqueServers.setValue(instanceName, true);
  587. getServerJobQueue(queueName, instanceName, serverName, node, port);
  588. //Now, we found a new server. we need to add queued jobs from the queues the server is monitoring.
  589. StringArray qList;
  590. qList.appendListUniq(queueName, ",");
  591. ForEachItemIn(q, qList)
  592. {
  593. addQueuedServerQueueJob(context, serverName, qList.item(q), instanceName.str(), thorTargetClusters);
  594. addQueuedServerQueueJob(context, serverName, qList.item(q), instanceName.str(), roxieTargetClusters);
  595. addQueuedServerQueueJob(context, serverName, qList.item(q), instanceName.str(), hthorTargetClusters);
  596. }
  597. }
  598. }
  599. return;
  600. }
  601. void CActivityInfo::getDFUServersAndWUs(IPropertyTree* envRoot, IPropertyTree* serverStatusRoot)
  602. {
  603. if (!envRoot)
  604. return;
  605. VStringBuffer path("Software/%s", eqDfu);
  606. Owned<IPropertyTreeIterator> services = envRoot->getElements(path);
  607. ForEach(*services)
  608. {
  609. IPropertyTree &serviceTree = services->query();
  610. const char *qname = serviceTree.queryProp("@queue");
  611. if (!qname || !*qname)
  612. continue;
  613. StringArray queues;
  614. queues.appendListUniq(qname, ",");
  615. const char *serverName = serviceTree.queryProp("@name");
  616. ForEachItemIn(q, queues)
  617. {
  618. StringArray wuidList;
  619. const char *queueName = queues.item(q);
  620. readDFUWUDetails(queueName, serverName, wuidList, readDFUWUIDs(serverStatusRoot, queueName, wuidList));
  621. getServerJobQueue(queueName, serverName, STATUS_SERVER_DFUSERVER, NULL, 0);
  622. }
  623. }
  624. }
  625. unsigned CActivityInfo::readDFUWUIDs(IPropertyTree* serverStatusRoot, const char* queueName, StringArray& wuidList)
  626. {
  627. if (!queueName || !*queueName)
  628. {
  629. WARNLOG("CActivityInfo::readDFUWUIDs: queue name not specified");
  630. return 0;
  631. }
  632. unsigned runningWUCount = 0;
  633. VStringBuffer path("Server[@name=\"DFUserver\"]/Queue[@name=\"%s\"]",queueName);
  634. Owned<IPropertyTreeIterator> iter = serverStatusRoot->getElements(path.str());
  635. ForEach(*iter)
  636. {
  637. Owned<IPropertyTreeIterator> iterj = iter->query().getElements("Job");
  638. ForEach(*iterj)
  639. {
  640. const char *wuid = iterj->query().queryProp("@wuid");
  641. if (wuid && *wuid && (*wuid!='!')) // filter escapes -- see queuedJobs() in dfuwu.cpp
  642. {
  643. wuidList.append(wuid);
  644. runningWUCount++;
  645. }
  646. }
  647. }
  648. if (!jobQueueSnapshot)
  649. return runningWUCount;
  650. //Read queued jobs
  651. Owned<IJobQueueConst> jobQueue = jobQueueSnapshot->getJobQueue(queueName);
  652. if (!jobQueue)
  653. {
  654. WARNLOG("CActivityInfo::readDFUWUIDs: failed to get info for job queue %s.", queueName);
  655. return runningWUCount;
  656. }
  657. CJobQueueContents jobList;
  658. jobQueue->copyItems(jobList);
  659. Owned<IJobQueueIterator> iterq = jobList.getIterator();
  660. ForEach(*iterq)
  661. {
  662. const char* wuid = iterq->query().queryWUID();
  663. if (wuid && *wuid)
  664. wuidList.append(wuid);
  665. }
  666. return runningWUCount;
  667. }
  668. void CActivityInfo::readDFUWUDetails(const char* queueName, const char* serverName, StringArray& wuidList, unsigned runningWUCount)
  669. {
  670. Owned<IDFUWorkUnitFactory> factory = getDFUWorkUnitFactory();
  671. ForEachItemIn(i, wuidList)
  672. {
  673. StringBuffer jname, uname, state, error;
  674. const char *wuid = wuidList.item(i);
  675. if (i<runningWUCount)
  676. state.set("running");
  677. else
  678. state.set("queued");
  679. try
  680. {
  681. Owned<IConstDFUWorkUnit> dfuwu = factory->openWorkUnit(wuid, false);
  682. dfuwu->getUser(uname);
  683. dfuwu->getJobName(jname);
  684. }
  685. catch (IException *e)
  686. {
  687. e->errorMessage(error);
  688. state.appendf(" (%s)", error.str());
  689. e->Release();
  690. }
  691. Owned<IEspActiveWorkunit> wu(new CActiveWorkunitWrapper(wuid, uname.str(), jname.str(), state.str(), "normal"));
  692. wu->setServer(STATUS_SERVER_DFUSERVER);
  693. wu->setInstance(serverName);
  694. wu->setQueueName(queueName);
  695. aws.append(*wu.getClear());
  696. }
  697. }
  698. void CActivityInfo::getDFURecoveryJobs()
  699. {
  700. Owned<IRemoteConnection> connDFURecovery = querySDS().connect("DFU/RECOVERY",myProcessSession(), RTM_LOCK_READ, 30000);
  701. if (!connDFURecovery)
  702. return;
  703. Owned<IPropertyTreeIterator> it(connDFURecovery->queryRoot()->getElements("job"));
  704. ForEach(*it)
  705. {
  706. IPropertyTree &jb=it->query();
  707. if (!jb.getPropBool("Running",false))
  708. continue;
  709. unsigned done = 0, total = 0;
  710. Owned<IPropertyTreeIterator> it = jb.getElements("DFT/progress");
  711. ForEach(*it)
  712. {
  713. IPropertyTree &p=it->query();
  714. if (p.getPropInt("@done",0))
  715. done++;
  716. total++;
  717. }
  718. StringBuffer cmd;
  719. cmd.append(jb.queryProp("@command")).append(" ").append(jb.queryProp("@command_parameters"));
  720. Owned<IEspDFUJob> job = new CDFUJob("","");
  721. job->setTimeStarted(jb.queryProp("@time_started"));
  722. job->setDone(done);
  723. job->setTotal(total);
  724. job->setCommand(cmd.str());
  725. DFURecoveryJobs.append(*job.getClear());
  726. }
  727. }
  728. void CActivityInfo::getServerJobQueue(const char* queueName, const char* serverName,
  729. const char* serverType, const char* networkAddress, unsigned port)
  730. {
  731. if (!queueName || !*queueName || !serverName || !*serverName || !serverType || !*serverType)
  732. return;
  733. Owned<IEspServerJobQueue> jobQueue = createServerJobQueue("", "");
  734. jobQueue->setQueueName(queueName);
  735. jobQueue->setServerName(serverName);
  736. jobQueue->setServerType(serverType);
  737. if (networkAddress && *networkAddress)
  738. {
  739. jobQueue->setNetworkAddress(networkAddress);
  740. jobQueue->setPort(port);
  741. }
  742. readServerJobQueueStatus(jobQueue);
  743. serverJobQueues.append(*jobQueue.getClear());
  744. }
  745. void CActivityInfo::readServerJobQueueStatus(IEspServerJobQueue* jobQueue)
  746. {
  747. if (!jobQueueSnapshot)
  748. {
  749. WARNLOG("CActivityInfo::readServerJobQueueStatus: jobQueueSnapshot not found.");
  750. return;
  751. }
  752. StringBuffer queueStateDetails;
  753. bool jobQueueFound = false, hasRunning = false, hasPaused = false;
  754. StringArray qlist;
  755. qlist.appendListUniq(jobQueue->getQueueName(), ",");
  756. ForEachItemIn(i, qlist)
  757. {
  758. const char* qname = qlist.item(i);
  759. Owned<IJobQueueConst> queue = jobQueueSnapshot->getJobQueue(qname);
  760. if (!queue)
  761. continue;
  762. jobQueueFound = true;
  763. StringBuffer status, details;
  764. queue->getState(status, details);
  765. if (!status || !*status)
  766. continue;
  767. if (strieq(status.str(), "paused"))
  768. hasPaused = true;
  769. else if (!strieq(status.str(), "stopped"))
  770. hasRunning = true;
  771. if (details && *details)
  772. queueStateDetails.appendf("%s: queue %s; %s;", qname, status.str(), details.str());
  773. else
  774. queueStateDetails.appendf("%s: queue %s;", qname, status.str());
  775. }
  776. if (hasRunning)
  777. jobQueue->setQueueStatus("running");
  778. else if (hasPaused)
  779. jobQueue->setQueueStatus("paused");
  780. else
  781. {
  782. jobQueue->setQueueStatus("stopped");
  783. if (!jobQueueFound)
  784. queueStateDetails.setf("%s not found in Status Server list", jobQueue->getQueueName());
  785. else if (!queueStateDetails.length())
  786. queueStateDetails.setf("No status set in Status Server list for %s", jobQueue->getQueueName());
  787. }
  788. jobQueue->setStatusDetails(queueStateDetails.str());
  789. }
  790. bool CWsSMCEx::onIndex(IEspContext &context, IEspSMCIndexRequest &req, IEspSMCIndexResponse &resp)
  791. {
  792. resp.setRedirectUrl("/");
  793. return true;
  794. }
  795. void CWsSMCEx::readBannerAndChatRequest(IEspContext& context, IEspActivityRequest &req, IEspActivityResponse& resp)
  796. {
  797. StringBuffer chatURLStr, bannerStr;
  798. const char* chatURL = req.getChatURL();
  799. const char* banner = req.getBannerContent();
  800. //Filter out invalid chars
  801. if (chatURL && *chatURL)
  802. {
  803. const char* pStr = chatURL;
  804. unsigned len = strlen(chatURL);
  805. for (unsigned i = 0; i < len; i++)
  806. {
  807. if (isprint(*pStr))
  808. chatURLStr.append(*pStr);
  809. pStr++;
  810. }
  811. }
  812. if (banner && *banner)
  813. {
  814. const char* pStr = banner;
  815. unsigned len = strlen(banner);
  816. for (unsigned i = 0; i < len; i++)
  817. {
  818. bannerStr.append(isprint(*pStr) ? *pStr : '.');
  819. pStr++;
  820. }
  821. }
  822. chatURLStr.trim();
  823. bannerStr.trim();
  824. if (!req.getBannerAction_isNull() && req.getBannerAction() && (bannerStr.length() < 1))
  825. throw MakeStringException(ECLWATCH_MISSING_BANNER_CONTENT, "If a Banner is enabled, the Banner content must be specified.");
  826. if (!req.getEnableChatURL_isNull() && req.getEnableChatURL() && (chatURLStr.length() < 1))
  827. throw MakeStringException(ECLWATCH_MISSING_CHAT_URL, "If a Chat is enabled, the Chat URL must be specified.");
  828. //Now, store the strings since they are valid.
  829. m_ChatURL = chatURLStr;
  830. m_Banner = bannerStr;
  831. const char* bannerSize = req.getBannerSize();
  832. if (bannerSize && *bannerSize)
  833. m_BannerSize.set(bannerSize);
  834. const char* bannerColor = req.getBannerColor();
  835. if (bannerColor && *bannerColor)
  836. m_BannerColor.set(bannerColor);
  837. const char* bannerScroll = req.getBannerScroll();
  838. if (bannerScroll && *bannerScroll)
  839. m_BannerScroll.set(bannerScroll);
  840. m_BannerAction = req.getBannerAction();
  841. if(!req.getEnableChatURL_isNull())
  842. m_EnableChatURL = req.getEnableChatURL();
  843. }
  844. void CWsSMCEx::setBannerAndChatData(double version, IEspActivityResponse& resp)
  845. {
  846. resp.setShowBanner(m_BannerAction);
  847. resp.setShowChatURL(m_EnableChatURL);
  848. resp.setBannerContent(m_Banner.str());
  849. resp.setBannerSize(m_BannerSize.str());
  850. resp.setBannerColor(m_BannerColor.str());
  851. resp.setChatURL(m_ChatURL.str());
  852. if (version >= 1.08)
  853. resp.setBannerScroll(m_BannerScroll.str());
  854. }
  855. void CWsSMCEx::sortTargetClusters(IArrayOf<IEspTargetCluster>& clusters, const char* sortBy, bool descending)
  856. {
  857. if (!sortBy || !*sortBy || strieq(sortBy, "name"))
  858. clusters.sort(descending ? sortTargetClustersByNameDescending : sortTargetClustersByNameAscending);
  859. else
  860. clusters.sort(descending ? sortTargetClustersBySizeDescending : sortTargetClustersBySizeAscending);
  861. }
  862. void CWsSMCEx::getClusterQueueStatus(const CWsSMCTargetCluster& targetCluster, ClusterStatusType& queueStatusType, StringBuffer& queueStatusDetails)
  863. {
  864. const CWsSMCQueue* jobQueue = &targetCluster.clusterQueue;
  865. if (targetCluster.clusterType != ThorLCRCluster)
  866. jobQueue = &targetCluster.agentQueue;
  867. if (!jobQueue->queueName.length())
  868. return;
  869. bool queuePausedOrStopped = false;
  870. //get queueStatusDetails
  871. if (targetCluster.clusterStatusDetails.length())
  872. queueStatusDetails.set(targetCluster.clusterStatusDetails.str());
  873. if (jobQueue->queueState.length())
  874. {
  875. const char* queueState = jobQueue->queueState.str();
  876. queueStatusDetails.appendf("%s: queue %s; ", jobQueue->queueName.str(), queueState);
  877. if (jobQueue->queueStateDetails.length())
  878. queueStatusDetails.appendf(" %s;", jobQueue->queueStateDetails.str());
  879. if (strieq(queueState,"stopped") || strieq(queueState,"paused"))
  880. queuePausedOrStopped = true;
  881. }
  882. //get queueStatusType
  883. if (!jobQueue->foundQueueInStatusServer)
  884. {
  885. if (queuePausedOrStopped)
  886. queueStatusType = QueuePausedOrStoppedNotFound;
  887. else
  888. queueStatusType = QueueRunningNotFound;
  889. }
  890. else if (!queuePausedOrStopped)
  891. queueStatusType = RunningNormal;
  892. else if (jobQueue->countRunningJobs > 0)
  893. queueStatusType = QueuePausedOrStoppedWithJobs;
  894. else
  895. queueStatusType = QueuePausedOrStoppedWithNoJob;
  896. return;
  897. }
  898. void CWsSMCEx::setClusterStatus(IEspContext& context, const CWsSMCTargetCluster& targetCluster, IEspTargetCluster* returnCluster)
  899. {
  900. ClusterStatusType queueStatusType = RunningNormal;
  901. StringBuffer queueStatusDetails;
  902. getClusterQueueStatus(targetCluster, queueStatusType, queueStatusDetails);
  903. returnCluster->setClusterStatus(queueStatusType);
  904. //Set 'Warning' which may be displayed beside cluster name
  905. if (queueStatusType == QueueRunningNotFound)
  906. returnCluster->setWarning("Cluster not attached");
  907. else if (queueStatusType == QueuePausedOrStoppedNotFound)
  908. returnCluster->setWarning("Queue paused or stopped - Cluster not attached");
  909. else if (queueStatusType != RunningNormal)
  910. returnCluster->setWarning("Queue paused or stopped");
  911. //Set 'StatusDetails' which may be displayed when a mouse is moved over cluster icon
  912. if (queueStatusDetails.length())
  913. returnCluster->setStatusDetails(queueStatusDetails.str());
  914. }
  915. // This method reads job information from both /Status/Servers and IJobQueue.
  916. //
  917. // Each server component (a thor cluster, a dfuserver, or an eclagent) is one 'Server' branch under
  918. // /Status/Servers. A 'Server' branch has a @queue which indicates the queue name of the server.
  919. // A 'Server' branch also contains the information about running WUs on that 'Server'. This
  920. // method reads the information. Those WUs are displays under that server (identified by its queue name)
  921. // on Activity page.
  922. //
  923. // For the WUs list inside /Status/Servers/Server[@name=ECLagent] but not list under other 'Server', the
  924. // existing code has to find out WUID and @clusterName of the WU. Then, uses @clusterName to find out the
  925. // queue name in IConstWUClusterInfo. Those WUs list under that server (identified by its queue name) with
  926. // a note 'on ECLagent'. TBD: the logic here will be simpler if the /Status/Servers/Server is named the
  927. // instance and/or cluster.
  928. //
  929. // In order to get information about queued WUs, this method gets queue names from both IConstWUClusterInfo
  930. // and other environment functions. Each of those queue names is linked to one IJobQueues. From the
  931. // IJobQueues, this method reads queued jobs for each server component and list them under the server
  932. // component (identified by its queue name).
  933. bool CWsSMCEx::onActivity(IEspContext &context, IEspActivityRequest &req, IEspActivityResponse& resp)
  934. {
  935. context.validateFeatureAccess(FEATURE_URL, SecAccess_Read, true);
  936. try
  937. {
  938. const char* build_ver = getBuildVersion();
  939. resp.setBuild(build_ver);
  940. double version = context.getClientVersion();
  941. bool isSuperUser = true;
  942. #ifdef _USE_OPENLDAP
  943. CLdapSecManager* secmgr = dynamic_cast<CLdapSecManager*>(context.querySecManager());
  944. if(secmgr && !secmgr->isSuperUser(context.queryUser()))
  945. isSuperUser = false;
  946. #endif
  947. if(isSuperUser && req.getFromSubmitBtn())
  948. readBannerAndChatRequest(context, req, resp);
  949. if (version >= 1.12)
  950. resp.setSuperUser(isSuperUser);
  951. if (version >= 1.06)
  952. setBannerAndChatData(version, resp);
  953. Owned<CActivityInfo> activityInfo = getActivityInfo(context);
  954. setActivityResponse(context, activityInfo, req, resp);
  955. }
  956. catch(IException* e)
  957. {
  958. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  959. }
  960. return true;
  961. }
  962. void CWsSMCEx::clearActivityInfoCache()
  963. {
  964. CriticalBlock b(getActivityCrit);
  965. activityInfoCache.clear();
  966. }
  967. CActivityInfo* CWsSMCEx::getActivityInfo(IEspContext &context)
  968. {
  969. CriticalBlock b(getActivityCrit);
  970. if (activityInfoCache && activityInfoCache->isCachedActivityInfoValid(activityInfoCacheSeconds))
  971. return activityInfoCache.getLink();
  972. DBGLOG("CWsSMCEx::getActivityInfo - rebuild cached information");
  973. {
  974. EspTimeSection timer("createActivityInfo");
  975. activityInfoCache.setown(new CActivityInfo());
  976. activityInfoCache->createActivityInfo(context);
  977. }
  978. return activityInfoCache.getLink();
  979. }
  980. void CWsSMCEx::addWUsToResponse(IEspContext &context, const IArrayOf<IEspActiveWorkunit>& aws, IEspActivityResponse& resp)
  981. {
  982. const char* user = context.queryUserId();
  983. IArrayOf<IEspActiveWorkunit> awsReturned;
  984. ForEachItemIn(i, aws)
  985. {
  986. IEspActiveWorkunit& wu = aws.item(i);
  987. const char* wuid = wu.getWuid();
  988. if (wuid[0] == 'D')//DFU WU
  989. {
  990. awsReturned.append(*LINK(&wu));
  991. continue;
  992. }
  993. try
  994. {
  995. //if no access, throw an exception and go to the 'catch' section.
  996. const char* owner = wu.getOwner();
  997. context.validateFeatureAccess((!owner || !*owner || (user && streq(user, owner))) ? OWN_WU_ACCESS : OTHERS_WU_ACCESS, SecAccess_Read, true);
  998. awsReturned.append(*LINK(&wu));
  999. continue;
  1000. }
  1001. catch (IException *e)
  1002. { //if the wu cannot be opened for some reason, the openWorkUnit() inside the CActiveWorkunitWrapper() may throw an exception.
  1003. //We do not want the exception stops this process of retrieving/showing all active WUs. And that WU should still be displayed
  1004. //with the exception.
  1005. StringBuffer msg;
  1006. Owned<IEspActiveWorkunit> cw = new CActiveWorkunitWrapper(wuid, "", "", e->errorMessage(msg).str(), "normal");
  1007. cw->setStateID(WUStateUnknown);
  1008. cw->setServer(wu.getServer());
  1009. cw->setQueueName(wu.getQueueName());
  1010. const char* instanceName = wu.getInstance();
  1011. const char* targetClusterName = wu.getTargetClusterName();
  1012. if (instanceName && *instanceName)
  1013. cw->setInstance(instanceName); // JCSMORE In thor case at least, if queued it is unknown which instance it will run on..
  1014. if (targetClusterName && *targetClusterName)
  1015. cw->setTargetClusterName(targetClusterName);
  1016. awsReturned.append(*cw.getClear());
  1017. e->Release();
  1018. }
  1019. }
  1020. resp.setRunning(awsReturned);
  1021. return;
  1022. }
  1023. void CWsSMCEx::setActivityResponse(IEspContext &context, CActivityInfo* activityInfo, IEspActivityRequest &req, IEspActivityResponse& resp)
  1024. {
  1025. double version = context.getClientVersion();
  1026. const char* sortBy = req.getSortBy();
  1027. bool descending = req.getDescending();
  1028. if (version >= 1.16)
  1029. {
  1030. IArrayOf<IEspTargetCluster> thorClusters;
  1031. IArrayOf<IEspTargetCluster> hthorClusters;
  1032. IArrayOf<IEspTargetCluster> roxieClusters;
  1033. setESPTargetClusters(context, activityInfo->queryThorTargetClusters(), thorClusters);
  1034. setESPTargetClusters(context, activityInfo->queryRoxieTargetClusters(), roxieClusters);
  1035. setESPTargetClusters(context, activityInfo->queryHThorTargetClusters(), hthorClusters);
  1036. sortTargetClusters(thorClusters, sortBy, descending);
  1037. sortTargetClusters(roxieClusters, sortBy, descending);
  1038. SecAccessFlags access;
  1039. if (context.authorizeFeature(THORQUEUE_FEATURE, access) && access>=SecAccess_Full)
  1040. resp.setAccessRight("Access_Full");
  1041. resp.setSortBy(sortBy);
  1042. resp.setDescending(descending);
  1043. resp.setThorClusterList(thorClusters);
  1044. resp.setRoxieClusterList(roxieClusters);
  1045. resp.setHThorClusterList(hthorClusters);
  1046. resp.setServerJobQueues(activityInfo->queryServerJobQueues());
  1047. }
  1048. else
  1049. {//for backward compatible
  1050. IArrayOf<IEspThorCluster> thorClusters;
  1051. CIArrayOf<CWsSMCTargetCluster>& thorTargetClusters = activityInfo->queryThorTargetClusters();
  1052. ForEachItemIn(i, thorTargetClusters)
  1053. {
  1054. CWsSMCTargetCluster& targetCluster = thorTargetClusters.item(i);
  1055. Owned<IEspThorCluster> respThorCluster = new CThorCluster("", "");
  1056. respThorCluster->setClusterName(targetCluster.clusterName.get());
  1057. respThorCluster->setQueueStatus(targetCluster.queueStatus.get());
  1058. if (version >= 1.03)
  1059. respThorCluster->setQueueName(targetCluster.queueName.get());
  1060. if (version >= 1.11)
  1061. respThorCluster->setClusterSize(targetCluster.clusterSize);
  1062. thorClusters.append(*respThorCluster.getClear());
  1063. }
  1064. resp.setThorClusters(thorClusters);
  1065. if (version > 1.06)
  1066. {
  1067. IArrayOf<IEspRoxieCluster> roxieClusters;
  1068. CIArrayOf<CWsSMCTargetCluster>& roxieTargetClusters = activityInfo->queryRoxieTargetClusters();
  1069. ForEachItemIn(i, roxieTargetClusters)
  1070. {
  1071. CWsSMCTargetCluster& targetCluster = roxieTargetClusters.item(i);
  1072. Owned<IEspRoxieCluster> respRoxieCluster = new CRoxieCluster("", "");
  1073. respRoxieCluster->setClusterName(targetCluster.clusterName.get());
  1074. respRoxieCluster->setQueueStatus(targetCluster.queueStatus.get());
  1075. respRoxieCluster->setQueueName(targetCluster.queueName.get());
  1076. if (version >= 1.11)
  1077. respRoxieCluster->setClusterSize(targetCluster.clusterSize);
  1078. roxieClusters.append(*respRoxieCluster.getClear());
  1079. }
  1080. resp.setRoxieClusters(roxieClusters);
  1081. }
  1082. if (version > 1.10)
  1083. {
  1084. resp.setSortBy(sortBy);
  1085. resp.setDescending(req.getDescending());
  1086. }
  1087. if (version > 1.11)
  1088. {
  1089. IArrayOf<IEspHThorCluster> hThorClusters;
  1090. CIArrayOf<CWsSMCTargetCluster>& hthorTargetClusters = activityInfo->queryHThorTargetClusters();
  1091. ForEachItemIn(i, hthorTargetClusters)
  1092. {
  1093. CWsSMCTargetCluster& targetCluster = hthorTargetClusters.item(i);
  1094. Owned<IEspHThorCluster> respHThorCluster = new CHThorCluster("", "");
  1095. respHThorCluster->setClusterName(targetCluster.clusterName.get());
  1096. respHThorCluster->setQueueStatus(targetCluster.queueStatus.get());
  1097. respHThorCluster->setQueueName(targetCluster.queueName.get());
  1098. respHThorCluster->setClusterSize(targetCluster.clusterSize);
  1099. hThorClusters.append(*respHThorCluster.getClear());
  1100. }
  1101. resp.setHThorClusters(hThorClusters);
  1102. SecAccessFlags access;
  1103. if (context.authorizeFeature(THORQUEUE_FEATURE, access) && access>=SecAccess_Full)
  1104. resp.setAccessRight("Access_Full");
  1105. }
  1106. if (version > 1.03)
  1107. resp.setServerJobQueues(activityInfo->queryServerJobQueues());
  1108. }
  1109. resp.setDFUJobs(activityInfo->queryDFURecoveryJobs());
  1110. addWUsToResponse(context, activityInfo->queryActiveWUs(), resp);
  1111. return;
  1112. }
  1113. void CWsSMCEx::setESPTargetClusters(IEspContext& context, const CIArrayOf<CWsSMCTargetCluster>& targetClusters, IArrayOf<IEspTargetCluster>& respTargetClusters)
  1114. {
  1115. ForEachItemIn(i, targetClusters)
  1116. {
  1117. Owned<IEspTargetCluster> respTargetCluster = new CTargetCluster("", "");
  1118. setESPTargetCluster(context, targetClusters.item(i), respTargetCluster);
  1119. respTargetClusters.append(*respTargetCluster.getClear());
  1120. }
  1121. }
  1122. void CWsSMCEx::addCapabilities(IPropertyTree* pFeatureNode, const char* access,
  1123. IArrayOf<IEspCapability>& capabilities)
  1124. {
  1125. StringBuffer xpath(access);
  1126. xpath.append("/Capability");
  1127. Owned<IPropertyTreeIterator> it = pFeatureNode->getElements(xpath.str());
  1128. ForEach(*it)
  1129. {
  1130. IPropertyTree* pCapabilityNode = &it->query();
  1131. IEspCapability* pCapability = new CCapability("ws_smc");
  1132. pCapability->setName( pCapabilityNode->queryProp("@name") );
  1133. pCapability->setDescription( pCapabilityNode->queryProp("@description") );
  1134. capabilities.append(*pCapability);
  1135. }
  1136. }
  1137. static void checkAccess(IEspContext &context, const char* feature,int level)
  1138. {
  1139. if (!context.validateFeatureAccess(feature, level, false))
  1140. throw MakeStringException(ECLWATCH_THOR_QUEUE_ACCESS_DENIED, "Failed to access the queue functions. Permission denied.");
  1141. }
  1142. bool CWsSMCEx::onMoveJobDown(IEspContext &context, IEspSMCJobRequest &req, IEspSMCJobResponse &resp)
  1143. {
  1144. try
  1145. {
  1146. checkAccess(context,THORQUEUE_FEATURE,SecAccess_Full);
  1147. {
  1148. Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
  1149. QueueLock lock(queue);
  1150. unsigned index=queue->findRank(req.getWuid());
  1151. if(index<queue->ordinality())
  1152. {
  1153. Owned<IJobQueueItem> item0 = queue->getItem(index);
  1154. Owned<IJobQueueItem> item = queue->getItem(index+1);
  1155. if(item && item0 && (item0->getPriority() == item->getPriority()))
  1156. queue->moveAfter(req.getWuid(),item->queryWUID());
  1157. }
  1158. }
  1159. AccessSuccess(context, "Changed job priority %s",req.getWuid());
  1160. clearActivityInfoCache();
  1161. resp.setRedirectUrl("/WsSMC/");
  1162. }
  1163. catch(IException* e)
  1164. {
  1165. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1166. }
  1167. return true;
  1168. }
  1169. bool CWsSMCEx::onMoveJobUp(IEspContext &context, IEspSMCJobRequest &req, IEspSMCJobResponse &resp)
  1170. {
  1171. try
  1172. {
  1173. checkAccess(context,THORQUEUE_FEATURE,SecAccess_Full);
  1174. {
  1175. Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
  1176. QueueLock lock(queue);
  1177. unsigned index=queue->findRank(req.getWuid());
  1178. if(index>0 && index<queue->ordinality())
  1179. {
  1180. Owned<IJobQueueItem> item0 = queue->getItem(index);
  1181. Owned<IJobQueueItem> item = queue->getItem(index-1);
  1182. if(item && item0 && (item0->getPriority() == item->getPriority()))
  1183. queue->moveBefore(req.getWuid(),item->queryWUID());
  1184. }
  1185. }
  1186. AccessSuccess(context, "Changed job priority %s",req.getWuid());
  1187. clearActivityInfoCache();
  1188. resp.setRedirectUrl("/WsSMC/");
  1189. }
  1190. catch(IException* e)
  1191. {
  1192. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1193. }
  1194. return true;
  1195. }
  1196. bool CWsSMCEx::onMoveJobBack(IEspContext &context, IEspSMCJobRequest &req, IEspSMCJobResponse &resp)
  1197. {
  1198. try
  1199. {
  1200. checkAccess(context,THORQUEUE_FEATURE,SecAccess_Full);
  1201. {
  1202. Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
  1203. QueueLock lock(queue);
  1204. unsigned index=queue->findRank(req.getWuid());
  1205. if(index<queue->ordinality())
  1206. {
  1207. Owned<IJobQueueItem> item = queue->getItem(index);
  1208. int priority0 = item->getPriority();
  1209. unsigned biggestIndoxInSamePriority = index;
  1210. unsigned nextIndex = biggestIndoxInSamePriority + 1;
  1211. while (nextIndex<queue->ordinality())
  1212. {
  1213. item.setown(queue->getItem(nextIndex));
  1214. if (priority0 != item->getPriority())
  1215. {
  1216. break;
  1217. }
  1218. biggestIndoxInSamePriority = nextIndex;
  1219. nextIndex++;
  1220. }
  1221. if (biggestIndoxInSamePriority != index)
  1222. {
  1223. item.setown(queue->getItem(biggestIndoxInSamePriority));
  1224. queue->moveAfter(req.getWuid(), item->queryWUID());
  1225. }
  1226. }
  1227. }
  1228. AccessSuccess(context, "Changed job priority %s",req.getWuid());
  1229. clearActivityInfoCache();
  1230. resp.setRedirectUrl("/WsSMC/");
  1231. }
  1232. catch(IException* e)
  1233. {
  1234. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1235. }
  1236. return true;
  1237. }
  1238. bool CWsSMCEx::onMoveJobFront(IEspContext &context, IEspSMCJobRequest &req, IEspSMCJobResponse &resp)
  1239. {
  1240. try
  1241. {
  1242. checkAccess(context,THORQUEUE_FEATURE,SecAccess_Full);
  1243. {
  1244. Owned<IJobQueue> queue=createJobQueue(req.getQueueName());
  1245. QueueLock lock(queue);
  1246. unsigned index=queue->findRank(req.getWuid());
  1247. if (index>0 && index<queue->ordinality())
  1248. {
  1249. Owned<IJobQueueItem> item = queue->getItem(index);
  1250. int priority0 = item->getPriority();
  1251. unsigned smallestIndoxInSamePriority = index;
  1252. int nextIndex = smallestIndoxInSamePriority - 1;
  1253. while (nextIndex >= 0)
  1254. {
  1255. item.setown(queue->getItem(nextIndex));
  1256. if (priority0 != item->getPriority())
  1257. {
  1258. break;
  1259. }
  1260. smallestIndoxInSamePriority = nextIndex;
  1261. nextIndex--;
  1262. }
  1263. if (smallestIndoxInSamePriority != index)
  1264. {
  1265. item.setown(queue->getItem(smallestIndoxInSamePriority));
  1266. queue->moveBefore(req.getWuid(), item->queryWUID());
  1267. }
  1268. }
  1269. }
  1270. AccessSuccess(context, "Changed job priority %s",req.getWuid());
  1271. clearActivityInfoCache();
  1272. resp.setRedirectUrl("/WsSMC/");
  1273. }
  1274. catch(IException* e)
  1275. {
  1276. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1277. }
  1278. return true;
  1279. }
  1280. bool CWsSMCEx::onRemoveJob(IEspContext &context, IEspSMCJobRequest &req, IEspSMCJobResponse &resp)
  1281. {
  1282. try
  1283. {
  1284. checkAccess(context,THORQUEUE_FEATURE,SecAccess_Full);
  1285. secAbortWorkUnit(req.getWuid(), *context.querySecManager(), *context.queryUser());
  1286. {
  1287. Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
  1288. QueueLock lock(queue);
  1289. unsigned index=queue->findRank(req.getWuid());
  1290. if(index<queue->ordinality())
  1291. {
  1292. if(!queue->cancelInitiateConversation(req.getWuid()))
  1293. throw MakeStringException(ECLWATCH_CANNOT_DELETE_WORKUNIT,"Failed to remove the workunit %s",req.getWuid());
  1294. }
  1295. }
  1296. AccessSuccess(context, "Removed job %s",req.getWuid());
  1297. clearActivityInfoCache();
  1298. resp.setRedirectUrl("/WsSMC/");
  1299. }
  1300. catch(IException* e)
  1301. {
  1302. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1303. }
  1304. return true;
  1305. }
  1306. bool CWsSMCEx::onStopQueue(IEspContext &context, IEspSMCQueueRequest &req, IEspSMCQueueResponse &resp)
  1307. {
  1308. try
  1309. {
  1310. checkAccess(context,THORQUEUE_FEATURE,SecAccess_Full);
  1311. {
  1312. Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
  1313. StringBuffer info;
  1314. queue->stop(createQueueActionInfo(context, "stopped", req, info));
  1315. }
  1316. AccessSuccess(context, "Stopped queue %s", req.getCluster());
  1317. clearActivityInfoCache();
  1318. double version = context.getClientVersion();
  1319. if (version >= 1.19)
  1320. getStatusServerInfo(context, req.getServerType(), req.getCluster(), req.getNetworkAddress(), req.getPort(), resp.updateStatusServerInfo());
  1321. resp.setRedirectUrl("/WsSMC/");
  1322. }
  1323. catch(IException* e)
  1324. {
  1325. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1326. }
  1327. return true;
  1328. }
  1329. bool CWsSMCEx::onResumeQueue(IEspContext &context, IEspSMCQueueRequest &req, IEspSMCQueueResponse &resp)
  1330. {
  1331. try
  1332. {
  1333. checkAccess(context,THORQUEUE_FEATURE,SecAccess_Full);
  1334. {
  1335. Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
  1336. StringBuffer info;
  1337. queue->resume(createQueueActionInfo(context, "resumed", req, info));
  1338. }
  1339. AccessSuccess(context, "Resumed queue %s", req.getCluster());
  1340. clearActivityInfoCache();
  1341. double version = context.getClientVersion();
  1342. if (version >= 1.19)
  1343. getStatusServerInfo(context, req.getServerType(), req.getCluster(), req.getNetworkAddress(), req.getPort(), resp.updateStatusServerInfo());
  1344. resp.setRedirectUrl("/WsSMC/");
  1345. }
  1346. catch(IException* e)
  1347. {
  1348. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1349. }
  1350. return true;
  1351. }
  1352. const char* CWsSMCEx::createQueueActionInfo(IEspContext &context, const char* state, IEspSMCQueueRequest &req, StringBuffer& info)
  1353. {
  1354. StringBuffer peer, currentTime;
  1355. context.getPeer(peer);
  1356. const char* userId = context.queryUserId();
  1357. if (!userId || !*userId)
  1358. userId = "Unknown user";
  1359. CDateTime now;
  1360. now.setNow();
  1361. now.getString(currentTime);
  1362. info.appendf("%s by <%s> at <%s> from <%s>", state, userId, currentTime.str(), peer.str());
  1363. const char* comment = req.getComment();
  1364. if (comment && *comment)
  1365. info.append(": ' ").append(comment).append("'");
  1366. return info.str();
  1367. }
  1368. bool CWsSMCEx::onPauseQueue(IEspContext &context, IEspSMCQueueRequest &req, IEspSMCQueueResponse &resp)
  1369. {
  1370. try
  1371. {
  1372. checkAccess(context,THORQUEUE_FEATURE,SecAccess_Full);
  1373. {
  1374. Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
  1375. StringBuffer info;
  1376. queue->pause(createQueueActionInfo(context, "paused", req, info));
  1377. }
  1378. AccessSuccess(context, "Paused queue %s", req.getCluster());
  1379. clearActivityInfoCache();
  1380. double version = context.getClientVersion();
  1381. if (version >= 1.19)
  1382. getStatusServerInfo(context, req.getServerType(), req.getCluster(), req.getNetworkAddress(), req.getPort(), resp.updateStatusServerInfo());
  1383. resp.setRedirectUrl("/WsSMC/");
  1384. }
  1385. catch(IException* e)
  1386. {
  1387. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1388. }
  1389. return true;
  1390. }
  1391. bool CWsSMCEx::onClearQueue(IEspContext &context, IEspSMCQueueRequest &req, IEspSMCQueueResponse &resp)
  1392. {
  1393. try
  1394. {
  1395. checkAccess(context,THORQUEUE_FEATURE,SecAccess_Full);
  1396. {
  1397. Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
  1398. QueueLock lock(queue);
  1399. for(unsigned i=0;i<queue->ordinality();i++)
  1400. {
  1401. Owned<IJobQueueItem> item = queue->getItem(i);
  1402. secAbortWorkUnit(item->queryWUID(), *context.querySecManager(), *context.queryUser());
  1403. }
  1404. queue->clear();
  1405. }
  1406. AccessSuccess(context, "Cleared queue %s",req.getCluster());
  1407. clearActivityInfoCache();
  1408. double version = context.getClientVersion();
  1409. if (version >= 1.19)
  1410. getStatusServerInfo(context, req.getServerType(), req.getCluster(), req.getNetworkAddress(), req.getPort(), resp.updateStatusServerInfo());
  1411. resp.setRedirectUrl("/WsSMC/");
  1412. }
  1413. catch(IException* e)
  1414. {
  1415. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1416. }
  1417. return true;
  1418. }
  1419. void CWsSMCEx::setJobPriority(IEspContext &context, IWorkUnitFactory* factory, const char* wuid, const char* queueName, WUPriorityClass& priority)
  1420. {
  1421. if (!wuid || !*wuid)
  1422. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Workunit ID not specified.");
  1423. if (!queueName || !*queueName)
  1424. throw MakeStringException(ECLWATCH_INVALID_INPUT, "queue not specified.");
  1425. Owned<IWorkUnit> lw = factory->updateWorkUnit(wuid, context.querySecManager(), context.queryUser());
  1426. if (!lw)
  1427. throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT, "Cannot update Workunit %s", wuid);
  1428. lw->setPriority(priority);
  1429. // set job priority to queue
  1430. int priorityValue = lw->getPriorityValue();
  1431. {
  1432. CriticalBlock b(crit);
  1433. Owned<IJobQueue> queue = createJobQueue(queueName);
  1434. QueueLock lock(queue);
  1435. queue->changePriority(wuid,priorityValue);
  1436. }
  1437. return;
  1438. }
  1439. bool CWsSMCEx::onSetJobPriority(IEspContext &context, IEspSMCPriorityRequest &req, IEspSMCPriorityResponse &resp)
  1440. {
  1441. try
  1442. {
  1443. WUPriorityClass priority = PriorityClassNormal;
  1444. if(strieq(req.getPriority(),"high"))
  1445. priority = PriorityClassHigh;
  1446. else if(strieq(req.getPriority(),"low"))
  1447. priority = PriorityClassLow;
  1448. {
  1449. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  1450. IArrayOf<IConstSMCJob>& jobs = req.getSMCJobs();
  1451. if (!jobs.length())
  1452. setJobPriority(context, factory, req.getWuid(), req.getQueueName(), priority);
  1453. else
  1454. {
  1455. ForEachItemIn(i, jobs)
  1456. {
  1457. IConstSMCJob &item = jobs.item(i);
  1458. const char *wuid = item.getWuid();
  1459. const char *queueName = item.getQueueName();
  1460. if (wuid && *wuid && queueName && *queueName)
  1461. setJobPriority(context, factory, wuid, queueName, priority);
  1462. }
  1463. }
  1464. }
  1465. clearActivityInfoCache();
  1466. resp.setRedirectUrl("/WsSMC/");
  1467. }
  1468. catch(IException* e)
  1469. {
  1470. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1471. }
  1472. return true;
  1473. }
  1474. bool CWsSMCEx::onGetThorQueueAvailability(IEspContext &context, IEspGetThorQueueAvailabilityRequest &req, IEspGetThorQueueAvailabilityResponse& resp)
  1475. {
  1476. try
  1477. {
  1478. if (!context.validateFeatureAccess(FEATURE_URL, SecAccess_Read, false))
  1479. throw MakeStringException(ECLWATCH_SMC_ACCESS_DENIED, "Failed to get Thor Queue availability. Permission denied.");
  1480. StringArray thorNames, groupNames, targetNames, queueNames;
  1481. getEnvironmentThorClusterNames(thorNames, groupNames, targetNames, queueNames);
  1482. IArrayOf<IEspThorCluster> ThorClusters;
  1483. ForEachItemIn(x, thorNames)
  1484. {
  1485. const char* targetName = targetNames.item(x);
  1486. const char* queueName = queueNames.item(x);
  1487. IEspThorCluster* returnCluster = new CThorCluster("","");
  1488. returnCluster->setClusterName(targetName);
  1489. returnCluster->setQueueName(queueName);
  1490. StringBuffer info;
  1491. Owned<IJobQueue> queue = createJobQueue(queueName);
  1492. if(queue->stopped(info))
  1493. returnCluster->setQueueStatus("stopped");
  1494. else if (queue->paused(info))
  1495. returnCluster->setQueueStatus("paused");
  1496. else
  1497. returnCluster->setQueueStatus("running");
  1498. unsigned enqueued=0;
  1499. unsigned connected=0;
  1500. unsigned waiting=0;
  1501. queue->getStats(connected,waiting,enqueued);
  1502. returnCluster->setQueueAvailable(waiting);
  1503. returnCluster->setJobsRunning(connected - waiting);
  1504. returnCluster->setJobsInQueue(enqueued);
  1505. ThorClusters.append(*returnCluster);
  1506. }
  1507. resp.setThorClusters(ThorClusters);
  1508. }
  1509. catch(IException* e)
  1510. {
  1511. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1512. }
  1513. return true;
  1514. }
  1515. bool CWsSMCEx::onSetBanner(IEspContext &context, IEspSetBannerRequest &req, IEspSetBannerResponse& resp)
  1516. {
  1517. try
  1518. {
  1519. #ifdef _USE_OPENLDAP
  1520. CLdapSecManager* secmgr = dynamic_cast<CLdapSecManager*>(context.querySecManager());
  1521. if(!secmgr || !secmgr->isSuperUser(context.queryUser()))
  1522. throw MakeStringException(ECLWATCH_SUPER_USER_ACCESS_DENIED, "access denied, administrators only.");
  1523. #endif
  1524. StringBuffer chatURLStr, bannerStr;
  1525. const char* chatURL = req.getChatURL();
  1526. const char* banner = req.getBannerContent();
  1527. //Only display valid strings
  1528. if (chatURL)
  1529. {
  1530. const char* pStr = chatURL;
  1531. for (unsigned i = 0; i < strlen(chatURL); i++)
  1532. {
  1533. if ((pStr[0] > 31) && (pStr[0] < 127))
  1534. chatURLStr.append(pStr[0]);
  1535. pStr++;
  1536. }
  1537. }
  1538. if (banner)
  1539. {
  1540. const char* pStr = banner;
  1541. for (unsigned i = 0; i < strlen(banner); i++)
  1542. {
  1543. if ((pStr[0] > 31) && (pStr[0] < 127))
  1544. bannerStr.append(pStr[0]);
  1545. pStr++;
  1546. }
  1547. }
  1548. chatURLStr.trim();
  1549. bannerStr.trim();
  1550. if (!req.getBannerAction_isNull() && req.getBannerAction() && (bannerStr.length() < 1))
  1551. {
  1552. throw MakeStringException(ECLWATCH_MISSING_BANNER_CONTENT, "If a Banner is enabled, the Banner content must be specified.");
  1553. }
  1554. if (!req.getEnableChatURL_isNull() && req.getEnableChatURL() && (!req.getChatURL() || !*req.getChatURL()))
  1555. {
  1556. throw MakeStringException(ECLWATCH_MISSING_CHAT_URL, "If a Chat is enabled, the Chat URL must be specified.");
  1557. }
  1558. m_ChatURL = chatURLStr;
  1559. m_Banner = bannerStr;
  1560. const char* bannerSize = req.getBannerSize();
  1561. if (bannerSize && *bannerSize)
  1562. m_BannerSize.clear().append(bannerSize);
  1563. const char* bannerColor = req.getBannerColor();
  1564. if (bannerColor && *bannerColor)
  1565. m_BannerColor.clear().append(bannerColor);
  1566. const char* bannerScroll = req.getBannerScroll();
  1567. if (bannerScroll && *bannerScroll)
  1568. m_BannerScroll.clear().append(bannerScroll);
  1569. m_BannerAction = 0;
  1570. if(!req.getBannerAction_isNull())
  1571. m_BannerAction = req.getBannerAction();
  1572. m_EnableChatURL = 0;
  1573. if(!req.getEnableChatURL_isNull())
  1574. m_EnableChatURL = req.getEnableChatURL();
  1575. resp.setRedirectUrl("/WsSMC/Activity");
  1576. }
  1577. catch(IException* e)
  1578. {
  1579. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1580. }
  1581. return true;
  1582. }
  1583. bool CWsSMCEx::onNotInCommunityEdition(IEspContext &context, IEspNotInCommunityEditionRequest &req, IEspNotInCommunityEditionResponse &resp)
  1584. {
  1585. return true;
  1586. }
  1587. bool CWsSMCEx::onBrowseResources(IEspContext &context, IEspBrowseResourcesRequest & req, IEspBrowseResourcesResponse & resp)
  1588. {
  1589. try
  1590. {
  1591. if (!context.validateFeatureAccess(FEATURE_URL, SecAccess_Read, false))
  1592. throw MakeStringException(ECLWATCH_SMC_ACCESS_DENIED, "Failed to Browse Resources. Permission denied.");
  1593. double version = context.getClientVersion();
  1594. Owned<IEnvironmentFactory> factory = getEnvironmentFactory();
  1595. Owned<IConstEnvironment> constEnv = factory->openEnvironment();
  1596. //The resource files will be downloaded from the same box of ESP (not dali)
  1597. StringBuffer ipStr;
  1598. IpAddress ipaddr = queryHostIP();
  1599. ipaddr.getIpText(ipStr);
  1600. if (ipStr.length() > 0)
  1601. {
  1602. resp.setNetAddress(ipStr.str());
  1603. Owned<IConstMachineInfo> machine = constEnv->getMachineByAddress(ipStr.str());
  1604. if (machine)
  1605. {
  1606. int os = machine->getOS();
  1607. resp.setOS(os);
  1608. }
  1609. }
  1610. if (m_PortalURL.length() > 0)
  1611. resp.setPortalURL(m_PortalURL.str());
  1612. #ifndef USE_RESOURCE
  1613. if (version > 1.12)
  1614. resp.setUseResource(false);
  1615. #else
  1616. if (version > 1.12)
  1617. resp.setUseResource(true);
  1618. //Now, get a list of resources stored inside the ESP box
  1619. IArrayOf<IEspHPCCResourceRepository> resourceRepositories;
  1620. Owned<IPropertyTree> pEnvRoot = &constEnv->getPTree();
  1621. const char* ossInstall = pEnvRoot->queryProp("EnvSettings/path");
  1622. if (!ossInstall || !*ossInstall)
  1623. {
  1624. WARNLOG("Failed to get EnvSettings/Path in environment settings.");
  1625. return true;
  1626. }
  1627. StringBuffer path;
  1628. path.appendf("%s/componentfiles/files/downloads", ossInstall);
  1629. Owned<IFile> f = createIFile(path.str());
  1630. if(!f->exists() || !f->isDirectory())
  1631. {
  1632. WARNLOG("Invalid resource folder");
  1633. return true;
  1634. }
  1635. Owned<IDirectoryIterator> di = f->directoryFiles(NULL, false, true);
  1636. if(di.get() == NULL)
  1637. {
  1638. WARNLOG("Resource folder is empty.");
  1639. return true;
  1640. }
  1641. ForEach(*di)
  1642. {
  1643. if (!di->isDir())
  1644. continue;
  1645. StringBuffer folder, path0, tmpBuf;
  1646. di->getName(folder);
  1647. if (folder.length() == 0)
  1648. continue;
  1649. path0.appendf("%s/%s/description.xml", path.str(), folder.str());
  1650. Owned<IFile> f0 = createIFile(path0.str());
  1651. if(!f0->exists())
  1652. {
  1653. WARNLOG("Description file not found for %s", folder.str());
  1654. continue;
  1655. }
  1656. OwnedIFileIO rIO = f0->openShared(IFOread,IFSHfull);
  1657. if(!rIO)
  1658. {
  1659. WARNLOG("Failed to open the description file for %s", folder.str());
  1660. continue;
  1661. }
  1662. offset_t fileSize = f0->size();
  1663. tmpBuf.ensureCapacity((unsigned)fileSize);
  1664. tmpBuf.setLength((unsigned)fileSize);
  1665. size32_t nRead = rIO->read(0, (size32_t) fileSize, (char*)tmpBuf.str());
  1666. if (nRead != fileSize)
  1667. {
  1668. WARNLOG("Failed to read the description file for %s", folder.str());
  1669. continue;
  1670. }
  1671. Owned<IPropertyTree> desc = createPTreeFromXMLString(tmpBuf.str());
  1672. if (!desc)
  1673. {
  1674. WARNLOG("Invalid description file for %s", folder.str());
  1675. continue;
  1676. }
  1677. Owned<IPropertyTreeIterator> fileIterator = desc->getElements("file");
  1678. if (!fileIterator->first())
  1679. {
  1680. WARNLOG("Invalid description file for %s", folder.str());
  1681. continue;
  1682. }
  1683. IArrayOf<IEspHPCCResource> resourcs;
  1684. do {
  1685. IPropertyTree &fileItem = fileIterator->query();
  1686. const char* filename = fileItem.queryProp("filename");
  1687. if (!filename || !*filename)
  1688. continue;
  1689. const char* name0 = fileItem.queryProp("name");
  1690. const char* description0 = fileItem.queryProp("description");
  1691. const char* version0 = fileItem.queryProp("version");
  1692. Owned<IEspHPCCResource> onefile = createHPCCResource();
  1693. onefile->setFileName(filename);
  1694. if (name0 && *name0)
  1695. onefile->setName(name0);
  1696. if (description0 && *description0)
  1697. onefile->setDescription(description0);
  1698. if (version0 && *version0)
  1699. onefile->setVersion(version0);
  1700. resourcs.append(*onefile.getLink());
  1701. } while (fileIterator->next());
  1702. if (resourcs.ordinality())
  1703. {
  1704. StringBuffer path1;
  1705. path1.appendf("%s/%s", path.str(), folder.str());
  1706. Owned<IEspHPCCResourceRepository> oneRepository = createHPCCResourceRepository();
  1707. oneRepository->setName(folder.str());
  1708. oneRepository->setPath(path1.str());
  1709. oneRepository->setHPCCResources(resourcs);
  1710. resourceRepositories.append(*oneRepository.getLink());
  1711. }
  1712. }
  1713. if (resourceRepositories.ordinality())
  1714. resp.setHPCCResourceRepositories(resourceRepositories);
  1715. #endif
  1716. }
  1717. catch(IException* e)
  1718. {
  1719. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1720. }
  1721. return true;
  1722. }
  1723. int CWsSMCSoapBindingEx::onHttpEcho(CHttpRequest* request, CHttpResponse* response)
  1724. {
  1725. StringBuffer xml;
  1726. xml.append(
  1727. "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
  1728. "<soap:Envelope xmlns:soap=\"http://schemas.xmlsoap.org/soap/envelope/\">"
  1729. "<soap:Body>"
  1730. "<HttpEchoResponse xmlns='urn:hpccsystems:ws:httpecho'>");
  1731. appendXMLTag(xml, "Method", request->queryMethod());
  1732. appendXMLTag(xml, "UrlPath", request->queryPath());
  1733. appendXMLTag(xml, "UrlParameters", request->queryParamStr());
  1734. appendXMLOpenTag(xml, "Headers");
  1735. StringArray &headers = request->queryHeaders();
  1736. headers.sortAscii(false);
  1737. ForEachItemIn(i, headers)
  1738. {
  1739. const char *h = headers.item(i);
  1740. if (strnicmp(h, "Authorization", 13))
  1741. appendXMLTag(xml, "Header", h);
  1742. }
  1743. appendXMLCloseTag(xml, "Headers");
  1744. const char *content = request->queryContent();
  1745. if (content && *content)
  1746. appendXMLTag(xml, "Content", content);
  1747. xml.append("</HttpEchoResponse></soap:Body></soap:Envelope>");
  1748. response->setContent(xml);
  1749. response->setContentType("text/xml");
  1750. response->send();
  1751. return 0;
  1752. }
  1753. int CWsSMCSoapBindingEx::onGet(CHttpRequest* request, CHttpResponse* response)
  1754. {
  1755. const char *operation = request->queryServiceMethod();
  1756. if (!operation || !strieq(operation, "HttpEcho"))
  1757. return CWsSMCSoapBinding::onGet(request, response);
  1758. return onHttpEcho(request, response);
  1759. }
  1760. void CWsSMCSoapBindingEx::handleHttpPost(CHttpRequest *request, CHttpResponse *response)
  1761. {
  1762. sub_service sstype;
  1763. StringBuffer operation;
  1764. request->getEspPathInfo(sstype, NULL, NULL, &operation, false);
  1765. if (!operation || !strieq(operation, "HttpEcho"))
  1766. CWsSMCSoapBinding::handleHttpPost(request, response);
  1767. else
  1768. onHttpEcho(request, response);
  1769. }
  1770. int CWsSMCSoapBindingEx::onGetForm(IEspContext &context, CHttpRequest* request, CHttpResponse* response, const char *service, const char *method)
  1771. {
  1772. try
  1773. {
  1774. if(stricmp(method,"NotInCommunityEdition")==0)
  1775. {
  1776. StringBuffer page, url, link;
  1777. request->getParameter("EEPortal", url);
  1778. if (url.length() > 0)
  1779. link.appendf("Further information can be found at <a href=\"%s\" target=\"_blank\">%s</a>.", url.str(), url.str());
  1780. page.append(
  1781. "<html>"
  1782. "<head>"
  1783. "<meta http-equiv=\"Content-Type\" content=\"text/html; charset=utf-8\" />"
  1784. "<link rel=\"stylesheet\" type=\"text/css\" href=\"/esp/files/default.css\"/>"
  1785. "<link rel=\"stylesheet\" type=\"text/css\" href=\"/esp/files/yui/build/fonts/fonts-min.css\" />"
  1786. "<title>Advanced feature in Enterprise Edition</title>"
  1787. "</head>"
  1788. "<body>"
  1789. "<h3 style=\"text-align:centre;\">Advanced feature in the Enterprise Edition</h4>"
  1790. "<p style=\"text-align:centre;\">Support for this feature is coming soon. ");
  1791. if (link.length() > 0)
  1792. page.append(link.str());
  1793. page.append("</p></body>"
  1794. "</html>");
  1795. response->setContent(page.str());
  1796. response->setContentType("text/html");
  1797. response->send();
  1798. return 0;
  1799. }
  1800. else if(stricmp(method,"DisabledInThisVersion")==0)
  1801. {
  1802. StringBuffer page;
  1803. page.append(
  1804. "<html>"
  1805. "<head>"
  1806. "<meta http-equiv=\"Content-Type\" content=\"text/html; charset=utf-8\" />"
  1807. "<link rel=\"stylesheet\" type=\"text/css\" href=\"/esp/files/default.css\"/>"
  1808. "<link rel=\"stylesheet\" type=\"text/css\" href=\"/esp/files/yui/build/fonts/fonts-min.css\" />"
  1809. "<title>Disabled Feature in This Version</title>"
  1810. "</head>"
  1811. "<body>"
  1812. "<h3 style=\"text-align:centre;\">Disabled Feature in This Version</h4>"
  1813. "<p style=\"text-align:centre;\">This feature is disabled in this version. ");
  1814. page.append("</p></body>"
  1815. "</html>");
  1816. response->setContent(page.str());
  1817. response->setContentType("text/html");
  1818. response->send();
  1819. return 0;
  1820. }
  1821. }
  1822. catch(IException* e)
  1823. {
  1824. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1825. }
  1826. return onGetForm(context, request, response, service, method);
  1827. }
  1828. inline const char *controlCmdMessage(int cmd)
  1829. {
  1830. switch (cmd)
  1831. {
  1832. case CRoxieControlCmd_ATTACH:
  1833. return "<control:unlockDali/>";
  1834. case CRoxieControlCmd_DETACH:
  1835. return "<control:lockDali/>";
  1836. case CRoxieControlCmd_RELOAD:
  1837. return "<control:reload/>";
  1838. case CRoxieControlCmd_RELOAD_RETRY:
  1839. return "<control:reload forceRetry='1' />";
  1840. case CRoxieControlCmd_STATE:
  1841. return "<control:state/>";
  1842. default:
  1843. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Unknown Roxie Control Command.");
  1844. }
  1845. return NULL;
  1846. }
  1847. bool CWsSMCEx::onRoxieControlCmd(IEspContext &context, IEspRoxieControlCmdRequest &req, IEspRoxieControlCmdResponse &resp)
  1848. {
  1849. if (!context.validateFeatureAccess(ROXIE_CONTROL_URL, SecAccess_Full, false))
  1850. throw MakeStringException(ECLWATCH_SMC_ACCESS_DENIED, "Cannot Access Roxie Control. Permission denied.");
  1851. const char *process = req.getProcessCluster();
  1852. if (!process || !*process)
  1853. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Process cluster not specified.");
  1854. const char *controlReq = controlCmdMessage(req.getCommand());
  1855. SocketEndpointArray addrs;
  1856. getRoxieProcessServers(process, addrs);
  1857. if (!addrs.length())
  1858. throw MakeStringException(ECLWATCH_CANNOT_GET_ENV_INFO, "Process cluster not found.");
  1859. Owned<IPropertyTree> controlResp = sendRoxieControlAllNodes(addrs.item(0), controlReq, true, req.getWait());
  1860. if (!controlResp)
  1861. throw MakeStringException(ECLWATCH_INTERNAL_ERROR, "Failed to get control response from roxie.");
  1862. IArrayOf<IEspRoxieControlEndpointInfo> respEndpoints;
  1863. Owned<IPropertyTreeIterator> roxieEndpoints = controlResp->getElements("Endpoint");
  1864. ForEach(*roxieEndpoints)
  1865. {
  1866. IPropertyTree &roxieEndpoint = roxieEndpoints->query();
  1867. Owned<IEspRoxieControlEndpointInfo> respEndpoint = createRoxieControlEndpointInfo();
  1868. respEndpoint->setAddress(roxieEndpoint.queryProp("@ep"));
  1869. respEndpoint->setStatus(roxieEndpoint.queryProp("Status"));
  1870. if (roxieEndpoint.hasProp("Dali/@connected"))
  1871. respEndpoint->setAttached(roxieEndpoint.getPropBool("Dali/@connected"));
  1872. if (roxieEndpoint.hasProp("State/@hash"))
  1873. respEndpoint->setStateHash(roxieEndpoint.queryProp("State/@hash"));
  1874. respEndpoints.append(*respEndpoint.getClear());
  1875. }
  1876. resp.setEndpoints(respEndpoints);
  1877. return true;
  1878. }
  1879. bool CWsSMCEx::onGetStatusServerInfo(IEspContext &context, IEspGetStatusServerInfoRequest &req, IEspGetStatusServerInfoResponse &resp)
  1880. {
  1881. getStatusServerInfo(context, req.getServerType(), req.getServerName(), req.getNetworkAddress(), req.getPort(), resp.updateStatusServerInfo());
  1882. return true;
  1883. }
  1884. void CWsSMCEx::getStatusServerInfo(IEspContext &context, const char *serverType, const char *server, const char *networkAddress, unsigned port,
  1885. IEspStatusServerInfo& statusServerInfo)
  1886. {
  1887. if (!serverType || !*serverType)
  1888. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Server type not specified.");
  1889. Owned<CActivityInfo> activityInfo = getActivityInfo(context);
  1890. if (!activityInfo)
  1891. throw MakeStringException(ECLWATCH_INTERNAL_ERROR, "Failed to get Activity Info cache.");
  1892. if (strieq(serverType,STATUS_SERVER_THOR))
  1893. {
  1894. setTargetClusterInfo(context, serverType, server, activityInfo->queryThorTargetClusters(), activityInfo->queryActiveWUs(), statusServerInfo);
  1895. }
  1896. else if (strieq(serverType,STATUS_SERVER_ROXIE))
  1897. {
  1898. setTargetClusterInfo(context, serverType, server, activityInfo->queryRoxieTargetClusters(), activityInfo->queryActiveWUs(), statusServerInfo);
  1899. }
  1900. else if (strieq(serverType,STATUS_SERVER_HTHOR))
  1901. {
  1902. setTargetClusterInfo(context, serverType, server, activityInfo->queryHThorTargetClusters(), activityInfo->queryActiveWUs(), statusServerInfo);
  1903. }
  1904. else if (strieq(serverType,STATUS_SERVER_DFUSERVER))
  1905. {
  1906. setServerQueueInfo(context, serverType, server, activityInfo->queryServerJobQueues(), activityInfo->queryActiveWUs(), statusServerInfo);
  1907. }
  1908. else
  1909. {
  1910. setServerQueueInfo(context, serverType, networkAddress, port, activityInfo->queryServerJobQueues(), activityInfo->queryActiveWUs(), statusServerInfo);
  1911. }
  1912. }
  1913. void CWsSMCEx::setTargetClusterInfo(IEspContext &context, const char *serverType, const char *clusterName, const CIArrayOf<CWsSMCTargetCluster>& targetClusters,
  1914. const IArrayOf<IEspActiveWorkunit>& aws, IEspStatusServerInfo& statusServerInfo)
  1915. {
  1916. if (!clusterName || !*clusterName)
  1917. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Cluster name not specified.");
  1918. IEspTargetCluster& clusterInfo = statusServerInfo.updateTargetClusterInfo();
  1919. ForEachItemIn(i, targetClusters)
  1920. {
  1921. CWsSMCTargetCluster& targetCluster = targetClusters.item(i);
  1922. const char* name = targetCluster.clusterName.get();
  1923. if (name && strieq(name, clusterName))
  1924. {
  1925. setESPTargetCluster(context, targetCluster, &clusterInfo);
  1926. break;
  1927. }
  1928. }
  1929. setActiveWUs(context, serverType, clusterName, clusterInfo.getQueueName(), aws, statusServerInfo);
  1930. }
  1931. void CWsSMCEx::setServerQueueInfo(IEspContext &context, const char *serverType, const char *serverName, const IArrayOf<IEspServerJobQueue>& serverJobQueues,
  1932. const IArrayOf<IEspActiveWorkunit>& aws, IEspStatusServerInfo& statusServerInfo)
  1933. {
  1934. if (!serverName || !*serverName)
  1935. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Server name not specified.");
  1936. ForEachItemIn(i, serverJobQueues)
  1937. {
  1938. IEspServerJobQueue& serverJobQueue = serverJobQueues.item(i);
  1939. const char* name = serverJobQueue.getServerName();
  1940. if (name && strieq(name, serverName))
  1941. {
  1942. IEspServerJobQueue& serverQueue = statusServerInfo.updateServerInfo();
  1943. serverQueue.copy(serverJobQueue);
  1944. break;
  1945. }
  1946. }
  1947. setActiveWUs(context, serverType, serverName, aws, statusServerInfo);
  1948. }
  1949. void CWsSMCEx::setServerQueueInfo(IEspContext &context, const char *serverType, const char *networkAddress, unsigned port, const IArrayOf<IEspServerJobQueue>& serverJobQueues,
  1950. const IArrayOf<IEspActiveWorkunit>& aws, IEspStatusServerInfo& statusServerInfo)
  1951. {
  1952. if (!networkAddress || !*networkAddress)
  1953. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Network address not specified.");
  1954. ForEachItemIn(i, serverJobQueues)
  1955. {
  1956. IEspServerJobQueue& serverJobQueue = serverJobQueues.item(i);
  1957. const char* ipAddress = serverJobQueue.getNetworkAddress();
  1958. unsigned thePort = serverJobQueue.getPort();
  1959. if (ipAddress && strieq(ipAddress, networkAddress) && (thePort == port))
  1960. {
  1961. IEspServerJobQueue& serverQueue = statusServerInfo.updateServerInfo();
  1962. serverQueue.copy(serverJobQueue);
  1963. break;
  1964. }
  1965. }
  1966. VStringBuffer instance("%s_on_%s:%d", serverType, networkAddress, port);
  1967. setActiveWUs(context, serverType, instance.str(), aws, statusServerInfo);
  1968. }
  1969. void CWsSMCEx::setESPTargetCluster(IEspContext &context, const CWsSMCTargetCluster& targetCluster, IEspTargetCluster* espTargetCluster)
  1970. {
  1971. espTargetCluster->setClusterName(targetCluster.clusterName.get());
  1972. espTargetCluster->setClusterSize(targetCluster.clusterSize);
  1973. espTargetCluster->setClusterType(targetCluster.clusterType);
  1974. espTargetCluster->setQueueName(targetCluster.queueName.get());
  1975. espTargetCluster->setQueueStatus(targetCluster.queueStatus.get());
  1976. setClusterStatus(context, targetCluster, espTargetCluster);
  1977. }
  1978. void CWsSMCEx::setActiveWUs(IEspContext &context, const char *serverType, const char *clusterName, const char *queueName, const IArrayOf<IEspActiveWorkunit>& aws, IEspStatusServerInfo& statusServerInfo)
  1979. {
  1980. const char* clusterType = CLUSTER_TYPE_THOR;
  1981. if (strieq(serverType,STATUS_SERVER_ROXIE))
  1982. clusterType = CLUSTER_TYPE_ROXIE;
  1983. else if (strieq(serverType,STATUS_SERVER_HTHOR))
  1984. clusterType = CLUSTER_TYPE_HTHOR;
  1985. IArrayOf<IEspActiveWorkunit> awsOnThisQueue;
  1986. ForEachItemIn(i, aws)
  1987. {
  1988. IEspActiveWorkunit& wu = aws.item(i);
  1989. const char* wuid = wu.getWuid();
  1990. if (!wuid || !*wuid)
  1991. continue;
  1992. const char* wuServerType = wu.getServer();
  1993. const char* wuClusterName = wu.getTargetClusterName();
  1994. if (!wuServerType || !wuClusterName || !strieq(serverType, wuServerType) || !strieq(clusterName, wuClusterName))
  1995. {
  1996. const char* wuClusterType = wu.getClusterType();
  1997. const char* wuClusterQueueName = wu.getClusterQueueName();
  1998. if (!wuClusterType || !wuClusterQueueName || !strieq(clusterType, wuClusterType) || !strieq(queueName, wuClusterQueueName))
  1999. continue;
  2000. }
  2001. Owned<IEspActiveWorkunit> wuOnThisQueue = new CActiveWorkunitWrapper(wuid, "", "", "", "");
  2002. setActiveWUs(context, wu, wuOnThisQueue);
  2003. awsOnThisQueue.append(*wuOnThisQueue.getClear());
  2004. }
  2005. statusServerInfo.setWorkunits(awsOnThisQueue);
  2006. }
  2007. void CWsSMCEx::setActiveWUs(IEspContext &context, const char *serverType, const char *instance, const IArrayOf<IEspActiveWorkunit>& aws, IEspStatusServerInfo& statusServerInfo)
  2008. {
  2009. IArrayOf<IEspActiveWorkunit> awsOnThisQueue;
  2010. ForEachItemIn(i, aws)
  2011. {
  2012. IEspActiveWorkunit& wu = aws.item(i);
  2013. const char* wuid = wu.getWuid();
  2014. if (!wuid || !*wuid)
  2015. continue;
  2016. const char* wuInstance = wu.getInstance();
  2017. if (!wuInstance || !strieq(wuInstance, instance))
  2018. continue;
  2019. Owned<IEspActiveWorkunit> wuOnThisQueue = new CActiveWorkunitWrapper(wuid, "", "", "", "");
  2020. setActiveWUs(context, wu, wuOnThisQueue);
  2021. awsOnThisQueue.append(*wuOnThisQueue.getClear());
  2022. }
  2023. statusServerInfo.setWorkunits(awsOnThisQueue);
  2024. }
  2025. void CWsSMCEx::setActiveWUs(IEspContext &context, IEspActiveWorkunit& wu, IEspActiveWorkunit* wuToSet)
  2026. {
  2027. try
  2028. {
  2029. const char* user = context.queryUserId();
  2030. const char* owner = wu.getOwner();
  2031. //if no access, throw an exception and go to the 'catch' section.
  2032. context.validateFeatureAccess((!owner || !*owner || (user && streq(user, owner))) ? OWN_WU_ACCESS : OTHERS_WU_ACCESS, SecAccess_Read, true);
  2033. wuToSet->copy(wu);
  2034. }
  2035. catch (IException *e)
  2036. { //if the wu cannot be opened for some reason, the openWorkUnit() inside the CActiveWorkunitWrapper() may throw an exception.
  2037. //We do not want the exception stops this process of retrieving/showing all active WUs. And that WU should still be displayed
  2038. //with the exception.
  2039. wuToSet->setStateID(WUStateUnknown);
  2040. wuToSet->setServer(wu.getServer());
  2041. wuToSet->setQueueName(wu.getQueueName());
  2042. const char* instanceName = wu.getInstance();
  2043. const char* targetClusterName = wu.getTargetClusterName();
  2044. if (instanceName && *instanceName)
  2045. wuToSet->setInstance(instanceName); // JCSMORE In thor case at least, if queued it is unknown which instance it will run on..
  2046. if (targetClusterName && *targetClusterName)
  2047. wuToSet->setTargetClusterName(targetClusterName);
  2048. e->Release();
  2049. }
  2050. }
  2051. static const char *LockModeNames[] = { "ALL", "READ", "WRITE", "HOLD", "SUB" };
  2052. void CWsSMCEx::addLockInfo(CLockMetaData& lD, const char* xPath, const char* lfn, unsigned msNow, time_t ttNow, IArrayOf<IEspLock>& locks)
  2053. {
  2054. Owned<IEspLock> lock = createLock();
  2055. if (xPath && *xPath)
  2056. lock->setXPath(xPath);
  2057. else if (lfn && *lfn)
  2058. lock->setLogicalFile(lfn);
  2059. else
  2060. return; //Should not happen
  2061. lock->setEPIP(lD.queryEp());
  2062. lock->setSessionID(lD.sessId);
  2063. unsigned duration = msNow-lD.timeLockObtained;
  2064. lock->setDurationMS(duration);
  2065. CDateTime timeLocked;
  2066. StringBuffer timeStr;
  2067. time_t ttLocked = ttNow - duration/1000;
  2068. timeLocked.set(ttLocked);
  2069. timeLocked.getString(timeStr);
  2070. lock->setTimeLocked(timeStr.str());
  2071. unsigned mode = lD.mode;
  2072. VStringBuffer modeStr("%x", mode);
  2073. lock->setModes(modeStr.str());
  2074. StringArray modes;
  2075. if (RTM_MODE(mode, RTM_LOCK_READ))
  2076. modes.append(LockModeNames[CLockModes_READ]);
  2077. if (RTM_MODE(mode, RTM_LOCK_WRITE))
  2078. modes.append(LockModeNames[CLockModes_WRITE]);
  2079. if (RTM_MODE(mode, RTM_LOCK_HOLD)) // long-term lock
  2080. modes.append(LockModeNames[CLockModes_HOLD]);
  2081. if (RTM_MODE(mode, RTM_LOCK_SUB)) // locks all descendants as well as self
  2082. modes.append(LockModeNames[CLockModes_SUB]);
  2083. lock->setModeNames(modes);
  2084. locks.append(*lock.getClear());
  2085. }
  2086. bool CWsSMCEx::onLockQuery(IEspContext &context, IEspLockQueryRequest &req, IEspLockQueryResponse &resp)
  2087. {
  2088. class CLockPostFilter
  2089. {
  2090. CLockModes mode;
  2091. time_t ttLTLow, ttLTHigh;
  2092. bool checkLTLow, checkLTHigh;
  2093. int durationLow, durationHigh;
  2094. bool checkMode(unsigned lockMode)
  2095. {
  2096. unsigned modeReq;
  2097. switch (mode)
  2098. {
  2099. case CLockModes_READ:
  2100. modeReq = RTM_LOCK_READ;
  2101. break;
  2102. case CLockModes_WRITE:
  2103. modeReq = RTM_LOCK_WRITE;
  2104. break;
  2105. case CLockModes_HOLD:
  2106. modeReq = RTM_LOCK_HOLD;
  2107. break;
  2108. case CLockModes_SUB:
  2109. modeReq = RTM_LOCK_SUB;
  2110. break;
  2111. default:
  2112. return true;
  2113. }
  2114. if (lockMode & modeReq)
  2115. return true;
  2116. return false;
  2117. }
  2118. public:
  2119. CLockPostFilter(IEspLockQueryRequest& req)
  2120. {
  2121. ttLTLow = 0;
  2122. ttLTHigh = 0;
  2123. mode = req.getMode();
  2124. if (mode == LockModes_Undefined)
  2125. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid Lock Mode");
  2126. if (req.getDurationMSLow_isNull())
  2127. durationLow = -1;
  2128. else
  2129. durationLow = req.getDurationMSLow();
  2130. if (req.getDurationMSHigh_isNull())
  2131. durationHigh = -1;
  2132. else
  2133. durationHigh = req.getDurationMSHigh();
  2134. const char* timeLow = req.getTimeLockedLow();
  2135. if (!timeLow || !*timeLow)
  2136. checkLTLow = false;
  2137. else
  2138. {
  2139. CDateTime dtLow;
  2140. dtLow.setString(timeLow, NULL, false);
  2141. ttLTLow = dtLow.getSimple();
  2142. checkLTLow = true;
  2143. }
  2144. const char* timeHigh = req.getTimeLockedHigh();
  2145. if (!timeHigh || !*timeHigh)
  2146. checkLTHigh = false;
  2147. else
  2148. {
  2149. CDateTime dtHigh;
  2150. dtHigh.setString(timeHigh, NULL, false);
  2151. ttLTHigh = dtHigh.getSimple();
  2152. checkLTHigh = true;
  2153. }
  2154. }
  2155. bool check(CLockMetaData& lD, unsigned msNow, time_t ttNow)
  2156. {
  2157. if (!checkMode(lD.mode))
  2158. return false;
  2159. int duration = msNow-lD.timeLockObtained;
  2160. if (durationLow > duration)
  2161. return false;
  2162. if ((durationHigh >= 0) && (durationHigh < duration))
  2163. return false;
  2164. if (checkLTLow && (ttNow - duration/1000 < ttLTLow))
  2165. return false;
  2166. if (checkLTHigh && (ttNow - duration/1000 > ttLTHigh))
  2167. return false;
  2168. return true;
  2169. }
  2170. };
  2171. try
  2172. {
  2173. CLockPostFilter postFilter(req);
  2174. StringBuffer xPath;
  2175. if (req.getAllFileLocks())
  2176. xPath.appendf("/%s/*", querySdsFilesRoot());
  2177. else
  2178. xPath = req.getXPath();
  2179. Owned<ILockInfoCollection> lockInfoCollection = querySDS().getLocks(req.getEPIP(), xPath.str());
  2180. IArrayOf<IEspLock> locks;
  2181. CDateTime time;
  2182. time.setNow();
  2183. time_t ttNow = time.getSimple();
  2184. unsigned msNow = msTick();
  2185. for (unsigned l=0; l<lockInfoCollection->queryLocks(); l++)
  2186. {
  2187. ILockInfo& lockInfo = lockInfoCollection->queryLock(l);
  2188. CDfsLogicalFileName dlfn;
  2189. const char* lfn = NULL;
  2190. const char* xPath = NULL;
  2191. if (dlfn.setFromXPath(lockInfo.queryXPath()))
  2192. lfn = dlfn.get();
  2193. else
  2194. xPath = lockInfo.queryXPath();
  2195. for (unsigned i=0; i<lockInfo.queryConnections(); i++)
  2196. {
  2197. CLockMetaData& lMD = lockInfo.queryLockData(i);
  2198. if (postFilter.check(lMD, msNow, ttNow))
  2199. addLockInfo(lMD, xPath, lfn, msNow, ttNow, locks);
  2200. }
  2201. }
  2202. unsigned numLocks = locks.length();
  2203. if (numLocks)
  2204. resp.setLocks(locks);
  2205. resp.setNumLocks(numLocks);
  2206. }
  2207. catch(IException* e)
  2208. {
  2209. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  2210. }
  2211. return true;
  2212. }