environment.cpp 100 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jlib.hpp"
  14. #include "environment.hpp"
  15. #include "jptree.hpp"
  16. #include "jexcept.hpp"
  17. #include "jiter.ipp"
  18. #include "jmisc.hpp"
  19. #include "jencrypt.hpp"
  20. #include "jutil.hpp"
  21. #include "mpbase.hpp"
  22. #include "daclient.hpp"
  23. #include "dadfs.hpp"
  24. #include "dafdesc.hpp"
  25. #include "dasds.hpp"
  26. #include "dalienv.hpp"
  27. #include "daqueue.hpp"
  28. #include <string>
  29. #include <unordered_map>
  30. #include <tuple>
  31. #define SDS_LOCK_TIMEOUT 30000
  32. #define DEFAULT_DROPZONE_INDEX 1
  33. #define DROPZONE_BY_MACHINE_SUFFIX "-dropzoneByMachine-"
  34. #define DROPZONE_SUFFIX "dropzone-"
  35. #define MACHINE_PREFIX "machine-"
  36. #define SPARKTHOR_SUFFIX "sparkthor-"
  37. static int environmentTraceLevel = 1;
  38. static Owned <IConstEnvironment> cache;
  39. class CLocalEnvironment;
  40. class CConstMachineInfoIterator : public CSimpleInterfaceOf<IConstMachineInfoIterator>
  41. {
  42. public:
  43. CConstMachineInfoIterator();
  44. virtual bool first() override;
  45. virtual bool next() override;
  46. virtual bool isValid() override;
  47. virtual IConstMachineInfo & query() override;
  48. virtual unsigned count() const override;
  49. protected:
  50. Owned<IConstMachineInfo> curr;
  51. Owned<CLocalEnvironment> constEnv;
  52. unsigned index = 1;
  53. unsigned maxIndex = 0;
  54. };
  55. class CConstDropZoneServerInfoIterator : public CSimpleInterfaceOf<IConstDropZoneServerInfoIterator>
  56. {
  57. public:
  58. CConstDropZoneServerInfoIterator(const IConstDropZoneInfo * dropZone);
  59. virtual bool first() override;
  60. virtual bool next() override;
  61. virtual bool isValid() override;
  62. virtual IConstDropZoneServerInfo & query() override;
  63. virtual unsigned count() const override;
  64. protected:
  65. Owned<IConstDropZoneServerInfo> curr;
  66. Owned<CLocalEnvironment> constEnv;
  67. Owned<IPropertyTreeIterator> serverListIt;
  68. unsigned maxIndex = 0;
  69. };
  70. class CConstDropZoneInfoIterator : public CSimpleInterfaceOf<IConstDropZoneInfoIterator>
  71. {
  72. public:
  73. CConstDropZoneInfoIterator();
  74. virtual bool first() override;
  75. virtual bool next() override;
  76. virtual bool isValid() override;
  77. virtual IConstDropZoneInfo & query() override;
  78. virtual unsigned count() const override;
  79. protected:
  80. Owned<IConstDropZoneInfo> curr;
  81. Owned<CLocalEnvironment> constEnv;
  82. unsigned index = 1;
  83. unsigned maxIndex = 0;
  84. };
  85. class CConstDfuQueueInfoIterator : public CSimpleInterfaceOf<IConstDfuQueueInfoIterator>
  86. {
  87. public:
  88. CConstDfuQueueInfoIterator();
  89. virtual bool first() override;
  90. virtual bool next() override;
  91. virtual bool isValid() override;
  92. virtual IConstDfuQueueInfo & query() override;
  93. virtual unsigned count() const override;
  94. protected:
  95. Owned<IConstDfuQueueInfo> curr;
  96. Owned<CLocalEnvironment> constEnv;
  97. unsigned index = 1;
  98. unsigned maxIndex = 0;
  99. };
  100. class CConstSparkThorInfoIterator : public CSimpleInterfaceOf<IConstSparkThorInfoIterator>
  101. {
  102. public:
  103. CConstSparkThorInfoIterator();
  104. virtual bool first() override;
  105. virtual bool next() override;
  106. virtual bool isValid() override;
  107. virtual IConstSparkThorInfo & query() override;
  108. virtual unsigned count() const override;
  109. protected:
  110. Owned<IConstSparkThorInfo> curr;
  111. Owned<CLocalEnvironment> constEnv;
  112. unsigned index = 1;
  113. unsigned maxIndex = 0;
  114. };
  115. class CConstInstanceInfoIterator : public CSimpleInterfaceOf<IConstInstanceInfoIterator>
  116. {
  117. public:
  118. CConstInstanceInfoIterator(const CLocalEnvironment * env, IPropertyTreeIterator * itr);
  119. virtual bool first() override;
  120. virtual bool next() override;
  121. virtual bool isValid() override;
  122. virtual IConstInstanceInfo & query() override;
  123. virtual unsigned count() const override;
  124. protected:
  125. Owned<IPropertyTreeIterator> instanceItr;
  126. Owned<IConstInstanceInfo> curr;
  127. const CLocalEnvironment* constEnv;
  128. unsigned index = 1;
  129. unsigned maxIndex = 0;
  130. };
  131. //==========================================================================================
  132. class CConstInstanceInfo;
  133. class CLocalEnvironment : implements IConstEnvironment, public CInterface
  134. {
  135. private:
  136. // NOTE - order is important - we need to construct before p and (especially) destruct after p
  137. Owned<IRemoteConnection> conn;
  138. Owned<IPropertyTree> p;
  139. mutable MapStringToMyClass<IConstEnvBase> cache; // NB: map of 'MappingStringToIInterface' that Link's the added IConstEnvBase, and Release's on element removal.
  140. mutable Mutex safeCache;
  141. mutable bool dropZoneCacheBuilt;
  142. mutable bool machineCacheBuilt;
  143. mutable bool sparkThorCacheBuilt;
  144. mutable bool clusterGroupKeyNameCache;
  145. StringBuffer fileAccessUrl;
  146. struct KeyPairMapEntity
  147. {
  148. std::string publicKey, privateKey;
  149. };
  150. mutable std::unordered_map<std::string, KeyPairMapEntity> keyPairMap;
  151. mutable std::unordered_map<std::string, std::string> keyGroupMap;
  152. StringBuffer xPath;
  153. mutable unsigned numOfMachines;
  154. mutable unsigned numOfDropZones;
  155. mutable unsigned numOfSparkThors;
  156. mutable bool isDropZoneRestrictionLoaded = false;
  157. mutable bool dropZoneRestrictionEnabled = true;
  158. void buildDfuQueueCache() const;
  159. mutable unsigned numOfDfuQueues = 0;
  160. mutable bool dfuQueueCacheBuilt = false;
  161. IConstEnvBase * getCache(const char *path) const;
  162. void setCache(const char *path, IConstEnvBase *value) const;
  163. void buildMachineCache() const;
  164. void buildDropZoneCache() const;
  165. void buildSparkThorCache() const;
  166. void init();
  167. void ensureClusterGroupKeyMap() const // keyPairMap and keyGroupMap it alters is mutable
  168. {
  169. if (!clusterGroupKeyNameCache)
  170. {
  171. StringBuffer keysDir;
  172. envGetConfigurationDirectory("keys",nullptr, nullptr, keysDir);
  173. Owned<IPropertyTreeIterator> keyPairIt = p->getElements("EnvSettings/Keys/KeyPair");
  174. ForEach(*keyPairIt)
  175. {
  176. IPropertyTree &keyPair = keyPairIt->query();
  177. const char *name = keyPair.queryProp("@name");
  178. const char *publicKeyPath = keyPair.queryProp("@publicKey");
  179. const char *privateKeyPath = keyPair.queryProp("@privateKey");
  180. if (isEmptyString(name))
  181. {
  182. WARNLOG("skipping invalid EnvSettings/Key/KeyPair entry, name not defined");
  183. continue;
  184. }
  185. if (isEmptyString(publicKeyPath) || isEmptyString(privateKeyPath))
  186. {
  187. WARNLOG("skipping invalid EnvSettings/Key/KeyPair entry, name=%s", name);
  188. continue;
  189. }
  190. StringBuffer absPublicKeyPath, absPrivateKeyPath;
  191. if (!isAbsolutePath(publicKeyPath))
  192. {
  193. absPublicKeyPath.append(keysDir);
  194. addPathSepChar(absPublicKeyPath);
  195. absPublicKeyPath.append(publicKeyPath);
  196. }
  197. else
  198. absPublicKeyPath.append(publicKeyPath);
  199. if (!isAbsolutePath(privateKeyPath))
  200. {
  201. absPrivateKeyPath.append(keysDir);
  202. addPathSepChar(absPrivateKeyPath);
  203. absPrivateKeyPath.append(privateKeyPath);
  204. }
  205. else
  206. absPrivateKeyPath.append(privateKeyPath);
  207. keyPairMap[name] = { absPublicKeyPath.str(), absPrivateKeyPath.str() };
  208. }
  209. /* From 7.0.0 until 7.0.6, the <Keys> section of the environment required
  210. * the mappings to be defined as "Cluster" instead of "ClusterGroup" - See: HPCC-21192
  211. */
  212. #define BKWRDCOMPAT_CLUSTER_VS_CLUSTERGROUP
  213. const char *groupKeysPath = "EnvSettings/Keys/ClusterGroup";
  214. #ifdef BKWRDCOMPAT_CLUSTER_VS_CLUSTERGROUP
  215. for (unsigned i=0; i<2; i++) // once for std. "ClusterGroup", 2nd time for legacy "Cluster"
  216. {
  217. #endif
  218. Owned<IPropertyTreeIterator> clusterGroupIter = p->getElements(groupKeysPath);
  219. #ifdef BKWRDCOMPAT_CLUSTER_VS_CLUSTERGROUP
  220. if (clusterGroupIter->first() && keyGroupMap.size()) // NB: always 0 1st time around.
  221. {
  222. WARNLOG("Invalid configuration: mixed 'Keys/ClusterGroup' definitions and legacy 'Keys/Cluster' definitions found, legacy 'Keys/Cluster' definition will be ignored.");
  223. break;
  224. }
  225. #endif
  226. ForEach(*clusterGroupIter)
  227. {
  228. IPropertyTree &clusterGroup = clusterGroupIter->query();
  229. const char *groupName = clusterGroup.queryProp("@name");
  230. if (isEmptyString(groupName))
  231. {
  232. WARNLOG("skipping %s entry with no name", groupKeysPath);
  233. continue;
  234. }
  235. if (clusterGroup.hasProp("@keyPairName"))
  236. {
  237. const char *keyPairName = clusterGroup.queryProp("@keyPairName");
  238. if (isEmptyString(keyPairName))
  239. {
  240. WARNLOG("skipping invalid %s entry, name=%s", groupKeysPath, groupName);
  241. continue;
  242. }
  243. keyGroupMap[groupName] = keyPairName;
  244. }
  245. }
  246. #ifdef BKWRDCOMPAT_CLUSTER_VS_CLUSTERGROUP
  247. groupKeysPath = "EnvSettings/Keys/Cluster";
  248. }
  249. #endif
  250. clusterGroupKeyNameCache = true;
  251. }
  252. }
  253. public:
  254. IMPLEMENT_IINTERFACE;
  255. CLocalEnvironment(IRemoteConnection *_conn, IPropertyTree *x=nullptr, const char* path="Environment");
  256. CLocalEnvironment(const char* path="config.xml");
  257. virtual ~CLocalEnvironment();
  258. virtual IStringVal & getName(IStringVal & str) const;
  259. virtual IStringVal & getXML(IStringVal & str) const;
  260. virtual IPropertyTree & getPTree() const;
  261. virtual IEnvironment& lock() const;
  262. virtual IConstDomainInfo * getDomain(const char * name) const;
  263. virtual IConstMachineInfo * getMachine(const char * name) const;
  264. virtual IConstMachineInfo * getMachineByAddress(const char * hostOrIP) const;
  265. virtual IConstMachineInfo * getMachineForLocalHost() const;
  266. virtual IConstDropZoneInfo * getDropZone(const char * name) const;
  267. virtual IConstInstanceInfo * getInstance(const char * type, const char * version, const char *domain) const;
  268. virtual CConstInstanceInfo * getInstanceByIP(const char *type, const char *version, IpAddress &ip) const;
  269. virtual IConstComputerTypeInfo * getComputerType(const char * name) const;
  270. virtual bool getRunInfo(IStringVal & path, IStringVal & dir, const char *type, const char *version, const char *machineaddr, const char *defprogname) const;
  271. virtual void preload();
  272. virtual IRemoteConnection* getConnection() const { return conn.getLink(); }
  273. void setXML(const char * logicalName);
  274. const char* getPath() const { return xPath.str(); }
  275. void unlockRemote();
  276. virtual bool isConstEnvironment() const { return true; }
  277. virtual void clearCache();
  278. virtual IConstMachineInfoIterator * getMachineIterator() const;
  279. virtual IConstDropZoneInfoIterator * getDropZoneIteratorByAddress(const char * address) const;
  280. virtual IConstDropZoneInfo * getDropZoneByAddressPath(const char * netaddress, const char *targetPath) const;
  281. virtual IConstDropZoneInfoIterator * getDropZoneIterator() const;
  282. unsigned getNumberOfMachines() const { buildMachineCache(); return numOfMachines; }
  283. IConstMachineInfo * getMachineByIndex(unsigned index) const;
  284. unsigned getNumberOfDropZones() const { buildDropZoneCache(); return numOfDropZones; }
  285. IConstDropZoneInfo * getDropZoneByIndex(unsigned index) const;
  286. bool isDropZoneRestrictionEnabled() const;
  287. unsigned getNumberOfDfuQueues() const { buildDfuQueueCache(); return numOfDfuQueues; }
  288. IConstDfuQueueInfo * getDfuQueueByIndex(unsigned index) const;
  289. virtual IConstDfuQueueInfoIterator * getDfuQueueIterator() const;
  290. bool isValidDfuQueueName(const char * queueName) const;
  291. virtual const char *getClusterGroupKeyPairName(const char *group) const override
  292. {
  293. synchronized procedure(safeCache);
  294. ensureClusterGroupKeyMap();
  295. auto it = keyGroupMap.find(group);
  296. if (it == keyGroupMap.end())
  297. return nullptr;
  298. else
  299. return it->second.c_str();
  300. }
  301. virtual const char *getPublicKeyPath(const char *keyPairName) const override
  302. {
  303. synchronized procedure(safeCache);
  304. ensureClusterGroupKeyMap();
  305. auto it = keyPairMap.find(keyPairName);
  306. if (it == keyPairMap.end())
  307. return nullptr;
  308. else
  309. return it->second.publicKey.c_str();
  310. }
  311. virtual const char *getPrivateKeyPath(const char *keyPairName) const override
  312. {
  313. synchronized procedure(safeCache);
  314. ensureClusterGroupKeyMap();
  315. auto it = keyPairMap.find(keyPairName);
  316. if (it == keyPairMap.end())
  317. return nullptr;
  318. else
  319. return it->second.privateKey.c_str();
  320. }
  321. virtual const char *getFileAccessUrl() const
  322. {
  323. synchronized procedure(safeCache);
  324. return fileAccessUrl.length() ? fileAccessUrl.str() : nullptr;
  325. }
  326. virtual IConstDaFileSrvInfo *getDaFileSrvGroupInfo(const char *name) const override;
  327. virtual IConstSparkThorInfo *getSparkThor(const char *name) const;
  328. virtual IConstSparkThorInfoIterator *getSparkThorIterator() const;
  329. unsigned getNumberOfSparkThors() const { buildSparkThorCache(); return numOfSparkThors; }
  330. IConstSparkThorInfo *getSparkThorByIndex(unsigned index) const;
  331. };
  332. class CLockedEnvironment : implements IEnvironment, public CInterface
  333. {
  334. public:
  335. //note that order of construction/destruction is important
  336. Owned<CLocalEnvironment> c;
  337. Owned<CLocalEnvironment> env;
  338. Owned<CLocalEnvironment> constEnv;
  339. IMPLEMENT_IINTERFACE;
  340. CLockedEnvironment(CLocalEnvironment *_c)
  341. {
  342. Owned<IRemoteConnection> connection = _c->getConnection();
  343. if (connection)
  344. {
  345. constEnv.set(_c); //save original constant environment
  346. //we only wish to allow one party to allow updating the environment.
  347. //
  348. //create a new /NewEnvironment subtree, locked for read/write access for self and entire subtree; delete on disconnect
  349. //
  350. StringBuffer newName("/New");
  351. newName.append(constEnv->getPath());
  352. const unsigned int mode = RTM_CREATE | RTM_CREATE_QUERY | RTM_LOCK_READ | RTM_LOCK_WRITE |
  353. RTM_LOCK_SUB | RTM_DELETE_ON_DISCONNECT;
  354. Owned<IRemoteConnection> conn = querySDS().connect(newName.str(), myProcessSession(), mode, SDS_LOCK_TIMEOUT);
  355. if (conn == nullptr)
  356. {
  357. if (environmentTraceLevel > 0)
  358. IERRLOG("Failed to create locked environment %s", newName.str());
  359. throw MakeStringException(-1, "Failed to get a lock on environment /%s", newName.str());
  360. }
  361. //save the locked environment
  362. env.setown(new CLocalEnvironment(conn, nullptr, newName.str()));
  363. //get a lock on the const environment
  364. const unsigned int mode2 = RTM_CREATE_QUERY | RTM_LOCK_READ | RTM_LOCK_WRITE | RTM_LOCK_SUB;
  365. Owned<IRemoteConnection> conn2 = querySDS().connect(constEnv->getPath(), myProcessSession(), mode2, SDS_LOCK_TIMEOUT);
  366. if (conn2 == nullptr)
  367. {
  368. if (environmentTraceLevel > 0)
  369. IERRLOG("Failed to lock environment %s", constEnv->getPath());
  370. throw MakeStringException(-1, "Failed to get a lock on environment /%s", constEnv->getPath());
  371. }
  372. //copy const environment to our member environment
  373. Owned<IPropertyTree> pSrc = conn2->getRoot();
  374. c.setown( new CLocalEnvironment(nullptr, createPTreeFromIPT(pSrc)));
  375. conn2->rollback();
  376. }
  377. else
  378. {
  379. c.set(_c);
  380. }
  381. }
  382. virtual ~CLockedEnvironment()
  383. {
  384. }
  385. virtual IStringVal & getName(IStringVal & str) const
  386. { return c->getName(str); }
  387. virtual IStringVal & getXML(IStringVal & str) const
  388. { return c->getXML(str); }
  389. virtual IPropertyTree & getPTree() const
  390. { return c->getPTree(); }
  391. virtual IConstDomainInfo * getDomain(const char * name) const
  392. { return c->getDomain(name); }
  393. virtual IConstMachineInfo * getMachine(const char * name) const
  394. { return c->getMachine(name); }
  395. virtual IConstMachineInfo * getMachineByAddress(const char * hostOrIP) const
  396. { return c->getMachineByAddress(hostOrIP); }
  397. virtual IConstMachineInfo * getMachineForLocalHost() const
  398. { return c->getMachineForLocalHost(); }
  399. virtual IConstDropZoneInfo * getDropZone(const char * name) const
  400. { return c->getDropZone(name); }
  401. virtual IConstInstanceInfo * getInstance(const char *type, const char *version, const char *domain) const
  402. { return c->getInstance(type, version, domain); }
  403. virtual bool getRunInfo(IStringVal & path, IStringVal & dir, const char *type, const char *version, const char *machineaddr,const char *defprogname) const
  404. { return c->getRunInfo(path, dir, type, version, machineaddr, defprogname); }
  405. virtual IConstComputerTypeInfo * getComputerType(const char * name) const
  406. { return c->getComputerType(name); }
  407. virtual IEnvironment & lock() const
  408. { ((CInterface*)this)->Link(); return *(IEnvironment*)this; }
  409. virtual void commit();
  410. virtual void rollback();
  411. virtual void setXML(const char * pstr)
  412. { c->setXML(pstr); }
  413. virtual void preload()
  414. { c->preload(); }
  415. virtual bool isConstEnvironment() const { return false; }
  416. virtual void clearCache() { c->clearCache(); }
  417. virtual IConstMachineInfoIterator * getMachineIterator() const
  418. { return c->getMachineIterator(); }
  419. virtual IConstDropZoneInfoIterator * getDropZoneIteratorByAddress(const char * address) const
  420. { return c->getDropZoneIteratorByAddress(address); }
  421. virtual IConstDropZoneInfo * getDropZoneByAddressPath(const char * netaddress, const char *targetPath) const
  422. { return c->getDropZoneByAddressPath(netaddress, targetPath); }
  423. virtual IConstDropZoneInfoIterator * getDropZoneIterator() const
  424. { return c->getDropZoneIterator(); }
  425. virtual bool isDropZoneRestrictionEnabled() const
  426. { return c->isDropZoneRestrictionEnabled(); }
  427. virtual const char *getClusterGroupKeyPairName(const char *cluster) const override
  428. { return c->getClusterGroupKeyPairName(cluster); }
  429. virtual const char *getPublicKeyPath(const char *keyPairName) const override
  430. { return c->getPublicKeyPath(keyPairName); }
  431. virtual const char *getPrivateKeyPath(const char *keyPairName) const override
  432. { return c->getPrivateKeyPath(keyPairName); }
  433. virtual const char *getFileAccessUrl() const
  434. { return c->getFileAccessUrl(); }
  435. virtual IConstDaFileSrvInfo *getDaFileSrvGroupInfo(const char *name) const override
  436. { return c->getDaFileSrvGroupInfo(name); }
  437. virtual IConstSparkThorInfo *getSparkThor(const char *name) const
  438. { return c->getSparkThor(name); }
  439. virtual IConstSparkThorInfoIterator *getSparkThorIterator() const
  440. { return c->getSparkThorIterator(); }
  441. virtual IConstDfuQueueInfoIterator * getDfuQueueIterator() const
  442. { return c->getDfuQueueIterator(); }
  443. virtual bool isValidDfuQueueName(const char * queueName) const
  444. { return c->isValidDfuQueueName(queueName); }
  445. };
  446. void CLockedEnvironment::commit()
  447. {
  448. if (constEnv)
  449. {
  450. //get a lock on const environment momentarily
  451. const unsigned int mode2 = RTM_CREATE_QUERY | RTM_LOCK_READ | RTM_LOCK_WRITE | RTM_LOCK_SUB;
  452. Owned<IRemoteConnection> conn2 = querySDS().connect(constEnv->getPath(), myProcessSession(), mode2, SDS_LOCK_TIMEOUT);
  453. if (conn2 == nullptr)
  454. {
  455. if (environmentTraceLevel > 0)
  456. IERRLOG("Failed to lock environment %s", constEnv->getPath());
  457. throw MakeStringException(-1, "Failed to get a lock on environment /%s", constEnv->getPath());
  458. }
  459. //copy locked environment to const environment
  460. Owned<IPropertyTree> pSrc = &getPTree();
  461. Owned<IPropertyTree> pDst = conn2->queryRoot()->getBranch(nullptr);
  462. // JCS - I think it could (and would be more efficient if it had kept the original read lock connection to Env
  463. // - instead of using NewEnv as lock point, still work on copy, then changeMode of original connect
  464. // - as opposed to current scheme, where it recoonects in write mode and has to lazy fetch original env to update.
  465. // ensures pDst is equal to pSrc, whilst minimizing changes to pDst
  466. try { synchronizePTree(pDst, pSrc); }
  467. catch (IException *) { conn2->rollback(); throw; }
  468. conn2->commit();
  469. }
  470. else
  471. {
  472. Owned<IRemoteConnection> conn = c->getConnection();
  473. conn->commit();
  474. }
  475. }
  476. void CLockedEnvironment::rollback()
  477. {
  478. if (constEnv)
  479. {
  480. //get a lock on const environment momentarily
  481. const unsigned int mode2 = RTM_CREATE_QUERY | RTM_LOCK_READ | RTM_LOCK_WRITE | RTM_LOCK_SUB;
  482. Owned<IRemoteConnection> conn2 = querySDS().connect(constEnv->getPath(), myProcessSession(), mode2, SDS_LOCK_TIMEOUT);
  483. if (conn2 == nullptr)
  484. {
  485. if (environmentTraceLevel > 0)
  486. IERRLOG("Failed to lock environment %s", constEnv->getPath());
  487. throw MakeStringException(-1, "Failed to get a lock on environment /%s", constEnv->getPath());
  488. }
  489. //copy const environment to locked environment (as it stands now) again losing any changes we made
  490. Owned<IPropertyTree> pSrc = conn2->getRoot();
  491. Owned<IPropertyTree> pDst = &getPTree();
  492. pDst->removeTree( pDst->queryPropTree("Hardware") );
  493. pDst->removeTree( pDst->queryPropTree("Software") );
  494. pDst->removeTree( pDst->queryPropTree("Programs") );
  495. pDst->removeTree( pDst->queryPropTree("Data") );
  496. mergePTree(pDst, pSrc);
  497. conn2->rollback();
  498. }
  499. else
  500. {
  501. Owned<IRemoteConnection> conn = c->getConnection();
  502. conn->rollback();
  503. }
  504. }
  505. //==========================================================================================
// The following class implements the notification handler for a Dali subscription to environment
// updates made by other clients; it is used by the environment factory below. It also serves as
// a self-contained sample implementation that can easily be tailored for other purposes.
  509. //==========================================================================================
  510. class CSdsSubscription : implements ISDSSubscription, public CInterface
  511. {
  512. public:
  513. CSdsSubscription(IEnvironmentFactory &_factory) : factory(_factory)
  514. {
  515. m_constEnvUpdated = false;
  516. sub_id = factory.subscribe(this);
  517. }
  518. virtual ~CSdsSubscription()
  519. {
  520. /* note that ideally, we would make this class automatically
  521. unsubscribe in this destructor. However, underlying dali client
  522. layer (CDaliSubscriptionManagerStub) links to this object and so
  523. object would not get destroyed just by an application releasing it.
  524. The application either needs to explicitly unsubscribe or close
  525. the environment which unsubscribes during close down. */
  526. }
  527. void unsubscribe()
  528. {
  529. synchronized block(m_mutexEnv);
  530. if (sub_id)
  531. {
  532. factory.unsubscribe(sub_id);
  533. sub_id = 0;
  534. }
  535. }
  536. IMPLEMENT_IINTERFACE;
  537. //another client (like configenv) may have updated the environment and we got notified
  538. //(thanks to our subscription) but don't just reload it yet since this notification is sent on
  539. //another thread asynchronously and we may be actively working with the old environment. Just
  540. //invoke handleEnvironmentChange() when we are ready to invalidate cache in environment factory.
  541. //
  542. void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen=0, const void *valueData=nullptr)
  543. {
  544. DBGLOG("Environment was updated by another client of Dali server. Invalidating cache.\n");
  545. synchronized block(m_mutexEnv);
  546. m_constEnvUpdated = true;
  547. }
  548. void handleEnvironmentChange()
  549. {
  550. synchronized block(m_mutexEnv);
  551. if (m_constEnvUpdated)
  552. {
  553. Owned<IConstEnvironment> constEnv = factory.openEnvironment();
  554. constEnv->clearCache();
  555. m_constEnvUpdated = false;
  556. }
  557. }
  558. private:
  559. SubscriptionId sub_id;
  560. Mutex m_mutexEnv;
  561. bool m_constEnvUpdated;
  562. IEnvironmentFactory &factory;
  563. };
  564. //==========================================================================================
  565. class CEnvironmentFactory : public CInterface,
  566. implements IEnvironmentFactory, implements IDaliClientShutdown
  567. {
  568. public:
  569. IMPLEMENT_IINTERFACE;
  570. typedef ArrayOf<SubscriptionId> SubscriptionIDs;
  571. SubscriptionIDs subIDs;
  572. Mutex mutex;
  573. Owned<CSdsSubscription> subscription;
  574. CEnvironmentFactory()
  575. {
  576. }
  577. virtual void clientShutdown();
  578. virtual ~CEnvironmentFactory()
  579. {
  580. close(); //just in case it was not explicitly closed
  581. }
  582. virtual IConstEnvironment* openEnvironment()
  583. {
  584. synchronized procedure(mutex);
  585. if (!cache)
  586. {
  587. Owned<IRemoteConnection> conn = querySDS().connect("/Environment", myProcessSession(), 0, SDS_LOCK_TIMEOUT);
  588. if (conn)
  589. cache.setown(new CLocalEnvironment(conn));
  590. }
  591. if (!cache)
  592. throw MakeStringException(0, "Failed to get environment information");
  593. return cache.getLink();
  594. }
  595. virtual IEnvironment* updateEnvironment()
  596. {
  597. Owned<IConstEnvironment> pConstEnv = openEnvironment();
  598. synchronized procedure(mutex);
  599. return &pConstEnv->lock();
  600. }
  601. virtual IEnvironment * loadLocalEnvironmentFile(const char * filename)
  602. {
  603. Owned<IPropertyTree> ptree = createPTreeFromXMLFile(filename, ipt_lowmem);
  604. Owned<CLocalEnvironment> pLocalEnv = new CLocalEnvironment(nullptr, ptree);
  605. return new CLockedEnvironment(pLocalEnv);
  606. }
  607. virtual IEnvironment * loadLocalEnvironment(const char * xml)
  608. {
  609. Owned<IPropertyTree> ptree = createPTreeFromXMLString(xml, ipt_lowmem);
  610. Owned<CLocalEnvironment> pLocalEnv = new CLocalEnvironment(nullptr, ptree);
  611. return new CLockedEnvironment(pLocalEnv);
  612. }
  613. void close()
  614. {
  615. SubscriptionIDs copySubIDs;
  616. {
  617. synchronized procedure(mutex);
  618. cache.clear();
  619. //save the active subscriptions in another array
  620. //so they can be unsubscribed without causing deadlock
  621. // since ~CSdsSubscription() would ask us to unsubscribe the
  622. //same requiring a mutex lock (copy is a little price for this
  623. //normally small/empty array).
  624. //
  625. ForEachItemIn(i, subIDs)
  626. copySubIDs.append(subIDs.item(i));
  627. subIDs.kill();
  628. }
  629. //now unsubscribe all outstanding subscriptions
  630. //
  631. subscription.clear();
  632. ForEachItemIn(i, copySubIDs)
  633. querySDS().unsubscribe( copySubIDs.item(i) );
  634. }
  635. virtual SubscriptionId subscribe(ISDSSubscription* pSubHandler)
  636. {
  637. SubscriptionId sub_id = querySDS().subscribe("/Environment", *pSubHandler);
  638. synchronized procedure(mutex);
  639. subIDs.append(sub_id);
  640. return sub_id;
  641. }
  642. virtual void unsubscribe(SubscriptionId sub_id)
  643. {
  644. synchronized procedure(mutex);
  645. aindex_t i = subIDs.find(sub_id);
  646. if (i != NotFound)
  647. {
  648. querySDS().unsubscribe(sub_id);
  649. subIDs.remove(i);
  650. }
  651. }
  652. virtual void validateCache()
  653. {
  654. if (!subscription)
  655. subscription.setown( new CSdsSubscription(*this) );
  656. subscription->handleEnvironmentChange();
  657. }
  658. private:
  659. IRemoteConnection* connect(const char *xpath, unsigned flags)
  660. {
  661. return querySDS().connect(xpath, myProcessSession(), flags, SDS_LOCK_TIMEOUT);
  662. }
  663. };
  664. static CEnvironmentFactory *factory=nullptr;
  665. void CEnvironmentFactory::clientShutdown()
  666. {
  667. closeEnvironment();
  668. }
  669. MODULE_INIT(INIT_PRIORITY_ENV_ENVIRONMENT)
  670. {
  671. return true;
  672. }
  673. MODULE_EXIT()
  674. {
  675. ::Release(factory);
  676. }
  677. //==========================================================================================
  678. class CConstEnvBase : public CInterface
  679. {
  680. protected:
  681. const CLocalEnvironment* env; // Not linked - would be circular....
  682. // That could cause problems
  683. Linked<IPropertyTree> root;
  684. public:
  685. CConstEnvBase(const CLocalEnvironment* _env, IPropertyTree *_root)
  686. : env(_env), root(_root)
  687. {
  688. }
  689. IStringVal& getXML(IStringVal &str) const
  690. {
  691. StringBuffer x;
  692. toXML(root->queryBranch("."), x);
  693. str.set(x.str());
  694. return str;
  695. };
  696. IStringVal& getName(IStringVal &str) const
  697. {
  698. str.set(root->queryProp("@name"));
  699. return str;
  700. }
  701. IPropertyTree& getPTree() const
  702. {
  703. return *LINK(root);
  704. }
  705. };
  706. #define IMPLEMENT_ICONSTENVBASE \
  707. virtual IStringVal& getXML(IStringVal &str) const { return CConstEnvBase::getXML(str); } \
  708. virtual IStringVal& getName(IStringVal &str) const { return CConstEnvBase::getName(str); } \
  709. virtual IPropertyTree& getPTree() const { return CConstEnvBase::getPTree(); }
  710. //==========================================================================================
  711. class CConstDomainInfo : public CConstEnvBase, implements IConstDomainInfo
  712. {
  713. public:
  714. IMPLEMENT_IINTERFACE;
  715. IMPLEMENT_ICONSTENVBASE;
  716. CConstDomainInfo(CLocalEnvironment *env, IPropertyTree *root) : CConstEnvBase(env, root) {}
  717. virtual void getAccountInfo(IStringVal &name, IStringVal &pw) const
  718. {
  719. if (root->hasProp("@username"))
  720. name.set(root->queryProp("@username"));
  721. else
  722. name.clear();
  723. if (root->hasProp("@password"))
  724. {
  725. StringBuffer pwd;
  726. decrypt(pwd, root->queryProp("@password"));
  727. pw.set(pwd.str());
  728. }
  729. else
  730. pw.clear();
  731. }
  732. virtual void getSnmpSecurityString(IStringVal & securityString) const
  733. {
  734. if (root->hasProp("@snmpSecurityString"))
  735. {
  736. StringBuffer sec_string;
  737. decrypt(sec_string, root->queryProp("@snmpSecurityString"));
  738. securityString.set(sec_string.str());
  739. }
  740. else
  741. securityString.set("");
  742. }
  743. virtual void getSSHAccountInfo(IStringVal &name, IStringVal &sshKeyFile, IStringVal& sshKeyPassphrase) const
  744. {
  745. if (root->hasProp("@username"))
  746. name.set(root->queryProp("@username"));
  747. else
  748. name.clear();
  749. if (root->hasProp("@sshKeyFile"))
  750. sshKeyFile.set(root->queryProp("@sshKeyFile"));
  751. else
  752. sshKeyFile.clear();
  753. if (root->hasProp("@sshKeyPassphrase"))
  754. sshKeyPassphrase.set(root->queryProp("@sshKeyPassphrase"));
  755. else
  756. sshKeyPassphrase.clear();
  757. }
  758. };
  759. //==========================================================================================
  760. struct mapOsEnums { EnvMachineOS val; const char *str; };
  761. static EnvMachineOS getEnum(IPropertyTree *p, const char *propname, mapOsEnums *map)
  762. {
  763. const char *v = p->queryProp(propname);
  764. if (v && *v)
  765. {
  766. while (map->str)
  767. {
  768. if (stricmp(v, map->str)==0)
  769. return map->val;
  770. map++;
  771. }
  772. throw MakeStringException(0, "Unknown operating system: \"%s\"", v);
  773. }
  774. return MachineOsUnknown;
  775. }
  776. struct mapStateEnums { EnvMachineState val; const char *str; };
  777. static EnvMachineState getEnum(IPropertyTree *p, const char *propname, mapStateEnums *map)
  778. {
  779. const char *v = p->queryProp(propname);
  780. if (v && *v)
  781. {
  782. while (map->str)
  783. {
  784. if (stricmp(v, map->str)==0)
  785. return map->val;
  786. map++;
  787. }
  788. assertex(!"Unexpected value in getEnum");
  789. }
  790. return MachineStateUnknown;
  791. }
  792. mapOsEnums OperatingSystems[] = {
  793. { MachineOsW2K, "W2K" },
  794. { MachineOsSolaris, "solaris" },
  795. { MachineOsLinux, "linux" },
  796. { MachineOsSize, nullptr }
  797. };
  798. mapStateEnums MachineStates[] = {
  799. { MachineStateAvailable, "Available" },
  800. { MachineStateUnavailable, "Unavailable" },
  801. { MachineStateUnknown, "Unknown" }
  802. };
  803. //==========================================================================================
  804. class CConstMachineInfo : public CConstEnvBase, implements IConstMachineInfo
  805. {
  806. public:
  807. IMPLEMENT_IINTERFACE;
  808. IMPLEMENT_ICONSTENVBASE;
  809. CConstMachineInfo(CLocalEnvironment *env, IPropertyTree *root) : CConstEnvBase(env, root) {}
  810. virtual IConstDomainInfo* getDomain() const
  811. {
  812. return env->getDomain(root->queryProp("@domain"));
  813. }
  814. virtual IStringVal& getNetAddress(IStringVal &str) const
  815. {
  816. str.set(root->queryProp("@netAddress"));
  817. return str;
  818. }
  819. virtual IStringVal& getDescription(IStringVal &str) const
  820. {
  821. UNIMPLEMENTED;
  822. }
  823. virtual unsigned getNicSpeedMbitSec() const
  824. {
  825. const char * v = root->queryProp("@nicSpeed");
  826. if (v && *v)
  827. return atoi(v);
  828. Owned<IConstComputerTypeInfo> type = env->getComputerType(root->queryProp("@computerType"));
  829. if (type)
  830. return type->getNicSpeedMbitSec();
  831. return 0;
  832. }
  833. virtual EnvMachineOS getOS() const
  834. {
  835. EnvMachineOS os = getEnum(root, "@opSys", OperatingSystems);
  836. if (os != MachineOsUnknown)
  837. return os;
  838. Owned<IConstComputerTypeInfo> type = env->getComputerType(root->queryProp("@computerType"));
  839. if (type)
  840. return type->getOS();
  841. return MachineOsUnknown;
  842. }
  843. virtual EnvMachineState getState() const
  844. {
  845. return getEnum(root, "@state", MachineStates);
  846. }
  847. };
  848. //==========================================================================================
  849. class CConstComputerTypeInfo : public CConstEnvBase, implements IConstComputerTypeInfo
  850. {
  851. public:
  852. IMPLEMENT_IINTERFACE;
  853. IMPLEMENT_ICONSTENVBASE;
  854. CConstComputerTypeInfo(CLocalEnvironment *env, IPropertyTree *root) : CConstEnvBase(env, root) {}
  855. virtual EnvMachineOS getOS() const
  856. {
  857. EnvMachineOS os = getEnum(root, "@opSys", OperatingSystems);
  858. if (os != MachineOsUnknown)
  859. return os;
  860. Owned<IConstComputerTypeInfo> type = env->getComputerType(root->queryProp("@computerType"));
  861. if (type && (type.get() != this))
  862. return type->getOS();
  863. return MachineOsUnknown;
  864. }
  865. virtual unsigned getNicSpeedMbitSec() const
  866. {
  867. const char * v = root->queryProp("@nicSpeed");
  868. if (v && *v)
  869. return atoi(v);
  870. Owned<IConstComputerTypeInfo> type = env->getComputerType(root->queryProp("@computerType"));
  871. if (type && (type.get() != this))
  872. return type->getNicSpeedMbitSec();
  873. return 0;
  874. }
  875. };
  876. //==========================================================================================
  877. extern ENVIRONMENT_API unsigned __int64 readSizeSetting(const char * sizeStr, const unsigned long defaultSize)
  878. {
  879. StringBuffer buf(sizeStr);
  880. buf.trim();
  881. if (buf.isEmpty())
  882. return defaultSize;
  883. const char* ptrStart = buf;
  884. const char* ptrAfterDigit = ptrStart;
  885. while (*ptrAfterDigit && isdigit(*ptrAfterDigit))
  886. ptrAfterDigit++;
  887. if (!*ptrAfterDigit)
  888. return atol(buf);
  889. const char* ptr = ptrAfterDigit;
  890. while (*ptr && (ptr[0] == ' '))
  891. ptr++;
  892. char c = ptr[0];
  893. buf.setLength(ptrAfterDigit - ptrStart);
  894. unsigned __int64 size = atoll(buf);
  895. switch (c)
  896. {
  897. case 'k':
  898. case 'K':
  899. size *= 1000;
  900. break;
  901. case 'm':
  902. case 'M':
  903. size *= 1000000;
  904. break;
  905. case 'g':
  906. case 'G':
  907. size *= 1000000000;
  908. break;
  909. case 't':
  910. case 'T':
  911. size *= 1000000000000;
  912. break;
  913. default:
  914. break;
  915. }
  916. return size;
  917. }
  918. class CConstInstanceInfo : public CConstEnvBase, implements IConstInstanceInfo
  919. {
  920. public:
  921. IMPLEMENT_IINTERFACE;
  922. IMPLEMENT_ICONSTENVBASE;
  923. CConstInstanceInfo(const CLocalEnvironment *env, IPropertyTree *root) : CConstEnvBase(env, root)
  924. {
  925. }
  926. virtual IConstMachineInfo * getMachine() const
  927. {
  928. return env->getMachine(root->queryProp("@computer"));
  929. }
  930. virtual IStringVal & getEndPoint(IStringVal & str) const
  931. {
  932. SCMStringBuffer ep;
  933. Owned<IConstMachineInfo> machine = getMachine();
  934. if (machine)
  935. {
  936. machine->getNetAddress(ep);
  937. const char *port = root->queryProp("@port");
  938. if (port)
  939. ep.s.append(':').append(port);
  940. }
  941. str.set(ep.str());
  942. return str;
  943. }
  944. virtual IStringVal & getExecutableDirectory(IStringVal & str) const
  945. {
  946. // this is the deploy directory so uses local path separators (I suspect this call is LEGACY now)
  947. SCMStringBuffer ep;
  948. Owned<IConstMachineInfo> machine = getMachine();
  949. if (machine)
  950. {
  951. machine->getNetAddress(ep);
  952. ep.s.insert(0, PATHSEPSTR PATHSEPSTR);
  953. }
  954. ep.s.append(PATHSEPCHAR).append(root->queryProp("@directory"));
  955. str.set(ep.str());
  956. return str;
  957. }
  958. virtual IStringVal & getDirectory(IStringVal & str) const
  959. {
  960. str.set(root->queryProp("@directory"));
  961. return str;
  962. }
  963. virtual bool doGetRunInfo(IStringVal & progpath, IStringVal & workdir, const char *defprogname, bool useprog) const
  964. {
  965. // this is remote path i.e. path should match *target* nodes format
  966. Owned<IConstMachineInfo> machine = getMachine();
  967. if (!machine)
  968. return false;
  969. char psep;
  970. bool appendexe;
  971. switch (machine->getOS())
  972. {
  973. case MachineOsSolaris:
  974. case MachineOsLinux:
  975. psep = '/';
  976. appendexe = false;
  977. break;
  978. default:
  979. psep = '\\';
  980. appendexe = true;
  981. }
  982. StringBuffer tmp;
  983. const char *program = useprog?root->queryProp("@program"):nullptr; // if program specified assume absolute
  984. if (!program||!*program)
  985. {
  986. SCMStringBuffer ep;
  987. machine->getNetAddress(ep);
  988. const char *dir = root->queryProp("@directory");
  989. if (dir)
  990. {
  991. if (isPathSepChar(*dir))
  992. dir++;
  993. if (!*dir)
  994. return false;
  995. tmp.append(psep).append(psep).append(ep.s).append(psep);
  996. do {
  997. if (isPathSepChar(*dir))
  998. tmp.append(psep);
  999. else
  1000. tmp.append(*dir);
  1001. dir++;
  1002. } while (*dir);
  1003. if (!isPathSepChar(tmp.charAt(tmp.length()-1)))
  1004. tmp.append(psep);
  1005. tmp.append(defprogname);
  1006. size32_t l = strlen(defprogname);
  1007. if (appendexe&&((l<5)||(stricmp(defprogname+l-4,".exe")!=0)))
  1008. tmp.append(".exe");
  1009. }
  1010. program = tmp.str();
  1011. }
  1012. progpath.set(program);
  1013. const char *workd = root->queryProp("@workdir"); // if program specified assume absolute
  1014. workdir.set(workd?workd:"");
  1015. return true;
  1016. }
  1017. virtual bool getRunInfo(IStringVal & progpath, IStringVal & workdir, const char *defprogname) const
  1018. {
  1019. return doGetRunInfo(progpath,workdir,defprogname,true);
  1020. }
  1021. virtual unsigned getPort() const
  1022. {
  1023. return root->getPropInt("@port", 0);
  1024. }
  1025. };
  1026. class CConstDropZoneServerInfo : public CConstEnvBase, implements IConstDropZoneServerInfo
  1027. {
  1028. public:
  1029. IMPLEMENT_IINTERFACE;
  1030. IMPLEMENT_ICONSTENVBASE;
  1031. CConstDropZoneServerInfo(CLocalEnvironment *env, IPropertyTree *root) : CConstEnvBase(env, root), prop(root) {}
  1032. virtual StringBuffer & getName(StringBuffer & name) const
  1033. {
  1034. name.append(prop->queryProp("@name"));
  1035. return name;
  1036. }
  1037. virtual StringBuffer & getServer(StringBuffer & server) const
  1038. {
  1039. server.append(prop->queryProp("@server"));
  1040. return server;
  1041. }
  1042. private:
  1043. IPropertyTree * prop;
  1044. };
  1045. class CConstDropZoneInfo : public CConstEnvBase, implements IConstDropZoneInfo
  1046. {
  1047. public:
  1048. IMPLEMENT_IINTERFACE;
  1049. IMPLEMENT_ICONSTENVBASE;
  1050. CConstDropZoneInfo(CLocalEnvironment *env, IPropertyTree *root) : CConstEnvBase(env, root)
  1051. {
  1052. getStandardPosixPath(posixPath, root->queryProp("@directory"));
  1053. }
  1054. virtual IStringVal& getComputerName(IStringVal &str) const
  1055. {
  1056. str.set(root->queryProp("@computer"));
  1057. return str;
  1058. }
  1059. virtual IStringVal& getDescription(IStringVal &str) const
  1060. {
  1061. str.set(root->queryProp("@description"));
  1062. return str;
  1063. }
  1064. virtual IStringVal& getDirectory(IStringVal &str) const
  1065. {
  1066. str.set(posixPath.str());
  1067. return str;
  1068. }
  1069. virtual IStringVal& getUMask(IStringVal &str) const
  1070. {
  1071. if (root->hasProp("@umask"))
  1072. str.set(root->queryProp("@umask"));
  1073. return str;
  1074. }
  1075. virtual bool isECLWatchVisible() const
  1076. {
  1077. return root->getPropBool("@ECLWatchVisible", true);
  1078. }
  1079. virtual IConstDropZoneServerInfoIterator * getServers() const
  1080. {
  1081. return new CConstDropZoneServerInfoIterator(this);
  1082. }
  1083. private:
  1084. StringBuffer posixPath;
  1085. };
  1086. class CConstDfuQueueInfo : public CConstEnvBase, implements IConstDfuQueueInfo
  1087. {
  1088. public:
  1089. IMPLEMENT_IINTERFACE;
  1090. IMPLEMENT_ICONSTENVBASE;
  1091. CConstDfuQueueInfo(CLocalEnvironment *env, IPropertyTree *root) : CConstEnvBase(env, root)
  1092. {
  1093. }
  1094. virtual IStringVal& getDfuQueueName(IStringVal &str) const
  1095. {
  1096. str.set(root->queryProp("@queue"));
  1097. return str;
  1098. }
  1099. };
  1100. class CConstSparkThorInfo : public CConstEnvBase, implements IConstSparkThorInfo
  1101. {
  1102. public:
  1103. IMPLEMENT_IINTERFACE;
  1104. IMPLEMENT_ICONSTENVBASE;
  1105. CConstSparkThorInfo(CLocalEnvironment *env, IPropertyTree *root) : CConstEnvBase(env, root) {}
  1106. virtual IStringVal &getBuild(IStringVal &str) const
  1107. {
  1108. str.set(root->queryProp("@build"));
  1109. return str;
  1110. }
  1111. virtual IStringVal &getThorClusterName(IStringVal &str) const
  1112. {
  1113. str.set(root->queryProp("@ThorClusterName"));
  1114. return str;
  1115. }
  1116. virtual unsigned getSparkExecutorCores() const
  1117. {
  1118. return root->getPropInt("@SPARK_EXECUTOR_CORES", 0);
  1119. }
  1120. virtual unsigned __int64 getSparkExecutorMemory() const
  1121. {
  1122. return readSizeSetting(root->queryProp("@SPARK_EXECUTOR_MEMORY"), 0);
  1123. }
  1124. virtual unsigned getSparkMasterPort() const
  1125. {
  1126. return root->getPropInt("@SPARK_MASTER_PORT", 0);
  1127. }
  1128. virtual unsigned getSparkMasterWebUIPort() const
  1129. {
  1130. return root->getPropInt("@SPARK_MASTER_WEBUI_PORT", 0);
  1131. }
  1132. virtual unsigned getSparkWorkerCores() const
  1133. {
  1134. return root->getPropInt("@SPARK_WORKER_CORES", 0);
  1135. }
  1136. virtual unsigned __int64 getSparkWorkerMemory() const
  1137. {
  1138. return readSizeSetting(root->queryProp("@SPARK_WORKER_MEMORY"), 0);
  1139. }
  1140. virtual unsigned getSparkWorkerPort() const
  1141. {
  1142. return root->getPropInt("@SPARK_WORKER_PORT", 0);
  1143. }
  1144. virtual IConstInstanceInfoIterator *getInstanceIterator() const
  1145. {
  1146. return new CConstInstanceInfoIterator(env, root->getElements("Instance"));
  1147. }
  1148. };
  #if 0
  //==========================================================================================
  // Disabled code: a process view that wraps every child node in a CConstInstanceInfo
  // and exposes them through an iterator-style interface.
  class CConstProcessInfo : public CConstEnvBase, implements IConstProcessInfo
  {
      IArrayOf<IConstInstanceInfo> w;
      CArrayIteratorOf<IInterface, IIterator> it;

  public:
      IMPLEMENT_IINTERFACE;
      IMPLEMENT_ICONSTENVBASE;

      CConstProcessInfo(CLocalEnvironment *env, IPropertyTree *root) : CConstEnvBase(env, root), it(w)
      {
          Owned<IPropertyTreeIterator> children = root->getElements("*"); // MORE - should be instance
          for (children->first(); children->isValid(); children->next())
          {
              IPropertyTree *child = &children->query();
              w.append(*new CConstInstanceInfo(env, child)); // CConstInstanceInfo will link child
          }
      }

      bool first()   { return it.first(); }
      bool isValid() { return it.isValid(); }
      bool next()    { return it.next(); }
      IConstInstanceInfo & query() { return (IConstInstanceInfo &) it.query(); }

      // Returns a linked instance whose machine's domain name equals domain, or nullptr.
      virtual IConstInstanceInfo * getInstance(const char *domain)
      {
          for (int pass=0; pass<2; pass++)
          {
              ForEachItemIn(idx, w)
              {
                  Owned<IConstMachineInfo> machine = w.item(idx).getMachine();
                  if (!machine)
                      continue;
                  Owned<IConstDomainInfo> machineDomain = machine->getDomain();
                  if (!machineDomain)
                      continue;
                  StringBuffer thisdomain;
                  // a temporary adaptor passed inline confuses g++, hence the named local
                  StringBufferAdaptor strval(thisdomain);
                  machineDomain->getName(strval);
                  if (thisdomain.length() && strcmp(domain, thisdomain.str())==0)
                      return LINK(&w.item(idx));
              }
          }
          return nullptr;
      }
  };
  #endif
  1197. class CConstDaFileSrvInfo : public CConstEnvBase, implements IConstDaFileSrvInfo
  1198. {
  1199. public:
  1200. IMPLEMENT_IINTERFACE;
  1201. IMPLEMENT_ICONSTENVBASE;
  1202. CConstDaFileSrvInfo(const CLocalEnvironment *env, IPropertyTree *root) : CConstEnvBase(env, root)
  1203. {
  1204. }
  1205. virtual const char *getName() const override
  1206. {
  1207. return root->queryProp("@name");
  1208. }
  1209. virtual unsigned getPort() const override
  1210. {
  1211. return root->getPropInt("@rowServicePort");
  1212. }
  1213. virtual bool getSecure() const override
  1214. {
  1215. return root->getPropBool("@rowServiceSSL");
  1216. }
  1217. };
  1218. //==========================================================================================
  1219. CLocalEnvironment::CLocalEnvironment(const char* environmentFile)
  1220. {
  1221. if (environmentFile && *environmentFile)
  1222. {
  1223. IPropertyTree* root = createPTreeFromXMLFile(environmentFile);
  1224. if (root)
  1225. p.set(root);
  1226. }
  1227. init();
  1228. }
  1229. CLocalEnvironment::CLocalEnvironment(IRemoteConnection *_conn, IPropertyTree* root/*=nullptr*/,
  1230. const char* path/*="/Environment"*/)
  1231. : xPath(path)
  1232. {
  1233. conn.set(_conn);
  1234. if (root)
  1235. p.set(root);
  1236. else
  1237. p.setown(conn->getRoot());
  1238. init();
  1239. }
  1240. void CLocalEnvironment::init()
  1241. {
  1242. machineCacheBuilt = false;
  1243. dropZoneCacheBuilt = false;
  1244. sparkThorCacheBuilt = false;
  1245. dfuQueueCacheBuilt = false;
  1246. numOfMachines = 0;
  1247. numOfDropZones = 0;
  1248. numOfSparkThors = 0;
  1249. numOfDfuQueues = 0;
  1250. isDropZoneRestrictionLoaded = false;
  1251. clusterGroupKeyNameCache = false;
  1252. ::getFileAccessUrl(fileAccessUrl);
  1253. }
  1254. CLocalEnvironment::~CLocalEnvironment()
  1255. {
  1256. if (conn)
  1257. conn->rollback();
  1258. }
  1259. IEnvironment& CLocalEnvironment::lock() const
  1260. {
  1261. return *new CLockedEnvironment((CLocalEnvironment*)this);
  1262. }
  1263. IStringVal & CLocalEnvironment::getName(IStringVal & str) const
  1264. {
  1265. synchronized procedure(safeCache);
  1266. str.set(p->queryProp("@name"));
  1267. return str;
  1268. }
  1269. IStringVal & CLocalEnvironment::getXML(IStringVal & str) const
  1270. {
  1271. StringBuffer xml;
  1272. {
  1273. synchronized procedure(safeCache);
  1274. toXML(p->queryBranch("."), xml);
  1275. }
  1276. str.set(xml.str());
  1277. return str;
  1278. }
  1279. IPropertyTree & CLocalEnvironment::getPTree() const
  1280. {
  1281. synchronized procedure(safeCache);
  1282. return *LINK(p);
  1283. }
  1284. IConstEnvBase * CLocalEnvironment::getCache(const char *path) const
  1285. {
  1286. IConstEnvBase * ret = cache.getValue(path);
  1287. ::Link(ret);
  1288. return ret;
  1289. }
  1290. void CLocalEnvironment::setCache(const char *path, IConstEnvBase *value) const
  1291. {
  1292. cache.setValue(path, value);
  1293. }
  1294. IConstDomainInfo * CLocalEnvironment::getDomain(const char * name) const
  1295. {
  1296. if (!name)
  1297. return nullptr;
  1298. StringBuffer xpath;
  1299. xpath.appendf("Hardware/Domain[@name=\"%s\"]", name);
  1300. synchronized procedure(safeCache);
  1301. IConstEnvBase *cached = getCache(xpath.str());
  1302. if (!cached)
  1303. {
  1304. IPropertyTree *d = p->queryPropTree(xpath.str());
  1305. if (!d)
  1306. return nullptr;
  1307. cached = new CConstDomainInfo((CLocalEnvironment *) this, d);
  1308. setCache(xpath.str(), cached);
  1309. }
  1310. return (IConstDomainInfo *) cached;
  1311. }
  1312. void CLocalEnvironment::buildMachineCache() const
  1313. {
  1314. synchronized procedure(safeCache);
  1315. if (!machineCacheBuilt)
  1316. {
  1317. Owned<IPropertyTreeIterator> it = p->getElements("Hardware/Computer");
  1318. ForEach(*it)
  1319. {
  1320. Owned<IConstEnvBase> cached = new CConstMachineInfo((CLocalEnvironment *) this, &it->query());
  1321. const char *name = it->query().queryProp("@name");
  1322. if (name)
  1323. {
  1324. StringBuffer x("Hardware/Computer[@name=\"");
  1325. x.append(name).append("\"]");
  1326. cache.setValue(x.str(), cached);
  1327. }
  1328. const char * netAddress = it->query().queryProp("@netAddress");
  1329. if (netAddress)
  1330. {
  1331. StringBuffer x("Hardware/Computer[@netAddress=\"");
  1332. x.append(netAddress).append("\"]");
  1333. cache.setValue(x.str(), cached);
  1334. IpAddress ip;
  1335. ip.ipset(netAddress);
  1336. if (ip.isLocal())
  1337. cache.setValue("Hardware/Computer[@netAddress=\".\"]", cached);
  1338. }
  1339. numOfMachines++;
  1340. StringBuffer x("Hardware/Computer[@id=\"");
  1341. x.append(MACHINE_PREFIX).append(numOfMachines).append("\"]");
  1342. cache.setValue(x.str(), cached);
  1343. }
  1344. machineCacheBuilt = true;
  1345. }
  1346. }
  1347. void CLocalEnvironment::buildDropZoneCache() const
  1348. {
  1349. synchronized procedure(safeCache);
  1350. if (!dropZoneCacheBuilt)
  1351. {
  1352. Owned<IPropertyTreeIterator> it = p->getElements("Software/DropZone");
  1353. ForEach(*it)
  1354. {
  1355. const char *name = it->query().queryProp("@name");
  1356. if (name)
  1357. {
  1358. StringBuffer x("Software/DropZone[@name=\"");
  1359. x.append(name).append("\"]");
  1360. Owned<IConstEnvBase> cached = new CConstDropZoneInfo((CLocalEnvironment *) this, &it->query());
  1361. cache.setValue(x.str(), cached);
  1362. }
  1363. numOfDropZones++;
  1364. StringBuffer x("Software/DropZone[@id=\"");
  1365. x.append(DROPZONE_SUFFIX).append(numOfDropZones).append("\"]");
  1366. Owned<IConstEnvBase> cached = new CConstDropZoneInfo((CLocalEnvironment *) this, &it->query());
  1367. cache.setValue(x.str(), cached);
  1368. }
  1369. dropZoneCacheBuilt = true;
  1370. }
  1371. }
  1372. void CLocalEnvironment::buildDfuQueueCache() const
  1373. {
  1374. synchronized procedure(safeCache);
  1375. if (!dfuQueueCacheBuilt)
  1376. {
  1377. Owned<IPropertyTreeIterator> it = p->getElements("Software/DfuServerProcess");
  1378. ForEach(*it)
  1379. {
  1380. const char *qname = it->query().queryProp("@queue");
  1381. if (qname)
  1382. {
  1383. StringBuffer x("Software/DfuQueue[@qname=\"");
  1384. x.append(qname).append("\"]");
  1385. Owned<IConstEnvBase> cached = new CConstDfuQueueInfo((CLocalEnvironment *) this, &it->query());
  1386. cache.setValue(x.str(), cached);
  1387. }
  1388. numOfDfuQueues++;
  1389. StringBuffer x("Software/DfuQueue[@id=\"");
  1390. x.append(numOfDfuQueues).append("\"]");
  1391. Owned<IConstEnvBase> cached = new CConstDfuQueueInfo((CLocalEnvironment *) this, &it->query());
  1392. cache.setValue(x.str(), cached);
  1393. }
  1394. dfuQueueCacheBuilt = true;
  1395. }
  1396. }
  1397. IConstComputerTypeInfo * CLocalEnvironment::getComputerType(const char * name) const
  1398. {
  1399. if (!name)
  1400. return nullptr;
  1401. StringBuffer xpath;
  1402. xpath.appendf("Hardware/ComputerType[@name=\"%s\"]", name);
  1403. synchronized procedure(safeCache);
  1404. IConstEnvBase *cached = getCache(xpath.str());
  1405. if (!cached)
  1406. {
  1407. IPropertyTree *d = p->queryPropTree(xpath.str());
  1408. if (!d)
  1409. return nullptr;
  1410. cached = new CConstComputerTypeInfo((CLocalEnvironment *) this, d);
  1411. setCache(xpath.str(), cached);
  1412. }
  1413. return (CConstComputerTypeInfo *) cached;
  1414. }
  1415. IConstMachineInfo * CLocalEnvironment::getMachine(const char * name) const
  1416. {
  1417. if (!name)
  1418. return nullptr;
  1419. buildMachineCache();
  1420. StringBuffer xpath;
  1421. xpath.appendf("Hardware/Computer[@name=\"%s\"]", name);
  1422. synchronized procedure(safeCache);
  1423. IConstEnvBase *cached = getCache(xpath.str());
  1424. if (!cached)
  1425. {
  1426. IPropertyTree *d = p->queryPropTree(xpath.str());
  1427. if (!d)
  1428. return nullptr;
  1429. cached = new CConstMachineInfo((CLocalEnvironment *) this, d);
  1430. setCache(xpath.str(), cached);
  1431. }
  1432. return (CConstMachineInfo *) cached;
  1433. }
  1434. IConstMachineInfo * CLocalEnvironment::getMachineByAddress(const char * hostOrIP) const
  1435. {
  1436. if (isEmptyString(hostOrIP))
  1437. return nullptr;
  1438. buildMachineCache();
  1439. synchronized procedure(safeCache);
  1440. VStringBuffer xpath("Hardware/Computer[@netAddress=\"%s\"]", hostOrIP);
  1441. IConstEnvBase *cached = getCache(hostOrIP);
  1442. if (cached)
  1443. return (CConstMachineInfo *) cached;
  1444. IPropertyTree *d = p->queryPropTree(xpath); // exact match
  1445. if (!d && !isIPAddress(xpath)) // if not found and not an IP, resolve and match against resolved entries
  1446. {
  1447. IpAddress ip(hostOrIP);
  1448. Owned<IPropertyTreeIterator> iter = p->getElements("Hardware/Computer");
  1449. ForEach(*iter)
  1450. {
  1451. IPropertyTree &computer = iter->query();
  1452. IpAddress computerIP;
  1453. const char *computerNetAddress = computer.queryProp("@netAddress");
  1454. if (!isEmptyString(computerNetAddress))
  1455. {
  1456. // NB: could 1st check if computerNetAddress isIPAddress() and not bother resolving here if it is.
  1457. computerIP.ipset(computerNetAddress);
  1458. if (ip.ipequals(computerIP))
  1459. {
  1460. d = &computer;
  1461. break;
  1462. }
  1463. }
  1464. }
  1465. }
  1466. if (!d)
  1467. return nullptr;
  1468. cached = new CConstMachineInfo((CLocalEnvironment *) this, d);
  1469. setCache(xpath.str(), cached);
  1470. return (CConstMachineInfo *) cached;
  1471. }
  1472. IConstMachineInfo * CLocalEnvironment::getMachineByIndex(unsigned index) const
  1473. {
  1474. if (!numOfMachines || (index == 0))
  1475. return nullptr;
  1476. buildMachineCache();
  1477. if (index > numOfMachines)
  1478. return nullptr;
  1479. StringBuffer xpath("Hardware/Computer[@id=\"");
  1480. xpath.append(MACHINE_PREFIX).append(index).append("\"]");
  1481. synchronized procedure(safeCache);
  1482. return (IConstMachineInfo *) getCache(xpath.str());
  1483. }
  1484. IConstMachineInfo * CLocalEnvironment::getMachineForLocalHost() const
  1485. {
  1486. buildMachineCache();
  1487. synchronized procedure(safeCache);
  1488. return (CConstMachineInfo *) getCache("Hardware/Computer[@netAddress=\".\"]");
  1489. }
  1490. IConstDropZoneInfo * CLocalEnvironment::getDropZone(const char * name) const
  1491. {
  1492. if (!name)
  1493. return nullptr;
  1494. buildDropZoneCache();
  1495. VStringBuffer xpath("Software/DropZone[@name=\"%s\"]", name);
  1496. synchronized procedure(safeCache);
  1497. return (CConstDropZoneInfo *) getCache(xpath.str());
  1498. }
  1499. IConstDropZoneInfo * CLocalEnvironment::getDropZoneByIndex(unsigned index) const
  1500. {
  1501. if (!numOfDropZones || (index == 0))
  1502. return nullptr;
  1503. buildDropZoneCache();
  1504. if (index > numOfDropZones)
  1505. return nullptr;
  1506. StringBuffer xpath("Software/DropZone[@id=\"");
  1507. xpath.append(DROPZONE_SUFFIX).append(index).append("\"]");
  1508. synchronized procedure(safeCache);
  1509. return (CConstDropZoneInfo *) getCache(xpath.str());
  1510. }
  1511. IConstDfuQueueInfo * CLocalEnvironment::getDfuQueueByIndex(unsigned index) const
  1512. {
  1513. if (!numOfDfuQueues || (index == 0))
  1514. return nullptr;
  1515. buildDfuQueueCache();
  1516. if (index > numOfDfuQueues)
  1517. return nullptr;
  1518. StringBuffer xpath("Software/DfuQueue[@id=\"");
  1519. xpath.append(index).append("\"]");
  1520. synchronized procedure(safeCache);
  1521. return (CConstDfuQueueInfo *) getCache(xpath.str());
  1522. }
  1523. IConstInstanceInfo * CLocalEnvironment::getInstance(const char *type, const char *version, const char *domain) const
  1524. {
  1525. StringBuffer xpath("Software/");
  1526. xpath.append(type);
  1527. if (version)
  1528. xpath.append("[@version='").append(version).append("']");
  1529. xpath.append("/Instance");
  1530. synchronized procedure(safeCache);
  1531. Owned<IPropertyTreeIterator> _it = p->getElements(xpath);
  1532. for (_it->first(); _it->isValid(); _it->next())
  1533. {
  1534. IPropertyTree *rp = &_it->query();
  1535. Owned<CConstInstanceInfo> inst = new CConstInstanceInfo(this, rp); // CConstInstanceInfo will link rp
  1536. Owned<IConstMachineInfo> m = inst->getMachine();
  1537. if (m)
  1538. {
  1539. Owned<IConstDomainInfo> dm = m->getDomain();
  1540. if (dm)
  1541. {
  1542. SCMStringBuffer thisdomain;
  1543. dm->getName(thisdomain);
  1544. if (thisdomain.length() && strcmp(domain, thisdomain.str())==0)
  1545. return inst.getClear();
  1546. }
  1547. }
  1548. }
  1549. return nullptr;
  1550. }
  1551. CConstInstanceInfo * CLocalEnvironment::getInstanceByIP(const char *type, const char *version, IpAddress &ip) const
  1552. {
  1553. StringBuffer xpath("Software/");
  1554. xpath.append(type);
  1555. if (version)
  1556. xpath.append("[@version='").append(version).append("']");
  1557. xpath.append("/Instance");
  1558. synchronized procedure(safeCache);
  1559. assertex(p);
  1560. Owned<IPropertyTreeIterator> _it = p->getElements(xpath);
  1561. assertex(_it);
  1562. for (_it->first(); _it->isValid(); _it->next())
  1563. {
  1564. IPropertyTree *rp = &_it->query();
  1565. assertex(rp);
  1566. Owned<CConstInstanceInfo> inst = new CConstInstanceInfo(this, rp); // CConstInstanceInfo will link rp
  1567. Owned<IConstMachineInfo> m = inst->getMachine();
  1568. if (m)
  1569. {
  1570. SCMStringBuffer eps;
  1571. m->getNetAddress(eps);
  1572. SocketEndpoint ep(eps.str());
  1573. if (ep.ipequals(ip))
  1574. return inst.getClear();
  1575. }
  1576. }
  1577. return nullptr;
  1578. }
  1579. void CLocalEnvironment::unlockRemote()
  1580. {
  1581. #if 0
  1582. conn->commit(true);
  1583. conn->changeMode(0, SDS_LOCK_TIMEOUT);
  1584. #else
  1585. if (conn)
  1586. {
  1587. synchronized procedure(safeCache);
  1588. p.clear();
  1589. conn.setown(querySDS().connect(xPath.str(), myProcessSession(), 0, SDS_LOCK_TIMEOUT));
  1590. p.setown(conn->getRoot());
  1591. }
  1592. #endif
  1593. }
  1594. void CLocalEnvironment::preload()
  1595. {
  1596. synchronized procedure(safeCache);
  1597. p->queryBranch(".");
  1598. }
  1599. void CLocalEnvironment::setXML(const char *xml)
  1600. {
  1601. Owned<IPropertyTree> newRoot = createPTreeFromXMLString(xml, ipt_lowmem);
  1602. synchronized procedure(safeCache);
  1603. Owned<IPropertyTreeIterator> it = p->getElements("*");
  1604. ForEach(*it)
  1605. {
  1606. p->removeTree(&it->query());
  1607. }
  1608. it.setown(newRoot->getElements("*"));
  1609. ForEach(*it)
  1610. {
  1611. IPropertyTree *sub = &it->get();
  1612. p->addPropTree(sub->queryName(), sub);
  1613. }
  1614. }
  1615. bool CLocalEnvironment::getRunInfo(IStringVal & path, IStringVal & dir, const char * tag, const char * version, const char *machineaddr, const char *defprogname) const
  1616. {
  1617. try
  1618. {
  1619. // DBGLOG("getExecutablePath %s %s %s", tag, version, machineaddr);
  1620. // first see if local machine with deployed on
  1621. SocketEndpoint ep(machineaddr);
  1622. Owned<CConstInstanceInfo> ipinstance = getInstanceByIP(tag, version, ep);
  1623. if (ipinstance)
  1624. {
  1625. StringAttr testpath;
  1626. StringAttrAdaptor teststrval(testpath);
  1627. if (ipinstance->doGetRunInfo(teststrval,dir,defprogname,false))
  1628. { // this returns full string
  1629. RemoteFilename rfn;
  1630. rfn.setRemotePath(testpath.get());
  1631. Owned<IFile> file = createIFile(rfn);
  1632. if (file->exists())
  1633. {
  1634. StringBuffer tmp;
  1635. rfn.getLocalPath(tmp);
  1636. path.set(tmp.str());
  1637. return true;
  1638. }
  1639. }
  1640. }
  1641. Owned<IConstMachineInfo> machine = getMachineByAddress(machineaddr);
  1642. if (!machine)
  1643. {
  1644. LOG(MCdebugInfo, unknownJob, "Unable to find machine for %s", machineaddr);
  1645. return false;
  1646. }
  1647. StringAttr targetdomain;
  1648. Owned<IConstDomainInfo> domain = machine->getDomain();
  1649. if (!domain)
  1650. {
  1651. LOG(MCdebugInfo, unknownJob, "Unable to find domain for %s", machineaddr);
  1652. return false;
  1653. }
  1654. //domain->getName(StringAttrAdaptor(targetdomain)); // confuses g++
  1655. StringAttrAdaptor strval(targetdomain);
  1656. domain->getName(strval);
  1657. Owned<IConstInstanceInfo> instance = getInstance(tag, version, targetdomain);
  1658. if (!instance)
  1659. {
  1660. LOG(MCdebugInfo, unknownJob, "Unable to find process %s for domain %s", tag, targetdomain.get());
  1661. return false;
  1662. }
  1663. return instance->getRunInfo(path,dir,defprogname);
  1664. }
  1665. catch (IException * e)
  1666. {
  1667. EXCLOG(e, "Extracting slave version");
  1668. e->Release();
  1669. return false;
  1670. }
  1671. }
  1672. void CLocalEnvironment::clearCache()
  1673. {
  1674. synchronized procedure(safeCache);
  1675. if (conn)
  1676. {
  1677. p.clear();
  1678. unsigned mode = 0;
  1679. try
  1680. {
  1681. conn->reload();
  1682. }
  1683. catch (IException *e)
  1684. {
  1685. EXCLOG(e, "Failed to reload connection");
  1686. e->Release();
  1687. mode = conn->queryMode();
  1688. conn.clear();
  1689. }
  1690. if (!conn)
  1691. conn.setown(querySDS().connect(xPath, myProcessSession(), mode, SDS_LOCK_TIMEOUT));
  1692. p.setown(conn->getRoot());
  1693. }
  1694. cache.kill();
  1695. keyGroupMap.clear();
  1696. keyPairMap.clear();
  1697. init();
  1698. }
  1699. IConstDropZoneInfo * CLocalEnvironment::getDropZoneByAddressPath(const char * netaddress, const char *targetFilePath) const
  1700. {
  1701. IConstDropZoneInfo * dropZone = nullptr;
  1702. IpAddress targetIp(netaddress);
  1703. unsigned dropzonePathLen = _MAX_PATH + 1;
  1704. #ifdef _DEBUG
  1705. LOG(MCdebugInfo, unknownJob, "Netaddress: '%s', targetFilePath: '%s'", netaddress, targetFilePath);
  1706. #endif
  1707. // Check the directory path first
  1708. Owned<IConstDropZoneInfoIterator> zoneIt = getDropZoneIterator();
  1709. ForEach(*zoneIt)
  1710. {
  1711. SCMStringBuffer dropZoneDir;
  1712. zoneIt->query().getDirectory(dropZoneDir);
  1713. StringBuffer fullDropZoneDir(dropZoneDir.str());
  1714. addPathSepChar(fullDropZoneDir);
  1715. IConstDropZoneInfo * candidateDropZone = nullptr;
  1716. if (strncmp(fullDropZoneDir, targetFilePath, fullDropZoneDir.length()) == 0)
  1717. {
  1718. candidateDropZone = &zoneIt->query();
  1719. // The backward compatibility built in IConstDropZoneServerInfoIterator
  1720. Owned<IConstDropZoneServerInfoIterator> dropzoneServerListIt = candidateDropZone->getServers();
  1721. ForEach(*dropzoneServerListIt)
  1722. {
  1723. StringBuffer dropzoneServer;
  1724. dropzoneServerListIt->query().getServer(dropzoneServer);
  1725. // It can be a hostname or an IP -> get the IP
  1726. IpAddress serverIP(dropzoneServer.str());
  1727. #ifdef _DEBUG
  1728. StringBuffer serverIpString;
  1729. serverIP.getIpText(serverIpString);
  1730. LOG(MCdebugInfo, unknownJob, "Listed server: '%s', IP: '%s'", dropzoneServer.str(), serverIpString.str());
  1731. #endif
  1732. if (targetIp.ipequals(serverIP))
  1733. {
  1734. // OK the target is a valid machine in the server list we have a right drop zone candidate
  1735. // Keep this candidate drop zone if its directory path is shorter than we already have
  1736. if (dropzonePathLen > fullDropZoneDir.length())
  1737. {
  1738. dropzonePathLen = fullDropZoneDir.length();
  1739. dropZone = candidateDropZone;
  1740. }
  1741. break;
  1742. }
  1743. }
  1744. }
  1745. }
  1746. return LINK(dropZone);
  1747. }
  1748. IConstDropZoneInfoIterator * CLocalEnvironment::getDropZoneIteratorByAddress(const char *addr) const
  1749. {
  1750. class CByAddrIter : public CSimpleInterfaceOf<IConstDropZoneInfoIterator>
  1751. {
  1752. IArrayOf<IConstDropZoneInfo> matches;
  1753. unsigned cur = NotFound;
  1754. public:
  1755. CByAddrIter(IConstDropZoneInfoIterator *baseIter, const char *addr)
  1756. {
  1757. IpAddress toMatch(addr);
  1758. ForEach(*baseIter)
  1759. {
  1760. IConstDropZoneInfo &dz = baseIter->query();
  1761. Owned<IConstDropZoneServerInfoIterator> serverIter = dz.getServers();
  1762. ForEach(*serverIter)
  1763. {
  1764. IConstDropZoneServerInfo &serverElem = serverIter->query();
  1765. StringBuffer serverName;
  1766. IpAddress serverIp(serverElem.getServer(serverName).str());
  1767. if (serverIp.ipequals(toMatch))
  1768. {
  1769. matches.append(*LINK(&dz));
  1770. break;
  1771. }
  1772. }
  1773. }
  1774. }
  1775. virtual bool first() override
  1776. {
  1777. if (0 == matches.ordinality())
  1778. {
  1779. cur = NotFound;
  1780. return false;
  1781. }
  1782. cur = 0;
  1783. return true;
  1784. }
  1785. virtual bool next() override
  1786. {
  1787. if (cur+1==matches.ordinality())
  1788. {
  1789. cur = NotFound;
  1790. return false;
  1791. }
  1792. ++cur;
  1793. return true;
  1794. }
  1795. virtual bool isValid() override
  1796. {
  1797. return NotFound != cur;
  1798. }
  1799. virtual IConstDropZoneInfo &query() override
  1800. {
  1801. assertex(NotFound != cur);
  1802. return matches.item(cur);
  1803. }
  1804. virtual unsigned count() const override
  1805. {
  1806. return matches.ordinality();
  1807. }
  1808. };
  1809. Owned<IConstDropZoneInfoIterator> baseIter = new CConstDropZoneInfoIterator();
  1810. return new CByAddrIter(baseIter, addr);
  1811. }
  1812. IConstDropZoneInfoIterator * CLocalEnvironment::getDropZoneIterator() const
  1813. {
  1814. return new CConstDropZoneInfoIterator();
  1815. }
  1816. IConstDfuQueueInfoIterator * CLocalEnvironment::getDfuQueueIterator() const
  1817. {
  1818. return new CConstDfuQueueInfoIterator();
  1819. }
  1820. bool CLocalEnvironment::isValidDfuQueueName(const char * queueName) const
  1821. {
  1822. bool retVal = false;
  1823. if (!isEmptyString(queueName))
  1824. {
  1825. Owned<IConstDfuQueueInfoIterator> queueIt = getDfuQueueIterator();
  1826. ForEach(*queueIt)
  1827. {
  1828. SCMStringBuffer _queueName;
  1829. queueIt->query().getDfuQueueName(_queueName);
  1830. retVal = streq(queueName, _queueName.str());
  1831. }
  1832. }
  1833. return retVal;
  1834. }
  1835. IConstMachineInfoIterator * CLocalEnvironment::getMachineIterator() const
  1836. {
  1837. return new CConstMachineInfoIterator();
  1838. }
  1839. bool CLocalEnvironment::isDropZoneRestrictionEnabled() const
  1840. {
  1841. if (!isDropZoneRestrictionLoaded)
  1842. {
  1843. dropZoneRestrictionEnabled = queryEnvironmentConf().getPropBool("useDropZoneRestriction", true);
  1844. isDropZoneRestrictionLoaded=true;
  1845. }
  1846. return dropZoneRestrictionEnabled;
  1847. }
  1848. IConstDaFileSrvInfo *CLocalEnvironment::getDaFileSrvGroupInfo(const char *name) const
  1849. {
  1850. if (!name)
  1851. return nullptr;
  1852. VStringBuffer xpath("Software/DafilesrvGroup[@name=\"%s\"]", name);
  1853. synchronized procedure(safeCache);
  1854. IConstEnvBase *cached = getCache(xpath.str());
  1855. if (!cached)
  1856. {
  1857. IPropertyTree *d = p->queryPropTree(xpath.str());
  1858. if (!d)
  1859. return nullptr;
  1860. cached = new CConstDaFileSrvInfo(this, d);
  1861. setCache(xpath.str(), cached);
  1862. }
  1863. return (IConstDaFileSrvInfo *) cached;
  1864. }
  1865. IConstSparkThorInfo *CLocalEnvironment::getSparkThor(const char *name) const
  1866. {
  1867. if (isEmptyString(name))
  1868. return nullptr;
  1869. buildSparkThorCache();
  1870. VStringBuffer xpath("Software/SparkThor[@name=\"%s\"]", name);
  1871. synchronized procedure(safeCache);
  1872. return (CConstSparkThorInfo *) getCache(xpath);
  1873. }
  1874. IConstSparkThorInfo *CLocalEnvironment::getSparkThorByIndex(unsigned index) const
  1875. {
  1876. if (index == 0)
  1877. return nullptr;
  1878. buildSparkThorCache();
  1879. if (index > numOfSparkThors)
  1880. return nullptr;
  1881. StringBuffer xpath("Software/SparkThor[@id=\"");
  1882. xpath.append(SPARKTHOR_SUFFIX).append(index).append("\"]");
  1883. synchronized procedure(safeCache);
  1884. return (CConstSparkThorInfo *) getCache(xpath);
  1885. }
  1886. IConstSparkThorInfoIterator *CLocalEnvironment::getSparkThorIterator() const
  1887. {
  1888. return new CConstSparkThorInfoIterator();
  1889. }
  1890. void CLocalEnvironment::buildSparkThorCache() const
  1891. {
  1892. synchronized procedure(safeCache);
  1893. if (sparkThorCacheBuilt)
  1894. return;
  1895. Owned<IPropertyTreeIterator> it = p->getElements("Software/SparkThorProcess");
  1896. ForEach(*it)
  1897. {
  1898. const char *name = it->query().queryProp("@name");
  1899. if (!isEmptyString(name))
  1900. {
  1901. StringBuffer x("Software/SparkThor[@name=\"");
  1902. x.append(name).append("\"]");
  1903. Owned<IConstEnvBase> cached = new CConstSparkThorInfo((CLocalEnvironment *) this, &it->query());
  1904. cache.setValue(x, cached);
  1905. }
  1906. numOfSparkThors++;
  1907. StringBuffer x("Software/SparkThor[@id=\"");
  1908. x.append(SPARKTHOR_SUFFIX).append(numOfSparkThors).append("\"]");
  1909. Owned<IConstEnvBase> cached = new CConstSparkThorInfo((CLocalEnvironment *) this, &it->query());
  1910. cache.setValue(x, cached);
  1911. }
  1912. sparkThorCacheBuilt = true;
  1913. }
  1914. //==========================================================================================
  1915. // Iterators implementation
  1916. CConstMachineInfoIterator::CConstMachineInfoIterator()
  1917. {
  1918. Owned<IEnvironmentFactory> factory = getEnvironmentFactory(true);
  1919. constEnv.setown((CLocalEnvironment *)factory->openEnvironment());
  1920. maxIndex = constEnv->getNumberOfMachines();
  1921. }
  1922. bool CConstMachineInfoIterator::first()
  1923. {
  1924. index = 1;
  1925. curr.setown(constEnv->getMachineByIndex(index));
  1926. return curr != nullptr;
  1927. }
  1928. bool CConstMachineInfoIterator::next()
  1929. {
  1930. if (index < maxIndex)
  1931. {
  1932. index++;
  1933. curr.setown(constEnv->getMachineByIndex(index));
  1934. }
  1935. else
  1936. curr.clear();
  1937. return curr != nullptr;
  1938. }
  1939. bool CConstMachineInfoIterator::isValid()
  1940. {
  1941. return curr != nullptr;
  1942. }
  1943. IConstMachineInfo & CConstMachineInfoIterator::query()
  1944. {
  1945. return *curr;
  1946. }
  1947. unsigned CConstMachineInfoIterator::count() const
  1948. {
  1949. return maxIndex;
  1950. }
  1951. CConstDropZoneServerInfoIterator::CConstDropZoneServerInfoIterator(const IConstDropZoneInfo * dropZone)
  1952. {
  1953. Owned<IEnvironmentFactory> factory = getEnvironmentFactory(true);
  1954. constEnv.setown((CLocalEnvironment *)factory->openEnvironment());
  1955. // For backward compatibility
  1956. SCMStringBuffer dropZoneMachineName;
  1957. // Returns dropzone '@computer' value if it is exists
  1958. dropZone->getComputerName(dropZoneMachineName);
  1959. if (0 != dropZoneMachineName.length())
  1960. {
  1961. // Create a ServerList for legacy element.
  1962. Owned<IPropertyTree> legacyServerList = createPTree(ipt_lowmem);
  1963. Owned<IConstMachineInfo> machineInfo = constEnv->getMachine(dropZoneMachineName.str());
  1964. if (machineInfo)
  1965. {
  1966. SCMStringBuffer dropZoneMachineNetAddress;
  1967. machineInfo->getNetAddress(dropZoneMachineNetAddress);
  1968. // Create a single ServerList record related to @computer
  1969. //<ServerList name="ServerList" server="<IP_of_@computer>"/>
  1970. IPropertyTree *newRecord = legacyServerList->addPropTree("ServerList");
  1971. newRecord->setProp("@name", "ServerList");
  1972. newRecord->setProp("@server", dropZoneMachineNetAddress.str());
  1973. maxIndex = 1;
  1974. }
  1975. else
  1976. {
  1977. // Something is terrible wrong because there is no matching machine for DropZone @computer
  1978. maxIndex = 0;
  1979. }
  1980. serverListIt.setown(legacyServerList->getElements("ServerList"));
  1981. }
  1982. else
  1983. {
  1984. Owned<IPropertyTree> pSrc = &dropZone->getPTree();
  1985. serverListIt.setown(pSrc->getElements("ServerList"));
  1986. maxIndex = pSrc->getCount("ServerList");
  1987. }
  1988. }
  1989. bool CConstDropZoneServerInfoIterator::first()
  1990. {
  1991. bool hasFirst = serverListIt->first();
  1992. if (hasFirst)
  1993. curr.setown(new CConstDropZoneServerInfo(constEnv, &serverListIt->query()));
  1994. else
  1995. curr.clear();
  1996. return hasFirst;
  1997. }
  1998. bool CConstDropZoneServerInfoIterator::next()
  1999. {
  2000. bool hasNext = serverListIt->next();
  2001. if (hasNext)
  2002. curr.setown(new CConstDropZoneServerInfo(constEnv, &serverListIt->query()));
  2003. else
  2004. curr.clear();
  2005. return hasNext;
  2006. }
  2007. bool CConstDropZoneServerInfoIterator::isValid()
  2008. {
  2009. return nullptr != curr;
  2010. }
  2011. IConstDropZoneServerInfo & CConstDropZoneServerInfoIterator::query()
  2012. {
  2013. return *curr;
  2014. }
  2015. unsigned CConstDropZoneServerInfoIterator::count() const
  2016. {
  2017. return maxIndex;
  2018. }
  2019. //--------------------------------------------------
  2020. CConstDropZoneInfoIterator::CConstDropZoneInfoIterator()
  2021. {
  2022. Owned<IEnvironmentFactory> factory = getEnvironmentFactory(true);
  2023. constEnv.setown((CLocalEnvironment *)factory->openEnvironment());
  2024. maxIndex = constEnv->getNumberOfDropZones();
  2025. }
  2026. bool CConstDropZoneInfoIterator::first()
  2027. {
  2028. index = 1;
  2029. curr.setown(constEnv->getDropZoneByIndex(index));
  2030. return curr != nullptr;
  2031. }
  2032. bool CConstDropZoneInfoIterator::next()
  2033. {
  2034. if (index < maxIndex)
  2035. {
  2036. index++;
  2037. curr.setown(constEnv->getDropZoneByIndex(index));
  2038. }
  2039. else
  2040. curr.clear();
  2041. return curr != nullptr;
  2042. }
  2043. bool CConstDropZoneInfoIterator::isValid()
  2044. {
  2045. return curr != nullptr;
  2046. }
  2047. IConstDropZoneInfo & CConstDropZoneInfoIterator::query()
  2048. {
  2049. return *curr;
  2050. }
  2051. unsigned CConstDropZoneInfoIterator::count() const
  2052. {
  2053. return maxIndex;
  2054. }
  2055. //--------------------------------------------------
  2056. CConstDfuQueueInfoIterator::CConstDfuQueueInfoIterator()
  2057. {
  2058. Owned<IEnvironmentFactory> factory = getEnvironmentFactory(true);
  2059. constEnv.setown((CLocalEnvironment *)factory->openEnvironment());
  2060. maxIndex = constEnv->getNumberOfDfuQueues();
  2061. }
  2062. bool CConstDfuQueueInfoIterator::first()
  2063. {
  2064. index = 1;
  2065. curr.setown(constEnv->getDfuQueueByIndex(index));
  2066. return curr != nullptr;
  2067. }
  2068. bool CConstDfuQueueInfoIterator::next()
  2069. {
  2070. if (index < maxIndex)
  2071. {
  2072. index++;
  2073. curr.setown(constEnv->getDfuQueueByIndex(index));
  2074. }
  2075. else
  2076. curr.clear();
  2077. return curr != nullptr;
  2078. }
  2079. bool CConstDfuQueueInfoIterator::isValid()
  2080. {
  2081. return curr != nullptr;
  2082. }
  2083. IConstDfuQueueInfo & CConstDfuQueueInfoIterator::query()
  2084. {
  2085. return *curr;
  2086. }
  2087. unsigned CConstDfuQueueInfoIterator::count() const
  2088. {
  2089. return maxIndex;
  2090. }
  2091. //--------------------------------------------------
  2092. CConstSparkThorInfoIterator::CConstSparkThorInfoIterator()
  2093. {
  2094. Owned<IEnvironmentFactory> factory = getEnvironmentFactory(true);
  2095. constEnv.setown((CLocalEnvironment *)factory->openEnvironment());
  2096. maxIndex = constEnv->getNumberOfSparkThors();
  2097. }
  2098. bool CConstSparkThorInfoIterator::first()
  2099. {
  2100. index = 1;
  2101. curr.setown(constEnv->getSparkThorByIndex(index));
  2102. return curr != nullptr;
  2103. }
  2104. bool CConstSparkThorInfoIterator::next()
  2105. {
  2106. if (index < maxIndex)
  2107. {
  2108. index++;
  2109. curr.setown(constEnv->getSparkThorByIndex(index));
  2110. }
  2111. else
  2112. curr.clear();
  2113. return curr != nullptr;
  2114. }
  2115. bool CConstSparkThorInfoIterator::isValid()
  2116. {
  2117. return curr != nullptr;
  2118. }
  2119. IConstSparkThorInfo &CConstSparkThorInfoIterator::query()
  2120. {
  2121. return *curr;
  2122. }
  2123. unsigned CConstSparkThorInfoIterator::count() const
  2124. {
  2125. return maxIndex;
  2126. }
  2127. //--------------------------------------------------
  2128. CConstInstanceInfoIterator::CConstInstanceInfoIterator(const CLocalEnvironment *env, IPropertyTreeIterator *itr)
  2129. : constEnv(env)
  2130. {
  2131. instanceItr.setown(itr);
  2132. maxIndex = 0;
  2133. ForEach(*instanceItr)
  2134. maxIndex++;
  2135. }
  2136. bool CConstInstanceInfoIterator::first()
  2137. {
  2138. index = 1;
  2139. instanceItr->first();
  2140. curr.setown(new CConstInstanceInfo(constEnv, &instanceItr->query()));
  2141. return curr != nullptr;
  2142. }
  2143. bool CConstInstanceInfoIterator::next()
  2144. {
  2145. if (index < maxIndex)
  2146. {
  2147. index++;
  2148. instanceItr->next();
  2149. curr.setown(new CConstInstanceInfo(constEnv, &instanceItr->query()));
  2150. }
  2151. else
  2152. curr.clear();
  2153. return curr != nullptr;
  2154. }
  2155. bool CConstInstanceInfoIterator::isValid()
  2156. {
  2157. return curr != nullptr;
  2158. }
  2159. IConstInstanceInfo &CConstInstanceInfoIterator::query()
  2160. {
  2161. return *curr;
  2162. }
  2163. unsigned CConstInstanceInfoIterator::count() const
  2164. {
  2165. return maxIndex;
  2166. }
  2167. //==========================================================================================
  2168. static CriticalSection getEnvSect;
  2169. extern ENVIRONMENT_API IEnvironmentFactory * getEnvironmentFactory(bool update)
  2170. {
  2171. CriticalBlock block(getEnvSect);
  2172. if (!factory)
  2173. {
  2174. factory = new CEnvironmentFactory();
  2175. addShutdownHook(*factory);
  2176. }
  2177. if (update)
  2178. factory->validateCache();
  2179. return LINK(factory);
  2180. }
  2181. extern ENVIRONMENT_API void closeEnvironment()
  2182. {
  2183. try
  2184. {
  2185. CEnvironmentFactory* pFactory;
  2186. {
  2187. //this method is not meant to be invoked by multiple
  2188. //threads concurrently but just in case...
  2189. CriticalBlock block(getEnvSect);
  2190. pFactory = factory;
  2191. factory = nullptr;
  2192. }
  2193. if (pFactory)
  2194. {
  2195. removeShutdownHook(*pFactory);
  2196. pFactory->close();
  2197. pFactory->Release();
  2198. }
  2199. }
  2200. catch (IException *e)
  2201. {
  2202. EXCLOG(e);
  2203. }
  2204. }
  2205. unsigned getAccessibleServiceURLList(const char *serviceType, std::vector<std::string> &list)
  2206. {
  2207. unsigned added = 0;
  2208. Owned<IEnvironmentFactory> factory = getEnvironmentFactory(true);
  2209. Owned<IConstEnvironment> daliEnv = factory->openEnvironment();
  2210. Owned<IPropertyTree> env = &daliEnv->getPTree();
  2211. if (env.get())
  2212. {
  2213. StringBuffer fileMetaServiceUrl;
  2214. StringBuffer espInstanceComputerName;
  2215. StringBuffer bindingProtocol;
  2216. StringBuffer xpath;
  2217. StringBuffer instanceAddress;
  2218. StringBuffer espServiceType;
  2219. Owned<IPropertyTreeIterator> espProcessIter = env->getElements("Software/EspProcess");
  2220. ForEach(*espProcessIter)
  2221. {
  2222. Owned<IPropertyTreeIterator> espBindingIter = espProcessIter->query().getElements("EspBinding");
  2223. ForEach(*espBindingIter)
  2224. {
  2225. xpath.setf("Software/EspService[@name=\"%s\"]/Properties/@type", espBindingIter->query().queryProp("@service"));
  2226. if (strisame(env->queryProp(xpath), serviceType))
  2227. {
  2228. if (espBindingIter->query().getProp("@protocol", bindingProtocol.clear()))
  2229. {
  2230. Owned<IPropertyTreeIterator> espInstanceIter = espProcessIter->query().getElements("Instance");
  2231. ForEach(*espInstanceIter)
  2232. {
  2233. if (espInstanceIter->query().getProp("@computer", espInstanceComputerName.clear()))
  2234. {
  2235. xpath.setf("Hardware/Computer[@name=\"%s\"]/@netAddress", espInstanceComputerName.str());
  2236. if (env->getProp(xpath.str(), instanceAddress.clear()))
  2237. {
  2238. fileMetaServiceUrl.setf("%s://%s:%d", bindingProtocol.str(), instanceAddress.str(), espBindingIter->query().getPropInt("@port",8010));
  2239. list.push_back(fileMetaServiceUrl.str());
  2240. ++added;
  2241. }
  2242. }
  2243. }
  2244. }
  2245. }
  2246. }//ESPBinding
  2247. }//ESPProcess
  2248. }
  2249. return added;
  2250. }
  2251. //------------------- Moved from workunit.cpp -------------
  2252. IPropertyTree *queryRoxieProcessTree(IPropertyTree *environment, const char *process)
  2253. {
  2254. if (!process || !*process)
  2255. return NULL;
  2256. VStringBuffer xpath("Software/RoxieCluster[@name=\"%s\"]", process);
  2257. return environment->queryPropTree(xpath.str());
  2258. }
  2259. void getRoxieProcessServers(IPropertyTree *roxie, SocketEndpointArray &endpoints)
  2260. {
  2261. if (!roxie)
  2262. return;
  2263. Owned<IPropertyTreeIterator> servers = roxie->getElements("RoxieServerProcess");
  2264. ForEach(*servers)
  2265. {
  2266. IPropertyTree &server = servers->query();
  2267. const char *netAddress = server.queryProp("@netAddress");
  2268. if (netAddress && *netAddress)
  2269. {
  2270. SocketEndpoint ep(netAddress, server.getPropInt("@port", 9876));
  2271. endpoints.append(ep);
  2272. }
  2273. }
  2274. }
  2275. void getRoxieProcessServers(const char *process, SocketEndpointArray &servers)
  2276. {
  2277. Owned<IEnvironmentFactory> factory = getEnvironmentFactory(true);
  2278. Owned<IConstEnvironment> env = factory->openEnvironment();
  2279. Owned<IPropertyTree> root = &env->getPTree();
  2280. getRoxieProcessServers(queryRoxieProcessTree(root, process), servers);
  2281. }
  2282. #define WUERR_MismatchClusterSize 5008
  2283. class CEnvironmentClusterInfo: implements IConstWUClusterInfo, public CInterface
  2284. {
  2285. StringAttr name;
  2286. StringAttr alias;
  2287. StringAttr serverQueue;
  2288. StringAttr agentQueue;
  2289. StringAttr agentName;
  2290. StringAttr roxieProcess;
  2291. SocketEndpointArray roxieServers;
  2292. StringAttr thorQueue;
  2293. StringArray thorProcesses;
  2294. StringArray primaryThorProcesses;
  2295. StringAttr prefix;
  2296. StringAttr ldapUser;
  2297. StringBuffer ldapPassword;
  2298. ClusterType platform;
  2299. unsigned clusterWidth;
  2300. unsigned roxieRedundancy;
  2301. unsigned channelsPerNode;
  2302. unsigned numberOfSlaveLogs;
  2303. int roxieReplicateOffset;
  2304. public:
  2305. IMPLEMENT_IINTERFACE;
  2306. CEnvironmentClusterInfo(const char *_name, const char *_prefix, const char *_alias, IPropertyTree *agent, IArrayOf<IPropertyTree> &thors, IPropertyTree *roxie)
  2307. : name(_name), prefix(_prefix), alias(_alias), roxieRedundancy(0), channelsPerNode(0), numberOfSlaveLogs(0), roxieReplicateOffset(1)
  2308. {
  2309. StringBuffer queue;
  2310. if (thors.ordinality())
  2311. {
  2312. thorQueue.set(getClusterThorQueueName(queue.clear(), name));
  2313. clusterWidth = 0;
  2314. bool isMultiThor = (thors.length() > 1);
  2315. ForEachItemIn(i,thors)
  2316. {
  2317. IPropertyTree &thor = thors.item(i);
  2318. const char* thorName = thor.queryProp("@name");
  2319. thorProcesses.append(thorName);
  2320. if (!isMultiThor)
  2321. primaryThorProcesses.append(thorName);
  2322. else
  2323. {
  2324. const char *nodeGroup = thor.queryProp("@nodeGroup");
  2325. if (!nodeGroup || strieq(nodeGroup, thorName))
  2326. primaryThorProcesses.append(thorName);
  2327. }
  2328. unsigned nodes = thor.getCount("ThorSlaveProcess");
  2329. unsigned slavesPerNode = thor.getPropInt("@slavesPerNode", 1);
  2330. unsigned channelsPerSlave = thor.getPropInt("@channelsPerSlave", 1);
  2331. unsigned ts = nodes * slavesPerNode * channelsPerSlave;
  2332. numberOfSlaveLogs = nodes * slavesPerNode;
  2333. if (clusterWidth && (ts!=clusterWidth))
  2334. throw MakeStringException(WUERR_MismatchClusterSize,"CEnvironmentClusterInfo: mismatched thor sizes in cluster");
  2335. clusterWidth = ts;
  2336. }
  2337. platform = ThorLCRCluster;
  2338. }
  2339. else if (roxie)
  2340. {
  2341. roxieProcess.set(roxie->queryProp("@name"));
  2342. platform = RoxieCluster;
  2343. getRoxieProcessServers(roxie, roxieServers);
  2344. clusterWidth = roxieServers.length();
  2345. ldapUser.set(roxie->queryProp("@ldapUser"));
  2346. StringBuffer encPassword (roxie->queryProp("@ldapPassword"));
  2347. if (encPassword.length())
  2348. decrypt(ldapPassword, encPassword);
  2349. const char *redundancyMode = roxie->queryProp("@slaveConfig");
  2350. if (redundancyMode && *redundancyMode)
  2351. {
  2352. unsigned dataCopies = roxie->getPropInt("@numDataCopies", 1);
  2353. if (strieq(redundancyMode, "overloaded"))
  2354. channelsPerNode = roxie->getPropInt("@channelsPernode", 1);
  2355. else if (strieq(redundancyMode, "full redundancy"))
  2356. {
  2357. roxieRedundancy = dataCopies-1;
  2358. roxieReplicateOffset = 0;
  2359. }
  2360. else if (strieq(redundancyMode, "cyclic redundancy"))
  2361. {
  2362. roxieRedundancy = dataCopies-1;
  2363. channelsPerNode = dataCopies;
  2364. roxieReplicateOffset = roxie->getPropInt("@cyclicOffset", 1);
  2365. }
  2366. }
  2367. }
  2368. else
  2369. {
  2370. clusterWidth = 1;
  2371. platform = HThorCluster;
  2372. }
  2373. #ifdef _CONTAINERIZED
  2374. if (agent || roxie)
  2375. {
  2376. agentQueue.set(getClusterEclAgentQueueName(queue.clear(), name));
  2377. if (agent)
  2378. agentName.set(agent->queryProp("@name"));
  2379. }
  2380. #else
  2381. if (agent)
  2382. {
  2383. assertex(!roxie);
  2384. agentQueue.set(getClusterEclAgentQueueName(queue.clear(), name));
  2385. agentName.set(agent->queryProp("@name"));
  2386. }
  2387. else if (roxie)
  2388. agentQueue.set(getClusterRoxieQueueName(queue.clear(), name));
  2389. #endif
  2390. // MORE - does this need to be conditional?
  2391. serverQueue.set(getClusterEclCCServerQueueName(queue.clear(), name));
  2392. }
  2393. IStringVal & getName(IStringVal & str) const
  2394. {
  2395. str.set(name.get());
  2396. return str;
  2397. }
  2398. const char *getAlias() const
  2399. {
  2400. return alias;
  2401. }
  2402. IStringVal & getScope(IStringVal & str) const
  2403. {
  2404. str.set(prefix.get());
  2405. return str;
  2406. }
  2407. IStringVal & getAgentQueue(IStringVal & str) const
  2408. {
  2409. str.set(agentQueue);
  2410. return str;
  2411. }
  2412. IStringVal & getAgentName(IStringVal & str) const
  2413. {
  2414. str.set(agentName);
  2415. return str;
  2416. }
  2417. virtual IStringVal & getServerQueue(IStringVal & str) const
  2418. {
  2419. str.set(serverQueue);
  2420. return str;
  2421. }
  2422. IStringVal & getThorQueue(IStringVal & str) const
  2423. {
  2424. str.set(thorQueue);
  2425. return str;
  2426. }
  2427. unsigned getSize() const
  2428. {
  2429. return clusterWidth;
  2430. }
  2431. unsigned getNumberOfSlaveLogs() const
  2432. {
  2433. return numberOfSlaveLogs;
  2434. }
  2435. virtual ClusterType getPlatform() const
  2436. {
  2437. return platform;
  2438. }
  2439. IStringVal & getRoxieProcess(IStringVal & str) const
  2440. {
  2441. str.set(roxieProcess.get());
  2442. return str;
  2443. }
  2444. const StringArray & getThorProcesses() const
  2445. {
  2446. return thorProcesses;
  2447. }
  2448. const StringArray & getPrimaryThorProcesses() const
  2449. {
  2450. return primaryThorProcesses;
  2451. }
  2452. const SocketEndpointArray & getRoxieServers() const
  2453. {
  2454. return roxieServers;
  2455. }
  2456. unsigned getRoxieRedundancy() const
  2457. {
  2458. return roxieRedundancy;
  2459. }
  2460. unsigned getChannelsPerNode() const
  2461. {
  2462. return channelsPerNode;
  2463. }
  2464. int getRoxieReplicateOffset() const
  2465. {
  2466. return roxieReplicateOffset;
  2467. }
  2468. const char *getLdapUser() const
  2469. {
  2470. return ldapUser.get();
  2471. }
  2472. virtual const char *getLdapPassword() const
  2473. {
  2474. return ldapPassword.str();
  2475. }
  2476. };
  2477. IStringVal &getProcessQueueNames(IStringVal &ret, const char *process, const char *type, const char *suffix)
  2478. {
  2479. if (process)
  2480. {
  2481. Owned<IEnvironmentFactory> factory = getEnvironmentFactory(true);
  2482. Owned<IConstEnvironment> env = factory->openEnvironment();
  2483. Owned<IPropertyTree> root = &env->getPTree();
  2484. StringBuffer queueNames;
  2485. StringBuffer xpath;
  2486. xpath.appendf("%s[@process=\"%s\"]", type, process);
  2487. Owned<IPropertyTreeIterator> targets = root->getElements("Software/Topology/Cluster");
  2488. ForEach(*targets)
  2489. {
  2490. IPropertyTree &target = targets->query();
  2491. if (target.hasProp(xpath))
  2492. {
  2493. if (queueNames.length())
  2494. queueNames.append(',');
  2495. queueNames.append(target.queryProp("@name")).append(suffix);
  2496. }
  2497. }
  2498. ret.set(queueNames);
  2499. }
  2500. return ret;
  2501. }
  2502. extern void getDFUServerQueueNames(StringArray &ret, const char *process)
  2503. {
  2504. Owned<IEnvironmentFactory> factory = getEnvironmentFactory(true);
  2505. Owned<IConstEnvironment> env = factory->openEnvironment();
  2506. StringBuffer xpath ("Software/DfuServerProcess");
  2507. if (!isEmptyString(process))
  2508. xpath.appendf("[@name=\"%s\"]", process);
  2509. Owned<IPropertyTree> root = &env->getPTree();
  2510. Owned<IPropertyTreeIterator> targets = root->getElements(xpath.str());
  2511. ForEach(*targets)
  2512. {
  2513. IPropertyTree &target = targets->query();
  2514. if (target.hasProp("@queue"))
  2515. ret.appendListUniq(target.queryProp("@queue"), ",");
  2516. }
  2517. return;
  2518. }
  2519. extern IStringVal &getEclCCServerQueueNames(IStringVal &ret, const char *process)
  2520. {
  2521. return getProcessQueueNames(ret, process, "EclCCServerProcess", ECLCCSERVER_QUEUE_EXT);
  2522. }
  2523. extern IStringVal &getEclServerQueueNames(IStringVal &ret, const char *process)
  2524. {
  2525. return getProcessQueueNames(ret, process, "EclServerProcess", ECLSERVER_QUEUE_EXT); // shares queue name with EclCCServer
  2526. }
  2527. extern IStringVal &getEclSchedulerQueueNames(IStringVal &ret, const char *process)
  2528. {
  2529. return getProcessQueueNames(ret, process, "EclSchedulerProcess", ECLSCHEDULER_QUEUE_EXT); // Shares deployment/config with EclCCServer
  2530. }
  2531. extern IStringVal &getAgentQueueNames(IStringVal &ret, const char *process)
  2532. {
  2533. return getProcessQueueNames(ret, process, "EclAgentProcess", ECLAGENT_QUEUE_EXT);
  2534. }
  2535. extern IStringVal &getRoxieQueueNames(IStringVal &ret, const char *process)
  2536. {
  2537. return getProcessQueueNames(ret, process, "RoxieCluster", ROXIE_QUEUE_EXT);
  2538. }
  2539. extern IStringVal &getThorQueueNames(IStringVal &ret, const char *process)
  2540. {
  2541. return getProcessQueueNames(ret, process, "ThorCluster", THOR_QUEUE_EXT);
  2542. }
  2543. extern StringBuffer &getClusterThorGroupName(StringBuffer &ret, const char *cluster)
  2544. {
  2545. Owned<IEnvironmentFactory> factory = getEnvironmentFactory(true);
  2546. Owned<IConstEnvironment> env = factory->openEnvironment();
  2547. Owned<IPropertyTree> root = &env->getPTree();
  2548. StringBuffer path;
  2549. path.append("Software/ThorCluster[@name=\"").append(cluster).append("\"]");
  2550. IPropertyTree * child = root->queryPropTree(path);
  2551. if (child)
  2552. getClusterGroupName(*child, ret);
  2553. return ret;
  2554. }
  2555. extern StringBuffer &getClusterGroupName(StringBuffer &ret, const char *cluster)
  2556. {
  2557. Owned<IEnvironmentFactory> factory = getEnvironmentFactory(true);
  2558. Owned<IConstEnvironment> env = factory->openEnvironment();
  2559. Owned<IPropertyTree> root = &env->getPTree();
  2560. StringBuffer path;
  2561. path.set("Software/ThorCluster[@name=\"").append(cluster).append("\"]");
  2562. IPropertyTree * child = root->queryPropTree(path);
  2563. if (child)
  2564. {
  2565. return getClusterGroupName(*child, ret);
  2566. }
  2567. path.set("Software/RoxieCluster[@name=\"").append(cluster).append("\"]");
  2568. child = root->queryPropTree(path);
  2569. if (child)
  2570. {
  2571. return getClusterGroupName(*child, ret);
  2572. }
  2573. path.set("Software/EclAgentProcess[@name=\"").append(cluster).append("\"]");
  2574. child = root->queryPropTree(path);
  2575. if (child)
  2576. {
  2577. return ret.setf("hthor__%s", cluster);
  2578. }
  2579. return ret;
  2580. }
  2581. extern ClusterType getClusterTypeByClusterName(const char *cluster)
  2582. {
  2583. Owned<IEnvironmentFactory> factory = getEnvironmentFactory(true);
  2584. Owned<IConstEnvironment> env = factory->openEnvironment();
  2585. Owned<IPropertyTree> root = &env->getPTree();
  2586. StringBuffer path;
  2587. path.set("Software/ThorCluster[@name=\"").append(cluster).append("\"]");
  2588. if (root->hasProp(path))
  2589. return ThorLCRCluster;
  2590. path.set("Software/RoxieCluster[@name=\"").append(cluster).append("\"]");
  2591. if (root->hasProp(path))
  2592. return RoxieCluster;
  2593. path.set("Software/EclAgentProcess[@name=\"").append(cluster).append("\"]");
  2594. if (root->hasProp(path))
  2595. return HThorCluster;
  2596. return NoCluster;
  2597. }
  2598. extern IStringIterator *getTargetClusters(const char *processType, const char *processName)
  2599. {
  2600. Owned<CStringArrayIterator> ret = new CStringArrayIterator;
  2601. Owned<IEnvironmentFactory> factory = getEnvironmentFactory(true);
  2602. Owned<IConstEnvironment> env = factory->openEnvironment();
  2603. Owned<IPropertyTree> root = &env->getPTree();
  2604. StringBuffer xpath;
  2605. xpath.appendf("%s", processType ? processType : "*");
  2606. if (processName && *processName)
  2607. xpath.appendf("[@process=\"%s\"]", processName);
  2608. Owned<IPropertyTreeIterator> targets = root->getElements("Software/Topology/Cluster");
  2609. ForEach(*targets)
  2610. {
  2611. IPropertyTree &target = targets->query();
  2612. if (target.hasProp(xpath))
  2613. {
  2614. ret->append(target.queryProp("@name"));
  2615. }
  2616. }
  2617. return ret.getClear();
  2618. }
  2619. extern bool isProcessCluster(const char *process)
  2620. {
  2621. if (!process || !*process)
  2622. return false;
  2623. Owned<IEnvironmentFactory> factory = getEnvironmentFactory(true);
  2624. Owned<IConstEnvironment> env = factory->openEnvironment();
  2625. Owned<IPropertyTree> root = &env->getPTree();
  2626. VStringBuffer xpath("Software/*Cluster[@name=\"%s\"]", process);
  2627. return root->hasProp(xpath.str());
  2628. }
  2629. extern bool isProcessCluster(const char *remoteDali, const char *process)
  2630. {
  2631. if (!remoteDali || !*remoteDali)
  2632. return isProcessCluster(process);
  2633. if (!process || !*process)
  2634. return false;
  2635. Owned<INode> remote = createINode(remoteDali, 7070);
  2636. if (!remote)
  2637. return false;
  2638. //Cannot use getEnvironmentFactory() since it is using a remotedali
  2639. VStringBuffer xpath("Environment/Software/*Cluster[@name=\"%s\"]/@name", process);
  2640. try
  2641. {
  2642. Owned<IPropertyTreeIterator> clusters = querySDS().getElementsRaw(xpath, remote, 1000*60*1);
  2643. return clusters->first();
  2644. }
  2645. catch (IException *E)
  2646. {
  2647. StringBuffer msg;
  2648. E->errorMessage(msg);
  2649. DBGLOG("Exception validating cluster %s/%s: %s", remoteDali, xpath.str(), msg.str());
  2650. E->Release();
  2651. }
  2652. return true;
  2653. }
  2654. IConstWUClusterInfo* getTargetClusterInfo(IPropertyTree *environment, IPropertyTree *cluster)
  2655. {
  2656. const char *clustname = cluster->queryProp("@name");
  2657. // MORE - at the moment configenv specifies eclagent and thor queues by (in effect) placing an 'example' thor or eclagent in the topology
  2658. // that uses the queue that will be used.
  2659. // We should and I hope will change that, at which point the code below gets simpler
  2660. StringBuffer prefix(cluster->queryProp("@prefix"));
  2661. prefix.toLowerCase();
  2662. StringBuffer xpath;
  2663. StringBuffer querySetName;
  2664. IPropertyTree *agent = NULL;
  2665. const char *agentName = cluster->queryProp("EclAgentProcess/@process");
  2666. if (agentName)
  2667. {
  2668. xpath.clear().appendf("Software/EclAgentProcess[@name=\"%s\"]", agentName);
  2669. agent = environment->queryPropTree(xpath.str());
  2670. }
  2671. Owned<IPropertyTreeIterator> ti = cluster->getElements("ThorCluster");
  2672. IArrayOf<IPropertyTree> thors;
  2673. ForEach(*ti)
  2674. {
  2675. const char *thorName = ti->query().queryProp("@process");
  2676. if (thorName)
  2677. {
  2678. xpath.clear().appendf("Software/ThorCluster[@name=\"%s\"]", thorName);
  2679. if (environment->hasProp(xpath.str()))
  2680. thors.append(*environment->getPropTree(xpath.str()));
  2681. }
  2682. }
  2683. const char *roxieName = cluster->queryProp("RoxieCluster/@process");
  2684. return new CEnvironmentClusterInfo(clustname, prefix, cluster->queryProp("@alias"), agent, thors, queryRoxieProcessTree(environment, roxieName));
  2685. }
  2686. IPropertyTree* getTopologyCluster(Owned<IPropertyTree> &envRoot, const char *clustname)
  2687. {
  2688. if (!clustname || !*clustname)
  2689. return NULL;
  2690. Owned<IEnvironmentFactory> factory = getEnvironmentFactory(true);
  2691. Owned<IConstEnvironment> env = factory->openEnvironment();
  2692. envRoot.setown(&env->getPTree());
  2693. StringBuffer xpath;
  2694. xpath.appendf("Software/Topology/Cluster[@name=\"%s\"]", clustname);
  2695. return envRoot->getPropTree(xpath.str());
  2696. }
  2697. bool validateTargetClusterName(const char *clustname)
  2698. {
  2699. Owned<IPropertyTree> envRoot;
  2700. Owned<IPropertyTree> cluster = getTopologyCluster(envRoot, clustname);
  2701. return (cluster.get()!=NULL);
  2702. }
  2703. IConstWUClusterInfo* getTargetClusterInfo(const char *clustname)
  2704. {
  2705. Owned<IPropertyTree> envRoot;
  2706. Owned<IPropertyTree> cluster = getTopologyCluster(envRoot, clustname);
  2707. if (!cluster)
  2708. return NULL;
  2709. return getTargetClusterInfo(envRoot, cluster);
  2710. }
  2711. unsigned getEnvironmentClusterInfo(CConstWUClusterInfoArray &clusters)
  2712. {
  2713. Owned<IEnvironmentFactory> factory = getEnvironmentFactory(true);
  2714. Owned<IConstEnvironment> env = factory->openEnvironment();
  2715. Owned<IPropertyTree> root = &env->getPTree();
  2716. return getEnvironmentClusterInfo(root, clusters);
  2717. }
  2718. unsigned getEnvironmentClusterInfo(IPropertyTree* environmentRoot, CConstWUClusterInfoArray &clusters)
  2719. {
  2720. if (!environmentRoot)
  2721. return 0;
  2722. Owned<IPropertyTreeIterator> clusterIter = environmentRoot->getElements("Software/Topology/Cluster");
  2723. ForEach(*clusterIter)
  2724. {
  2725. IPropertyTree &node = clusterIter->query();
  2726. Owned<IConstWUClusterInfo> cluster = getTargetClusterInfo(environmentRoot, &node);
  2727. clusters.append(*cluster.getClear());
  2728. }
  2729. return clusters.ordinality();
  2730. }
  2731. const char *getTargetClusterComponentName(const char *clustname, const char *processType, StringBuffer &name)
  2732. {
  2733. if (!clustname)
  2734. return NULL;
  2735. Owned<IEnvironmentFactory> factory = getEnvironmentFactory(true);
  2736. Owned<IConstEnvironment> env = factory->openEnvironment();
  2737. Owned<IPropertyTree> root = &env->getPTree();
  2738. StringBuffer xpath;
  2739. xpath.appendf("Software/Topology/Cluster[@name=\"%s\"]", clustname);
  2740. Owned<IPropertyTree> cluster = root->getPropTree(xpath.str());
  2741. if (!cluster)
  2742. return NULL;
  2743. StringBuffer xpath1;
  2744. xpath1.appendf("%s/@process", processType);
  2745. name.append(cluster->queryProp(xpath1.str()));
  2746. return name.str();
  2747. }
  2748. unsigned getEnvironmentThorClusterNames(StringArray &thorNames, StringArray &groupNames, StringArray &targetNames, StringArray &queueNames)
  2749. {
  2750. Owned<IEnvironmentFactory> factory = getEnvironmentFactory(true);
  2751. Owned<IConstEnvironment> env = factory->openEnvironment();
  2752. Owned<IPropertyTree> root = &env->getPTree();
  2753. Owned<IPropertyTreeIterator> allTargets = root->getElements("Software/Topology/Cluster");
  2754. ForEach(*allTargets)
  2755. {
  2756. IPropertyTree &target = allTargets->query();
  2757. const char *targetName = target.queryProp("@name");
  2758. if (targetName && *targetName)
  2759. {
  2760. Owned<IPropertyTreeIterator> thorClusters = target.getElements("ThorCluster");
  2761. ForEach(*thorClusters)
  2762. {
  2763. const char *thorName = thorClusters->query().queryProp("@process");
  2764. VStringBuffer query("Software/ThorCluster[@name=\"%s\"]",thorName);
  2765. IPropertyTree *thorCluster = root->queryPropTree(query.str());
  2766. if (thorCluster)
  2767. {
  2768. const char *groupName = thorCluster->queryProp("@nodeGroup");
  2769. if (!groupName||!*groupName)
  2770. groupName = thorName;
  2771. thorNames.append(thorName);
  2772. groupNames.append(groupName);
  2773. targetNames.append(targetName);
  2774. StringBuffer queueName(targetName);
  2775. queueNames.append(queueName.append(THOR_QUEUE_EXT));
  2776. }
  2777. }
  2778. }
  2779. }
  2780. return thorNames.ordinality();
  2781. }