ws_workunitsService.cpp 136 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "ws_workunitsService.hpp"
  14. #include "ws_fs.hpp"
  15. #include "jlib.hpp"
  16. #include "daclient.hpp"
  17. #include "dalienv.hpp"
  18. #include "dadfs.hpp"
  19. #include "daaudit.hpp"
  20. #include "exception_util.hpp"
  21. #include "wujobq.hpp"
  22. #include "eventqueue.hpp"
  23. #include "fileview.hpp"
  24. #include "hqlerror.hpp"
  25. #include "sacmd.hpp"
  26. #include "wuwebview.hpp"
  27. #include "portlist.h"
  28. #include "dllserver.hpp"
  29. #include "schedulectrl.hpp"
  30. #include "scheduleread.hpp"
  31. #include "dadfs.hpp"
  32. #include "dfuwu.hpp"
  33. #include "thorplugin.hpp"
  34. #include "roxiecontrol.hpp"
  35. #include "package.h"
  36. #ifdef _USE_ZLIB
  37. #include "zcrypt.hpp"
  38. #endif
  39. #define ESP_WORKUNIT_DIR "workunits/"
  40. class NewWsWorkunit : public Owned<IWorkUnit>
  41. {
  42. public:
  43. NewWsWorkunit(IWorkUnitFactory *factory, IEspContext &context)
  44. {
  45. create(factory, context);
  46. }
  47. NewWsWorkunit(IEspContext &context)
  48. {
  49. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  50. create(factory, context);
  51. }
  52. ~NewWsWorkunit() { if (get()) get()->commit(); }
  53. void create(IWorkUnitFactory *factory, IEspContext &context)
  54. {
  55. setown(factory->createWorkUnit(NULL, "ws_workunits", context.queryUserId()));
  56. if(!get())
  57. throw MakeStringException(ECLWATCH_CANNOT_CREATE_WORKUNIT,"Could not create workunit.");
  58. get()->setUser(context.queryUserId());
  59. }
  60. void associateDll(const char *dllpath, const char *dllname)
  61. {
  62. Owned<IWUQuery> query = get()->updateQuery();
  63. StringBuffer dllurl;
  64. createUNCFilename(dllpath, dllurl);
  65. unsigned crc = crc_file(dllpath);
  66. associateLocalFile(query, FileTypeDll, dllpath, "Workunit DLL", crc);
  67. queryDllServer().registerDll(dllname, "Workunit DLL", dllurl.str());
  68. }
  69. void setQueryText(const char *text)
  70. {
  71. if (!text || !*text)
  72. return;
  73. Owned<IWUQuery> query=get()->updateQuery();
  74. query->setQueryText(text);
  75. }
  76. void setQueryMain(const char *s)
  77. {
  78. if (!s || !*s)
  79. return;
  80. Owned<IWUQuery> query=get()->updateQuery();
  81. query->setQueryMainDefinition(s);
  82. }
  83. };
  84. void setWsWuXmlParameters(IWorkUnit *wu, const char *xml, bool setJobname=false)
  85. {
  86. if (!xml || !*xml)
  87. return;
  88. Owned<IPropertyTree> tree = createPTreeFromXMLString(xml, ipt_none, (PTreeReaderOptions)(ptr_ignoreWhiteSpace | ptr_ignoreNameSpaces));
  89. IPropertyTree *root = tree.get();
  90. if (strieq(root->queryName(), "Envelope"))
  91. root = root->queryPropTree("Body/*[1]");
  92. if (!root)
  93. return;
  94. if (setJobname)
  95. {
  96. SCMStringBuffer name;
  97. wu->getJobName(name);
  98. if (!name.length())
  99. wu->setJobName(root->queryName());
  100. }
  101. wu->setXmlParams(LINK(root));
  102. }
  103. void setWsWuXmlParameters(IWorkUnit *wu, const char *xml, IArrayOf<IConstNamedValue> *variables, bool setJobname=false)
  104. {
  105. StringBuffer extParamXml;
  106. if (variables && variables->length())
  107. {
  108. Owned<IPropertyTree> paramTree = (xml && *xml) ? createPTreeFromXMLString(xml) : createPTree("input");
  109. ForEachItemIn(i, *variables)
  110. {
  111. IConstNamedValue &item = variables->item(i);
  112. const char *name = item.getName();
  113. const char *value = item.getValue();
  114. if (!name || !*name)
  115. continue;
  116. if (!value)
  117. {
  118. size_t len = strlen(name);
  119. char last = name[len-1];
  120. if (last == '-' || last == '+')
  121. {
  122. StringAttr s(name, len-1);
  123. paramTree->setPropInt(s.get(), last == '+' ? 1 : 0);
  124. }
  125. else
  126. paramTree->setPropInt(name, 1);
  127. continue;
  128. }
  129. paramTree->setProp(name, value);
  130. }
  131. toXML(paramTree, extParamXml);
  132. xml=extParamXml.str();
  133. }
  134. setWsWuXmlParameters(wu, xml, setJobname);
  135. }
  136. void submitWsWorkunit(IEspContext& context, IConstWorkUnit* cw, const char* cluster, const char* snapshot, int maxruntime, bool compile, bool resetWorkflow, bool resetVariables,
  137. const char *paramXml=NULL, IArrayOf<IConstNamedValue> *variables=NULL, IArrayOf<IConstNamedValue> *debugs=NULL)
  138. {
  139. ensureWsWorkunitAccess(context, *cw, SecAccess_Write);
  140. switch(cw->getState())
  141. {
  142. case WUStateRunning:
  143. case WUStateDebugPaused:
  144. case WUStateDebugRunning:
  145. case WUStateCompiling:
  146. case WUStateAborting:
  147. case WUStateBlocked:
  148. {
  149. SCMStringBuffer descr;
  150. throw MakeStringException(ECLWATCH_CANNOT_SUBMIT_WORKUNIT, "Cannot submit the workunit. Workunit state is '%s'.", cw->getStateDesc(descr).str());
  151. }
  152. }
  153. SCMStringBuffer wuid;
  154. cw->getWuid(wuid);
  155. WorkunitUpdate wu(&cw->lock());
  156. if(!wu.get())
  157. throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT, "Cannot update workunit %s.", wuid.str());
  158. wu->clearExceptions();
  159. if(notEmpty(cluster))
  160. wu->setClusterName(cluster);
  161. if(notEmpty(snapshot))
  162. wu->setSnapshot(snapshot);
  163. wu->setState(WUStateSubmitted);
  164. if (maxruntime)
  165. wu->setDebugValueInt("maxRunTime",maxruntime,true);
  166. if (debugs && debugs->length())
  167. {
  168. ForEachItemIn(i, *debugs)
  169. {
  170. IConstNamedValue &item = debugs->item(i);
  171. const char *name = item.getName();
  172. const char *value = item.getValue();
  173. if (!name || !*name)
  174. continue;
  175. if (!value)
  176. {
  177. size_t len = strlen(name);
  178. char last = name[len-1];
  179. if (last == '-' || last == '+')
  180. {
  181. StringAttr s(name, len-1);
  182. wu->setDebugValueInt(s.get(), last == '+' ? 1 : 0, true);
  183. }
  184. else
  185. wu->setDebugValueInt(name, 1, true);
  186. continue;
  187. }
  188. wu->setDebugValue(name, value, true);
  189. }
  190. }
  191. if (resetWorkflow)
  192. wu->resetWorkflow();
  193. if (!compile)
  194. wu->schedule();
  195. if (resetVariables)
  196. {
  197. SCMStringBuffer varname;
  198. Owned<IConstWUResultIterator> vars = &wu->getVariables();
  199. ForEach (*vars)
  200. {
  201. vars->query().getResultName(varname);
  202. Owned<IWUResult> v = wu->updateVariableByName(varname.str());
  203. if (v)
  204. v->setResultStatus(ResultStatusUndefined);
  205. }
  206. }
  207. setWsWuXmlParameters(wu, paramXml, variables, (wu->getAction()==WUActionExecuteExisting));
  208. wu->commit();
  209. wu.clear();
  210. if (!compile)
  211. runWorkUnit(wuid.str());
  212. else if (context.querySecManager())
  213. secSubmitWorkUnit(wuid.str(), *context.querySecManager(), *context.queryUser());
  214. else
  215. submitWorkUnit(wuid.str(), context.queryUserId(), context.queryPassword());
  216. AuditSystemAccess(context.queryUserId(), true, "Submitted %s", wuid.str());
  217. }
  218. void submitWsWorkunit(IEspContext& context, const char *wuid, const char* cluster, const char* snapshot, int maxruntime, bool compile, bool resetWorkflow, bool resetVariables,
  219. const char *paramXml=NULL, IArrayOf<IConstNamedValue> *variables=NULL, IArrayOf<IConstNamedValue> *debugs=NULL)
  220. {
  221. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  222. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid, false);
  223. if(!cw)
  224. throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot open workunit %s.",wuid);
  225. return submitWsWorkunit(context, cw, cluster, snapshot, maxruntime, compile, resetWorkflow, resetVariables, paramXml, variables, debugs);
  226. }
  227. void copyWsWorkunit(IEspContext &context, IWorkUnit &wu, const char *srcWuid)
  228. {
  229. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  230. Owned<IConstWorkUnit> src(factory->openWorkUnit(srcWuid, false));
  231. SCMStringBuffer wuid;
  232. wu.getWuid(wuid);
  233. queryExtendedWU(&wu)->copyWorkUnit(src, false);
  234. SCMStringBuffer token;
  235. wu.setSecurityToken(createToken(wuid.str(), context.queryUserId(), context.queryPassword(), token).str());
  236. wu.commit();
  237. }
  238. void runWsWorkunit(IEspContext &context, StringBuffer &wuid, const char *srcWuid, const char *cluster, const char *paramXml=NULL,
  239. IArrayOf<IConstNamedValue> *variables=NULL, IArrayOf<IConstNamedValue> *debugs=NULL)
  240. {
  241. StringBufferAdaptor isvWuid(wuid);
  242. NewWsWorkunit wu(context);
  243. wu->getWuid(isvWuid);
  244. copyWsWorkunit(context, *wu, srcWuid);
  245. wu.clear();
  246. submitWsWorkunit(context, wuid.str(), cluster, NULL, 0, false, true, true, paramXml, variables, debugs);
  247. }
  248. void runWsWorkunit(IEspContext &context, IConstWorkUnit *cw, const char *srcWuid, const char *cluster, const char *paramXml=NULL,
  249. IArrayOf<IConstNamedValue> *variables=NULL, IArrayOf<IConstNamedValue> *debugs=NULL)
  250. {
  251. WorkunitUpdate wu(&cw->lock());
  252. copyWsWorkunit(context, *wu, srcWuid);
  253. wu.clear();
  254. submitWsWorkunit(context, cw, cluster, NULL, 0, false, true, true, paramXml, variables, debugs);
  255. }
  256. IException *noteException(IWorkUnit *wu, IException *e, WUExceptionSeverity level=ExceptionSeverityError)
  257. {
  258. if (wu)
  259. {
  260. Owned<IWUException> we = wu->createException();
  261. StringBuffer s;
  262. we->setExceptionMessage(e->errorMessage(s).str());
  263. we->setExceptionSource("WsWorkunits");
  264. we->setSeverity(level);
  265. if (level==ExceptionSeverityError)
  266. wu->setState(WUStateFailed);
  267. }
  268. return e;
  269. }
  270. StringBuffer &resolveQueryWuid(StringBuffer &wuid, const char *queryset, const char *query, bool notSuspended=true, IWorkUnit *wu=NULL)
  271. {
  272. Owned<IPropertyTree> qs = getQueryRegistry(queryset, true);
  273. if (!qs)
  274. throw noteException(wu, MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "QuerySet '%s' not found", queryset));
  275. Owned<IPropertyTree> q = resolveQueryAlias(qs, query);
  276. if (!q)
  277. throw noteException(wu, MakeStringException(ECLWATCH_QUERYID_NOT_FOUND, "Query '%s/%s' not found", queryset, query));
  278. if (notSuspended && q->getPropBool("@suspended"))
  279. throw noteException(wu, MakeStringException(ECLWATCH_QUERY_SUSPENDED, "Query '%s/%s' is suspended", queryset, query));
  280. return wuid.append(q->queryProp("@wuid"));
  281. }
  282. void runWsWuQuery(IEspContext &context, IConstWorkUnit *cw, const char *queryset, const char *query, const char *cluster, const char *paramXml=NULL)
  283. {
  284. StringBuffer srcWuid;
  285. WorkunitUpdate wu(&cw->lock());
  286. resolveQueryWuid(srcWuid, queryset, query, true, wu);
  287. copyWsWorkunit(context, *wu, srcWuid);
  288. wu.clear();
  289. submitWsWorkunit(context, cw, cluster, NULL, 0, false, true, true, paramXml);
  290. }
  291. void runWsWuQuery(IEspContext &context, StringBuffer &wuid, const char *queryset, const char *query, const char *cluster, const char *paramXml=NULL)
  292. {
  293. StringBuffer srcWuid;
  294. StringBufferAdaptor isvWuid(wuid);
  295. NewWsWorkunit wu(context);
  296. wu->getWuid(isvWuid);
  297. resolveQueryWuid(srcWuid, queryset, query, true, wu);
  298. copyWsWorkunit(context, *wu, srcWuid);
  299. wu.clear();
  300. submitWsWorkunit(context, wuid.str(), cluster, NULL, 0, false, true, true, paramXml);
  301. }
  302. class ExecuteExistingQueryInfo
  303. {
  304. public:
  305. ExecuteExistingQueryInfo(IConstWorkUnit *cw)
  306. {
  307. SCMStringBuffer isv;
  308. cw->getJobName(isv);
  309. const char *name = isv.str();
  310. const char *div = strchr(name, '.');
  311. if (div)
  312. {
  313. queryset.set(name, div-name);
  314. query.set(div+1);
  315. }
  316. }
  317. public:
  318. StringAttr queryset;
  319. StringAttr query;
  320. };
  321. typedef enum _WuActionType
  322. {
  323. ActionDelete=0,
  324. ActionProtect,
  325. ActionAbort,
  326. ActionRestore,
  327. ActionEventSchedule,
  328. ActionEventDeschedule,
  329. ActionChangeState,
  330. ActionPause,
  331. ActionPauseNow,
  332. ActionResume,
  333. ActionUnknown
  334. } WsWuActionType;
  335. void setActionResult(const char* wuid, int action, const char* result, StringBuffer& strAction, IArrayOf<IConstWUActionResult>* results)
  336. {
  337. if (!results || !wuid || !*wuid || !result || !*result)
  338. return;
  339. switch(action)
  340. {
  341. case ActionDelete:
  342. {
  343. strAction = "Delete";
  344. break;
  345. }
  346. case ActionProtect:
  347. {
  348. strAction = "Protect";
  349. break;
  350. }
  351. case ActionAbort:
  352. {
  353. strAction = "Abort";
  354. break;
  355. }
  356. case ActionRestore:
  357. {
  358. strAction = "Restore";
  359. break;
  360. }
  361. case ActionEventSchedule:
  362. {
  363. strAction = "EventSchedule";
  364. break;
  365. }
  366. case ActionEventDeschedule:
  367. {
  368. strAction = "EventDeschedule";
  369. break;
  370. }
  371. case ActionChangeState:
  372. {
  373. strAction = "ChangeState";
  374. break;
  375. }
  376. case ActionPause:
  377. {
  378. strAction = "Pause";
  379. break;
  380. }
  381. case ActionPauseNow:
  382. {
  383. strAction = "PauseNow";
  384. break;
  385. }
  386. case ActionResume:
  387. {
  388. strAction = "Resume";
  389. break;
  390. }
  391. default:
  392. {
  393. strAction = "Unknown";
  394. }
  395. }
  396. Owned<IEspWUActionResult> res = createWUActionResult("", "");
  397. res->setWuid(wuid);
  398. res->setAction(strAction.str());
  399. res->setResult(result);
  400. results->append(*res.getClear());
  401. }
  402. bool doAction(IEspContext& context, StringArray& wuids, int action, IProperties* params, IArrayOf<IConstWUActionResult>* results)
  403. {
  404. if (!wuids.length())
  405. return true;
  406. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  407. bool bAllSuccess = true;
  408. for(aindex_t i=0; i<wuids.length();i++)
  409. {
  410. StringBuffer strAction;
  411. StringBuffer wuidStr = wuids.item(i);
  412. const char* wuid = wuidStr.trim().str();
  413. if (isEmpty(wuid))
  414. {
  415. WARNLOG("Empty Workunit ID");
  416. continue;
  417. }
  418. try
  419. {
  420. if (!looksLikeAWuid(wuid))
  421. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid Workunit ID: %s", wuid);
  422. if ((action == ActionRestore) || (action == ActionEventDeschedule))
  423. {
  424. switch(action)
  425. {
  426. case ActionRestore:
  427. {
  428. SocketEndpoint ep;
  429. getSashaNode(ep);
  430. Owned<ISashaCommand> cmd = createSashaCommand();
  431. cmd->setAction(SCA_RESTORE);
  432. cmd->addId(wuid);
  433. Owned<INode> node = createINode(ep);
  434. if (!node)
  435. throw MakeStringException(ECLWATCH_INODE_NOT_FOUND,"INode not found.");
  436. StringBuffer s;
  437. if (!cmd->send(node, 1*60*1000))
  438. throw MakeStringException(ECLWATCH_CANNOT_CONNECT_ARCHIVE_SERVER,"Cannot connect to Archive server at %s.", ep.getUrlStr(s).str());
  439. if (cmd->numIds()==0)
  440. throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT,"Could not Archive/restore %s",wuid);
  441. StringBuffer reply;
  442. cmd->getId(0,reply);
  443. AuditSystemAccess(context.queryUserId(), true, "Updated %s", wuid);
  444. ensureWsWorkunitAccess(context, wuid, SecAccess_Write);
  445. break;
  446. }
  447. case ActionEventDeschedule:
  448. if (!context.validateFeatureAccess(OWN_WU_ACCESS, SecAccess_Full, false)
  449. || !context.validateFeatureAccess(OTHERS_WU_ACCESS, SecAccess_Full, false))
  450. ensureWsWorkunitAccess(context, wuid, SecAccess_Full);
  451. descheduleWorkunit(wuid);
  452. AuditSystemAccess(context.queryUserId(), true, "Updated %s", wuid);
  453. break;
  454. }
  455. }
  456. else
  457. {
  458. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  459. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid, false);
  460. if(!cw)
  461. throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot open workunit %s.",wuid);
  462. if ((action == ActionDelete) && (cw->getState() == WUStateWait))
  463. throw MakeStringException(ECLWATCH_CANNOT_DELETE_WORKUNIT,"Cannot delete a workunit which is in a 'Wait' status.");
  464. switch(action)
  465. {
  466. case ActionPause:
  467. {
  468. ensureWsWorkunitAccess(context, *cw, SecAccess_Full);
  469. WorkunitUpdate wu(&cw->lock());
  470. wu->setAction(WUActionPause);
  471. break;
  472. }
  473. case ActionPauseNow:
  474. {
  475. ensureWsWorkunitAccess(context, *cw, SecAccess_Full);
  476. WorkunitUpdate wu(&cw->lock());
  477. wu->setAction(WUActionPauseNow);
  478. break;
  479. }
  480. case ActionResume:
  481. {
  482. ensureWsWorkunitAccess(context, *cw, SecAccess_Full);
  483. WorkunitUpdate wu(&cw->lock());
  484. wu->setAction(WUActionResume);
  485. break;
  486. }
  487. case ActionDelete:
  488. ensureWsWorkunitAccess(context, *cw, SecAccess_Full);
  489. {
  490. int state = cw->getState();
  491. switch (state)
  492. {
  493. case WUStateWait:
  494. case WUStateAborted:
  495. case WUStateCompleted:
  496. case WUStateFailed:
  497. case WUStateArchived:
  498. case WUStateCompiled:
  499. case WUStateUploadingFiles:
  500. break;
  501. default:
  502. {
  503. WorkunitUpdate wu(&cw->lock());
  504. wu->setState(WUStateFailed);
  505. }
  506. }
  507. cw.clear();
  508. factory->deleteWorkUnit(wuid);
  509. AuditSystemAccess(context.queryUserId(), true, "Deleted %s", wuid);
  510. }
  511. break;
  512. case ActionAbort:
  513. ensureWsWorkunitAccess(context, *cw, SecAccess_Full);
  514. {
  515. if (cw->getState() == WUStateWait)
  516. {
  517. WorkunitUpdate wu(&cw->lock());
  518. wu->deschedule();
  519. wu->setState(WUStateAborted);
  520. }
  521. else
  522. secAbortWorkUnit(wuid, *context.querySecManager(), *context.queryUser());
  523. AuditSystemAccess(context.queryUserId(), true, "Aborted %s", wuid);
  524. }
  525. break;
  526. case ActionProtect:
  527. cw->protect(!params || params->getPropBool("Protect",true));
  528. AuditSystemAccess(context.queryUserId(), true, "Updated %s", wuid);
  529. break;
  530. case ActionChangeState:
  531. {
  532. if (params)
  533. {
  534. WUState state = (WUState) params->getPropInt("State");
  535. if (state > WUStateUnknown && state < WUStateSize)
  536. {
  537. WorkunitUpdate wu(&cw->lock());
  538. wu->setState(state);
  539. AuditSystemAccess(context.queryUserId(), true, "Updated %s", wuid);
  540. }
  541. }
  542. }
  543. break;
  544. case ActionEventSchedule:
  545. {
  546. WorkunitUpdate wu(&cw->lock());
  547. wu->schedule();
  548. AuditSystemAccess(context.queryUserId(), true, "Updated %s", wuid);
  549. }
  550. break;
  551. }
  552. }
  553. setActionResult(wuid, action, "Success", strAction, results);
  554. }
  555. catch (IException *e)
  556. {
  557. bAllSuccess = false;
  558. StringBuffer eMsg;
  559. StringBuffer failedMsg("Failed: ");
  560. setActionResult(wuid, action, failedMsg.append(e->errorMessage(eMsg)).str(), strAction, results);
  561. WARNLOG("Failed to %s for workunit: %s, %s", strAction.str(), wuid, eMsg.str());
  562. AuditSystemAccess(context.queryUserId(), false, "Failed to %s %s", strAction.str(), wuid);
  563. e->Release();
  564. continue;
  565. }
  566. catch (...)
  567. {
  568. bAllSuccess = false;
  569. StringBuffer failedMsg;
  570. failedMsg.appendf("Unknown exception");
  571. setActionResult(wuid, action, failedMsg.str(), strAction, results);
  572. WARNLOG("Failed to %s for workunit: %s, %s", strAction.str(), wuid, failedMsg.str());
  573. AuditSystemAccess(context.queryUserId(), false, "Failed to %s %s", strAction.str(), wuid);
  574. continue;
  575. }
  576. }
  577. int timeToWait = 0;
  578. if (params)
  579. timeToWait = params->getPropInt("BlockTillFinishTimer");
  580. if (timeToWait != 0)
  581. {
  582. for(aindex_t i=0; i<wuids.length();i++)
  583. {
  584. const char* wuid=wuids.item(i);
  585. if (isEmpty(wuid))
  586. continue;
  587. waitForWorkUnitToComplete(wuid, timeToWait);
  588. }
  589. }
  590. return bAllSuccess;
  591. }
// Maps lower-case action names (e.g. "delete", "abort") to WsWuActionType
// values. Filled in by CWsWorkunitsEx::init() and consulted by onWUAction()
// with the lower-cased action string from the request.
MapStringTo<int> wuActionTable;
  593. void CWsWorkunitsEx::init(IPropertyTree *cfg, const char *process, const char *service)
  594. {
  595. if (!daliClientActive())
  596. {
  597. ERRLOG("No Dali Connection Active.");
  598. throw MakeStringException(-1, "No Dali Connection Active. Please Specify a Dali to connect to in you configuration file");
  599. }
  600. setPasswordsFromSDS();
  601. DBGLOG("Initializing %s service [process = %s]", service, process);
  602. refreshValidClusters();
  603. daliServers.set(cfg->queryProp("Software/EspProcess/@daliServers"));
  604. wuActionTable.setValue("delete", ActionDelete);
  605. wuActionTable.setValue("abort", ActionAbort);
  606. wuActionTable.setValue("pausenow", ActionPauseNow);
  607. wuActionTable.setValue("pause", ActionPause);
  608. wuActionTable.setValue("resume", ActionResume);
  609. wuActionTable.setValue("protect", ActionProtect);
  610. wuActionTable.setValue("unprotect", ActionProtect);
  611. wuActionTable.setValue("restore", ActionRestore);
  612. wuActionTable.setValue("reschedule", ActionEventSchedule);
  613. wuActionTable.setValue("deschedule", ActionEventDeschedule);
  614. wuActionTable.setValue("settofailed", ActionChangeState);
  615. awusCacheMinutes = AWUS_CACHE_MIN_DEFAULT;
  616. VStringBuffer xpath("Software/EspProcess[@name=\"%s\"]/EspService[@name=\"%s\"]/AWUsCacheMinutes", process, service);
  617. cfg->getPropInt(xpath.str(), awusCacheMinutes);
  618. directories.set(cfg->queryPropTree("Software/Directories"));
  619. const char *name = cfg->queryProp("Software/EspProcess/@name");
  620. getConfigurationDirectory(directories, "query", "esp", name ? name : "esp", queryDirectory);
  621. recursiveCreateDirectory(queryDirectory.str());
  622. dataCache.setown(new DataCache(DATA_SIZE));
  623. archivedWuCache.setown(new ArchivedWuCache(AWUS_CACHE_SIZE));
  624. //Create a folder for temporarily holding gzip files by WUResultBin()
  625. Owned<IFile> tmpdir = createIFile(TEMPZIPDIR);
  626. if(!tmpdir->exists())
  627. tmpdir->createDirectory();
  628. recursiveCreateDirectory(ESP_WORKUNIT_DIR);
  629. m_sched.start();
  630. }
  631. void CWsWorkunitsEx::refreshValidClusters()
  632. {
  633. validClusters.kill();
  634. Owned<IStringIterator> it = getTargetClusters(NULL, NULL);
  635. ForEach(*it)
  636. {
  637. SCMStringBuffer s;
  638. IStringVal &val = it->str(s);
  639. if (!validClusters.getValue(val.str()))
  640. validClusters.setValue(val.str(), true);
  641. }
  642. }
  643. bool CWsWorkunitsEx::isValidCluster(const char *cluster)
  644. {
  645. if (!cluster || !*cluster)
  646. return false;
  647. CriticalBlock block(crit);
  648. if (validClusters.getValue(cluster))
  649. return true;
  650. if (validateTargetClusterName(cluster))
  651. {
  652. refreshValidClusters();
  653. return true;
  654. }
  655. return false;
  656. }
  657. void CWsWorkunitsEx::checkAndTrimWorkunit(const char* methodName, StringBuffer& input)
  658. {
  659. const char* trimmedInput = input.trim().str();
  660. if (isEmpty(trimmedInput))
  661. throw MakeStringException(ECLWATCH_INVALID_INPUT, "%s: Workunit ID not set", methodName);
  662. if (!looksLikeAWuid(trimmedInput))
  663. throw MakeStringException(ECLWATCH_INVALID_INPUT, "%s: Invalid Workunit ID: %s", methodName, trimmedInput);
  664. return;
  665. }
  666. bool CWsWorkunitsEx::onWUCreate(IEspContext &context, IEspWUCreateRequest &req, IEspWUCreateResponse &resp)
  667. {
  668. try
  669. {
  670. if (!context.validateFeatureAccess(OWN_WU_ACCESS, SecAccess_Write, false))
  671. throw MakeStringException(ECLWATCH_ECL_WU_ACCESS_DENIED, "Failed to create workunit. Permission denied.");
  672. NewWsWorkunit wu(context);
  673. SCMStringBuffer wuid;
  674. resp.updateWorkunit().setWuid(wu->getWuid(wuid).str());
  675. AuditSystemAccess(context.queryUserId(), true, "Updated %s", wuid.str());
  676. }
  677. catch(IException* e)
  678. {
  679. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  680. }
  681. return true;
  682. }
  683. static bool origValueChanged(const char *newValue, const char *origValue, StringBuffer &s, bool nillable=true)
  684. {
  685. if (!nillable && isEmpty(newValue))
  686. return false;
  687. if(newValue && origValue)
  688. {
  689. if (!streq(origValue, newValue))
  690. {
  691. s.append(newValue).trim();
  692. return true;
  693. }
  694. return false;
  695. }
  696. if (newValue)
  697. {
  698. s.append(newValue).trim();
  699. return true;
  700. }
  701. return (origValue!=NULL);
  702. }
// WUUpdate handler: applies the fields of the request to an existing
// workunit and returns the refreshed workunit info plus a redirect URL to
// its WUInfo page.
//
// The workunit id is trimmed and validated, write access is checked, and the
// workunit is opened. A change of the protected flag is applied first. When
// the requested state is running / debug-paused / debug-running, no further
// update is attempted and the current info is returned. Otherwise the
// workunit is locked, the changed fields are written and committed.
// Exceptions are forwarded through FORWARDEXCEPTION.
bool CWsWorkunitsEx::onWUUpdate(IEspContext &context, IEspWUUpdateRequest &req, IEspWUUpdateResponse &resp)
{
    try
    {
        StringBuffer wuid = req.getWuid();
        checkAndTrimWorkunit("WUUpdate", wuid);
        ensureWsWorkunitAccess(context, wuid.str(), SecAccess_Write);
        Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
        Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid.str(), false);
        if(!cw)
            throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT,"Cannot open workunit %s.",wuid.str());
        // Apply a change of the protected flag, then reopen the workunit.
        if(req.getProtected() != req.getProtectedOrig())
        {
            cw->protect(req.getProtected());
            cw.clear();
            cw.setown(factory->openWorkUnit(wuid.str(), false));
            if(!cw)
                throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT,"Cannot open workunit %s.",wuid.str());
        }
        // For these requested states only the current info is returned;
        // none of the remaining fields are applied.
        if ((req.getState() == WUStateRunning)||(req.getState() == WUStateDebugPaused)||(req.getState() == WUStateDebugRunning))
        {
            WsWuInfo winfo(context, cw);
            winfo.getInfo(resp.updateWorkunit(), WUINFO_All);
            resp.setRedirectUrl(StringBuffer("/WsWorkunits/WUInfo?Wuid=").append(wuid).str());
            AuditSystemAccess(context.queryUserId(), true, "Updated %s", wuid.str());
            return true;
        }
        WorkunitUpdate wu(&cw->lock());
        // State: only written when it differs from the original sent by the
        // client, and rejected when the stored state no longer matches that
        // original.
        if(!req.getState_isNull() && (req.getStateOrig_isNull() || req.getState() != req.getStateOrig()))
        {
            if (!req.getStateOrig_isNull() && cw->getState() != (WUState) req.getStateOrig())
                throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT, "Cannot update workunit %s because its state has been changed internally. Please refresh the page and try again.", wuid.str());
            WUState state = (WUState) req.getState();
            if(state < WUStateSize)
                wu->setState(state);
        }
        // s is a scratch buffer reused (after clear()) for each changed field.
        StringBuffer s;
        if (origValueChanged(req.getJobname(), req.getJobnameOrig(), s))
            wu->setJobName(s.trim().str());
        // The description is kept as the "description" debug value; an empty
        // description is written as NULL.
        if (origValueChanged(req.getDescription(), req.getDescriptionOrig(), s.clear()))
            wu->setDebugValue("description", (req.getDescription() && *req.getDescription()) ? s.trim().str() : NULL, true);
        double version = context.getClientVersion();
        // Cluster selection is only honoured for client versions above 1.04.
        if (version > 1.04)
        {
            if (origValueChanged(req.getClusterSelection(), req.getClusterOrig(), s.clear(), false))
            {
                if (!isValidCluster(s.str()))
                    throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Invalid cluster name: %s", s.str());
                // A blocked workunit is moved via switchWorkUnitQueue; for
                // submitted/running/debugging states the cluster is left alone.
                if (req.getState() == WUStateBlocked)
                    switchWorkUnitQueue(wu.get(), s.str());
                else if ((req.getState() != WUStateSubmitted) && (req.getState() != WUStateRunning) && (req.getState() != WUStateDebugPaused) && (req.getState() != WUStateDebugRunning))
                    wu->setClusterName(s.str());
            }
        }
        setWsWuXmlParameters(wu, req.getXmlParams(), (req.getAction()==WUActionExecuteExisting));
        if (notEmpty(req.getQueryText()))
        {
            Owned<IWUQuery> query=wu->updateQuery();
            query->setQueryText(req.getQueryText());
        }
        // The main definition is only honoured for client versions above 1.34.
        if (version > 1.34 && notEmpty(req.getQueryMainDefinition()))
        {
            Owned<IWUQuery> query=wu->updateQuery();
            query->setQueryMainDefinition(req.getQueryMainDefinition());
        }
        if (!req.getResultLimit_isNull())
            wu->setResultLimit(req.getResultLimit());
        if (!req.getAction_isNull())
        {
            WUAction action = (WUAction) req.getAction();
            if(action < WUActionSize)
                wu->setAction(action);
        }
        if (!req.getPriorityClass_isNull())
        {
            WUPriorityClass priority = (WUPriorityClass) req.getPriorityClass();
            if(priority<PriorityClassSize)
                wu->setPriority(priority);
        }
        if (!req.getPriorityLevel_isNull())
            wu->setPriorityLevel(req.getPriorityLevel());
        if (origValueChanged(req.getScope(), req.getScopeOrig(), s.clear(), false))
            wu->setWuScope(s.str());
        // Debug values need a name; application values need both an
        // application and a name.
        ForEachItemIn(di, req.getDebugValues())
        {
            IConstDebugValue& item = req.getDebugValues().item(di);
            if (notEmpty(item.getName()))
                wu->setDebugValue(item.getName(), item.getValue(), true);
        }
        ForEachItemIn(ai, req.getApplicationValues())
        {
            IConstApplicationValue& item=req.getApplicationValues().item(ai);
            if(notEmpty(item.getApplication()) && notEmpty(item.getName()))
                wu->setApplicationValue(item.getApplication(), item.getName(), item.getValue(), true);
        }
        // Commit and release the update handle before reading the info back.
        wu->commit();
        wu.clear();
        WsWuInfo winfo(context, cw);
        winfo.getInfo(resp.updateWorkunit(), WUINFO_All);
        // ThorSlaveIP is passed on to the redirect only for client versions
        // above 1.24.
        StringBuffer thorSlaveIP;
        if (version > 1.24 && notEmpty(req.getThorSlaveIP()))
            thorSlaveIP = req.getThorSlaveIP();
        if (thorSlaveIP.length() > 0)
        {
            StringBuffer url;
            url.appendf("/WsWorkunits/WUInfo?Wuid=%s&ThorSlaveIP=%s", wuid.str(), thorSlaveIP.str());
            resp.setRedirectUrl(url.str());
        }
        else
            resp.setRedirectUrl(StringBuffer("/WsWorkunits/WUInfo?Wuid=").append(wuid).str());
        AuditSystemAccess(context.queryUserId(), true, "Updated %s", wuid.str());
    }
    catch(IException* e)
    {
        FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
    }
    return true;
}
  821. bool CWsWorkunitsEx::onWUCreateAndUpdate(IEspContext &context, IEspWUUpdateRequest &req, IEspWUUpdateResponse &resp)
  822. {
  823. try
  824. {
  825. if (!context.validateFeatureAccess(OWN_WU_ACCESS, SecAccess_Write, false))
  826. throw MakeStringException(ECLWATCH_ECL_WU_ACCESS_DENIED, "Failed to create workunit. Permission denied.");
  827. NewWsWorkunit wu(context);
  828. SCMStringBuffer wuid;
  829. wu->getWuid(wuid);
  830. req.setWuid(wuid.str());
  831. }
  832. catch(IException* e)
  833. {
  834. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  835. }
  836. return onWUUpdate(context, req, resp);
  837. }
  838. static inline StringBuffer &appendUrlParameter(StringBuffer &url, const char *name, const char *value, bool &first)
  839. {
  840. if (notEmpty(value))
  841. {
  842. url.append(first ? '?' : '&').append(name).append('=').append(value);
  843. first=false;
  844. }
  845. return url;
  846. }
  847. bool CWsWorkunitsEx::onWUAction(IEspContext &context, IEspWUActionRequest &req, IEspWUActionResponse &resp)
  848. {
  849. try
  850. {
  851. StringBuffer sAction(req.getActionType());
  852. if (!sAction.length())
  853. throw MakeStringException(ECLWATCH_INVALID_INPUT,"Action not defined.");
  854. int *action=wuActionTable.getValue(sAction.toLowerCase().str());
  855. if (!action)
  856. throw MakeStringException(ECLWATCH_INVALID_INPUT,"Invalid Action '%s'.", sAction.str());
  857. Owned<IProperties> params = createProperties(true);
  858. params->setProp("BlockTillFinishTimer", req.getBlockTillFinishTimer());
  859. if (*action==ActionProtect)
  860. params->setProp("Protect", streq(sAction.str(), "protect"));
  861. if (*action==ActionChangeState && streq(sAction.str(), "settofailed"))
  862. params->setProp("State",4);
  863. IArrayOf<IConstWUActionResult> results;
  864. if (doAction(context, req.getWuids(), *action, params, &results) && *action!=ActionDelete && checkRedirect(context))
  865. {
  866. StringBuffer redirect;
  867. if(req.getPageFrom() && strieq(req.getPageFrom(), "wuid"))
  868. redirect.append("/WsWorkunits/WUInfo?Wuid=").append(req.getWuids().item(0));
  869. else if (req.getPageFrom() && strieq(req.getPageFrom(), "scheduler"))
  870. {
  871. redirect.set("/WsWorkunits/WUShowScheduled");
  872. bool first=true;
  873. appendUrlParameter(redirect, "Cluster", req.getEventServer(), first);
  874. appendUrlParameter(redirect, "EventName", req.getEventName(), first);
  875. }
  876. else
  877. {
  878. redirect.append("/WsWorkunits/WUQuery");
  879. bool first=true;
  880. appendUrlParameter(redirect, "PageSize", req.getPageSize(), first);
  881. appendUrlParameter(redirect, "PageStartFrom", req.getCurrentPage(), first);
  882. appendUrlParameter(redirect, "Sortby", req.getSortby(), first);
  883. appendUrlParameter(redirect, "Descending", req.getDescending() ? "1" : "0", first);
  884. appendUrlParameter(redirect, "State", req.getState(), first);
  885. appendUrlParameter(redirect, "Cluster", req.getCluster(), first);
  886. appendUrlParameter(redirect, "Owner", req.getOwner(), first);
  887. appendUrlParameter(redirect, "StartDate", req.getStartDate(), first);
  888. appendUrlParameter(redirect, "EndDate", req.getEndDate(), first);
  889. appendUrlParameter(redirect, "ECL", req.getECL(), first);
  890. appendUrlParameter(redirect, "Jobname", req.getJobname(), first);
  891. }
  892. resp.setRedirectUrl(redirect.str());
  893. }
  894. else
  895. resp.setActionResults(results);
  896. }
  897. catch(IException* e)
  898. {
  899. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  900. }
  901. return true;
  902. }
  903. bool CWsWorkunitsEx::onWUDelete(IEspContext &context, IEspWUDeleteRequest &req, IEspWUDeleteResponse &resp)
  904. {
  905. try
  906. {
  907. IArrayOf<IConstWUActionResult> results;
  908. Owned<IProperties> params = createProperties(true);
  909. params->setProp("BlockTillFinishTimer", req.getBlockTillFinishTimer());
  910. if (!doAction(context,req.getWuids(), ActionDelete, params, &results))
  911. resp.setActionResults(results);
  912. }
  913. catch(IException* e)
  914. {
  915. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  916. }
  917. return true;
  918. }
  919. bool CWsWorkunitsEx::onWUAbort(IEspContext &context, IEspWUAbortRequest &req, IEspWUAbortResponse &resp)
  920. {
  921. try
  922. {
  923. IArrayOf<IConstWUActionResult> results;
  924. Owned<IProperties> params = createProperties(true);
  925. params->setProp("BlockTillFinishTimer", req.getBlockTillFinishTimer());
  926. if (!doAction(context,req.getWuids(), ActionAbort, params, &results))
  927. resp.setActionResults(results);
  928. }
  929. catch(IException* e)
  930. {
  931. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  932. }
  933. return true;
  934. }
  935. bool CWsWorkunitsEx::onWUProtect(IEspContext &context, IEspWUProtectRequest &req, IEspWUProtectResponse &resp)\
  936. {
  937. try
  938. {
  939. IArrayOf<IConstWUActionResult> results;
  940. Owned<IProperties> params(createProperties(true));
  941. params->setProp("Protect", req.getProtect());
  942. params->setProp("BlockTillFinishTimer", 0);
  943. if (!doAction(context,req.getWuids(), ActionProtect, params, &results))
  944. resp.setActionResults(results);
  945. }
  946. catch(IException* e)
  947. {
  948. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  949. }
  950. return true;
  951. }
  952. bool CWsWorkunitsEx::onWUResubmit(IEspContext &context, IEspWUResubmitRequest &req, IEspWUResubmitResponse &resp)
  953. {
  954. try
  955. {
  956. Owned<IMultiException> me = MakeMultiException();
  957. SCMStringBuffer wuid;
  958. StringArray wuids;
  959. double version = context.getClientVersion();
  960. IArrayOf<IEspResubmittedWU> resubmittedWUs;
  961. for(aindex_t i=0; i<req.getWuids().length();i++)
  962. {
  963. StringBuffer requestWuid = req.getWuids().item(i);
  964. checkAndTrimWorkunit("WUResubmit", requestWuid);
  965. ensureWsWorkunitAccess(context, requestWuid.str(), SecAccess_Write);
  966. wuid.set(requestWuid.str());
  967. try
  968. {
  969. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  970. if(req.getCloneWorkunit() || req.getRecompile())
  971. {
  972. Owned<IConstWorkUnit> src(factory->openWorkUnit(wuid.str(), false));
  973. NewWsWorkunit wu(factory, context);
  974. wu->getWuid(wuid);
  975. queryExtendedWU(wu)->copyWorkUnit(src, false);
  976. SCMStringBuffer token;
  977. wu->setSecurityToken(createToken(wuid.str(), context.queryUserId(), context.queryPassword(), token).str());
  978. }
  979. wuids.append(wuid.str());
  980. Owned<IConstWorkUnit> cw(factory->openWorkUnit(wuid.str(), false));
  981. if(!cw)
  982. throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot open workunit %s.",wuid.str());
  983. submitWsWorkunit(context, cw, NULL, NULL, 0, req.getRecompile(), req.getResetWorkflow(), false);
  984. if (version < 1.40)
  985. continue;
  986. Owned<IEspResubmittedWU> resubmittedWU = createResubmittedWU();
  987. resubmittedWU->setWUID(wuid.str());
  988. if (!streq(requestWuid.str(), wuid.str()))
  989. resubmittedWU->setParentWUID(requestWuid.str());
  990. resubmittedWUs.append(*resubmittedWU.getClear());
  991. }
  992. catch (IException *E)
  993. {
  994. me->append(*E);
  995. }
  996. catch (...)
  997. {
  998. me->append(*MakeStringException(0,"Unknown exception submitting %s",wuid.str()));
  999. }
  1000. }
  1001. if(me->ordinality())
  1002. throw me.getLink();
  1003. int timeToWait = req.getBlockTillFinishTimer();
  1004. if (timeToWait != 0)
  1005. {
  1006. for(aindex_t i=0; i<wuids.length(); i++)
  1007. waitForWorkUnitToComplete(wuids.item(i), timeToWait);
  1008. }
  1009. if (version >= 1.40)
  1010. resp.setWUs(resubmittedWUs);
  1011. if(wuids.length()==1)
  1012. {
  1013. resp.setRedirectUrl(StringBuffer("/WsWorkunits/WUInfo?Wuid=").append(wuids.item(0)));
  1014. }
  1015. }
  1016. catch(IException* e)
  1017. {
  1018. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1019. }
  1020. return true;
  1021. }
  1022. bool CWsWorkunitsEx::onWUPushEvent(IEspContext &context, IEspWUPushEventRequest &req, IEspWUPushEventResponse &resp)
  1023. {
  1024. try
  1025. {
  1026. const char *name = req.getEventName();
  1027. const char *text = req.getEventText();
  1028. const char *target = NULL;
  1029. if (notEmpty(name) && notEmpty(text))
  1030. {
  1031. Owned<IScheduleEventPusher> pusher(getScheduleEventPusher());
  1032. pusher->push(name, text, target);
  1033. StringBuffer redirect("/WsWorkunits/WUShowScheduled");
  1034. bool first=true;
  1035. appendUrlParameter(redirect, "PushEventName", name, first);
  1036. appendUrlParameter(redirect, "PushEventText", text, first);
  1037. resp.setRedirectUrl(redirect.str());
  1038. return true;
  1039. }
  1040. }
  1041. catch(IException* e)
  1042. {
  1043. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1044. }
  1045. return false;
  1046. }
  1047. bool CWsWorkunitsEx::onWUSchedule(IEspContext &context, IEspWUScheduleRequest &req, IEspWUScheduleResponse &resp)
  1048. {
  1049. try
  1050. {
  1051. StringBuffer wuid = req.getWuid();
  1052. checkAndTrimWorkunit("WUSchedule", wuid);
  1053. const char* cluster = req.getCluster();
  1054. if (isEmpty(cluster))
  1055. throw MakeStringException(ECLWATCH_INVALID_INPUT,"No Cluster defined.");
  1056. if (!isValidCluster(cluster))
  1057. throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Invalid cluster name: %s", cluster);
  1058. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  1059. WorkunitUpdate wu(factory->updateWorkUnit(wuid.str()));
  1060. ensureWsWorkunitAccess(context, *wu.get(), SecAccess_Write);
  1061. switch(wu->getState())
  1062. {
  1063. case WUStateDebugPaused:
  1064. case WUStateDebugRunning:
  1065. case WUStateRunning:
  1066. case WUStateAborting:
  1067. case WUStateBlocked:
  1068. {
  1069. SCMStringBuffer descr;
  1070. throw MakeStringException(ECLWATCH_CANNOT_SCHEDULE_WORKUNIT, "Cannot schedule the workunit. Workunit state is '%s'.", wu->getStateDesc(descr).str());
  1071. }
  1072. }
  1073. wu->clearExceptions();
  1074. wu->setClusterName(cluster);
  1075. if (notEmpty(req.getWhen()))
  1076. {
  1077. WsWuDateTime dt;
  1078. dt.setString(req.getWhen());
  1079. wu->setTimeScheduled(dt);
  1080. }
  1081. if(notEmpty(req.getSnapshot()))
  1082. wu->setSnapshot(req.getSnapshot());
  1083. wu->setState(WUStateScheduled);
  1084. if (req.getMaxRunTime())
  1085. wu->setDebugValueInt("maxRunTime", req.getMaxRunTime(), true);
  1086. SCMStringBuffer token;
  1087. wu->setSecurityToken(createToken(wuid.str(), context.queryUserId(), context.queryPassword(), token).str());
  1088. AuditSystemAccess(context.queryUserId(), true, "Scheduled %s", wuid.str());
  1089. }
  1090. catch(IException* e)
  1091. {
  1092. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1093. }
  1094. return true;
  1095. }
  1096. bool CWsWorkunitsEx::onWUSubmit(IEspContext &context, IEspWUSubmitRequest &req, IEspWUSubmitResponse &resp)
  1097. {
  1098. try
  1099. {
  1100. StringBuffer wuid = req.getWuid();
  1101. checkAndTrimWorkunit("WUSubmit", wuid);
  1102. const char *cluster = req.getCluster();
  1103. if (isEmpty(cluster))
  1104. throw MakeStringException(ECLWATCH_INVALID_INPUT,"No Cluster defined.");
  1105. if (!isValidCluster(cluster))
  1106. throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Invalid cluster name: %s", cluster);
  1107. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  1108. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid.str(), false);
  1109. if(!cw)
  1110. throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot open workunit %s.",wuid.str());
  1111. if (cw->getAction()==WUActionExecuteExisting)
  1112. {
  1113. ExecuteExistingQueryInfo info(cw);
  1114. if (info.queryset.isEmpty() || info.query.isEmpty())
  1115. {
  1116. WorkunitUpdate wu(&cw->lock());
  1117. throw noteException(wu, MakeStringException(ECLWATCH_INVALID_INPUT,"Queryset and/or query not specified"));
  1118. }
  1119. runWsWuQuery(context, cw, info.queryset.sget(), info.query.sget(), cluster, NULL);
  1120. }
  1121. else
  1122. submitWsWorkunit(context, cw, cluster, req.getSnapshot(), req.getMaxRunTime(), true, false, false);
  1123. if (req.getBlockTillFinishTimer() != 0)
  1124. waitForWorkUnitToComplete(wuid.str(), req.getBlockTillFinishTimer());
  1125. resp.setRedirectUrl(StringBuffer("/WsWorkunits/WUInfo?Wuid=").append(wuid).str());
  1126. }
  1127. catch(IException* e)
  1128. {
  1129. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1130. }
  1131. return true;
  1132. }
  1133. bool CWsWorkunitsEx::onWURun(IEspContext &context, IEspWURunRequest &req, IEspWURunResponse &resp)
  1134. {
  1135. try
  1136. {
  1137. const char *cluster = req.getCluster();
  1138. if (notEmpty(cluster) && !isValidCluster(cluster))
  1139. throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Invalid cluster name: %s", cluster);
  1140. StringBuffer wuidStr = req.getWuid();
  1141. const char* runWuid = wuidStr.trim().str();
  1142. StringBuffer wuid;
  1143. if (runWuid && *runWuid)
  1144. {
  1145. if (!looksLikeAWuid(runWuid))
  1146. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid Workunit ID: %s", runWuid);
  1147. if (req.getCloneWorkunit())
  1148. runWsWorkunit(context, wuid, runWuid, cluster, req.getInput(), &req.getVariables(), &req.getDebugValues());
  1149. else
  1150. {
  1151. submitWsWorkunit(context, runWuid, cluster, NULL, 0, false, true, true, req.getInput(), &req.getVariables(), &req.getDebugValues());
  1152. wuid.set(runWuid);
  1153. }
  1154. }
  1155. else if (notEmpty(req.getQuerySet()) && notEmpty(req.getQuery()))
  1156. runWsWuQuery(context, wuid, req.getQuerySet(), req.getQuery(), cluster, req.getInput());
  1157. else
  1158. throw MakeStringException(ECLWATCH_MISSING_PARAMS,"Workunit or Query required");
  1159. int timeToWait = req.getWait();
  1160. if (timeToWait != 0)
  1161. waitForWorkUnitToComplete(wuid.str(), timeToWait);
  1162. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  1163. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid.str(), false);
  1164. if (!cw)
  1165. throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT,"Cannot open workunit %s.", wuid.str());
  1166. SCMStringBuffer stateDesc;
  1167. resp.setState(cw->getStateDesc(stateDesc).str());
  1168. resp.setWuid(wuid.str());
  1169. switch (cw->getState())
  1170. {
  1171. case WUStateCompleted:
  1172. case WUStateFailed:
  1173. case WUStateUnknown:
  1174. {
  1175. SCMStringBuffer result;
  1176. unsigned flags = WorkUnitXML_SeverityTags;
  1177. if (req.getNoRootTag())
  1178. flags |= WorkUnitXML_NoRoot;
  1179. getFullWorkUnitResultsXML(context.queryUserId(), context.queryPassword(), cw.get(), result, flags);
  1180. resp.setResults(result.str());
  1181. break;
  1182. }
  1183. default:
  1184. break;
  1185. }
  1186. }
  1187. catch(IException* e)
  1188. {
  1189. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1190. }
  1191. return true;
  1192. }
  1193. bool CWsWorkunitsEx::onWUWaitCompiled(IEspContext &context, IEspWUWaitRequest &req, IEspWUWaitResponse &resp)
  1194. {
  1195. try
  1196. {
  1197. StringBuffer wuid = req.getWuid();
  1198. checkAndTrimWorkunit("WUWaitCompiled", wuid);
  1199. secWaitForWorkUnitToCompile(wuid.str(), *context.querySecManager(), *context.queryUser(), req.getWait());
  1200. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  1201. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid.str(), false);
  1202. if(!cw)
  1203. throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot open workunit %s.",wuid.str());
  1204. resp.setStateID(cw->getState());
  1205. }
  1206. catch(IException* e)
  1207. {
  1208. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1209. }
  1210. return true;
  1211. }
  1212. bool CWsWorkunitsEx::onWUWaitComplete(IEspContext &context, IEspWUWaitRequest &req, IEspWUWaitResponse &resp)
  1213. {
  1214. try
  1215. {
  1216. StringBuffer wuid = req.getWuid();
  1217. checkAndTrimWorkunit("WUWaitComplete", wuid);
  1218. resp.setStateID(secWaitForWorkUnitToComplete(wuid.str(), *context.querySecManager(), *context.queryUser(), req.getWait(), req.getReturnOnWait()));
  1219. }
  1220. catch(IException* e)
  1221. {
  1222. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1223. }
  1224. return true;
  1225. }
  1226. bool CWsWorkunitsEx::onWUCDebug(IEspContext &context, IEspWUDebugRequest &req, IEspWUDebugResponse &resp)
  1227. {
  1228. try
  1229. {
  1230. StringBuffer wuid = req.getWuid();
  1231. checkAndTrimWorkunit("WUCDebug", wuid);
  1232. StringBuffer result;
  1233. secDebugWorkunit(wuid.str(), *context.querySecManager(), *context.queryUser(), req.getCommand(), result);
  1234. resp.setResult(result);
  1235. }
  1236. catch(IException* e)
  1237. {
  1238. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1239. }
  1240. return true;
  1241. }
  1242. bool CWsWorkunitsEx::onWUSyntaxCheckECL(IEspContext &context, IEspWUSyntaxCheckRequest &req, IEspWUSyntaxCheckResponse &resp)
  1243. {
  1244. try
  1245. {
  1246. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  1247. NewWsWorkunit wu(factory, context);
  1248. wu->setAction(WUActionCheck);
  1249. if(notEmpty(req.getModuleName()) && notEmpty(req.getAttributeName()))
  1250. {
  1251. wu->setApplicationValue("SyntaxCheck", "ModuleName", req.getModuleName(), true);
  1252. wu->setApplicationValue("SyntaxCheck", "AttributeName", req.getAttributeName(), true);
  1253. }
  1254. ForEachItemIn(di, req.getDebugValues())
  1255. {
  1256. IConstDebugValue& item=req.getDebugValues().item(di);
  1257. if(notEmpty(item.getName()))
  1258. wu->setDebugValue(item.getName(), item.getValue(), true);
  1259. }
  1260. wu.setQueryText(req.getECL());
  1261. SCMStringBuffer wuid;
  1262. wu->getWuid(wuid);
  1263. wu->commit();
  1264. wu.clear();
  1265. submitWsWorkunit(context, wuid.str(), req.getCluster(), req.getSnapshot(), 0, true, false, false);
  1266. waitForWorkUnitToComplete(wuid.str(), req.getTimeToWait());
  1267. Owned<IConstWorkUnit> cw(factory->openWorkUnit(wuid.str(), false));
  1268. WsWUExceptions errors(*cw);
  1269. resp.setErrors(errors);
  1270. cw.clear();
  1271. factory->deleteWorkUnit(wuid.str());
  1272. }
  1273. catch(IException* e)
  1274. {
  1275. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1276. }
  1277. return true;
  1278. }
  1279. bool CWsWorkunitsEx::onWUCompileECL(IEspContext &context, IEspWUCompileECLRequest &req, IEspWUCompileECLResponse &resp)
  1280. {
  1281. try
  1282. {
  1283. ensureWsCreateWorkunitAccess(context);
  1284. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  1285. NewWsWorkunit wu(factory, context);
  1286. if(req.getIncludeComplexity())
  1287. {
  1288. wu->setAction(WUActionCompile);
  1289. wu->setDebugValueInt("calculateComplexity",1,true);
  1290. }
  1291. else
  1292. wu->setAction(WUActionCheck);
  1293. if(req.getModuleName() && req.getAttributeName())
  1294. {
  1295. wu->setApplicationValue("SyntaxCheck","ModuleName",req.getModuleName(),true);
  1296. wu->setApplicationValue("SyntaxCheck","AttributeName",req.getAttributeName(),true);
  1297. }
  1298. if(req.getIncludeDependencies())
  1299. wu->setApplicationValueInt("SyntaxCheck","IncludeDependencies",1,true);
  1300. wu.setQueryText(req.getECL());
  1301. SCMStringBuffer wuid;
  1302. wu->getWuid(wuid);
  1303. wu.clear();
  1304. submitWsWorkunit(context, wuid.str(), req.getCluster(), req.getSnapshot(), 0, true, false, false);
  1305. waitForWorkUnitToComplete(wuid.str(),req.getTimeToWait());
  1306. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid.str(), false);
  1307. SCMStringBuffer s;
  1308. cw->getDebugValue("__Calculated__Complexity__",s);
  1309. if(s.length())
  1310. resp.setComplexity(s.str());
  1311. WsWUExceptions errors(*cw);
  1312. resp.setErrors(errors);
  1313. if(!errors.ErrCount())
  1314. {
  1315. IArrayOf<IEspWUECLAttribute> dependencies;
  1316. for(unsigned count=1;;count++)
  1317. {
  1318. SCMStringBuffer xml;
  1319. cw->getApplicationValue("SyntaxCheck",StringBuffer("Dependency").append(count).str(),xml);
  1320. if(!xml.length())
  1321. break;
  1322. Owned<IPropertyTree> dep=createPTreeFromXMLString(xml.str(), ipt_caseInsensitive);
  1323. if(!dep)
  1324. continue;
  1325. Owned<IEspWUECLAttribute> att = createWUECLAttribute("","");
  1326. att->setModuleName(dep->queryProp("@module"));
  1327. att->setAttributeName(dep->queryProp("@name"));
  1328. int flags = dep->getPropInt("@flags",0);
  1329. if(flags & ob_locked)
  1330. {
  1331. if(flags & ob_lockedself)
  1332. att->setIsCheckedOut(true);
  1333. else
  1334. att->setIsLocked(true);
  1335. }
  1336. if(flags & ob_sandbox)
  1337. att->setIsSandbox(true);
  1338. if(flags & ob_orphaned)
  1339. att->setIsOrphaned(true);
  1340. dependencies.append(*att.getLink());
  1341. }
  1342. resp.setDependencies(dependencies);
  1343. }
  1344. cw.clear();
  1345. factory->deleteWorkUnit(wuid.str());
  1346. }
  1347. catch(IException* e)
  1348. {
  1349. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1350. }
  1351. return true;
  1352. }
  1353. bool CWsWorkunitsEx::onWUGetDependancyTrees(IEspContext& context, IEspWUGetDependancyTreesRequest& req, IEspWUGetDependancyTreesResponse& resp)
  1354. {
  1355. try
  1356. {
  1357. DBGLOG("WUGetDependancyTrees");
  1358. unsigned int timeMilliSec = 500;
  1359. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  1360. NewWsWorkunit wu(factory, context);
  1361. wu->setAction(WUActionCheck);
  1362. if (notEmpty(req.getCluster()))
  1363. wu->setClusterName(req.getCluster());
  1364. if (notEmpty(req.getSnapshot()))
  1365. wu->setSnapshot(req.getSnapshot());
  1366. wu->setDebugValue("gatherDependenciesSelection",notEmpty(req.getItems()) ? req.getItems() : NULL,true);
  1367. if (context.getClientVersion() > 1.12)
  1368. {
  1369. wu->setDebugValueInt("gatherDependencies", 1, true);
  1370. const char *timeout = req.getTimeoutMilliSec();
  1371. if (notEmpty(timeout))
  1372. {
  1373. const char *finger = timeout;
  1374. while (*finger)
  1375. {
  1376. if (!isdigit(*finger++))
  1377. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Incorrect timeout value");
  1378. }
  1379. timeMilliSec = atol(timeout);
  1380. }
  1381. }
  1382. SCMStringBuffer wuid;
  1383. wu->getWuid(wuid);
  1384. wu->commit();
  1385. wu.clear();
  1386. ensureWsWorkunitAccess(context, wuid.str(), SecAccess_Read);
  1387. submitWsWorkunit(context, wuid.str(), req.getCluster(), req.getSnapshot(), 0, true, false, false);
  1388. int state = waitForWorkUnitToComplete(wuid.str(), timeMilliSec);
  1389. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid.str(), false);
  1390. WsWUExceptions errors(*cw);
  1391. resp.setErrors(errors);
  1392. MemoryBuffer temp;
  1393. MemoryBuffer2IDataVal xmlresult(temp);
  1394. Owned<IConstWUResult> result = cw->getResultBySequence(0);
  1395. if (result)
  1396. {
  1397. result->getResultRaw(xmlresult, NULL, NULL);
  1398. resp.setDependancyTrees(temp);
  1399. }
  1400. wu.setown(&cw->lock());
  1401. wu->setState(WUStateAborted);
  1402. wu->commit();
  1403. wu.clear();
  1404. factory->deleteWorkUnit(wuid.str());
  1405. }
  1406. catch(IException* e)
  1407. {
  1408. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1409. }
  1410. return true;
  1411. }
  1412. bool getWsWuInfoFromSasha(IEspContext &context, SocketEndpoint &ep, const char* wuid, IEspECLWorkunit *info)
  1413. {
  1414. Owned<INode> node = createINode(ep);
  1415. Owned<ISashaCommand> cmd = createSashaCommand();
  1416. cmd->addId(wuid);
  1417. cmd->setAction(SCA_GET);
  1418. if (!cmd->send(node, 1*60*1000))
  1419. {
  1420. StringBuffer url;
  1421. DBGLOG("Could not connect to Sasha server at %s", ep.getUrlStr(url).str());
  1422. throw MakeStringException(ECLWATCH_CANNOT_CONNECT_ARCHIVE_SERVER,"Cannot connect to archive server at %s.", url.str());
  1423. }
  1424. if (cmd->numIds()==0)
  1425. {
  1426. DBGLOG("Could not read archived workunit %s",wuid);
  1427. throw MakeStringException(ECLWATCH_CANNOT_GET_WORKUNIT,"Cannot read workunit %s.",wuid);
  1428. }
  1429. unsigned num = cmd->numResults();
  1430. if (num < 1)
  1431. return false;
  1432. StringBuffer res;
  1433. cmd->getResult(0, res);
  1434. if(res.length() < 1)
  1435. return false;
  1436. Owned<IPropertyTree> wpt = createPTreeFromXMLString(res.str());
  1437. if (!wpt)
  1438. return false;
  1439. const char * owner = wpt->queryProp("@submitID");
  1440. ensureWsWorkunitAccessByOwnerId(context, owner, SecAccess_Read);
  1441. info->setWuid(wuid);
  1442. info->setArchived(true);
  1443. if (notEmpty(owner))
  1444. info->setOwner(owner);
  1445. const char * state = wpt->queryProp("@state");
  1446. if (notEmpty(state))
  1447. info->setState(state);
  1448. const char * cluster = wpt->queryProp("@clusterName");
  1449. if (notEmpty(cluster))
  1450. info->setCluster(cluster);
  1451. const char * scope = wpt->queryProp("@scope");
  1452. if (notEmpty(scope))
  1453. info->setScope(scope);
  1454. const char * jobName = wpt->queryProp("@jobName");
  1455. if (notEmpty(jobName))
  1456. info->setJobname(jobName);
  1457. const char * description = wpt->queryProp("Debug/description");
  1458. if (notEmpty(description))
  1459. info->setDescription(description);
  1460. const char * queryText = wpt->queryProp("Query/Text");
  1461. if (notEmpty(queryText))
  1462. info->updateQuery().setText(queryText);
  1463. const char * protectedWU = wpt->queryProp("@protected");
  1464. info->setProtected((protectedWU && *protectedWU!='0'));
  1465. return true;
  1466. }
  1467. #define WUDETAILS_REFRESH_MINS 1
  1468. void getArchivedWUInfo(IEspContext &context, const char *wuid, IEspWUInfoResponse &resp)
  1469. {
  1470. Owned<IEnvironmentFactory> envFactory = getEnvironmentFactory();
  1471. Owned<IConstEnvironment> constEnv = envFactory->openEnvironmentByFile();
  1472. Owned<IPropertyTree> root = &constEnv->getPTree();
  1473. if (!root)
  1474. throw MakeStringExceptionDirect(ECLWATCH_CANNOT_GET_ENV_INFO, "Failed to get environment info");
  1475. Owned<IPropertyTreeIterator> instances = root->getElements("Software/SashaServerProcess/Instance");
  1476. ForEach(*instances)
  1477. {
  1478. IPropertyTree &instance = instances->query();
  1479. SocketEndpoint ep(instance.queryProp("@netAddress"), instance.getPropInt("@port", 8877));
  1480. if (getWsWuInfoFromSasha(context, ep, wuid, &resp.updateWorkunit()))
  1481. {
  1482. resp.setAutoRefresh(WUDETAILS_REFRESH_MINS);
  1483. resp.setCanCompile(false);
  1484. return;
  1485. }
  1486. }
  1487. throw MakeStringException(ECLWATCH_CANNOT_GET_WORKUNIT, "Cannot find workunit %s.", wuid);
  1488. return;
  1489. }
  1490. #define WUDETAILS_REFRESH_MINS 1
  1491. bool CWsWorkunitsEx::onWUInfo(IEspContext &context, IEspWUInfoRequest &req, IEspWUInfoResponse &resp)
  1492. {
  1493. try
  1494. {
  1495. StringBuffer wuid = req.getWuid();
  1496. checkAndTrimWorkunit("WUInfo", wuid);
  1497. if (req.getType() && strieq(req.getType(), "archived workunits"))
  1498. getArchivedWUInfo(context, wuid.str(), resp);
  1499. else
  1500. {
  1501. try
  1502. {
  1503. //The access is checked here because getArchivedWUInfo() has its own access check.
  1504. ensureWsWorkunitAccess(context, wuid.str(), SecAccess_Read);
  1505. unsigned flags=0;
  1506. if (req.getTruncateEclTo64k())
  1507. flags|=WUINFO_TruncateEclTo64k;
  1508. if (req.getIncludeExceptions())
  1509. flags|=WUINFO_IncludeExceptions;
  1510. if (req.getIncludeGraphs())
  1511. flags|=WUINFO_IncludeGraphs;
  1512. if (req.getIncludeSourceFiles())
  1513. flags|=WUINFO_IncludeSourceFiles;
  1514. if (req.getIncludeResults())
  1515. flags|=WUINFO_IncludeResults;
  1516. if (req.getIncludeVariables())
  1517. flags|=WUINFO_IncludeVariables;
  1518. if (req.getIncludeTimers())
  1519. flags|=WUINFO_IncludeTimers;
  1520. if (req.getIncludeDebugValues())
  1521. flags|=WUINFO_IncludeDebugValues;
  1522. if (req.getIncludeApplicationValues())
  1523. flags|=WUINFO_IncludeApplicationValues;
  1524. if (req.getIncludeWorkflows())
  1525. flags|=WUINFO_IncludeWorkflows;
  1526. if (!req.getSuppressResultSchemas())
  1527. flags|=WUINFO_IncludeEclSchemas;
  1528. if (req.getIncludeXmlSchemas())
  1529. flags|=WUINFO_IncludeXmlSchema;
  1530. WsWuInfo winfo(context, wuid.str());
  1531. winfo.getInfo(resp.updateWorkunit(), flags);
  1532. if (req.getIncludeResultsViewNames())
  1533. {
  1534. StringArray views;
  1535. winfo.getResultViews(views, WUINFO_IncludeResultsViewNames);
  1536. resp.setResultViews(views);
  1537. }
  1538. }
  1539. catch (IException *e)
  1540. {
  1541. if (e->errorCode() != ECLWATCH_CANNOT_OPEN_WORKUNIT)
  1542. throw e;
  1543. getArchivedWUInfo(context, wuid.str(), resp);
  1544. }
  1545. switch (resp.getWorkunit().getStateID())
  1546. {
  1547. case WUStateCompiling:
  1548. case WUStateCompiled:
  1549. case WUStateScheduled:
  1550. case WUStateSubmitted:
  1551. case WUStateRunning:
  1552. case WUStateAborting:
  1553. case WUStateWait:
  1554. case WUStateUploadingFiles:
  1555. case WUStateDebugPaused:
  1556. case WUStateDebugRunning:
  1557. resp.setAutoRefresh(WUDETAILS_REFRESH_MINS);
  1558. break;
  1559. case WUStateBlocked:
  1560. resp.setAutoRefresh(WUDETAILS_REFRESH_MINS*5);
  1561. break;
  1562. }
  1563. resp.setCanCompile(notEmpty(context.queryUserId()));
  1564. if (context.getClientVersion() > 1.24 && notEmpty(req.getThorSlaveIP()))
  1565. resp.setThorSlaveIP(req.getThorSlaveIP());
  1566. }
  1567. }
  1568. catch(IException* e)
  1569. {
  1570. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1571. }
  1572. return true;
  1573. }
  1574. bool CWsWorkunitsEx::onWUInfoDetails(IEspContext &context, IEspWUInfoRequest &req, IEspWUInfoResponse &resp)
  1575. {
  1576. return onWUInfo(context, req, resp);
  1577. }
  1578. bool CWsWorkunitsEx::onWUResultView(IEspContext &context, IEspWUResultViewRequest &req, IEspWUResultViewResponse &resp)
  1579. {
  1580. StringBuffer wuid = req.getWuid();
  1581. checkAndTrimWorkunit("WUResultView", wuid);
  1582. ensureWsWorkunitAccess(context, wuid.str(), SecAccess_Read);
  1583. Owned<IWuWebView> wv = createWuWebView(wuid.str(), NULL, getCFD(), true);
  1584. StringBuffer html;
  1585. wv->renderSingleResult(req.getViewName(), req.getResultName(), html);
  1586. resp.setResult(html.str());
  1587. resp.setResult_mimetype("text/html");
  1588. return true;
  1589. }
  1590. void doWUQueryBySingleWuid(IEspContext &context, const char *wuid, IEspWUQueryResponse &resp)
  1591. {
  1592. Owned<IEspECLWorkunit> info= createECLWorkunit("","");
  1593. WsWuInfo winfo(context, wuid);
  1594. winfo.getCommon(*info, 0);
  1595. IArrayOf<IEspECLWorkunit> results;
  1596. results.append(*info.getClear());
  1597. resp.setWorkunits(results);
  1598. resp.setPageSize(1);
  1599. resp.setCount(1);
  1600. }
  1601. void doWUQueryByFile(IEspContext &context, const char *logicalFile, IEspWUQueryResponse &resp)
  1602. {
  1603. StringBuffer wuid;
  1604. getWuidFromLogicalFileName(context, logicalFile, wuid);
  1605. if (!wuid.length())
  1606. throw MakeStringException(ECLWATCH_CANNOT_GET_WORKUNIT,"Cannot find the workunit for file %s.", logicalFile);
  1607. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  1608. Owned<IConstWorkUnit> cw= factory->openWorkUnit(wuid.str(), false);
  1609. if (!cw)
  1610. throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot find the workunit for file %s.", logicalFile);
  1611. if (getWsWorkunitAccess(context, *cw) < SecAccess_Read)
  1612. throw MakeStringException(ECLWATCH_ECL_WU_ACCESS_DENIED,"Cannot access the workunit for file %s.",logicalFile);
  1613. SCMStringBuffer parent;
  1614. if (!cw->getParentWuid(parent).length())
  1615. doWUQueryBySingleWuid(context, wuid.str(), resp);
  1616. resp.setFirst(false);
  1617. resp.setPageSize(1);
  1618. resp.setCount(1);
  1619. }
  1620. void doWUQueryByXPath(IEspContext &context, IEspWUQueryRequest & req, IEspWUQueryResponse & resp)
  1621. {
  1622. IArrayOf<IEspECLWorkunit> results;
  1623. WsWuSearch wlist(context,req.getOwner(),req.getState(),req.getCluster(),req.getStartDate(),req.getEndDate(),req.getECL(),req.getJobname(),req.getApplicationName(),req.getApplicationKey(),req.getApplicationData());
  1624. int count=(int)req.getPageSize();
  1625. if (!count)
  1626. count=100;
  1627. if (wlist.getSize() < 1)
  1628. {
  1629. resp.setNumWUs(0);
  1630. return;
  1631. }
  1632. if (wlist.getSize() < count)
  1633. count = (int) wlist.getSize() - 1;
  1634. WsWuSearch::iterator begin, end;
  1635. if(notEmpty(req.getAfter()))
  1636. {
  1637. begin=wlist.locate(req.getAfter());
  1638. end=min(begin+count,wlist.end());
  1639. }
  1640. else if (notEmpty(req.getBefore()))
  1641. {
  1642. end=wlist.locate(req.getBefore());
  1643. begin=max(end-count,wlist.begin());
  1644. }
  1645. else
  1646. {
  1647. begin=wlist.begin();
  1648. end=min(begin+count,wlist.end());
  1649. }
  1650. if(begin>wlist.begin() && begin<wlist.end())
  1651. resp.setCurrent(begin->c_str());
  1652. if (context.getClientVersion() > 1.02)
  1653. {
  1654. resp.setPageStartFrom(begin - wlist.begin() + 1);
  1655. resp.setNumWUs((int)wlist.getSize());
  1656. resp.setCount(end - begin);
  1657. }
  1658. if(end<wlist.end())
  1659. resp.setNext(end->c_str());
  1660. for(;begin!=end;begin++)
  1661. {
  1662. Owned<IEspECLWorkunit> info = createECLWorkunit("","");
  1663. WsWuInfo winfo(context, begin->c_str());
  1664. winfo.getCommon(*info, 0);
  1665. results.append(*info.getClear());
  1666. }
  1667. resp.setPageSize(abs(count));
  1668. resp.setWorkunits(results);
  1669. return;
  1670. }
  1671. bool addWUQueryFilter(WUSortField *filters, unsigned short &count, MemoryBuffer &buff, const char *name, WUSortField value)
  1672. {
  1673. if (isEmpty(name))
  1674. return false;
  1675. filters[count++] = value;
  1676. buff.append(name);
  1677. return true;
  1678. }
  1679. bool addWUQueryFilterTime(WUSortField *filters, unsigned short &count, MemoryBuffer &buff, const char *stime, WUSortField value)
  1680. {
  1681. if (isEmpty(stime))
  1682. return false;
  1683. CDateTime dt;
  1684. dt.setString(stime, NULL, true);
  1685. unsigned year, month, day, hour, minute, second, nano;
  1686. dt.getDate(year, month, day, true);
  1687. dt.getTime(hour, minute, second, nano, true);
  1688. VStringBuffer wuid("W%4d%02d%02d-%02d%02d%02d",year,month,day,hour,minute,second);
  1689. filters[count++] = value;
  1690. buff.append(wuid.str());
  1691. return true;
  1692. }
  1693. void doWUQueryWithSort(IEspContext &context, IEspWUQueryRequest & req, IEspWUQueryResponse & resp)
  1694. {
  1695. SecAccessFlags accessOwn;
  1696. SecAccessFlags accessOthers;
  1697. getUserWuAccessFlags(context, accessOwn, accessOthers, true);
  1698. double version = context.getClientVersion();
  1699. IArrayOf<IEspECLWorkunit> results;
  1700. int begin = 0;
  1701. unsigned int count = 100;
  1702. int pagesize = 100;
  1703. if (version > 1.01)
  1704. {
  1705. pagesize = (int)req.getPageSize();
  1706. if (!req.getCount_isNull())
  1707. pagesize = req.getCount();
  1708. if(pagesize < 1)
  1709. pagesize = 100;
  1710. begin = (int)req.getPageStartFrom();
  1711. }
  1712. else
  1713. {
  1714. count=(unsigned)req.getCount();
  1715. if(!count)
  1716. count=100;
  1717. if (notEmpty(req.getAfter()))
  1718. begin=atoi(req.getAfter());
  1719. else if (notEmpty(req.getBefore()))
  1720. begin=atoi(req.getBefore())-count;
  1721. if (begin < 0)
  1722. begin = 0;
  1723. pagesize = count;
  1724. }
  1725. WUSortField sortorder[2] = {(WUSortField) (WUSFwuid | WUSFreverse), WUSFterm};
  1726. if(notEmpty(req.getSortby()))
  1727. {
  1728. const char *sortby = req.getSortby();
  1729. if (strieq(sortby, "Owner"))
  1730. sortorder[0] = WUSFuser;
  1731. else if (strieq(sortby, "JobName"))
  1732. sortorder[0] = WUSFjob;
  1733. else if (strieq(sortby, "Cluster"))
  1734. sortorder[0] = WUSFcluster;
  1735. else if (strieq(sortby, "RoxieCluster"))
  1736. sortorder[0] = WUSFroxiecluster;
  1737. else if (strieq(sortby, "Protected"))
  1738. sortorder[0] = WUSFprotected;
  1739. else if (strieq(sortby, "State"))
  1740. sortorder[0] = WUSFstate;
  1741. else if (strieq(sortby, "ThorTime"))
  1742. sortorder[0] = (WUSortField) (WUSFtotalthortime+WUSFnumeric);
  1743. else
  1744. sortorder[0] = WUSFwuid;
  1745. sortorder[0] = (WUSortField) (sortorder[0] | WUSFnocase);
  1746. bool descending = req.getDescending();
  1747. if (descending)
  1748. sortorder[0] = (WUSortField) (sortorder[0] | WUSFreverse);
  1749. }
  1750. WUSortField filters[10];
  1751. unsigned short filterCount = 0;
  1752. MemoryBuffer filterbuf;
  1753. bool bDoubleCheckState = false;
  1754. if(req.getState())
  1755. {
  1756. addWUQueryFilter(filters, filterCount, filterbuf, strieq(req.getState(), "unknown") ? "" : req.getState(), WUSFstate);
  1757. if (strieq(req.getState(), "submitted"))
  1758. bDoubleCheckState = true;
  1759. }
  1760. addWUQueryFilter(filters, filterCount, filterbuf, req.getCluster(), WUSFcluster);
  1761. if(version > 1.07)
  1762. addWUQueryFilter(filters, filterCount, filterbuf, req.getRoxieCluster(), WUSFroxiecluster);
  1763. addWUQueryFilter(filters, filterCount, filterbuf, req.getLogicalFile(), WUSFfileread);
  1764. addWUQueryFilter(filters, filterCount, filterbuf, req.getOwner(), (WUSortField) (WUSFuser | WUSFnocase));
  1765. addWUQueryFilter(filters, filterCount, filterbuf, req.getJobname(), (WUSortField) (WUSFjob | WUSFnocase));
  1766. addWUQueryFilterTime(filters, filterCount, filterbuf, req.getStartDate(), WUSFwuid);
  1767. addWUQueryFilterTime(filters, filterCount, filterbuf, req.getEndDate(), WUSFwuidhigh);
  1768. filters[filterCount] = WUSFterm;
  1769. __int64 cacheHint = 0;
  1770. if (!req.getCacheHint_isNull())
  1771. cacheHint = req.getCacheHint();
  1772. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  1773. unsigned numWUs;
  1774. Owned<IConstWorkUnitIterator> it = factory->getWorkUnitsSorted(sortorder, filters, filterbuf.bufferBase(), begin, pagesize+1, "", &cacheHint, &numWUs);
  1775. if (version >= 1.41)
  1776. resp.setCacheHint(cacheHint);
  1777. unsigned actualCount = 0;
  1778. ForEach(*it)
  1779. {
  1780. IConstWorkUnit& cw = it->query();
  1781. if (chooseWuAccessFlagsByOwnership(context.queryUserId(), cw, accessOwn, accessOthers) < SecAccess_Read)
  1782. {
  1783. numWUs--;
  1784. continue;
  1785. }
  1786. if (bDoubleCheckState && (cw.getState() != WUStateSubmitted))
  1787. {
  1788. numWUs--;
  1789. continue;
  1790. }
  1791. SCMStringBuffer parent;
  1792. if (!cw.getParentWuid(parent).length())
  1793. {
  1794. const char* wuid = cw.getWuid(parent).str();
  1795. if (!looksLikeAWuid(wuid))
  1796. {
  1797. numWUs--;
  1798. continue;
  1799. }
  1800. actualCount++;
  1801. Owned<IEspECLWorkunit> info = createECLWorkunit("","");
  1802. WsWuInfo winfo(context, wuid);
  1803. winfo.getCommon(*info, 0);
  1804. results.append(*info.getClear());
  1805. }
  1806. }
  1807. if (version > 1.02)
  1808. {
  1809. resp.setPageStartFrom(begin+1);
  1810. resp.setNumWUs(numWUs);
  1811. if (results.length() > (aindex_t)pagesize)
  1812. results.pop();
  1813. if(unsigned (begin + pagesize) < numWUs)
  1814. {
  1815. resp.setNextPage(begin + pagesize);
  1816. resp.setPageEndAt(begin + pagesize);
  1817. int last = begin + pagesize;
  1818. while (numWUs > (unsigned) last + pagesize)
  1819. last += pagesize;
  1820. resp.setLastPage(last);
  1821. }
  1822. else
  1823. {
  1824. resp.setNextPage(-1);
  1825. resp.setPageEndAt(numWUs);
  1826. }
  1827. if(begin > 0)
  1828. {
  1829. resp.setFirst(false);
  1830. if (begin - pagesize > 0)
  1831. resp.setPrevPage(begin - pagesize);
  1832. else
  1833. resp.setPrevPage(0);
  1834. }
  1835. resp.setPageSize(pagesize);
  1836. }
  1837. else
  1838. {
  1839. if(begin>0 && actualCount > 0)
  1840. {
  1841. char buf[10];
  1842. itoa(begin, buf, 10);
  1843. resp.setCurrent(buf);
  1844. }
  1845. if(count<actualCount)
  1846. {
  1847. char buf[10];
  1848. itoa(begin+count, buf, 10);
  1849. resp.setNext(buf);
  1850. resp.setNumWUs(numWUs);
  1851. if (results.length() > count)
  1852. results.pop();
  1853. }
  1854. if(begin == 0 && actualCount <= count)
  1855. resp.setFirst(false);
  1856. resp.setCount(count);
  1857. }
  1858. resp.setWorkunits(results);
  1859. return;
  1860. }
  1861. void doWUQueryFromArchive(IEspContext &context, ArchivedWuCache &archivedWuCache, int cacheTime, IEspWUQueryRequest & req, IEspWUQueryResponse & resp)
  1862. {
  1863. SecAccessFlags accessOwn;
  1864. SecAccessFlags accessOthers;
  1865. getUserWuAccessFlags(context, accessOwn, accessOthers, true);
  1866. __int64 pageSize = req.getPageSize();
  1867. if(pageSize < 1)
  1868. pageSize=100;
  1869. __int64 displayStart = req.getPageStartFrom();
  1870. __int64 displayEnd = displayStart + pageSize;
  1871. unsigned dateLimit = 0;
  1872. bool hasNextPage = true;
  1873. SocketEndpoint ep;
  1874. getSashaNode(ep);
  1875. Owned<INode> sashaserver = createINode(ep);
  1876. CDateTime wuTimeFrom, wuTimeTo;
  1877. if(notEmpty(req.getEndDate()))
  1878. wuTimeTo.setString(req.getEndDate(), NULL, true);
  1879. else
  1880. wuTimeTo.setNow();
  1881. if(notEmpty(req.getStartDate()))
  1882. {
  1883. wuTimeFrom.setString(req.getStartDate(), NULL, true);
  1884. dateLimit = 1;
  1885. }
  1886. IArrayOf<IEspECLWorkunit> results;
  1887. StringBuffer filter;
  1888. addToQueryString(filter, "cluster", req.getCluster(), ';');
  1889. addToQueryString(filter, "owner", req.getOwner(), ';');
  1890. addToQueryString(filter, "jobName", req.getJobname(), ';');
  1891. addToQueryString(filter, "state", req.getState(), ';');
  1892. StringBuffer s;
  1893. if (!req.getLastNDays_isNull() && req.getLastNDays()>0)
  1894. addToQueryString(filter, "LastNDays", s.clear().append(req.getLastNDays()).str(), ';');
  1895. else
  1896. {
  1897. addToQueryString(filter, "wuTimeFrom", req.getStartDate(), ';');
  1898. addToQueryString(filter, "wuTimeTo", req.getEndDate(), ';');
  1899. }
  1900. addToQueryString(filter, "displayStart", s.append(displayStart).str(), ';');
  1901. addToQueryString(filter, "pageSize", s.clear().append(pageSize).str(), ';');
  1902. Owned<ArchivedWuCacheElement> found = archivedWuCache.lookup(context, filter, "AddWhenAvailable", cacheTime);
  1903. if (found)
  1904. {
  1905. hasNextPage = found->m_hasNextPage;
  1906. if (found->m_results.length())
  1907. {
  1908. ForEachItemIn(ai, found->m_results)
  1909. {
  1910. Owned<IEspECLWorkunit> info= createECLWorkunit("","");
  1911. info->copy(found->m_results.item(ai));
  1912. results.append(*info.getClear());
  1913. }
  1914. }
  1915. }
  1916. else
  1917. {
  1918. IArrayOf<IEspECLWorkunit> resultList;
  1919. CDateTime timeTo = wuTimeTo;
  1920. __int64 totalWus = 0;
  1921. bool complete = false;
  1922. while (!complete)
  1923. {
  1924. CDateTime timeFrom = timeTo;
  1925. timeFrom.adjustTime(-1439); //one day earlier
  1926. if (dateLimit > 0 && wuTimeFrom > timeFrom)
  1927. timeFrom = wuTimeFrom;
  1928. unsigned year0, month0, day0, hour0, minute0, second0, nano0;
  1929. timeFrom.getDate(year0, month0, day0, true);
  1930. timeFrom.getTime(hour0, minute0, second0, nano0, true);
  1931. VStringBuffer wuFrom("%4d%02d%02d%02d%02d", year0, month0, day0, hour0, minute0);
  1932. unsigned year, month, day, hour, minute, second, nano;
  1933. timeTo.getDate(year, month, day, true);
  1934. timeTo.getTime(hour, minute, second, nano, true);
  1935. VStringBuffer wuTo("%4d%02d%02d%02d%02d", year, month, day, hour, minute);
  1936. __int64 begin = 0;
  1937. unsigned limit = 1000;
  1938. bool continueSashaLoop = true;
  1939. while (continueSashaLoop)
  1940. {
  1941. Owned<ISashaCommand> cmd = createSashaCommand();
  1942. cmd->setAction(SCA_LIST);
  1943. cmd->setOnline(false);
  1944. cmd->setArchived(true);
  1945. cmd->setAfter(wuFrom.str());
  1946. cmd->setBefore(wuTo.str());
  1947. cmd->setStart((unsigned)begin);
  1948. cmd->setLimit(limit);
  1949. if (notEmpty(req.getCluster()))
  1950. cmd->setCluster(req.getCluster());
  1951. if (notEmpty(req.getOwner()))
  1952. cmd->setOwner(req.getOwner());
  1953. if (notEmpty(req.getJobname()))
  1954. cmd->setJobName(req.getJobname());
  1955. if (notEmpty(req.getState()))
  1956. cmd->setState(req.getState());
  1957. cmd->setOutputFormat("owner,jobname,cluster,state");
  1958. if (!cmd->send(sashaserver))
  1959. {
  1960. StringBuffer msg("Cannot connect to archive server at ");
  1961. sashaserver->endpoint().getUrlStr(msg);
  1962. throw MakeStringException(ECLWATCH_CANNOT_CONNECT_ARCHIVE_SERVER, "%s", msg.str());
  1963. }
  1964. unsigned actualCount = cmd->numIds();
  1965. if (actualCount < 1)
  1966. break;
  1967. totalWus += actualCount;
  1968. if (actualCount < limit)
  1969. continueSashaLoop = false;
  1970. for (unsigned ii=0; ii<actualCount; ii++)
  1971. {
  1972. const char *csline = cmd->queryId(ii);
  1973. if (!csline)
  1974. continue;
  1975. StringArray wuidArray;
  1976. wuidArray.appendList(csline, ",");
  1977. if (chooseWuAccessFlagsByOwnership(context.queryUserId(), cmd->queryOwner(), accessOwn, accessOthers) < SecAccess_Read)
  1978. continue;
  1979. const char* wuid = wuidArray.item(0);
  1980. if (isEmpty(wuid))
  1981. continue;
  1982. __int64 addToPos = -1;
  1983. ForEachItemIn(ridx, resultList)
  1984. {
  1985. IEspECLWorkunit& w = resultList.item(ridx);
  1986. if (isEmpty(w.getWuid()))
  1987. continue;
  1988. if (strcmp(wuid, w.getWuid())>0)
  1989. {
  1990. addToPos = ridx;
  1991. break;
  1992. }
  1993. }
  1994. if (addToPos < 0 && (ridx > displayEnd))
  1995. continue;
  1996. Owned<IEspECLWorkunit> info= createECLWorkunit("","");
  1997. info->setWuid(wuid);
  1998. if (notEmpty(wuidArray.item(1)))
  1999. info->setOwner(wuidArray.item(1));
  2000. if (notEmpty(wuidArray.item(2)))
  2001. info->setJobname(wuidArray.item(2));
  2002. if (notEmpty(wuidArray.item(3)))
  2003. info->setCluster(wuidArray.item(3));
  2004. if (notEmpty(wuidArray.item(4)))
  2005. info->setState(wuidArray.item(4));
  2006. if (addToPos < 0)
  2007. resultList.append(*info.getClear());
  2008. else
  2009. resultList.add(*info.getClear(), (aindex_t) addToPos);
  2010. if (resultList.length() > displayEnd)
  2011. resultList.pop();
  2012. }
  2013. begin += limit;
  2014. }
  2015. timeTo.adjustTime(-1440);//one day earlier
  2016. if (dateLimit > 0 && wuTimeFrom > timeTo) //we reach the date limit
  2017. {
  2018. if (totalWus <= displayEnd)
  2019. hasNextPage = false;
  2020. complete = true;
  2021. }
  2022. else if ( resultList.length() >= displayEnd) //we have all we need
  2023. complete = true;
  2024. }
  2025. if (displayEnd > resultList.length())
  2026. displayEnd = resultList.length();
  2027. for (aindex_t i = (aindex_t)displayStart; i < (aindex_t)displayEnd; i++)
  2028. {
  2029. Owned<IEspECLWorkunit> info = createECLWorkunit("","");
  2030. info->copy(resultList.item(i));
  2031. results.append(*info.getClear());
  2032. }
  2033. archivedWuCache.add(filter, "AddWhenAvailable", hasNextPage, results);
  2034. }
  2035. resp.setPageStartFrom(displayStart+1);
  2036. resp.setPageEndAt(displayEnd);
  2037. if(dateLimit < 1 || hasNextPage)
  2038. resp.setNextPage(displayStart + pageSize);
  2039. else
  2040. resp.setNextPage(-1);
  2041. if(displayStart > 0)
  2042. {
  2043. resp.setFirst(false);
  2044. if (displayStart - pageSize > 0)
  2045. resp.setPrevPage(displayStart - pageSize);
  2046. else
  2047. resp.setPrevPage(0);
  2048. }
  2049. resp.setPageSize(pageSize);
  2050. resp.setWorkunits(results);
  2051. resp.setType("archived only");
  2052. return;
  2053. }
  2054. bool CWsWorkunitsEx::onWUQuery(IEspContext &context, IEspWUQueryRequest & req, IEspWUQueryResponse & resp)
  2055. {
  2056. try
  2057. {
  2058. StringBuffer wuidStr = req.getWuid();
  2059. const char* wuid = wuidStr.trim().str();
  2060. if (req.getType() && strieq(req.getType(), "archived workunits"))
  2061. doWUQueryFromArchive(context, *archivedWuCache, awusCacheMinutes, req, resp);
  2062. else if(notEmpty(wuid))
  2063. {
  2064. if (!looksLikeAWuid(wuid))
  2065. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid Workunit ID: %s", wuid);
  2066. doWUQueryBySingleWuid(context, wuid, resp);
  2067. }
  2068. else if (notEmpty(req.getECL()) || notEmpty(req.getApplicationName()) || notEmpty(req.getApplicationKey()) || notEmpty(req.getApplicationData()))
  2069. doWUQueryByXPath(context, req, resp);
  2070. else if (notEmpty(req.getLogicalFile()) && req.getLogicalFileSearchType() && strieq(req.getLogicalFileSearchType(), "Created"))
  2071. doWUQueryByFile(context, req.getLogicalFile(), resp);
  2072. else
  2073. doWUQueryWithSort(context, req, resp);
  2074. resp.setState(req.getState());
  2075. resp.setCluster(req.getCluster());
  2076. resp.setRoxieCluster(req.getRoxieCluster());
  2077. resp.setOwner(req.getOwner());
  2078. resp.setStartDate(req.getStartDate());
  2079. resp.setEndDate(req.getEndDate());
  2080. double version = context.getClientVersion();
  2081. StringBuffer basicQuery;
  2082. addToQueryString(basicQuery, "State", req.getState());
  2083. addToQueryString(basicQuery, "Cluster", req.getCluster());
  2084. if (version > 1.07)
  2085. addToQueryString(basicQuery, "RoxieCluster", req.getRoxieCluster());
  2086. addToQueryString(basicQuery, "Owner", req.getOwner());
  2087. addToQueryString(basicQuery, "StartDate", req.getStartDate());
  2088. addToQueryString(basicQuery, "EndDate", req.getEndDate());
  2089. if (version > 1.25 && req.getLastNDays() > -1)
  2090. addToQueryString(basicQuery, "LastNDays", StringBuffer().append(req.getLastNDays()).str());
  2091. addToQueryString(basicQuery, "ECL", req.getECL());
  2092. addToQueryString(basicQuery, "Jobname", req.getJobname());
  2093. addToQueryString(basicQuery, "Type", req.getType());
  2094. if (addToQueryString(basicQuery, "LogicalFile", req.getLogicalFile()))
  2095. addToQueryString(basicQuery, "LogicalFileSearchType", req.getLogicalFileSearchType());
  2096. resp.setFilters(basicQuery.str());
  2097. if (notEmpty(req.getSortby()) && !strstr(basicQuery.str(), StringBuffer(req.getSortby()).append('=').str()))
  2098. {
  2099. resp.setSortby(req.getSortby());
  2100. addToQueryString(basicQuery, "Sortby", req.getSortby());
  2101. if (req.getDescending())
  2102. {
  2103. resp.setDescending(req.getDescending());
  2104. addToQueryString(basicQuery, "Descending", "1");
  2105. }
  2106. }
  2107. resp.setBasicQuery(basicQuery.str());
  2108. StringBuffer s;
  2109. if(notEmpty(req.getECL()))
  2110. resp.setECL(Utils::url_encode(req.getECL(), s).str());
  2111. if(notEmpty(req.getJobname()))
  2112. resp.setJobname(Utils::url_encode(req.getJobname(), s.clear()).str());
  2113. }
  2114. catch(IException* e)
  2115. {
  2116. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  2117. }
  2118. return true;
  2119. }
  2120. void appendResultSet(MemoryBuffer& mb, INewResultSet* result, const char *name, __int64 start, unsigned& count, __int64& total, bool bin, bool xsd)
  2121. {
  2122. if (!result)
  2123. return;
  2124. const IResultSetMetaData &meta = result->getMetaData();
  2125. Owned<IResultSetCursor> cursor(result->createCursor());
  2126. total=result->getNumRows();
  2127. if(bin)
  2128. count = getResultBin(mb, result, (unsigned)start, count);
  2129. else
  2130. {
  2131. struct MemoryBuffer2IStringVal : public CInterface, implements IStringVal
  2132. {
  2133. MemoryBuffer2IStringVal(MemoryBuffer & _buffer) : buffer(_buffer) {}
  2134. IMPLEMENT_IINTERFACE;
  2135. virtual const char * str() const { UNIMPLEMENTED; }
  2136. virtual void set(const char *val) { buffer.append(strlen(val),val); }
  2137. virtual void clear() { } // support appending only
  2138. virtual void setLen(const char *val, unsigned length) { buffer.append(length, val); }
  2139. virtual unsigned length() const { return buffer.length(); };
  2140. MemoryBuffer & buffer;
  2141. } adaptor(mb);
  2142. count = getResultXml(adaptor, result, name, (unsigned) start, count, (xsd) ? "myschema" : NULL);
  2143. }
  2144. }
  2145. void getWsWuResult(IEspContext &context, const char* wuid, const char *name, const char *logical, unsigned index, __int64 start, unsigned& count, __int64& total, IStringVal& resname, bool bin, MemoryBuffer& mb, bool xsd=true)
  2146. {
  2147. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  2148. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid, false);
  2149. if(!cw)
  2150. throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot open workunit %s.",wuid);
  2151. Owned<IConstWUResult> result;
  2152. if (notEmpty(name))
  2153. result.setown(cw->getResultByName(name));
  2154. else if (notEmpty(logical))
  2155. {
  2156. Owned<IConstWUResultIterator> it = &cw->getResults();
  2157. ForEach(*it)
  2158. {
  2159. IConstWUResult &r = it->query();
  2160. SCMStringBuffer filename;
  2161. if(strieq(r.getResultLogicalName(filename).str(), logical))
  2162. {
  2163. result.setown(LINK(&r));
  2164. break;
  2165. }
  2166. }
  2167. }
  2168. else
  2169. result.setown(cw->getResultBySequence(index));
  2170. if (!result)
  2171. throw MakeStringException(ECLWATCH_CANNOT_GET_WU_RESULT,"Cannot open the workunit result.");
  2172. if (!resname.length())
  2173. result->getResultName(resname);
  2174. Owned<IResultSetFactory> resultSetFactory = getSecResultSetFactory(context.querySecManager(), context.queryUser(), context.queryUserId(), context.queryPassword());
  2175. SCMStringBuffer logicalName;
  2176. result->getResultLogicalName(logicalName);
  2177. Owned<INewResultSet> rs;
  2178. if (logicalName.length())
  2179. {
  2180. SCMStringBuffer cluster; //MORE is this wrong cluster?
  2181. rs.setown(resultSetFactory->createNewFileResultSet(logicalName.str(), cw->getClusterName(cluster).str()));
  2182. }
  2183. else
  2184. rs.setown(resultSetFactory->createNewResultSet(result, wuid));
  2185. appendResultSet(mb, rs, name, start, count, total, bin, xsd);
  2186. }
  2187. void openSaveFile(IEspContext &context, int opt, const char* filename, const char* origMimeType, MemoryBuffer& buf, IEspWULogFileResponse &resp)
  2188. {
  2189. if (opt < 1)
  2190. {
  2191. resp.setThefile(buf);
  2192. resp.setThefile_mimetype(origMimeType);
  2193. }
  2194. else if (opt < 2)
  2195. {
  2196. StringBuffer headerStr("attachment;");
  2197. if (filename && *filename)
  2198. {
  2199. const char* pFileName = strrchr(filename, PATHSEPCHAR);
  2200. if (pFileName)
  2201. headerStr.appendf("filename=%s", pFileName+1);
  2202. else
  2203. headerStr.appendf("filename=%s", filename);
  2204. }
  2205. MemoryBuffer buf0;
  2206. unsigned i = 0;
  2207. char* p = (char*) buf.toByteArray();
  2208. while (i < buf.length())
  2209. {
  2210. if (p[0] != 10)
  2211. buf0.append(p[0]);
  2212. else
  2213. buf0.append(0x0d);
  2214. p++;
  2215. i++;
  2216. }
  2217. resp.setThefile(buf);
  2218. resp.setThefile_mimetype(origMimeType);
  2219. context.addCustomerHeader("Content-disposition", headerStr.str());
  2220. }
  2221. else
  2222. {
  2223. #ifndef _USE_ZLIB
  2224. throw MakeStringException(ECLWATCH_CANNOT_COMPRESS_DATA,"The data cannot be compressed.");
  2225. #else
  2226. StringBuffer fileNameStr, headerStr("attachment;");
  2227. if (notEmpty(filename))
  2228. {
  2229. fileNameStr.append(filename);
  2230. headerStr.append("filename=").append(filename).append((opt>2) ? ".gz" : ".zip");
  2231. }
  2232. else
  2233. fileNameStr.append("file");
  2234. StringBuffer ifname;
  2235. ifname.appendf("%s%sT%xAT%x", TEMPZIPDIR, PATHSEPSTR, (unsigned)(memsize_t)GetCurrentThreadId(), msTick()).append((opt>2)? "" : ".zip");
  2236. IZZIPor* Zipor = createZZIPor();
  2237. int ret = 0;
  2238. if (opt > 2)
  2239. ret = Zipor->gzipToFile(buf.length(), (void*)buf.toByteArray(), ifname.str());
  2240. else
  2241. ret = Zipor->zipToFile(buf.length(), (void*)buf.toByteArray(), fileNameStr.str(), ifname.str());
  2242. releaseIZ(Zipor);
  2243. if (ret < 0)
  2244. {
  2245. Owned<IFile> rFile = createIFile(ifname.str());
  2246. if (rFile->exists())
  2247. rFile->remove();
  2248. throw MakeStringException(ECLWATCH_CANNOT_COMPRESS_DATA,"The data cannot be compressed.");
  2249. }
  2250. Owned <IFile> rf = createIFile(ifname.str());
  2251. if (!rf->exists())
  2252. throw MakeStringException(ECLWATCH_CANNOT_COMPRESS_DATA,"The data cannot be compressed.");
  2253. MemoryBuffer out;
  2254. Owned <IFileIO> fio = rf->open(IFOread);
  2255. read(fio, 0, (size32_t) rf->size(), out);
  2256. resp.setThefile(out);
  2257. fio.clear();
  2258. rf->remove();
  2259. resp.setThefile_mimetype((opt > 2) ? "application/x-gzip" : "application/zip");
  2260. context.addCustomerHeader("Content-disposition", headerStr.str());
  2261. #endif
  2262. }
  2263. }
  2264. bool CWsWorkunitsEx::onWUFile(IEspContext &context,IEspWULogFileRequest &req, IEspWULogFileResponse &resp)
  2265. {
  2266. try
  2267. {
  2268. StringBuffer wuidStr = req.getWuid();
  2269. const char* wuidIn = wuidStr.trim().str();
  2270. if (wuidIn && *wuidIn)
  2271. {
  2272. if (!looksLikeAWuid(wuidIn))
  2273. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid Workunit ID");
  2274. ensureWsWorkunitAccess(context, wuidIn, SecAccess_Read);
  2275. }
  2276. StringAttr wuid(wuidIn);
  2277. if (wuid.isEmpty() && notEmpty(req.getQuerySet()) && notEmpty(req.getQuery()))
  2278. {
  2279. Owned<IPropertyTree> registry = getQueryRegistry(req.getQuerySet(), false);
  2280. if (!registry)
  2281. throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Queryset %s not found", req.getQuerySet());
  2282. Owned<IPropertyTree> query = resolveQueryAlias(registry, req.getQuery());
  2283. if (!query)
  2284. throw MakeStringException(ECLWATCH_QUERYID_NOT_FOUND, "Query %s not found", req.getQuery());
  2285. resp.setQuerySet(req.getQuerySet());
  2286. resp.setQueryName(query->queryProp("@name"));
  2287. resp.setQueryId(query->queryProp("@id"));
  2288. wuid.set(query->queryProp("@wuid"));
  2289. }
  2290. int opt = req.getOption();
  2291. if (!wuid.isEmpty())
  2292. {
  2293. resp.setWuid(wuid.get());
  2294. MemoryBuffer mb;
  2295. WsWuInfo winfo(context, wuid);
  2296. if (strieq(File_ArchiveQuery, req.getType()))
  2297. {
  2298. winfo.getWorkunitArchiveQuery(mb);
  2299. openSaveFile(context, opt, "ArchiveQuery.xml", HTTP_TYPE_APPLICATION_XML, mb, resp);
  2300. }
  2301. else if (strieq(File_Cpp,req.getType()) && notEmpty(req.getName()))
  2302. {
  2303. winfo.getWorkunitCpp(req.getName(), req.getDescription(), req.getIPAddress(),mb, opt > 0);
  2304. openSaveFile(context, opt, req.getName(), HTTP_TYPE_TEXT_PLAIN, mb, resp);
  2305. }
  2306. else if (strieq(File_DLL,req.getType()))
  2307. {
  2308. StringBuffer name;
  2309. winfo.getWorkunitDll(name, mb);
  2310. resp.setFileName(name.str());
  2311. resp.setDaliServer(daliServers.get());
  2312. openSaveFile(context, opt, req.getName(), HTTP_TYPE_OCTET_STREAM, mb, resp);
  2313. }
  2314. else if (strieq(File_Res,req.getType()))
  2315. {
  2316. winfo.getWorkunitResTxt(mb);
  2317. openSaveFile(context, opt, "res.txt", HTTP_TYPE_TEXT_PLAIN, mb, resp);
  2318. }
  2319. else if (strncmp(req.getType(), File_ThorLog, 7) == 0)
  2320. {
  2321. winfo.getWorkunitThorLog(req.getName(), mb);
  2322. openSaveFile(context, opt, "thormaster.log", HTTP_TYPE_TEXT_PLAIN, mb, resp);
  2323. }
  2324. else if (strieq(File_ThorSlaveLog,req.getType()))
  2325. {
  2326. StringBuffer logDir;
  2327. getConfigurationDirectory(directories, "log", "thor", req.getProcess(), logDir);
  2328. winfo.getWorkunitThorSlaveLog(req.getClusterGroup(), req.getIPAddress(), req.getLogDate(), logDir.str(), req.getSlaveNumber(), mb, false);
  2329. openSaveFile(context, opt, "ThorSlave.log", HTTP_TYPE_TEXT_PLAIN, mb, resp);
  2330. }
  2331. else if (strieq(File_EclAgentLog,req.getType()))
  2332. {
  2333. winfo.getWorkunitEclAgentLog(req.getName(), req.getProcess(), mb);
  2334. openSaveFile(context, opt, "eclagent.log", HTTP_TYPE_TEXT_PLAIN, mb, resp);
  2335. }
  2336. else if (strieq(File_XML,req.getType()) && notEmpty(req.getName()))
  2337. {
  2338. const char* name = req.getName();
  2339. const char* ptr = strrchr(name, '/');
  2340. if (ptr)
  2341. ptr++;
  2342. else
  2343. ptr = name;
  2344. winfo.getWorkunitAssociatedXml(name, req.getIPAddress(), req.getPlainText(), req.getDescription(), opt > 0, mb);
  2345. openSaveFile(context, opt, ptr, HTTP_TYPE_APPLICATION_XML, mb, resp);
  2346. }
  2347. else if (strieq(File_XML,req.getType()))
  2348. {
  2349. winfo.getWorkunitXml(req.getPlainText(), mb);
  2350. resp.setThefile(mb);
  2351. const char* plainText = req.getPlainText();
  2352. if (plainText && (!stricmp(plainText, "yes")))
  2353. resp.setThefile_mimetype(HTTP_TYPE_TEXT_PLAIN);
  2354. else
  2355. resp.setThefile_mimetype(HTTP_TYPE_APPLICATION_XML);
  2356. }
  2357. }
  2358. }
  2359. catch(IException* e)
  2360. {
  2361. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  2362. }
  2363. return true;
  2364. }
  2365. bool CWsWorkunitsEx::onWUResultBin(IEspContext &context,IEspWUResultBinRequest &req, IEspWUResultBinResponse &resp)
  2366. {
  2367. try
  2368. {
  2369. StringBuffer wuidStr = req.getWuid();
  2370. const char* wuidIn = wuidStr.trim().str();
  2371. if (wuidIn && *wuidIn)
  2372. {
  2373. if (!looksLikeAWuid(wuidIn))
  2374. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid Workunit ID: %s", wuidIn);
  2375. ensureWsWorkunitAccess(context, wuidIn, SecAccess_Read);
  2376. }
  2377. MemoryBuffer mb;
  2378. __int64 total=0;
  2379. __int64 start = req.getStart() > 0 ? req.getStart() : 0;
  2380. unsigned count = req.getCount(), requested=count;
  2381. SCMStringBuffer name;
  2382. bool bin = (req.getFormat() && strieq(req.getFormat(),"raw"));
  2383. if (notEmpty(wuidIn) && notEmpty(req.getResultName()))
  2384. getWsWuResult(context, wuidIn, req.getResultName(), NULL, 0, start, count, total, name, bin, mb);
  2385. else if (notEmpty(wuidIn) && (req.getSequence() >= 0))
  2386. getWsWuResult(context, wuidIn, NULL, NULL, req.getSequence(), start, count, total, name, bin,mb);
  2387. else if (notEmpty(req.getLogicalName()))
  2388. {
  2389. const char* logicalName = req.getLogicalName();
  2390. StringBuffer wuid;
  2391. getWuidFromLogicalFileName(context, logicalName, wuid);
  2392. if (!wuid.length())
  2393. throw MakeStringException(ECLWATCH_CANNOT_GET_WORKUNIT,"Cannot find the workunit for file %s.",logicalName);
  2394. getWsWuResult(context, wuid.str(), NULL, logicalName, 0, start, count, total, name, bin, mb);
  2395. }
  2396. else
  2397. throw MakeStringException(ECLWATCH_CANNOT_GET_WU_RESULT,"Cannot open the workunit result.");
  2398. if(stricmp(req.getFormat(),"xls")==0)
  2399. {
  2400. Owned<IProperties> params(createProperties());
  2401. params->setProp("showCount",0);
  2402. StringBuffer xml;
  2403. xml.append("<WUResultExcel><Result>").append(mb.length(), mb.toByteArray()).append("</Result></WUResultExcel>");
  2404. if (xml.length() > MAXXLSTRANSFER)
  2405. throw MakeStringException(ECLWATCH_TOO_BIG_DATA_SET, "The data set is too big to be converted to an Excel file. Please use the gzip link to download a compressed XML data file.");
  2406. StringBuffer xls;
  2407. xsltTransform(xml.str(), StringBuffer(getCFD()).append("./smc_xslt/result.xslt").str(), params, xls);
  2408. MemoryBuffer out;
  2409. out.setBuffer(xls.length(), (void*)xls.str());
  2410. resp.setResult(out);
  2411. resp.setResult_mimetype("application/vnd.ms-excel");
  2412. }
  2413. #ifdef _USE_ZLIB
  2414. else if(strieq(req.getFormat(),"zip") || strieq(req.getFormat(),"gzip"))
  2415. {
  2416. bool gzip = strieq(req.getFormat(),"gzip");
  2417. StringBuffer xml("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
  2418. xml.append("<Result>").append(mb.length(),mb.toByteArray()).append("</Result>");
  2419. VStringBuffer ifname("%s%sT%xAT%x%s", TEMPZIPDIR, PATHSEPSTR, (unsigned)(memsize_t)GetCurrentThreadId(), msTick(), gzip ? "" : ".zip");
  2420. IZZIPor* Zipor = createZZIPor();
  2421. int ret = 0;
  2422. if (gzip)
  2423. ret = Zipor->gzipToFile(xml.length(), (void*)xml.str(), ifname.str());
  2424. else
  2425. ret = Zipor->zipToFile(xml.length(), (void*)xml.str(), "WUResult.xml", ifname.str());
  2426. releaseIZ(Zipor);
  2427. if (ret < 0)
  2428. {
  2429. Owned<IFile> rFile = createIFile(ifname.str());
  2430. if (rFile->exists())
  2431. rFile->remove();
  2432. throw MakeStringException(ECLWATCH_CANNOT_COMPRESS_DATA, "The data cannot be compressed.");
  2433. }
  2434. MemoryBuffer out;
  2435. Owned <IFile> rf = createIFile(ifname.str());
  2436. if (rf->exists())
  2437. {
  2438. Owned <IFileIO> fio = rf->open(IFOread);
  2439. read(fio, 0, (size32_t) rf->size(), out);
  2440. resp.setResult(out);
  2441. }
  2442. if (gzip)
  2443. {
  2444. resp.setResult_mimetype("application/x-gzip");
  2445. context.addCustomerHeader("Content-disposition", "attachment;filename=WUResult.xml.gz");
  2446. }
  2447. else
  2448. {
  2449. resp.setResult_mimetype("application/zip");
  2450. context.addCustomerHeader("Content-disposition", "attachment;filename=WUResult.xml.zip");
  2451. }
  2452. Owned<IFile> rFile = createIFile(ifname.str());
  2453. if (rFile->exists())
  2454. rFile->remove();
  2455. }
  2456. #endif
  2457. else
  2458. {
  2459. resp.setResult(mb);
  2460. }
  2461. resp.setName(name.str());
  2462. resp.setWuid(wuidIn);
  2463. resp.setSequence(req.getSequence());
  2464. resp.setStart(start);
  2465. if (requested > total)
  2466. requested = (unsigned)total;
  2467. resp.setRequested(requested);
  2468. resp.setCount(count);
  2469. resp.setTotal(total);
  2470. resp.setFormat(req.getFormat());
  2471. }
  2472. catch(IException* e)
  2473. {
  2474. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  2475. }
  2476. return true;
  2477. }
  2478. bool CWsWorkunitsEx::onWUResultSummary(IEspContext &context, IEspWUResultSummaryRequest &req, IEspWUResultSummaryResponse &resp)
  2479. {
  2480. try
  2481. {
  2482. StringBuffer wuid = req.getWuid();
  2483. checkAndTrimWorkunit("WUResultSummary", wuid);
  2484. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  2485. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid.str(), false);
  2486. if(!cw)
  2487. throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot open workunit %s.",wuid.str());
  2488. ensureWsWorkunitAccess(context, *cw, SecAccess_Read);
  2489. resp.setWuid(wuid.str());
  2490. resp.setSequence(req.getSequence());
  2491. IArrayOf<IEspECLResult> results;
  2492. Owned<IConstWUResult> r = cw->getResultBySequence(req.getSequence());
  2493. if (r)
  2494. {
  2495. WsWuInfo winfo(context, cw);
  2496. winfo.getResult(*r, results, 0);
  2497. resp.setFormat(r->getResultFormat());
  2498. resp.setResult(results.item(0));
  2499. }
  2500. }
  2501. catch(IException* e)
  2502. {
  2503. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  2504. }
  2505. return true;
  2506. }
  2507. void getFileResults(IEspContext &context, const char* logicalName, const char* cluster,__int64 start, unsigned& count,__int64& total,IStringVal& resname,bool bin, MemoryBuffer& buf, bool xsd)
  2508. {
  2509. Owned<IResultSetFactory> resultSetFactory = getSecResultSetFactory(context.querySecManager(), context.queryUser(), context.queryUserId(), context.queryPassword());
  2510. Owned<INewResultSet> result(resultSetFactory->createNewFileResultSet(logicalName, cluster));
  2511. appendResultSet(buf, result, resname.str(), start, count, total, bin, xsd);
  2512. }
  2513. void getWorkunitCluster(IEspContext &context, const char* wuid, SCMStringBuffer& cluster, bool checkArchiveWUs)
  2514. {
  2515. if (isEmpty(wuid))
  2516. return;
  2517. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  2518. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid, false);
  2519. if (cw)
  2520. cw->getClusterName(cluster);
  2521. else if (checkArchiveWUs)
  2522. {
  2523. Owned<IPropertyTree> wuProps;// = getArchivedWorkUnitProperties(wuid);
  2524. if (wuProps)
  2525. cluster.set(wuProps->queryProp("@clusterName"));
  2526. }
  2527. }
  2528. bool CWsWorkunitsEx::onWUResult(IEspContext &context, IEspWUResultRequest &req, IEspWUResultResponse &resp)
  2529. {
  2530. try
  2531. {
  2532. StringBuffer wuidStr = req.getWuid();
  2533. const char* wuid = wuidStr.trim().str();
  2534. if (wuid && *wuid)
  2535. {
  2536. if (!looksLikeAWuid(wuid))
  2537. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid Workunit ID: %s", wuid);
  2538. ensureWsWorkunitAccess(context, wuid, SecAccess_Read);
  2539. }
  2540. MemoryBuffer mb;
  2541. SCMStringBuffer name;
  2542. __int64 total=0;
  2543. __int64 start = req.getStart() > 0 ? req.getStart() : 0;
  2544. unsigned count=req.getCount() ? req.getCount() : 100, requested=count;
  2545. unsigned seq = req.getSequence();
  2546. bool inclXsd = !req.getSuppressXmlSchema();
  2547. VStringBuffer filter("start=%"I64F"d;count=%d", start, count);
  2548. addToQueryString(filter, "clusterName", req.getCluster(), ';');
  2549. addToQueryString(filter, "logicalName", req.getLogicalName(), ';');
  2550. if (wuid && *wuid)
  2551. addToQueryString(filter, "wuid", wuid, ';');
  2552. addToQueryString(filter, "resultName", req.getResultName(), ';');
  2553. filter.appendf(";seq=%d;", seq);
  2554. if (inclXsd)
  2555. filter.append("xsd;");
  2556. const char* logicalName = req.getLogicalName();
  2557. const char* clusterName = req.getCluster();
  2558. const char* resultName = req.getResultName();
  2559. Owned<DataCacheElement> data = dataCache->lookup(context, filter, awusCacheMinutes);
  2560. if (data)
  2561. {
  2562. mb.append(data->m_data.c_str());
  2563. name.set(data->m_name.c_str());
  2564. logicalName = data->m_logicalName.c_str();
  2565. wuid = data->m_wuid.c_str();
  2566. resultName = data->m_resultName.c_str();
  2567. seq = data->m_seq;
  2568. start = data->m_start;
  2569. count = data->m_rowcount;
  2570. requested = (unsigned)data->m_requested;
  2571. total = data->m_total;
  2572. if (notEmpty(logicalName))
  2573. resp.setLogicalName(logicalName);
  2574. else
  2575. {
  2576. if (notEmpty(wuid))
  2577. resp.setWuid(wuid);
  2578. resp.setSequence(seq);
  2579. }
  2580. }
  2581. else
  2582. {
  2583. if(logicalName && *logicalName)
  2584. {
  2585. StringBuffer lwuid;
  2586. getWuidFromLogicalFileName(context, logicalName, lwuid);
  2587. SCMStringBuffer cluster;
  2588. if (lwuid.length())
  2589. getWorkunitCluster(context, lwuid.str(), cluster, true);
  2590. if (cluster.length())
  2591. {
  2592. getFileResults(context, logicalName, cluster.str(), start, count, total, name, false, mb, inclXsd);
  2593. resp.setLogicalName(logicalName);
  2594. }
  2595. else if (notEmpty(clusterName))
  2596. {
  2597. getFileResults(context, logicalName, clusterName, start, count, total, name, false, mb, inclXsd);
  2598. resp.setLogicalName(logicalName);
  2599. }
  2600. else
  2601. throw MakeStringException(ECLWATCH_INVALID_INPUT,"Need valid target cluster to browse file %s.",logicalName);
  2602. }
  2603. else if (notEmpty(wuid) && notEmpty(resultName))
  2604. {
  2605. name.set(resultName);
  2606. getWsWuResult(context, wuid, resultName, NULL, 0, start, count, total, name, false, mb, inclXsd);
  2607. resp.setWuid(wuid);
  2608. resp.setSequence(seq);
  2609. }
  2610. else
  2611. {
  2612. getWsWuResult(context, wuid, NULL, NULL, seq, start, count, total, name, false, mb, inclXsd);
  2613. resp.setWuid(wuid);
  2614. resp.setSequence(seq);
  2615. }
  2616. mb.append(0);
  2617. if (requested > total)
  2618. requested = (unsigned)total;
  2619. dataCache->add(filter, mb.toByteArray(), name.str(), logicalName, wuid, resultName, seq, start, count, requested, total);
  2620. }
  2621. resp.setName(name.str());
  2622. resp.setStart(start);
  2623. if (clusterName && *clusterName)
  2624. resp.setCluster(clusterName);
  2625. resp.setRequested(requested);
  2626. resp.setCount(count);
  2627. resp.setTotal(total);
  2628. resp.setResult(mb.toByteArray());
  2629. context.queryXslParameters()->setProp("escapeResults","1");
  2630. }
  2631. catch(IException* e)
  2632. {
  2633. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  2634. }
  2635. return true;
  2636. }
  2637. void getScheduledWUs(IEspContext &context, const char *serverName, const char *eventName, IArrayOf<IEspScheduledWU> & results)
  2638. {
  2639. if (notEmpty(serverName))
  2640. {
  2641. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  2642. Owned<IScheduleReader> reader = getScheduleReader(serverName, eventName);
  2643. Owned<IScheduleReaderIterator> it(reader->getIterator());
  2644. while(it->isValidEventName())
  2645. {
  2646. StringBuffer ieventName;
  2647. it->getEventName(ieventName);
  2648. while(it->isValidEventText())
  2649. {
  2650. StringBuffer ieventText;
  2651. it->getEventText(ieventText);
  2652. while(it->isValidWuid())
  2653. {
  2654. StringBuffer wuid;
  2655. it->getWuid(wuid);
  2656. if (wuid.length())
  2657. {
  2658. Owned<IEspScheduledWU> scheduledWU = createScheduledWU("");
  2659. scheduledWU->setWuid(wuid.str());
  2660. scheduledWU->setCluster(serverName);
  2661. if (ieventName.length())
  2662. scheduledWU->setEventName(ieventName.str());
  2663. if (ieventText.str())
  2664. scheduledWU->setEventText(ieventText.str());
  2665. try
  2666. {
  2667. SCMStringBuffer s;
  2668. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid.str(), false);
  2669. if (cw)
  2670. scheduledWU->setJobName(cw->getJobName(s).str());
  2671. }
  2672. catch (IException *e)
  2673. {
  2674. e->Release();
  2675. }
  2676. results.append(*scheduledWU.getLink());
  2677. }
  2678. it->nextWuid();
  2679. }
  2680. it->nextEventText();
  2681. }
  2682. it->nextEventName();
  2683. }
  2684. }
  2685. return;
  2686. }
  2687. bool CWsWorkunitsEx::onWUShowScheduled(IEspContext &context, IEspWUShowScheduledRequest & req, IEspWUShowScheduledResponse & resp)
  2688. {
  2689. try
  2690. {
  2691. DBGLOG("WUShowScheduled");
  2692. const char *clusterName = req.getCluster();
  2693. const char *eventName = req.getEventName();
  2694. IArrayOf<IEspScheduledWU> results;
  2695. if(notEmpty(req.getPushEventName()))
  2696. resp.setPushEventName(req.getPushEventName());
  2697. if(notEmpty(req.getPushEventText()))
  2698. resp.setPushEventText(req.getPushEventText());
  2699. Owned<IEnvironmentFactory> factory = getEnvironmentFactory();
  2700. Owned<IConstEnvironment> environment = factory->openEnvironmentByFile();
  2701. Owned<IPropertyTree> root = &environment->getPTree();
  2702. if (!root)
  2703. throw MakeStringException(ECLWATCH_CANNOT_GET_ENV_INFO, "Failed to get environment information.");
  2704. unsigned i = 0;
  2705. Owned<IPropertyTreeIterator> ic = root->getElements("Software/Topology/Cluster");
  2706. IArrayOf<IEspServerInfo> servers;
  2707. ForEach(*ic)
  2708. {
  2709. IPropertyTree &cluster = ic->query();
  2710. const char *iclusterName = cluster.queryProp("@name");
  2711. if (isEmpty(iclusterName))
  2712. continue;
  2713. if(isEmpty(clusterName))
  2714. getScheduledWUs(context, iclusterName, eventName, results);
  2715. else if (strieq(clusterName, iclusterName))
  2716. {
  2717. getScheduledWUs(context, clusterName, eventName, results);
  2718. resp.setClusterSelected(i+1);
  2719. }
  2720. Owned<IEspServerInfo> server = createServerInfo("");
  2721. server->setName(iclusterName);
  2722. servers.append(*server.getLink());
  2723. i++;
  2724. }
  2725. if (servers.length())
  2726. resp.setClusters(servers);
  2727. if (results.length())
  2728. resp.setWorkunits(results);
  2729. bool first=false;
  2730. StringBuffer Query("PageFrom=Scheduler");
  2731. appendUrlParameter(Query, "EventName", eventName, first);
  2732. appendUrlParameter(Query, "ECluster", clusterName, first);
  2733. resp.setQuery(Query.str());
  2734. }
  2735. catch(IException* e)
  2736. {
  2737. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  2738. }
  2739. return true;
  2740. }
  2741. bool CWsWorkunitsEx::onWUExport(IEspContext &context, IEspWUExportRequest &req, IEspWUExportResponse &resp)
  2742. {
  2743. try
  2744. {
  2745. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  2746. WsWuSearch ws(context, req.getOwner(), req.getState(), req.getCluster(), req.getStartDate(), req.getEndDate(), req.getECL(), req.getJobname());
  2747. StringBuffer xml("<?xml version=\"1.0\" encoding=\"UTF-8\"?><Workunits>");
  2748. for(WsWuSearch::iterator it=ws.begin(); it!=ws.end(); it++)
  2749. {
  2750. Owned<IConstWorkUnit> cw = factory->openWorkUnit(it->c_str(), false);
  2751. if (cw)
  2752. exportWorkUnitToXML(cw, xml, true);
  2753. }
  2754. xml.append("</Workunits>");
  2755. MemoryBuffer mb;
  2756. mb.setBuffer(xml.length(),(void*)xml.str());
  2757. resp.setExportData(mb);
  2758. resp.setExportData_mimetype(HTTP_TYPE_APPLICATION_XML);
  2759. }
  2760. catch(IException* e)
  2761. {
  2762. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  2763. }
  2764. return true;
  2765. }
  2766. bool CWsWorkunitsEx::onWUListLocalFileRequired(IEspContext& context, IEspWUListLocalFileRequiredRequest& req, IEspWUListLocalFileRequiredResponse& resp)
  2767. {
  2768. try
  2769. {
  2770. StringBuffer wuid = req.getWuid();
  2771. checkAndTrimWorkunit("WUListLocalFileRequired", wuid);
  2772. ensureWsWorkunitAccess(context, wuid.str(), SecAccess_Read);
  2773. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  2774. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid.str(), false);
  2775. if (!cw)
  2776. throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT, "Workunit %s not found.", wuid.str());
  2777. IArrayOf<IEspLogicalFileUpload> localFiles;
  2778. Owned<IConstLocalFileUploadIterator> it = cw->getLocalFileUploads();
  2779. ForEach(*it)
  2780. {
  2781. Owned<IConstLocalFileUpload> file = it->get();
  2782. if(!file)
  2783. continue;
  2784. Owned<IEspLogicalFileUpload> up = createLogicalFileUpload();
  2785. SCMStringBuffer s;
  2786. up->setType(file->queryType());
  2787. up->setSource(file->getSource(s).str());
  2788. up->setDestination(file->getDestination(s).str());
  2789. up->setEventTag(file->getEventTag(s).str());
  2790. localFiles.append(*up.getLink());
  2791. }
  2792. resp.setLocalFileUploads(localFiles);
  2793. }
  2794. catch(IException* e)
  2795. {
  2796. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  2797. }
  2798. return true;
  2799. }
  // Type codes for values supplied to a workunit. The numeric values are
  // positional (starting at 0) and requests carry them as plain integers, so
  // the order of the enumerators must not change.
  enum wsEclTypes_
  {
      wsEclTypeUnknown = 0,

      // xsd primitive types
      xsdString,
      xsdBoolean,
      xsdDecimal,
      xsdFloat,
      xsdDouble,
      xsdDuration,
      xsdDateTime,
      xsdTime,
      xsdDate,
      xsdYearMonth,
      xsdYear,
      xsdMonthDay,
      xsdDay,
      xsdMonth,
      xsdHexBinary,
      xsdBase64Binary,
      xsdAnyURI,
      xsdQName,
      xsdNOTATION,

      // xsd string-derived types
      xsdNormalizedString,
      xsdToken,
      xsdLanguage,
      xsdNMTOKEN,
      xsdNMTOKENS,
      xsdName,
      xsdNCName,
      xsdID,
      xsdIDREF,
      xsdIDREFS,
      xsdENTITY,
      xsdENTITIES,

      // xsd integer types
      xsdInteger,
      xsdNonPositiveInteger,
      xsdNegativeInteger,
      xsdLong,
      xsdInt,
      xsdShort,
      xsdByte,
      xsdNonNegativeInteger,
      xsdUnsignedLong,
      xsdUnsignedInt,
      xsdUnsignedShort,
      xsdUnsignedByte,
      xsdPositiveInteger,

      // tns (service-specific) types
      tnsRawDataFile,
      tnsCsvDataFile,
      tnsEspStringArray,
      tnsEspIntArray,
      tnsXmlDataSet,

      maxWsEclType
  };
  typedef enum wsEclTypes_ wsEclType;
  2854. bool CWsWorkunitsEx::onWUAddLocalFileToWorkunit(IEspContext& context, IEspWUAddLocalFileToWorkunitRequest& req, IEspWUAddLocalFileToWorkunitResponse& resp)
  2855. {
  2856. try
  2857. {
  2858. StringBuffer wuid = req.getWuid();
  2859. checkAndTrimWorkunit("WUAddLocalFileToWorkunit", wuid);
  2860. ensureWsWorkunitAccess(context, wuid.str(), SecAccess_Write);
  2861. resp.setWuid(wuid.str());
  2862. const char* varname = req.getName();
  2863. if (isEmpty(varname))
  2864. {
  2865. resp.setResult("Name is not defined!");
  2866. return true;
  2867. }
  2868. resp.setName(varname);
  2869. wsEclType type = (wsEclType) req.getType();
  2870. const char *val = req.getVal();
  2871. unsigned len = req.getLength();
  2872. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  2873. WorkunitUpdate wu(factory->updateWorkUnit(wuid.str()));
  2874. if (!wu)
  2875. {
  2876. resp.setResult("Workunit not found!");
  2877. return true;
  2878. }
  2879. Owned<IWUResult> wuRslt = wu->updateResultByName(varname);
  2880. if (isEmpty(val))
  2881. val=req.getDefVal();
  2882. if (notEmpty(val))
  2883. {
  2884. switch (type)
  2885. {
  2886. case xsdBoolean:
  2887. wuRslt->setResultBool((strieq(val, "1") || strieq(val, "true") || strieq(val, "on")));
  2888. wuRslt->setResultStatus(ResultStatusSupplied);
  2889. break;
  2890. case xsdDecimal:
  2891. case xsdFloat:
  2892. case xsdDouble:
  2893. wuRslt->setResultReal(atof(val));
  2894. wuRslt->setResultStatus(ResultStatusSupplied);
  2895. break;
  2896. case xsdInteger:
  2897. case xsdNonPositiveInteger:
  2898. case xsdNegativeInteger:
  2899. case xsdLong:
  2900. case xsdInt:
  2901. case xsdShort:
  2902. case xsdByte:
  2903. case xsdNonNegativeInteger:
  2904. case xsdUnsignedLong:
  2905. case xsdUnsignedInt:
  2906. case xsdUnsignedShort:
  2907. case xsdUnsignedByte:
  2908. case xsdPositiveInteger:
  2909. wuRslt->setResultInt(_atoi64(val));
  2910. wuRslt->setResultStatus(ResultStatusSupplied);
  2911. break;
  2912. case tnsEspIntArray:
  2913. case tnsEspStringArray:
  2914. wuRslt->setResultRaw(len, val, ResultFormatXmlSet);
  2915. break;
  2916. case tnsRawDataFile:
  2917. wuRslt->setResultRaw(len, val, ResultFormatRaw);
  2918. break;
  2919. case tnsXmlDataSet:
  2920. wuRslt->setResultRaw(len, val, ResultFormatXml);
  2921. break;
  2922. case tnsCsvDataFile:
  2923. case xsdBase64Binary: //tbd
  2924. case xsdHexBinary:
  2925. break;
  2926. default:
  2927. wuRslt->setResultString(val, len);
  2928. wuRslt->setResultStatus(ResultStatusSupplied);
  2929. break;
  2930. }
  2931. }
  2932. resp.setResult("Result has been set as required!");
  2933. }
  2934. catch(IException* e)
  2935. {
  2936. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  2937. }
  2938. return true;
  2939. }
  2940. void getClusterConfig(char const * clusterType, char const * clusterName, char const * processName, StringBuffer& netAddress)
  2941. {
  2942. Owned<IEnvironmentFactory> factory = getEnvironmentFactory();
  2943. Owned<IConstEnvironment> environment = factory->openEnvironmentByFile();
  2944. Owned<IPropertyTree> pRoot = &environment->getPTree();
  2945. VStringBuffer xpath("Software/%s[@name='%s']", clusterType, clusterName);
  2946. IPropertyTree* pCluster = pRoot->queryPropTree(xpath.str());
  2947. if (!pCluster)
  2948. throw MakeStringException(ECLWATCH_CLUSTER_NOT_IN_ENV_INFO, "'%s %s' is not defined.", clusterType, clusterName);
  2949. const char* port = pCluster->queryProp(xpath.set(processName).append("@port").str());
  2950. const char* computer = pCluster->queryProp(xpath.set(processName).append("@computer").str());
  2951. if (isEmpty(computer))
  2952. throw MakeStringException(ECLWATCH_INVALID_CLUSTER_INFO, "'%s %s: %s' is not defined.", clusterType, clusterName, processName);
  2953. Owned<IConstMachineInfo> pMachine = environment->getMachine(computer);
  2954. if (pMachine)
  2955. {
  2956. StringBufferAdaptor s(netAddress);
  2957. pMachine->getNetAddress(s);
  2958. #ifdef MACHINE_IP
  2959. if (streq(netAddress.str(), "."))
  2960. netAddress = MACHINE_IP;
  2961. #endif
  2962. netAddress.append(':').append(port);
  2963. }
  2964. return;
  2965. }
  2966. bool CWsWorkunitsEx::onWUProcessGraph(IEspContext &context,IEspWUProcessGraphRequest &req, IEspWUProcessGraphResponse &resp)
  2967. {
  2968. try
  2969. {
  2970. StringBuffer wuid = req.getWuid();
  2971. checkAndTrimWorkunit("WUProcessGraph", wuid);
  2972. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  2973. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid.str(), false);
  2974. if(!cw)
  2975. throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot open workunit %s.",wuid.str());
  2976. ensureWsWorkunitAccess(context, *cw, SecAccess_Read);
  2977. Owned <IConstWUGraph> graph = cw->getGraph(req.getName());
  2978. Owned <IPropertyTree> xgmml = graph->getXGMMLTree(true); // merge in graph progress information
  2979. StringBuffer xml;
  2980. resp.setTheGraph(toXML(xgmml.get(), xml).str());
  2981. }
  2982. catch(IException* e)
  2983. {
  2984. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  2985. }
  2986. return true;
  2987. }
  2988. bool isRunning(IConstWorkUnit &cw)
  2989. {
  2990. // MORE - move into workunit interface
  2991. switch (cw.getState())
  2992. {
  2993. case WUStateFailed:
  2994. case WUStateAborted:
  2995. case WUStateCompleted:
  2996. return false;
  2997. default:
  2998. return true;
  2999. }
  3000. }
  3001. bool CWsWorkunitsEx::onWUGetGraph(IEspContext& context, IEspWUGetGraphRequest& req, IEspWUGetGraphResponse& resp)
  3002. {
  3003. try
  3004. {
  3005. StringBuffer wuid = req.getWuid();
  3006. checkAndTrimWorkunit("WUGetGraph", wuid);
  3007. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  3008. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid.str(), false);
  3009. if(!cw)
  3010. throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot open workunit %s.",wuid.str());
  3011. ensureWsWorkunitAccess(context, *cw, SecAccess_Read);
  3012. WUGraphIDType id;
  3013. SCMStringBuffer runningGraph;
  3014. bool running= (isRunning(*cw) && cw->getRunningGraph(runningGraph,id));
  3015. IArrayOf<IEspECLGraphEx> graphs;
  3016. Owned<IConstWUGraphIterator> it = &cw->getGraphs(GraphTypeAny);
  3017. ForEach(*it)
  3018. {
  3019. IConstWUGraph &graph = it->query();
  3020. if(!graph.isValid())
  3021. continue;
  3022. SCMStringBuffer name, label, type;
  3023. graph.getName(name);
  3024. graph.getLabel(label);
  3025. graph.getTypeName(type);
  3026. if(isEmpty(req.getGraphName()) || strieq(name.str(), req.getGraphName()))
  3027. {
  3028. Owned<IEspECLGraphEx> g = createECLGraphEx("","");
  3029. g->setName(name.str());
  3030. g->setLabel(label.str());
  3031. g->setType(type.str());
  3032. if(running && streq(name.str(), runningGraph.str()))
  3033. {
  3034. g->setRunning(true);
  3035. g->setRunningId(id);
  3036. }
  3037. Owned<IPropertyTree> xgmml = graph.getXGMMLTree(true);
  3038. // New functionality, if a subgraph id is specified and we only want to load the xgmml for that subgraph
  3039. // then we need to conditionally pull a propertytree from the xgmml graph one and use that for the xgmml.
  3040. StringBuffer xml;
  3041. if (notEmpty(req.getSubGraphId()))
  3042. {
  3043. VStringBuffer xpath("//node[@id='%s']", req.getSubGraphId());
  3044. toXML(xgmml->queryPropTree(xpath.str()), xml);
  3045. }
  3046. else
  3047. toXML(xgmml, xml);
  3048. g->setGraph(xml.str());
  3049. if (context.getClientVersion() > 1.20)
  3050. {
  3051. Owned<IConstWUGraphProgress> progress = cw->getGraphProgress(name.str());
  3052. if (progress)
  3053. {
  3054. WUGraphState graphstate= progress->queryGraphState();
  3055. if (graphstate == WUGraphComplete)
  3056. g->setComplete(true);
  3057. else if (graphstate == WUGraphFailed)
  3058. g->setFailed(true);
  3059. }
  3060. }
  3061. graphs.append(*g.getClear());
  3062. }
  3063. }
  3064. resp.setGraphs(graphs);
  3065. }
  3066. catch(IException* e)
  3067. {
  3068. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  3069. }
  3070. return true;
  3071. }
  3072. bool CWsWorkunitsEx::onGVCAjaxGraph(IEspContext &context, IEspGVCAjaxGraphRequest &req, IEspGVCAjaxGraphResponse &resp)
  3073. {
  3074. try
  3075. {
  3076. resp.setName(req.getName());
  3077. resp.setGraphName(req.getGraphName());
  3078. resp.setGraphType("eclwatch");
  3079. double version = context.getClientVersion();
  3080. if (version > 1.19)
  3081. resp.setSubGraphId(req.getSubGraphId());
  3082. if (version > 1.20)
  3083. resp.setSubGraphOnly(req.getSubGraphOnly());
  3084. }
  3085. catch(IException* e)
  3086. {
  3087. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  3088. }
  3089. return true;
  3090. }
  3091. bool CWsWorkunitsEx::onWUGraphInfo(IEspContext &context,IEspWUGraphInfoRequest &req, IEspWUGraphInfoResponse &resp)
  3092. {
  3093. try
  3094. {
  3095. StringBuffer wuid = req.getWuid();
  3096. checkAndTrimWorkunit("WUGraphInfo", wuid);
  3097. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  3098. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid.str(), false);
  3099. if(!cw)
  3100. throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT,"Cannot open workunit %s.",wuid.str());
  3101. ensureWsWorkunitAccess(context, *cw, SecAccess_Write);
  3102. resp.setWuid(wuid.str());
  3103. resp.setName(req.getName());
  3104. resp.setRunning(isRunning(*cw));
  3105. if (notEmpty(req.getGID()))
  3106. resp.setGID(req.getGID());
  3107. if(!req.getBatchWU_isNull())
  3108. resp.setBatchWU(req.getBatchWU());
  3109. }
  3110. catch(IException* e)
  3111. {
  3112. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  3113. }
  3114. return true;
  3115. }
  3116. bool CWsWorkunitsEx::onWUGVCGraphInfo(IEspContext &context,IEspWUGVCGraphInfoRequest &req, IEspWUGVCGraphInfoResponse &resp)
  3117. {
  3118. try
  3119. {
  3120. StringBuffer wuid = req.getWuid();
  3121. checkAndTrimWorkunit("WUGVCGraphInfo", wuid);
  3122. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  3123. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid.str(), false);
  3124. if(!cw)
  3125. throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT,"Cannot open workunit %s.",wuid.str());
  3126. ensureWsWorkunitAccess(context, *cw, SecAccess_Read);
  3127. resp.setWuid(wuid.str());
  3128. resp.setName(req.getName());
  3129. resp.setRunning(isRunning(*cw));
  3130. if (notEmpty(req.getGID()))
  3131. resp.setGID(req.getGID());
  3132. if(!req.getBatchWU_isNull())
  3133. resp.setBatchWU(req.getBatchWU());
  3134. StringBuffer xml("<Control><Endpoint><Query id=\"Gordon.Extractor.0\">");
  3135. xml.appendf("<Graph id=\"%s\">", req.getName());
  3136. if (context.getClientVersion() > 1.17)
  3137. {
  3138. xml.append("<Subgraph>");
  3139. xml.append(req.getSubgraphId_isNull() ? 0 : req.getSubgraphId());
  3140. xml.append("</Subgraph>");
  3141. }
  3142. xml.append("</Graph></Query></Endpoint></Control>");
  3143. resp.setTheGraph(xml.str());
  3144. }
  3145. catch(IException* e)
  3146. {
  3147. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  3148. }
  3149. return true;
  3150. }
  3151. bool CWsWorkunitsEx::onWUGraphTiming(IEspContext &context, IEspWUGraphTimingRequest &req, IEspWUGraphTimingResponse &resp)
  3152. {
  3153. try
  3154. {
  3155. StringBuffer wuid = req.getWuid();
  3156. checkAndTrimWorkunit("WUGraphTiming", wuid);
  3157. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  3158. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid.str(), false);
  3159. if(!cw)
  3160. throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT,"Cannot open workunit %s.",wuid.str());
  3161. ensureWsWorkunitAccess(context, *cw, SecAccess_Read);
  3162. resp.updateWorkunit().setWuid(wuid.str());
  3163. WsWuInfo winfo(context, cw);
  3164. IArrayOf<IConstECLTimingData> timingData;
  3165. winfo.getGraphTimingData(timingData, 0);
  3166. resp.updateWorkunit().setTimingData(timingData);
  3167. }
  3168. catch(IException* e)
  3169. {
  3170. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  3171. }
  3172. return true;
  3173. }
  3174. int CWsWorkunitsSoapBindingEx::onGetForm(IEspContext &context, CHttpRequest* request, CHttpResponse* response, const char *service, const char *method)
  3175. {
  3176. try
  3177. {
  3178. StringBuffer xml;
  3179. StringBuffer xslt;
  3180. if(strieq(method,"WUQuery") || strieq(method,"WUJobList"))
  3181. {
  3182. Owned<IEnvironmentFactory> factory = getEnvironmentFactory();
  3183. Owned<IConstEnvironment> environment = factory->openEnvironmentByFile();
  3184. Owned<IPropertyTree> root = &environment->getPTree();
  3185. if (!root)
  3186. throw MakeStringException(ECLWATCH_CANNOT_GET_ENV_INFO, "Failed to get environment information.");
  3187. if(strieq(method,"WUQuery"))
  3188. {
  3189. SecAccessFlags accessOwn;
  3190. SecAccessFlags accessOthers;
  3191. getUserWuAccessFlags(context, accessOwn, accessOthers, false);
  3192. xml.append("<WUQuery>");
  3193. if ((accessOwn == SecAccess_None) && (accessOthers == SecAccess_None))
  3194. xml.appendf("<ErrorMessage>Access to workunit is denied.</ErrorMessage>");
  3195. else
  3196. {
  3197. MapStringTo<bool> added;
  3198. Owned<IPropertyTreeIterator> it = root->getElements("Software/Topology/Cluster");
  3199. ForEach(*it)
  3200. {
  3201. const char *name = it->query().queryProp("@name");
  3202. if (notEmpty(name) && !added.getValue(name))
  3203. {
  3204. added.setValue(name, true);
  3205. appendXMLTag(xml, "Cluster", name);
  3206. }
  3207. }
  3208. }
  3209. xml.append("</WUQuery>");
  3210. xslt.append(getCFD()).append("./smc_xslt/wuid_search.xslt");
  3211. }
  3212. else if (strieq(method,"WUJobList"))
  3213. {
  3214. StringBuffer cluster;
  3215. request->getParameter("Cluster", cluster);
  3216. StringBuffer range;
  3217. request->getParameter("Range",range);
  3218. Owned<IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(cluster);
  3219. xml.append("<WUJobList>");
  3220. if (range.length())
  3221. appendXMLTag(xml, "Range", range.str());
  3222. if (clusterInfo)
  3223. {
  3224. const StringArray &thorInstances = clusterInfo->getThorProcesses();
  3225. ForEachItemIn(i, thorInstances)
  3226. {
  3227. xml.append("<Cluster").append('>').append(thorInstances.item(i)).append("</Cluster>");
  3228. }
  3229. }
  3230. xml.append("<TargetCluster>").append(cluster).append("</TargetCluster>");
  3231. xml.append("</WUJobList>");
  3232. xslt.append(getCFD()).append("./smc_xslt/jobs_search.xslt");
  3233. response->addHeader("Expires", "0");
  3234. }
  3235. }
  3236. if (xslt.length() && xml.length())
  3237. {
  3238. StringBuffer html;
  3239. xsltTransform(xml.str(), xslt.str(), NULL, html);
  3240. response->setContent(html.str());
  3241. response->setContentType(HTTP_TYPE_TEXT_HTML_UTF8);
  3242. response->send();
  3243. return 0;
  3244. }
  3245. }
  3246. catch(IException* e)
  3247. {
  3248. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  3249. }
  3250. return onGetNotFound(context, request, response, service);
  3251. }
  3252. void deployEclOrArchive(IEspContext &context, IEspWUDeployWorkunitRequest & req, IEspWUDeployWorkunitResponse & resp)
  3253. {
  3254. NewWsWorkunit wu(context);
  3255. SCMStringBuffer wuid;
  3256. wu->getWuid(wuid);
  3257. wu->setAction(WUActionCompile);
  3258. StringBuffer name(req.getName());
  3259. if (!name.trim().length() && notEmpty(req.getFileName()))
  3260. splitFilename(req.getFileName(), NULL, NULL, &name, NULL);
  3261. if (name.length())
  3262. wu->setJobName(name.str());
  3263. if (req.getObject().length())
  3264. {
  3265. StringBuffer text(req.getObject().length(), req.getObject().toByteArray());
  3266. wu.setQueryText(text.str());
  3267. }
  3268. if (req.getQueryMainDefinition())
  3269. wu.setQueryMain(req.getQueryMainDefinition());
  3270. if (req.getSnapshot())
  3271. wu->setSnapshot(req.getSnapshot());
  3272. if (!req.getResultLimit_isNull())
  3273. wu->setResultLimit(req.getResultLimit());
  3274. wu->commit();
  3275. wu.clear();
  3276. submitWsWorkunit(context, wuid.str(), req.getCluster(), NULL, 0, true, false, false, NULL, NULL, &req.getDebugValues());
  3277. waitForWorkUnitToCompile(wuid.str(), req.getWait());
  3278. WsWuInfo winfo(context, wuid.str());
  3279. winfo.getCommon(resp.updateWorkunit(), WUINFO_All);
  3280. winfo.getExceptions(resp.updateWorkunit(), WUINFO_All);
  3281. name.clear();
  3282. if (notEmpty(resp.updateWorkunit().getJobname()))
  3283. origValueChanged(req.getName(), resp.updateWorkunit().getJobname(), name, false);
  3284. if (name.length()) //non generated user specified name, so override #Workunit('name')
  3285. {
  3286. WorkunitUpdate wx(&winfo.cw->lock());
  3287. wx->setJobName(name.str());
  3288. resp.updateWorkunit().setJobname(name.str());
  3289. }
  3290. AuditSystemAccess(context.queryUserId(), true, "Updated %s", wuid.str());
  3291. }
  3292. StringBuffer &sharedObjectFileName(StringBuffer &filename, const char *name, const char *ext, unsigned copy)
  3293. {
  3294. filename.append((name && *name) ? name : "workunit");
  3295. if (copy)
  3296. filename.append('-').append(copy);
  3297. if (notEmpty(ext))
  3298. filename.append(ext);
  3299. return filename;
  3300. }
  3301. inline StringBuffer &buildFullDllPath(StringBuffer &dllpath, StringBuffer &dllname, const char *dir, const char *name, const char *ext, unsigned copy)
  3302. {
  3303. return addPathSepChar(dllpath.set(dir)).append(sharedObjectFileName(dllname, name, ext, copy));
  3304. }
  3305. void writeSharedObject(const char *srcpath, const MemoryBuffer &obj, const char *dir, StringBuffer &dllpath, StringBuffer &dllname)
  3306. {
  3307. StringBuffer name, ext;
  3308. if (srcpath && *srcpath)
  3309. splitFilename(srcpath, NULL, NULL, &name, &ext);
  3310. unsigned copy=0;
  3311. buildFullDllPath(dllpath.clear(), dllname.clear(), dir, name.str(), ext.str(), copy);
  3312. while (checkFileExists(dllpath.str()))
  3313. buildFullDllPath(dllpath.clear(), dllname.clear(), dir, name.str(), ext.str(), ++copy);
  3314. DBGLOG("Writing workunit dll: %s", dllpath.str());
  3315. Owned<IFile> f = createIFile(dllpath.str());
  3316. Owned<IFileIO> io = f->open(IFOcreate);
  3317. io->write(0, obj.length(), obj.toByteArray());
  3318. }
  3319. void CWsWorkunitsEx::deploySharedObject(IEspContext &context, StringBuffer &wuid, const char *filename, const char *cluster, const char *name, const MemoryBuffer &obj, const char *dir, const char *xml)
  3320. {
  3321. StringBuffer dllpath, dllname;
  3322. StringBuffer srcname(filename);
  3323. if (!srcname.length())
  3324. srcname.append(name).append(SharedObjectExtension);
  3325. writeSharedObject(srcname.str(), obj, dir, dllpath, dllname);
  3326. NewWsWorkunit wu(context);
  3327. StringBufferAdaptor isvWuid(wuid);
  3328. wu->getWuid(isvWuid);
  3329. wu->setClusterName(cluster);
  3330. wu->commit();
  3331. StringBuffer dllXML;
  3332. if (getWorkunitXMLFromFile(dllpath.str(), dllXML))
  3333. {
  3334. Owned<ILocalWorkUnit> embeddedWU = createLocalWorkUnit();
  3335. embeddedWU->loadXML(dllXML.str());
  3336. queryExtendedWU(wu)->copyWorkUnit(embeddedWU, true);
  3337. }
  3338. wu.associateDll(dllpath.str(), dllname.str());
  3339. if (name && *name)
  3340. wu->setJobName(name);
  3341. //clean slate, copy only select items from processed workunit xml
  3342. if (xml && *xml)
  3343. {
  3344. Owned<IPropertyTree> srcxml = createPTreeFromXMLString(xml);
  3345. if (srcxml->hasProp("@jobName"))
  3346. wu->setJobName(srcxml->queryProp("@jobName"));
  3347. if (srcxml->hasProp("@token"))
  3348. wu->setSecurityToken(srcxml->queryProp("@token"));
  3349. if (srcxml->hasProp("Query/Text"))
  3350. wu.setQueryText(srcxml->queryProp("Query/Text"));
  3351. }
  3352. wu->setState(WUStateCompiled);
  3353. wu->commit();
  3354. wu.clear();
  3355. AuditSystemAccess(context.queryUserId(), true, "Updated %s", wuid.str());
  3356. }
  3357. void CWsWorkunitsEx::deploySharedObject(IEspContext &context, IEspWUDeployWorkunitRequest & req, IEspWUDeployWorkunitResponse & resp, const char *dir, const char *xml)
  3358. {
  3359. if (isEmpty(req.getFileName()))
  3360. throw MakeStringException(ECLWATCH_INVALID_INPUT, "File name required when deploying a shared object.");
  3361. const char *cluster = req.getCluster();
  3362. if (isEmpty(cluster))
  3363. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Cluster name required when deploying a shared object.");
  3364. StringBuffer wuid;
  3365. deploySharedObject(context, wuid, req.getFileName(), cluster, req.getName(), req.getObject(), dir, xml);
  3366. WsWuInfo winfo(context, wuid.str());
  3367. winfo.getCommon(resp.updateWorkunit(), WUINFO_All);
  3368. AuditSystemAccess(context.queryUserId(), true, "Updated %s", wuid.str());
  3369. }
  3370. bool CWsWorkunitsEx::onWUDeployWorkunit(IEspContext &context, IEspWUDeployWorkunitRequest & req, IEspWUDeployWorkunitResponse & resp)
  3371. {
  3372. const char *type = req.getObjType();
  3373. try
  3374. {
  3375. if (!context.validateFeatureAccess(OWN_WU_ACCESS, SecAccess_Write, false))
  3376. throw MakeStringException(ECLWATCH_ECL_WU_ACCESS_DENIED, "Failed to create workunit. Permission denied.");
  3377. if (notEmpty(req.getCluster()) && !isValidCluster(req.getCluster()))
  3378. throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Invalid cluster name: %s", req.getCluster());
  3379. if (strieq(type, "archive")|| strieq(type, "ecl_text"))
  3380. deployEclOrArchive(context, req, resp);
  3381. else if (strieq(type, "shared_object"))
  3382. deploySharedObject(context, req, resp, queryDirectory.str());
  3383. else
  3384. throw MakeStringException(ECLWATCH_INVALID_INPUT, "WUDeployWorkunit '%s' unkown object type.", type);
  3385. }
  3386. catch(IException* e)
  3387. {
  3388. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  3389. }
  3390. return true;
  3391. }