12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795 |
- /*##############################################################################
- HPCC SYSTEMS software Copyright (C) 2021 HPCC Systems®.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ############################################################################## */
- #pragma warning (disable : 4786)
- // Implementation of the CTpWrapper class for containerized deployments.
- //
- //////////////////////////////////////////////////////////////////////
- #include "TpWrapper.hpp"
- #include <stdio.h>
- #include "workunit.hpp"
- #include "exception_util.hpp"
- #include "portlist.h"
- #include "daqueue.hpp"
- #include "dautils.hpp"
- #include "dameta.hpp"
- #include "hpccconfig.hpp"
- static CConfigUpdateHook configUpdateHook; // hook object through which validateTargetName() installs configUpdate() once
- const char* MSG_FAILED_GET_ENVIRONMENT_INFO = "Failed to get environment information."; // message carried by the exception thrown from the getTp*() stubs in this file
- //////////////////////////////////////////////////////////////////////
- // Construction/Destruction
- //////////////////////////////////////////////////////////////////////
- void CTpWrapper::getClusterMachineList(double clientVersion,
- const char* ClusterType,
- const char* ClusterPath,
- const char* ClusterDirectory,
- IArrayOf<IEspTpMachine> &MachineList,
- bool& hasThorSpareProcess,
- const char* ClusterName)
- {
- IWARNLOG("UNIMPLEMENTED: CONTAINERIZED(CTpWrapper::getClusterMachineList)");
- }
- void CTpWrapper::getTpDaliServers(double clientVersion, IArrayOf<IConstTpDali>& list)
- {
- throw MakeStringExceptionDirect(ECLWATCH_CANNOT_GET_ENV_INFO, MSG_FAILED_GET_ENVIRONMENT_INFO);
- }
- void CTpWrapper::getTpEclServers(IArrayOf<IConstTpEclServer>& list, const char* serverName)
- {
- throw MakeStringExceptionDirect(ECLWATCH_CANNOT_GET_ENV_INFO, MSG_FAILED_GET_ENVIRONMENT_INFO);
- }
- void CTpWrapper::getTpEclCCServers(IArrayOf<IConstTpEclServer>& list, const char* serverName)
- {
- throw MakeStringExceptionDirect(ECLWATCH_CANNOT_GET_ENV_INFO, MSG_FAILED_GET_ENVIRONMENT_INFO);
- }
- void CTpWrapper::getTpEclCCServers(IPropertyTree* environmentSoftware, IArrayOf<IConstTpEclServer>& list, const char* serverName)
- {
- }
- void CTpWrapper::getTpEclAgents(IArrayOf<IConstTpEclAgent>& list, const char* agentName)
- {
- throw MakeStringExceptionDirect(ECLWATCH_CANNOT_GET_ENV_INFO, MSG_FAILED_GET_ENVIRONMENT_INFO);
- }
- void CTpWrapper::getTpEclSchedulers(IArrayOf<IConstTpEclScheduler>& list, const char* serverName)
- {
- throw MakeStringExceptionDirect(ECLWATCH_CANNOT_GET_ENV_INFO, MSG_FAILED_GET_ENVIRONMENT_INFO);
- }
- void CTpWrapper::getTpEspServers(IArrayOf<IConstTpEspServer>& list)
- {
- throw MakeStringExceptionDirect(ECLWATCH_CANNOT_GET_ENV_INFO, MSG_FAILED_GET_ENVIRONMENT_INFO);
- }
- static IEspTpMachine * createHostTpMachine(const char * hostname, const char *path)
- {
- Owned<IEspTpMachine> machine = createTpMachine();
- IpAddress ipAddr;
- ipAddr.ipset(hostname);
- StringBuffer localHost;
- ipAddr.getIpText(localHost);
- machine->setName(localHost.str());
- machine->setNetaddress(localHost.str());
- machine->setConfigNetaddress(hostname);
- machine->setDirectory(path);
- machine->setOS(getPathSepChar(path) == '/' ? MachineOsLinux : MachineOsW2K);
- return machine.getClear();
- }
- static void gatherDropZoneMachinesFromHosts(IArrayOf<IEspTpMachine> & tpMachines, IPropertyTree & planeOrGroup, const char * prefix)
- {
- Owned<IPropertyTreeIterator> iter = planeOrGroup.getElements("hosts");
- ForEach(*iter)
- {
- const char * host = iter->query().queryProp(nullptr);
- tpMachines.append(*createHostTpMachine(host, prefix));
- }
- }
- static void gatherDropZoneMachines(IArrayOf<IEspTpMachine> & tpMachines, IPropertyTree & plane)
- {
- const char * prefix = plane.queryProp("@prefix");
- if (plane.hasProp("hosts"))
- {
- gatherDropZoneMachinesFromHosts(tpMachines, plane, prefix);
- }
- else if (plane.hasProp("@hostGroup"))
- {
- Owned<IPropertyTree> hostGroup = getHostGroup(plane.queryProp("@hostGroup"), true);
- gatherDropZoneMachinesFromHosts(tpMachines, *hostGroup, prefix);
- }
- else
- tpMachines.append(*createHostTpMachine("localhost", prefix));
- }
- void CTpWrapper::getTpDfuServers(IArrayOf<IConstTpDfuServer>& list)
- {
- Owned<IPropertyTreeIterator> dfuQueues = getComponentConfigSP()->getElements("dfuQueues");
- ForEach(*dfuQueues)
- {
- IPropertyTree & dfuQueue = dfuQueues->query();
- const char * dfuName = dfuQueue.queryProp("@name");
- StringBuffer queue;
- getDfuQueueName(queue, dfuName);
- Owned<IEspTpDfuServer> pService = createTpDfuServer("","");
- pService->setName(dfuName);
- pService->setDescription(dfuName);
- pService->setBuild("");
- pService->setQueue(queue);
- pService->setType(eqDfu);
- IArrayOf<IEspTpMachine> tpMachines;
- //MORE: The ip and directory don't make any sense on the cloud version
- tpMachines.append(*createHostTpMachine("localhost", "/var/lib/HPCCSystems"));
- pService->setTpMachines(tpMachines);
- list.append(*pService.getClear());
- }
- }
- void CTpWrapper::getTpSashaServers(IArrayOf<IConstTpSashaServer>& list)
- {
- throw MakeStringExceptionDirect(ECLWATCH_CANNOT_GET_ENV_INFO, MSG_FAILED_GET_ENVIRONMENT_INFO);
- }
- void CTpWrapper::getTpLdapServers(IArrayOf<IConstTpLdapServer>& list)
- {
- throw MakeStringExceptionDirect(ECLWATCH_CANNOT_GET_ENV_INFO, MSG_FAILED_GET_ENVIRONMENT_INFO);
- }
- void CTpWrapper::getTpFTSlaves(IArrayOf<IConstTpFTSlave>& list)
- {
- throw MakeStringExceptionDirect(ECLWATCH_CANNOT_GET_ENV_INFO, MSG_FAILED_GET_ENVIRONMENT_INFO);
- }
- void CTpWrapper::getTpDkcSlaves(IArrayOf<IConstTpDkcSlave>& list)
- {
- throw MakeStringExceptionDirect(ECLWATCH_CANNOT_GET_ENV_INFO, MSG_FAILED_GET_ENVIRONMENT_INFO);
- }
- void CTpWrapper::getTpGenesisServers(IArrayOf<IConstTpGenesisServer>& list)
- {
- throw MakeStringExceptionDirect(ECLWATCH_CANNOT_GET_ENV_INFO, MSG_FAILED_GET_ENVIRONMENT_INFO);
- }
- void CTpWrapper::getTargetClusterList(IArrayOf<IEspTpLogicalCluster>& clusters, const char* clusterType, const char* clusterName)
- {
- throw MakeStringExceptionDirect(ECLWATCH_CANNOT_GET_ENV_INFO, MSG_FAILED_GET_ENVIRONMENT_INFO);
- }
- void CTpWrapper::queryTargetClusterProcess(double version, const char* processName, const char* clusterType, IArrayOf<IConstTpCluster>& clusterList)
- {
- throw MakeStringExceptionDirect(ECLWATCH_CANNOT_GET_ENV_INFO, MSG_FAILED_GET_ENVIRONMENT_INFO);
- }
- void CTpWrapper::queryTargetClusters(double version, const char* clusterType, const char* clusterName, IArrayOf<IEspTpTargetCluster>& targetClusterList)
- {
- IWARNLOG("UNIMPLEMENTED: CONTAINERIZED(CTpWrapper::queryTargetClusters)");
- }
- void CTpWrapper::getClusterProcessList(const char* ClusterType, IArrayOf<IEspTpCluster>& clusterList, bool ignoreduplicatqueues, bool ignoreduplicategroups)
- {
- IWARNLOG("UNIMPLEMENTED: CONTAINERIZED(CTpWrapper::getClusterProcessList)");
- }
- void CTpWrapper::getHthorClusterList(IArrayOf<IEspTpCluster>& clusterList)
- {
- IWARNLOG("UNIMPLEMENTED: CONTAINERIZED(CTpWrapper::getHthorClusterList)");
- }
- void CTpWrapper::getGroupList(double espVersion, const char* kindReq, IArrayOf<IEspTpGroup> &GroupList)
- {
- try
- {
- Owned<IPropertyTreeIterator> dataPlanes = getGlobalConfigSP()->getElements("storage/planes[labels='data']");
- ForEach(*dataPlanes)
- {
- IPropertyTree & plane = dataPlanes->query();
- const char * name = plane.queryProp("@name");
- IEspTpGroup* pGroup = createTpGroup("","");
- pGroup->setName(name);
- if (espVersion >= 1.21)
- {
- pGroup->setKind("Plane");
- pGroup->setReplicateOutputs(false);
- }
- GroupList.append(*pGroup);
- }
- }
- catch(IException* e)
- {
- StringBuffer msg;
- e->errorMessage(msg);
- IWARNLOG("%s", msg.str());
- e->Release();
- }
- catch(...)
- {
- IWARNLOG("Unknown Exception caught within CTpWrapper::getGroupList");
- }
- }
- bool CTpWrapper::checkGroupReplicateOutputs(const char* groupName, const char* kind)
- {
- return false;
- }
- void CTpWrapper::getMachineInfo(double clientVersion, const char* name, const char* netAddress, IEspTpMachine& machineInfo)
- {
- UNIMPLEMENTED_X("CONTAINERIZED(CTpWrapper::getMachineInfo)");
- }
- void CTpWrapper::setTpMachine(IConstMachineInfo* machine, IEspTpMachine& tpMachine)
- {
- if (!machine)
- return;
- SCMStringBuffer machineName, netAddress;
- machine->getName(machineName);
- machine->getNetAddress(netAddress);
- tpMachine.setName(machineName.str());
- tpMachine.setNetaddress(netAddress.str());
- tpMachine.setOS(machine->getOS());
- switch(machine->getState())
- {
- case MachineStateAvailable:
- tpMachine.setAvailable("Available");
- break;
- case MachineStateUnavailable:
- tpMachine.setAvailable("Unavailable");
- break;
- case MachineStateUnknown:
- tpMachine.setAvailable("Unknown");
- break;
- }
- Owned<IConstDomainInfo> pDomain = machine->getDomain();
- if (pDomain != 0)
- {
- SCMStringBuffer sName;
- tpMachine.setDomain(pDomain->getName(sName).str());
- }
- }
- void CTpWrapper::appendThorMachineList(double clientVersion, IConstEnvironment* constEnv, INode& node, const char* clusterName,
- const char* machineType, unsigned& processNumber, unsigned channels, const char* directory, IArrayOf<IEspTpMachine>& machineList)
- {
- StringBuffer netAddress;
- node.endpoint().getIpText(netAddress);
- if (netAddress.length() == 0)
- {
- OWARNLOG("Net address not found for a node of %s", clusterName);
- return;
- }
- processNumber++;
- Owned<IEspTpMachine> machineInfo = createTpMachine("","");
- machineInfo->setType(machineType);
- machineInfo->setNetaddress(netAddress.str());
- if (!isEmptyString(directory))
- machineInfo->setDirectory(directory);
- Owned<IConstMachineInfo> pMachineInfo = constEnv->getMachineByAddress(netAddress.str());
- if (pMachineInfo.get())
- {
- setTpMachine(pMachineInfo, *machineInfo);
- if (clientVersion > 1.17)
- {
- machineInfo->setProcessNumber(processNumber);
- }
- }
- else
- {
- machineInfo->setName("external");
- machineInfo->setOS(MachineOsUnknown);
- }
- if (clientVersion >= 1.30)
- machineInfo->setChannels(channels);
- machineList.append(*machineInfo.getLink());
- }
- void CTpWrapper::getThorSlaveMachineList(double clientVersion, const char* clusterName, const char* directory, IArrayOf<IEspTpMachine>& machineList)
- {
- IWARNLOG("UNIMPLEMENTED: CONTAINERIZED(CTpWrapper::getThorSlaveMachineList)");
- }
- void CTpWrapper::getThorSpareMachineList(double clientVersion, const char* clusterName, const char* directory, IArrayOf<IEspTpMachine>& machineList)
- {
- IWARNLOG("UNIMPLEMENTED: CONTAINERIZED(CTpWrapper::getThorSpareMachineList)");
- }
- void CTpWrapper::getMachineList(double clientVersion, const char* MachineType, const char* ParentPath,
- const char* Status, const char* Directory, IArrayOf<IEspTpMachine>& MachineList, set<string>* pMachineNames/*=NULL*/)
- {
- IWARNLOG("UNIMPLEMENTED: CONTAINERIZED(CTpWrapper::getMachineList)");
- }
- const char* CTpWrapper::getNodeNameTag(const char* MachineType)
- {
- if (strcmp(MachineType,"Computer")==0)
- return "@name";
- else
- return "@computer";
- }
- void CTpWrapper::getDropZoneMachineList(double clientVersion, bool ECLWatchVisibleOnly, IArrayOf<IEspTpMachine> &MachineList)
- {
- try
- {
- IArrayOf<IConstTpDropZone> list;
- getTpDropZones(clientVersion, nullptr, ECLWatchVisibleOnly, list);
- ForEachItemIn(i, list)
- {
- IConstTpDropZone& dropZone = list.item(i);
- IArrayOf<IConstTpMachine>& tpMachines = dropZone.getTpMachines();
- ForEachItemIn(ii, tpMachines)
- {
- IConstTpMachine& tpMachine = tpMachines.item(ii);
- Owned<IEspTpMachine> machine = createTpMachine();
- machine->copy(tpMachine);
- MachineList.append(*machine.getLink());
- }
- }
- }
- catch(IException* e)
- {
- EXCLOG(e);
- e->Release();
- }
- catch(...)
- {
- IWARNLOG("Unknown Exception caught within CTpWrapper::getDropZoneMachineList");
- }
- }
- //For a given dropzone or every dropzones (check ECLWatchVisible if needed), read: "@name",
- // "@description", "@build", "@directory", "@ECLWatchVisible" into an IEspTpDropZone object.
- //For each ServerList, read "@name" and "@server" (hostname or IP) into an IEspTpMachine object.
- //Add the IEspTpMachine object into the IEspTpDropZone.
- void CTpWrapper::getTpDropZones(double clientVersion, const char* name, bool ECLWatchVisibleOnly, IArrayOf<IConstTpDropZone>& list)
- {
- Owned<IPropertyTreeIterator> planes = getDropZonePlanesIterator(name);
- ForEach(*planes)
- {
- IPropertyTree & plane = planes->query();
- const char * dropzonename = plane.queryProp("@name");
- const char * path = plane.queryProp("@prefix");
- Owned<IEspTpDropZone> dropZone = createTpDropZone();
- dropZone->setName(dropzonename);
- dropZone->setDescription("");
- dropZone->setPath(path);
- dropZone->setBuild("");
- dropZone->setECLWatchVisible(true);
- IArrayOf<IEspTpMachine> tpMachines;
- gatherDropZoneMachines(tpMachines, plane);
- dropZone->setTpMachines(tpMachines);
- list.append(*dropZone.getClear());
- }
- }
- void CTpWrapper::getTpSparkThors(double clientVersion, const char* name, IArrayOf<IConstTpSparkThor>& list)
- {
- UNIMPLEMENTED_X("CONTAINERIZED(CTpWrapper::getTpSparkThors)");
- }
- void CTpWrapper::appendTpMachine(double clientVersion, IConstEnvironment* constEnv, IConstInstanceInfo& instanceInfo, IArrayOf<IConstTpMachine>& machines)
- {
- SCMStringBuffer name, networkAddress, description, directory;
- Owned<IConstMachineInfo> machineInfo = instanceInfo.getMachine();
- machineInfo->getName(name);
- machineInfo->getNetAddress(networkAddress);
- instanceInfo.getDirectory(directory);
- Owned<IEspTpMachine> machine = createTpMachine();
- machine->setName(name.str());
- if (networkAddress.length() > 0)
- {
- IpAddress ipAddr;
- ipAddr.ipset(networkAddress.str());
- StringBuffer networkAddressStr;
- ipAddr.getIpText(networkAddressStr);
- machine->setNetaddress(networkAddressStr);
- }
- machine->setPort(instanceInfo.getPort());
- machine->setOS(machineInfo->getOS());
- machine->setDirectory(directory.str());
- machine->setType(eqSparkThorProcess);
- machines.append(*machine.getLink());
- }
- IEspTpMachine* CTpWrapper::createTpMachineEx(const char* name, const char* type, IConstMachineInfo* machineInfo)
- {
- if (!machineInfo)
- return nullptr;
- Owned<IEspTpMachine> machine = createTpMachine();
- machine->setName(name);
- machine->setType(type);
- machine->setOS(machineInfo->getOS());
- Owned<IConstDomainInfo> domain = machineInfo->getDomain();
- if (domain)
- {
- SCMStringBuffer sName;
- machine->setDomain(domain->getName(sName).str());
- }
- SCMStringBuffer netAddr;
- machineInfo->getNetAddress(netAddr);
- if (netAddr.length() > 0)
- {
- StringBuffer networkAddress;
- IpAddress ipAddr;
- ipAddr.ipset(netAddr.str());
- ipAddr.getIpText(networkAddress);
- machine->setNetaddress(networkAddress.str());
- }
- switch(machineInfo->getState())
- {
- case MachineStateAvailable:
- machine->setAvailable("Available");
- break;
- case MachineStateUnavailable:
- machine->setAvailable("Unavailable");
- break;
- default:
- machine->setAvailable("Unknown");
- break;
- }
- return machine.getClear();
- }
- void CTpWrapper::setAttPath(StringBuffer& Path,const char* PathToAppend,const char* AttName,const char* AttValue,StringBuffer& returnStr)
- {
- Path.append("/");
- Path.append(PathToAppend);
- Path.append("[@");
- Path.append(AttName);
- Path.append("=\"");
- Path.append(AttValue);
- Path.append("\"]");
- StringBuffer rawPath;
- const void* buff = (void*)Path.str();
- JBASE64_Encode(buff,Path.length(),rawPath, false);
- returnStr.append(rawPath.str());
- }
- void CTpWrapper::getAttPath(const char* Path,StringBuffer& returnStr)
- {
- StringBuffer decodedStr;
- JBASE64_Decode(Path, returnStr);
- }
- void CTpWrapper::getServices(double version, const char* serviceType, const char* serviceName, IArrayOf<IConstHPCCService>& services)
- {
- Owned<IPropertyTreeIterator> itr = getGlobalConfigSP()->getElements("services");
- ForEach(*itr)
- {
- IPropertyTree& service = itr->query();
- //Only show the public services for now
- if (!service.getPropBool("@public"))
- continue;
- const char* type = service.queryProp("@type");
- if (isEmptyString(type) || (!isEmptyString(serviceType) && !strieq(serviceType, type)))
- continue;
- const char* name = service.queryProp("@name");
- if (isEmptyString(name) || (!isEmptyString(serviceName) && !strieq(serviceName, name)))
- continue;
- Owned<IEspHPCCService> svc = createHPCCService();
- svc->setName(name);
- svc->setType(type);
- svc->setPort(service.getPropInt("@port"));
- if (service.getPropBool("@tls"))
- svc->setTLSSecure(true);
- services.append(*svc.getLink());
- if (!isEmptyString(serviceName))
- break;
- }
- }
- class CContainerWUClusterInfo : public CSimpleInterfaceOf<IConstWUClusterInfo>
- {
- StringAttr name;
- StringAttr serverQueue;
- StringAttr agentQueue;
- StringAttr thorQueue;
- ClusterType platform;
- unsigned clusterWidth;
- StringArray thorProcesses;
- public:
- CContainerWUClusterInfo(const char* _name, const char* type, unsigned _clusterWidth)
- : name(_name), clusterWidth(_clusterWidth)
- {
- StringBuffer queue;
- if (strieq(type, "thor"))
- {
- thorQueue.set(getClusterThorQueueName(queue.clear(), name));
- platform = ThorLCRCluster;
- thorProcesses.append(name);
- }
- else if (strieq(type, "roxie"))
- {
- agentQueue.set(getClusterEclAgentQueueName(queue.clear(), name));
- platform = RoxieCluster;
- }
- else
- {
- agentQueue.set(getClusterEclAgentQueueName(queue.clear(), name));
- platform = HThorCluster;
- }
- serverQueue.set(getClusterEclCCServerQueueName(queue.clear(), name));
- }
- virtual IStringVal& getName(IStringVal& str) const override
- {
- str.set(name.get());
- return str;
- }
- virtual IStringVal& getAgentQueue(IStringVal& str) const override
- {
- str.set(agentQueue);
- return str;
- }
- virtual IStringVal& getServerQueue(IStringVal& str) const override
- {
- str.set(serverQueue);
- return str;
- }
- virtual IStringVal& getThorQueue(IStringVal& str) const override
- {
- str.set(thorQueue);
- return str;
- }
- virtual ClusterType getPlatform() const override
- {
- return platform;
- }
- virtual unsigned getSize() const override
- {
- return clusterWidth;
- }
- virtual bool isLegacyEclServer() const override
- {
- return false;
- }
- virtual IStringVal& getScope(IStringVal& str) const override
- {
- UNIMPLEMENTED;
- }
- virtual unsigned getNumberOfSlaveLogs() const override
- {
- UNIMPLEMENTED;
- }
- virtual IStringVal & getAgentName(IStringVal & str) const override
- {
- UNIMPLEMENTED;
- }
- virtual IStringVal & getECLSchedulerName(IStringVal & str) const override
- {
- UNIMPLEMENTED;
- }
- virtual const StringArray & getECLServerNames() const override
- {
- UNIMPLEMENTED;
- }
- virtual IStringVal & getRoxieProcess(IStringVal & str) const override
- {
- str.set(name.get());
- return str;
- }
- virtual const StringArray & getThorProcesses() const override
- {
- return thorProcesses;
- }
- virtual const StringArray & getPrimaryThorProcesses() const override
- {
- UNIMPLEMENTED;
- }
- virtual const SocketEndpointArray & getRoxieServers() const override
- {
- UNIMPLEMENTED;
- }
- virtual const char *getLdapUser() const override
- {
- UNIMPLEMENTED;
- }
- virtual const char *getLdapPassword() const override
- {
- UNIMPLEMENTED;
- }
- virtual unsigned getRoxieRedundancy() const override
- {
- return 1;
- }
- virtual unsigned getChannelsPerNode() const override
- {
- return 1;
- }
- virtual int getRoxieReplicateOffset() const override
- {
- return 0;
- }
- virtual const char *getAlias() const override
- {
- UNIMPLEMENTED;
- }
- };
- extern TPWRAPPER_API unsigned getContainerWUClusterInfo(CConstWUClusterInfoArray& clusters)
- {
- Owned<IPropertyTreeIterator> queues = getComponentConfigSP()->getElements("queues");
- ForEach(*queues)
- {
- IPropertyTree& queue = queues->query();
- Owned<IConstWUClusterInfo> cluster = new CContainerWUClusterInfo(queue.queryProp("@name"),
- queue.queryProp("@type"), (unsigned) queue.getPropInt("@width", 1));
- clusters.append(*cluster.getClear());
- }
- return clusters.ordinality();
- }
- extern TPWRAPPER_API unsigned getWUClusterInfo(CConstWUClusterInfoArray& clusters)
- {
- return getContainerWUClusterInfo(clusters);
- }
- static IPropertyTree * getContainerClusterConfig(const char * clusterName)
- {
- VStringBuffer xpath("queues[@name='%s']", clusterName);
- return getComponentConfigSP()->getPropTree(xpath);
- }
- extern TPWRAPPER_API IConstWUClusterInfo* getWUClusterInfoByName(const char* clusterName)
- {
- Owned<IPropertyTree> queue = getContainerClusterConfig(clusterName);
- if (!queue)
- return nullptr;
- return new CContainerWUClusterInfo(queue->queryProp("@name"), queue->queryProp("@type"),
- (unsigned) queue->getPropInt("@width", 1));
- }
- extern TPWRAPPER_API void initContainerRoxieTargets(MapStringToMyClass<ISmartSocketFactory>& connMap)
- {
- Owned<IPropertyTreeIterator> services = getGlobalConfigSP()->getElements("services[@type='roxie']");
- ForEach(*services)
- {
- IPropertyTree& service = services->query();
- const char* name = service.queryProp("@name");
- const char* target = service.queryProp("@target");
- const char* port = service.queryProp("@port");
- if (isEmptyString(target) || isEmptyString(name)) //bad config?
- continue;
- StringBuffer s;
- s.append(name).append(':').append(port ? port : "9876");
- Owned<ISmartSocketFactory> sf = new CSmartSocketFactory(s.str(), false, 60, (unsigned) -1);
- connMap.setValue(target, sf.get());
- }
- }
- extern TPWRAPPER_API unsigned getThorClusterNames(StringArray& targetNames, StringArray& queueNames)
- {
- Owned<IStringIterator> targets = getContainerTargetClusters("thor", nullptr);
- ForEach(*targets)
- {
- SCMStringBuffer target;
- targets->str(target);
- targetNames.append(target.str());
- StringBuffer qName;
- queueNames.append(getClusterThorQueueName(qName, target.str()));
- }
- return targetNames.ordinality();
- }
- static std::set<std::string> validTargets; // names accepted by validateTargetName(); rebuilt by refreshValidTargets()
- static CriticalSection validTargetSect; // taken by configUpdate() and validateTargetName() around access to validTargets
- static bool targetsDirty = true; // NOTE(review): not referenced by any code visible in this file
- static void refreshValidTargets()
- {
- validTargets.clear();
- Owned<IStringIterator> it = getContainerTargetClusters(nullptr, nullptr);
- ForEach(*it)
- {
- SCMStringBuffer s;
- IStringVal& val = it->str(s);
- if (validTargets.find(val.str()) == validTargets.end())
- {
- validTargets.insert(val.str());
- PROGLOG("adding valid target: %s", val.str());
- }
- }
- }
- static void configUpdate(const IPropertyTree *oldComponentConfiguration, const IPropertyTree *oldGlobalConfiguration)
- {
- CriticalBlock block(validTargetSect);
- // as much as effort [small] to check if different as to refresh
- refreshValidTargets();
- PROGLOG("Valid targets updated");
- }
- extern TPWRAPPER_API void validateTargetName(const char* target)
- {
- if (isEmptyString(target))
- throw makeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Empty target name.");
- CriticalBlock block(validTargetSect);
- configUpdateHook.installOnce(configUpdate, true);
- if (validTargets.find(target) == validTargets.end())
- throw makeStringExceptionV(ECLWATCH_INVALID_CLUSTER_NAME, "Invalid target name: %s", target);
- }
- // NB: bare-metal has a different implementation in TpWrapper.cpp
- bool getSashaService(StringBuffer &serviceAddress, const char *serviceName, bool failIfNotFound)
- {
- return getService(serviceAddress, serviceName, failIfNotFound);
- }
- bool getSashaServiceEP(SocketEndpoint &serviceEndpoint, const char *service, bool failIfNotFound)
- {
- StringBuffer serviceAddress;
- if (!getSashaService(serviceAddress, service, failIfNotFound))
- return false;
- serviceEndpoint.set(serviceAddress);
- return true;
- }
- StringBuffer & getRoxieDefaultPlane(StringBuffer & plane, const char * roxieName)
- {
- Owned<IPropertyTree> queue = getContainerClusterConfig(roxieName);
- if (!queue)
- throw makeStringExceptionV(ECLWATCH_INVALID_CLUSTER_NAME, "Unknown queue name %s", roxieName);
- if (queue->getProp("@storagePlane", plane))
- return plane;
- //Find the first data plane - better if it was retrieved from roxie config
- Owned<IPropertyTreeIterator> dataPlanes = getGlobalConfigSP()->getElements("storage/planes[labels='data']");
- if (!dataPlanes->first())
- throwUnexpectedX("No default data plane defined");
- return plane.append(dataPlanes->query().queryProp("@name"));
- }
|