ws_workunitsService.hpp 29 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #ifndef _ESPWIZ_ws_workunits_HPP__
  14. #define _ESPWIZ_ws_workunits_HPP__
  15. #include "ws_workunits_esp.ipp"
  16. #include "workunit.hpp"
  17. #include "ws_workunitsHelpers.hpp"
  18. #include "dasds.hpp"
  19. #include "environment.hpp"
  20. #ifdef _USE_ZLIB
  21. #include "zcrypt.hpp"
  22. #endif
  23. #include "referencedfilelist.hpp"
  24. #include "ws_wuresult.hpp"
  // Dirty-state bits kept by QueryFilesInUse. Bits accumulate with |= and are
  // handed to load() as one mask the next time getTree() runs, then reset to 0.
  #define UFO_DIRTY                             0x01  // value a new QueryFilesInUse starts with
  #define UFO_RELOAD_TARGETS_CHANGED_PMID       0x02  // set when the "PackageSets" subscription notifies
  #define UFO_RELOAD_MAPPED_QUERIES             0x04  // set when the "PackageMaps" subscription notifies
  #define UFO_REMOVE_QUERIES_NOT_IN_QUERYSET    0x08  // set when the "QuerySets" subscription notifies
  29. static const __uint64 defaultWUResultMaxSize = 10000000; //10M
  30. class QueryFilesInUse : public CInterface, implements ISDSSubscription
  31. {
  32. mutable CriticalSection crit;
  33. MapStringTo<IUserDescriptor *> roxieUserMap;
  34. IArrayOf<IUserDescriptor> roxieUsers;
  35. Owned<IPropertyTree> tree;
  36. SubscriptionId qsChange;
  37. SubscriptionId pmChange;
  38. SubscriptionId psChange;
  39. mutable CriticalSection dirtyCrit; //if there were an atomic_or I would just use atomic
  40. unsigned dirty;
  41. bool aborting;
  42. private:
  43. void loadTarget(IPropertyTree *tree, const char *target, unsigned flags);
  44. void loadTargets(IPropertyTree *tree, unsigned flags);
  45. void load(unsigned flags)
  46. {
  47. Owned<IPropertyTree> t = createPTreeFromIPT(tree);
  48. loadTargets(t, flags);
  49. tree.setown(t.getClear());
  50. }
  51. void updateUsers()
  52. {
  53. Owned<IStringIterator> clusters = getTargetClusters("RoxieCluster", NULL);
  54. ForEach(*clusters)
  55. {
  56. SCMStringBuffer target;
  57. clusters->str(target);
  58. Owned<IConstWUClusterInfo> info = getTargetClusterInfo(target.str());
  59. Owned<IUserDescriptor> user = createUserDescriptor();
  60. user->set(info->getLdapUser(), info->getLdapPassword());
  61. roxieUserMap.setValue(target.str(), user);
  62. roxieUsers.append(*user.getClear());
  63. }
  64. }
  65. public:
  66. IMPLEMENT_IINTERFACE;
  67. QueryFilesInUse() : aborting(false), qsChange(0), pmChange(0), psChange(0), dirty(UFO_DIRTY)
  68. {
  69. tree.setown(createPTree("QueryFilesInUse"));
  70. updateUsers();
  71. }
  72. virtual void notify(SubscriptionId subid, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
  73. {
  74. Linked<QueryFilesInUse> me = this; // Ensure that I am not released by the notify call (which would then access freed memory to release the critsec)
  75. CriticalBlock b(dirtyCrit);
  76. if (subid == qsChange)
  77. dirty |= UFO_REMOVE_QUERIES_NOT_IN_QUERYSET;
  78. else if (subid == pmChange)
  79. dirty |= UFO_RELOAD_MAPPED_QUERIES;
  80. else if (subid == psChange)
  81. dirty |= UFO_RELOAD_TARGETS_CHANGED_PMID;
  82. PROGLOG("QueryFilesInUse.notify() called: <%d>", dirty);
  83. }
  84. virtual bool subscribe()
  85. {
  86. CriticalBlock b(crit);
  87. bool success = true;
  88. try
  89. {
  90. qsChange = querySDS().subscribe("QuerySets", *this, true);
  91. pmChange = querySDS().subscribe("PackageMaps", *this, true);
  92. psChange = querySDS().subscribe("PackageSets", *this, true);
  93. PROGLOG("QueryFilesInUse.subscribe() called: QuerySets PackageMaps PackageSets");
  94. }
  95. catch (IException *E)
  96. {
  97. success = false;
  98. //TBD failure to subscribe implies dali is down...
  99. E->Release();
  100. }
  101. return success && qsChange != 0 && pmChange != 0 && psChange != 0;
  102. }
  103. virtual bool unsubscribe()
  104. {
  105. CriticalBlock b(crit);
  106. bool success = true;
  107. try
  108. {
  109. if (qsChange)
  110. querySDS().unsubscribe(qsChange);
  111. if (pmChange)
  112. querySDS().unsubscribe(pmChange);
  113. if (psChange)
  114. querySDS().unsubscribe(psChange);
  115. }
  116. catch (IException *E)
  117. {
  118. success = false;
  119. E->Release();
  120. }
  121. qsChange = 0;
  122. pmChange = 0;
  123. psChange = 0;
  124. PROGLOG("QueryFilesInUse.unsubscribe() called");
  125. return success && qsChange == 0 && pmChange == 0 && psChange == 0;
  126. }
  127. void abort()
  128. {
  129. aborting=true;
  130. CriticalBlock b(crit);
  131. }
  132. IPropertyTree *getTree()
  133. {
  134. CriticalBlock b(crit);
  135. unsigned flags;
  136. {
  137. CriticalBlock b(dirtyCrit);
  138. flags = dirty;
  139. dirty = 0;
  140. }
  141. if (flags)
  142. load(flags);
  143. return LINK(tree);
  144. }
  145. IPropertyTreeIterator *findAllQueriesUsingFile(const char *lfn);
  146. IPropertyTreeIterator *findQueriesUsingFile(const char *target, const char *lfn, StringAttr &pmid);
  147. StringBuffer &toStr(StringBuffer &s)
  148. {
  149. Owned<IPropertyTree> t = getTree();
  150. return toXML(t, s);
  151. }
  152. };
  153. struct WUShowScheduledFilters
  154. {
  155. StringAttr cluster, state, eventName, jobName, owner, eventText;
  156. WUShowScheduledFilters(const char *_cluster, const char *_state, const char *_owner,
  157. const char *_jobName, const char *_eventName, const char *_eventText)
  158. : cluster(_cluster), state(_state), owner(_owner),
  159. jobName(_jobName), eventName(_eventName), eventText(_eventText) {};
  160. };
  161. class CWUQueryDetailsReq
  162. {
  163. StringAttr querySet, queryIdOrAlias;
  164. bool includeWUDetails = false;
  165. bool IncludeWUQueryFiles = false;
  166. bool includeSuperFiles = false;
  167. bool includeWsEclAddresses = false;
  168. bool includeStateOnClusters = false;
  169. bool checkAllNodes = false;
  170. public:
  171. CWUQueryDetailsReq(IEspWUQueryDetailsRequest &req);
  172. CWUQueryDetailsReq(IEspWUQueryDetailsLightWeightRequest &req);
  173. const char *getQuerySet() const { return querySet.get(); }
  174. const char *getQueryIdOrAlias() const { return queryIdOrAlias.get(); }
  175. const bool getIncludeWUDetails() const { return includeWUDetails; }
  176. const bool getIncludeWUQueryFiles() const { return IncludeWUQueryFiles; }
  177. const bool getIncludeSuperFiles() const { return includeSuperFiles; }
  178. const bool getIncludeWsEclAddresses() const { return includeWsEclAddresses; }
  179. const bool getIncludeStateOnClusters() const { return includeStateOnClusters; }
  180. const bool getCheckAllNodes() const { return checkAllNodes; }
  181. };
  182. class CWsWorkunitsEx : public CWsWorkunits
  183. {
  184. public:
  185. IMPLEMENT_IINTERFACE;
  186. CWsWorkunitsEx() : maxRequestEntityLength(0) {port=8010;}
  187. virtual ~CWsWorkunitsEx()
  188. {
  189. filesInUse.unsubscribe();
  190. filesInUse.abort();
  191. clusterQueryStatePool.clear();
  192. };
  193. virtual void init(IPropertyTree *cfg, const char *process, const char *service);
  194. virtual void setContainer(IEspContainer * container)
  195. {
  196. CWsWorkunits::setContainer(container);
  197. m_sched.setContainer(container);
  198. }
  199. void refreshValidClusters();
  200. bool isValidCluster(const char *cluster);
  201. void deploySharedObjectReq(IEspContext &context, IEspWUDeployWorkunitRequest & req, IEspWUDeployWorkunitResponse & resp, const char *dir, const char *xml=NULL);
  202. unsigned getGraphIdsByQueryId(const char *target, const char *queryId, StringArray& graphIds);
  203. bool getQueryFiles(IEspContext &context, const char* wuid, const char* query, const char* target, StringArray& logicalFiles, IArrayOf<IEspQuerySuperFile> *superFiles);
  204. void getGraphsByQueryId(const char *target, const char *queryId, const char *graphName, const char *subGraphId, IArrayOf<IEspECLGraphEx>& ECLGraphs);
  205. void checkAndSetClusterQueryState(IEspContext &context, const char* cluster, const char* querySetId, IArrayOf<IEspQuerySetQuery>& queries, bool checkAllNodes);
  206. void checkAndSetClusterQueryState(IEspContext &context, const char* cluster, StringArray& querySetIds, IArrayOf<IEspQuerySetQuery>& queries, bool checkAllNodes);
  207. IWorkUnitFactory *queryWUFactory() { return wuFactory; };
  208. const char *getDataDirectory() const { return dataDirectory.str(); };
  209. bool onWUQuery(IEspContext &context, IEspWUQueryRequest &req, IEspWUQueryResponse &resp);
  210. bool onWULightWeightQuery(IEspContext &context, IEspWULightWeightQueryRequest &req, IEspWULightWeightQueryResponse &resp);
  211. bool onWUPublishWorkunit(IEspContext &context, IEspWUPublishWorkunitRequest & req, IEspWUPublishWorkunitResponse & resp);
  212. bool onWUQuerysets(IEspContext &context, IEspWUQuerysetsRequest & req, IEspWUQuerysetsResponse & resp);
  213. bool onWUQuerysetDetails(IEspContext &context, IEspWUQuerySetDetailsRequest & req, IEspWUQuerySetDetailsResponse & resp);
  214. bool onWUQuerysetExport(IEspContext &context, IEspWUQuerysetExportRequest &req, IEspWUQuerysetExportResponse &resp);
  215. bool onWUQuerysetImport(IEspContext &context, IEspWUQuerysetImportRequest &req, IEspWUQuerysetImportResponse &resp);
  216. bool onWUMultiQuerysetDetails(IEspContext &context, IEspWUMultiQuerySetDetailsRequest &req, IEspWUMultiQuerySetDetailsResponse &resp);
  217. bool onWUQuerysetQueryAction(IEspContext &context, IEspWUQuerySetQueryActionRequest & req, IEspWUQuerySetQueryActionResponse & resp);
  218. bool onWUQuerysetAliasAction(IEspContext &context, IEspWUQuerySetAliasActionRequest &req, IEspWUQuerySetAliasActionResponse &resp);
  219. bool onWUQueryConfig(IEspContext &context, IEspWUQueryConfigRequest &req, IEspWUQueryConfigResponse &resp);
  220. bool onWUQuerysetCopyQuery(IEspContext &context, IEspWUQuerySetCopyQueryRequest &req, IEspWUQuerySetCopyQueryResponse &resp);
  221. bool onWUCopyQuerySet(IEspContext &context, IEspWUCopyQuerySetRequest &req, IEspWUCopyQuerySetResponse &resp);
  222. bool onWUCopyLogicalFiles(IEspContext &context, IEspWUCopyLogicalFilesRequest &req, IEspWUCopyLogicalFilesResponse &resp);
  223. bool onWUQueryDetails(IEspContext &context, IEspWUQueryDetailsRequest & req, IEspWUQueryDetailsResponse & resp);
  224. bool onWUQueryDetailsLightWeight(IEspContext &context, IEspWUQueryDetailsLightWeightRequest & req, IEspWUQueryDetailsResponse & resp);
  225. bool onWUListQueries(IEspContext &context, IEspWUListQueriesRequest &req, IEspWUListQueriesResponse &resp);
  226. bool onWUListQueriesUsingFile(IEspContext &context, IEspWUListQueriesUsingFileRequest &req, IEspWUListQueriesUsingFileResponse &resp);
  227. bool onWUQueryFiles(IEspContext &context, IEspWUQueryFilesRequest &req, IEspWUQueryFilesResponse &resp);
  228. bool onWUUpdateQueryEntry(IEspContext &context, IEspWUUpdateQueryEntryRequest &req, IEspWUUpdateQueryEntryResponse &resp);
  229. bool onWUInfo(IEspContext &context, IEspWUInfoRequest &req, IEspWUInfoResponse &resp);
  230. bool onWUInfoDetails(IEspContext &context, IEspWUInfoRequest &req, IEspWUInfoResponse &resp);
  231. bool onWUFile(IEspContext &context,IEspWULogFileRequest &req, IEspWULogFileResponse &resp);
  232. bool onWUResult(IEspContext &context,IEspWUResultRequest &req, IEspWUResultResponse &resp);
  233. bool onWUFullResult(IEspContext &context, IEspWUFullResultRequest &req, IEspWUFullResultResponse &resp);
  234. bool onWUResultView(IEspContext &context, IEspWUResultViewRequest &req, IEspWUResultViewResponse &resp);
  235. bool onWUResultSummary(IEspContext &context, IEspWUResultSummaryRequest &req, IEspWUResultSummaryResponse &resp);
  236. bool onWUResultBin(IEspContext &context, IEspWUResultBinRequest &req, IEspWUResultBinResponse &resp);
  237. bool onWUGraphInfo(IEspContext &context,IEspWUGraphInfoRequest &req, IEspWUGraphInfoResponse &resp);
  238. bool onWUGVCGraphInfo(IEspContext &context,IEspWUGVCGraphInfoRequest &req, IEspWUGVCGraphInfoResponse &resp);
  239. bool onWUGetGraphNameAndTypes(IEspContext &context,IEspWUGetGraphNameAndTypesRequest &req, IEspWUGetGraphNameAndTypesResponse &resp);
  240. bool onWUProcessGraph(IEspContext &context,IEspWUProcessGraphRequest &req, IEspWUProcessGraphResponse &resp);
  241. bool onGVCAjaxGraph(IEspContext &context, IEspGVCAjaxGraphRequest &req, IEspGVCAjaxGraphResponse &resp);
  242. bool onWUAction(IEspContext &context, IEspWUActionRequest &req, IEspWUActionResponse &resp);
  243. bool onWUShowScheduled(IEspContext &context, IEspWUShowScheduledRequest &req, IEspWUShowScheduledResponse &resp);
  244. bool onWUUpdate(IEspContext &context, IEspWUUpdateRequest &req, IEspWUUpdateResponse &resp);
  245. bool onWUDelete(IEspContext &context, IEspWUDeleteRequest &req, IEspWUDeleteResponse &resp);
  246. bool onWUProtect(IEspContext &context, IEspWUProtectRequest &req, IEspWUProtectResponse &resp);
  247. bool onWUAbort(IEspContext &context, IEspWUAbortRequest &req, IEspWUAbortResponse &resp);
  248. bool onWUSchedule(IEspContext &context, IEspWUScheduleRequest &req, IEspWUScheduleResponse &resp);
  249. bool onWUSubmit(IEspContext &context, IEspWUSubmitRequest &req, IEspWUSubmitResponse &resp);
  250. bool onWURun(IEspContext &context, IEspWURunRequest &req, IEspWURunResponse &resp);
  251. bool onWUCreate(IEspContext &context, IEspWUCreateRequest &req, IEspWUCreateResponse &resp);
  252. bool onWUCreateAndUpdate(IEspContext &context, IEspWUUpdateRequest &req, IEspWUUpdateResponse &resp);
  253. bool onWURecreateQuery(IEspContext &context, IEspWURecreateQueryRequest &req, IEspWURecreateQueryResponse &resp);
  254. bool onWUResubmit(IEspContext &context, IEspWUResubmitRequest &req, IEspWUResubmitResponse &resp);
  255. bool onWUPushEvent(IEspContext &context, IEspWUPushEventRequest &req, IEspWUPushEventResponse &resp);
  256. bool onWUExport(IEspContext &context, IEspWUExportRequest &req, IEspWUExportResponse &resp);
  257. bool onWUWaitCompiled(IEspContext &context, IEspWUWaitRequest &req, IEspWUWaitResponse &resp);
  258. bool onWUWaitComplete(IEspContext &context, IEspWUWaitRequest &req, IEspWUWaitResponse &resp);
  259. bool onWUSyntaxCheckECL(IEspContext &context, IEspWUSyntaxCheckRequest &req, IEspWUSyntaxCheckResponse &resp);
  260. bool onWUCompileECL(IEspContext &context, IEspWUCompileECLRequest &req, IEspWUCompileECLResponse &resp);
  261. bool onWUJobList(IEspContext &context, IEspWUJobListRequest &req, IEspWUJobListResponse &resp);
  262. bool onWUQueryGetGraph(IEspContext& context, IEspWUQueryGetGraphRequest& req, IEspWUQueryGetGraphResponse& resp);
  263. bool onWUQueryGetSummaryStats(IEspContext& context, IEspWUQueryGetSummaryStatsRequest& req, IEspWUQueryGetSummaryStatsResponse& resp);
  264. bool onWUGetGraph(IEspContext& context, IEspWUGetGraphRequest& req, IEspWUGetGraphResponse& resp);
  265. bool onWUGraphTiming(IEspContext& context, IEspWUGraphTimingRequest& req, IEspWUGraphTimingResponse& resp);
  266. bool onWUGetDependancyTrees(IEspContext& context, IEspWUGetDependancyTreesRequest& req, IEspWUGetDependancyTreesResponse& resp);
  267. bool onWUGetNumFileToCopy(IEspContext &context, IEspWUGetNumFileToCopyRequest &req, IEspWUGetNumFileToCopyResponse &resp);
  268. bool onWUListLocalFileRequired(IEspContext& context, IEspWUListLocalFileRequiredRequest& req, IEspWUListLocalFileRequiredResponse& resp);
  269. bool onWUAddLocalFileToWorkunit(IEspContext& context, IEspWUAddLocalFileToWorkunitRequest& req, IEspWUAddLocalFileToWorkunitResponse& resp);
  270. bool onWUClusterJobQueueXLS(IEspContext &context, IEspWUClusterJobQueueXLSRequest &req, IEspWUClusterJobQueueXLSResponse &resp);
  271. bool onWUClusterJobQueueLOG(IEspContext &context,IEspWUClusterJobQueueLOGRequest &req, IEspWUClusterJobQueueLOGResponse &resp);
  272. bool onWUClusterJobXLS(IEspContext &context, IEspWUClusterJobXLSRequest &req, IEspWUClusterJobXLSResponse &resp);
  273. bool onWUClusterJobSummaryXLS(IEspContext &context, IEspWUClusterJobSummaryXLSRequest &req, IEspWUClusterJobSummaryXLSResponse &resp);
  274. bool onWUGetThorJobQueue(IEspContext &context, IEspWUGetThorJobQueueRequest &req, IEspWUGetThorJobQueueResponse &resp);
  275. bool onWUGetThorJobList(IEspContext &context, IEspWUGetThorJobListRequest &req, IEspWUGetThorJobListResponse &resp);
  276. bool onWUCDebug(IEspContext &context, IEspWUDebugRequest &req, IEspWUDebugResponse &resp);
  277. bool onWUDeployWorkunit(IEspContext &context, IEspWUDeployWorkunitRequest & req, IEspWUDeployWorkunitResponse & resp);
  278. bool onWUDetails(IEspContext &context, IEspWUDetailsRequest &req, IEspWUDetailsResponse &resp);
  279. bool onWUDetailsMeta(IEspContext &context, IEspWUDetailsMetaRequest &req, IEspWUDetailsMetaResponse &resp);
  280. void setPort(unsigned short _port){port=_port;}
  281. bool isQuerySuspended(const char* query, IConstWUClusterInfo *clusterInfo, unsigned wait, StringBuffer& errorMessage);
  282. bool onWUCreateZAPInfo(IEspContext &context, IEspWUCreateZAPInfoRequest &req, IEspWUCreateZAPInfoResponse &resp);
  283. bool onWUGetZAPInfo(IEspContext &context, IEspWUGetZAPInfoRequest &req, IEspWUGetZAPInfoResponse &resp);
  284. bool onWUCheckFeatures(IEspContext &context, IEspWUCheckFeaturesRequest &req, IEspWUCheckFeaturesResponse &resp);
  285. bool onWUGetStats(IEspContext &context, IEspWUGetStatsRequest &req, IEspWUGetStatsResponse &resp);
  286. bool onWUListArchiveFiles(IEspContext &context, IEspWUListArchiveFilesRequest &req, IEspWUListArchiveFilesResponse &resp);
  287. bool onWUGetArchiveFile(IEspContext &context, IEspWUGetArchiveFileRequest &req, IEspWUGetArchiveFileResponse &resp);
  288. bool onWUEclDefinitionAction(IEspContext &context, IEspWUEclDefinitionActionRequest &req, IEspWUEclDefinitionActionResponse &resp);
  289. bool onWUGetPlugins(IEspContext &context, IEspWUGetPluginsRequest &req, IEspWUGetPluginsResponse &resp);
  290. bool unsubscribeServiceFromDali() override
  291. {
  292. return filesInUse.unsubscribe();
  293. }
  294. bool subscribeServiceToDali() override
  295. {
  296. return filesInUse.subscribe();
  297. }
  298. bool attachServiceToDali() override
  299. {
  300. m_sched.setDetachedState(false);
  301. return true;
  302. }
  303. bool detachServiceFromDali() override
  304. {
  305. m_sched.setDetachedState(true);
  306. return true;
  307. }
  308. private:
  309. IPropertyTree* sendControlQuery(IEspContext &context, const char* target, const char* query, unsigned timeout);
  310. bool resetQueryStats(IEspContext &context, const char* target, IProperties* queryIds, IEspWUQuerySetQueryActionResponse& resp);
  311. void readGraph(IEspContext& context, const char* subGraphId, WUGraphIDType& id, bool running,
  312. IConstWUGraph* graph, IArrayOf<IEspECLGraphEx>& graphs);
  313. IPropertyTree* getWorkunitArchive(IEspContext &context, WsWuInfo& winfo, const char* wuid, unsigned cacheMinutes);
  314. void readSuperFiles(IEspContext &context, IReferencedFile* rf, const char* fileName, IReferencedFileList* wufiles, IArrayOf<IEspQuerySuperFile>* files);
  315. IReferencedFile* getReferencedFileByName(const char* name, IReferencedFileList* wufiles);
  316. void checkEclDefinitionSyntax(IEspContext &context, const char *target, const char *eclDefinition,
  317. int msToWait, IArrayOf<IConstWUEclDefinitionActionResult> &results);
  318. bool deployEclDefinition(IEspContext &context, const char *target, const char *name, int msToWait, StringBuffer &wuid, StringBuffer &result);
  319. void deployEclDefinition(IEspContext &context, const char *target, const char *eclDefinition, int msToWait, IArrayOf<IConstWUEclDefinitionActionResult> &results);
  320. void publishEclDefinition(IEspContext &context, const char *target, const char* eclDefinition, int msToWait, IEspWUEclDefinitionActionRequest &req,
  321. IArrayOf<IConstWUEclDefinitionActionResult> &results);
  322. const char* gatherQueryFileCopyErrors(IArrayOf<IConstLogicalFileError> &errors, StringBuffer &msg);
  323. bool readDeployWUResponse(CWUDeployWorkunitResponse* deployResponse, StringBuffer &wuid, StringBuffer &result);
  324. const char* gatherExceptionMessage(const IMultiException &me, StringBuffer &exceptionMsg);
  325. const char* gatherWUException(IConstWUExceptionIterator &it, StringBuffer &exceptionMsg);
  326. const char* gatherECLException(IArrayOf<IConstECLException> &exceptions, StringBuffer &exceptionMsg);
  327. void addEclDefinitionActionResult(const char *eclDefinition, const char *result, const char *wuid,
  328. const char *queryID, const char* strAction, bool logResult, IArrayOf<IConstWUEclDefinitionActionResult> &results);
  329. void checkAddToInProgressECLJobList(double version, const char* wuid, const char* graph,
  330. const char* subGraph, const char* cluster, const char* startTime, const char* endTime,
  331. IArrayOf<IEspECLJob>& eclJobList, IArrayOf<IEspECLJob>& inProgressECLJobList);
  332. void getInProgressThorJobsFromAuditLog(double version, CDateTime& queryAuditLogFrom, CDateTime& queryAuditLogTo,
  333. const char* queryAuditLogToStr, const char* cluster, IArrayOf<IEspECLJob>& eclJobList,
  334. IArrayOf<IEspECLJob>& inProgressECLJobList);
  335. void getPreviousInProgressThorJobsFromAuditLog(double version, CDateTime queryAuditLogFrom, const char *queryAuditLogToStr,
  336. const char *cluster, IArrayOf<IEspECLJob> &eclJobList, IArrayOf<IEspECLJob> &inProgressECLJobList);
  337. bool getThorJobsFromAuditLog(double version, CDateTime &queryAuditLogFrom, CDateTime &queryAuditLogTo,
  338. const char *cluster, unsigned maxJobsToReturn, IArrayOf<IEspECLJob> &eclJobList);
  339. void readQueryAggregateStats(IPropertyTree *queryStats, const char *status, const char *ep,
  340. IArrayOf<IEspQuerySummaryStats> &querySummaryStatsList);
  341. void readQueryStatsRecord(IPropertyTree *queryRecord, IArrayOf<IEspQueryStatsRecord> &recordList);
  342. void readQueryStats(IPropertyTree *queryStatsTree, const char *id, bool all,
  343. IArrayOf<IEspQueryStats> &queryStatsList);
  344. void readQueryStatsList(IPropertyTree *queryStatsTree, const char *status, const char *ep,
  345. bool all, IArrayOf<IEspEndpointQueryStats> &endpointQueryStatsList);
  346. void getWsWuResult(IEspContext &context, const char *wuid, const char *name, const char *logical, unsigned index, __int64 start,
  347. unsigned &count, __int64 &total, IStringVal &resname, bool bin, IArrayOf<IConstNamedValue> *filterBy, MemoryBuffer &mb,
  348. WUState &wuState, bool xsd=true);
  349. void getFileResults(IEspContext &context, const char *logicalName, const char *cluster, __int64 start, unsigned &count, __int64 &total,
  350. IStringVal &resname, bool bin, IArrayOf<IConstNamedValue> *filterBy, MemoryBuffer &buf, bool xsd);
  351. void getSuspendedQueriesByCluster(MapStringTo<bool> &suspendedByCluster, const char *querySet, const char *queryID, bool checkAllNodes);
  352. void addSuspendedQueryIDs(MapStringTo<bool> &suspendedQueryIDs, IPropertyTree *queriesOnCluster, const char *target);
  353. void getWUQueryDetails(IEspContext &context, CWUQueryDetailsReq &req, IEspWUQueryDetailsResponse &resp);
  354. void readPluginFolders(StringBuffer &eclccPaths, StringArray &pluginFolders);
  355. void findPlugins(const char *pluginFolder, bool dotSoFile, StringArray &plugins);
  356. bool checkPluginECLAttr(const char *fileNameWithPath);
  357. unsigned awusCacheMinutes;
  358. StringBuffer queryDirectory;
  359. StringBuffer envLocalAddress;
  360. StringAttr daliServers;
  361. Owned<DataCache> dataCache;
  362. Owned<ArchivedWuCache> archivedWuCache;
  363. Owned<WUArchiveCache> wuArchiveCache;
  364. StringAttr sashaServerIp;
  365. unsigned short sashaServerPort;
  366. BoolHash validClusters;
  367. CriticalSection crit;
  368. WUSchedule m_sched;
  369. unsigned short port;
  370. Owned<IPropertyTree> directories;
  371. int maxRequestEntityLength;
  372. Owned<IThreadPool> clusterQueryStatePool;
  373. unsigned thorSlaveLogThreadPoolSize = THOR_SLAVE_LOG_THREAD_POOL_SIZE;
  374. Owned<IWorkUnitFactory> wuFactory;
  375. StringBuffer dataDirectory;
  376. __uint64 wuResultMaxSize = defaultWUResultMaxSize;
  377. public:
  378. QueryFilesInUse filesInUse;
  379. StringAttr zapEmailTo, zapEmailFrom, zapEmailServer;
  380. unsigned zapEmailMaxAttachmentSize = 0;
  381. unsigned zapEmailServerPort = 0;
  382. };
  383. class CWsWorkunitsSoapBindingEx : public CWsWorkunitsSoapBinding
  384. {
  385. void createAndDownloadWUZAPFile(IEspContext &context, CHttpRequest *request, CHttpResponse *response);
  386. void downloadWUFiles(IEspContext &context, CHttpRequest *request, CHttpResponse *response);
  387. public:
  388. CWsWorkunitsSoapBindingEx(IPropertyTree *cfg, const char *name, const char *process, http_soap_log_level llevel) : CWsWorkunitsSoapBinding(cfg, name, process, llevel)
  389. {
  390. wswService = NULL;
  391. VStringBuffer xpath("Software/EspProcess[@name=\"%s\"]/EspBinding[@name=\"%s\"]/BatchWatch", process, name);
  392. batchWatchFeaturesOnly = cfg->getPropBool(xpath.str(), false);
  393. directories.set(cfg->queryPropTree("Software/Directories"));
  394. xpath.setf("Software/EspProcess[@name=\"%s\"]/EspBinding[@name=\"%s\"]/@service", process, name);
  395. const char *service = cfg->queryProp(xpath);
  396. xpath.setf("Software/EspProcess[@name=\"%s\"]/EspService[@name=\"%s\"]/ThorSlaveLogThreadPoolSize", process, service);
  397. thorSlaveLogThreadPoolSize = cfg->getPropInt(xpath, THOR_SLAVE_LOG_THREAD_POOL_SIZE);
  398. xpath.setf("Software/EspProcess[@name=\"%s\"]/EspService[@name=\"%s\"]/WUResultDownloadFlushThreshold", process, service);
  399. wuResultDownloadFlushThreshold = cfg->getPropInt(xpath, defaultWUResultDownloadFlushThreshold);
  400. }
  401. virtual void getNavigationData(IEspContext &context, IPropertyTree & data)
  402. {
  403. if (queryComponentConfig().getPropBool("@api_only"))
  404. {
  405. CHttpSoapBinding::getNavigationData(context, data);
  406. return;
  407. }
  408. if (!batchWatchFeaturesOnly)
  409. {
  410. IPropertyTree *folder = ensureNavFolder(data, "ECL", "Run Ecl code and review Ecl workunits", NULL, false, 2);
  411. ensureNavLink(*folder, "Search Workunits", "/WsWorkunits/WUQuery?form_", "Search Workunits", NULL, NULL, 1);
  412. ensureNavLink(*folder, "Browse Workunits", "/WsWorkunits/WUQuery", "Browse Workunits", NULL, NULL, 2);
  413. ensureNavLink(*folder, "ECL Playground", "/esp/files/stub.htm?Widget=ECLPlaygroundWidget", "ECL Editor, Executor, Graph and Result Viewer", NULL, NULL, 4);
  414. IPropertyTree *folderQueryset = ensureNavFolder(data, "Queries", NULL, NULL, false, 3);
  415. ensureNavLink(*folderQueryset, "Browse", "/WsWorkunits/WUQuerySets", "Browse Published Queries");
  416. }
  417. }
  418. int onGetForm(IEspContext &context, CHttpRequest* request, CHttpResponse* response, const char *service, const char *method);
  419. int onGet(CHttpRequest* request, CHttpResponse* response);
  420. int onStartUpload(IEspContext& ctx, CHttpRequest* request, CHttpResponse* response, const char* service, const char* method);
  421. virtual void addService(const char * name, const char * host, unsigned short port, IEspService & service)
  422. {
  423. wswService = dynamic_cast<CWsWorkunitsEx*>(&service);
  424. if (wswService)
  425. wswService->setPort(port);
  426. CWsWorkunitsSoapBinding::addService(name, host, port, service);
  427. }
  428. private:
  429. bool batchWatchFeaturesOnly;
  430. CWsWorkunitsEx *wswService;
  431. Owned<IPropertyTree> directories;
  432. unsigned thorSlaveLogThreadPoolSize = THOR_SLAVE_LOG_THREAD_POOL_SIZE;
  433. size32_t wuResultDownloadFlushThreshold = defaultWUResultDownloadFlushThreshold;
  434. };
  435. void deploySharedObject(IEspContext &context, StringBuffer &wuid, const char *filename, const char *cluster, const char *name, const MemoryBuffer &obj, const char *dir, const char *xml=NULL);
  436. class CClusterQueryStateParam : public CInterface
  437. {
  438. Linked<CWsWorkunitsEx> wsWorkunitsService;
  439. IEspContext& context;
  440. StringAttr cluster;
  441. StringAttr querySetId;
  442. IArrayOf<IEspQuerySetQuery>& queries;
  443. bool checkAllNodes;
  444. public:
  445. IMPLEMENT_IINTERFACE;
  446. CClusterQueryStateParam(CWsWorkunitsEx* _service, IEspContext& _context, const char* _cluster, const char* _querySetId, IArrayOf<IEspQuerySetQuery>& _queries, bool _checkAllNodes)
  447. : wsWorkunitsService(_service), context(_context), cluster(_cluster), querySetId(_querySetId), queries(_queries), checkAllNodes(_checkAllNodes)
  448. {
  449. }
  450. virtual void doWork()
  451. {
  452. wsWorkunitsService->checkAndSetClusterQueryState(context, cluster.get(), querySetId.get(), queries, checkAllNodes);
  453. }
  454. };
  455. class CClusterQueryStateThreadFactory : public CInterface, public IThreadFactory
  456. {
  457. class CClusterQueryStateThread : public CInterface, implements IPooledThread
  458. {
  459. Owned<CClusterQueryStateParam> param;
  460. public:
  461. IMPLEMENT_IINTERFACE;
  462. virtual void init(void *_param) override
  463. {
  464. param.setown((CClusterQueryStateParam *)_param);
  465. }
  466. virtual void threadmain() override
  467. {
  468. param->doWork();
  469. param.clear();
  470. }
  471. virtual bool canReuse() const override
  472. {
  473. return true;
  474. }
  475. virtual bool stop() override
  476. {
  477. return true;
  478. }
  479. };
  480. public:
  481. IMPLEMENT_IINTERFACE;
  482. IPooledThread *createNew()
  483. {
  484. return new CClusterQueryStateThread();
  485. }
  486. };
  487. bool origValueChanged(const char *newValue, const char *origValue, StringBuffer &s, bool nillable=true);
  488. bool doProtectWorkunits(IEspContext& context, StringArray& wuids, IArrayOf<IConstWUActionResult>* results);
  489. bool doUnProtectWorkunits(IEspContext& context, StringArray& wuids, IArrayOf<IConstWUActionResult>* results);
  490. #endif