TpContainer.cpp 26 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2021 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #pragma warning (disable : 4786)
  14. // TpContainer.cpp: containerized implementation of the CTpWrapper class.
  15. //
  16. //////////////////////////////////////////////////////////////////////
  17. #include "TpWrapper.hpp"
  18. #include <stdio.h>
  19. #include "workunit.hpp"
  20. #include "exception_util.hpp"
  21. #include "portlist.h"
  22. #include "daqueue.hpp"
  23. #include "dautils.hpp"
  24. #include "dameta.hpp"
  25. #include "hpccconfig.hpp"
  26. static CConfigUpdateHook configUpdateHook;
  27. const char* MSG_FAILED_GET_ENVIRONMENT_INFO = "Failed to get environment information.";
  28. //////////////////////////////////////////////////////////////////////
  29. // Construction/Destruction
  30. //////////////////////////////////////////////////////////////////////
  31. void CTpWrapper::getClusterMachineList(double clientVersion,
  32. const char* ClusterType,
  33. const char* ClusterPath,
  34. const char* ClusterDirectory,
  35. IArrayOf<IEspTpMachine> &MachineList,
  36. bool& hasThorSpareProcess,
  37. const char* ClusterName)
  38. {
  39. IWARNLOG("UNIMPLEMENTED: CONTAINERIZED(CTpWrapper::getClusterMachineList)");
  40. }
  41. void CTpWrapper::getTpDaliServers(double clientVersion, IArrayOf<IConstTpDali>& list)
  42. {
  43. throw MakeStringExceptionDirect(ECLWATCH_CANNOT_GET_ENV_INFO, MSG_FAILED_GET_ENVIRONMENT_INFO);
  44. }
  45. void CTpWrapper::getTpEclServers(IArrayOf<IConstTpEclServer>& list, const char* serverName)
  46. {
  47. throw MakeStringExceptionDirect(ECLWATCH_CANNOT_GET_ENV_INFO, MSG_FAILED_GET_ENVIRONMENT_INFO);
  48. }
  49. void CTpWrapper::getTpEclCCServers(IArrayOf<IConstTpEclServer>& list, const char* serverName)
  50. {
  51. throw MakeStringExceptionDirect(ECLWATCH_CANNOT_GET_ENV_INFO, MSG_FAILED_GET_ENVIRONMENT_INFO);
  52. }
  53. void CTpWrapper::getTpEclCCServers(IPropertyTree* environmentSoftware, IArrayOf<IConstTpEclServer>& list, const char* serverName)
  54. {
  55. }
  56. void CTpWrapper::getTpEclAgents(IArrayOf<IConstTpEclAgent>& list, const char* agentName)
  57. {
  58. throw MakeStringExceptionDirect(ECLWATCH_CANNOT_GET_ENV_INFO, MSG_FAILED_GET_ENVIRONMENT_INFO);
  59. }
  60. void CTpWrapper::getTpEclSchedulers(IArrayOf<IConstTpEclScheduler>& list, const char* serverName)
  61. {
  62. throw MakeStringExceptionDirect(ECLWATCH_CANNOT_GET_ENV_INFO, MSG_FAILED_GET_ENVIRONMENT_INFO);
  63. }
  64. void CTpWrapper::getTpEspServers(IArrayOf<IConstTpEspServer>& list)
  65. {
  66. throw MakeStringExceptionDirect(ECLWATCH_CANNOT_GET_ENV_INFO, MSG_FAILED_GET_ENVIRONMENT_INFO);
  67. }
  68. static IEspTpMachine * createHostTpMachine(const char * hostname, const char *path)
  69. {
  70. Owned<IEspTpMachine> machine = createTpMachine();
  71. IpAddress ipAddr;
  72. ipAddr.ipset(hostname);
  73. StringBuffer localHost;
  74. ipAddr.getIpText(localHost);
  75. machine->setName(localHost.str());
  76. machine->setNetaddress(localHost.str());
  77. machine->setConfigNetaddress(hostname);
  78. machine->setDirectory(path);
  79. machine->setOS(getPathSepChar(path) == '/' ? MachineOsLinux : MachineOsW2K);
  80. return machine.getClear();
  81. }
  82. static void gatherDropZoneMachinesFromHosts(IArrayOf<IEspTpMachine> & tpMachines, IPropertyTree & planeOrGroup, const char * prefix)
  83. {
  84. Owned<IPropertyTreeIterator> iter = planeOrGroup.getElements("hosts");
  85. ForEach(*iter)
  86. {
  87. const char * host = iter->query().queryProp(nullptr);
  88. tpMachines.append(*createHostTpMachine(host, prefix));
  89. }
  90. }
  91. static void gatherDropZoneMachines(IArrayOf<IEspTpMachine> & tpMachines, IPropertyTree & plane)
  92. {
  93. const char * prefix = plane.queryProp("@prefix");
  94. if (plane.hasProp("hosts"))
  95. {
  96. gatherDropZoneMachinesFromHosts(tpMachines, plane, prefix);
  97. }
  98. else if (plane.hasProp("@hostGroup"))
  99. {
  100. Owned<IPropertyTree> hostGroup = getHostGroup(plane.queryProp("@hostGroup"), true);
  101. gatherDropZoneMachinesFromHosts(tpMachines, *hostGroup, prefix);
  102. }
  103. else
  104. tpMachines.append(*createHostTpMachine("localhost", prefix));
  105. }
  106. void CTpWrapper::getTpDfuServers(IArrayOf<IConstTpDfuServer>& list)
  107. {
  108. Owned<IPropertyTreeIterator> dfuQueues = getComponentConfigSP()->getElements("dfuQueues");
  109. ForEach(*dfuQueues)
  110. {
  111. IPropertyTree & dfuQueue = dfuQueues->query();
  112. const char * dfuName = dfuQueue.queryProp("@name");
  113. StringBuffer queue;
  114. getDfuQueueName(queue, dfuName);
  115. Owned<IEspTpDfuServer> pService = createTpDfuServer("","");
  116. pService->setName(dfuName);
  117. pService->setDescription(dfuName);
  118. pService->setBuild("");
  119. pService->setQueue(queue);
  120. pService->setType(eqDfu);
  121. IArrayOf<IEspTpMachine> tpMachines;
  122. //MORE: The ip and directory don't make any sense on the cloud version
  123. tpMachines.append(*createHostTpMachine("localhost", "/var/lib/HPCCSystems"));
  124. pService->setTpMachines(tpMachines);
  125. list.append(*pService.getClear());
  126. }
  127. }
  128. void CTpWrapper::getTpSashaServers(IArrayOf<IConstTpSashaServer>& list)
  129. {
  130. throw MakeStringExceptionDirect(ECLWATCH_CANNOT_GET_ENV_INFO, MSG_FAILED_GET_ENVIRONMENT_INFO);
  131. }
  132. void CTpWrapper::getTpLdapServers(IArrayOf<IConstTpLdapServer>& list)
  133. {
  134. throw MakeStringExceptionDirect(ECLWATCH_CANNOT_GET_ENV_INFO, MSG_FAILED_GET_ENVIRONMENT_INFO);
  135. }
  136. void CTpWrapper::getTpFTSlaves(IArrayOf<IConstTpFTSlave>& list)
  137. {
  138. throw MakeStringExceptionDirect(ECLWATCH_CANNOT_GET_ENV_INFO, MSG_FAILED_GET_ENVIRONMENT_INFO);
  139. }
  140. void CTpWrapper::getTpDkcSlaves(IArrayOf<IConstTpDkcSlave>& list)
  141. {
  142. throw MakeStringExceptionDirect(ECLWATCH_CANNOT_GET_ENV_INFO, MSG_FAILED_GET_ENVIRONMENT_INFO);
  143. }
  144. void CTpWrapper::getTpGenesisServers(IArrayOf<IConstTpGenesisServer>& list)
  145. {
  146. throw MakeStringExceptionDirect(ECLWATCH_CANNOT_GET_ENV_INFO, MSG_FAILED_GET_ENVIRONMENT_INFO);
  147. }
  148. void CTpWrapper::getTargetClusterList(IArrayOf<IEspTpLogicalCluster>& clusters, const char* clusterType, const char* clusterName)
  149. {
  150. throw MakeStringExceptionDirect(ECLWATCH_CANNOT_GET_ENV_INFO, MSG_FAILED_GET_ENVIRONMENT_INFO);
  151. }
  152. void CTpWrapper::queryTargetClusterProcess(double version, const char* processName, const char* clusterType, IArrayOf<IConstTpCluster>& clusterList)
  153. {
  154. throw MakeStringExceptionDirect(ECLWATCH_CANNOT_GET_ENV_INFO, MSG_FAILED_GET_ENVIRONMENT_INFO);
  155. }
  156. void CTpWrapper::queryTargetClusters(double version, const char* clusterType, const char* clusterName, IArrayOf<IEspTpTargetCluster>& targetClusterList)
  157. {
  158. IWARNLOG("UNIMPLEMENTED: CONTAINERIZED(CTpWrapper::queryTargetClusters)");
  159. }
  160. void CTpWrapper::getClusterProcessList(const char* ClusterType, IArrayOf<IEspTpCluster>& clusterList, bool ignoreduplicatqueues, bool ignoreduplicategroups)
  161. {
  162. IWARNLOG("UNIMPLEMENTED: CONTAINERIZED(CTpWrapper::getClusterProcessList)");
  163. }
  164. void CTpWrapper::getHthorClusterList(IArrayOf<IEspTpCluster>& clusterList)
  165. {
  166. IWARNLOG("UNIMPLEMENTED: CONTAINERIZED(CTpWrapper::getHthorClusterList)");
  167. }
  168. void CTpWrapper::getGroupList(double espVersion, const char* kindReq, IArrayOf<IEspTpGroup> &GroupList)
  169. {
  170. try
  171. {
  172. Owned<IPropertyTreeIterator> dataPlanes = getGlobalConfigSP()->getElements("storage/planes[labels='data']");
  173. ForEach(*dataPlanes)
  174. {
  175. IPropertyTree & plane = dataPlanes->query();
  176. const char * name = plane.queryProp("@name");
  177. IEspTpGroup* pGroup = createTpGroup("","");
  178. pGroup->setName(name);
  179. if (espVersion >= 1.21)
  180. {
  181. pGroup->setKind("Plane");
  182. pGroup->setReplicateOutputs(false);
  183. }
  184. GroupList.append(*pGroup);
  185. }
  186. }
  187. catch(IException* e)
  188. {
  189. StringBuffer msg;
  190. e->errorMessage(msg);
  191. IWARNLOG("%s", msg.str());
  192. e->Release();
  193. }
  194. catch(...)
  195. {
  196. IWARNLOG("Unknown Exception caught within CTpWrapper::getGroupList");
  197. }
  198. }
  199. bool CTpWrapper::checkGroupReplicateOutputs(const char* groupName, const char* kind)
  200. {
  201. return false;
  202. }
  203. void CTpWrapper::getMachineInfo(double clientVersion, const char* name, const char* netAddress, IEspTpMachine& machineInfo)
  204. {
  205. UNIMPLEMENTED_X("CONTAINERIZED(CTpWrapper::getMachineInfo)");
  206. }
  207. void CTpWrapper::setTpMachine(IConstMachineInfo* machine, IEspTpMachine& tpMachine)
  208. {
  209. if (!machine)
  210. return;
  211. SCMStringBuffer machineName, netAddress;
  212. machine->getName(machineName);
  213. machine->getNetAddress(netAddress);
  214. tpMachine.setName(machineName.str());
  215. tpMachine.setNetaddress(netAddress.str());
  216. tpMachine.setOS(machine->getOS());
  217. switch(machine->getState())
  218. {
  219. case MachineStateAvailable:
  220. tpMachine.setAvailable("Available");
  221. break;
  222. case MachineStateUnavailable:
  223. tpMachine.setAvailable("Unavailable");
  224. break;
  225. case MachineStateUnknown:
  226. tpMachine.setAvailable("Unknown");
  227. break;
  228. }
  229. Owned<IConstDomainInfo> pDomain = machine->getDomain();
  230. if (pDomain != 0)
  231. {
  232. SCMStringBuffer sName;
  233. tpMachine.setDomain(pDomain->getName(sName).str());
  234. }
  235. }
  236. void CTpWrapper::appendThorMachineList(double clientVersion, IConstEnvironment* constEnv, INode& node, const char* clusterName,
  237. const char* machineType, unsigned& processNumber, unsigned channels, const char* directory, IArrayOf<IEspTpMachine>& machineList)
  238. {
  239. StringBuffer netAddress;
  240. node.endpoint().getIpText(netAddress);
  241. if (netAddress.length() == 0)
  242. {
  243. OWARNLOG("Net address not found for a node of %s", clusterName);
  244. return;
  245. }
  246. processNumber++;
  247. Owned<IEspTpMachine> machineInfo = createTpMachine("","");
  248. machineInfo->setType(machineType);
  249. machineInfo->setNetaddress(netAddress.str());
  250. if (!isEmptyString(directory))
  251. machineInfo->setDirectory(directory);
  252. Owned<IConstMachineInfo> pMachineInfo = constEnv->getMachineByAddress(netAddress.str());
  253. if (pMachineInfo.get())
  254. {
  255. setTpMachine(pMachineInfo, *machineInfo);
  256. if (clientVersion > 1.17)
  257. {
  258. machineInfo->setProcessNumber(processNumber);
  259. }
  260. }
  261. else
  262. {
  263. machineInfo->setName("external");
  264. machineInfo->setOS(MachineOsUnknown);
  265. }
  266. if (clientVersion >= 1.30)
  267. machineInfo->setChannels(channels);
  268. machineList.append(*machineInfo.getLink());
  269. }
  270. void CTpWrapper::getThorSlaveMachineList(double clientVersion, const char* clusterName, const char* directory, IArrayOf<IEspTpMachine>& machineList)
  271. {
  272. IWARNLOG("UNIMPLEMENTED: CONTAINERIZED(CTpWrapper::getThorSlaveMachineList)");
  273. }
  274. void CTpWrapper::getThorSpareMachineList(double clientVersion, const char* clusterName, const char* directory, IArrayOf<IEspTpMachine>& machineList)
  275. {
  276. IWARNLOG("UNIMPLEMENTED: CONTAINERIZED(CTpWrapper::getThorSpareMachineList)");
  277. }
  278. void CTpWrapper::getMachineList(double clientVersion, const char* MachineType, const char* ParentPath,
  279. const char* Status, const char* Directory, IArrayOf<IEspTpMachine>& MachineList, set<string>* pMachineNames/*=NULL*/)
  280. {
  281. IWARNLOG("UNIMPLEMENTED: CONTAINERIZED(CTpWrapper::getMachineList)");
  282. }
  283. const char* CTpWrapper::getNodeNameTag(const char* MachineType)
  284. {
  285. if (strcmp(MachineType,"Computer")==0)
  286. return "@name";
  287. else
  288. return "@computer";
  289. }
  290. void CTpWrapper::getDropZoneMachineList(double clientVersion, bool ECLWatchVisibleOnly, IArrayOf<IEspTpMachine> &MachineList)
  291. {
  292. try
  293. {
  294. IArrayOf<IConstTpDropZone> list;
  295. getTpDropZones(clientVersion, nullptr, ECLWatchVisibleOnly, list);
  296. ForEachItemIn(i, list)
  297. {
  298. IConstTpDropZone& dropZone = list.item(i);
  299. IArrayOf<IConstTpMachine>& tpMachines = dropZone.getTpMachines();
  300. ForEachItemIn(ii, tpMachines)
  301. {
  302. IConstTpMachine& tpMachine = tpMachines.item(ii);
  303. Owned<IEspTpMachine> machine = createTpMachine();
  304. machine->copy(tpMachine);
  305. MachineList.append(*machine.getLink());
  306. }
  307. }
  308. }
  309. catch(IException* e)
  310. {
  311. EXCLOG(e);
  312. e->Release();
  313. }
  314. catch(...)
  315. {
  316. IWARNLOG("Unknown Exception caught within CTpWrapper::getDropZoneMachineList");
  317. }
  318. }
  319. //For a given dropzone or every dropzones (check ECLWatchVisible if needed), read: "@name",
  320. // "@description", "@build", "@directory", "@ECLWatchVisible" into an IEspTpDropZone object.
  321. //For each ServerList, read "@name" and "@server" (hostname or IP) into an IEspTpMachine object.
  322. //Add the IEspTpMachine object into the IEspTpDropZone.
  323. void CTpWrapper::getTpDropZones(double clientVersion, const char* name, bool ECLWatchVisibleOnly, IArrayOf<IConstTpDropZone>& list)
  324. {
  325. Owned<IPropertyTreeIterator> planes = getDropZonePlanesIterator(name);
  326. ForEach(*planes)
  327. {
  328. IPropertyTree & plane = planes->query();
  329. const char * dropzonename = plane.queryProp("@name");
  330. const char * path = plane.queryProp("@prefix");
  331. Owned<IEspTpDropZone> dropZone = createTpDropZone();
  332. dropZone->setName(dropzonename);
  333. dropZone->setDescription("");
  334. dropZone->setPath(path);
  335. dropZone->setBuild("");
  336. dropZone->setECLWatchVisible(true);
  337. IArrayOf<IEspTpMachine> tpMachines;
  338. gatherDropZoneMachines(tpMachines, plane);
  339. dropZone->setTpMachines(tpMachines);
  340. list.append(*dropZone.getClear());
  341. }
  342. }
  343. void CTpWrapper::getTpSparkThors(double clientVersion, const char* name, IArrayOf<IConstTpSparkThor>& list)
  344. {
  345. UNIMPLEMENTED_X("CONTAINERIZED(CTpWrapper::getTpSparkThors)");
  346. }
  347. void CTpWrapper::appendTpMachine(double clientVersion, IConstEnvironment* constEnv, IConstInstanceInfo& instanceInfo, IArrayOf<IConstTpMachine>& machines)
  348. {
  349. SCMStringBuffer name, networkAddress, description, directory;
  350. Owned<IConstMachineInfo> machineInfo = instanceInfo.getMachine();
  351. machineInfo->getName(name);
  352. machineInfo->getNetAddress(networkAddress);
  353. instanceInfo.getDirectory(directory);
  354. Owned<IEspTpMachine> machine = createTpMachine();
  355. machine->setName(name.str());
  356. if (networkAddress.length() > 0)
  357. {
  358. IpAddress ipAddr;
  359. ipAddr.ipset(networkAddress.str());
  360. StringBuffer networkAddressStr;
  361. ipAddr.getIpText(networkAddressStr);
  362. machine->setNetaddress(networkAddressStr);
  363. }
  364. machine->setPort(instanceInfo.getPort());
  365. machine->setOS(machineInfo->getOS());
  366. machine->setDirectory(directory.str());
  367. machine->setType(eqSparkThorProcess);
  368. machines.append(*machine.getLink());
  369. }
  370. IEspTpMachine* CTpWrapper::createTpMachineEx(const char* name, const char* type, IConstMachineInfo* machineInfo)
  371. {
  372. if (!machineInfo)
  373. return nullptr;
  374. Owned<IEspTpMachine> machine = createTpMachine();
  375. machine->setName(name);
  376. machine->setType(type);
  377. machine->setOS(machineInfo->getOS());
  378. Owned<IConstDomainInfo> domain = machineInfo->getDomain();
  379. if (domain)
  380. {
  381. SCMStringBuffer sName;
  382. machine->setDomain(domain->getName(sName).str());
  383. }
  384. SCMStringBuffer netAddr;
  385. machineInfo->getNetAddress(netAddr);
  386. if (netAddr.length() > 0)
  387. {
  388. StringBuffer networkAddress;
  389. IpAddress ipAddr;
  390. ipAddr.ipset(netAddr.str());
  391. ipAddr.getIpText(networkAddress);
  392. machine->setNetaddress(networkAddress.str());
  393. }
  394. switch(machineInfo->getState())
  395. {
  396. case MachineStateAvailable:
  397. machine->setAvailable("Available");
  398. break;
  399. case MachineStateUnavailable:
  400. machine->setAvailable("Unavailable");
  401. break;
  402. default:
  403. machine->setAvailable("Unknown");
  404. break;
  405. }
  406. return machine.getClear();
  407. }
  408. void CTpWrapper::setAttPath(StringBuffer& Path,const char* PathToAppend,const char* AttName,const char* AttValue,StringBuffer& returnStr)
  409. {
  410. Path.append("/");
  411. Path.append(PathToAppend);
  412. Path.append("[@");
  413. Path.append(AttName);
  414. Path.append("=\"");
  415. Path.append(AttValue);
  416. Path.append("\"]");
  417. StringBuffer rawPath;
  418. const void* buff = (void*)Path.str();
  419. JBASE64_Encode(buff,Path.length(),rawPath, false);
  420. returnStr.append(rawPath.str());
  421. }
  422. void CTpWrapper::getAttPath(const char* Path,StringBuffer& returnStr)
  423. {
  424. StringBuffer decodedStr;
  425. JBASE64_Decode(Path, returnStr);
  426. }
  427. void CTpWrapper::getServices(double version, const char* serviceType, const char* serviceName, IArrayOf<IConstHPCCService>& services)
  428. {
  429. Owned<IPropertyTreeIterator> itr = getGlobalConfigSP()->getElements("services");
  430. ForEach(*itr)
  431. {
  432. IPropertyTree& service = itr->query();
  433. //Only show the public services for now
  434. if (!service.getPropBool("@public"))
  435. continue;
  436. const char* type = service.queryProp("@type");
  437. if (isEmptyString(type) || (!isEmptyString(serviceType) && !strieq(serviceType, type)))
  438. continue;
  439. const char* name = service.queryProp("@name");
  440. if (isEmptyString(name) || (!isEmptyString(serviceName) && !strieq(serviceName, name)))
  441. continue;
  442. Owned<IEspHPCCService> svc = createHPCCService();
  443. svc->setName(name);
  444. svc->setType(type);
  445. svc->setPort(service.getPropInt("@port"));
  446. if (service.getPropBool("@tls"))
  447. svc->setTLSSecure(true);
  448. services.append(*svc.getLink());
  449. if (!isEmptyString(serviceName))
  450. break;
  451. }
  452. }
  453. class CContainerWUClusterInfo : public CSimpleInterfaceOf<IConstWUClusterInfo>
  454. {
  455. StringAttr name;
  456. StringAttr serverQueue;
  457. StringAttr agentQueue;
  458. StringAttr thorQueue;
  459. ClusterType platform;
  460. unsigned clusterWidth;
  461. StringArray thorProcesses;
  462. public:
  463. CContainerWUClusterInfo(const char* _name, const char* type, unsigned _clusterWidth)
  464. : name(_name), clusterWidth(_clusterWidth)
  465. {
  466. StringBuffer queue;
  467. if (strieq(type, "thor"))
  468. {
  469. thorQueue.set(getClusterThorQueueName(queue.clear(), name));
  470. platform = ThorLCRCluster;
  471. thorProcesses.append(name);
  472. }
  473. else if (strieq(type, "roxie"))
  474. {
  475. agentQueue.set(getClusterEclAgentQueueName(queue.clear(), name));
  476. platform = RoxieCluster;
  477. }
  478. else
  479. {
  480. agentQueue.set(getClusterEclAgentQueueName(queue.clear(), name));
  481. platform = HThorCluster;
  482. }
  483. serverQueue.set(getClusterEclCCServerQueueName(queue.clear(), name));
  484. }
  485. virtual IStringVal& getName(IStringVal& str) const override
  486. {
  487. str.set(name.get());
  488. return str;
  489. }
  490. virtual IStringVal& getAgentQueue(IStringVal& str) const override
  491. {
  492. str.set(agentQueue);
  493. return str;
  494. }
  495. virtual IStringVal& getServerQueue(IStringVal& str) const override
  496. {
  497. str.set(serverQueue);
  498. return str;
  499. }
  500. virtual IStringVal& getThorQueue(IStringVal& str) const override
  501. {
  502. str.set(thorQueue);
  503. return str;
  504. }
  505. virtual ClusterType getPlatform() const override
  506. {
  507. return platform;
  508. }
  509. virtual unsigned getSize() const override
  510. {
  511. return clusterWidth;
  512. }
  513. virtual bool isLegacyEclServer() const override
  514. {
  515. return false;
  516. }
  517. virtual IStringVal& getScope(IStringVal& str) const override
  518. {
  519. UNIMPLEMENTED;
  520. }
  521. virtual unsigned getNumberOfSlaveLogs() const override
  522. {
  523. UNIMPLEMENTED;
  524. }
  525. virtual IStringVal & getAgentName(IStringVal & str) const override
  526. {
  527. UNIMPLEMENTED;
  528. }
  529. virtual IStringVal & getECLSchedulerName(IStringVal & str) const override
  530. {
  531. UNIMPLEMENTED;
  532. }
  533. virtual const StringArray & getECLServerNames() const override
  534. {
  535. UNIMPLEMENTED;
  536. }
  537. virtual IStringVal & getRoxieProcess(IStringVal & str) const override
  538. {
  539. str.set(name.get());
  540. return str;
  541. }
  542. virtual const StringArray & getThorProcesses() const override
  543. {
  544. return thorProcesses;
  545. }
  546. virtual const StringArray & getPrimaryThorProcesses() const override
  547. {
  548. UNIMPLEMENTED;
  549. }
  550. virtual const SocketEndpointArray & getRoxieServers() const override
  551. {
  552. UNIMPLEMENTED;
  553. }
  554. virtual const char *getLdapUser() const override
  555. {
  556. UNIMPLEMENTED;
  557. }
  558. virtual const char *getLdapPassword() const override
  559. {
  560. UNIMPLEMENTED;
  561. }
  562. virtual unsigned getRoxieRedundancy() const override
  563. {
  564. return 1;
  565. }
  566. virtual unsigned getChannelsPerNode() const override
  567. {
  568. return 1;
  569. }
  570. virtual int getRoxieReplicateOffset() const override
  571. {
  572. return 0;
  573. }
  574. virtual const char *getAlias() const override
  575. {
  576. UNIMPLEMENTED;
  577. }
  578. };
  579. extern TPWRAPPER_API unsigned getContainerWUClusterInfo(CConstWUClusterInfoArray& clusters)
  580. {
  581. Owned<IPropertyTreeIterator> queues = getComponentConfigSP()->getElements("queues");
  582. ForEach(*queues)
  583. {
  584. IPropertyTree& queue = queues->query();
  585. Owned<IConstWUClusterInfo> cluster = new CContainerWUClusterInfo(queue.queryProp("@name"),
  586. queue.queryProp("@type"), (unsigned) queue.getPropInt("@width", 1));
  587. clusters.append(*cluster.getClear());
  588. }
  589. return clusters.ordinality();
  590. }
  591. extern TPWRAPPER_API unsigned getWUClusterInfo(CConstWUClusterInfoArray& clusters)
  592. {
  593. return getContainerWUClusterInfo(clusters);
  594. }
  595. static IPropertyTree * getContainerClusterConfig(const char * clusterName)
  596. {
  597. VStringBuffer xpath("queues[@name='%s']", clusterName);
  598. return getComponentConfigSP()->getPropTree(xpath);
  599. }
  600. extern TPWRAPPER_API IConstWUClusterInfo* getWUClusterInfoByName(const char* clusterName)
  601. {
  602. Owned<IPropertyTree> queue = getContainerClusterConfig(clusterName);
  603. if (!queue)
  604. return nullptr;
  605. return new CContainerWUClusterInfo(queue->queryProp("@name"), queue->queryProp("@type"),
  606. (unsigned) queue->getPropInt("@width", 1));
  607. }
  608. extern TPWRAPPER_API void initContainerRoxieTargets(MapStringToMyClass<ISmartSocketFactory>& connMap)
  609. {
  610. Owned<IPropertyTreeIterator> services = getGlobalConfigSP()->getElements("services[@type='roxie']");
  611. ForEach(*services)
  612. {
  613. IPropertyTree& service = services->query();
  614. const char* name = service.queryProp("@name");
  615. const char* target = service.queryProp("@target");
  616. const char* port = service.queryProp("@port");
  617. if (isEmptyString(target) || isEmptyString(name)) //bad config?
  618. continue;
  619. StringBuffer s;
  620. s.append(name).append(':').append(port ? port : "9876");
  621. Owned<ISmartSocketFactory> sf = new CSmartSocketFactory(s.str(), false, 60, (unsigned) -1);
  622. connMap.setValue(target, sf.get());
  623. }
  624. }
  625. extern TPWRAPPER_API unsigned getThorClusterNames(StringArray& targetNames, StringArray& queueNames)
  626. {
  627. Owned<IStringIterator> targets = getContainerTargetClusters("thor", nullptr);
  628. ForEach(*targets)
  629. {
  630. SCMStringBuffer target;
  631. targets->str(target);
  632. targetNames.append(target.str());
  633. StringBuffer qName;
  634. queueNames.append(getClusterThorQueueName(qName, target.str()));
  635. }
  636. return targetNames.ordinality();
  637. }
  638. static std::set<std::string> validTargets;
  639. static CriticalSection validTargetSect;
  640. static bool targetsDirty = true;
  641. static void refreshValidTargets()
  642. {
  643. validTargets.clear();
  644. Owned<IStringIterator> it = getContainerTargetClusters(nullptr, nullptr);
  645. ForEach(*it)
  646. {
  647. SCMStringBuffer s;
  648. IStringVal& val = it->str(s);
  649. if (validTargets.find(val.str()) == validTargets.end())
  650. {
  651. validTargets.insert(val.str());
  652. PROGLOG("adding valid target: %s", val.str());
  653. }
  654. }
  655. }
  656. static void configUpdate(const IPropertyTree *oldComponentConfiguration, const IPropertyTree *oldGlobalConfiguration)
  657. {
  658. CriticalBlock block(validTargetSect);
  659. // as much as effort [small] to check if different as to refresh
  660. refreshValidTargets();
  661. PROGLOG("Valid targets updated");
  662. }
  663. extern TPWRAPPER_API void validateTargetName(const char* target)
  664. {
  665. if (isEmptyString(target))
  666. throw makeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Empty target name.");
  667. CriticalBlock block(validTargetSect);
  668. configUpdateHook.installOnce(configUpdate, true);
  669. if (validTargets.find(target) == validTargets.end())
  670. throw makeStringExceptionV(ECLWATCH_INVALID_CLUSTER_NAME, "Invalid target name: %s", target);
  671. }
  672. // NB: bare-metal has a different implementation in TpWrapper.cpp
  673. bool getSashaService(StringBuffer &serviceAddress, const char *serviceName, bool failIfNotFound)
  674. {
  675. return getService(serviceAddress, serviceName, failIfNotFound);
  676. }
  677. bool getSashaServiceEP(SocketEndpoint &serviceEndpoint, const char *service, bool failIfNotFound)
  678. {
  679. StringBuffer serviceAddress;
  680. if (!getSashaService(serviceAddress, service, failIfNotFound))
  681. return false;
  682. serviceEndpoint.set(serviceAddress);
  683. return true;
  684. }
  685. StringBuffer & getRoxieDefaultPlane(StringBuffer & plane, const char * roxieName)
  686. {
  687. Owned<IPropertyTree> queue = getContainerClusterConfig(roxieName);
  688. if (!queue)
  689. throw makeStringExceptionV(ECLWATCH_INVALID_CLUSTER_NAME, "Unknown queue name %s", roxieName);
  690. if (queue->getProp("@storagePlane", plane))
  691. return plane;
  692. //Find the first data plane - better if it was retrieved from roxie config
  693. Owned<IPropertyTreeIterator> dataPlanes = getGlobalConfigSP()->getElements("storage/planes[labels='data']");
  694. if (!dataPlanes->first())
  695. throwUnexpectedX("No default data plane defined");
  696. return plane.append(dataPlanes->query().queryProp("@name"));
  697. }