ws_workunitsService.hpp 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #ifndef _ESPWIZ_ws_workunits_HPP__
  14. #define _ESPWIZ_ws_workunits_HPP__
  15. #include "ws_workunits_esp.ipp"
  16. #include "workunit.hpp"
  17. #include "ws_workunitsHelpers.hpp"
  18. #include "dasds.hpp"
  19. #include "environment.hpp"
  20. #ifdef _USE_ZLIB
  21. #include "zcrypt.hpp"
  22. #endif
  23. #include "referencedfilelist.hpp"
  24. #include "ws_wuresult.hpp"
  25. #include "jsmartsock.ipp"
  26. #include <atomic>
  27. #define UFO_DIRTY 0x01
  28. #define UFO_RELOAD_TARGETS_CHANGED_PMID 0x02
  29. #define UFO_RELOAD_MAPPED_QUERIES 0x04
  30. #define UFO_REMOVE_QUERIES_NOT_IN_QUERYSET 0x08
  31. static const __uint64 defaultWUResultMaxSize = 0x100000*10; //10M
  32. class QueryFilesInUse : public CInterface, implements ISDSSubscription, implements IThreaded
  33. {
  34. mutable CriticalSection crit;
  35. CThreaded threaded;
  36. Semaphore sem;
  37. MapStringTo<IUserDescriptor *> roxieUserMap;
  38. IArrayOf<IUserDescriptor> roxieUsers;
  39. Owned<IPropertyTree> tree;
  40. SubscriptionId qsChange;
  41. SubscriptionId pmChange;
  42. SubscriptionId psChange;
  43. mutable CriticalSection dirtyCrit; //if there were an atomic_or I would just use atomic
  44. unsigned dirty;
  45. std::atomic_bool aborting;
  46. private:
  47. void loadTarget(IPropertyTree *tree, const char *target, unsigned flags);
  48. void loadTargets(IPropertyTree *tree, unsigned flags);
  49. void load(unsigned flags)
  50. {
  51. Owned<IPropertyTree> t = createPTreeFromIPT(tree);
  52. loadTargets(t, flags);
  53. tree.setown(t.getClear());
  54. }
  55. void updateUsers()
  56. {
  57. #ifdef _CONTAINERIZED
  58. IERRLOG("CONTAINERIZED(QueryFilesInUse::updateUsers)");
  59. #else
  60. Owned<IStringIterator> clusters = getTargetClusters("RoxieCluster", NULL);
  61. ForEach(*clusters)
  62. {
  63. SCMStringBuffer target;
  64. clusters->str(target);
  65. Owned<IConstWUClusterInfo> info = getTargetClusterInfo(target.str());
  66. Owned<IUserDescriptor> user = createUserDescriptor();
  67. user->set(info->getLdapUser(), info->getLdapPassword());
  68. roxieUserMap.setValue(target.str(), user);
  69. roxieUsers.append(*user.getClear());
  70. }
  71. #endif
  72. }
  73. public:
  74. IMPLEMENT_IINTERFACE;
  75. QueryFilesInUse() : threaded("QueryFilesInUse"), aborting(false), qsChange(0), pmChange(0), psChange(0), dirty(UFO_DIRTY)
  76. {
  77. tree.setown(createPTree("QueryFilesInUse"));
  78. updateUsers();
  79. threaded.init(this);
  80. }
  81. ~QueryFilesInUse()
  82. {
  83. aborting = true;
  84. sem.signal();
  85. threaded.join();
  86. }
  87. virtual void threadmain()
  88. {
  89. while (!aborting)
  90. {
  91. //prepopulate the cache, lazy mode is very slow
  92. //getTree() builds the cache, but only does work if the cache is dirty (changes in dali would have dirtied the cache)
  93. Owned<IPropertyTree> thetree = getTree();
  94. //wait 1 minute, then check if dirty again
  95. if (sem.wait(60000))
  96. break;
  97. }
  98. }
  99. virtual void notify(SubscriptionId subid, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
  100. {
  101. Linked<QueryFilesInUse> me = this; // Ensure that I am not released by the notify call (which would then access freed memory to release the critsec)
  102. CriticalBlock b(dirtyCrit);
  103. if (subid == qsChange)
  104. dirty |= UFO_REMOVE_QUERIES_NOT_IN_QUERYSET;
  105. else if (subid == pmChange)
  106. dirty |= UFO_RELOAD_MAPPED_QUERIES;
  107. else if (subid == psChange)
  108. dirty |= UFO_RELOAD_TARGETS_CHANGED_PMID;
  109. PROGLOG("QueryFilesInUse.notify() called: <%d>", dirty);
  110. }
  111. virtual bool subscribe()
  112. {
  113. CriticalBlock b(crit);
  114. bool success = true;
  115. try
  116. {
  117. qsChange = querySDS().subscribe("QuerySets", *this, true);
  118. pmChange = querySDS().subscribe("PackageMaps", *this, true);
  119. psChange = querySDS().subscribe("PackageSets", *this, true);
  120. PROGLOG("QueryFilesInUse.subscribe() called: QuerySets PackageMaps PackageSets");
  121. }
  122. catch (IException *E)
  123. {
  124. success = false;
  125. //TBD failure to subscribe implies dali is down...
  126. E->Release();
  127. }
  128. return success && qsChange != 0 && pmChange != 0 && psChange != 0;
  129. }
  130. virtual bool unsubscribe()
  131. {
  132. CriticalBlock b(crit);
  133. bool success = true;
  134. try
  135. {
  136. if (qsChange)
  137. querySDS().unsubscribe(qsChange);
  138. if (pmChange)
  139. querySDS().unsubscribe(pmChange);
  140. if (psChange)
  141. querySDS().unsubscribe(psChange);
  142. }
  143. catch (IException *E)
  144. {
  145. success = false;
  146. E->Release();
  147. }
  148. qsChange = 0;
  149. pmChange = 0;
  150. psChange = 0;
  151. PROGLOG("QueryFilesInUse.unsubscribe() called");
  152. return success && qsChange == 0 && pmChange == 0 && psChange == 0;
  153. }
  154. void abort()
  155. {
  156. aborting=true;
  157. CriticalBlock b(crit);
  158. }
  159. IPropertyTree *getTree()
  160. {
  161. //getTree() is thread safe because when load(flags) is called below it creates a new working tree,
  162. // the previous tree might still be in use outside this class, in which case it will not actually freed until there are no remaining references outstanding
  163. CriticalBlock b(crit);
  164. unsigned flags;
  165. {
  166. CriticalBlock b(dirtyCrit);
  167. flags = dirty;
  168. dirty = 0;
  169. }
  170. if (flags)
  171. load(flags);
  172. return LINK(tree);
  173. }
  174. IPropertyTreeIterator *findAllQueriesUsingFile(const char *lfn);
  175. IPropertyTreeIterator *findQueriesUsingFile(const char *target, const char *lfn, StringAttr &pmid);
  176. StringBuffer &toStr(StringBuffer &s)
  177. {
  178. Owned<IPropertyTree> t = getTree();
  179. return toXML(t, s);
  180. }
  181. };
  182. struct WUShowScheduledFilters
  183. {
  184. StringAttr cluster, state, eventName, jobName, owner, eventText;
  185. WUShowScheduledFilters(const char *_cluster, const char *_state, const char *_owner,
  186. const char *_jobName, const char *_eventName, const char *_eventText)
  187. : cluster(_cluster), state(_state), owner(_owner),
  188. jobName(_jobName), eventName(_eventName), eventText(_eventText) {};
  189. };
  190. class CWUQueryDetailsReq
  191. {
  192. StringAttr querySet, queryIdOrAlias;
  193. bool includeWUDetails = false;
  194. bool IncludeWUQueryFiles = false;
  195. bool includeSuperFiles = false;
  196. bool includeWsEclAddresses = false;
  197. bool includeStateOnClusters = false;
  198. bool checkAllNodes = false;
  199. public:
  200. CWUQueryDetailsReq(IEspWUQueryDetailsRequest &req);
  201. CWUQueryDetailsReq(IEspWUQueryDetailsLightWeightRequest &req);
  202. const char *getQuerySet() const { return querySet.get(); }
  203. const char *getQueryIdOrAlias() const { return queryIdOrAlias.get(); }
  204. const bool getIncludeWUDetails() const { return includeWUDetails; }
  205. const bool getIncludeWUQueryFiles() const { return IncludeWUQueryFiles; }
  206. const bool getIncludeSuperFiles() const { return includeSuperFiles; }
  207. const bool getIncludeWsEclAddresses() const { return includeWsEclAddresses; }
  208. const bool getIncludeStateOnClusters() const { return includeStateOnClusters; }
  209. const bool getCheckAllNodes() const { return checkAllNodes; }
  210. };
  211. class CWsWorkunitsEx : public CWsWorkunits
  212. {
  213. public:
  214. IMPLEMENT_IINTERFACE;
  215. CWsWorkunitsEx() : maxRequestEntityLength(0) {port=8010;}
  216. virtual ~CWsWorkunitsEx()
  217. {
  218. filesInUse.unsubscribe();
  219. filesInUse.abort();
  220. clusterQueryStatePool.clear();
  221. };
  222. virtual void init(IPropertyTree *cfg, const char *process, const char *service);
  223. virtual void setContainer(IEspContainer * container)
  224. {
  225. CWsWorkunits::setContainer(container);
  226. m_sched.setContainer(container);
  227. }
  228. void deploySharedObjectReq(IEspContext &context, IEspWUDeployWorkunitRequest & req, IEspWUDeployWorkunitResponse & resp, const char *dir, const char *xml=NULL);
  229. unsigned getGraphIdsByQueryId(const char *target, const char *queryId, StringArray& graphIds);
  230. bool getQueryFiles(IEspContext &context, const char* wuid, const char* query, const char* target, StringArray& logicalFiles, IArrayOf<IEspQuerySuperFile> *superFiles);
  231. void getGraphsByQueryId(const char *target, const char *queryId, const char *graphName, const char *subGraphId, IArrayOf<IEspECLGraphEx>& ECLGraphs);
  232. void checkAndSetClusterQueryState(IEspContext &context, const char* cluster, const char* querySetId, IArrayOf<IEspQuerySetQuery>& queries, bool checkAllNodes);
  233. void checkAndSetClusterQueryState(IEspContext &context, const char* cluster, StringArray& querySetIds, IArrayOf<IEspQuerySetQuery>& queries, bool checkAllNodes);
  234. IWorkUnitFactory *queryWUFactory() { return wuFactory; };
  235. const char *getTempDirectory() const { return tempDirectory.str(); };
  236. bool onWUQuery(IEspContext &context, IEspWUQueryRequest &req, IEspWUQueryResponse &resp);
  237. bool onWULightWeightQuery(IEspContext &context, IEspWULightWeightQueryRequest &req, IEspWULightWeightQueryResponse &resp);
  238. bool onWUPublishWorkunit(IEspContext &context, IEspWUPublishWorkunitRequest & req, IEspWUPublishWorkunitResponse & resp);
  239. bool onWUQuerysets(IEspContext &context, IEspWUQuerysetsRequest & req, IEspWUQuerysetsResponse & resp);
  240. bool onWUQuerysetDetails(IEspContext &context, IEspWUQuerySetDetailsRequest & req, IEspWUQuerySetDetailsResponse & resp);
  241. bool onWUQuerysetExport(IEspContext &context, IEspWUQuerysetExportRequest &req, IEspWUQuerysetExportResponse &resp);
  242. bool onWUQuerysetImport(IEspContext &context, IEspWUQuerysetImportRequest &req, IEspWUQuerysetImportResponse &resp);
  243. bool onWUMultiQuerysetDetails(IEspContext &context, IEspWUMultiQuerySetDetailsRequest &req, IEspWUMultiQuerySetDetailsResponse &resp);
  244. bool onWUQuerysetQueryAction(IEspContext &context, IEspWUQuerySetQueryActionRequest & req, IEspWUQuerySetQueryActionResponse & resp);
  245. bool onWUQuerysetAliasAction(IEspContext &context, IEspWUQuerySetAliasActionRequest &req, IEspWUQuerySetAliasActionResponse &resp);
  246. bool onWUQueryConfig(IEspContext &context, IEspWUQueryConfigRequest &req, IEspWUQueryConfigResponse &resp);
  247. bool onWUQuerysetCopyQuery(IEspContext &context, IEspWUQuerySetCopyQueryRequest &req, IEspWUQuerySetCopyQueryResponse &resp);
  248. bool onWUCopyQuerySet(IEspContext &context, IEspWUCopyQuerySetRequest &req, IEspWUCopyQuerySetResponse &resp);
  249. bool onWUCopyLogicalFiles(IEspContext &context, IEspWUCopyLogicalFilesRequest &req, IEspWUCopyLogicalFilesResponse &resp);
  250. bool onWUQueryDetails(IEspContext &context, IEspWUQueryDetailsRequest & req, IEspWUQueryDetailsResponse & resp);
  251. bool onWUQueryDetailsLightWeight(IEspContext &context, IEspWUQueryDetailsLightWeightRequest & req, IEspWUQueryDetailsResponse & resp);
  252. bool onWUListQueries(IEspContext &context, IEspWUListQueriesRequest &req, IEspWUListQueriesResponse &resp);
  253. bool onWUListQueriesUsingFile(IEspContext &context, IEspWUListQueriesUsingFileRequest &req, IEspWUListQueriesUsingFileResponse &resp);
  254. bool onWUQueryFiles(IEspContext &context, IEspWUQueryFilesRequest &req, IEspWUQueryFilesResponse &resp);
  255. bool onWUUpdateQueryEntry(IEspContext &context, IEspWUUpdateQueryEntryRequest &req, IEspWUUpdateQueryEntryResponse &resp);
  256. bool onWUInfo(IEspContext &context, IEspWUInfoRequest &req, IEspWUInfoResponse &resp);
  257. bool onWUInfoDetails(IEspContext &context, IEspWUInfoRequest &req, IEspWUInfoResponse &resp);
  258. bool onWUFile(IEspContext &context,IEspWULogFileRequest &req, IEspWULogFileResponse &resp);
  259. bool onWUResult(IEspContext &context,IEspWUResultRequest &req, IEspWUResultResponse &resp);
  260. bool onWUFullResult(IEspContext &context, IEspWUFullResultRequest &req, IEspWUFullResultResponse &resp);
  261. bool onWUResultView(IEspContext &context, IEspWUResultViewRequest &req, IEspWUResultViewResponse &resp);
  262. bool onWUResultSummary(IEspContext &context, IEspWUResultSummaryRequest &req, IEspWUResultSummaryResponse &resp);
  263. bool onWUResultBin(IEspContext &context, IEspWUResultBinRequest &req, IEspWUResultBinResponse &resp);
  264. bool onWUGraphInfo(IEspContext &context,IEspWUGraphInfoRequest &req, IEspWUGraphInfoResponse &resp);
  265. bool onWUGVCGraphInfo(IEspContext &context,IEspWUGVCGraphInfoRequest &req, IEspWUGVCGraphInfoResponse &resp);
  266. bool onWUGetGraphNameAndTypes(IEspContext &context,IEspWUGetGraphNameAndTypesRequest &req, IEspWUGetGraphNameAndTypesResponse &resp);
  267. bool onWUProcessGraph(IEspContext &context,IEspWUProcessGraphRequest &req, IEspWUProcessGraphResponse &resp);
  268. bool onGVCAjaxGraph(IEspContext &context, IEspGVCAjaxGraphRequest &req, IEspGVCAjaxGraphResponse &resp);
  269. bool onWUAction(IEspContext &context, IEspWUActionRequest &req, IEspWUActionResponse &resp);
  270. bool onWUShowScheduled(IEspContext &context, IEspWUShowScheduledRequest &req, IEspWUShowScheduledResponse &resp);
  271. bool onWUUpdate(IEspContext &context, IEspWUUpdateRequest &req, IEspWUUpdateResponse &resp);
  272. bool onWUDelete(IEspContext &context, IEspWUDeleteRequest &req, IEspWUDeleteResponse &resp);
  273. bool onWUProtect(IEspContext &context, IEspWUProtectRequest &req, IEspWUProtectResponse &resp);
  274. bool onWUAbort(IEspContext &context, IEspWUAbortRequest &req, IEspWUAbortResponse &resp);
  275. bool onWUSchedule(IEspContext &context, IEspWUScheduleRequest &req, IEspWUScheduleResponse &resp);
  276. bool onWUSubmit(IEspContext &context, IEspWUSubmitRequest &req, IEspWUSubmitResponse &resp);
  277. bool onWURun(IEspContext &context, IEspWURunRequest &req, IEspWURunResponse &resp);
  278. bool onWUCreate(IEspContext &context, IEspWUCreateRequest &req, IEspWUCreateResponse &resp);
  279. bool onWUCreateAndUpdate(IEspContext &context, IEspWUUpdateRequest &req, IEspWUUpdateResponse &resp);
  280. bool onWURecreateQuery(IEspContext &context, IEspWURecreateQueryRequest &req, IEspWURecreateQueryResponse &resp);
  281. bool onWUResubmit(IEspContext &context, IEspWUResubmitRequest &req, IEspWUResubmitResponse &resp);
  282. bool onWUPushEvent(IEspContext &context, IEspWUPushEventRequest &req, IEspWUPushEventResponse &resp);
  283. bool onWUExport(IEspContext &context, IEspWUExportRequest &req, IEspWUExportResponse &resp);
  284. bool onWUWaitCompiled(IEspContext &context, IEspWUWaitRequest &req, IEspWUWaitResponse &resp);
  285. bool onWUWaitComplete(IEspContext &context, IEspWUWaitRequest &req, IEspWUWaitResponse &resp);
  286. bool onWUSyntaxCheckECL(IEspContext &context, IEspWUSyntaxCheckRequest &req, IEspWUSyntaxCheckResponse &resp);
  287. bool onWUCompileECL(IEspContext &context, IEspWUCompileECLRequest &req, IEspWUCompileECLResponse &resp);
  288. bool onWUJobList(IEspContext &context, IEspWUJobListRequest &req, IEspWUJobListResponse &resp);
  289. bool onWUQueryGetGraph(IEspContext& context, IEspWUQueryGetGraphRequest& req, IEspWUQueryGetGraphResponse& resp);
  290. bool onWUQueryGetSummaryStats(IEspContext& context, IEspWUQueryGetSummaryStatsRequest& req, IEspWUQueryGetSummaryStatsResponse& resp);
  291. bool onWUGetGraph(IEspContext& context, IEspWUGetGraphRequest& req, IEspWUGetGraphResponse& resp);
  292. bool onWUGraphTiming(IEspContext& context, IEspWUGraphTimingRequest& req, IEspWUGraphTimingResponse& resp);
  293. bool onWUGetDependancyTrees(IEspContext& context, IEspWUGetDependancyTreesRequest& req, IEspWUGetDependancyTreesResponse& resp);
  294. bool onWUGetNumFileToCopy(IEspContext &context, IEspWUGetNumFileToCopyRequest &req, IEspWUGetNumFileToCopyResponse &resp);
  295. bool onWUListLocalFileRequired(IEspContext& context, IEspWUListLocalFileRequiredRequest& req, IEspWUListLocalFileRequiredResponse& resp);
  296. bool onWUAddLocalFileToWorkunit(IEspContext& context, IEspWUAddLocalFileToWorkunitRequest& req, IEspWUAddLocalFileToWorkunitResponse& resp);
  297. bool onWUClusterJobQueueXLS(IEspContext &context, IEspWUClusterJobQueueXLSRequest &req, IEspWUClusterJobQueueXLSResponse &resp);
  298. bool onWUClusterJobQueueLOG(IEspContext &context,IEspWUClusterJobQueueLOGRequest &req, IEspWUClusterJobQueueLOGResponse &resp);
  299. bool onWUClusterJobXLS(IEspContext &context, IEspWUClusterJobXLSRequest &req, IEspWUClusterJobXLSResponse &resp);
  300. bool onWUClusterJobSummaryXLS(IEspContext &context, IEspWUClusterJobSummaryXLSRequest &req, IEspWUClusterJobSummaryXLSResponse &resp);
  301. bool onWUGetThorJobQueue(IEspContext &context, IEspWUGetThorJobQueueRequest &req, IEspWUGetThorJobQueueResponse &resp);
  302. bool onWUGetThorJobList(IEspContext &context, IEspWUGetThorJobListRequest &req, IEspWUGetThorJobListResponse &resp);
  303. bool onWUCDebug(IEspContext &context, IEspWUDebugRequest &req, IEspWUDebugResponse &resp);
  304. bool onWUDeployWorkunit(IEspContext &context, IEspWUDeployWorkunitRequest & req, IEspWUDeployWorkunitResponse & resp);
  305. bool onWUDetails(IEspContext &context, IEspWUDetailsRequest &req, IEspWUDetailsResponse &resp);
  306. bool onWUDetailsMeta(IEspContext &context, IEspWUDetailsMetaRequest &req, IEspWUDetailsMetaResponse &resp);
  307. void setPort(unsigned short _port){port=_port;}
  308. #ifndef _CONTAINERIZED
  309. bool isQuerySuspended(const char* query, IConstWUClusterInfo *clusterInfo, unsigned wait, StringBuffer& errorMessage);
  310. #else
  311. bool isQuerySuspended(const char* query, const char* target, unsigned wait, StringBuffer& errorMessage);
  312. #endif
  313. bool onWUCreateZAPInfo(IEspContext &context, IEspWUCreateZAPInfoRequest &req, IEspWUCreateZAPInfoResponse &resp);
  314. bool onWUGetZAPInfo(IEspContext &context, IEspWUGetZAPInfoRequest &req, IEspWUGetZAPInfoResponse &resp);
  315. bool onWUCheckFeatures(IEspContext &context, IEspWUCheckFeaturesRequest &req, IEspWUCheckFeaturesResponse &resp);
  316. bool onWUGetStats(IEspContext &context, IEspWUGetStatsRequest &req, IEspWUGetStatsResponse &resp);
  317. bool onWUListArchiveFiles(IEspContext &context, IEspWUListArchiveFilesRequest &req, IEspWUListArchiveFilesResponse &resp);
  318. bool onWUGetArchiveFile(IEspContext &context, IEspWUGetArchiveFileRequest &req, IEspWUGetArchiveFileResponse &resp);
  319. bool onWUEclDefinitionAction(IEspContext &context, IEspWUEclDefinitionActionRequest &req, IEspWUEclDefinitionActionResponse &resp);
  320. bool onWUGetPlugins(IEspContext &context, IEspWUGetPluginsRequest &req, IEspWUGetPluginsResponse &resp);
  321. bool unsubscribeServiceFromDali() override
  322. {
  323. return filesInUse.unsubscribe();
  324. }
  325. bool subscribeServiceToDali() override
  326. {
  327. return filesInUse.subscribe();
  328. }
  329. bool attachServiceToDali() override
  330. {
  331. m_sched.setDetachedState(false);
  332. return true;
  333. }
  334. bool detachServiceFromDali() override
  335. {
  336. m_sched.setDetachedState(true);
  337. return true;
  338. }
  339. private:
  340. IPropertyTree* sendControlQuery(IEspContext &context, const char* target, const char* query, unsigned timeout);
  341. bool resetQueryStats(IEspContext &context, const char* target, IProperties* queryIds, IEspWUQuerySetQueryActionResponse& resp);
  342. void readGraph(IEspContext& context, const char* subGraphId, WUGraphIDType& id, bool running,
  343. IConstWUGraph* graph, IArrayOf<IEspECLGraphEx>& graphs);
  344. IPropertyTree* getWorkunitArchive(IEspContext &context, WsWuInfo& winfo, const char* wuid, unsigned cacheMinutes);
  345. void readSuperFiles(IEspContext &context, IReferencedFile* rf, const char* fileName, IReferencedFileList* wufiles, IArrayOf<IEspQuerySuperFile>* files);
  346. IReferencedFile* getReferencedFileByName(const char* name, IReferencedFileList* wufiles);
  347. void checkEclDefinitionSyntax(IEspContext &context, const char *target, const char *eclDefinition,
  348. int msToWait, IArrayOf<IConstWUEclDefinitionActionResult> &results);
  349. bool deployEclDefinition(IEspContext &context, const char *target, const char *name, int msToWait, StringBuffer &wuid, StringBuffer &result);
  350. void deployEclDefinition(IEspContext &context, const char *target, const char *eclDefinition, int msToWait, IArrayOf<IConstWUEclDefinitionActionResult> &results);
  351. void publishEclDefinition(IEspContext &context, const char *target, const char* eclDefinition, int msToWait, IEspWUEclDefinitionActionRequest &req,
  352. IArrayOf<IConstWUEclDefinitionActionResult> &results);
  353. const char* gatherQueryFileCopyErrors(IArrayOf<IConstLogicalFileError> &errors, StringBuffer &msg);
  354. bool readDeployWUResponse(CWUDeployWorkunitResponse* deployResponse, StringBuffer &wuid, StringBuffer &result);
  355. const char* gatherExceptionMessage(const IMultiException &me, StringBuffer &exceptionMsg);
  356. const char* gatherWUException(IConstWUExceptionIterator &it, StringBuffer &exceptionMsg);
  357. const char* gatherECLException(IArrayOf<IConstECLException> &exceptions, StringBuffer &exceptionMsg);
  358. void addEclDefinitionActionResult(const char *eclDefinition, const char *result, const char *wuid,
  359. const char *queryID, const char* strAction, bool logResult, IArrayOf<IConstWUEclDefinitionActionResult> &results);
  360. void checkAddToInProgressECLJobList(double version, const char* wuid, const char* graph,
  361. const char* subGraph, const char* cluster, const char* startTime, const char* endTime,
  362. IArrayOf<IEspECLJob>& eclJobList, IArrayOf<IEspECLJob>& inProgressECLJobList);
  363. void getInProgressThorJobsFromAuditLog(double version, CDateTime& queryAuditLogFrom, CDateTime& queryAuditLogTo,
  364. const char* queryAuditLogToStr, const char* cluster, IArrayOf<IEspECLJob>& eclJobList,
  365. IArrayOf<IEspECLJob>& inProgressECLJobList);
  366. void getPreviousInProgressThorJobsFromAuditLog(double version, CDateTime queryAuditLogFrom, const char *queryAuditLogToStr,
  367. const char *cluster, IArrayOf<IEspECLJob> &eclJobList, IArrayOf<IEspECLJob> &inProgressECLJobList);
  368. bool getThorJobsFromAuditLog(double version, CDateTime &queryAuditLogFrom, CDateTime &queryAuditLogTo,
  369. const char *cluster, unsigned maxJobsToReturn, IArrayOf<IEspECLJob> &eclJobList);
  370. void readQueryAggregateStats(IPropertyTree *queryStats, const char *status, const char *ep,
  371. IArrayOf<IEspQuerySummaryStats> &querySummaryStatsList);
  372. void readQueryStatsRecord(IPropertyTree *queryRecord, IArrayOf<IEspQueryStatsRecord> &recordList);
  373. void readQueryStats(IPropertyTree *queryStatsTree, const char *id, bool all,
  374. IArrayOf<IEspQueryStats> &queryStatsList);
  375. void readQueryStatsList(IPropertyTree *queryStatsTree, const char *status, const char *ep,
  376. bool all, IArrayOf<IEspEndpointQueryStats> &endpointQueryStatsList);
  377. void getWsWuResult(IEspContext &context, const char *wuid, const char *name, const char *logical, unsigned index, __int64 start,
  378. unsigned &count, __int64 &total, IStringVal &resname, bool bin, IArrayOf<IConstNamedValue> *filterBy, MemoryBuffer &mb,
  379. WUState &wuState, bool xsd=true);
  380. void getFileResults(IEspContext &context, const char *logicalName, const char *cluster, __int64 start, unsigned &count, __int64 &total,
  381. IStringVal &resname, bool bin, IArrayOf<IConstNamedValue> *filterBy, MemoryBuffer &buf, bool xsd);
  382. void getSuspendedQueriesByCluster(MapStringTo<bool> &suspendedByCluster, const char *querySet, const char *queryID, bool checkAllNodes);
  383. void addSuspendedQueryIDs(MapStringTo<bool> &suspendedQueryIDs, IPropertyTree *queriesOnCluster, const char *target);
  384. void getWUQueryDetails(IEspContext &context, CWUQueryDetailsReq &req, IEspWUQueryDetailsResponse &resp);
  385. void readPluginFolders(StringBuffer &eclccPaths, StringArray &pluginFolders);
  386. void findPlugins(const char *pluginFolder, bool dotSoFile, StringArray &plugins);
  387. bool checkPluginECLAttr(const char *fileNameWithPath);
  388. unsigned awusCacheMinutes;
  389. StringBuffer queryDirectory;
  390. StringBuffer envLocalAddress;
  391. StringAttr daliServers;
  392. Owned<DataCache> dataCache;
  393. Owned<ArchivedWuCache> archivedWuCache;
  394. Owned<WUArchiveCache> wuArchiveCache;
  395. StringAttr sashaServerIp;
  396. unsigned short sashaServerPort;
  397. WUSchedule m_sched;
  398. unsigned short port;
  399. Owned<IPropertyTree> directories;
  400. int maxRequestEntityLength;
  401. Owned<IThreadPool> clusterQueryStatePool;
  402. unsigned thorSlaveLogThreadPoolSize = THOR_SLAVE_LOG_THREAD_POOL_SIZE;
  403. Owned<IWorkUnitFactory> wuFactory;
  404. StringBuffer tempDirectory;
  405. __uint64 wuResultMaxSize = defaultWUResultMaxSize;
  406. public:
  407. QueryFilesInUse filesInUse;
  408. MapStringToMyClass<ISmartSocketFactory> roxieConnMap;
  409. StringAttr zapEmailTo, zapEmailFrom, zapEmailServer;
  410. unsigned zapEmailMaxAttachmentSize = 0;
  411. unsigned zapEmailServerPort = 0;
  412. };
  413. class CWsWorkunitsSoapBindingEx : public CWsWorkunitsSoapBinding
  414. {
  415. void createAndDownloadWUZAPFile(IEspContext &context, CHttpRequest *request, CHttpResponse *response);
  416. void downloadWUFiles(IEspContext &context, CHttpRequest *request, CHttpResponse *response);
  417. public:
  418. CWsWorkunitsSoapBindingEx(IPropertyTree *cfg, const char *name, const char *process, http_soap_log_level llevel) : CWsWorkunitsSoapBinding(cfg, name, process, llevel)
  419. {
  420. wswService = NULL;
  421. VStringBuffer xpath("Software/EspProcess[@name=\"%s\"]/EspBinding[@name=\"%s\"]/BatchWatch", process, name);
  422. batchWatchFeaturesOnly = cfg->getPropBool(xpath.str(), false);
  423. directories.set(cfg->queryPropTree("Software/Directories"));
  424. xpath.setf("Software/EspProcess[@name=\"%s\"]/EspBinding[@name=\"%s\"]/@service", process, name);
  425. const char *service = cfg->queryProp(xpath);
  426. xpath.setf("Software/EspProcess[@name=\"%s\"]/EspService[@name=\"%s\"]/ThorSlaveLogThreadPoolSize", process, service);
  427. thorSlaveLogThreadPoolSize = cfg->getPropInt(xpath, THOR_SLAVE_LOG_THREAD_POOL_SIZE);
  428. xpath.setf("Software/EspProcess[@name=\"%s\"]/EspService[@name=\"%s\"]/WUResultDownloadFlushThreshold", process, service);
  429. wuResultDownloadFlushThreshold = cfg->getPropInt(xpath, defaultWUResultDownloadFlushThreshold);
  430. }
  431. virtual void getNavigationData(IEspContext &context, IPropertyTree & data)
  432. {
  433. if (getComponentConfigSP()->getPropBool("@api_only"))
  434. {
  435. CHttpSoapBinding::getNavigationData(context, data);
  436. return;
  437. }
  438. if (!batchWatchFeaturesOnly)
  439. {
  440. IPropertyTree *folder = ensureNavFolder(data, "ECL", "Run Ecl code and review Ecl workunits", NULL, false, 2);
  441. ensureNavLink(*folder, "Search Workunits", "/WsWorkunits/WUQuery?form_", "Search Workunits", NULL, NULL, 1);
  442. ensureNavLink(*folder, "Browse Workunits", "/WsWorkunits/WUQuery", "Browse Workunits", NULL, NULL, 2);
  443. ensureNavLink(*folder, "ECL Playground", "/esp/files/stub.htm?Widget=ECLPlaygroundWidget", "ECL Editor, Executor, Graph and Result Viewer", NULL, NULL, 4);
  444. IPropertyTree *folderQueryset = ensureNavFolder(data, "Queries", NULL, NULL, false, 3);
  445. ensureNavLink(*folderQueryset, "Browse", "/WsWorkunits/WUQuerySets", "Browse Published Queries");
  446. }
  447. }
  448. #ifndef _CONTAINERIZED
  449. int onGetForm(IEspContext &context, CHttpRequest* request, CHttpResponse* response, const char *service, const char *method);
  450. #endif
  451. int onGet(CHttpRequest* request, CHttpResponse* response);
  452. int onStartUpload(IEspContext& ctx, CHttpRequest* request, CHttpResponse* response, const char* service, const char* method);
  453. virtual void addService(const char * name, const char * host, unsigned short port, IEspService & service)
  454. {
  455. wswService = dynamic_cast<CWsWorkunitsEx*>(&service);
  456. if (wswService)
  457. wswService->setPort(port);
  458. CWsWorkunitsSoapBinding::addService(name, host, port, service);
  459. }
  460. private:
  461. bool batchWatchFeaturesOnly;
  462. CWsWorkunitsEx *wswService;
  463. Owned<IPropertyTree> directories;
  464. unsigned thorSlaveLogThreadPoolSize = THOR_SLAVE_LOG_THREAD_POOL_SIZE;
  465. size32_t wuResultDownloadFlushThreshold = defaultWUResultDownloadFlushThreshold;
  466. };
  467. void deploySharedObject(IEspContext &context, StringBuffer &wuid, const char *filename, const char *cluster, const char *name, const MemoryBuffer &obj, const char *dir, const char *xml=NULL, bool protect=false);
  468. class CClusterQueryStateParam : public CInterface
  469. {
  470. Linked<CWsWorkunitsEx> wsWorkunitsService;
  471. IEspContext& context;
  472. StringAttr cluster;
  473. StringAttr querySetId;
  474. IArrayOf<IEspQuerySetQuery>& queries;
  475. bool checkAllNodes;
  476. public:
  477. IMPLEMENT_IINTERFACE;
  478. CClusterQueryStateParam(CWsWorkunitsEx* _service, IEspContext& _context, const char* _cluster, const char* _querySetId, IArrayOf<IEspQuerySetQuery>& _queries, bool _checkAllNodes)
  479. : wsWorkunitsService(_service), context(_context), cluster(_cluster), querySetId(_querySetId), queries(_queries), checkAllNodes(_checkAllNodes)
  480. {
  481. }
  482. virtual void doWork()
  483. {
  484. wsWorkunitsService->checkAndSetClusterQueryState(context, cluster.get(), querySetId.get(), queries, checkAllNodes);
  485. }
  486. };
  487. class CClusterQueryStateThreadFactory : public CInterface, public IThreadFactory
  488. {
  489. class CClusterQueryStateThread : public CInterface, implements IPooledThread
  490. {
  491. Owned<CClusterQueryStateParam> param;
  492. public:
  493. IMPLEMENT_IINTERFACE;
  494. virtual void init(void *_param) override
  495. {
  496. param.setown((CClusterQueryStateParam *)_param);
  497. }
  498. virtual void threadmain() override
  499. {
  500. param->doWork();
  501. param.clear();
  502. }
  503. virtual bool canReuse() const override
  504. {
  505. return true;
  506. }
  507. virtual bool stop() override
  508. {
  509. return true;
  510. }
  511. };
  512. public:
  513. IMPLEMENT_IINTERFACE;
  514. IPooledThread *createNew()
  515. {
  516. return new CClusterQueryStateThread();
  517. }
  518. };
  519. bool origValueChanged(const char *newValue, const char *origValue, StringBuffer &s, bool nillable=true);
  520. bool doProtectWorkunits(IEspContext& context, StringArray& wuids, IArrayOf<IConstWUActionResult>* results);
  521. bool doUnProtectWorkunits(IEspContext& context, StringArray& wuids, IArrayOf<IConstWUActionResult>* results);
  522. #endif