workunit.ipp 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774
/*##############################################################################

    HPCC SYSTEMS software Copyright (C) 2014 HPCC Systems.

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
############################################################################## */
  13. #ifndef WORKUNIT_IPP_INCL
  14. #include "seclib.hpp"
  15. #include "dasess.hpp" /// For IUserDescriptor
  16. #include "dasds.hpp"
  17. #include "workunit.hpp"
  18. /* NB: Some of the classes in this file are also used from casandrawu - which means they all need WORKUNIT_API */
  19. #define SDS_LOCK_TIMEOUT (5*60*1000) // 5 mins
  20. #define WUID_VERSION 2 // recorded in each wuid created, useful for bkwd compat. checks
  21. #define GLOBAL_WORKUNIT "global"
  22. class WORKUNIT_API CLocalWUAppValue : implements IConstWUAppValue, public CInterface
  23. {
  24. Owned<IPropertyTree> p;
  25. StringAttr prop;
  26. public:
  27. IMPLEMENT_IINTERFACE;
  28. CLocalWUAppValue(IPropertyTree *p,unsigned child);
  29. virtual const char *queryApplication() const;
  30. virtual const char *queryName() const;
  31. virtual const char *queryValue() const;
  32. };
  33. class WORKUNIT_API CLocalWUStatistic : implements IConstWUStatistic, public CInterface
  34. {
  35. Owned<IPropertyTree> p;
  36. public:
  37. IMPLEMENT_IINTERFACE;
  38. CLocalWUStatistic(IPropertyTree *p);
  39. virtual IStringVal & getCreator(IStringVal & str) const override;
  40. virtual IStringVal & getDescription(IStringVal & str, bool createDefault) const override;
  41. virtual IStringVal & getFormattedValue(IStringVal & str) const override;
  42. virtual const char * queryScope() const override;
  43. virtual StatisticMeasure getMeasure() const override;
  44. virtual StatisticCreatorType getCreatorType() const override;
  45. virtual StatisticScopeType getScopeType() const override;
  46. virtual StatisticKind getKind() const override;
  47. virtual unsigned __int64 getValue() const override;
  48. virtual unsigned __int64 getCount() const override;
  49. virtual unsigned __int64 getMax() const override;
  50. virtual unsigned __int64 getTimestamp() const override;
  51. };
//==========================================================================================
  53. template <typename T, typename IT> struct CachedTags
  54. {
  55. CachedTags(): cached(false) {}
  56. void load(IPropertyTree* p,const char* xpath)
  57. {
  58. if (!cached)
  59. {
  60. assertex(tags.length() == 0);
  61. Owned<IPropertyTreeIterator> r = p->getElements(xpath);
  62. for (r->first(); r->isValid(); r->next())
  63. {
  64. IPropertyTree *rp = &r->query();
  65. rp->Link();
  66. tags.append(*new T(rp));
  67. }
  68. cached = true;
  69. }
  70. }
  71. void loadBranch(IPropertyTree* p,const char* xpath)
  72. {
  73. if (!cached)
  74. {
  75. assertex(tags.length() == 0);
  76. Owned<IPropertyTree> branch = p->getBranch(xpath);
  77. if (branch)
  78. {
  79. Owned<IPropertyTreeIterator> r = branch->getElements("*");
  80. for (r->first(); r->isValid(); r->next())
  81. {
  82. IPropertyTree *rp = &r->query();
  83. rp->Link();
  84. tags.append(*new T(rp));
  85. }
  86. }
  87. cached = true;
  88. }
  89. }
  90. void append(IPropertyTree * p)
  91. {
  92. tags.append(*new T(p));
  93. }
  94. operator IArrayOf<IT>&() { return tags; }
  95. unsigned ordinality() const { return tags.ordinality(); }
  96. IT & item(unsigned i) const { return tags.item(i); }
  97. void kill()
  98. {
  99. cached = false;
  100. tags.kill();
  101. }
  102. bool cached;
  103. IArrayOf<IT> tags;
  104. };
  105. template <> struct CachedTags<CLocalWUAppValue, IConstWUAppValue>
  106. {
  107. CachedTags(): cached(false) {}
  108. void load(IPropertyTree* p,const char* xpath)
  109. {
  110. if (!cached)
  111. {
  112. assertex(tags.length() == 0);
  113. Owned<IPropertyTreeIterator> r = p->getElements(xpath);
  114. for (r->first(); r->isValid(); r->next())
  115. {
  116. IPropertyTree *rp = &r->query();
  117. Owned<IPropertyTreeIterator> v = rp->getElements("*");
  118. unsigned pos = 1;
  119. for (v->first(); v->isValid(); v->next())
  120. {
  121. rp->Link();
  122. tags.append(*new CLocalWUAppValue(rp,pos++));
  123. }
  124. }
  125. cached = true;
  126. }
  127. }
  128. void loadBranch(IPropertyTree* p,const char* xpath)
  129. {
  130. if (!cached)
  131. {
  132. assertex(tags.length() == 0);
  133. Owned<IPropertyTree> branch = p->getBranch(xpath);
  134. if (branch)
  135. {
  136. Owned<IPropertyTreeIterator> r = branch->getElements("*");
  137. for (r->first(); r->isValid(); r->next())
  138. {
  139. IPropertyTree *rp = &r->query();
  140. Owned<IPropertyTreeIterator> v = rp->getElements("*");
  141. unsigned pos = 1;
  142. for (v->first(); v->isValid(); v->next())
  143. {
  144. rp->Link();
  145. tags.append(*new CLocalWUAppValue(rp,pos++));
  146. }
  147. }
  148. }
  149. cached = true;
  150. }
  151. }
  152. operator IArrayOf<IConstWUAppValue>&() { return tags; }
  153. void kill()
  154. {
  155. cached = false;
  156. tags.kill();
  157. }
  158. bool cached;
  159. IArrayOf<IConstWUAppValue> tags;
  160. };
//==========================================================================================
  162. class WORKUNIT_API CLocalWorkUnit : implements IWorkUnit , implements IExtendedWUInterface, public CInterface
  163. {
  164. friend StringBuffer &exportWorkUnitToXML(const IConstWorkUnit *wu, StringBuffer &str, bool decodeGraphs, bool includeProgress, bool hidePasswords);
  165. friend void exportWorkUnitToXMLFile(const IConstWorkUnit *wu, const char * filename, unsigned extraXmlFlags, bool decodeGraphs, bool includeProgress, bool hidePasswords, bool regressionTest);
  166. protected:
  167. Owned<IPropertyTree> p;
  168. mutable CriticalSection crit;
  169. mutable Owned<IWUQuery> query;
  170. mutable Owned<IWUWebServicesInfo> webServicesInfo;
  171. mutable Owned<IWorkflowItemIterator> workflowIterator;
  172. mutable bool workflowIteratorCached;
  173. mutable bool resultsCached;
  174. mutable unsigned char graphsCached; // 0 means uncached, 1 means light, 2 means heavy
  175. mutable bool temporariesCached;
  176. mutable bool variablesCached;
  177. mutable bool exceptionsCached;
  178. mutable bool pluginsCached;
  179. mutable bool librariesCached;
  180. mutable bool activitiesCached;
  181. mutable bool webServicesInfoCached;
  182. mutable bool roxieQueryInfoCached;
  183. mutable IArrayOf<IWUPlugin> plugins;
  184. mutable IArrayOf<IWULibrary> libraries;
  185. mutable IArrayOf<IWUException> exceptions;
  186. mutable IArrayOf<IConstWUGraph> graphs;
  187. mutable IArrayOf<IWUResult> results;
  188. mutable IArrayOf<IWUResult> temporaries;
  189. mutable IArrayOf<IWUResult> variables;
  190. mutable CachedTags<CLocalWUAppValue,IConstWUAppValue> appvalues;
  191. mutable CachedTags<CLocalWUStatistic,IConstWUStatistic> statistics;
  192. mutable Owned<IUserDescriptor> userDesc;
  193. Mutex locked;
  194. Owned<ISecManager> secMgr;
  195. Owned<ISecUser> secUser;
  196. protected:
  197. public:
  198. IMPLEMENT_IINTERFACE;
  199. CLocalWorkUnit(ISecManager *secmgr, ISecUser *secuser);
  200. void loadPTree(IPropertyTree *ptree);
  201. void beforeDispose();
  202. IPropertyTree *getUnpackedTree(bool includeProgress) const;
  203. ISecManager *querySecMgr() { return secMgr.get(); }
  204. ISecUser *querySecUser() { return secUser.get(); }
  205. virtual bool aborting() const { return false; }
  206. virtual void forceReload() {};
  207. virtual WUAction getAction() const;
  208. virtual const char *queryActionDesc() const;
  209. virtual IStringVal & getApplicationValue(const char * application, const char * propname, IStringVal & str) const;
  210. virtual int getApplicationValueInt(const char * application, const char * propname, int defVal) const;
  211. virtual IConstWUAppValueIterator & getApplicationValues() const;
  212. virtual bool hasWorkflow() const;
  213. virtual unsigned queryEventScheduledCount() const;
  214. virtual IPropertyTree * queryWorkflowTree() const;
  215. virtual IConstWorkflowItemIterator * getWorkflowItems() const;
  216. virtual IWorkflowItemArray * getWorkflowClone() const;
  217. virtual IConstLocalFileUploadIterator * getLocalFileUploads() const;
  218. virtual bool requiresLocalFileUpload() const;
  219. virtual bool getIsQueryService() const;
  220. virtual const char *queryClusterName() const;
  221. virtual bool hasDebugValue(const char * propname) const;
  222. virtual IStringVal & getDebugValue(const char * propname, IStringVal & str) const;
  223. virtual IStringIterator & getDebugValues() const;
  224. virtual IStringIterator & getDebugValues(const char *prop) const;
  225. virtual int getDebugValueInt(const char * propname, int defVal) const;
  226. virtual __int64 getDebugValueInt64(const char * propname, __int64 defVal) const;
  227. virtual bool getDebugValueBool(const char * propname, bool defVal) const;
  228. virtual unsigned getExceptionCount() const;
  229. virtual IConstWUExceptionIterator & getExceptions() const;
  230. virtual IConstWUResult * getGlobalByName(const char * name) const;
  231. virtual unsigned getGraphCount() const;
  232. virtual unsigned getSourceFileCount() const;
  233. virtual unsigned getResultCount() const;
  234. virtual unsigned getVariableCount() const;
  235. virtual unsigned getApplicationValueCount() const;
  236. virtual IConstWUGraphIterator & getGraphs(WUGraphType type) const;
  237. virtual IConstWUGraphMetaIterator & getGraphsMeta(WUGraphType type) const;
  238. virtual IConstWUGraph * getGraph(const char *name) const;
  239. virtual IConstWUGraphProgress * getGraphProgress(const char * name) const;
  240. virtual WUGraphState queryGraphState(const char *graphName) const;
  241. virtual void setGraphState(const char *graphName, unsigned wfid, WUGraphState state) const;
  242. virtual void setNodeState(const char *graphName, WUGraphIDType nodeId, WUGraphState state) const;
  243. virtual WUGraphState queryNodeState(const char *graphName, WUGraphIDType nodeId) const;
  244. virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph) const override;
  245. void clearGraphProgress() const;
  246. virtual const char *queryJobName() const;
  247. virtual IConstWUPlugin * getPluginByName(const char * name) const;
  248. virtual IConstWUPluginIterator & getPlugins() const;
  249. virtual IConstWULibraryIterator & getLibraries() const;
  250. virtual WUPriorityClass getPriority() const;
  251. virtual const char *queryPriorityDesc() const;
  252. virtual int getPriorityLevel() const;
  253. virtual int getPriorityValue() const;
  254. virtual IConstWUQuery * getQuery() const;
  255. virtual bool getRescheduleFlag() const;
  256. virtual IConstWUResult * getResultByName(const char * name) const;
  257. virtual IConstWUResult * getResultBySequence(unsigned seq) const;
  258. virtual IConstWUResult * getQueryResultByName(const char *qname) const;
  259. virtual unsigned getResultLimit() const;
  260. virtual IConstWUResultIterator & getResults() const;
  261. virtual IStringVal & getScope(IStringVal & str) const;
  262. virtual IStringVal & getWorkunitDistributedAccessToken(IStringVal & tok) const;
  263. virtual WUState getState() const;
  264. virtual IStringVal & getStateEx(IStringVal & str) const;
  265. virtual __int64 getAgentSession() const;
  266. virtual unsigned getAgentPID() const;
  267. virtual const char *queryStateDesc() const;
  268. virtual IConstWUResult * getTemporaryByName(const char * name) const;
  269. virtual IConstWUResultIterator & getTemporaries() const;
  270. virtual IConstWUScopeIterator & getScopeIterator(const WuScopeFilter & filter) const override;
  271. virtual bool getStatistic(stat_type & value, const char * scope, StatisticKind kind) const override;
  272. virtual IConstWUWebServicesInfo * getWebServicesInfo() const;
  273. virtual IStringVal & getXmlParams(IStringVal & params, bool hidePasswords) const;
  274. virtual const IPropertyTree *getXmlParams() const;
  275. virtual unsigned __int64 getHash() const;
  276. virtual IStringIterator *getLogs(const char *type, const char *component) const;
  277. virtual IStringIterator *getProcesses(const char *type) const;
  278. virtual IPropertyTreeIterator* getProcesses(const char *type, const char *instance) const;
  279. virtual IStringVal & getSnapshot(IStringVal & str) const;
  280. virtual ErrorSeverity getWarningSeverity(unsigned code, ErrorSeverity defaultSeverity) const;
  281. virtual const char *queryUser() const;
  282. virtual const char *queryWuScope() const;
  283. virtual IConstWUResult * getVariableByName(const char * name) const;
  284. virtual IConstWUResultIterator & getVariables() const;
  285. virtual const char *queryWuid() const;
  286. virtual bool getRunningGraph(IStringVal &graphName, WUGraphIDType &subId) const;
  287. virtual bool isProtected() const;
  288. virtual bool isPausing() const;
  289. virtual IWorkUnit& lock();
  290. virtual void requestAbort();
  291. virtual unsigned calculateHash(unsigned prevHash);
  292. virtual void copyWorkUnit(IConstWorkUnit *cached, bool copyStats, bool all);
  293. virtual IPropertyTree *queryPTree() const;
  294. virtual unsigned queryFileUsage(const char *filename) const;
  295. virtual IConstWUFileUsageIterator * getFieldUsage() const;
  296. virtual bool getFieldUsageArray(StringArray & filenames, StringArray & columnnames, const char * clusterName) const;
  297. virtual bool getCloneable() const;
  298. virtual IUserDescriptor * queryUserDescriptor() const;
  299. virtual unsigned getCodeVersion() const;
  300. virtual unsigned getWuidVersion() const;
  301. virtual void getBuildVersion(IStringVal & buildVersion, IStringVal & eclVersion) const;
  302. virtual IPropertyTree * getDiskUsageStats();
  303. virtual IPropertyTreeIterator & getFileIterator() const;
  304. virtual bool archiveWorkUnit(const char *base,bool del,bool ignoredllerrors,bool deleteOwned,bool exportAssociatedFiles);
  305. virtual IJlibDateTime & getTimeScheduled(IJlibDateTime &val) const;
  306. virtual IPropertyTreeIterator & getFilesReadIterator() const;
  307. virtual void protect(bool protectMode);
  308. virtual IConstWULibrary * getLibraryByName(const char * name) const;
  309. virtual unsigned getDebugAgentListenerPort() const;
  310. virtual IStringVal & getDebugAgentListenerIP(IStringVal &ip) const;
  311. virtual unsigned getTotalThorTime() const;
  312. virtual IStringVal & getAbortBy(IStringVal & str) const;
  313. virtual unsigned __int64 getAbortTimeStamp() const;
  314. void clearExceptions(const char *source=nullptr);
  315. void commit();
  316. IWUException *createException();
  317. void addProcess(const char *type, const char *instance, unsigned pid, const char *log);
  318. void setAction(WUAction action);
  319. void setApplicationValue(const char * application, const char * propname, const char * value, bool overwrite);
  320. void setApplicationValueInt(const char * application, const char * propname, int value, bool overwrite);
  321. void incEventScheduledCount();
  322. void setIsQueryService(bool value);
  323. void setCloneable(bool value);
  324. void setIsClone(bool value);
  325. void setClusterName(const char * value);
  326. void setCodeVersion(unsigned version, const char * buildVersion, const char * eclVersion);
  327. void setDebugValue(const char * propname, const char * value, bool overwrite);
  328. void setDebugValueInt(const char * propname, int value, bool overwrite);
  329. void setJobName(const char * value);
  330. void setPriority(WUPriorityClass cls);
  331. void setPriorityLevel(int level);
  332. void setRescheduleFlag(bool value);
  333. void setResultLimit(unsigned value);
  334. void setState(WUState state);
  335. void setStateEx(const char * text);
  336. void setAgentSession(__int64 sessionId);
  337. bool setDistributedAccessToken(const char * user);
  338. void setStatistic(StatisticCreatorType creatorType, const char * creator, StatisticScopeType scopeType, const char * scope, StatisticKind kind, const char * optDescription, unsigned __int64 value, unsigned __int64 count, unsigned __int64 maxValue, StatsMergeAction mergeAction);
  339. void setTracingValue(const char * propname, const char * value);
  340. void setTracingValueInt(const char * propname, int value);
  341. void setTracingValueInt64(const char * propname, __int64 value);
  342. void setUser(const char * value);
  343. void setWarningSeverity(unsigned code, ErrorSeverity severity);
  344. void setWuScope(const char * value);
  345. void setSnapshot(const char * value);
  346. void setDebugAgentListenerPort(unsigned port);
  347. void setDebugAgentListenerIP(const char * ip);
  348. void setXmlParams(const char *params);
  349. void setXmlParams(IPropertyTree *tree);
  350. void setHash(unsigned __int64 hash);
  351. IWorkflowItem* addWorkflowItem(unsigned wfid, WFType type, WFMode mode, unsigned success, unsigned failure, unsigned recovery, unsigned retriesAllowed, unsigned contingencyFor);
  352. IWorkflowItemIterator * updateWorkflowItems();
  353. void syncRuntimeWorkflow(IWorkflowItemArray * array);
  354. void resetWorkflow();
  355. void schedule();
  356. void deschedule();
  357. unsigned addLocalFileUpload(LocalFileUploadType type, char const * source, char const * destination, char const * eventTag);
  358. IWUResult * updateGlobalByName(const char * name);
  359. void createGraph(const char * name, const char *label, WUGraphType type, IPropertyTree *xgmml, unsigned wfid);
  360. IWUQuery * updateQuery();
  361. IWUWebServicesInfo* updateWebServicesInfo(bool create);
  362. IWUPlugin * updatePluginByName(const char * name);
  363. IWULibrary * updateLibraryByName(const char * name);
  364. virtual IWUResult * updateResultByName(const char * name);
  365. virtual IWUResult * updateResultBySequence(unsigned seq);
  366. virtual IWUResult * updateTemporaryByName(const char * name);
  367. virtual IWUResult * updateVariableByName(const char * name);
  368. void addFile(const char *fileName, StringArray *clusters, unsigned usageCount, WUFileKind fileKind, const char *graphOwner);
  369. void noteFileRead(IDistributedFile *file);
  370. void noteFieldUsage(IPropertyTree * usage);
  371. void releaseFile(const char *fileName);
  372. void resetBeforeGeneration();
  373. void deleteTempFiles(const char *graph, bool deleteOwned, bool deleteJobOwned);
  374. void deleteTemporaries();
  375. void addDiskUsageStats(__int64 avgNodeUsage, unsigned minNode, __int64 minNodeUsage, unsigned maxNode, __int64 maxNodeUsage, __int64 graphId);
  376. void setTimeScheduled(const IJlibDateTime &val);
  377. virtual void subscribe(WUSubscribeOptions options) {};
  378. // ILocalWorkUnit - used for debugging etc
  379. void loadXML(const char *xml);
  380. void serialize(MemoryBuffer &tgt);
  381. void deserialize(MemoryBuffer &src);
  382. IWorkUnit &lockRemote(bool commit);
  383. void unlockRemote();
  384. void abort();
  385. bool switchThorQueue(const char *cluster, IQueueSwitcher *qs);
  386. void setAllowedClusters(const char *value);
  387. IStringVal & getAllowedClusters(IStringVal & str) const;
  388. void remoteCheckAccess(IUserDescriptor *user, bool writeaccess) const;
  389. void setAllowAutoQueueSwitch(bool val);
  390. bool getAllowAutoQueueSwitch() const;
  391. void setLibraryInformation(const char * name, unsigned interfaceHash, unsigned definitionHash);
  392. virtual void cleanupAndDelete(bool deldll,bool deleteOwned, const StringArray *deleteExclusions=NULL);
  393. virtual void setResultInt(const char * name, unsigned sequence, __int64 val)
  394. {
  395. Owned<IWUResult> r = updateResult(name, sequence);
  396. if (r)
  397. {
  398. r->setResultInt(val);
  399. r->setResultStatus(ResultStatusCalculated);
  400. }
  401. }
  402. virtual void setResultUInt(const char * name, unsigned sequence, unsigned __int64 val)
  403. {
  404. Owned<IWUResult> r = updateResult(name, sequence);
  405. if (r)
  406. {
  407. r->setResultUInt(val);
  408. r->setResultStatus(ResultStatusCalculated);
  409. }
  410. }
  411. virtual void setResultReal(const char *name, unsigned sequence, double val)
  412. {
  413. Owned<IWUResult> r = updateResult(name, sequence);
  414. if (r)
  415. {
  416. r->setResultReal(val);
  417. r->setResultStatus(ResultStatusCalculated);
  418. }
  419. }
  420. virtual void setResultVarString(const char * stepname, unsigned sequence, const char *val)
  421. {
  422. setResultString(stepname, sequence, strlen(val), val);
  423. }
  424. virtual void setResultVarUnicode(const char * stepname, unsigned sequence, UChar const *val)
  425. {
  426. setResultUnicode(stepname, sequence, rtlUnicodeStrlen(val), val);
  427. }
  428. virtual void setResultString(const char * stepname, unsigned sequence, int len, const char *val)
  429. {
  430. doSetResultString(type_string, stepname, sequence, len, val);
  431. }
  432. virtual void setResultData(const char * stepname, unsigned sequence, int len, const void *val)
  433. {
  434. doSetResultString(type_data, stepname, sequence, len, (const char *)val);
  435. }
  436. virtual void setResultRaw(const char * name, unsigned sequence, int len, const void *val)
  437. {
  438. Owned<IWUResult> r = updateResult(name, sequence);
  439. if (r)
  440. {
  441. r->setResultRaw(len, val, ResultFormatRaw);
  442. r->setResultStatus(ResultStatusCalculated);
  443. }
  444. }
  445. virtual void setResultSet(const char * name, unsigned sequence, bool isAll, size32_t len, const void *val, ISetToXmlTransformer *)
  446. {
  447. Owned<IWUResult> r = updateResult(name, sequence);
  448. if (r)
  449. {
  450. r->setResultIsAll(isAll);
  451. r->setResultRaw(len, val, ResultFormatRaw);
  452. r->setResultStatus(ResultStatusCalculated);
  453. }
  454. }
  455. virtual void setResultUnicode(const char * name, unsigned sequence, int len, UChar const * val)
  456. {
  457. Owned<IWUResult> r = updateResult(name, sequence);
  458. if (r)
  459. {
  460. r->setResultUnicode((char const *)val, len);
  461. r->setResultStatus(ResultStatusCalculated);
  462. }
  463. }
  464. virtual void setResultBool(const char *name, unsigned sequence, bool val)
  465. {
  466. Owned<IWUResult> r = updateResult(name, sequence);
  467. if (r)
  468. {
  469. r->setResultBool(val);
  470. r->setResultStatus(ResultStatusCalculated);
  471. }
  472. }
  473. virtual void setResultDecimal(const char *name, unsigned sequence, int len, int precision, bool isSigned, const void *val)
  474. {
  475. Owned<IWUResult> r = updateResult(name, sequence);
  476. if (r)
  477. {
  478. r->setResultDecimal(val, len);
  479. r->setResultStatus(ResultStatusCalculated);
  480. }
  481. }
  482. virtual void setResultDataset(const char * name, unsigned sequence, size32_t len, const void *val, unsigned numRows, bool extend)
  483. {
  484. Owned<IWUResult> r = updateResult(name, sequence);
  485. if (r)
  486. {
  487. __int64 totalRows = numRows;
  488. if (extend)
  489. {
  490. totalRows += r->getResultRowCount();
  491. r->addResultRaw(len, val, ResultFormatRaw);
  492. }
  493. else
  494. r->setResultRaw(len, val, ResultFormatRaw);
  495. r->setResultStatus(ResultStatusCalculated);
  496. r->setResultRowCount(totalRows);
  497. r->setResultTotalRowCount(totalRows);
  498. }
  499. }
  500. IWUResult *updateResult(const char *name, unsigned sequence)
  501. {
  502. Owned <IWUResult> result = updateWorkUnitResult(this, name, sequence);
  503. if (result)
  504. {
  505. SCMStringBuffer rname;
  506. if (!result->getResultName(rname).length())
  507. result->setResultName(name);
  508. }
  509. return result.getClear();
  510. }
  511. void doSetResultString(type_t type, const char *name, unsigned sequence, int len, const char *val)
  512. {
  513. Owned<IWUResult> r = updateResult(name, sequence);
  514. if (r)
  515. {
  516. r->setResultString(val, len);
  517. r->setResultStatus(ResultStatusCalculated);
  518. }
  519. }
  520. protected:
  521. void clearCached(bool clearTree);
  522. IWUResult *createResult();
  523. void loadGraphs(bool heavy) const;
  524. void loadResults() const;
  525. void loadTemporaries() const;
  526. void loadVariables() const;
  527. void loadExceptions() const;
  528. void loadPlugins() const;
  529. void loadLibraries() const;
  530. void loadClusters() const;
  531. void checkAgentRunning(WUState & state);
  532. // Implemented by derived classes
  533. virtual IPropertyTree *getGraphProgressTree() const { return NULL; };
  534. virtual void unsubscribe() {};
  535. virtual void _lockRemote() {};
  536. virtual void _unlockRemote() {};
  537. virtual void _loadFilesRead() const;
  538. virtual void _loadFilesWritten() const;
  539. virtual void _loadGraphs(bool heavy) const;
  540. virtual void _loadResults() const;
  541. virtual void _loadVariables() const;
  542. virtual void _loadTemporaries() const;
  543. virtual void _loadStatistics() const;
  544. virtual void _loadExceptions() const;
  545. };
  546. class WORKUNIT_API CPersistedWorkUnit : public CLocalWorkUnit, implements IWorkUnitSubscriber
  547. {
  548. public:
  549. CPersistedWorkUnit(ISecManager *secmgr, ISecUser *secuser) : CLocalWorkUnit(secmgr, secuser)
  550. {
  551. abortDirty = true;
  552. abortState = false;
  553. }
  554. virtual void subscribe(WUSubscribeOptions options);
  555. virtual void unsubscribe();
  556. virtual bool aborting() const;
  557. protected:
  558. virtual void notify(WUSubscribeOptions, unsigned, const void *) override { abortDirty = true; }
  559. Owned<IWorkUnitWatcher> abortWatcher;
  560. mutable bool abortDirty;
  561. mutable bool abortState;
  562. };
  563. class WORKUNIT_API CWorkUnitFactory : implements IWorkUnitFactory, public CInterface
  564. {
  565. public:
  566. IMPLEMENT_IINTERFACE;
  567. CWorkUnitFactory();
  568. ~CWorkUnitFactory();
  569. // interface IWorkUnitFactory - some are left for derived classes
  570. virtual IWorkUnit * createWorkUnit(const char * app, const char * user, ISecManager *secmgr, ISecUser *secuser);
  571. virtual bool deleteWorkUnit(const char * wuid, ISecManager *secmgr, ISecUser *secuser);
  572. virtual bool deleteWorkUnitEx(const char * wuid, bool throwException, ISecManager *secmgr, ISecUser *secuser);
  573. virtual IConstWorkUnit * openWorkUnit(const char * wuid, ISecManager *secmgr, ISecUser *secuser);
  574. virtual IWorkUnit * updateWorkUnit(const char * wuid, ISecManager *secmgr, ISecUser *secuser);
  575. virtual bool restoreWorkUnit(const char *base, const char *wuid, bool restoreAssociated);
  576. virtual int setTracingLevel(int newlevel);
  577. virtual IWorkUnit * createNamedWorkUnit(const char * wuid, const char * app, const char *scope, ISecManager *secmgr, ISecUser *secuser);
  578. virtual IWorkUnit * getGlobalWorkUnit(ISecManager *secmgr, ISecUser *secuser) = 0;
  579. virtual IConstWorkUnitIterator * getWorkUnitsByOwner(const char * owner, ISecManager *secmgr, ISecUser *secuser) = 0;
  580. virtual IConstWorkUnitIterator* getWorkUnitsSorted(WUSortField sortorder, // field to sort by
  581. WUSortField *filters, // NULL or list of fields to filter on (terminated by WUSFterm)
  582. const void *filterbuf, // (appended) string values for filters
  583. unsigned startoffset,
  584. unsigned maxnum,
  585. __int64 *cachehint,
  586. unsigned *total,
  587. ISecManager *secmgr,
  588. ISecUser *secuser) = 0;
  589. virtual unsigned numWorkUnits() = 0;
  590. virtual IConstWorkUnitIterator *getScheduledWorkUnits(ISecManager *secmgr = NULL, ISecUser *secuser = NULL) = 0;
  591. virtual void descheduleAllWorkUnits(ISecManager *secmgr, ISecUser *secuser);
  592. virtual IConstQuerySetQueryIterator * getQuerySetQueriesSorted(WUQuerySortField *sortorder, WUQuerySortField *filters, const void *filterbuf, unsigned startoffset, unsigned maxnum, __int64 *cachehint, unsigned *total, const MapStringTo<bool> *subset);
  593. virtual bool isAborting(const char *wuid) const;
  594. virtual void clearAborting(const char *wuid);
  595. virtual WUState waitForWorkUnit(const char * wuid, unsigned timeout, bool compiled, bool returnOnWaitState) = 0;
  596. virtual StringArray &getUniqueValues(WUSortField field, const char *prefix, StringArray &result) const
  597. {
  598. return result; // Default behaviour if we can't implement efficiently is to return empty list
  599. }
  600. protected:
  601. void reportAbnormalTermination(const char *wuid, WUState &state, SessionId agent);
  602. // These need to be implemented by the derived classes
  603. virtual CLocalWorkUnit* _createWorkUnit(const char *wuid, ISecManager *secmgr, ISecUser *secuser) = 0;
  604. virtual CLocalWorkUnit* _openWorkUnit(const char *wuid, ISecManager *secmgr, ISecUser *secuser) = 0; // for read access
  605. virtual CLocalWorkUnit* _updateWorkUnit(const char *wuid, ISecManager *secmgr, ISecUser *secuser) = 0; // for write access
  606. virtual bool _restoreWorkUnit(IPTree *pt, const char *wuid) = 0;
  607. };
  608. class WORKUNIT_API CLocalWUGraph : implements IConstWUGraph, public CInterface
  609. {
  610. const CLocalWorkUnit &owner;
  611. Owned<IPropertyTree> p;
  612. mutable Owned<IPropertyTree> graph; // cached copy of graph xgmml
  613. unsigned wuidVersion;
  614. void mergeProgress(IPropertyTree &tree, IPropertyTree &progressTree, const unsigned &progressV) const;
  615. public:
  616. IMPLEMENT_IINTERFACE;
  617. CLocalWUGraph(const CLocalWorkUnit &owner, IPropertyTree *p);
  618. virtual IStringVal & getXGMML(IStringVal & ret, bool mergeProgress) const override;
  619. virtual IStringVal & getName(IStringVal & ret) const override;
  620. virtual IStringVal & getLabel(IStringVal & ret) const override;
  621. virtual IStringVal & getTypeName(IStringVal & ret) const override;
  622. virtual WUGraphType getType() const override;
  623. virtual WUGraphState getState() const override;
  624. virtual unsigned getWfid() const override;
  625. virtual IPropertyTree * getXGMMLTree(bool mergeProgress) const override;
  626. virtual IPropertyTree * getXGMMLTreeRaw() const override;
  627. void setName(const char *str);
  628. void setLabel(const char *str);
  629. void setType(WUGraphType type);
  630. void setWfid(unsigned wfid);
  631. void setXGMML(const char *str);
  632. void setXGMMLTree(IPropertyTree * tree);
  633. };
  634. class WORKUNIT_API CWuGraphStats : public CInterfaceOf<IWUGraphStats>
  635. {
  636. public:
  637. CWuGraphStats(IPropertyTree *_progress, StatisticCreatorType _creatorType, const char * _creator, unsigned wfid, const char * _rootScope, unsigned _id);
  638. virtual void beforeDispose();
  639. virtual IStatisticGatherer & queryStatsBuilder();
  640. protected:
  641. Owned<IPropertyTree> progress;
  642. Owned<IStatisticGatherer> collector;
  643. StringAttr creator;
  644. StatisticCreatorType creatorType;
  645. unsigned id;
  646. };
  647. class WORKUNIT_API CWorkUnitWatcher : public CInterface, implements IWorkUnitWatcher, implements ISDSSubscription
  648. {
  649. protected:
  650. CriticalSection crit;
  651. IWorkUnitSubscriber *subscriber; // not linked - it will generally link me
  652. SubscriptionId abortId, stateId, actionId;
  653. public:
  654. IMPLEMENT_IINTERFACE;
  655. CWorkUnitWatcher(IWorkUnitSubscriber *_subscriber, WUSubscribeOptions flags, const char *wuid);
  656. ~CWorkUnitWatcher();
  657. void unsubscribe();
  658. void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData);
  659. };
  660. class WorkUnitWaiter : public CInterface, implements IAbortHandler, implements IWorkUnitSubscriber
  661. {
  662. Semaphore changed;
  663. Owned<IWorkUnitWatcher> watcher;
  664. bool aborted;
  665. public:
  666. IMPLEMENT_IINTERFACE;
  667. WorkUnitWaiter(const char *wuid, WUSubscribeOptions watchFor)
  668. {
  669. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  670. watcher.setown(factory->getWatcher(this, watchFor, wuid));
  671. aborted = false;
  672. }
  673. ~WorkUnitWaiter()
  674. {
  675. unsubscribe();
  676. }
  677. bool isAborted() const { return aborted; }
  678. bool wait(unsigned timeout)
  679. {
  680. return changed.wait(timeout) && !aborted;
  681. }
  682. void unsubscribe()
  683. {
  684. if (watcher)
  685. {
  686. watcher->unsubscribe();
  687. watcher.clear();
  688. }
  689. }
  690. // IWorkUnitSubscriber
  691. virtual void notify(WUSubscribeOptions flags, unsigned valueLen, const void *valueData) override
  692. {
  693. changed.signal();
  694. }
  695. // IAbortHandler
  696. virtual bool onAbort() override
  697. {
  698. aborted = true;
  699. changed.signal();
  700. return false;
  701. }
  702. };
  703. #define PROGRESS_FORMAT_V 2
  704. extern WORKUNIT_API IConstWUGraphProgress *createConstGraphProgress(const char *_wuid, const char *_graphName, IPropertyTree *_progress);
  705. extern WORKUNIT_API IPropertyTree * pruneBranch(IPropertyTree * from, char const * xpath);
  706. #endif