workunit.ipp 33 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2014 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #ifndef WORKUNIT_IPP_INCL
  14. #include "seclib.hpp"
  15. #include "dasess.hpp" /// For IUserDescriptor
  16. #include "dasds.hpp"
  17. #include "workunit.hpp"
  18. /* NB: Some of the classes in this file are also used from casandrawu - which means they all need WORKUNIT_API */
  19. #define SDS_LOCK_TIMEOUT (5*60*1000) // 5 mins
  20. #define WUID_VERSION 2 // recorded in each wuid created, useful for bkwd compat. checks
  21. #define GLOBAL_WORKUNIT "global"
  22. class WORKUNIT_API CLocalWUAppValue : implements IConstWUAppValue, public CInterface
  23. {
  24. Owned<IPropertyTree> p;
  25. StringAttr prop;
  26. public:
  27. IMPLEMENT_IINTERFACE;
  28. CLocalWUAppValue(IPropertyTree *p,unsigned child);
  29. virtual const char *queryApplication() const;
  30. virtual const char *queryName() const;
  31. virtual const char *queryValue() const;
  32. };
  33. class WORKUNIT_API CLocalWUStatistic : implements IConstWUStatistic, public CInterface
  34. {
  35. Owned<IPropertyTree> p;
  36. public:
  37. IMPLEMENT_IINTERFACE;
  38. CLocalWUStatistic(IPropertyTree *p);
  39. virtual IStringVal & getCreator(IStringVal & str) const;
  40. virtual IStringVal & getDescription(IStringVal & str, bool createDefault) const;
  41. virtual IStringVal & getFormattedValue(IStringVal & str) const;
  42. virtual IStringVal & getType(IStringVal & str) const;
  43. virtual IStringVal & getScope(IStringVal & str) const;
  44. virtual StatisticMeasure getMeasure() const;
  45. virtual StatisticCreatorType getCreatorType() const;
  46. virtual StatisticScopeType getScopeType() const;
  47. virtual StatisticKind getKind() const;
  48. virtual unsigned __int64 getValue() const;
  49. virtual unsigned __int64 getCount() const;
  50. virtual unsigned __int64 getMax() const;
  51. virtual unsigned __int64 getTimestamp() const;
  52. virtual bool matches(const IStatisticsFilter * filter) const;
  53. };
  54. class WORKUNIT_API CLocalWULegacyTiming : implements IConstWUStatistic, public CInterface
  55. {
  56. Owned<IPropertyTree> p;
  57. public:
  58. IMPLEMENT_IINTERFACE;
  59. CLocalWULegacyTiming(IPropertyTree *p);
  60. virtual IStringVal & getCreator(IStringVal & str) const;
  61. virtual IStringVal & getDescription(IStringVal & str, bool createDefault) const;
  62. virtual IStringVal & getFormattedValue(IStringVal & str) const;
  63. virtual IStringVal & getType(IStringVal & str) const;
  64. virtual IStringVal & getScope(IStringVal & str) const;
  65. virtual StatisticMeasure getMeasure() const;
  66. virtual StatisticCreatorType getCreatorType() const;
  67. virtual StatisticScopeType getScopeType() const;
  68. virtual StatisticKind getKind() const;
  69. virtual unsigned __int64 getValue() const;
  70. virtual unsigned __int64 getCount() const;
  71. virtual unsigned __int64 getMax() const;
  72. virtual unsigned __int64 getTimestamp() const;
  73. virtual bool matches(const IStatisticsFilter * filter) const;
  74. };
  75. //==========================================================================================
  76. template <typename T, typename IT> struct CachedTags
  77. {
  78. CachedTags(): cached(false) {}
  79. void load(IPropertyTree* p,const char* xpath)
  80. {
  81. if (!cached)
  82. {
  83. assertex(tags.length() == 0);
  84. Owned<IPropertyTreeIterator> r = p->getElements(xpath);
  85. for (r->first(); r->isValid(); r->next())
  86. {
  87. IPropertyTree *rp = &r->query();
  88. rp->Link();
  89. tags.append(*new T(rp));
  90. }
  91. cached = true;
  92. }
  93. }
  94. void loadBranch(IPropertyTree* p,const char* xpath)
  95. {
  96. if (!cached)
  97. {
  98. assertex(tags.length() == 0);
  99. Owned<IPropertyTree> branch = p->getBranch(xpath);
  100. if (branch)
  101. {
  102. Owned<IPropertyTreeIterator> r = branch->getElements("*");
  103. for (r->first(); r->isValid(); r->next())
  104. {
  105. IPropertyTree *rp = &r->query();
  106. rp->Link();
  107. tags.append(*new T(rp));
  108. }
  109. }
  110. cached = true;
  111. }
  112. }
  113. void append(IPropertyTree * p)
  114. {
  115. tags.append(*new T(p));
  116. }
  117. operator IArrayOf<IT>&() { return tags; }
  118. unsigned ordinality() const { return tags.ordinality(); }
  119. void kill()
  120. {
  121. cached = false;
  122. tags.kill();
  123. }
  124. bool cached;
  125. IArrayOf<IT> tags;
  126. };
  127. template <> struct CachedTags<CLocalWUAppValue, IConstWUAppValue>
  128. {
  129. CachedTags(): cached(false) {}
  130. void load(IPropertyTree* p,const char* xpath)
  131. {
  132. if (!cached)
  133. {
  134. assertex(tags.length() == 0);
  135. Owned<IPropertyTreeIterator> r = p->getElements(xpath);
  136. for (r->first(); r->isValid(); r->next())
  137. {
  138. IPropertyTree *rp = &r->query();
  139. Owned<IPropertyTreeIterator> v = rp->getElements("*");
  140. unsigned pos = 1;
  141. for (v->first(); v->isValid(); v->next())
  142. {
  143. rp->Link();
  144. tags.append(*new CLocalWUAppValue(rp,pos++));
  145. }
  146. }
  147. cached = true;
  148. }
  149. }
  150. void loadBranch(IPropertyTree* p,const char* xpath)
  151. {
  152. if (!cached)
  153. {
  154. assertex(tags.length() == 0);
  155. Owned<IPropertyTree> branch = p->getBranch(xpath);
  156. if (branch)
  157. {
  158. Owned<IPropertyTreeIterator> r = branch->getElements("*");
  159. for (r->first(); r->isValid(); r->next())
  160. {
  161. IPropertyTree *rp = &r->query();
  162. Owned<IPropertyTreeIterator> v = rp->getElements("*");
  163. unsigned pos = 1;
  164. for (v->first(); v->isValid(); v->next())
  165. {
  166. rp->Link();
  167. tags.append(*new CLocalWUAppValue(rp,pos++));
  168. }
  169. }
  170. }
  171. cached = true;
  172. }
  173. }
  174. operator IArrayOf<IConstWUAppValue>&() { return tags; }
  175. void kill()
  176. {
  177. cached = false;
  178. tags.kill();
  179. }
  180. bool cached;
  181. IArrayOf<IConstWUAppValue> tags;
  182. };
  183. //==========================================================================================
  184. class WORKUNIT_API CLocalWorkUnit : implements IWorkUnit , implements IExtendedWUInterface, public CInterface
  185. {
  186. friend StringBuffer &exportWorkUnitToXML(const IConstWorkUnit *wu, StringBuffer &str, bool decodeGraphs, bool includeProgress, bool hidePasswords);
  187. friend void exportWorkUnitToXMLFile(const IConstWorkUnit *wu, const char * filename, unsigned extraXmlFlags, bool decodeGraphs, bool includeProgress, bool hidePasswords, bool splitStats);
  188. protected:
  189. Owned<IPropertyTree> p;
  190. mutable CriticalSection crit;
  191. mutable Owned<IWUQuery> query;
  192. mutable Owned<IWUWebServicesInfo> webServicesInfo;
  193. mutable Owned<IWorkflowItemIterator> workflowIterator;
  194. mutable bool workflowIteratorCached;
  195. mutable bool resultsCached;
  196. mutable unsigned char graphsCached; // 0 means uncached, 1 means light, 2 means heavy
  197. mutable bool temporariesCached;
  198. mutable bool variablesCached;
  199. mutable bool exceptionsCached;
  200. mutable bool pluginsCached;
  201. mutable bool librariesCached;
  202. mutable bool activitiesCached;
  203. mutable bool webServicesInfoCached;
  204. mutable bool roxieQueryInfoCached;
  205. mutable IArrayOf<IWUPlugin> plugins;
  206. mutable IArrayOf<IWULibrary> libraries;
  207. mutable IArrayOf<IWUException> exceptions;
  208. mutable IArrayOf<IConstWUGraph> graphs;
  209. mutable IArrayOf<IWUResult> results;
  210. mutable IArrayOf<IWUResult> temporaries;
  211. mutable IArrayOf<IWUResult> variables;
  212. mutable CachedTags<CLocalWUAppValue,IConstWUAppValue> appvalues;
  213. mutable CachedTags<CLocalWUStatistic,IConstWUStatistic> statistics;
  214. mutable CachedTags<CLocalWULegacyTiming,IConstWUStatistic> legacyTimings;
  215. mutable Owned<IUserDescriptor> userDesc;
  216. Mutex locked;
  217. Owned<ISecManager> secMgr;
  218. Owned<ISecUser> secUser;
  219. protected:
  220. public:
  221. IMPLEMENT_IINTERFACE;
  222. CLocalWorkUnit(ISecManager *secmgr, ISecUser *secuser);
  223. void loadPTree(IPropertyTree *ptree);
  224. void beforeDispose();
  225. IPropertyTree *getUnpackedTree(bool includeProgress) const;
  226. ISecManager *querySecMgr() { return secMgr.get(); }
  227. ISecUser *querySecUser() { return secUser.get(); }
  228. virtual bool aborting() const { return false; }
  229. virtual void forceReload() {};
  230. virtual WUAction getAction() const;
  231. virtual const char *queryActionDesc() const;
  232. virtual IStringVal & getApplicationValue(const char * application, const char * propname, IStringVal & str) const;
  233. virtual int getApplicationValueInt(const char * application, const char * propname, int defVal) const;
  234. virtual IConstWUAppValueIterator & getApplicationValues() const;
  235. virtual bool hasWorkflow() const;
  236. virtual unsigned queryEventScheduledCount() const;
  237. virtual IPropertyTree * queryWorkflowTree() const;
  238. virtual IConstWorkflowItemIterator * getWorkflowItems() const;
  239. virtual IWorkflowItemArray * getWorkflowClone() const;
  240. virtual IConstLocalFileUploadIterator * getLocalFileUploads() const;
  241. virtual bool requiresLocalFileUpload() const;
  242. virtual bool getIsQueryService() const;
  243. virtual const char *queryClusterName() const;
  244. virtual bool hasDebugValue(const char * propname) const;
  245. virtual IStringVal & getDebugValue(const char * propname, IStringVal & str) const;
  246. virtual IStringIterator & getDebugValues() const;
  247. virtual IStringIterator & getDebugValues(const char *prop) const;
  248. virtual int getDebugValueInt(const char * propname, int defVal) const;
  249. virtual __int64 getDebugValueInt64(const char * propname, __int64 defVal) const;
  250. virtual bool getDebugValueBool(const char * propname, bool defVal) const;
  251. virtual unsigned getExceptionCount() const;
  252. virtual IConstWUExceptionIterator & getExceptions() const;
  253. virtual IConstWUResult * getGlobalByName(const char * name) const;
  254. virtual unsigned getGraphCount() const;
  255. virtual unsigned getSourceFileCount() const;
  256. virtual unsigned getResultCount() const;
  257. virtual unsigned getVariableCount() const;
  258. virtual unsigned getApplicationValueCount() const;
  259. virtual IConstWUGraphIterator & getGraphs(WUGraphType type) const;
  260. virtual IConstWUGraphMetaIterator & getGraphsMeta(WUGraphType type) const;
  261. virtual IConstWUGraph * getGraph(const char *name) const;
  262. virtual IConstWUGraphProgress * getGraphProgress(const char * name) const;
  263. virtual WUGraphState queryGraphState(const char *graphName) const;
  264. virtual void setGraphState(const char *graphName, WUGraphState state) const;
  265. virtual void setNodeState(const char *graphName, WUGraphIDType nodeId, WUGraphState state) const;
  266. virtual WUGraphState queryNodeState(const char *graphName, WUGraphIDType nodeId) const;
  267. virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned subgraph) const;
  268. void clearGraphProgress() const;
  269. virtual const char *queryJobName() const;
  270. virtual IConstWUPlugin * getPluginByName(const char * name) const;
  271. virtual IConstWUPluginIterator & getPlugins() const;
  272. virtual IConstWULibraryIterator & getLibraries() const;
  273. virtual WUPriorityClass getPriority() const;
  274. virtual const char *queryPriorityDesc() const;
  275. virtual int getPriorityLevel() const;
  276. virtual int getPriorityValue() const;
  277. virtual IConstWUQuery * getQuery() const;
  278. virtual bool getRescheduleFlag() const;
  279. virtual IConstWUResult * getResultByName(const char * name) const;
  280. virtual IConstWUResult * getResultBySequence(unsigned seq) const;
  281. virtual unsigned getResultLimit() const;
  282. virtual IConstWUResultIterator & getResults() const;
  283. virtual IStringVal & getScope(IStringVal & str) const;
  284. virtual IStringVal & getSecurityToken(IStringVal & str) const;
  285. virtual WUState getState() const;
  286. virtual IStringVal & getStateEx(IStringVal & str) const;
  287. virtual __int64 getAgentSession() const;
  288. virtual unsigned getAgentPID() const;
  289. virtual const char *queryStateDesc() const;
  290. virtual IConstWUResult * getTemporaryByName(const char * name) const;
  291. virtual IConstWUResultIterator & getTemporaries() const;
  292. virtual IConstWUStatisticIterator & getStatistics(const IStatisticsFilter * filter) const;
  293. virtual IConstWUStatistic * getStatistic(const char * creator, const char * scope, StatisticKind kind) const;
  294. virtual IConstWUWebServicesInfo * getWebServicesInfo() const;
  295. virtual IStringVal & getXmlParams(IStringVal & params, bool hidePasswords) const;
  296. virtual const IPropertyTree *getXmlParams() const;
  297. virtual unsigned __int64 getHash() const;
  298. virtual IStringIterator *getLogs(const char *type, const char *component) const;
  299. virtual IStringIterator *getProcesses(const char *type) const;
  300. virtual IPropertyTreeIterator* getProcesses(const char *type, const char *instance) const;
  301. virtual IStringVal & getSnapshot(IStringVal & str) const;
  302. virtual ErrorSeverity getWarningSeverity(unsigned code, ErrorSeverity defaultSeverity) const;
  303. virtual const char *queryUser() const;
  304. virtual const char *queryWuScope() const;
  305. virtual IConstWUResult * getVariableByName(const char * name) const;
  306. virtual IConstWUResultIterator & getVariables() const;
  307. virtual const char *queryWuid() const;
  308. virtual bool getRunningGraph(IStringVal &graphName, WUGraphIDType &subId) const;
  309. virtual bool isProtected() const;
  310. virtual bool isPausing() const;
  311. virtual IWorkUnit& lock();
  312. virtual void requestAbort();
  313. virtual unsigned calculateHash(unsigned prevHash);
  314. virtual void copyWorkUnit(IConstWorkUnit *cached, bool all);
  315. virtual IPropertyTree *queryPTree() const;
  316. virtual unsigned queryFileUsage(const char *filename) const;
  317. virtual IConstWUFileUsageIterator * getFieldUsage() const;
  318. virtual bool getFieldUsageArray(StringArray & filenames, StringArray & columnnames, const char * clusterName) const;
  319. virtual bool getCloneable() const;
  320. virtual IUserDescriptor * queryUserDescriptor() const;
  321. virtual unsigned getCodeVersion() const;
  322. virtual unsigned getWuidVersion() const;
  323. virtual void getBuildVersion(IStringVal & buildVersion, IStringVal & eclVersion) const;
  324. virtual IPropertyTree * getDiskUsageStats();
  325. virtual IPropertyTreeIterator & getFileIterator() const;
  326. virtual bool archiveWorkUnit(const char *base,bool del,bool ignoredllerrors,bool deleteOwned,bool exportAssociatedFiles);
  327. virtual IJlibDateTime & getTimeScheduled(IJlibDateTime &val) const;
  328. virtual IPropertyTreeIterator & getFilesReadIterator() const;
  329. virtual void protect(bool protectMode);
  330. virtual IConstWULibrary * getLibraryByName(const char * name) const;
  331. virtual unsigned getDebugAgentListenerPort() const;
  332. virtual IStringVal & getDebugAgentListenerIP(IStringVal &ip) const;
  333. virtual unsigned getTotalThorTime() const;
  334. virtual IStringVal & getAbortBy(IStringVal & str) const;
  335. virtual unsigned __int64 getAbortTimeStamp() const;
  336. void clearExceptions();
  337. void commit();
  338. IWUException *createException();
  339. void addProcess(const char *type, const char *instance, unsigned pid, const char *log);
  340. void setAction(WUAction action);
  341. void setApplicationValue(const char * application, const char * propname, const char * value, bool overwrite);
  342. void setApplicationValueInt(const char * application, const char * propname, int value, bool overwrite);
  343. void incEventScheduledCount();
  344. void setIsQueryService(bool value);
  345. void setCloneable(bool value);
  346. void setIsClone(bool value);
  347. void setClusterName(const char * value);
  348. void setCodeVersion(unsigned version, const char * buildVersion, const char * eclVersion);
  349. void setDebugValue(const char * propname, const char * value, bool overwrite);
  350. void setDebugValueInt(const char * propname, int value, bool overwrite);
  351. void setJobName(const char * value);
  352. void setPriority(WUPriorityClass cls);
  353. void setPriorityLevel(int level);
  354. void setRescheduleFlag(bool value);
  355. void setResultLimit(unsigned value);
  356. void setState(WUState state);
  357. void setStateEx(const char * text);
  358. void setAgentSession(__int64 sessionId);
  359. void setSecurityToken(const char *value);
  360. void setStatistic(StatisticCreatorType creatorType, const char * creator, StatisticScopeType scopeType, const char * scope, StatisticKind kind, const char * optDescription, unsigned __int64 value, unsigned __int64 count, unsigned __int64 maxValue, StatsMergeAction mergeAction);
  361. void setTracingValue(const char * propname, const char * value);
  362. void setTracingValueInt(const char * propname, int value);
  363. void setTracingValueInt64(const char * propname, __int64 value);
  364. void setUser(const char * value);
  365. void setWarningSeverity(unsigned code, ErrorSeverity severity);
  366. void setWuScope(const char * value);
  367. void setSnapshot(const char * value);
  368. void setDebugAgentListenerPort(unsigned port);
  369. void setDebugAgentListenerIP(const char * ip);
  370. void setXmlParams(const char *params);
  371. void setXmlParams(IPropertyTree *tree);
  372. void setHash(unsigned __int64 hash);
  373. IWorkflowItem* addWorkflowItem(unsigned wfid, WFType type, WFMode mode, unsigned success, unsigned failure, unsigned recovery, unsigned retriesAllowed, unsigned contingencyFor);
  374. IWorkflowItemIterator * updateWorkflowItems();
  375. void syncRuntimeWorkflow(IWorkflowItemArray * array);
  376. void resetWorkflow();
  377. void schedule();
  378. void deschedule();
  379. unsigned addLocalFileUpload(LocalFileUploadType type, char const * source, char const * destination, char const * eventTag);
  380. IWUResult * updateGlobalByName(const char * name);
  381. void createGraph(const char * name, const char *label, WUGraphType type, IPropertyTree *xgmml);
  382. IWUQuery * updateQuery();
  383. IWUWebServicesInfo* updateWebServicesInfo(bool create);
  384. IWUPlugin * updatePluginByName(const char * name);
  385. IWULibrary * updateLibraryByName(const char * name);
  386. virtual IWUResult * updateResultByName(const char * name);
  387. virtual IWUResult * updateResultBySequence(unsigned seq);
  388. virtual IWUResult * updateTemporaryByName(const char * name);
  389. virtual IWUResult * updateVariableByName(const char * name);
  390. void addFile(const char *fileName, StringArray *clusters, unsigned usageCount, WUFileKind fileKind, const char *graphOwner);
  391. void noteFileRead(IDistributedFile *file);
  392. void noteFieldUsage(IPropertyTree * usage);
  393. void releaseFile(const char *fileName);
  394. void resetBeforeGeneration();
  395. void deleteTempFiles(const char *graph, bool deleteOwned, bool deleteJobOwned);
  396. void deleteTemporaries();
  397. void addDiskUsageStats(__int64 avgNodeUsage, unsigned minNode, __int64 minNodeUsage, unsigned maxNode, __int64 maxNodeUsage, __int64 graphId);
  398. void setTimeScheduled(const IJlibDateTime &val);
  399. virtual void subscribe(WUSubscribeOptions options) {};
  400. // ILocalWorkUnit - used for debugging etc
  401. void loadXML(const char *xml);
  402. void serialize(MemoryBuffer &tgt);
  403. void deserialize(MemoryBuffer &src);
  404. IWorkUnit &lockRemote(bool commit);
  405. void unlockRemote();
  406. void abort();
  407. bool switchThorQueue(const char *cluster, IQueueSwitcher *qs);
  408. void setAllowedClusters(const char *value);
  409. IStringVal & getAllowedClusters(IStringVal & str) const;
  410. void remoteCheckAccess(IUserDescriptor *user, bool writeaccess) const;
  411. void setAllowAutoQueueSwitch(bool val);
  412. bool getAllowAutoQueueSwitch() const;
  413. void setLibraryInformation(const char * name, unsigned interfaceHash, unsigned definitionHash);
  414. virtual void cleanupAndDelete(bool deldll,bool deleteOwned, const StringArray *deleteExclusions=NULL);
  415. virtual void setResultInt(const char * name, unsigned sequence, __int64 val)
  416. {
  417. Owned<IWUResult> r = updateResult(name, sequence);
  418. if (r)
  419. {
  420. r->setResultInt(val);
  421. r->setResultStatus(ResultStatusCalculated);
  422. }
  423. }
  424. virtual void setResultUInt(const char * name, unsigned sequence, unsigned __int64 val)
  425. {
  426. Owned<IWUResult> r = updateResult(name, sequence);
  427. if (r)
  428. {
  429. r->setResultUInt(val);
  430. r->setResultStatus(ResultStatusCalculated);
  431. }
  432. }
  433. virtual void setResultReal(const char *name, unsigned sequence, double val)
  434. {
  435. Owned<IWUResult> r = updateResult(name, sequence);
  436. if (r)
  437. {
  438. r->setResultReal(val);
  439. r->setResultStatus(ResultStatusCalculated);
  440. }
  441. }
  442. virtual void setResultVarString(const char * stepname, unsigned sequence, const char *val)
  443. {
  444. setResultString(stepname, sequence, strlen(val), val);
  445. }
  446. virtual void setResultVarUnicode(const char * stepname, unsigned sequence, UChar const *val)
  447. {
  448. setResultUnicode(stepname, sequence, rtlUnicodeStrlen(val), val);
  449. }
  450. virtual void setResultString(const char * stepname, unsigned sequence, int len, const char *val)
  451. {
  452. doSetResultString(type_string, stepname, sequence, len, val);
  453. }
  454. virtual void setResultData(const char * stepname, unsigned sequence, int len, const void *val)
  455. {
  456. doSetResultString(type_data, stepname, sequence, len, (const char *)val);
  457. }
  458. virtual void setResultRaw(const char * name, unsigned sequence, int len, const void *val)
  459. {
  460. Owned<IWUResult> r = updateResult(name, sequence);
  461. if (r)
  462. {
  463. r->setResultRaw(len, val, ResultFormatRaw);
  464. r->setResultStatus(ResultStatusCalculated);
  465. }
  466. }
  467. virtual void setResultSet(const char * name, unsigned sequence, bool isAll, size32_t len, const void *val, ISetToXmlTransformer *)
  468. {
  469. Owned<IWUResult> r = updateResult(name, sequence);
  470. if (r)
  471. {
  472. r->setResultIsAll(isAll);
  473. r->setResultRaw(len, val, ResultFormatRaw);
  474. r->setResultStatus(ResultStatusCalculated);
  475. }
  476. }
  477. virtual void setResultUnicode(const char * name, unsigned sequence, int len, UChar const * val)
  478. {
  479. Owned<IWUResult> r = updateResult(name, sequence);
  480. if (r)
  481. {
  482. r->setResultUnicode((char const *)val, len);
  483. r->setResultStatus(ResultStatusCalculated);
  484. }
  485. }
  486. virtual void setResultBool(const char *name, unsigned sequence, bool val)
  487. {
  488. Owned<IWUResult> r = updateResult(name, sequence);
  489. if (r)
  490. {
  491. r->setResultBool(val);
  492. r->setResultStatus(ResultStatusCalculated);
  493. }
  494. }
  495. virtual void setResultDecimal(const char *name, unsigned sequence, int len, int precision, bool isSigned, const void *val)
  496. {
  497. Owned<IWUResult> r = updateResult(name, sequence);
  498. if (r)
  499. {
  500. r->setResultDecimal(val, len);
  501. r->setResultStatus(ResultStatusCalculated);
  502. }
  503. }
  504. virtual void setResultDataset(const char * name, unsigned sequence, size32_t len, const void *val, unsigned numRows, bool extend)
  505. {
  506. Owned<IWUResult> r = updateResult(name, sequence);
  507. if (r)
  508. {
  509. __int64 totalRows = numRows;
  510. if (extend)
  511. {
  512. totalRows += r->getResultRowCount();
  513. r->addResultRaw(len, val, ResultFormatRaw);
  514. }
  515. else
  516. r->setResultRaw(len, val, ResultFormatRaw);
  517. r->setResultStatus(ResultStatusCalculated);
  518. r->setResultRowCount(totalRows);
  519. r->setResultTotalRowCount(totalRows);
  520. }
  521. }
  522. IWUResult *updateResult(const char *name, unsigned sequence)
  523. {
  524. Owned <IWUResult> result = updateWorkUnitResult(this, name, sequence);
  525. if (result)
  526. {
  527. SCMStringBuffer rname;
  528. if (!result->getResultName(rname).length())
  529. result->setResultName(name);
  530. }
  531. return result.getClear();
  532. }
  533. void doSetResultString(type_t type, const char *name, unsigned sequence, int len, const char *val)
  534. {
  535. Owned<IWUResult> r = updateResult(name, sequence);
  536. if (r)
  537. {
  538. r->setResultString(val, len);
  539. r->setResultStatus(ResultStatusCalculated);
  540. }
  541. }
  542. protected:
  543. void clearCached(bool clearTree);
  544. IWUResult *createResult();
  545. void loadGraphs(bool heavy) const;
  546. void loadResults() const;
  547. void loadTemporaries() const;
  548. void loadVariables() const;
  549. void loadExceptions() const;
  550. void loadPlugins() const;
  551. void loadLibraries() const;
  552. void loadClusters() const;
  553. void checkAgentRunning(WUState & state);
  554. // Implemented by derived classes
  555. virtual IPropertyTree *getGraphProgressTree() const { return NULL; };
  556. virtual void unsubscribe() {};
  557. virtual void _lockRemote() {};
  558. virtual void _unlockRemote() {};
  559. virtual void _loadFilesRead() const;
  560. virtual void _loadFilesWritten() const;
  561. virtual void _loadGraphs(bool heavy) const;
  562. virtual void _loadResults() const;
  563. virtual void _loadVariables() const;
  564. virtual void _loadTemporaries() const;
  565. virtual void _loadStatistics() const;
  566. virtual void _loadExceptions() const;
  567. };
  568. class WORKUNIT_API CPersistedWorkUnit : public CLocalWorkUnit, implements IWorkUnitSubscriber
  569. {
  570. public:
  571. CPersistedWorkUnit(ISecManager *secmgr, ISecUser *secuser) : CLocalWorkUnit(secmgr, secuser)
  572. {
  573. abortDirty = true;
  574. abortState = false;
  575. }
  576. virtual void subscribe(WUSubscribeOptions options);
  577. virtual void unsubscribe();
  578. virtual bool aborting() const;
  579. protected:
  580. virtual void notify(WUSubscribeOptions) { abortDirty = true; }
  581. Owned<IWorkUnitWatcher> abortWatcher;
  582. mutable bool abortDirty;
  583. mutable bool abortState;
  584. };
  585. class WORKUNIT_API CWorkUnitFactory : implements IWorkUnitFactory, public CInterface
  586. {
  587. public:
  588. IMPLEMENT_IINTERFACE;
  589. CWorkUnitFactory();
  590. ~CWorkUnitFactory();
  591. // interface IWorkUnitFactory - some are left for derived classes
  592. virtual IWorkUnit * createWorkUnit(const char * app, const char * user, ISecManager *secmgr, ISecUser *secuser);
  593. virtual bool deleteWorkUnit(const char * wuid, ISecManager *secmgr, ISecUser *secuser);
  594. virtual IConstWorkUnit * openWorkUnit(const char * wuid, ISecManager *secmgr, ISecUser *secuser);
  595. virtual IWorkUnit * updateWorkUnit(const char * wuid, ISecManager *secmgr, ISecUser *secuser);
  596. virtual bool restoreWorkUnit(const char *base, const char *wuid, bool restoreAssociated);
  597. virtual int setTracingLevel(int newlevel);
  598. virtual IWorkUnit * createNamedWorkUnit(const char * wuid, const char * app, const char *scope, ISecManager *secmgr, ISecUser *secuser);
  599. virtual IWorkUnit * getGlobalWorkUnit(ISecManager *secmgr, ISecUser *secuser) = 0;
  600. virtual IConstWorkUnitIterator * getWorkUnitsByOwner(const char * owner, ISecManager *secmgr, ISecUser *secuser) = 0;
  601. virtual IConstWorkUnitIterator* getWorkUnitsSorted(WUSortField sortorder, // field to sort by
  602. WUSortField *filters, // NULL or list of fields to filter on (terminated by WUSFterm)
  603. const void *filterbuf, // (appended) string values for filters
  604. unsigned startoffset,
  605. unsigned maxnum,
  606. __int64 *cachehint,
  607. unsigned *total,
  608. ISecManager *secmgr,
  609. ISecUser *secuser) = 0;
  610. virtual unsigned numWorkUnits() = 0;
  611. virtual IConstWorkUnitIterator *getScheduledWorkUnits(ISecManager *secmgr = NULL, ISecUser *secuser = NULL) = 0;
  612. virtual void descheduleAllWorkUnits(ISecManager *secmgr, ISecUser *secuser);
  613. virtual IConstQuerySetQueryIterator * getQuerySetQueriesSorted(WUQuerySortField *sortorder, WUQuerySortField *filters, const void *filterbuf, unsigned startoffset, unsigned maxnum, __int64 *cachehint, unsigned *total, const MapStringTo<bool> *subset);
  614. virtual bool isAborting(const char *wuid) const;
  615. virtual void clearAborting(const char *wuid);
  616. virtual WUState waitForWorkUnit(const char * wuid, unsigned timeout, bool compiled, bool returnOnWaitState) = 0;
  617. virtual StringArray &getUniqueValues(WUSortField field, const char *prefix, StringArray &result) const
  618. {
  619. return result; // Default behaviour if we can't implement efficiently is to return empty list
  620. }
  621. protected:
  622. void reportAbnormalTermination(const char *wuid, WUState &state, SessionId agent);
  623. // These need to be implemented by the derived classes
  624. virtual CLocalWorkUnit* _createWorkUnit(const char *wuid, ISecManager *secmgr, ISecUser *secuser) = 0;
  625. virtual CLocalWorkUnit* _openWorkUnit(const char *wuid, ISecManager *secmgr, ISecUser *secuser) = 0; // for read access
  626. virtual CLocalWorkUnit* _updateWorkUnit(const char *wuid, ISecManager *secmgr, ISecUser *secuser) = 0; // for write access
  627. virtual bool _restoreWorkUnit(IPTree *pt, const char *wuid) = 0;
  628. };
  629. class WORKUNIT_API CLocalWUGraph : implements IConstWUGraph, public CInterface
  630. {
  631. const CLocalWorkUnit &owner;
  632. Owned<IPropertyTree> p;
  633. mutable Owned<IPropertyTree> graph; // cached copy of graph xgmml
  634. unsigned wuidVersion;
  635. void mergeProgress(IPropertyTree &tree, IPropertyTree &progressTree, const unsigned &progressV) const;
  636. public:
  637. IMPLEMENT_IINTERFACE;
  638. CLocalWUGraph(const CLocalWorkUnit &owner, IPropertyTree *p);
  639. virtual IStringVal & getXGMML(IStringVal & ret, bool mergeProgress) const;
  640. virtual IStringVal & getName(IStringVal & ret) const;
  641. virtual IStringVal & getLabel(IStringVal & ret) const;
  642. virtual IStringVal & getTypeName(IStringVal & ret) const;
  643. virtual WUGraphType getType() const;
  644. virtual WUGraphState getState() const;
  645. virtual IPropertyTree * getXGMMLTree(bool mergeProgress) const;
  646. virtual IPropertyTree * getXGMMLTreeRaw() const;
  647. void setName(const char *str);
  648. void setLabel(const char *str);
  649. void setType(WUGraphType type);
  650. void setXGMML(const char *str);
  651. void setXGMMLTree(IPropertyTree * tree);
  652. };
  653. class WORKUNIT_API CWuGraphStats : public CInterfaceOf<IWUGraphStats>
  654. {
  655. public:
  656. CWuGraphStats(IPropertyTree *_progress, StatisticCreatorType _creatorType, const char * _creator, const char * _rootScope, unsigned _id);
  657. virtual void beforeDispose();
  658. virtual IStatisticGatherer & queryStatsBuilder();
  659. protected:
  660. Owned<IPropertyTree> progress;
  661. Owned<IStatisticGatherer> collector;
  662. StringAttr creator;
  663. StatisticCreatorType creatorType;
  664. unsigned id;
  665. };
  666. class WORKUNIT_API CWorkUnitWatcher : public CInterface, implements IWorkUnitWatcher, implements ISDSSubscription
  667. {
  668. protected:
  669. CriticalSection crit;
  670. IWorkUnitSubscriber *subscriber; // not linked - it will generally link me
  671. SubscriptionId abortId, stateId, actionId;
  672. public:
  673. IMPLEMENT_IINTERFACE;
  674. CWorkUnitWatcher(IWorkUnitSubscriber *_subscriber, WUSubscribeOptions flags, const char *wuid);
  675. ~CWorkUnitWatcher();
  676. void unsubscribe();
  677. void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData);
  678. };
  679. class WorkUnitWaiter : public CInterface, implements IAbortHandler, implements IWorkUnitSubscriber
  680. {
  681. Semaphore changed;
  682. Owned<IWorkUnitWatcher> watcher;
  683. public:
  684. IMPLEMENT_IINTERFACE;
  685. WorkUnitWaiter(const char *wuid, WUSubscribeOptions watchFor)
  686. {
  687. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  688. watcher.setown(factory->getWatcher(this, watchFor, wuid));
  689. aborted = false;
  690. }
  691. ~WorkUnitWaiter()
  692. {
  693. unsubscribe();
  694. }
  695. void notify(WUSubscribeOptions flags)
  696. {
  697. changed.signal();
  698. }
  699. bool wait(unsigned timeout)
  700. {
  701. return changed.wait(timeout) && !aborted;
  702. }
  703. bool onAbort()
  704. {
  705. aborted = true;
  706. changed.signal();
  707. return false;
  708. }
  709. void unsubscribe()
  710. {
  711. if (watcher)
  712. {
  713. watcher->unsubscribe();
  714. watcher.clear();
  715. }
  716. }
  717. bool aborted;
  718. };
  719. #define PROGRESS_FORMAT_V 2
  720. extern WORKUNIT_API IConstWUGraphProgress *createConstGraphProgress(const char *_wuid, const char *_graphName, IPropertyTree *_progress);
  721. extern WORKUNIT_API IPropertyTree * pruneBranch(IPropertyTree * from, char const * xpath);
  722. #endif