workunitservices.cpp 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. /* TBD
  14. FilesRead
  15. FilesWritten
  16. Errors
  17. Plugins
  18. Results
  19. Timings
  20. Persists changed?
  21. */
  22. #pragma warning (disable : 4786)
  23. #pragma warning (disable : 4297) // function assumed not to throw an exception but does
  24. #include "platform.h"
  25. #include "workunit.hpp"
  26. #include "agentctx.hpp"
  27. #include "enginecontext.hpp"
  28. #include "portlist.h"
  29. #include "jio.hpp"
  30. #include "jmisc.hpp"
  31. #include "jstring.hpp"
  32. #include "dasess.hpp"
  33. #include "dasds.hpp"
  34. #include "dautils.hpp"
  35. #include "daaudit.hpp"
  36. #include "sacmd.hpp"
  37. #include "workunitservices.hpp"
  38. #include "workunitservices.ipp"
  39. #include "environment.hpp"
  40. #include "seclib.hpp"
  41. #include "hpccconfig.hpp"
// Version string of this plugin; reported through pb->version in
// getECLPluginDefinition() and returned by fsGetBuildInfo().
#define WORKUNITSERVICES_VERSION "WORKUNITSERVICES 1.0.2"
// NULL-terminated list of other version strings this plugin is compatible with.
// It is currently empty (terminator only) and is handed out through
// ECLPluginDefinitionBlockEx::compatibleVersions.
static const char * compatibleVersions[] = {
NULL };
// ECL source text describing this plugin; getECLPluginDefinition() publishes it via pb->ECL.
// It declares the record layouts of the datasets returned by the service functions and the
// WorkunitServices SERVICE block whose entrypoint names ('wsWorkunitList', ...) are the
// exported functions defined further down in this file.
// The field order/width of each record has to agree with the order/width in which the
// matching C++ function serializes a row (e.g. WsWorkunitRecord <-> serializeWUInfo(),
// WsTimeStamp <-> WsTimeStampVisitor, WsMessage <-> wsWorkunitMessages()).
static const char * EclDefinition =
"export WsWorkunitRecord := record "
" string24 wuid;"
" string owner{maxlength(64)};"
" string cluster{maxlength(64)};"
" string roxiecluster{maxlength(64)};"
" string job{maxlength(256)};"
" string10 state;"
" string7 priority;"
" integer2 priorityvalue;"
" string20 created;"
" string20 modified;"
" boolean online;"
" boolean protected;"
" end;\n"
"export WsTimeStamp := record "
" string32 application;"
" string16 id;"
" string20 time;"
" string16 instance;"
" end;\n"
"export WsMessage := record "
" unsigned4 severity;"
" integer4 code;"
" string32 location;"
" unsigned4 row;"
" unsigned4 col;"
" string16 source;"
" string20 time;"
" string message{maxlength(1024)};"
" end;\n"
"export WsFileRead := record "
" string name{maxlength(256)};"
" string cluster{maxlength(64)};"
" boolean isSuper;"
" unsigned4 usage;"
" end;\n"
"export WsFileWritten := record "
" string name{maxlength(256)};"
" string10 graph;"
" string cluster{maxlength(64)};"
" unsigned4 kind;"
" end;\n"
"export WsTiming := record "
" unsigned4 count;"
" unsigned4 duration;"
" unsigned4 max;"
" string name{maxlength(64)};"
" end;\n"
"export WsStatistic := record "
" unsigned8 value;"
" unsigned8 count;"
" unsigned8 maxValue;"
" string creatorType;"
" string creator;"
" string scopeType;"
" string scope;"
" string name;"
" string description;"
" string unit;"
" end;\n"
"export WorkunitServices := SERVICE :time, cpp\n"
" boolean WorkunitExists(const varstring wuid, boolean online=true, boolean archived=false) : context,entrypoint='wsWorkunitExists'; \n"
" dataset(WsWorkunitRecord) WorkunitList("
" const varstring lowwuid='',"
" const varstring highwuid='',"
" const varstring username='',"
" const varstring cluster='',"
" const varstring jobname='',"
" const varstring state='',"
" const varstring priority='',"
" const varstring fileread='',"
" const varstring filewritten='',"
" const varstring roxiecluster='',"
" const varstring eclcontains='',"
" boolean online=true,"
" boolean archived=false,"
" const varstring appvalues=''"
") : context,entrypoint='wsWorkunitList'; \n"
" varstring WUIDonDate(unsigned4 year,unsigned4 month,unsigned4 day,unsigned4 hour, unsigned4 minute) : entrypoint='wsWUIDonDate'; \n"
" varstring WUIDdaysAgo(unsigned4 daysago) : entrypoint='wsWUIDdaysAgo'; \n"
" dataset(WsTimeStamp) WorkunitTimeStamps(const varstring wuid) : context,entrypoint='wsWorkunitTimeStamps'; \n"
" dataset(WsMessage) WorkunitMessages(const varstring wuid) : context,entrypoint='wsWorkunitMessages'; \n"
" dataset(WsFileRead) WorkunitFilesRead(const varstring wuid) : context,entrypoint='wsWorkunitFilesRead'; \n"
" dataset(WsFileWritten) WorkunitFilesWritten(const varstring wuid) : context,entrypoint='wsWorkunitFilesWritten'; \n"
" dataset(WsTiming) WorkunitTimings(const varstring wuid) : context,entrypoint='wsWorkunitTimings'; \n"
" streamed dataset(WsStatistic) WorkunitStatistics(const varstring wuid, boolean includeActivities = false, const varstring _filter = '') : context,entrypoint='wsWorkunitStatistics'; \n"
" boolean setWorkunitAppValue(const varstring app, const varstring key, const varstring value, boolean overwrite=true) : context,entrypoint='wsSetWorkunitAppValue'; \n"
"END;";
// NOTE(review): WAIT_SECONDS and SDS_LOCK_TIMEOUT are not referenced anywhere in the
// visible part of this file - confirm whether they are still needed.
#define WAIT_SECONDS 30
#define SDS_LOCK_TIMEOUT (1000*60*5)
// Timeout passed to ISashaCommand::send() for every Sasha request made in this file.
// Presumably milliseconds (i.e. 10 minutes) - TODO confirm against ISashaCommand.
#define SASHA_TIMEOUT (1000*60*10)
  137. WORKUNITSERVICES_API bool getECLPluginDefinition(ECLPluginDefinitionBlock *pb)
  138. {
  139. if (pb->size == sizeof(ECLPluginDefinitionBlockEx))
  140. {
  141. ECLPluginDefinitionBlockEx * pbx = (ECLPluginDefinitionBlockEx *) pb;
  142. pbx->compatibleVersions = compatibleVersions;
  143. }
  144. else if (pb->size != sizeof(ECLPluginDefinitionBlock))
  145. return false;
  146. pb->magicVersion = PLUGIN_VERSION;
  147. pb->version = WORKUNITSERVICES_VERSION;
  148. pb->moduleName = "lib_WORKUNITSERVICES";
  149. pb->ECL = EclDefinition;
  150. pb->flags = PLUGIN_IMPLICIT_MODULE;
  151. pb->description = "WORKUNITSERVICES library";
  152. return true;
  153. }
  154. namespace nsWorkunitservices {
// Plugin context supplied by the host through setPluginContext(); fsGetBuildInfo()
// passes it to CTXSTRDUP when duplicating the version string.
IPluginContext * parentCtx = NULL;
  156. static void getSashaWUArchiveNodes(SocketEndpointArray &epa)
  157. {
  158. #ifdef _CONTAINERIZED
  159. StringBuffer service;
  160. getService(service, "wu-archiver", true);
  161. SocketEndpoint sashaep(service);
  162. epa.append(sashaep);
  163. #else
  164. Owned<IEnvironmentFactory> factory = getEnvironmentFactory(true);
  165. Owned<IConstEnvironment> env = factory->openEnvironment();
  166. Owned<IPropertyTree> root = &env->getPTree();
  167. StringBuffer tmp;
  168. Owned<IPropertyTreeIterator> siter = root->getElements("Software/SashaServerProcess/Instance");
  169. ForEach(*siter) {
  170. if (siter->query().getProp("@netAddress",tmp.clear())) {
  171. SocketEndpoint sashaep(tmp.str(),siter->query().getPropInt("@port",DEFAULT_SASHA_PORT));
  172. epa.append(sashaep);
  173. }
  174. }
  175. #endif
  176. }
  177. static IWorkUnitFactory * getWorkunitFactory(ICodeContext * ctx)
  178. {
  179. IEngineContext *engineCtx = ctx->queryEngineContext();
  180. if (engineCtx && !engineCtx->allowDaliAccess())
  181. {
  182. Owned<IException> e = MakeStringException(-1, "workunitservices cannot access Dali in this context - this normally means it is being called from a thor slave");
  183. EXCLOG(e, NULL);
  184. throw e.getClear();
  185. }
  186. //MORE: These should really be set up correctly - probably should be returned from IEngineContext
  187. ISecManager *secmgr = NULL;
  188. ISecUser *secuser = NULL;
  189. return getWorkUnitFactory(secmgr, secuser);
  190. }
  191. static bool securityDisabled = false;
  192. static bool checkScopeAuthorized(IUserDescriptor *user, const char *scopename)
  193. {
  194. if (securityDisabled)
  195. return true;
  196. unsigned auditflags = DALI_LDAP_AUDIT_REPORT|DALI_LDAP_READ_WANTED;
  197. SecAccessFlags perm = SecAccess_Full;
  198. if (scopename && *scopename)
  199. {
  200. perm = querySessionManager().getPermissionsLDAP("workunit",scopename,user,auditflags);
  201. if (perm<0)
  202. {
  203. if (perm == SecAccess_Unavailable)
  204. {
  205. perm = SecAccess_Full;
  206. securityDisabled = true;
  207. }
  208. else
  209. perm = SecAccess_None;
  210. }
  211. if (!HASREADPERMISSION(perm))
  212. return false;
  213. }
  214. return true;
  215. }
  216. static IConstWorkUnit * getWorkunit(ICodeContext * ctx, const char * wuid)
  217. {
  218. StringBuffer _wuid(wuid);
  219. if (!_wuid.length())
  220. return NULL;
  221. wuid = _wuid.toUpperCase().clip().str();
  222. Owned<IWorkUnitFactory> wuFactory = getWorkunitFactory(ctx);
  223. Owned<IConstWorkUnit> wu = wuFactory->openWorkUnit(wuid);
  224. if (wu)
  225. {
  226. if (!checkScopeAuthorized(ctx->queryUserDescriptor(), wu->queryWuScope()))
  227. wu.clear();
  228. }
  229. return wu.getClear();
  230. }
  231. static IConstWorkUnit *getWorkunit(ICodeContext * ctx)
  232. {
  233. StringAttr wuid;
  234. wuid.setown(ctx->getWuid());
  235. // One assumes we have read access to our own wu
  236. return getWorkunit(ctx, wuid);
  237. }
  238. static StringBuffer &getWUIDonDate(StringBuffer &wuid,unsigned year,unsigned month,unsigned day,unsigned hour,unsigned minute)
  239. {
  240. if ((year==0)||(month==0)||(day==0)) {
  241. CDateTime dt;
  242. dt.setNow();
  243. unsigned y;
  244. unsigned m;
  245. unsigned d;
  246. dt.getDate(y,m,d, true);
  247. if (year==0)
  248. year = y;
  249. if (month==0)
  250. month = m;
  251. if (day==0)
  252. day = d;
  253. }
  254. else if (year<100)
  255. year+=2000;
  256. wuid.appendf("W%d%02d%02d-%02d%02d00",year,month,day,hour,minute);
  257. return wuid;
  258. }
  259. static StringBuffer &getWUIDdaysAgo(StringBuffer &wuid,int daysago)
  260. {
  261. CDateTime dt;
  262. dt.setNow();
  263. dt.adjustTime(-(daysago*60*24));
  264. unsigned y;
  265. unsigned m;
  266. unsigned d;
  267. dt.getDate(y,m,d, true);
  268. unsigned h;
  269. unsigned mn;
  270. unsigned s;
  271. unsigned ns;
  272. dt.getTime(h,mn,s,ns,true);
  273. return getWUIDonDate(wuid,y,m,d,h,mn);
  274. }
  275. static bool addWUQueryFilter(WUSortField *filters, unsigned &count, MemoryBuffer &buff, const char *name, WUSortField value)
  276. {
  277. if (!name || !*name)
  278. return false;
  279. filters[count++] = value;
  280. buff.append(name);
  281. return true;
  282. }
  283. static bool serializeWUInfo(IConstWorkUnitInfo &info,MemoryBuffer &mb)
  284. {
  285. fixedAppend(mb,24,info.queryWuid());
  286. varAppendMax(mb,64,info.queryUser());
  287. varAppendMax(mb,64,info.queryClusterName());
  288. varAppendMax(mb,64,""); // roxiecluster is obsolete
  289. varAppendMax(mb,256,info.queryJobName());
  290. fixedAppend(mb,10,info.queryStateDesc());
  291. fixedAppend(mb,7,info.queryPriorityDesc());
  292. short int prioritylevel = info.getPriorityLevel();
  293. mb.append(prioritylevel);
  294. fixedAppend(mb,20,""); // Created timestamp
  295. fixedAppend(mb,20,""); // Modified timestamp
  296. mb.append(true);
  297. mb.append(info.isProtected());
  298. if (mb.length()>WORKUNIT_SERVICES_BUFFER_MAX) {
  299. mb.clear().append(WUS_STATUS_OVERFLOWED);
  300. return false;
  301. }
  302. return true;
  303. }
  304. }//namespace
  305. using namespace nsWorkunitservices;
  306. static const unsigned MAX_FILTERS=20;
  307. WORKUNITSERVICES_API void wsWorkunitList(
  308. ICodeContext *ctx,
  309. size32_t & __lenResult,
  310. void * & __result,
  311. const char *lowwuid,
  312. const char *highwuid,
  313. const char *username,
  314. const char *cluster,
  315. const char *jobname,
  316. const char *state,
  317. const char *priority,
  318. const char *fileread,
  319. const char *filewritten,
  320. const char *roxiecluster, // Not in use - retained for compatibility only
  321. const char *eclcontains,
  322. bool online,
  323. bool archived,
  324. const char *appvalues
  325. )
  326. {
  327. MemoryBuffer mb;
  328. if (archived) {
  329. SocketEndpointArray sashaeps;
  330. getSashaWUArchiveNodes(sashaeps);
  331. ForEachItemIn(i,sashaeps) {
  332. Owned<ISashaCommand> cmd = createSashaCommand();
  333. cmd->setAction(SCA_WORKUNIT_SERVICES_GET);
  334. cmd->setOnline(false);
  335. cmd->setArchived(true);
  336. cmd->setWUSmode(true);
  337. if (lowwuid&&*lowwuid)
  338. cmd->setAfter(lowwuid);
  339. if (highwuid&&*highwuid)
  340. cmd->setBefore(highwuid);
  341. if (username&&*username)
  342. cmd->setOwner(username);
  343. if (cluster&&*cluster)
  344. cmd->setCluster(cluster);
  345. if (jobname&&*jobname)
  346. cmd->setJobName(jobname);
  347. if (state&&*state)
  348. cmd->setState(state);
  349. if (priority&&*priority)
  350. cmd->setPriority(priority);
  351. if (fileread&&*fileread)
  352. cmd->setFileRead(fileread);
  353. if (filewritten&&*filewritten)
  354. cmd->setFileWritten(filewritten);
  355. if (eclcontains&&*eclcontains)
  356. cmd->setEclContains(eclcontains);
  357. Owned<INode> sashanode = createINode(sashaeps.item(i));
  358. if (cmd->send(sashanode,SASHA_TIMEOUT)) {
  359. byte res = cmd->getWUSresult(mb);
  360. if (res==WUS_STATUS_OVERFLOWED)
  361. throw MakeStringException(-1,"WORKUNITSERVICES: Result buffer overflowed");
  362. if (res!=WUS_STATUS_OK)
  363. throw MakeStringException(-1,"WORKUNITSERVICES: Sasha get results failed (%d)",(int)res);
  364. break;
  365. }
  366. if (i+1>=sashaeps.ordinality()) {
  367. StringBuffer ips;
  368. sashaeps.item(0).getIpText(ips);
  369. throw MakeStringException(-1,"Time out to Sasha server on %s (server not running or query too complex)",ips.str());
  370. }
  371. }
  372. }
  373. if (online)
  374. {
  375. WUSortField filters[MAX_FILTERS+1]; // NOTE - increase if you add a LOT more parameters! The +1 is to allow space for the terminator
  376. unsigned filterCount = 0;
  377. MemoryBuffer filterbuf;
  378. if (state && *state)
  379. {
  380. filters[filterCount++] = WUSFstate;
  381. if (!strieq(state, "unknown"))
  382. filterbuf.append(state);
  383. else
  384. filterbuf.append("");
  385. }
  386. if (priority && *priority)
  387. {
  388. filters[filterCount++] = WUSFpriority;
  389. if (!strieq(priority, "unknown"))
  390. filterbuf.append(priority);
  391. else
  392. filterbuf.append("");
  393. }
  394. addWUQueryFilter(filters, filterCount, filterbuf, cluster, WUSFcluster);
  395. addWUQueryFilter(filters, filterCount, filterbuf, fileread, (WUSortField) (WUSFfileread | WUSFnocase));
  396. addWUQueryFilter(filters, filterCount, filterbuf, filewritten, (WUSortField) (WUSFfilewritten | WUSFnocase));
  397. addWUQueryFilter(filters, filterCount, filterbuf, username, (WUSortField) (WUSFuser | WUSFnocase));
  398. addWUQueryFilter(filters, filterCount, filterbuf, jobname, (WUSortField) (WUSFjob | WUSFwild | WUSFnocase));
  399. addWUQueryFilter(filters, filterCount, filterbuf, eclcontains, (WUSortField) (WUSFecl | WUSFwild));
  400. addWUQueryFilter(filters, filterCount, filterbuf, lowwuid, WUSFwuid);
  401. addWUQueryFilter(filters, filterCount, filterbuf, highwuid, WUSFwuidhigh);
  402. if (appvalues && *appvalues)
  403. {
  404. StringArray appFilters;
  405. appFilters.appendList(appvalues, "|"); // Multiple filters separated by |
  406. ForEachItemIn(idx, appFilters)
  407. {
  408. StringArray appFilter; // individual filter of form appname/key=value or appname/*=value
  409. appFilter.appendList(appFilters.item(idx), "=");
  410. const char *appvalue;
  411. switch (appFilter.length())
  412. {
  413. case 1:
  414. appvalue = NULL;
  415. break;
  416. case 2:
  417. appvalue = appFilter.item(1);
  418. break;
  419. default:
  420. throw MakeStringException(-1,"WORKUNITSERVICES: Invalid application value filter %s (expected format is 'appname/keyname=value')", appFilters.item(idx));
  421. }
  422. const char *appkey = appFilter.item(0);
  423. if (!strchr(appkey, '/'))
  424. throw MakeStringException(-1,"WORKUNITSERVICES: Invalid application value filter %s (expected format is 'appname/keyname=value')", appFilters.item(idx));
  425. if (filterCount>=MAX_FILTERS)
  426. throw MakeStringException(-1,"WORKUNITSERVICES: Too many filters");
  427. filterbuf.append(appkey);
  428. filterbuf.append(appvalue);
  429. filters[filterCount++] = WUSFappvalue;
  430. }
  431. }
  432. filters[filterCount] = WUSFterm;
  433. Owned<IWorkUnitFactory> wuFactory = getWorkunitFactory(ctx);
  434. Owned<IConstWorkUnitIterator> it = wuFactory->getWorkUnitsSorted((WUSortField) (WUSFwuid | WUSFreverse), filters, filterbuf.bufferBase(), 0, INT_MAX, NULL, NULL); // MORE - need security flags here!
  435. ForEach(*it)
  436. {
  437. if (!serializeWUInfo(it->query(), mb))
  438. throw MakeStringException(-1,"WORKUNITSERVICES: Result buffer overflowed");
  439. }
  440. }
  441. __lenResult = mb.length();
  442. __result = mb.detach();
  443. }
  444. WORKUNITSERVICES_API bool wsWorkunitExists(ICodeContext *ctx, const char *wuid, bool online, bool archived)
  445. {
  446. if (!wuid||!*wuid)
  447. return false;
  448. StringBuffer _wuid(wuid);
  449. wuid = _wuid.toUpperCase().str();
  450. if (online)
  451. {
  452. Owned<IWorkUnitFactory> wuFactory = getWorkunitFactory(ctx);
  453. Owned<IConstWorkUnit> wu = wuFactory->openWorkUnit(wuid); // Note - we don't use getWorkUnit as we don't need read access
  454. return wu != NULL;
  455. }
  456. if (archived)
  457. {
  458. SocketEndpointArray sashaeps;
  459. getSashaWUArchiveNodes(sashaeps);
  460. ForEachItemIn(i,sashaeps) {
  461. Owned<ISashaCommand> cmd = createSashaCommand();
  462. cmd->setAction(SCA_LIST);
  463. cmd->setOnline(false);
  464. cmd->setArchived(true);
  465. cmd->addId(wuid);
  466. Owned<INode> sashanode = createINode(sashaeps.item(i));
  467. if (cmd->send(sashanode,SASHA_TIMEOUT)) {
  468. return cmd->numIds()>0;
  469. }
  470. }
  471. }
  472. return false;
  473. }
  474. WORKUNITSERVICES_API char * wsWUIDonDate(unsigned year,unsigned month,unsigned day,unsigned hour,unsigned minute)
  475. {
  476. StringBuffer ret;
  477. return getWUIDonDate(ret,year,month,day,hour,minute).detach();
  478. }
  479. WORKUNITSERVICES_API char * wsWUIDdaysAgo(unsigned daysago)
  480. {
  481. StringBuffer ret;
  482. return getWUIDdaysAgo(ret,(int)daysago).detach();
  483. }
  484. class WsTimeStampVisitor : public WuScopeVisitorBase
  485. {
  486. public:
  487. WsTimeStampVisitor(MemoryBuffer & _mb) : mb(_mb) {}
  488. virtual void noteStatistic(StatisticKind kind, unsigned __int64 value, IConstWUStatistic & cur) override
  489. {
  490. const char * curScope = cur.queryScope();
  491. const char * kindName = queryStatisticName(kind);
  492. assertex(kindName);
  493. ///The following will be true on workunits >= 7.0, but may not be for 6.4 and earlier
  494. if (memicmp(kindName, "when", 4) == 0)
  495. kindName += 4;
  496. StringBuffer formattedTime;
  497. convertTimestampToStr(value, formattedTime, true);
  498. SCMStringBuffer creator;
  499. cur.getCreator(creator);
  500. const char * at = strchr(creator.str(), '@');
  501. const char * instance = at ? at + 1 : creator.str();
  502. fixedAppend(mb, 32, curScope);
  503. fixedAppend(mb, 16, kindName); // id
  504. fixedAppend(mb, 20, formattedTime); // time
  505. fixedAppend(mb, 16, instance); // item correct here
  506. }
  507. protected:
  508. MemoryBuffer & mb;
  509. };
  510. WORKUNITSERVICES_API void wsWorkunitTimeStamps(ICodeContext *ctx, size32_t & __lenResult, void * & __result, const char *wuid)
  511. {
  512. Owned<IConstWorkUnit> wu = getWorkunit(ctx, wuid);
  513. MemoryBuffer mb;
  514. if (wu)
  515. {
  516. WsTimeStampVisitor visitor(mb);
  517. WuScopeFilter filter("measure[When],source[global]");
  518. Owned<IConstWUScopeIterator> iter = &wu->getScopeIterator(filter);
  519. ForEach(*iter)
  520. {
  521. iter->playProperties(visitor);
  522. }
  523. }
  524. __lenResult = mb.length();
  525. __result = mb.detach();
  526. }
  527. WORKUNITSERVICES_API void wsWorkunitMessages( ICodeContext *ctx, size32_t & __lenResult, void * & __result, const char *wuid )
  528. {
  529. Owned<IConstWorkUnit> wu = getWorkunit(ctx, wuid);
  530. MemoryBuffer mb;
  531. if (wu)
  532. {
  533. SCMStringBuffer s;
  534. Owned<IConstWUExceptionIterator> exceptions = &wu->getExceptions();
  535. ForEach(*exceptions)
  536. {
  537. IConstWUException &e = exceptions->query();
  538. mb.append((unsigned) e.getSeverity());
  539. mb.append((int) e.getExceptionCode());
  540. e.getExceptionFileName(s);
  541. fixedAppend(mb, 32, s.str(), s.length());
  542. mb.append((unsigned) e.getExceptionLineNo());
  543. mb.append((unsigned) e.getExceptionColumn());
  544. e.getExceptionSource(s);
  545. fixedAppend(mb, 16, s.str(), s.length());
  546. e.getTimeStamp(s);
  547. fixedAppend(mb, 20, s.str(), s.length());
  548. e.getExceptionMessage(s);
  549. varAppendMax(mb, 1024, s.str(), s.length());
  550. }
  551. }
  552. __lenResult = mb.length();
  553. __result = mb.detach();
  554. }
  555. WORKUNITSERVICES_API void wsWorkunitFilesRead( ICodeContext *ctx, size32_t & __lenResult, void * & __result, const char *wuid )
  556. {
  557. MemoryBuffer mb;
  558. Owned<IConstWorkUnit> wu = getWorkunit(ctx, wuid);
  559. if (wu)
  560. {
  561. Owned<IPropertyTreeIterator> sourceFiles = &wu->getFilesReadIterator();
  562. ForEach(*sourceFiles)
  563. {
  564. IPropertyTree &item = sourceFiles->query();
  565. varAppendMax(mb, 256, item, "@name");
  566. varAppendMax(mb, 64, item, "@cluster");
  567. mb.append(item.getPropBool("@super"));
  568. mb.append((unsigned) item.getPropInt("@useCount"));
  569. }
  570. }
  571. __lenResult = mb.length();
  572. __result = mb.detach();
  573. }
  574. WORKUNITSERVICES_API void wsWorkunitFilesWritten( ICodeContext *ctx, size32_t & __lenResult, void * & __result, const char *wuid )
  575. {
  576. MemoryBuffer mb;
  577. Owned<IConstWorkUnit> wu = getWorkunit(ctx, wuid);
  578. if (wu)
  579. {
  580. Owned<IPropertyTreeIterator> sourceFiles = &wu->getFileIterator();
  581. ForEach(*sourceFiles)
  582. {
  583. IPropertyTree &item = sourceFiles->query();
  584. varAppendMax(mb, 256, item, "@name");
  585. fixedAppend(mb, 10, item, "@graph");
  586. varAppendMax(mb, 64, item, "@cluster");
  587. mb.append( (unsigned) item.getPropInt("@kind"));
  588. }
  589. }
  590. __lenResult = mb.length();
  591. __result = mb.detach();
  592. }
  593. class WsTimingVisitor : public WuScopeVisitorBase
  594. {
  595. public:
  596. WsTimingVisitor(MemoryBuffer & _mb) : mb(_mb) {}
  597. virtual void noteStatistic(StatisticKind kind, unsigned __int64 value, IConstWUStatistic & cur) override
  598. {
  599. SCMStringBuffer desc;
  600. unsigned __int64 count = cur.getCount();
  601. unsigned __int64 max = cur.getMax();
  602. cur.getDescription(desc, true);
  603. mb.append((unsigned) count);
  604. mb.append((unsigned) (value / 1000000));
  605. mb.append((unsigned) max);
  606. varAppend(mb, desc.str());
  607. }
  608. protected:
  609. MemoryBuffer & mb;
  610. };
  611. WORKUNITSERVICES_API void wsWorkunitTimings( ICodeContext *ctx, size32_t & __lenResult, void * & __result, const char *wuid )
  612. {
  613. Owned<IConstWorkUnit> wu = getWorkunit(ctx, wuid);
  614. MemoryBuffer mb;
  615. if (wu)
  616. {
  617. WsTimingVisitor visitor(mb);
  618. WuScopeFilter filter("measure[Time],source[global]");
  619. Owned<IConstWUScopeIterator> iter = &wu->getScopeIterator(filter);
  620. ForEach(*iter)
  621. {
  622. iter->playProperties(visitor);
  623. }
  624. }
  625. __lenResult = mb.length();
  626. __result = mb.detach();
  627. }
  628. class WsStreamedStatistics : public CInterfaceOf<IRowStream>, public IWuScopeVisitor
  629. {
  630. public:
  631. WsStreamedStatistics(IConstWorkUnit * _wu, IEngineRowAllocator * _resultAllocator, const char * _filter)
  632. : wu(_wu), resultAllocator(_resultAllocator)
  633. {
  634. filter.addOutputProperties(PTstatistics);
  635. filter.addFilter(_filter);
  636. filter.finishedFilter();
  637. iter.setown(&wu->getScopeIterator(filter));
  638. if (iter->first())
  639. gatherStats();
  640. }
  641. ~WsStreamedStatistics()
  642. {
  643. releaseRows();
  644. }
  645. virtual const void *nextRow()
  646. {
  647. for (;;)
  648. {
  649. if (!iter->isValid())
  650. return nullptr;
  651. if (rows.isItem(curRow))
  652. return rows.item(curRow++);
  653. if (iter->next())
  654. gatherStats();
  655. }
  656. }
  657. virtual void stop()
  658. {
  659. iter.clear();
  660. releaseRows();
  661. }
  662. //interface IWuScopeVisitor
  663. virtual void noteStatistic(StatisticKind kind, unsigned __int64 value, IConstWUStatistic & extra)
  664. {
  665. SCMStringBuffer creator;
  666. SCMStringBuffer description;
  667. unsigned __int64 count = extra.getCount();
  668. unsigned __int64 max = extra.getMax();
  669. StatisticCreatorType creatorType = extra.getCreatorType();
  670. extra.getCreator(creator);
  671. StatisticScopeType scopeType = extra.getScopeType();
  672. const char * scope = extra.queryScope();
  673. if (!scope) scope = "";
  674. extra.getDescription(description, true);
  675. StatisticMeasure measure = extra.getMeasure();
  676. MemoryBuffer mb;
  677. mb.append(sizeof(value),&value);
  678. mb.append(sizeof(count),&count);
  679. mb.append(sizeof(max),&max);
  680. varAppend(mb, queryCreatorTypeName(creatorType));
  681. varAppend(mb, creator.str());
  682. varAppend(mb, queryScopeTypeName(scopeType));
  683. varAppend(mb, scope);
  684. varAppend(mb, queryStatisticName(kind));
  685. varAppend(mb, description.str());
  686. varAppend(mb, queryMeasureName(measure));
  687. size32_t len = mb.length();
  688. size32_t newSize;
  689. void * row = resultAllocator->createRow(len, newSize);
  690. memcpy(row, mb.bufferBase(), len);
  691. rows.append(row);
  692. }
  693. virtual void noteAttribute(WuAttr attr, const char * value) {}
  694. virtual void noteHint(const char * kind, const char * value) {}
  695. virtual void noteException(IConstWUException & exception) override {}
  696. protected:
  697. bool gatherStats()
  698. {
  699. rows.clear();
  700. curRow = 0;
  701. for (;;)
  702. {
  703. iter->playProperties(*this, PTstatistics);
  704. if (rows.ordinality())
  705. return true;
  706. if (!iter->next())
  707. return false;
  708. }
  709. }
  710. void releaseRows()
  711. {
  712. while (rows.isItem(curRow))
  713. resultAllocator->releaseRow(rows.item(curRow++));
  714. }
  715. protected:
  716. Linked<IConstWorkUnit> wu;
  717. Linked<IEngineRowAllocator> resultAllocator;
  718. WuScopeFilter filter;
  719. Linked<IConstWUScopeIterator> iter;
  720. ConstPointerArray rows;
  721. unsigned curRow = 0;
  722. };
  723. //This function is deprecated and no longer supported - I'm not sure it ever worked
  724. WORKUNITSERVICES_API IRowStream * wsWorkunitStatistics( ICodeContext *ctx, IEngineRowAllocator * allocator, const char *wuid, bool includeActivities, const char * filterText)
  725. {
  726. Owned<IConstWorkUnit> wu = getWorkunit(ctx, wuid);
  727. if (!wu)
  728. return createNullRowStream();
  729. return new WsStreamedStatistics(wu, allocator, filterText);
  730. }
  731. WORKUNITSERVICES_API bool wsSetWorkunitAppValue( ICodeContext *ctx, const char *appname, const char *key, const char *value, bool overwrite)
  732. {
  733. if (appname && *appname && key && *key && value && *value)
  734. {
  735. WorkunitUpdate w(ctx->updateWorkUnit());
  736. w->setApplicationValue(appname, key, value, overwrite);
  737. return true;
  738. }
  739. return false;
  740. }
  741. WORKUNITSERVICES_API void setPluginContext(IPluginContext * _ctx) { parentCtx = _ctx; }
  742. WORKUNITSERVICES_API char * WORKUNITSERVICES_CALL fsGetBuildInfo(void)
  743. {
  744. return CTXSTRDUP(parentCtx, WORKUNITSERVICES_VERSION);
  745. }