workunitservices.cpp 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. /* TBD
  14. FilesRead
  15. FilesWritten
  16. Errors
  17. Plugins
  18. Results
  19. Timings
  20. Persists changed?
  21. */
  22. #pragma warning (disable : 4786)
  23. #pragma warning (disable : 4297) // function assumed not to throw an exception but does
  24. #include "platform.h"
  25. #include "workunit.hpp"
  26. #include "agentctx.hpp"
  27. #include "enginecontext.hpp"
  28. #include "portlist.h"
  29. #include "jio.hpp"
  30. #include "jmisc.hpp"
  31. #include "jstring.hpp"
  32. #include "dasess.hpp"
  33. #include "dasds.hpp"
  34. #include "dautils.hpp"
  35. #include "daaudit.hpp"
  36. #include "sacmd.hpp"
  37. #include "workunitservices.hpp"
  38. #include "workunitservices.ipp"
  39. #include "environment.hpp"
  40. #include "seclib.hpp"
  41. #define WORKUNITSERVICES_VERSION "WORKUNITSERVICES 1.0.2"
  42. static const char * compatibleVersions[] = {
  43. NULL };
  44. static const char * EclDefinition =
  45. "export WsWorkunitRecord := record "
  46. " string24 wuid;"
  47. " string owner{maxlength(64)};"
  48. " string cluster{maxlength(64)};"
  49. " string roxiecluster{maxlength(64)};"
  50. " string job{maxlength(256)};"
  51. " string10 state;"
  52. " string7 priority;"
  53. " integer2 priorityvalue;"
  54. " string20 created;"
  55. " string20 modified;"
  56. " boolean online;"
  57. " boolean protected;"
  58. " end;\n"
  59. "export WsTimeStamp := record "
  60. " string32 application;"
  61. " string16 id;"
  62. " string20 time;"
  63. " string16 instance;"
  64. " end;\n"
  65. "export WsMessage := record "
  66. " unsigned4 severity;"
  67. " integer4 code;"
  68. " string32 location;"
  69. " unsigned4 row;"
  70. " unsigned4 col;"
  71. " string16 source;"
  72. " string20 time;"
  73. " string message{maxlength(1024)};"
  74. " end;\n"
  75. "export WsFileRead := record "
  76. " string name{maxlength(256)};"
  77. " string cluster{maxlength(64)};"
  78. " boolean isSuper;"
  79. " unsigned4 usage;"
  80. " end;\n"
  81. "export WsFileWritten := record "
  82. " string name{maxlength(256)};"
  83. " string10 graph;"
  84. " string cluster{maxlength(64)};"
  85. " unsigned4 kind;"
  86. " end;\n"
  87. "export WsTiming := record "
  88. " unsigned4 count;"
  89. " unsigned4 duration;"
  90. " unsigned4 max;"
  91. " string name{maxlength(64)};"
  92. " end;\n"
  93. "export WsStatistic := record "
  94. " unsigned8 value;"
  95. " unsigned8 count;"
  96. " unsigned8 maxValue;"
  97. " string creatorType;"
  98. " string creator;"
  99. " string scopeType;"
  100. " string scope;"
  101. " string name;"
  102. " string description;"
  103. " string unit;"
  104. " end;\n"
  105. "export WorkunitServices := SERVICE :time, cpp\n"
  106. " boolean WorkunitExists(const varstring wuid, boolean online=true, boolean archived=false) : context,entrypoint='wsWorkunitExists'; \n"
  107. " dataset(WsWorkunitRecord) WorkunitList("
  108. " const varstring lowwuid='',"
  109. " const varstring highwuid='',"
  110. " const varstring username='',"
  111. " const varstring cluster='',"
  112. " const varstring jobname='',"
  113. " const varstring state='',"
  114. " const varstring priority='',"
  115. " const varstring fileread='',"
  116. " const varstring filewritten='',"
  117. " const varstring roxiecluster='',"
  118. " const varstring eclcontains='',"
  119. " boolean online=true,"
  120. " boolean archived=false,"
  121. " const varstring appvalues=''"
  122. ") : context,entrypoint='wsWorkunitList'; \n"
  123. " varstring WUIDonDate(unsigned4 year,unsigned4 month,unsigned4 day,unsigned4 hour, unsigned4 minute) : entrypoint='wsWUIDonDate'; \n"
  124. " varstring WUIDdaysAgo(unsigned4 daysago) : entrypoint='wsWUIDdaysAgo'; \n"
  125. " dataset(WsTimeStamp) WorkunitTimeStamps(const varstring wuid) : context,entrypoint='wsWorkunitTimeStamps'; \n"
  126. " dataset(WsMessage) WorkunitMessages(const varstring wuid) : context,entrypoint='wsWorkunitMessages'; \n"
  127. " dataset(WsFileRead) WorkunitFilesRead(const varstring wuid) : context,entrypoint='wsWorkunitFilesRead'; \n"
  128. " dataset(WsFileWritten) WorkunitFilesWritten(const varstring wuid) : context,entrypoint='wsWorkunitFilesWritten'; \n"
  129. " dataset(WsTiming) WorkunitTimings(const varstring wuid) : context,entrypoint='wsWorkunitTimings'; \n"
  130. " streamed dataset(WsStatistic) WorkunitStatistics(const varstring wuid, boolean includeActivities = false, const varstring _filter = '') : context,entrypoint='wsWorkunitStatistics'; \n"
  131. " boolean setWorkunitAppValue(const varstring app, const varstring key, const varstring value, boolean overwrite=true) : context,entrypoint='wsSetWorkunitAppValue'; \n"
  132. "END;";
  133. #define WAIT_SECONDS 30
  134. #define SDS_LOCK_TIMEOUT (1000*60*5)
  135. #define SASHA_TIMEOUT (1000*60*10)
  136. WORKUNITSERVICES_API bool getECLPluginDefinition(ECLPluginDefinitionBlock *pb)
  137. {
  138. if (pb->size == sizeof(ECLPluginDefinitionBlockEx))
  139. {
  140. ECLPluginDefinitionBlockEx * pbx = (ECLPluginDefinitionBlockEx *) pb;
  141. pbx->compatibleVersions = compatibleVersions;
  142. }
  143. else if (pb->size != sizeof(ECLPluginDefinitionBlock))
  144. return false;
  145. pb->magicVersion = PLUGIN_VERSION;
  146. pb->version = WORKUNITSERVICES_VERSION;
  147. pb->moduleName = "lib_WORKUNITSERVICES";
  148. pb->ECL = EclDefinition;
  149. pb->flags = PLUGIN_IMPLICIT_MODULE;
  150. pb->description = "WORKUNITSERVICES library";
  151. return true;
  152. }
  153. namespace nsWorkunitservices {
  154. IPluginContext * parentCtx = NULL;
  155. static void getSashaNodes(SocketEndpointArray &epa)
  156. {
  157. Owned<IEnvironmentFactory> factory = getEnvironmentFactory();
  158. Owned<IConstEnvironment> env = factory->openEnvironment();
  159. if (!env)
  160. {
  161. ERRLOG("getSashaNodes: cannot connect to /Environment!");
  162. return;
  163. }
  164. Owned<IPropertyTree> root = &env->getPTree();
  165. StringBuffer tmp;
  166. Owned<IPropertyTreeIterator> siter = root->getElements("Software/SashaServerProcess/Instance");
  167. ForEach(*siter) {
  168. if (siter->query().getProp("@netAddress",tmp.clear())) {
  169. SocketEndpoint sashaep(tmp.str(),siter->query().getPropInt("@port",DEFAULT_SASHA_PORT));
  170. epa.append(sashaep);
  171. }
  172. }
  173. }
  174. static IWorkUnitFactory * getWorkunitFactory(ICodeContext * ctx)
  175. {
  176. IEngineContext *engineCtx = ctx->queryEngineContext();
  177. if (engineCtx && !engineCtx->allowDaliAccess())
  178. {
  179. Owned<IException> e = MakeStringException(-1, "workunitservices cannot access Dali in this context - this normally means it is being called from a thor slave");
  180. EXCLOG(e, NULL);
  181. throw e.getClear();
  182. }
  183. //MORE: These should really be set up correctly - probably should be returned from IEngineContext
  184. ISecManager *secmgr = NULL;
  185. ISecUser *secuser = NULL;
  186. return getWorkUnitFactory(secmgr, secuser);
  187. }
  188. static bool securityDisabled = false;
  189. static bool checkScopeAuthorized(IUserDescriptor *user, const char *scopename)
  190. {
  191. if (securityDisabled)
  192. return true;
  193. unsigned auditflags = DALI_LDAP_AUDIT_REPORT|DALI_LDAP_READ_WANTED;
  194. SecAccessFlags perm = SecAccess_Full;
  195. if (scopename && *scopename)
  196. {
  197. perm = querySessionManager().getPermissionsLDAP("workunit",scopename,user,auditflags);
  198. if (perm<0)
  199. {
  200. if (perm == SecAccess_Unavailable)
  201. {
  202. perm = SecAccess_Full;
  203. securityDisabled = true;
  204. }
  205. else
  206. perm = SecAccess_None;
  207. }
  208. if (!HASREADPERMISSION(perm))
  209. return false;
  210. }
  211. return true;
  212. }
  213. static IConstWorkUnit * getWorkunit(ICodeContext * ctx, const char * wuid)
  214. {
  215. StringBuffer _wuid(wuid);
  216. if (!_wuid.length())
  217. return NULL;
  218. wuid = _wuid.toUpperCase().str();
  219. Owned<IWorkUnitFactory> wuFactory = getWorkunitFactory(ctx);
  220. Owned<IConstWorkUnit> wu = wuFactory->openWorkUnit(wuid);
  221. if (wu)
  222. {
  223. if (!checkScopeAuthorized(ctx->queryUserDescriptor(), wu->queryWuScope()))
  224. wu.clear();
  225. }
  226. return wu.getClear();
  227. }
  228. static IConstWorkUnit *getWorkunit(ICodeContext * ctx)
  229. {
  230. StringAttr wuid;
  231. wuid.setown(ctx->getWuid());
  232. // One assumes we have read access to our own wu
  233. return getWorkunit(ctx, wuid);
  234. }
  235. static StringBuffer &getWUIDonDate(StringBuffer &wuid,unsigned year,unsigned month,unsigned day,unsigned hour,unsigned minute)
  236. {
  237. if ((year==0)||(month==0)||(day==0)) {
  238. CDateTime dt;
  239. dt.setNow();
  240. unsigned y;
  241. unsigned m;
  242. unsigned d;
  243. dt.getDate(y,m,d, true);
  244. if (year==0)
  245. year = y;
  246. if (month==0)
  247. month = m;
  248. if (day==0)
  249. day = d;
  250. }
  251. else if (year<100)
  252. year+=2000;
  253. wuid.appendf("W%d%02d%02d-%02d%02d00",year,month,day,hour,minute);
  254. return wuid;
  255. }
  256. static StringBuffer &getWUIDdaysAgo(StringBuffer &wuid,int daysago)
  257. {
  258. CDateTime dt;
  259. dt.setNow();
  260. dt.adjustTime(-(daysago*60*24));
  261. unsigned y;
  262. unsigned m;
  263. unsigned d;
  264. dt.getDate(y,m,d, true);
  265. unsigned h;
  266. unsigned mn;
  267. unsigned s;
  268. unsigned ns;
  269. dt.getTime(h,mn,s,ns,true);
  270. return getWUIDonDate(wuid,y,m,d,h,mn);
  271. }
  272. static bool addWUQueryFilter(WUSortField *filters, unsigned &count, MemoryBuffer &buff, const char *name, WUSortField value)
  273. {
  274. if (!name || !*name)
  275. return false;
  276. filters[count++] = value;
  277. buff.append(name);
  278. return true;
  279. }
  280. static bool serializeWUInfo(IConstWorkUnitInfo &info,MemoryBuffer &mb)
  281. {
  282. fixedAppend(mb,24,info.queryWuid());
  283. varAppendMax(mb,64,info.queryUser());
  284. varAppendMax(mb,64,info.queryClusterName());
  285. varAppendMax(mb,64,""); // roxiecluster is obsolete
  286. varAppendMax(mb,256,info.queryJobName());
  287. fixedAppend(mb,10,info.queryStateDesc());
  288. fixedAppend(mb,7,info.queryPriorityDesc());
  289. short int prioritylevel = info.getPriorityLevel();
  290. mb.append(prioritylevel);
  291. fixedAppend(mb,20,""); // Created timestamp
  292. fixedAppend(mb,20,""); // Modified timestamp
  293. mb.append(true);
  294. mb.append(info.isProtected());
  295. if (mb.length()>WORKUNIT_SERVICES_BUFFER_MAX) {
  296. mb.clear().append(WUS_STATUS_OVERFLOWED);
  297. return false;
  298. }
  299. return true;
  300. }
  301. }//namespace
  302. using namespace nsWorkunitservices;
  303. static const unsigned MAX_FILTERS=20;
  304. WORKUNITSERVICES_API void wsWorkunitList(
  305. ICodeContext *ctx,
  306. size32_t & __lenResult,
  307. void * & __result,
  308. const char *lowwuid,
  309. const char *highwuid,
  310. const char *username,
  311. const char *cluster,
  312. const char *jobname,
  313. const char *state,
  314. const char *priority,
  315. const char *fileread,
  316. const char *filewritten,
  317. const char *roxiecluster, // Not in use - retained for compatibility only
  318. const char *eclcontains,
  319. bool online,
  320. bool archived,
  321. const char *appvalues
  322. )
  323. {
  324. MemoryBuffer mb;
  325. if (archived) {
  326. SocketEndpointArray sashaeps;
  327. getSashaNodes(sashaeps);
  328. ForEachItemIn(i,sashaeps) {
  329. Owned<ISashaCommand> cmd = createSashaCommand();
  330. cmd->setAction(SCA_WORKUNIT_SERVICES_GET);
  331. cmd->setOnline(false);
  332. cmd->setArchived(true);
  333. cmd->setWUSmode(true);
  334. if (lowwuid&&*lowwuid)
  335. cmd->setAfter(lowwuid);
  336. if (highwuid&&*highwuid)
  337. cmd->setBefore(highwuid);
  338. if (username&&*username)
  339. cmd->setOwner(username);
  340. if (cluster&&*cluster)
  341. cmd->setCluster(cluster);
  342. if (jobname&&*jobname)
  343. cmd->setJobName(jobname);
  344. if (state&&*state)
  345. cmd->setState(state);
  346. if (priority&&*priority)
  347. cmd->setPriority(priority);
  348. if (fileread&&*fileread)
  349. cmd->setFileRead(fileread);
  350. if (filewritten&&*filewritten)
  351. cmd->setFileWritten(filewritten);
  352. if (eclcontains&&*eclcontains)
  353. cmd->setEclContains(eclcontains);
  354. Owned<INode> sashanode = createINode(sashaeps.item(i));
  355. if (cmd->send(sashanode,SASHA_TIMEOUT)) {
  356. byte res = cmd->getWUSresult(mb);
  357. if (res==WUS_STATUS_OVERFLOWED)
  358. throw MakeStringException(-1,"WORKUNITSERVICES: Result buffer overflowed");
  359. if (res!=WUS_STATUS_OK)
  360. throw MakeStringException(-1,"WORKUNITSERVICES: Sasha get results failed (%d)",(int)res);
  361. break;
  362. }
  363. if (i+1>=sashaeps.ordinality()) {
  364. StringBuffer ips;
  365. sashaeps.item(0).getIpText(ips);
  366. throw MakeStringException(-1,"Time out to Sasha server on %s (server not running or query too complex)",ips.str());
  367. }
  368. }
  369. }
  370. if (online)
  371. {
  372. WUSortField filters[MAX_FILTERS+1]; // NOTE - increase if you add a LOT more parameters! The +1 is to allow space for the terminator
  373. unsigned filterCount = 0;
  374. MemoryBuffer filterbuf;
  375. if (state && *state)
  376. {
  377. filters[filterCount++] = WUSFstate;
  378. if (!strieq(state, "unknown"))
  379. filterbuf.append(state);
  380. else
  381. filterbuf.append("");
  382. }
  383. if (priority && *priority)
  384. {
  385. filters[filterCount++] = WUSFpriority;
  386. if (!strieq(priority, "unknown"))
  387. filterbuf.append(priority);
  388. else
  389. filterbuf.append("");
  390. }
  391. addWUQueryFilter(filters, filterCount, filterbuf, cluster, WUSFcluster);
  392. addWUQueryFilter(filters, filterCount, filterbuf, fileread, (WUSortField) (WUSFfileread | WUSFnocase));
  393. addWUQueryFilter(filters, filterCount, filterbuf, filewritten, (WUSortField) (WUSFfilewritten | WUSFnocase));
  394. addWUQueryFilter(filters, filterCount, filterbuf, username, (WUSortField) (WUSFuser | WUSFnocase));
  395. addWUQueryFilter(filters, filterCount, filterbuf, jobname, (WUSortField) (WUSFjob | WUSFwild | WUSFnocase));
  396. addWUQueryFilter(filters, filterCount, filterbuf, eclcontains, (WUSortField) (WUSFecl | WUSFwild));
  397. addWUQueryFilter(filters, filterCount, filterbuf, lowwuid, WUSFwuid);
  398. addWUQueryFilter(filters, filterCount, filterbuf, highwuid, WUSFwuidhigh);
  399. if (appvalues && *appvalues)
  400. {
  401. StringArray appFilters;
  402. appFilters.appendList(appvalues, "|"); // Multiple filters separated by |
  403. ForEachItemIn(idx, appFilters)
  404. {
  405. StringArray appFilter; // individual filter of form appname/key=value or appname/*=value
  406. appFilter.appendList(appFilters.item(idx), "=");
  407. const char *appvalue;
  408. switch (appFilter.length())
  409. {
  410. case 1:
  411. appvalue = NULL;
  412. break;
  413. case 2:
  414. appvalue = appFilter.item(1);
  415. break;
  416. default:
  417. throw MakeStringException(-1,"WORKUNITSERVICES: Invalid application value filter %s (expected format is 'appname/keyname=value')", appFilters.item(idx));
  418. }
  419. const char *appkey = appFilter.item(0);
  420. if (!strchr(appkey, '/'))
  421. throw MakeStringException(-1,"WORKUNITSERVICES: Invalid application value filter %s (expected format is 'appname/keyname=value')", appFilters.item(idx));
  422. if (filterCount>=MAX_FILTERS)
  423. throw MakeStringException(-1,"WORKUNITSERVICES: Too many filters");
  424. filterbuf.append(appkey);
  425. filterbuf.append(appvalue);
  426. filters[filterCount++] = WUSFappvalue;
  427. }
  428. }
  429. filters[filterCount] = WUSFterm;
  430. Owned<IWorkUnitFactory> wuFactory = getWorkunitFactory(ctx);
  431. Owned<IConstWorkUnitIterator> it = wuFactory->getWorkUnitsSorted((WUSortField) (WUSFwuid | WUSFreverse), filters, filterbuf.bufferBase(), 0, INT_MAX, NULL, NULL); // MORE - need security flags here!
  432. ForEach(*it)
  433. {
  434. if (!serializeWUInfo(it->query(), mb))
  435. throw MakeStringException(-1,"WORKUNITSERVICES: Result buffer overflowed");
  436. }
  437. }
  438. __lenResult = mb.length();
  439. __result = mb.detach();
  440. }
  441. WORKUNITSERVICES_API bool wsWorkunitExists(ICodeContext *ctx, const char *wuid, bool online, bool archived)
  442. {
  443. if (!wuid||!*wuid)
  444. return false;
  445. StringBuffer _wuid(wuid);
  446. wuid = _wuid.toUpperCase().str();
  447. if (online)
  448. {
  449. Owned<IWorkUnitFactory> wuFactory = getWorkunitFactory(ctx);
  450. Owned<IConstWorkUnit> wu = wuFactory->openWorkUnit(wuid); // Note - we don't use getWorkUnit as we don't need read access
  451. return wu != NULL;
  452. }
  453. if (archived)
  454. {
  455. SocketEndpointArray sashaeps;
  456. getSashaNodes(sashaeps);
  457. ForEachItemIn(i,sashaeps) {
  458. Owned<ISashaCommand> cmd = createSashaCommand();
  459. cmd->setAction(SCA_LIST);
  460. cmd->setOnline(false);
  461. cmd->setArchived(true);
  462. cmd->addId(wuid);
  463. Owned<INode> sashanode = createINode(sashaeps.item(i));
  464. if (cmd->send(sashanode,SASHA_TIMEOUT)) {
  465. return cmd->numIds()>0;
  466. }
  467. }
  468. }
  469. return false;
  470. }
  471. WORKUNITSERVICES_API char * wsWUIDonDate(unsigned year,unsigned month,unsigned day,unsigned hour,unsigned minute)
  472. {
  473. StringBuffer ret;
  474. return getWUIDonDate(ret,year,month,day,hour,minute).detach();
  475. }
  476. WORKUNITSERVICES_API char * wsWUIDdaysAgo(unsigned daysago)
  477. {
  478. StringBuffer ret;
  479. return getWUIDdaysAgo(ret,(int)daysago).detach();
  480. }
  481. WORKUNITSERVICES_API void wsWorkunitTimeStamps(ICodeContext *ctx, size32_t & __lenResult, void * & __result, const char *wuid)
  482. {
  483. Owned<IConstWorkUnit> wu = getWorkunit(ctx, wuid);
  484. MemoryBuffer mb;
  485. if (wu)
  486. {
  487. Owned<StatisticsFilter> filter = new StatisticsFilter(SCTall, SSTall, SMeasureTimestampUs, StKindAll);
  488. filter->setScopeDepth(1, 2);
  489. Owned<IConstWUStatisticIterator> stats = &wu->getStatistics(filter);
  490. ForEach(*stats)
  491. {
  492. IConstWUStatistic & cur = stats->query();
  493. SCMStringBuffer scope;
  494. cur.getScope(scope);
  495. StatisticKind kind = cur.getKind();
  496. const char * kindName = queryStatisticName(kind);
  497. assertex(kindName && memicmp(kindName, "when", 4) == 0);
  498. kindName += 4;
  499. StringBuffer formattedTime;
  500. convertTimestampToStr(cur.getValue(), formattedTime, true);
  501. SCMStringBuffer creator;
  502. cur.getCreator(creator);
  503. const char * at = strchr(creator.str(), '@');
  504. const char * instance = at ? at + 1 : creator.str();
  505. fixedAppend(mb, 32, scope.str());
  506. fixedAppend(mb, 16, kindName); // id
  507. fixedAppend(mb, 20, formattedTime); // time
  508. fixedAppend(mb, 16, instance); // item correct here
  509. }
  510. }
  511. __lenResult = mb.length();
  512. __result = mb.detach();
  513. }
  514. WORKUNITSERVICES_API void wsWorkunitMessages( ICodeContext *ctx, size32_t & __lenResult, void * & __result, const char *wuid )
  515. {
  516. Owned<IConstWorkUnit> wu = getWorkunit(ctx, wuid);
  517. MemoryBuffer mb;
  518. if (wu)
  519. {
  520. SCMStringBuffer s;
  521. Owned<IConstWUExceptionIterator> exceptions = &wu->getExceptions();
  522. ForEach(*exceptions)
  523. {
  524. IConstWUException &e = exceptions->query();
  525. mb.append((unsigned) e.getSeverity());
  526. mb.append((int) e.getExceptionCode());
  527. e.getExceptionFileName(s);
  528. fixedAppend(mb, 32, s.str(), s.length());
  529. mb.append((unsigned) e.getExceptionLineNo());
  530. mb.append((unsigned) e.getExceptionColumn());
  531. e.getExceptionSource(s);
  532. fixedAppend(mb, 16, s.str(), s.length());
  533. e.getTimeStamp(s);
  534. fixedAppend(mb, 20, s.str(), s.length());
  535. e.getExceptionMessage(s);
  536. varAppendMax(mb, 1024, s.str(), s.length());
  537. }
  538. }
  539. __lenResult = mb.length();
  540. __result = mb.detach();
  541. }
  542. WORKUNITSERVICES_API void wsWorkunitFilesRead( ICodeContext *ctx, size32_t & __lenResult, void * & __result, const char *wuid )
  543. {
  544. MemoryBuffer mb;
  545. Owned<IConstWorkUnit> wu = getWorkunit(ctx, wuid);
  546. if (wu)
  547. {
  548. Owned<IPropertyTreeIterator> sourceFiles = &wu->getFilesReadIterator();
  549. ForEach(*sourceFiles)
  550. {
  551. IPropertyTree &item = sourceFiles->query();
  552. varAppendMax(mb, 256, item, "@name");
  553. varAppendMax(mb, 64, item, "@cluster");
  554. mb.append(item.getPropBool("@super"));
  555. mb.append((unsigned) item.getPropInt("@useCount"));
  556. }
  557. }
  558. __lenResult = mb.length();
  559. __result = mb.detach();
  560. }
  561. WORKUNITSERVICES_API void wsWorkunitFilesWritten( ICodeContext *ctx, size32_t & __lenResult, void * & __result, const char *wuid )
  562. {
  563. MemoryBuffer mb;
  564. Owned<IConstWorkUnit> wu = getWorkunit(ctx, wuid);
  565. if (wu)
  566. {
  567. Owned<IPropertyTreeIterator> sourceFiles = &wu->getFileIterator();
  568. ForEach(*sourceFiles)
  569. {
  570. IPropertyTree &item = sourceFiles->query();
  571. varAppendMax(mb, 256, item, "@name");
  572. fixedAppend(mb, 10, item, "@graph");
  573. varAppendMax(mb, 64, item, "@cluster");
  574. mb.append( (unsigned) item.getPropInt("@kind"));
  575. }
  576. }
  577. __lenResult = mb.length();
  578. __result = mb.detach();
  579. }
  580. WORKUNITSERVICES_API void wsWorkunitTimings( ICodeContext *ctx, size32_t & __lenResult, void * & __result, const char *wuid )
  581. {
  582. Owned<IConstWorkUnit> wu = getWorkunit(ctx, wuid);
  583. MemoryBuffer mb;
  584. if (wu)
  585. {
  586. StatisticsFilter filter;
  587. filter.setScopeDepth(1, 2);
  588. filter.setMeasure(SMeasureTimeNs);
  589. SCMStringBuffer desc;
  590. Owned<IConstWUStatisticIterator> iter = &wu->getStatistics(&filter);
  591. ForEach(*iter)
  592. {
  593. IConstWUStatistic & cur = iter->query();
  594. unsigned __int64 value = cur.getValue();
  595. unsigned __int64 count = cur.getCount();
  596. unsigned __int64 max = cur.getMax();
  597. cur.getDescription(desc, true);
  598. mb.append((unsigned) count);
  599. mb.append((unsigned) (value / 1000000));
  600. mb.append((unsigned) max);
  601. varAppend(mb, desc.str());
  602. }
  603. }
  604. __lenResult = mb.length();
  605. __result = mb.detach();
  606. }
  607. class StreamedStatistics : public CInterfaceOf<IRowStream>
  608. {
  609. public:
  610. StreamedStatistics(IConstWorkUnit * _wu, IEngineRowAllocator * _resultAllocator, IConstWUStatisticIterator * _iter)
  611. : wu(_wu), resultAllocator(_resultAllocator),iter(_iter)
  612. {
  613. }
  614. virtual const void *nextRow()
  615. {
  616. if (!iter || !iter->isValid())
  617. return NULL;
  618. IConstWUStatistic & cur = iter->query();
  619. unsigned __int64 value = cur.getValue();
  620. unsigned __int64 count = cur.getCount();
  621. unsigned __int64 max = cur.getMax();
  622. StatisticCreatorType creatorType = cur.getCreatorType();
  623. cur.getCreator(creator);
  624. StatisticScopeType scopeType = cur.getScopeType();
  625. cur.getScope(scope);
  626. cur.getDescription(description, true);
  627. StatisticMeasure measure = cur.getMeasure();
  628. StatisticKind kind = cur.getKind();
  629. MemoryBuffer mb;
  630. mb.append(sizeof(value),&value);
  631. mb.append(sizeof(count),&count);
  632. mb.append(sizeof(max),&max);
  633. varAppend(mb, queryCreatorTypeName(creatorType));
  634. varAppend(mb, creator.str());
  635. varAppend(mb, queryScopeTypeName(scopeType));
  636. varAppend(mb, scope.str());
  637. varAppend(mb, queryStatisticName(kind));
  638. varAppend(mb, description.str());
  639. varAppend(mb, queryMeasureName(measure));
  640. size32_t len = mb.length();
  641. size32_t newSize;
  642. void * row = resultAllocator->createRow(newSize);
  643. row = resultAllocator->resizeRow(len, row, newSize);
  644. memcpy(row, mb.bufferBase(), len);
  645. iter->next();
  646. return resultAllocator->finalizeRow(len, row, newSize);
  647. }
  648. virtual void stop()
  649. {
  650. iter.clear();
  651. }
  652. protected:
  653. Linked<IConstWorkUnit> wu;
  654. Linked<IEngineRowAllocator> resultAllocator;
  655. Linked<IConstWUStatisticIterator> iter;
  656. SCMStringBuffer creator;
  657. SCMStringBuffer scope;
  658. SCMStringBuffer description;
  659. };
  660. WORKUNITSERVICES_API IRowStream * wsWorkunitStatistics( ICodeContext *ctx, IEngineRowAllocator * allocator, const char *wuid, bool includeActivities, const char * filterText)
  661. {
  662. Owned<IConstWorkUnit> wu = getWorkunit(ctx, wuid);
  663. if (!wu)
  664. return createNullRowStream();
  665. //Filter needs to be allocated because the iterator outlasts it.
  666. Owned<StatisticsFilter> filter = new StatisticsFilter(filterText);
  667. if (!includeActivities)
  668. filter->setScopeDepth(1, 2);
  669. Owned<IConstWUStatisticIterator> stats = &wu->getStatistics(filter);
  670. return new StreamedStatistics(wu, allocator, stats);
  671. }
  672. WORKUNITSERVICES_API bool wsSetWorkunitAppValue( ICodeContext *ctx, const char *appname, const char *key, const char *value, bool overwrite)
  673. {
  674. if (appname && *appname && key && *key && value && *value)
  675. {
  676. WorkunitUpdate w(ctx->updateWorkUnit());
  677. w->setApplicationValue(appname, key, value, overwrite);
  678. return true;
  679. }
  680. return false;
  681. }
  682. WORKUNITSERVICES_API void setPluginContext(IPluginContext * _ctx) { parentCtx = _ctx; }
  683. WORKUNITSERVICES_API char * WORKUNITSERVICES_CALL fsGetBuildInfo(void)
  684. {
  685. return CTXSTRDUP(parentCtx, WORKUNITSERVICES_VERSION);
  686. }