ws_workunitsService.hpp 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #ifndef _ESPWIZ_ws_workunits_HPP__
  14. #define _ESPWIZ_ws_workunits_HPP__
  15. #include "ws_workunits_esp.ipp"
  16. #include "workunit.hpp"
  17. #include "ws_workunitsHelpers.hpp"
  18. #include "dasds.hpp"
  19. #ifdef _USE_ZLIB
  20. #include "zcrypt.hpp"
  21. #endif
  22. #define UFO_DIRTY 0x01
  23. #define UFO_RELOAD_TARGETS_CHANGED_PMID 0x02
  24. #define UFO_RELOAD_MAPPED_QUERIES 0x04
  25. #define UFO_REMOVE_QUERIES_NOT_IN_QUERYSET 0x08
  26. class QueryFilesInUse : public CInterface, implements ISDSSubscription
  27. {
  28. mutable CriticalSection crit;
  29. MapStringTo<IUserDescriptor *> roxieUserMap;
  30. IArrayOf<IUserDescriptor> roxieUsers;
  31. Owned<IPropertyTree> tree;
  32. SubscriptionId qsChange;
  33. SubscriptionId pmChange;
  34. SubscriptionId psChange;
  35. mutable CriticalSection dirtyCrit; //if there were an atomic_or I would just use atomic
  36. unsigned dirty;
  37. bool aborting;
  38. private:
  39. void loadTarget(IPropertyTree *tree, const char *target, unsigned flags);
  40. void loadTargets(IPropertyTree *tree, unsigned flags);
  41. void load(unsigned flags)
  42. {
  43. Owned<IPropertyTree> t = createPTreeFromIPT(tree);
  44. loadTargets(t, flags);
  45. tree.setown(t.getClear());
  46. }
  47. void updateUsers()
  48. {
  49. Owned<IStringIterator> clusters = getTargetClusters("RoxieCluster", NULL);
  50. ForEach(*clusters)
  51. {
  52. SCMStringBuffer target;
  53. clusters->str(target);
  54. Owned<IConstWUClusterInfo> info = getTargetClusterInfo(target.str());
  55. Owned<IUserDescriptor> user = createUserDescriptor();
  56. user->set(info->getLdapUser(), info->getLdapPassword());
  57. roxieUserMap.setValue(target.str(), user);
  58. roxieUsers.append(*user.getClear());
  59. }
  60. }
  61. public:
  62. IMPLEMENT_IINTERFACE;
  63. QueryFilesInUse() : aborting(false), qsChange(0), pmChange(0), psChange(0), dirty(UFO_DIRTY)
  64. {
  65. tree.setown(createPTree("QueryFilesInUse"));
  66. updateUsers();
  67. }
  68. virtual void notify(SubscriptionId subid, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
  69. {
  70. Linked<QueryFilesInUse> me = this; // Ensure that I am not released by the notify call (which would then access freed memory to release the critsec)
  71. CriticalBlock b(dirtyCrit);
  72. if (subid == qsChange)
  73. dirty |= UFO_REMOVE_QUERIES_NOT_IN_QUERYSET;
  74. else if (subid == pmChange)
  75. dirty |= UFO_RELOAD_MAPPED_QUERIES;
  76. else if (subid == psChange)
  77. dirty |= UFO_RELOAD_TARGETS_CHANGED_PMID;
  78. }
  79. virtual void subscribe()
  80. {
  81. CriticalBlock b(crit);
  82. try
  83. {
  84. qsChange = querySDS().subscribe("QuerySets", *this, true);
  85. pmChange = querySDS().subscribe("PackageMaps", *this, true);
  86. psChange = querySDS().subscribe("PackageSets", *this, true);
  87. }
  88. catch (IException *E)
  89. {
  90. //TBD failure to subscribe implies dali is down...
  91. E->Release();
  92. }
  93. }
  94. virtual void unsubscribe()
  95. {
  96. CriticalBlock b(crit);
  97. try
  98. {
  99. if (qsChange)
  100. querySDS().unsubscribe(qsChange);
  101. if (pmChange)
  102. querySDS().unsubscribe(pmChange);
  103. if (psChange)
  104. querySDS().unsubscribe(psChange);
  105. }
  106. catch (IException *E)
  107. {
  108. E->Release();
  109. }
  110. qsChange = 0;
  111. pmChange = 0;
  112. psChange = 0;
  113. }
  114. void abort()
  115. {
  116. aborting=true;
  117. CriticalBlock b(crit);
  118. }
  119. IPropertyTree *getTree()
  120. {
  121. CriticalBlock b(crit);
  122. unsigned flags;
  123. {
  124. CriticalBlock b(dirtyCrit);
  125. flags = dirty;
  126. dirty = 0;
  127. }
  128. if (flags)
  129. load(flags);
  130. return LINK(tree);
  131. }
  132. IPropertyTreeIterator *findAllQueriesUsingFile(const char *lfn);
  133. IPropertyTreeIterator *findQueriesUsingFile(const char *target, const char *lfn, StringAttr &pmid);
  134. StringBuffer &toStr(StringBuffer &s)
  135. {
  136. Owned<IPropertyTree> t = getTree();
  137. return toXML(t, s);
  138. }
  139. };
  140. class CWsWorkunitsEx : public CWsWorkunits
  141. {
  142. public:
  143. IMPLEMENT_IINTERFACE;
  144. CWsWorkunitsEx() : maxRequestEntityLength(0) {port=8010;}
  145. virtual ~CWsWorkunitsEx()
  146. {
  147. filesInUse.unsubscribe();
  148. filesInUse.abort();
  149. clusterQueryStatePool.clear();
  150. };
  151. virtual void init(IPropertyTree *cfg, const char *process, const char *service);
  152. virtual void setContainer(IEspContainer * container)
  153. {
  154. CWsWorkunits::setContainer(container);
  155. m_sched.setContainer(container);
  156. }
  157. void refreshValidClusters();
  158. bool isValidCluster(const char *cluster);
  159. void deploySharedObjectReq(IEspContext &context, IEspWUDeployWorkunitRequest & req, IEspWUDeployWorkunitResponse & resp, const char *dir, const char *xml=NULL);
  160. unsigned getGraphIdsByQueryId(const char *target, const char *queryId, StringArray& graphIds);
  161. bool getQueryFiles(const char* query, const char* target, StringArray& logicalFiles, IArrayOf<IEspQuerySuperFile> *superFiles);
  162. void getGraphsByQueryId(const char *target, const char *queryId, const char *graphName, const char *subGraphId, IArrayOf<IEspECLGraphEx>& ECLGraphs);
  163. void checkAndSetClusterQueryState(IEspContext &context, const char* cluster, const char* querySetId, IArrayOf<IEspQuerySetQuery>& queries);
  164. void checkAndSetClusterQueryState(IEspContext &context, const char* cluster, StringArray& querySetIds, IArrayOf<IEspQuerySetQuery>& queries);
  165. bool onWUQuery(IEspContext &context, IEspWUQueryRequest &req, IEspWUQueryResponse &resp);
  166. bool onWUPublishWorkunit(IEspContext &context, IEspWUPublishWorkunitRequest & req, IEspWUPublishWorkunitResponse & resp);
  167. bool onWUQuerysets(IEspContext &context, IEspWUQuerysetsRequest & req, IEspWUQuerysetsResponse & resp);
  168. bool onWUQuerysetDetails(IEspContext &context, IEspWUQuerySetDetailsRequest & req, IEspWUQuerySetDetailsResponse & resp);
  169. bool onWUMultiQuerysetDetails(IEspContext &context, IEspWUMultiQuerySetDetailsRequest &req, IEspWUMultiQuerySetDetailsResponse &resp);
  170. bool onWUQuerysetQueryAction(IEspContext &context, IEspWUQuerySetQueryActionRequest & req, IEspWUQuerySetQueryActionResponse & resp);
  171. bool onWUQuerysetAliasAction(IEspContext &context, IEspWUQuerySetAliasActionRequest &req, IEspWUQuerySetAliasActionResponse &resp);
  172. bool onWUQueryConfig(IEspContext &context, IEspWUQueryConfigRequest &req, IEspWUQueryConfigResponse &resp);
  173. bool onWUQuerysetCopyQuery(IEspContext &context, IEspWUQuerySetCopyQueryRequest &req, IEspWUQuerySetCopyQueryResponse &resp);
  174. bool onWUCopyQuerySet(IEspContext &context, IEspWUCopyQuerySetRequest &req, IEspWUCopyQuerySetResponse &resp);
  175. bool onWUCopyLogicalFiles(IEspContext &context, IEspWUCopyLogicalFilesRequest &req, IEspWUCopyLogicalFilesResponse &resp);
  176. bool onWUQueryDetails(IEspContext &context, IEspWUQueryDetailsRequest & req, IEspWUQueryDetailsResponse & resp);
  177. bool onWUListQueries(IEspContext &context, IEspWUListQueriesRequest &req, IEspWUListQueriesResponse &resp);
  178. bool onWUListQueriesUsingFile(IEspContext &context, IEspWUListQueriesUsingFileRequest &req, IEspWUListQueriesUsingFileResponse &resp);
  179. bool onWUQueryFiles(IEspContext &context, IEspWUQueryFilesRequest &req, IEspWUQueryFilesResponse &resp);
  180. bool onWUInfo(IEspContext &context, IEspWUInfoRequest &req, IEspWUInfoResponse &resp);
  181. bool onWUInfoDetails(IEspContext &context, IEspWUInfoRequest &req, IEspWUInfoResponse &resp);
  182. bool onWUFile(IEspContext &context,IEspWULogFileRequest &req, IEspWULogFileResponse &resp);
  183. bool onWUResult(IEspContext &context,IEspWUResultRequest &req, IEspWUResultResponse &resp);
  184. bool onWUResultView(IEspContext &context, IEspWUResultViewRequest &req, IEspWUResultViewResponse &resp);
  185. bool onWUResultSummary(IEspContext &context, IEspWUResultSummaryRequest &req, IEspWUResultSummaryResponse &resp);
  186. bool onWUResultBin(IEspContext &context, IEspWUResultBinRequest &req, IEspWUResultBinResponse &resp);
  187. bool onWUGraphInfo(IEspContext &context,IEspWUGraphInfoRequest &req, IEspWUGraphInfoResponse &resp);
  188. bool onWUGVCGraphInfo(IEspContext &context,IEspWUGVCGraphInfoRequest &req, IEspWUGVCGraphInfoResponse &resp);
  189. bool onWUProcessGraph(IEspContext &context,IEspWUProcessGraphRequest &req, IEspWUProcessGraphResponse &resp);
  190. bool onGVCAjaxGraph(IEspContext &context, IEspGVCAjaxGraphRequest &req, IEspGVCAjaxGraphResponse &resp);
  191. bool onWUAction(IEspContext &context, IEspWUActionRequest &req, IEspWUActionResponse &resp);
  192. bool onWUShowScheduled(IEspContext &context, IEspWUShowScheduledRequest &req, IEspWUShowScheduledResponse &resp);
  193. bool onWUUpdate(IEspContext &context, IEspWUUpdateRequest &req, IEspWUUpdateResponse &resp);
  194. bool onWUDelete(IEspContext &context, IEspWUDeleteRequest &req, IEspWUDeleteResponse &resp);
  195. bool onWUProtect(IEspContext &context, IEspWUProtectRequest &req, IEspWUProtectResponse &resp);
  196. bool onWUAbort(IEspContext &context, IEspWUAbortRequest &req, IEspWUAbortResponse &resp);
  197. bool onWUSchedule(IEspContext &context, IEspWUScheduleRequest &req, IEspWUScheduleResponse &resp);
  198. bool onWUSubmit(IEspContext &context, IEspWUSubmitRequest &req, IEspWUSubmitResponse &resp);
  199. bool onWURun(IEspContext &context, IEspWURunRequest &req, IEspWURunResponse &resp);
  200. bool onWUCreate(IEspContext &context, IEspWUCreateRequest &req, IEspWUCreateResponse &resp);
  201. bool onWUCreateAndUpdate(IEspContext &context, IEspWUUpdateRequest &req, IEspWUUpdateResponse &resp);
  202. bool onWUResubmit(IEspContext &context, IEspWUResubmitRequest &req, IEspWUResubmitResponse &resp);
  203. bool onWUPushEvent(IEspContext &context, IEspWUPushEventRequest &req, IEspWUPushEventResponse &resp);
  204. bool onWUExport(IEspContext &context, IEspWUExportRequest &req, IEspWUExportResponse &resp);
  205. bool onWUWaitCompiled(IEspContext &context, IEspWUWaitRequest &req, IEspWUWaitResponse &resp);
  206. bool onWUWaitComplete(IEspContext &context, IEspWUWaitRequest &req, IEspWUWaitResponse &resp);
  207. bool onWUSyntaxCheckECL(IEspContext &context, IEspWUSyntaxCheckRequest &req, IEspWUSyntaxCheckResponse &resp);
  208. bool onWUCompileECL(IEspContext &context, IEspWUCompileECLRequest &req, IEspWUCompileECLResponse &resp);
  209. bool onWUJobList(IEspContext &context, IEspWUJobListRequest &req, IEspWUJobListResponse &resp);
  210. bool onWUQueryGetGraph(IEspContext& context, IEspWUQueryGetGraphRequest& req, IEspWUQueryGetGraphResponse& resp);
  211. bool onWUGetGraph(IEspContext& context, IEspWUGetGraphRequest& req, IEspWUGetGraphResponse& resp);
  212. bool onWUGraphTiming(IEspContext& context, IEspWUGraphTimingRequest& req, IEspWUGraphTimingResponse& resp);
  213. bool onWUGetDependancyTrees(IEspContext& context, IEspWUGetDependancyTreesRequest& req, IEspWUGetDependancyTreesResponse& resp);
  214. bool onWUListLocalFileRequired(IEspContext& context, IEspWUListLocalFileRequiredRequest& req, IEspWUListLocalFileRequiredResponse& resp);
  215. bool onWUAddLocalFileToWorkunit(IEspContext& context, IEspWUAddLocalFileToWorkunitRequest& req, IEspWUAddLocalFileToWorkunitResponse& resp);
  216. bool onWUClusterJobQueueXLS(IEspContext &context, IEspWUClusterJobQueueXLSRequest &req, IEspWUClusterJobQueueXLSResponse &resp);
  217. bool onWUClusterJobQueueLOG(IEspContext &context,IEspWUClusterJobQueueLOGRequest &req, IEspWUClusterJobQueueLOGResponse &resp);
  218. bool onWUClusterJobXLS(IEspContext &context, IEspWUClusterJobXLSRequest &req, IEspWUClusterJobXLSResponse &resp);
  219. bool onWUClusterJobSummaryXLS(IEspContext &context, IEspWUClusterJobSummaryXLSRequest &req, IEspWUClusterJobSummaryXLSResponse &resp);
  220. bool onWUCDebug(IEspContext &context, IEspWUDebugRequest &req, IEspWUDebugResponse &resp);
  221. bool onWUDeployWorkunit(IEspContext &context, IEspWUDeployWorkunitRequest & req, IEspWUDeployWorkunitResponse & resp);
  222. void setPort(unsigned short _port){port=_port;}
  223. bool isQuerySuspended(const char* query, IConstWUClusterInfo *clusterInfo, unsigned wait, StringBuffer& errorMessage);
  224. bool onWUCreateZAPInfo(IEspContext &context, IEspWUCreateZAPInfoRequest &req, IEspWUCreateZAPInfoResponse &resp);
  225. bool onWUGetZAPInfo(IEspContext &context, IEspWUGetZAPInfoRequest &req, IEspWUGetZAPInfoResponse &resp);
  226. bool onWUCheckFeatures(IEspContext &context, IEspWUCheckFeaturesRequest &req, IEspWUCheckFeaturesResponse &resp);
  227. private:
  228. void addProcessLogfile(Owned<IConstWorkUnit> &cwu, WsWuInfo &winfo, const char * process, const char* path);
  229. void createZAPWUInfoFile(IEspWUCreateZAPInfoRequest &req, Owned<IConstWorkUnit>& cwu, const char* pathNameStr);
  230. void createZAPWUXMLFile(WsWuInfo &winfo, const char* pathNameStr);
  231. void createZAPECLQueryArchiveFiles(Owned<IConstWorkUnit>& cwu, const char* pathNameStr);
  232. void createZAPFile(const char* fileName, size32_t len, const void* data);
  233. void cleanZAPFolder(IFile* zipDir, bool removeFolder);
  234. IPropertyTree* sendControlQuery(IEspContext &context, const char* target, const char* query, unsigned timeout);
  235. bool resetQueryStats(IEspContext &context, const char* target, IProperties* queryIds, IEspWUQuerySetQueryActionResponse& resp);
  236. unsigned awusCacheMinutes;
  237. StringBuffer queryDirectory;
  238. StringAttr daliServers;
  239. Owned<DataCache> dataCache;
  240. Owned<ArchivedWuCache> archivedWuCache;
  241. StringAttr sashaServerIp;
  242. unsigned short sashaServerPort;
  243. BoolHash validClusters;
  244. CriticalSection crit;
  245. WUSchedule m_sched;
  246. unsigned short port;
  247. Owned<IPropertyTree> directories;
  248. int maxRequestEntityLength;
  249. Owned<IThreadPool> clusterQueryStatePool;
  250. public:
  251. QueryFilesInUse filesInUse;
  252. };
  253. class CWsWorkunitsSoapBindingEx : public CWsWorkunitsSoapBinding
  254. {
  255. public:
  256. CWsWorkunitsSoapBindingEx(IPropertyTree *cfg, const char *name, const char *process, http_soap_log_level llevel) : CWsWorkunitsSoapBinding(cfg, name, process, llevel)
  257. {
  258. wswService = NULL;
  259. VStringBuffer xpath("Software/EspProcess[@name=\"%s\"]/EspBinding[@name=\"%s\"]/BatchWatch", process, name);
  260. batchWatchFeaturesOnly = cfg->getPropBool(xpath.str(), false);
  261. }
  262. virtual void getNavigationData(IEspContext &context, IPropertyTree & data)
  263. {
  264. if (!batchWatchFeaturesOnly)
  265. {
  266. IPropertyTree *folder = ensureNavFolder(data, "ECL", "Run Ecl code and review Ecl workunits", NULL, false, 2);
  267. ensureNavLink(*folder, "Search Workunits", "/WsWorkunits/WUQuery?form_", "Search Workunits", NULL, NULL, 1);
  268. ensureNavLink(*folder, "Browse Workunits", "/WsWorkunits/WUQuery", "Browse Workunits", NULL, NULL, 2);
  269. ensureNavLink(*folder, "ECL Playground", "/esp/files/stub.htm?Widget=ECLPlaygroundWidget", "ECL Editor, Executor, Graph and Result Viewer", NULL, NULL, 4);
  270. IPropertyTree *folderQueryset = ensureNavFolder(data, "Queries", NULL, NULL, false, 3);
  271. ensureNavLink(*folderQueryset, "Browse", "/WsWorkunits/WUQuerySets", "Browse Published Queries");
  272. }
  273. }
  274. int onGetForm(IEspContext &context, CHttpRequest* request, CHttpResponse* response, const char *service, const char *method);
  275. int onGet(CHttpRequest* request, CHttpResponse* response);
  276. virtual void addService(const char * name, const char * host, unsigned short port, IEspService & service)
  277. {
  278. wswService = dynamic_cast<CWsWorkunitsEx*>(&service);
  279. if (wswService)
  280. wswService->setPort(port);
  281. CWsWorkunitsSoapBinding::addService(name, host, port, service);
  282. }
  283. private:
  284. bool batchWatchFeaturesOnly;
  285. CWsWorkunitsEx *wswService;
  286. };
  287. void deploySharedObject(IEspContext &context, StringBuffer &wuid, const char *filename, const char *cluster, const char *name, const MemoryBuffer &obj, const char *dir, const char *xml=NULL);
  288. class CClusterQueryStateParam : public CInterface
  289. {
  290. Linked<CWsWorkunitsEx> wsWorkunitsService;
  291. IEspContext& context;
  292. StringAttr cluster;
  293. StringAttr querySetId;
  294. IArrayOf<IEspQuerySetQuery>& queries;
  295. public:
  296. IMPLEMENT_IINTERFACE;
  297. CClusterQueryStateParam(CWsWorkunitsEx* _service, IEspContext& _context, const char* _cluster, const char* _querySetId, IArrayOf<IEspQuerySetQuery>& _queries )
  298. : wsWorkunitsService(_service), context(_context), cluster(_cluster), querySetId(_querySetId), queries(_queries)
  299. {
  300. }
  301. virtual void doWork()
  302. {
  303. wsWorkunitsService->checkAndSetClusterQueryState(context, cluster.get(), querySetId.get(), queries);
  304. }
  305. };
  306. class CClusterQueryStateThreadFactory : public CInterface, public IThreadFactory
  307. {
  308. class CClusterQueryStateThread : public CInterface, implements IPooledThread
  309. {
  310. Owned<CClusterQueryStateParam> param;
  311. public:
  312. IMPLEMENT_IINTERFACE;
  313. void init(void *_param)
  314. {
  315. param.setown((CClusterQueryStateParam *)_param);
  316. }
  317. void main()
  318. {
  319. param->doWork();
  320. param.clear();
  321. }
  322. bool canReuse()
  323. {
  324. return true;
  325. }
  326. bool stop()
  327. {
  328. return true;
  329. }
  330. };
  331. public:
  332. IMPLEMENT_IINTERFACE;
  333. IPooledThread *createNew()
  334. {
  335. return new CClusterQueryStateThread();
  336. }
  337. };
  338. #endif