ws_workunitsHelpers.cpp 78 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jlib.hpp"
  14. #include "ws_workunitsHelpers.hpp"
  15. #include "exception_util.hpp"
  16. #include "daclient.hpp"
  17. #include "dalienv.hpp"
  18. #include "daaudit.hpp"
  19. #include "portlist.h"
  20. #include "dadfs.hpp"
  21. #include "fileview.hpp"
  22. #include "wuwebview.hpp"
  23. #include "dllserver.hpp"
  24. #include "wujobq.hpp"
  25. #ifdef _USE_ZLIB
  26. #include "zcrypt.hpp"
  27. #endif
  28. //#include "workunit.hpp"
  29. //#include "daclient.hpp"
  30. //#include "dalienv.hpp"
  31. //#include "exception_util.hpp"
  32. //#include "wujobq.hpp"
  33. //#include "eventqueue.hpp"
  34. //#include "hqlerror.hpp"
  35. //#include "sacmd.hpp"
  36. //#include "portlist.h"
  37. //#define OWN_WU_ACCESS "OwnWorkunitsAccess"
  38. //#define OTHERS_WU_ACCESS "OthersWorkunitsAccess"
  39. //const unsigned MAXTHORS = 1024;
  40. //#define File_Cpp "cpp"
  41. //#define File_ThorLog "ThorLog"
  42. //#define File_ThorSlaveLog "ThorSlaveLog"
  43. //#define File_EclAgentLog "EclAgentLog"
  44. //#define File_XML "XML"
  45. //#define File_Res "res"
  46. //#define File_DLL "dll"
  47. //#define File_ArchiveQuery "ArchiveQuery"
  48. namespace ws_workunits {
  49. SecAccessFlags chooseWuAccessFlagsByOwnership(const char *user, const char *owner, SecAccessFlags accessOwn, SecAccessFlags accessOthers)
  50. {
  51. return (isEmpty(owner) || (user && streq(user, owner))) ? accessOwn : accessOthers;
  52. }
  53. SecAccessFlags chooseWuAccessFlagsByOwnership(const char *user, IConstWorkUnit& cw, SecAccessFlags accessOwn, SecAccessFlags accessOthers)
  54. {
  55. SCMStringBuffer owner;
  56. return chooseWuAccessFlagsByOwnership(user, cw.getUser(owner).str(), accessOwn, accessOthers);
  57. }
  58. const char *getWuAccessType(const char *owner, const char *user)
  59. {
  60. return (isEmpty(owner) || (user && streq(user, owner))) ? OWN_WU_ACCESS : OTHERS_WU_ACCESS;
  61. }
  62. const char *getWuAccessType(IConstWorkUnit& cw, const char *user)
  63. {
  64. SCMStringBuffer owner;
  65. return getWuAccessType(cw.getUser(owner).str(), user);
  66. }
  67. void getUserWuAccessFlags(IEspContext& context, SecAccessFlags& accessOwn, SecAccessFlags& accessOthers, bool except)
  68. {
  69. if (!context.authorizeFeature(OWN_WU_ACCESS, accessOwn))
  70. accessOwn = SecAccess_None;
  71. if (!context.authorizeFeature(OTHERS_WU_ACCESS, accessOthers))
  72. accessOthers = SecAccess_None;
  73. if (except && (accessOwn == SecAccess_None) && (accessOthers == SecAccess_None))
  74. {
  75. AuditSystemAccess(context.queryUserId(), false, "Access Denied: User can't view any workunits");
  76. VStringBuffer msg("Access Denied: User %s does not have rights to access workunits.", context.queryUserId());
  77. throw MakeStringException(ECLWATCH_ECL_WU_ACCESS_DENIED, "%s", msg.str());
  78. }
  79. }
  80. SecAccessFlags getWsWorkunitAccess(IEspContext& cxt, IConstWorkUnit& cw)
  81. {
  82. SecAccessFlags accessFlag = SecAccess_None;
  83. cxt.authorizeFeature(getWuAccessType(cw, cxt.queryUserId()), accessFlag);
  84. return accessFlag;
  85. }
  86. void ensureWsWorkunitAccessByOwnerId(IEspContext& cxt, const char* owner, SecAccessFlags minAccess)
  87. {
  88. if (!cxt.validateFeatureAccess(getWuAccessType(owner, cxt.queryUserId()), minAccess, false))
  89. throw MakeStringException(ECLWATCH_ECL_WU_ACCESS_DENIED, "Failed to access workunit. Permission denied.");
  90. }
  91. void ensureWsWorkunitAccess(IEspContext& cxt, IConstWorkUnit& cw, SecAccessFlags minAccess)
  92. {
  93. if (!cxt.validateFeatureAccess(getWuAccessType(cw, cxt.queryUserId()), minAccess, false))
  94. throw MakeStringException(ECLWATCH_ECL_WU_ACCESS_DENIED, "Failed to access workunit. Permission denied.");
  95. }
  96. void ensureWsWorkunitAccess(IEspContext& context, const char* wuid, SecAccessFlags minAccess)
  97. {
  98. Owned<IWorkUnitFactory> wf = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  99. Owned<IConstWorkUnit> cw = wf->openWorkUnit(wuid, false);
  100. if (!cw)
  101. throw MakeStringException(ECLWATCH_ECL_WU_ACCESS_DENIED, "Failed to open workunit %s when ensuring workunit access", wuid);
  102. ensureWsWorkunitAccess(context, *cw, minAccess);
  103. }
  104. void ensureWsCreateWorkunitAccess(IEspContext& cxt)
  105. {
  106. if (!cxt.validateFeatureAccess(OWN_WU_ACCESS, SecAccess_Write, false))
  107. throw MakeStringException(ECLWATCH_ECL_WU_ACCESS_DENIED, "Failed to create workunit. Permission denied.");
  108. }
  109. StringBuffer &getWuidFromLogicalFileName(IEspContext &context, const char *logicalName, StringBuffer &wuid)
  110. {
  111. Owned<IUserDescriptor> userdesc = createUserDescriptor();
  112. userdesc->set(context.queryUserId(), context.queryPassword());
  113. Owned<IDistributedFile> df = queryDistributedFileDirectory().lookup(logicalName, userdesc);
  114. if (!df)
  115. throw MakeStringException(ECLWATCH_FILE_NOT_EXIST,"Cannot find file %s.",logicalName);
  116. return wuid.append(df->queryAttributes().queryProp("@workunit"));
  117. }
  118. void formatDuration(StringBuffer &s, unsigned ms)
  119. {
  120. unsigned days = ms / (1000*60*60*24);
  121. ms %= (1000*60*60*24);
  122. unsigned hours = ms / (1000*60*60);
  123. ms %= (1000*60*60);
  124. unsigned mins = ms / (1000*60);
  125. ms %= (1000*60);
  126. unsigned secs = ms / 1000;
  127. ms %= 1000;
  128. if (days)
  129. s.appendf("%d days ", days);
  130. if (hours || s.length())
  131. s.appendf("%d:", hours);
  132. if (mins || s.length())
  133. s.appendf("%d:", mins);
  134. if (s.length())
  135. s.appendf("%02d.%03d", secs, ms);
  136. else
  137. s.appendf("%d.%03d", secs, ms);
  138. }
  139. WsWUExceptions::WsWUExceptions(IConstWorkUnit& wu): numerr(0), numwrn(0), numinf(0)
  140. {
  141. Owned<IConstWUExceptionIterator> it = &wu.getExceptions();
  142. ForEach(*it)
  143. {
  144. SCMStringBuffer src, msg, file;
  145. Owned<IEspECLException> e= createECLException("","");
  146. e->setCode(it->query().getExceptionCode());
  147. e->setSource(it->query().getExceptionSource(src).str());
  148. e->setMessage(it->query().getExceptionMessage(msg).str());
  149. e->setFileName(it->query().getExceptionFileName(file).str());
  150. e->setLineNo(it->query().getExceptionLineNo());
  151. e->setColumn(it->query().getExceptionColumn());
  152. const char * label = "";
  153. switch (it->query().getSeverity())
  154. {
  155. default:
  156. case ExceptionSeverityError: label = "Error"; numerr++; break;
  157. case ExceptionSeverityWarning: label = "Warning"; numwrn++; break;
  158. case ExceptionSeverityInformation: label = "Info"; numinf++; break;
  159. }
  160. e->setSeverity(label);
  161. errors.append(*e.getLink());
  162. }
  163. }
  164. #define SDS_LOCK_TIMEOUT 30000
  165. void getSashaNode(SocketEndpoint &ep)
  166. {
  167. Owned<IRemoteConnection> econn = querySDS().connect("/Environment", myProcessSession(), 0, SDS_LOCK_TIMEOUT);
  168. if (!econn)
  169. throw MakeStringException(ECLWATCH_CANNOT_CONNECT_DALI,"Cannot connect to DALI server.");
  170. IPropertyTree *root = econn->queryRoot();
  171. if (!root)
  172. throw MakeStringException(ECLWATCH_CANNOT_GET_ENV_INFO,"Cannot get environment information.");
  173. IPropertyTree *pt = root->queryPropTree("Software/SashaServerProcess/Instance[1]");
  174. if (!pt)
  175. throw MakeStringException(ECLWATCH_ARCHIVE_SERVER_NOT_FOUND, "Archive Server not found.");
  176. ep.set(pt->queryProp("@netAddress"), pt->getPropInt("@port",DEFAULT_SASHA_PORT));
  177. }
  178. void WsWuInfo::getSourceFiles(IEspECLWorkunit &info, unsigned flags)
  179. {
  180. if (!(flags & WUINFO_IncludeSourceFiles))
  181. return;
  182. try
  183. {
  184. Owned<IUserDescriptor> userdesc;
  185. StringBuffer username;
  186. context.getUserID(username);
  187. const char* passwd = context.queryPassword();
  188. userdesc.setown(createUserDescriptor());
  189. userdesc->set(username.str(), passwd);
  190. IArrayOf<IEspECLSourceFile> files;
  191. if (version < 1.27)
  192. {
  193. Owned<IPropertyTreeIterator> f=&cw->getFilesReadIterator();
  194. ForEach(*f)
  195. {
  196. IPropertyTree &query = f->query();
  197. const char *clusterName = query.queryProp("@cluster");
  198. const char *fileName = query.queryProp("@name");
  199. int fileCount = query.getPropInt("@useCount");
  200. Owned<IEspECLSourceFile> file= createECLSourceFile("","");
  201. if(clusterName && *clusterName)
  202. {
  203. file->setFileCluster(clusterName);
  204. }
  205. if (version > 1.11)
  206. {
  207. Owned<IPropertyTreeIterator> filetrees= query.getElements("Subfile");
  208. if (filetrees->first())
  209. file->setIsSuperFile(true);
  210. }
  211. if (fileName && *fileName)
  212. {
  213. file->setName(fileName);
  214. }
  215. file->setCount(fileCount);
  216. files.append(*file.getLink());
  217. }
  218. }
  219. else
  220. {
  221. StringArray fileNames;
  222. Owned<IPropertyTreeIterator> f=&cw->getFilesReadIterator();
  223. ForEach(*f)
  224. {
  225. IPropertyTree &query = f->query();
  226. const char *clusterName = query.queryProp("@cluster");
  227. const char *fileName = query.queryProp("@name");
  228. int fileCount = query.getPropInt("@useCount");
  229. bool bFound = false;
  230. if (fileName && *fileName && (fileNames.length() > 0))
  231. {
  232. for (unsigned i = 0; i < fileNames.length(); i++ )
  233. {
  234. const char *fileName0 = fileNames.item(i);
  235. if (!stricmp(fileName, fileName0))
  236. {
  237. bFound = true;
  238. break;
  239. }
  240. }
  241. }
  242. if (bFound)
  243. continue;
  244. Owned<IEspECLSourceFile> file= createECLSourceFile("","");
  245. if(clusterName && *clusterName)
  246. {
  247. file->setFileCluster(clusterName);
  248. }
  249. if (fileName && *fileName)
  250. {
  251. file->setName(fileName);
  252. }
  253. file->setCount(fileCount);
  254. Owned<IPropertyTreeIterator> filetrees= query.getElements("Subfile");
  255. if (filetrees->first())
  256. {
  257. file->setIsSuperFile(true);
  258. getSubFiles(filetrees, file, fileNames);
  259. }
  260. files.append(*file.getLink());
  261. }
  262. }
  263. info.setSourceFiles(files);
  264. }
  265. catch(IException* e)
  266. {
  267. StringBuffer eMsg;
  268. ERRLOG("%s", e->errorMessage(eMsg).str());
  269. info.setSourceFilesDesc(eMsg.str());
  270. e->Release();
  271. }
  272. }
  273. void WsWuInfo::getExceptions(IEspECLWorkunit &info, unsigned flags)
  274. {
  275. if ((flags & WUINFO_IncludeExceptions) || version > 1.16)
  276. {
  277. WsWUExceptions errors(*cw);
  278. if (version > 1.16)
  279. {
  280. info.setErrorCount(errors.ErrCount());
  281. info.setWarningCount(errors.WrnCount());
  282. info.setInfoCount(errors.InfCount());
  283. }
  284. if ((flags & WUINFO_IncludeExceptions))
  285. info.setExceptions(errors);
  286. }
  287. }
  288. void WsWuInfo::getVariables(IEspECLWorkunit &info, unsigned flags)
  289. {
  290. if (!(flags & WUINFO_IncludeVariables))
  291. return;
  292. try
  293. {
  294. IArrayOf<IEspECLResult> results;
  295. Owned<IConstWUResultIterator> vars = &cw->getVariables();
  296. ForEach(*vars)
  297. getResult(vars->query(), results, flags);
  298. info.setVariables(results);
  299. results.kill();
  300. }
  301. catch(IException* e)
  302. {
  303. StringBuffer eMsg;
  304. ERRLOG("%s", e->errorMessage(eMsg).str());
  305. info.setVariablesDesc(eMsg.str());
  306. e->Release();
  307. }
  308. }
  309. void WsWuInfo::getTimers(IEspECLWorkunit &info, unsigned flags)
  310. {
  311. if (!(flags & WUINFO_IncludeTimers))
  312. return;
  313. try
  314. {
  315. StringBuffer totalThorTimeValue;
  316. unsigned totalThorTimerCount; //Do we need this?
  317. IArrayOf<IEspECLTimer> timers;
  318. Owned<IStringIterator> it = &cw->getTimers();
  319. ForEach(*it)
  320. {
  321. SCMStringBuffer name;
  322. it->str(name);
  323. SCMStringBuffer value;
  324. unsigned count = cw->getTimerCount(name.str(), NULL);
  325. unsigned duration = cw->getTimerDuration(name.str(), NULL);
  326. StringBuffer fd;
  327. formatDuration(fd, duration);
  328. for (unsigned i = 0; i < name.length(); i++)
  329. if (name.s.charAt(i)=='_')
  330. name.s.setCharAt(i, ' ');
  331. if (strieq(name.str(), TOTALTHORTIME))
  332. {
  333. totalThorTimeValue = fd;
  334. totalThorTimerCount = count;
  335. continue;
  336. }
  337. Owned<IEspECLTimer> t= createECLTimer("","");
  338. t->setName(name.str());
  339. t->setValue(fd.str());
  340. t->setCount(count);
  341. if (version > 1.19)
  342. {
  343. StringBuffer graphName;
  344. unsigned subGraphNum;
  345. unsigned __int64 subId;
  346. if (parseGraphTimerLabel(name.str(), graphName, subGraphNum, subId))
  347. {
  348. if (graphName.length() > 0)
  349. {
  350. t->setGraphName(graphName.str());
  351. }
  352. if (subId > 0)
  353. {
  354. t->setSubGraphId((int)subId);
  355. }
  356. }
  357. }
  358. timers.append(*t.getLink());
  359. }
  360. if (totalThorTimeValue.length() > 0)
  361. {
  362. Owned<IEspECLTimer> t= createECLTimer("","");
  363. t->setName(TOTALTHORTIME);
  364. t->setValue(totalThorTimeValue.str());
  365. t->setCount(totalThorTimerCount);
  366. timers.append(*t.getLink());
  367. }
  368. info.setTimers(timers);
  369. }
  370. catch(IException* e)
  371. {
  372. StringBuffer eMsg;
  373. ERRLOG("%s", e->errorMessage(eMsg).str());
  374. info.setTimersDesc(eMsg.str());
  375. e->Release();
  376. }
  377. }
  378. void WsWuInfo::getHelpers(IEspECLWorkunit &info, unsigned flags)
  379. {
  380. try
  381. {
  382. Owned <IConstWUQuery> query = cw->getQuery();
  383. if(!query)
  384. {
  385. ERRLOG("Cannot get Query for this workunit.");
  386. info.setHelpersDesc("Cannot get Query for this workunit.");
  387. }
  388. SCMStringBuffer qname;
  389. query->getQueryShortText(qname);
  390. if(qname.length())
  391. {
  392. if((flags & WUINFO_TruncateEclTo64k) && (qname.length() > 64000))
  393. qname.setLen(qname.str(), 64000);
  394. IEspECLQuery* q=&info.updateQuery();
  395. q->setText(qname.str());
  396. }
  397. if (version > 1.34)
  398. {
  399. SCMStringBuffer mainDefinition;
  400. query->getQueryMainDefinition(mainDefinition);
  401. if(mainDefinition.length())
  402. {
  403. IEspECLQuery* q=&info.updateQuery();
  404. q->setQueryMainDefinition(mainDefinition.str());
  405. }
  406. }
  407. if (version > 1.30)
  408. {
  409. SCMStringBuffer qText;
  410. query->getQueryText(qText);
  411. if ((qText.length() > 0) && isArchiveQuery(qText.str()))
  412. info.setHasArchiveQuery(true);
  413. }
  414. IArrayOf<IEspECLHelpFile> helpers;
  415. getHelpFiles(query, FileTypeCpp, helpers);
  416. getHelpFiles(query, FileTypeDll, helpers);
  417. getHelpFiles(query, FileTypeResText, helpers);
  418. getWorkunitThorLogInfo(helpers, info);
  419. if (cw->getWuidVersion() > 0)
  420. {
  421. Owned<IStringIterator> eclAgentLogs = cw->getLogs("EclAgent");
  422. ForEach (*eclAgentLogs)
  423. {
  424. SCMStringBuffer logName;
  425. eclAgentLogs->str(logName);
  426. if (logName.length() < 1)
  427. continue;
  428. Owned<IEspECLHelpFile> h= createECLHelpFile("","");
  429. h->setName(logName.str());
  430. h->setType(File_EclAgentLog);
  431. helpers.append(*h.getLink());
  432. }
  433. }
  434. else // legacy wuid
  435. {
  436. Owned<IStringIterator> eclAgentLogs = cw->getLogs("EclAgent");
  437. ForEach (*eclAgentLogs)
  438. {
  439. SCMStringBuffer name;
  440. eclAgentLogs->str(name);
  441. if (name.length() < 1)
  442. continue;
  443. Owned<IEspECLHelpFile> h= createECLHelpFile("","");
  444. h->setName(name.str());
  445. h->setType(File_EclAgentLog);
  446. helpers.append(*h.getLink());
  447. break;
  448. }
  449. }
  450. info.setHelpers(helpers);
  451. }
  452. catch(IException* e)
  453. {
  454. StringBuffer eMsg;
  455. ERRLOG("%s", e->errorMessage(eMsg).str());
  456. info.setHelpersDesc(eMsg.str());
  457. e->Release();
  458. }
  459. }
  460. void WsWuInfo::getApplicationValues(IEspECLWorkunit &info, unsigned flags)
  461. {
  462. if (!(flags & WUINFO_IncludeApplicationValues))
  463. return;
  464. try
  465. {
  466. IArrayOf<IEspApplicationValue> av;
  467. Owned<IConstWUAppValueIterator> app(&cw->getApplicationValues());
  468. ForEach(*app)
  469. {
  470. IConstWUAppValue& val=app->query();
  471. SCMStringBuffer buf;
  472. Owned<IEspApplicationValue> t= createApplicationValue("","");
  473. t->setApplication(val.getApplication(buf).str());
  474. t->setValue(val.getValue(buf).str());
  475. t->setName(val.getName(buf).str());
  476. t->setValue(val.getValue(buf).str());
  477. av.append(*t.getLink());
  478. }
  479. info.setApplicationValues(av);
  480. }
  481. catch(IException* e)
  482. {
  483. StringBuffer eMsg;
  484. ERRLOG("%s", e->errorMessage(eMsg).str());
  485. info.setApplicationValuesDesc(eMsg.str());
  486. e->Release();
  487. }
  488. }
  489. void WsWuInfo::getDebugValues(IEspECLWorkunit &info, unsigned flags)
  490. {
  491. if (!(flags & WUINFO_IncludeDebugValues))
  492. return;
  493. try
  494. {
  495. IArrayOf<IEspDebugValue> dv;
  496. Owned<IStringIterator> debugs(&cw->getDebugValues());
  497. ForEach(*debugs)
  498. {
  499. SCMStringBuffer name, val;
  500. debugs->str(name);
  501. cw->getDebugValue(name.str(),val);
  502. Owned<IEspDebugValue> t= createDebugValue("","");
  503. t->setName(name.str());
  504. t->setValue(val.str());
  505. dv.append(*t.getLink());
  506. }
  507. info.setDebugValues(dv);
  508. }
  509. catch(IException* e)
  510. {
  511. StringBuffer eMsg;
  512. ERRLOG("%s", e->errorMessage(eMsg).str());
  513. info.setDebugValuesDesc(eMsg.str());
  514. e->Release();
  515. }
  516. }
  517. const char *getGraphNum(const char *s,unsigned &num)
  518. {
  519. while (*s && !isdigit(*s))
  520. s++;
  521. num = 0;
  522. while (isdigit(*s))
  523. {
  524. num = num*10+*s-'0';
  525. s++;
  526. }
  527. return s;
  528. }
  529. void WsWuInfo::getGraphInfo(IEspECLWorkunit &info, unsigned flags)
  530. {
  531. if (version > 1.01)
  532. {
  533. info.setHaveSubGraphTimings(false);
  534. StringBuffer xpath("/WorkUnits/");
  535. xpath.append(wuid.str());
  536. Owned<IRemoteConnection> conn = querySDS().connect(xpath.str(), myProcessSession(), 0, 5*60*1000);
  537. if (!conn)
  538. {
  539. DBGLOG("Cannot connect to SDS");
  540. info.setGraphsDesc("Cannot connect to SDS");
  541. return;
  542. }
  543. IPropertyTree *wpt = conn->queryRoot();
  544. if (!wpt)
  545. {
  546. DBGLOG("Cannot get data from SDS");
  547. info.setGraphsDesc("Cannot get data from SDS");
  548. return;
  549. }
  550. Owned<IPropertyTreeIterator> iter = wpt->getElements("Timings/Timing");
  551. StringBuffer name;
  552. IArrayOf<IConstECLTimingData> timingdatarray;
  553. ForEach(*iter)
  554. {
  555. if (iter->query().getProp("@name",name.clear()))
  556. {
  557. if ((name.length()>11) && (strncmp("Graph graph", name.str(), 11)==0))
  558. {
  559. unsigned gn;
  560. const char *s = getGraphNum(name.str()+11, gn);
  561. unsigned sn;
  562. s = getGraphNum(s,sn);
  563. if (gn && sn)
  564. {
  565. info.setHaveSubGraphTimings(true);
  566. break;
  567. }
  568. }
  569. }
  570. }
  571. }
  572. if (!(flags & WUINFO_IncludeGraphs))
  573. return;
  574. try
  575. {
  576. SCMStringBuffer runningGraph;
  577. WUGraphIDType id;
  578. WUState st = cw->getState();
  579. bool running = (!(st==WUStateFailed || st==WUStateAborted || st==WUStateCompleted) && cw->getRunningGraph(runningGraph,id));
  580. IArrayOf<IEspECLGraph> graphs;
  581. Owned<IConstWUGraphIterator> it = &cw->getGraphs(GraphTypeAny);
  582. ForEach(*it)
  583. {
  584. IConstWUGraph &graph = it->query();
  585. if(!graph.isValid())
  586. continue;
  587. SCMStringBuffer name, label, type;
  588. graph.getName(name);
  589. graph.getLabel(label);
  590. graph.getTypeName(type);
  591. Owned<IEspECLGraph> g= createECLGraph("","");
  592. g->setName(name.str());
  593. g->setLabel(label.str());
  594. g->setType(type.str());
  595. if(running && strcmp(name.str(),runningGraph.str())==0)
  596. {
  597. g->setRunning(true);
  598. g->setRunningId(id);
  599. }
  600. Owned<IConstWUGraphProgress> progress = cw->getGraphProgress(name.str());
  601. if (progress)
  602. {
  603. WUGraphState graphstate= progress->queryGraphState();
  604. if (graphstate == WUGraphComplete)
  605. g->setComplete(true);
  606. if (version > 1.13 && graphstate == WUGraphFailed)
  607. {
  608. g->setFailed(true);
  609. }
  610. }
  611. graphs.append(*g.getLink());
  612. }
  613. info.setGraphs(graphs);
  614. }
  615. catch(IException* e)
  616. {
  617. StringBuffer eMsg;
  618. ERRLOG("%s", e->errorMessage(eMsg).str());
  619. info.setGraphsDesc(eMsg.str());
  620. e->Release();
  621. }
  622. }
  623. void WsWuInfo::getGraphTimingData(IArrayOf<IConstECLTimingData> &timingData, unsigned flags)
  624. {
  625. StringBuffer xpath("/WorkUnits/");
  626. xpath.append(wuid.str());
  627. Owned<IRemoteConnection> conn = querySDS().connect(xpath.str(), myProcessSession(), 0, 5*60*1000);
  628. if (!conn)
  629. {
  630. DBGLOG("Could not connect to SDS");
  631. throw MakeStringException(ECLWATCH_CANNOT_CONNECT_DALI, "Cannot connect to dali server.");
  632. }
  633. IPropertyTree *wpt = conn->queryRoot();
  634. Owned<IPropertyTreeIterator> iter = wpt->getElements("Timings/Timing");
  635. ForEach(*iter)
  636. {
  637. StringBuffer name;
  638. if (iter->query().getProp("@name", name))
  639. {
  640. if ((name.length()>11)&&(strncmp("Graph graph", name.str(), 11)==0))
  641. {
  642. unsigned gn;
  643. const char *s = getGraphNum(name.str(),gn);
  644. unsigned sn;
  645. s = getGraphNum(s, sn);
  646. if (gn && sn)
  647. {
  648. const char *gs = strchr(name.str(),'(');
  649. unsigned gid = 0;
  650. if (gs)
  651. getGraphNum(gs+1, gid);
  652. unsigned time = iter->query().getPropInt("@duration");
  653. Owned<IEspECLTimingData> g = createECLTimingData();
  654. g->setName(name.str());
  655. g->setGraphNum(gn);
  656. g->setSubGraphNum(sn);
  657. g->setGID(gid);
  658. g->setMS(time);
  659. g->setMin(time/60000);
  660. timingData.append(*g.getClear());
  661. }
  662. }
  663. }
  664. }
  665. }
  666. void WsWuInfo::getRoxieCluster(IEspECLWorkunit &info, unsigned flags)
  667. {
  668. if (version > 1.06)
  669. {
  670. Owned<IConstWURoxieQueryInfo> roxieQueryInfo = cw->getRoxieQueryInfo();
  671. if (roxieQueryInfo)
  672. {
  673. SCMStringBuffer roxieClusterName;
  674. roxieQueryInfo->getRoxieClusterName(roxieClusterName);
  675. info.setRoxieCluster(roxieClusterName.str());
  676. }
  677. }
  678. }
  679. void WsWuInfo::getEventScheduleFlag(IEspECLWorkunit &info)
  680. {
  681. info.setEventSchedule(0);
  682. if (info.getState() && !stricmp(info.getState(), "wait"))
  683. {
  684. info.setEventSchedule(2); //Can deschedule
  685. }
  686. else
  687. {
  688. Owned<IConstWorkflowItemIterator> it = cw->getWorkflowItems();
  689. if (it)
  690. {
  691. ForEach(*it)
  692. {
  693. IConstWorkflowItem *r = it->query();
  694. if (!r)
  695. continue;
  696. IWorkflowEvent *wfevent = r->getScheduleEvent();
  697. if (!wfevent)
  698. continue;
  699. if (!r->hasScheduleCount() || (r->queryScheduleCountRemaining() > 0))
  700. {
  701. info.setEventSchedule(1); //Can reschedule
  702. break;
  703. }
  704. }
  705. }
  706. }
  707. }
  708. void WsWuInfo::getCommon(IEspECLWorkunit &info, unsigned flags)
  709. {
  710. SCMStringBuffer s;
  711. info.setWuid(cw->getWuid(s).str());
  712. info.setProtected(cw->isProtected() ? 1 : 0);
  713. info.setJobname(cw->getJobName(s).str());
  714. info.setOwner(cw->getUser(s).str());
  715. info.setCluster(cw->getClusterName(clusterName).str());
  716. info.setSnapshot(cw->getSnapshot(s).str());
  717. if ((cw->getState() == WUStateScheduled) && cw->aborting())
  718. {
  719. info.setStateID(WUStateAborting);
  720. info.setState("aborting");
  721. }
  722. else
  723. {
  724. info.setStateID(cw->getState());
  725. info.setState(cw->getStateDesc(s).str());
  726. }
  727. if (cw->isPausing())
  728. info.setIsPausing(true);
  729. getEventScheduleFlag(info);
  730. if (version > 1.27)
  731. {
  732. StringBuffer totalThorTimeStr;
  733. unsigned totalThorTimeMS = cw->getTimerDuration(TOTALTHORTIME, NULL);
  734. formatDuration(totalThorTimeStr, totalThorTimeMS);
  735. info.setTotalThorTime(totalThorTimeStr.str());
  736. }
  737. WsWuDateTime dt;
  738. cw->getTimeScheduled(dt);
  739. if(dt.isValid())
  740. info.setDateTimeScheduled(dt.getString(s).str());
  741. getRoxieCluster(info, flags);
  742. }
  743. void WsWuInfo::getInfo(IEspECLWorkunit &info, unsigned flags)
  744. {
  745. getCommon(info, flags);
  746. SecAccessFlags accessFlag = getWsWorkunitAccess(context, *cw);
  747. info.setAccessFlag(accessFlag);
  748. SCMStringBuffer s;
  749. info.setStateEx(cw->getStateEx(s).str());
  750. info.setPriorityClass(cw->getPriority());
  751. info.setPriorityLevel(cw->getPriorityLevel());
  752. info.setScope(cw->getWuScope(s).str());
  753. info.setActionEx(cw->getActionEx(s).str());
  754. info.setDescription(cw->getDebugValue("description", s).str());
  755. if (version > 1.21)
  756. info.setXmlParams(cw->getXmlParams(s).str());
  757. info.setResultLimit(cw->getResultLimit());
  758. info.setArchived(false);
  759. info.setGraphCount(cw->getGraphCount());
  760. info.setSourceFileCount(cw->getSourceFileCount());
  761. info.setVariableCount(cw->getVariableCount());
  762. info.setTimerCount(cw->getTimerCount());
  763. info.setSourceFileCount(cw->getSourceFileCount());
  764. info.setApplicationValueCount(cw->getApplicationValueCount());
  765. info.setHasDebugValue(cw->hasDebugValue("__calculated__complexity__"));
  766. getClusterInfo(info, flags);
  767. getExceptions(info, flags);
  768. getHelpers(info, flags);
  769. getGraphInfo(info, flags);
  770. getSourceFiles(info, flags);
  771. getResults(info, flags);
  772. getVariables(info, flags);
  773. getTimers(info, flags);
  774. getDebugValues(info, flags);
  775. getApplicationValues(info, flags);
  776. getWorkflow(info, flags);
  777. }
  778. unsigned WsWuInfo::getWorkunitThorLogInfo(IArrayOf<IEspECLHelpFile>& helpers, IEspECLWorkunit &info)
  779. {
  780. unsigned countThorLog = 0;
  781. IArrayOf<IConstThorLogInfo> thorLogList;
  782. if (cw->getWuidVersion() > 0)
  783. {
  784. SCMStringBuffer clusterName;
  785. Owned<IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(cw->getClusterName(clusterName).str());
  786. if (!clusterInfo)
  787. {
  788. SCMStringBuffer wuid;
  789. WARNLOG("Cannot find TargetClusterInfo for workunit %s", cw->getWuid(wuid).str());
  790. return countThorLog;
  791. }
  792. unsigned numberOfSlaves = clusterInfo->getSize();
  793. BoolHash uniqueProcesses;
  794. Owned<IStringIterator> thorInstances = cw->getProcesses("Thor");
  795. ForEach (*thorInstances)
  796. {
  797. SCMStringBuffer processName;
  798. thorInstances->str(processName);
  799. if ((processName.length() < 1) || uniqueProcesses.getValue(processName.str()))
  800. continue;
  801. uniqueProcesses.setValue(processName.str(), true);
  802. StringBuffer groupName;
  803. getClusterThorGroupName(groupName, processName.str());
  804. Owned<IStringIterator> thorLogs = cw->getLogs("Thor", processName.str());
  805. ForEach (*thorLogs)
  806. {
  807. SCMStringBuffer logName;
  808. thorLogs->str(logName);
  809. if (logName.length() < 1)
  810. continue;
  811. countThorLog++;
  812. StringBuffer fileType;
  813. if (countThorLog < 2)
  814. fileType.append(File_ThorLog);
  815. else
  816. fileType.appendf("%s%d", File_ThorLog, countThorLog);
  817. Owned<IEspECLHelpFile> h= createECLHelpFile("","");
  818. h->setName(logName.str());
  819. h->setDescription(processName.str());
  820. h->setType(fileType.str());
  821. helpers.append(*h.getLink());
  822. if (version < 1.38)
  823. continue;
  824. const char* pStr = logName.str();
  825. const char* ppStr = strstr(pStr, "/thormaster.");
  826. if (!ppStr)
  827. {
  828. WARNLOG("Invalid thorlog entry in workunit xml: %s", logName.str());
  829. continue;
  830. }
  831. ppStr += 12;
  832. StringBuffer logDate = ppStr;
  833. logDate.setLength(10);
  834. Owned<IEspThorLogInfo> thorLog = createThorLogInfo("","");
  835. thorLog->setProcessName(processName.str());
  836. thorLog->setClusterGroup(groupName.str());
  837. thorLog->setLogDate(logDate.str());
  838. thorLog->setNumberSlaves(numberOfSlaves);
  839. thorLogList.append(*thorLog.getLink());
  840. }
  841. }
  842. }
  843. else //legacy wuid
  844. {
  845. Owned<IStringIterator> thorLogs = cw->getLogs("Thor");
  846. ForEach (*thorLogs)
  847. {
  848. SCMStringBuffer name;
  849. thorLogs->str(name);
  850. if (name.length() < 1)
  851. continue;
  852. countThorLog++;
  853. StringBuffer fileType;
  854. if (countThorLog < 2)
  855. fileType.append(File_ThorLog);
  856. else
  857. fileType.appendf("%s%d", File_ThorLog, countThorLog);
  858. Owned<IEspECLHelpFile> h= createECLHelpFile("","");
  859. h->setName(name.str());
  860. h->setType(fileType.str());
  861. helpers.append(*h.getLink());
  862. }
  863. StringBuffer logDir;
  864. Owned<IEnvironmentFactory> envFactory = getEnvironmentFactory();
  865. Owned<IConstEnvironment> constEnv = envFactory->openEnvironmentByFile();
  866. constEnv->getPTree().getProp("EnvSettings/log", logDir);
  867. if (logDir.length() > 0)
  868. {
  869. Owned<IStringIterator> debugs = cw->getLogs("Thor");
  870. ForEach(*debugs)
  871. {
  872. SCMStringBuffer val;
  873. debugs->str(val);
  874. if (val.length() < 1)
  875. continue;
  876. const char* pStr = val.str();
  877. const char* ppStr = strstr(pStr, logDir.str());
  878. if (!ppStr)
  879. {
  880. WARNLOG("Invalid thorlog entry in workunit xml: %s", val.str());
  881. continue;
  882. }
  883. const char* pProcessName = ppStr + logDir.length();
  884. char sep = pProcessName[0];
  885. StringBuffer processName = pProcessName + 1;
  886. ppStr = strchr(pProcessName + 1, sep);
  887. if (!ppStr)
  888. {
  889. WARNLOG("Invalid thorlog entry in workunit xml: %s", val.str());
  890. continue;
  891. }
  892. processName.setLength(ppStr - pProcessName - 1);
  893. StringBuffer groupName;
  894. getClusterThorGroupName(groupName, processName.str());
  895. StringBuffer logDate = ppStr + 12;
  896. logDate.setLength(10);
  897. Owned<IEspThorLogInfo> thorLog = createThorLogInfo("","");
  898. thorLog->setProcessName(processName.str());
  899. thorLog->setClusterGroup(groupName.str());
  900. thorLog->setLogDate(logDate.str());
  901. //for legacy wuid, the log name does not contain slaveNum. So, a user may not specify
  902. //a slaveNum and we only display the first slave log if > 1 per IP.
  903. thorLog->setNumberSlaves(0);
  904. thorLogList.append(*thorLog.getLink());
  905. }
  906. }
  907. }
  908. if (thorLogList.length() > 0)
  909. info.setThorLogList(thorLogList);
  910. thorLogList.kill();
  911. return countThorLog;
  912. }
  913. bool WsWuInfo::getClusterInfo(IEspECLWorkunit &info, unsigned flags)
  914. {
  915. if (version > 1.04)
  916. {
  917. StringArray allowedClusters;
  918. SCMStringBuffer val;
  919. cw->getAllowedClusters(val);
  920. if (val.length() > 0)
  921. {
  922. const char* ptr = val.str();
  923. while(*ptr != '\0')
  924. {
  925. StringBuffer onesub;
  926. while(*ptr != '\0' && *ptr != ',')
  927. {
  928. onesub.append((char)(*ptr));
  929. ptr++;
  930. }
  931. if(onesub.length() > 0)
  932. allowedClusters.append(onesub.str());
  933. if(*ptr != '\0')
  934. ptr++;
  935. }
  936. }
  937. if (allowedClusters.length() > 0)
  938. info.setAllowedClusters(allowedClusters);
  939. }
  940. if (version > 1.23 && clusterName.length())
  941. {
  942. int clusterTypeFlag = 0;
  943. Owned<IEnvironmentFactory> envFactory = getEnvironmentFactory();
  944. Owned<IConstEnvironment> constEnv = envFactory->openEnvironment();
  945. Owned<IPropertyTree> root = &constEnv->getPTree();
  946. if (!root)
  947. throw MakeStringException(ECLWATCH_CANNOT_CONNECT_DALI,"Cannot connect to DALI server.");
  948. Owned<IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(clusterName.str());
  949. if (clusterInfo.get())
  950. {//Set thor flag or roxie flag in order to display some options for thor or roxie
  951. ClusterType platform = clusterInfo->getPlatform();
  952. if (isThorCluster(platform))
  953. {
  954. clusterTypeFlag=1;
  955. if (version > 1.29)
  956. info.setThorLCR(ThorLCRCluster == platform);
  957. }
  958. else if (RoxieCluster == platform)
  959. clusterTypeFlag=2;
  960. }
  961. info.setClusterFlag(clusterTypeFlag);
  962. }
  963. return true;
  964. }
  965. void WsWuInfo::getWorkflow(IEspECLWorkunit &info, unsigned flags)
  966. {
  967. bool eventCountRemaining = false;
  968. bool eventCountUnlimited = false;
  969. try
  970. {
  971. info.setEventSchedule(0);
  972. IArrayOf<IConstECLWorkflow> workflows;
  973. Owned<IConstWorkflowItemIterator> it = cw->getWorkflowItems();
  974. if (it)
  975. {
  976. ForEach(*it)
  977. {
  978. IConstWorkflowItem *r = it->query();
  979. if (r)
  980. {
  981. IWorkflowEvent *wfevent = r->getScheduleEvent();
  982. if (wfevent)
  983. {
  984. Owned<IEspECLWorkflow> g;
  985. if (flags & WUINFO_IncludeWorkflows)
  986. {
  987. StringBuffer id;
  988. g.setown(createECLWorkflow("",""));
  989. g->setWFID(id.appendf("%d", r->queryWfid()).str());
  990. g->setEventName(wfevent->queryName());
  991. g->setEventText(wfevent->queryText());
  992. }
  993. if (r->hasScheduleCount())
  994. {
  995. if (r->queryScheduleCountRemaining() > 0)
  996. eventCountRemaining = true;
  997. if (flags & WUINFO_IncludeWorkflows)
  998. {
  999. g->setCount(r->queryScheduleCount());
  1000. g->setCountRemaining(r->queryScheduleCountRemaining());
  1001. }
  1002. }
  1003. else
  1004. {
  1005. eventCountUnlimited = true;
  1006. }
  1007. if (flags & WUINFO_IncludeWorkflows)
  1008. workflows.append(*g.getLink());
  1009. }
  1010. }
  1011. }
  1012. if (workflows.length() > 0)
  1013. info.setWorkflows(workflows);
  1014. workflows.kill();
  1015. }
  1016. }
  1017. catch(IException* e)
  1018. {
  1019. StringBuffer eMsg;
  1020. ERRLOG("%s", e->errorMessage(eMsg).str());
  1021. info.setWorkflowsDesc(eMsg.str());
  1022. e->Release();
  1023. }
  1024. if (info.getState() && !stricmp(info.getState(), "wait"))
  1025. info.setEventSchedule(2); //Can deschedule
  1026. else if (eventCountUnlimited || eventCountRemaining)
  1027. info.setEventSchedule(1); //Can reschedule
  1028. }
  1029. bool shouldFileContentBeShown(IEspContext &context, const char * logicalName)
  1030. {
  1031. StringBuffer username;
  1032. context.getUserID(username);
  1033. Owned<IUserDescriptor> userdesc(createUserDescriptor());
  1034. userdesc->set(username.str(), context.queryPassword());
  1035. Owned<IDistributedFile> df = queryDistributedFileDirectory().lookup(logicalName, userdesc);
  1036. if (!df)
  1037. return false;
  1038. bool blocked;
  1039. if (df->isCompressed(&blocked) && !blocked)
  1040. return false;
  1041. IPropertyTree & properties = df->queryAttributes();
  1042. const char * format = properties.queryProp("@format");
  1043. if (format && (stricmp(format,"csv")==0 || memicmp(format, "utf", 3) == 0))
  1044. {
  1045. return true;
  1046. }
  1047. const char * recordEcl = properties.queryProp("ECL");
  1048. if (!recordEcl)
  1049. return false;
  1050. MultiErrorReceiver errs;
  1051. Owned<IHqlExpression> ret = ::parseQuery(recordEcl, &errs);
  1052. return errs.errCount() == 0;
  1053. }
  1054. void WsWuInfo::getEclSchemaChildFields(IArrayOf<IEspECLSchemaItem>& schemas, IHqlExpression * expr, bool isConditional)
  1055. {
  1056. if(!expr)
  1057. return;
  1058. ForEachChild(idx, expr)
  1059. getEclSchemaFields(schemas, expr->queryChild(idx), isConditional);
  1060. }
  1061. void WsWuInfo::getEclSchemaFields(IArrayOf<IEspECLSchemaItem>& schemas, IHqlExpression * expr, bool isConditional)
  1062. {
  1063. if(!expr)
  1064. return;
  1065. int ret = expr->getOperator();
  1066. switch (ret)
  1067. {
  1068. case no_record:
  1069. getEclSchemaChildFields(schemas, expr, isConditional);
  1070. break;
  1071. case no_ifblock:
  1072. {
  1073. getEclSchemaChildFields(schemas, expr->queryChild(1), true);
  1074. break;
  1075. }
  1076. case no_field:
  1077. {
  1078. if (expr->hasProperty(__ifblockAtom))
  1079. break;
  1080. ITypeInfo * type = expr->queryType();
  1081. IAtom * name = expr->queryName();
  1082. IHqlExpression * nameAttr = expr->queryProperty(namedAtom);
  1083. StringBuffer outname;
  1084. if (nameAttr && nameAttr->queryChild(0) && nameAttr->queryChild(0)->queryValue())
  1085. nameAttr->queryChild(0)->queryValue()->getStringValue(outname);
  1086. else
  1087. outname.append(name).toLowerCase();
  1088. if(type)
  1089. {
  1090. type_t tc = type->getTypeCode();
  1091. if (tc == type_row)
  1092. {
  1093. getEclSchemaChildFields(schemas, expr->queryRecord(), isConditional);
  1094. }
  1095. else
  1096. {
  1097. if (type->getTypeCode() == type_alien)
  1098. {
  1099. IHqlAlienTypeInfo * alien = queryAlienType(type);
  1100. type = alien->queryPhysicalType();
  1101. }
  1102. Owned<IEspECLSchemaItem> schema = createECLSchemaItem("","");
  1103. StringBuffer eclType;
  1104. type->getECLType(eclType);
  1105. schema->setColumnName(outname);
  1106. schema->setColumnType(eclType.str());
  1107. schema->setColumnTypeCode(tc);
  1108. schema->setIsConditional(isConditional);
  1109. schemas.append(*schema.getClear());
  1110. }
  1111. }
  1112. break;
  1113. }
  1114. }
  1115. }
  1116. bool WsWuInfo::getResultEclSchemas(IConstWUResult &r, IArrayOf<IEspECLSchemaItem>& schemas)
  1117. {
  1118. SCMStringBuffer schema;
  1119. r.getResultEclSchema(schema);
  1120. if (!schema.length())
  1121. return false;
  1122. MultiErrorReceiver errs;
  1123. Owned<IHqlExpression> expr = ::parseQuery(schema.str(), &errs);
  1124. if (errs.errCount() != 0)
  1125. return false;
  1126. getEclSchemaFields(schemas, expr, false);
  1127. return true;
  1128. }
  1129. void WsWuInfo::getResult(IConstWUResult &r, IArrayOf<IEspECLResult>& results, unsigned flags)
  1130. {
  1131. SCMStringBuffer name;
  1132. r.getResultName(name);
  1133. SCMStringBuffer filename;
  1134. r.getResultLogicalName(filename);
  1135. StringBuffer value, link;
  1136. if (r.getResultStatus() == ResultStatusUndefined)
  1137. value.set("[undefined]");
  1138. else if (r.isResultScalar())
  1139. {
  1140. try
  1141. {
  1142. SCMStringBuffer xml;
  1143. r.getResultXml(xml);
  1144. Owned<IPropertyTree> props = createPTreeFromXMLString(xml.str(), ipt_caseInsensitive);
  1145. IPropertyTree *val = props->queryPropTree("Row/*");
  1146. if(val)
  1147. value.set(val->queryProp(NULL));
  1148. else
  1149. {
  1150. Owned<IResultSetFactory> resultSetFactory = getSecResultSetFactory(context.querySecManager(), context.queryUser(), context.queryUserId(), context.queryPassword());
  1151. Owned<INewResultSet> result;
  1152. result.setown(resultSetFactory->createNewResultSet(&r, wuid.str()));
  1153. Owned<IResultSetCursor> cursor(result->createCursor());
  1154. cursor->first();
  1155. if (cursor->getIsAll(0))
  1156. {
  1157. value.set("<All/>");
  1158. }
  1159. else
  1160. {
  1161. Owned<IResultSetCursor> childCursor = cursor->getChildren(0);
  1162. if (childCursor)
  1163. {
  1164. ForEach(*childCursor)
  1165. {
  1166. StringBuffer out;
  1167. StringBufferAdaptor adaptor(out);
  1168. childCursor->getDisplayText(adaptor, 0);
  1169. if (!value.length())
  1170. value.append('[');
  1171. else
  1172. value.append(", ");
  1173. value.append('\'').append(out.str()).append('\'');
  1174. }
  1175. if (value.length())
  1176. value.append(']');
  1177. }
  1178. }
  1179. }
  1180. }
  1181. catch(...)
  1182. {
  1183. value.append("[value not available]");
  1184. }
  1185. }
  1186. else
  1187. {
  1188. value.append('[').append(r.getResultTotalRowCount()).append(" rows]");
  1189. if(r.getResultSequence()>=0)
  1190. {
  1191. if(filename.length())
  1192. {
  1193. StringBuffer username;
  1194. context.getUserID(username);
  1195. Owned<IUserDescriptor> userdesc(createUserDescriptor());
  1196. userdesc->set(username.str(), context.queryPassword());
  1197. Owned<IDistributedFile> df = queryDistributedFileDirectory().lookup(filename.str(), userdesc);
  1198. if(df && df->queryAttributes().hasProp("ECL"))
  1199. link.append(r.getResultSequence());
  1200. }
  1201. else
  1202. link.append(r.getResultSequence());
  1203. }
  1204. }
  1205. Owned<IEspECLResult> result= createECLResult("","");
  1206. if (flags & WUINFO_IncludeEclSchemas)
  1207. {
  1208. IArrayOf<IEspECLSchemaItem> schemas;
  1209. if (getResultEclSchemas(r, schemas))
  1210. result->setECLSchemas(schemas);
  1211. }
  1212. if (flags & WUINFO_IncludeXmlSchema)
  1213. {
  1214. Owned<IResultSetFactory> resultSetFactory = getSecResultSetFactory(context.querySecManager(), context.queryUser(), context.queryUserId(), context.queryPassword());
  1215. Owned<INewResultSet> rs = resultSetFactory->createNewResultSet(&r, wuid.str());
  1216. Owned<IResultSetCursor> cursor(rs->createCursor());
  1217. SCMStringBuffer xsd;
  1218. const IResultSetMetaData & meta = cursor->queryResultSet()->getMetaData();
  1219. meta.getXmlXPathSchema(xsd, false);
  1220. result->setXmlSchema(xsd.str());
  1221. }
  1222. if (filename.length())
  1223. result->setShowFileContent(shouldFileContentBeShown(context, filename.str()));
  1224. result->setName(name.str());
  1225. result->setLink(link.str());
  1226. result->setSequence(r.getResultSequence());
  1227. result->setValue(value.str());
  1228. result->setFileName(filename.str());
  1229. result->setIsSupplied(r.getResultStatus() == ResultStatusSupplied);
  1230. result->setTotal(r.getResultTotalRowCount());
  1231. results.append(*result.getLink());
  1232. }
  1233. void WsWuInfo::getResults(IEspECLWorkunit &info, unsigned flags)
  1234. {
  1235. try
  1236. {
  1237. unsigned count = 0;
  1238. IArrayOf<IEspECLResult> results;
  1239. Owned<IConstWUResultIterator> it = &(cw->getResults());
  1240. ForEach(*it)
  1241. {
  1242. IConstWUResult &r = it->query();
  1243. if(r.getResultSequence()>=0)
  1244. {
  1245. if (flags & WUINFO_IncludeResults)
  1246. getResult(r, results, flags);
  1247. count++;
  1248. }
  1249. }
  1250. if (version >= 1.17)
  1251. info.setResultCount(count);
  1252. if ((flags & WUINFO_IncludeResults) && results.length() > 0)
  1253. info.setResults(results);
  1254. results.kill();
  1255. }
  1256. catch(IException* e)
  1257. {
  1258. StringBuffer eMsg;
  1259. ERRLOG("%s", e->errorMessage(eMsg).str());
  1260. info.setResultsDesc(eMsg.str());
  1261. e->Release();
  1262. }
  1263. }
  1264. void WsWuInfo::getHelpFiles(IConstWUQuery* query, WUFileType type, IArrayOf<IEspECLHelpFile>& helpers)
  1265. {
  1266. if (!query)
  1267. return;
  1268. SCMStringBuffer name, Ip, description;
  1269. Owned<IConstWUAssociatedFileIterator> iter = &query->getAssociatedFiles();
  1270. ForEach(*iter)
  1271. {
  1272. IConstWUAssociatedFile & cur = iter->query();
  1273. if (cur.getType() != type)
  1274. continue;
  1275. cur.getName(name);
  1276. Owned<IEspECLHelpFile> h= createECLHelpFile("","");
  1277. h->setName(name.str());
  1278. switch (type)
  1279. {
  1280. case FileTypeCpp:
  1281. h->setType("cpp");
  1282. break;
  1283. case FileTypeDll:
  1284. h->setType("dll");
  1285. break;
  1286. default:
  1287. h->setType("res");
  1288. break;
  1289. }
  1290. if (version > 1.31)
  1291. {
  1292. cur.getIp(Ip);
  1293. h->setIPAddress(Ip.str());
  1294. Ip.clear();
  1295. cur.getDescription(description);
  1296. if ((description.length() < 1) && (name.length() > 0))
  1297. {
  1298. const char* desc = pathTail(name.str());
  1299. if (desc && *desc)
  1300. description.set(desc);
  1301. }
  1302. if (description.length() < 1)
  1303. description.set("Help File");
  1304. h->setDescription(description.str());
  1305. description.clear();
  1306. }
  1307. helpers.append(*h.getLink());
  1308. name.clear();
  1309. }
  1310. }
  1311. void WsWuInfo::getSubFiles(IPropertyTreeIterator* f, IEspECLSourceFile* eclSuperFile, StringArray& fileNames)
  1312. {
  1313. IArrayOf<IEspECLSourceFile> files;
  1314. ForEach(*f)
  1315. {
  1316. IPropertyTree &query = f->query();
  1317. const char *clusterName = query.queryProp("@cluster");
  1318. const char *fileName = query.queryProp("@name");
  1319. int fileCount = query.getPropInt("@useCount");
  1320. bool bFound = false;
  1321. if (fileName && *fileName && (fileNames.length() > 0))
  1322. {
  1323. for (unsigned i = 0; i < fileNames.length(); i++ )
  1324. {
  1325. const char *fileName0 = fileNames.item(i);
  1326. if (!stricmp(fileName, fileName0))
  1327. {
  1328. bFound = true;
  1329. break;
  1330. }
  1331. }
  1332. }
  1333. if (bFound)
  1334. continue;
  1335. Owned<IEspECLSourceFile> file= createECLSourceFile("","");
  1336. if(clusterName && *clusterName)
  1337. {
  1338. file->setFileCluster(clusterName);
  1339. }
  1340. if (fileName && *fileName)
  1341. {
  1342. file->setName(fileName);
  1343. fileNames.append(fileName);
  1344. }
  1345. file->setCount(fileCount);
  1346. Owned<IPropertyTreeIterator> filetrees= query.getElements("Subfile");
  1347. if (filetrees->first())
  1348. {
  1349. file->setIsSuperFile(true);
  1350. getSubFiles(filetrees, file, fileNames);
  1351. }
  1352. files.append(*file.getLink());
  1353. }
  1354. eclSuperFile->setECLSourceFiles(files);
  1355. return;
  1356. }
  1357. bool WsWuInfo::getResultViews(StringArray &viewnames, unsigned flags)
  1358. {
  1359. if (!(flags & WUINFO_IncludeResultsViewNames))
  1360. return true;
  1361. try
  1362. {
  1363. Owned<IWuWebView> wv = createWuWebView(*cw, NULL, NULL, false);
  1364. if (wv)
  1365. wv->getResultViewNames(viewnames);
  1366. return true;
  1367. }
  1368. catch(IException* e)
  1369. {
  1370. StringBuffer eMsg;
  1371. ERRLOG("%s", e->errorMessage(eMsg).str());
  1372. e->Release();
  1373. }
  1374. return false;
  1375. }
  1376. void appendIOStreamContent(MemoryBuffer &mb, IFileIOStream *ios, bool forDownload)
  1377. {
  1378. StringBuffer line;
  1379. bool eof = false;
  1380. while (!eof)
  1381. {
  1382. line.clear();
  1383. loop
  1384. {
  1385. char c;
  1386. size32_t numRead = ios->read(1, &c);
  1387. if (!numRead)
  1388. {
  1389. eof = true;
  1390. break;
  1391. }
  1392. line.append(c);
  1393. if (c=='\n')
  1394. break;
  1395. }
  1396. mb.append(line.length(), line.str());
  1397. if (!forDownload && (mb.length() > 640000))
  1398. break;
  1399. }
  1400. }
  1401. void WsWuInfo::getWorkunitEclAgentLog(const char* fileName, MemoryBuffer& buf)
  1402. {
  1403. if(!fileName || !*fileName)
  1404. throw MakeStringException(ECLWATCH_ECLAGENT_LOG_NOT_FOUND,"Log file not specified");
  1405. Owned<IFile> rFile = createIFile(fileName);
  1406. if(!rFile)
  1407. throw MakeStringException(ECLWATCH_CANNOT_OPEN_FILE, "Cannot open file %s.", fileName);
  1408. OwnedIFileIO rIO = rFile->openShared(IFOread,IFSHfull);
  1409. if(!rIO)
  1410. throw MakeStringException(ECLWATCH_CANNOT_READ_FILE, "Cannot read file %s.", fileName);
  1411. OwnedIFileIOStream ios = createBufferedIOStream(rIO);
  1412. StringBuffer line;
  1413. bool eof = false;
  1414. bool wuidFound = false;
  1415. unsigned pid = cw->getAgentPID();
  1416. VStringBuffer pidstr(" %5d ", pid);
  1417. char const * pidchars = pidstr.str();
  1418. while(!eof)
  1419. {
  1420. line.clear();
  1421. loop
  1422. {
  1423. char c;
  1424. size32_t numRead = ios->read(1, &c);
  1425. if (!numRead)
  1426. {
  1427. eof = true;
  1428. break;
  1429. }
  1430. line.append(c);
  1431. if (c=='\n')
  1432. break;
  1433. }
  1434. //Retain all rows that match a unique program instance - by retaining all rows that match a pid
  1435. if(strstr(line.str(), pidchars))
  1436. {
  1437. //Check if this is a new instance using line sequence number
  1438. if (strncmp(line.str(), "00000000", 8) == 0)
  1439. {
  1440. if (wuidFound) //If the correct instance has been found, return that instance before the next instance.
  1441. break;
  1442. //The last instance is not a correct instance. Clean the buf in order to start a new instance.
  1443. buf.clear();
  1444. }
  1445. //If we spot the workunit id anywhere in the tacing for this pid then assume it is the correct instance.
  1446. if(!wuidFound && strstr(line.str(), wuid.str()))
  1447. wuidFound = true;
  1448. buf.append(line.length(), line.str());
  1449. }
  1450. }
  1451. if (buf.length() < 1)
  1452. buf.append(47, "(Not found a log line related to this workunit)");
  1453. }
  1454. void WsWuInfo::getWorkunitThorLog(const char* fileName, MemoryBuffer& buf)
  1455. {
  1456. if(!fileName || !*fileName)
  1457. throw MakeStringException(ECLWATCH_ECLAGENT_LOG_NOT_FOUND,"Log file not specified");
  1458. Owned<IFile> rFile = createIFile(fileName);
  1459. if (!rFile)
  1460. throw MakeStringException(ECLWATCH_CANNOT_OPEN_FILE,"Cannot open file %s.",fileName);
  1461. OwnedIFileIO rIO = rFile->openShared(IFOread,IFSHfull);
  1462. if (!rIO)
  1463. throw MakeStringException(ECLWATCH_CANNOT_READ_FILE,"Cannot read file %s.",fileName);
  1464. OwnedIFileIOStream ios = createBufferedIOStream(rIO);
  1465. StringBuffer line;
  1466. bool eof = false;
  1467. bool include = false;
  1468. VStringBuffer startwuid("Started wuid=%s", wuid.str());
  1469. VStringBuffer endwuid("Finished wuid=%s", wuid.str());
  1470. const char *sw = startwuid.str();
  1471. const char *ew = endwuid.str();
  1472. while (!eof)
  1473. {
  1474. line.clear();
  1475. loop
  1476. {
  1477. char c;
  1478. size32_t numRead = ios->read(1, &c);
  1479. if (!numRead)
  1480. {
  1481. eof = true;
  1482. break;
  1483. }
  1484. line.append(c);
  1485. if (c=='\n')
  1486. break;
  1487. }
  1488. if (strstr(line.str(), sw))
  1489. include = true;
  1490. if (include)
  1491. buf.append(line.length(), line.str());
  1492. if (strstr(line.str(), ew))
  1493. include = false;
  1494. }
  1495. }
  1496. void WsWuInfo::getWorkunitThorSlaveLog(const char *groupName, const char *ipAddress, const char* logDate, const char* logDir, int slaveNum, MemoryBuffer& buf, bool forDownload)
  1497. {
  1498. if (isEmpty(logDir))
  1499. throw MakeStringException(ECLWATCH_INVALID_INPUT,"ThorSlave log path not specified.");
  1500. if (isEmpty(logDate))
  1501. throw MakeStringException(ECLWATCH_INVALID_INPUT,"ThorSlave log date not specified.");
  1502. StringBuffer slaveIPAddress, logName;
  1503. if (slaveNum > 0)
  1504. {
  1505. if (isEmpty(groupName))
  1506. throw MakeStringException(ECLWATCH_INVALID_INPUT,"Thor group not specified.");
  1507. Owned<IGroup> nodeGroup = queryNamedGroupStore().lookup(groupName);
  1508. if (!nodeGroup || (nodeGroup->ordinality() == 0))
  1509. {
  1510. WARNLOG("Node group %s not found", groupName);
  1511. return;
  1512. }
  1513. nodeGroup->queryNode(slaveNum-1).endpoint().getIpText(slaveIPAddress);
  1514. if (slaveIPAddress.length() < 1)
  1515. throw MakeStringException(ECLWATCH_INVALID_INPUT,"ThorSlave log network address not found.");
  1516. logName.appendf("thorslave.%d.%s.log", slaveNum, logDate);
  1517. }
  1518. else
  1519. {//legacy wuid: a user types in an IP address for a thor slave
  1520. if (isEmpty(ipAddress))
  1521. throw MakeStringException(ECLWATCH_INVALID_INPUT,"ThorSlave address not specified.");
  1522. //thorslave.10.239.219.6_20100.2012_05_23.log
  1523. logName.appendf("thorslave.%s*.%s.log", ipAddress, logDate);
  1524. const char* portPtr = strchr(ipAddress, '_');
  1525. if (!portPtr)
  1526. slaveIPAddress.append(ipAddress);
  1527. else
  1528. {
  1529. StringBuffer ipAddressStr = ipAddress;
  1530. ipAddressStr.setLength(portPtr - ipAddress);
  1531. slaveIPAddress.append(ipAddressStr.str());
  1532. }
  1533. }
  1534. RemoteFilename rfn;
  1535. rfn.setRemotePath(logDir);
  1536. SocketEndpoint ep(slaveIPAddress.str());
  1537. rfn.setIp(ep);
  1538. Owned<IFile> dir = createIFile(rfn);
  1539. Owned<IDirectoryIterator> diriter = dir->directoryFiles(logName.str());
  1540. if (!diriter->first())
  1541. throw MakeStringException(ECLWATCH_FILE_NOT_EXIST,"Cannot find Thor slave log file %s.", logName.str());
  1542. Linked<IFile> logfile = &diriter->query();
  1543. diriter.clear();
  1544. dir.clear();
  1545. // logfile is now the file to load
  1546. OwnedIFileIO rIO = logfile->openShared(IFOread,IFSHfull);
  1547. if (!rIO)
  1548. throw MakeStringException(ECLWATCH_CANNOT_READ_FILE,"Cannot read file %s.",logName.str());
  1549. OwnedIFileIOStream ios = createBufferedIOStream(rIO);
  1550. if (slaveNum > 0)
  1551. {
  1552. StringBuffer line;
  1553. bool eof = false;
  1554. bool include = false;
  1555. VStringBuffer startwuid("Started wuid=%s", wuid.str());
  1556. VStringBuffer endwuid("Finished wuid=%s", wuid.str());
  1557. const char *sw = startwuid.str();
  1558. const char *ew = endwuid.str();
  1559. while (!eof)
  1560. {
  1561. line.clear();
  1562. loop
  1563. {
  1564. char c;
  1565. size32_t numRead = ios->read(1, &c);
  1566. if (!numRead)
  1567. {
  1568. eof = true;
  1569. break;
  1570. }
  1571. line.append(c);
  1572. if (c=='\n')
  1573. break;
  1574. }
  1575. if (strstr(line.str(), sw))
  1576. include = true;
  1577. if (include)
  1578. buf.append(line.length(), line.str());
  1579. if (strstr(line.str(), ew))
  1580. include = false;
  1581. }
  1582. }
  1583. else
  1584. {//legacy wuid
  1585. appendIOStreamContent(buf, ios.get(), forDownload);
  1586. }
  1587. }
  1588. void WsWuInfo::getWorkunitResTxt(MemoryBuffer& buf)
  1589. {
  1590. Owned<IConstWUQuery> query = cw->getQuery();
  1591. if(!query)
  1592. throw MakeStringException(ECLWATCH_QUERY_NOT_FOUND_FOR_WU,"No query for workunit %s.",wuid.str());
  1593. SCMStringBuffer resname;
  1594. queryDllServer().getDll(query->getQueryResTxtName(resname).str(), buf);
  1595. }
  1596. void WsWuInfo::getWorkunitArchiveQuery(MemoryBuffer& buf)
  1597. {
  1598. Owned<IConstWUQuery> query = cw->getQuery();
  1599. if(!query)
  1600. throw MakeStringException(ECLWATCH_QUERY_NOT_FOUND_FOR_WU,"No query for workunit %s.",wuid.str());
  1601. SCMStringBuffer queryText;
  1602. query->getQueryText(queryText);
  1603. if ((queryText.length() < 1) || !isArchiveQuery(queryText.str()))
  1604. throw MakeStringException(ECLWATCH_CANNOT_GET_WORKUNIT, "Archive Query not found for workunit %s.", wuid.str());
  1605. buf.append(queryText.length(), queryText.str());
  1606. }
  1607. void WsWuInfo::getWorkunitDll(StringBuffer &dllname, MemoryBuffer& buf)
  1608. {
  1609. Owned<IConstWUQuery> query = cw->getQuery();
  1610. if(!query)
  1611. throw MakeStringException(ECLWATCH_QUERY_NOT_FOUND_FOR_WU,"No query for workunit %s.",wuid.str());
  1612. StringBufferAdaptor isvName(dllname);
  1613. query->getQueryDllName(isvName);
  1614. queryDllServer().getDll(dllname.str(), buf);
  1615. }
  1616. void WsWuInfo::getWorkunitXml(const char* plainText, MemoryBuffer& buf)
  1617. {
  1618. const char* header;
  1619. if (plainText && (!stricmp(plainText, "yes")))
  1620. header = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>";
  1621. else
  1622. header = "<?xml version=\"1.0\" encoding=\"UTF-8\"?><?xml-stylesheet href=\"../esp/xslt/xmlformatter.xsl\" type=\"text/xsl\"?>";
  1623. SCMStringBuffer xml;
  1624. exportWorkUnitToXML(cw, xml);
  1625. buf.append(strlen(header), header);
  1626. buf.append(xml.length(), xml.str());
  1627. }
  1628. void WsWuInfo::getWorkunitCpp(const char *cppname, const char* description, const char* ipAddress, MemoryBuffer& buf, bool forDownload)
  1629. {
  1630. if (isEmpty(description))
  1631. throw MakeStringException(ECLWATCH_INVALID_INPUT, "File not specified.");
  1632. if (isEmpty(ipAddress))
  1633. throw MakeStringException(ECLWATCH_INVALID_INPUT, "File location not specified.");
  1634. if (isEmpty(cppname))
  1635. throw MakeStringException(ECLWATCH_INVALID_FILE_NAME, "File path not specified.");
  1636. RemoteFilename rfn;
  1637. rfn.setRemotePath(cppname);
  1638. SocketEndpoint ep(ipAddress);
  1639. rfn.setIp(ep);
  1640. Owned<IFile> cppfile = createIFile(rfn);
  1641. if (!cppfile)
  1642. throw MakeStringException(ECLWATCH_CANNOT_OPEN_FILE, "Cannot open %s.", description);
  1643. OwnedIFileIO rIO = cppfile->openShared(IFOread,IFSHfull);
  1644. if (!rIO)
  1645. throw MakeStringException(ECLWATCH_CANNOT_READ_FILE,"Cannot read %s.", description);
  1646. OwnedIFileIOStream ios = createBufferedIOStream(rIO);
  1647. if (!ios)
  1648. throw MakeStringException(ECLWATCH_CANNOT_READ_FILE,"Cannot read %s.", description);
  1649. appendIOStreamContent(buf, ios.get(), forDownload);
  1650. }
  1651. WsWuSearch::WsWuSearch(IEspContext& context,const char* owner,const char* state,const char* cluster,const char* startDate,const char* endDate,const char* ecl,const char* jobname,const char* appname,const char* appkey,const char* appvalue)
  1652. {
  1653. SecAccessFlags accessOwn;
  1654. SecAccessFlags accessOthers;
  1655. getUserWuAccessFlags(context, accessOwn, accessOthers, true);
  1656. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  1657. StringBuffer xpath("*");
  1658. if(ecl && *ecl)
  1659. xpath.append("[Query/Text=?~\"*").append(ecl).append("*\"]");
  1660. if(state && *state)
  1661. xpath.append("[@state=\"").append(state).append("\"]");
  1662. if(cluster && *cluster)
  1663. xpath.append("[@clusterName=\"").append(cluster).append("\"]");
  1664. if(owner && *owner)
  1665. xpath.append("[@submitID=?~\"").append(owner).append("\"]");
  1666. if(jobname && *jobname)
  1667. xpath.append("[@jobName=?~\"*").append(jobname).append("*\"]");
  1668. if((appname && *appname) || (appkey && *appkey) || (appvalue && *appvalue))
  1669. {
  1670. xpath.append("[Application/").append(appname && *appname ? appname : "*");
  1671. xpath.append("/").append(appkey && *appkey ? appkey : "*");
  1672. if(appvalue && *appvalue)
  1673. xpath.append("=?~\"").append(appvalue).append("\"");
  1674. xpath.append("]");
  1675. }
  1676. Owned<IConstWorkUnitIterator> it(factory->getWorkUnitsByXPath(xpath.str()));
  1677. StringBuffer wuFrom, wuTo;
  1678. if(startDate && *startDate)
  1679. createWuidFromDate(startDate, wuFrom);
  1680. if(endDate && *endDate)
  1681. createWuidFromDate(endDate, wuTo);
  1682. ForEach(*it)
  1683. {
  1684. IConstWorkUnit &cw = it->query();
  1685. if (chooseWuAccessFlagsByOwnership(context.queryUserId(), cw, accessOwn, accessOthers) < SecAccess_Read)
  1686. continue;
  1687. SCMStringBuffer wuid;
  1688. cw.getWuid(wuid);
  1689. if (wuFrom.length() && strcmp(wuid.str(),wuFrom.str())<0)
  1690. continue;
  1691. if (wuTo.length() && strcmp(wuid.str(),wuTo.str())>0)
  1692. continue;
  1693. if (state && *state)
  1694. {
  1695. SCMStringBuffer descr;
  1696. if(!strieq(cw.getStateDesc(descr).str(),state))
  1697. continue;
  1698. }
  1699. SCMStringBuffer parent;
  1700. if (!cw.getParentWuid(parent).length())
  1701. {
  1702. parent.clear();
  1703. wuids.push_back(cw.getWuid(parent).str());
  1704. }
  1705. }
  1706. std::sort(wuids.begin(),wuids.end(),std::greater<std::string>());
  1707. }
  1708. StringBuffer& WsWuSearch::createWuidFromDate(const char* timestamp,StringBuffer& s)
  1709. {
  1710. CDateTime wuTime;
  1711. wuTime.setString(timestamp,NULL,true);
  1712. unsigned year, month, day, hour, minute, second, nano;
  1713. wuTime.getDate(year, month, day, true);
  1714. wuTime.getTime(hour, minute, second, nano, true);
  1715. s.appendf("W%4d%02d%02d-%02d%02d%02d",year,month,day,hour,minute,second);
  1716. return s;
  1717. }
  1718. struct CompareData
  1719. {
  1720. CompareData(const char* _filter): filter(_filter) {}
  1721. bool operator()(const Linked<DataCacheElement>& e) const
  1722. {
  1723. return stricmp(e->m_filter.c_str(),filter)==0;
  1724. }
  1725. const char* filter;
  1726. };
  1727. DataCacheElement* DataCache::lookup(IEspContext &context, const char* filter, unsigned timeOutMin)
  1728. {
  1729. CriticalBlock block(crit);
  1730. if (cache.size() < 1)
  1731. return NULL;
  1732. //erase data if it should be
  1733. CDateTime timeNow;
  1734. int timeout = timeOutMin;
  1735. timeNow.setNow();
  1736. timeNow.adjustTime(-timeout);
  1737. while (true)
  1738. {
  1739. std::list<Linked<DataCacheElement> >::iterator list_iter = cache.begin();
  1740. if (list_iter == cache.end())
  1741. break;
  1742. DataCacheElement* awu = list_iter->getLink();
  1743. if (!awu || (awu->m_timeCached > timeNow))
  1744. break;
  1745. cache.pop_front();
  1746. }
  1747. if (cache.size() < 1)
  1748. return NULL;
  1749. //Check whether we have the data cache for this cluster. If yes, get the version
  1750. std::list<Linked<DataCacheElement> >::iterator it = std::find_if(cache.begin(),cache.end(),CompareData(filter));
  1751. if(it!=cache.end())
  1752. {
  1753. return it->getLink();
  1754. }
  1755. return NULL;
  1756. }
  1757. void DataCache::add(const char* filter, const char* data, const char* name, const char* localName, const char* wuid,
  1758. const char* resultName, unsigned seq, __int64 start, unsigned count, __int64 requested, __int64 total)
  1759. {
  1760. CriticalBlock block(crit);
  1761. //Save new data
  1762. Owned<DataCacheElement> e=new DataCacheElement(filter, data, name, localName, wuid, resultName, seq, start, count, requested, total);
  1763. if (cacheSize > 0)
  1764. {
  1765. if (cache.size() >= cacheSize)
  1766. cache.pop_front();
  1767. cache.push_back(e.get());
  1768. }
  1769. return;
  1770. }
  1771. struct CompareArchivedWUs
  1772. {
  1773. CompareArchivedWUs(const char* _filter): filter(_filter) {}
  1774. bool operator()(const Linked<ArchivedWuCacheElement>& e) const
  1775. {
  1776. return stricmp(e->m_filter.c_str(),filter)==0;
  1777. }
  1778. const char* filter;
  1779. };
  1780. ArchivedWuCacheElement* ArchivedWuCache::lookup(IEspContext &context, const char* filter, const char* sashaUpdatedWhen, unsigned timeOutMin)
  1781. {
  1782. CriticalBlock block(crit);
  1783. if (cache.size() < 1)
  1784. return NULL;
  1785. //erase data if it should be
  1786. CDateTime timeNow;
  1787. int timeout = timeOutMin;
  1788. timeNow.setNow();
  1789. timeNow.adjustTime(-timeout);
  1790. while (true)
  1791. {
  1792. std::list<Linked<ArchivedWuCacheElement> >::iterator list_iter = cache.begin();
  1793. if (list_iter == cache.end())
  1794. break;
  1795. ArchivedWuCacheElement* awu = list_iter->getLink();
  1796. if (awu && !stricmp(sashaUpdatedWhen, awu->m_sashaUpdatedWhen.c_str()) && (awu->m_timeCached > timeNow))
  1797. break;
  1798. cache.pop_front();
  1799. }
  1800. if (cache.size() < 1)
  1801. return NULL;
  1802. //Check whether we have the data cache for this cluster. If yes, get the version
  1803. std::list<Linked<ArchivedWuCacheElement> >::iterator it = std::find_if(cache.begin(),cache.end(),CompareArchivedWUs(filter));
  1804. if(it!=cache.end())
  1805. return it->getLink();
  1806. return NULL;
  1807. }
  1808. void ArchivedWuCache::add(const char* filter, const char* sashaUpdatedWhen, bool hasNextPage, IArrayOf<IEspECLWorkunit>& wus)
  1809. {
  1810. CriticalBlock block(crit);
  1811. //Save new data
  1812. Owned<ArchivedWuCacheElement> e=new ArchivedWuCacheElement(filter, sashaUpdatedWhen, hasNextPage, /*data.str(),*/ wus);
  1813. if (cacheSize > 0)
  1814. {
  1815. if (cache.size() >= cacheSize)
  1816. cache.pop_front();
  1817. cache.push_back(e.get());
  1818. }
  1819. return;
  1820. }
  1821. WsWuJobQueueAuditInfo::WsWuJobQueueAuditInfo(IEspContext &context, const char *cluster, const char *from , const char *to, CHttpResponse* response, const char *xls)
  1822. {
  1823. if(!response)
  1824. return;
  1825. unsigned maxDisplay = 125;
  1826. IArrayOf<IEspThorQueue> items;
  1827. CDateTime fromTime;
  1828. CDateTime toTime;
  1829. StringBuffer fromstr;
  1830. StringBuffer tostr;
  1831. if(from && *from)
  1832. {
  1833. fromTime.setString(from,NULL,false);
  1834. fromTime.getString(fromstr, false);
  1835. }
  1836. if(to && *to)
  1837. {
  1838. toTime.setString(to,NULL,false);
  1839. toTime.getString(tostr, false);
  1840. }
  1841. StringBuffer filter("ThorQueueMonitor");
  1842. if(notEmpty(cluster))
  1843. filter.appendf(",%s", cluster);
  1844. StringAttrArray lines;
  1845. queryAuditLogs(fromTime, toTime, filter.str(), lines);
  1846. unsigned countLines = 0;
  1847. unsigned maxConnected = 0;
  1848. unsigned longestQueue = 0;
  1849. ForEachItemIn(idx, lines)
  1850. {
  1851. const char* line = lines.item(idx).text;
  1852. if(!line || !*line)
  1853. continue;
  1854. if (idx < (lines.length() - 1))
  1855. getAuditLineInfo(line, longestQueue, maxConnected, maxDisplay, 1, items);
  1856. else
  1857. getAuditLineInfo(line, longestQueue, maxConnected, maxDisplay, 2, items);
  1858. countLines++;
  1859. }
  1860. StringBuffer responsebuf;
  1861. if (items.length() < 1)
  1862. {
  1863. responsebuf.append("<script language=\"javascript\">\r\nparent.displayQEnd(\'No data found\')</script>\r\n");
  1864. response->sendChunk(responsebuf.str());
  1865. return;
  1866. }
  1867. unsigned itemCount = items.length();
  1868. if (itemCount > maxDisplay)
  1869. itemCount = maxDisplay;
  1870. responsebuf.append("<script language=\"javascript\">parent.displayQLegend()</script>\r\n");
  1871. response->sendChunk(responsebuf.str());
  1872. responsebuf.clear();
  1873. responsebuf.append("<script language=\"javascript\">parent.displayQBegin(").append(longestQueue).append(",").append(maxConnected).append(",").append(itemCount).append(")</script>\r\n");
  1874. response->sendChunk(responsebuf.str());
  1875. responsebuf.clear();
  1876. responsebuf.append("<script language=\"javascript\">\r\n");
  1877. //bool displayDT = false;
  1878. unsigned count = 0;
  1879. unsigned jobpending=0;
  1880. ForEachItemIn(i,items)
  1881. {
  1882. IEspThorQueue& tq = items.item(i);
  1883. //displayDT = !displayDT;
  1884. count++;
  1885. if (count > maxDisplay)
  1886. break;
  1887. StringBuffer countStr, dtStr;
  1888. countStr.appendulong(count);
  1889. //if (displayDT)
  1890. dtStr = tq.getDT();
  1891. responsebuf.append("parent.displayQueue(\'").append(count).append("\',\'").append(dtStr.str()).append("\',\'").append(tq.getRunningWUs()).append("\',");
  1892. responsebuf.append("\'").append(tq.getQueuedWUs()).append("\',\'").append(tq.getWaitingThors()).append("\',");
  1893. responsebuf.append("\'").append(tq.getConnectedThors()).append("\',\'").append(tq.getIdledThors()).append("\',");
  1894. responsebuf.append("\'").append(tq.getRunningWU1()).append("\',\'").append(tq.getRunningWU2()).append("\')\r\n");
  1895. if(++jobpending>=50)
  1896. {
  1897. responsebuf.append("</script>\r\n");
  1898. response->sendChunk(responsebuf.str());
  1899. responsebuf.clear();
  1900. responsebuf.append("<script language=\"javascript\">\r\n");
  1901. jobpending=0;
  1902. }
  1903. }
  1904. StringBuffer countStr;
  1905. countStr.appendulong(count);
  1906. StringBuffer msg("<table><tr><td>");
  1907. msg.append("Total Records in the Time Period: ").append(items.length()).append(" (<a href=\"/WsWorkunits/WUClusterJobQueueLOG?").append(xls).append("\">txt</a>...<a href=\"/WsWorkunits/WUClusterJobQueueXLS?").append(xls).append("\">xls</a>).");
  1908. msg.append("</td></tr><tr><td>");
  1909. if (count > maxDisplay)
  1910. msg.append("Displayed: First ").append(maxDisplay).append(". ");
  1911. msg.append("Max. Queue Length: ").append(longestQueue).append(".");
  1912. msg.append("</td></tr></table>");
  1913. responsebuf.append("parent.displayQEnd(\'").append(msg).append("\')</script>\r\n");
  1914. response->sendChunk(responsebuf.str());
  1915. }
  1916. void WsWuJobQueueAuditInfo::getAuditLineInfo(const char* line, unsigned& longestQueue, unsigned& maxConnected, unsigned maxDisplay, unsigned showAll, IArrayOf<IEspThorQueue>& items)
  1917. {
  1918. //2009-08-12 02:44:12 ,ThorQueueMonitor,thor400_88_dev,0,0,1,1,114,---,---
  1919. if(!line || !*line)
  1920. return;
  1921. Owned<IEspThorQueue> tq = createThorQueue();
  1922. StringBuffer dt, runningWUs, queuedWUs, waitingThors, connectedThors, idledThors, runningWU1, runningWU2;
  1923. // date/time
  1924. const char* bptr = line;
  1925. const char* eptr = strchr(bptr, ',');
  1926. if(eptr)
  1927. dt.append(eptr - bptr, bptr);
  1928. else
  1929. dt.append(bptr);
  1930. tq->setDT(dt.str());
  1931. if(!eptr)
  1932. {
  1933. if (checkNewThorQueueItem(tq, showAll, items))
  1934. items.append(*tq.getClear());
  1935. return;
  1936. }
  1937. //skip title
  1938. bptr = eptr + 1;
  1939. eptr = strchr(bptr, ',');
  1940. if(!eptr)
  1941. {
  1942. if (checkNewThorQueueItem(tq, showAll, items))
  1943. items.append(*tq.getClear());
  1944. return;
  1945. }
  1946. //skip queue name
  1947. bptr = eptr + 1;
  1948. eptr = strchr(bptr, ',');
  1949. if(!eptr)
  1950. {
  1951. if (checkNewThorQueueItem(tq, showAll, items))
  1952. items.append(*tq.getClear());
  1953. return;
  1954. }
  1955. //running
  1956. bptr = eptr + 1;
  1957. eptr = strchr(bptr, ',');
  1958. if(eptr)
  1959. runningWUs.append(eptr - bptr, bptr);
  1960. else
  1961. runningWUs.append(bptr);
  1962. tq->setRunningWUs(runningWUs.str());
  1963. if(!eptr)
  1964. {
  1965. if (checkNewThorQueueItem(tq, showAll, items))
  1966. items.append(*tq.getClear());
  1967. return;
  1968. }
  1969. //queued
  1970. bptr = eptr + 1;
  1971. eptr = strchr(bptr, ',');
  1972. if(eptr)
  1973. queuedWUs.append(eptr - bptr, bptr);
  1974. else
  1975. queuedWUs.append(bptr);
  1976. if (maxDisplay > items.length())
  1977. {
  1978. unsigned queueLen = atoi(queuedWUs.str());
  1979. if (queueLen > longestQueue)
  1980. longestQueue = queueLen;
  1981. }
  1982. tq->setQueuedWUs(queuedWUs.str());
  1983. if(!eptr)
  1984. {
  1985. if (checkNewThorQueueItem(tq, showAll, items))
  1986. items.append(*tq.getClear());
  1987. return;
  1988. }
  1989. //waiting
  1990. bptr = eptr + 1;
  1991. eptr = strchr(bptr, ',');
  1992. if(eptr)
  1993. waitingThors.append(eptr - bptr, bptr);
  1994. else
  1995. waitingThors.append(bptr);
  1996. tq->setWaitingThors(waitingThors.str());
  1997. if(!eptr)
  1998. {
  1999. if (checkNewThorQueueItem(tq, showAll, items))
  2000. items.append(*tq.getClear());
  2001. return;
  2002. }
  2003. //connected
  2004. bptr = eptr + 1;
  2005. eptr = strchr(bptr, ',');
  2006. if(eptr)
  2007. connectedThors.append(eptr - bptr, bptr);
  2008. else
  2009. connectedThors.append(bptr);
  2010. if (maxDisplay > items.length())
  2011. {
  2012. unsigned connnectedLen = atoi(connectedThors.str());
  2013. if (connnectedLen > maxConnected)
  2014. maxConnected = connnectedLen;
  2015. }
  2016. tq->setConnectedThors(connectedThors.str());
  2017. if(!eptr)
  2018. {
  2019. if (checkNewThorQueueItem(tq, showAll, items))
  2020. items.append(*tq.getClear());
  2021. return;
  2022. }
  2023. //idled
  2024. bptr = eptr + 1;
  2025. eptr = strchr(bptr, ',');
  2026. if(eptr)
  2027. idledThors.append(eptr - bptr, bptr);
  2028. else
  2029. idledThors.append(bptr);
  2030. tq->setIdledThors(idledThors.str());
  2031. if(!eptr)
  2032. {
  2033. items.append(*tq.getClear());
  2034. return;
  2035. }
  2036. //runningWU1
  2037. bptr = eptr + 1;
  2038. eptr = strchr(bptr, ',');
  2039. if(eptr)
  2040. runningWU1.append(eptr - bptr, bptr);
  2041. else
  2042. {
  2043. runningWU1.append(bptr);
  2044. }
  2045. if (!strcmp(runningWU1.str(), "---"))
  2046. runningWU1.clear();
  2047. if (runningWU1.length() > 0)
  2048. tq->setRunningWU1(runningWU1.str());
  2049. if(!eptr)
  2050. {
  2051. if (checkNewThorQueueItem(tq, showAll, items))
  2052. items.append(*tq.getClear());
  2053. return;
  2054. }
  2055. //runningWU2
  2056. bptr = eptr + 1;
  2057. eptr = strchr(bptr, ',');
  2058. if(eptr)
  2059. runningWU2.append(eptr - bptr, bptr);
  2060. else
  2061. {
  2062. runningWU2.append(bptr);
  2063. }
  2064. if (!strcmp(runningWU2.str(), "---"))
  2065. runningWU2.clear();
  2066. if (runningWU2.length() > 0)
  2067. tq->setRunningWU2(runningWU2.str());
  2068. if (checkNewThorQueueItem(tq, showAll, items))
  2069. items.append(*tq.getClear());
  2070. DBGLOG("Queue log: [%s]", line);
  2071. }
  2072. bool WsWuJobQueueAuditInfo::checkSameStrings(const char* s1, const char* s2)
  2073. {
  2074. if (s1)
  2075. {
  2076. if (!s2)
  2077. return false;
  2078. if (strcmp(s1, s2))
  2079. return false;
  2080. }
  2081. else if (s2)
  2082. {
  2083. if (!s1)
  2084. return false;
  2085. }
  2086. return true;
  2087. }
  2088. bool WsWuJobQueueAuditInfo::checkNewThorQueueItem(IEspThorQueue* tq, unsigned showAll, IArrayOf<IEspThorQueue>& items)
  2089. {
  2090. bool bAdd = false;
  2091. if (showAll < 1) //show every lines
  2092. bAdd = true;
  2093. else if (items.length() < 1)
  2094. bAdd = true;
  2095. else if (showAll > 1) //last line now
  2096. {
  2097. IEspThorQueue& tq0 = items.item(items.length()-1);
  2098. if (!checkSameStrings(tq->getDT(), tq0.getDT()))
  2099. bAdd = true;
  2100. }
  2101. else
  2102. {
  2103. IEspThorQueue& tq0 = items.item(items.length()-1);
  2104. if (!checkSameStrings(tq->getRunningWUs(), tq0.getRunningWUs()))
  2105. bAdd = true;
  2106. if (!checkSameStrings(tq->getQueuedWUs(), tq0.getQueuedWUs()))
  2107. bAdd = true;
  2108. if (!checkSameStrings(tq->getConnectedThors(), tq0.getConnectedThors()))
  2109. bAdd = true;
  2110. if (!checkSameStrings(tq->getConnectedThors(), tq0.getConnectedThors()))
  2111. bAdd = true;
  2112. if (!checkSameStrings(tq->getRunningWU1(), tq0.getRunningWU1()))
  2113. bAdd = true;
  2114. if (!checkSameStrings(tq->getRunningWU2(), tq0.getRunningWU2()))
  2115. bAdd = true;
  2116. }
  2117. return bAdd;
  2118. }
  2119. void xsltTransform(const char* xml, const char* sheet, IProperties *params, StringBuffer& ret)
  2120. {
  2121. StringBuffer xsl;
  2122. if(!checkFileExists(sheet))
  2123. throw MakeStringException(ECLWATCH_FILE_NOT_EXIST, "Could not find stylesheet %s.",sheet);
  2124. Owned<IXslProcessor> proc = getXslProcessor();
  2125. Owned<IXslTransform> trans = proc->createXslTransform();
  2126. trans->setXmlSource(xml, strlen(xml));
  2127. trans->loadXslFromFile(sheet);
  2128. trans->copyParameters(params);
  2129. trans->transform(ret);
  2130. }
  2131. bool addToQueryString(StringBuffer &queryString, const char *name, const char *value, const char delim)
  2132. {
  2133. if (isEmpty(name) || isEmpty(value))
  2134. return false;
  2135. if (queryString.length() > 0)
  2136. queryString.append(delim);
  2137. queryString.append(name).append("=").append(value);
  2138. return true;
  2139. }
  2140. int WUSchedule::run()
  2141. {
  2142. try
  2143. {
  2144. while(true)
  2145. {
  2146. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  2147. Owned<IConstWorkUnitIterator> itr = factory->getWorkUnitsByState(WUStateScheduled);
  2148. if (itr)
  2149. {
  2150. ForEach(*itr)
  2151. {
  2152. try
  2153. {
  2154. IConstWorkUnit & cw = itr->query();
  2155. if (cw.aborting())
  2156. {
  2157. WorkunitUpdate wu(&cw.lock());
  2158. wu->setState(WUStateAborted);
  2159. continue;
  2160. }
  2161. WsWuDateTime dt, now;
  2162. now.setNow();
  2163. cw.getTimeScheduled(dt);
  2164. if (now.compare(dt)>=0)
  2165. {
  2166. SCMStringBuffer wuid;
  2167. runWorkUnit(cw.getWuid(wuid).str());
  2168. }
  2169. }
  2170. catch(IException *e)
  2171. {
  2172. StringBuffer msg;
  2173. ERRLOG("Exception %d:%s in WsWorkunits Schedule::run", e->errorCode(), e->errorMessage(msg).str());
  2174. e->Release();
  2175. }
  2176. }
  2177. }
  2178. sleep(60);
  2179. }
  2180. }
  2181. catch(IException *e)
  2182. {
  2183. StringBuffer msg;
  2184. ERRLOG("Exception %d:%s in WsWorkunits Schedule::run", e->errorCode(), e->errorMessage(msg).str());
  2185. e->Release();
  2186. }
  2187. catch(...)
  2188. {
  2189. ERRLOG("Unknown exception in WsWorkunits Schedule::run");
  2190. }
  2191. if (m_container)
  2192. m_container->exitESP();
  2193. return 0;
  2194. }
  2195. }