ccdstate.cpp 104 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include <platform.h>
  14. #include <jlib.hpp>
  15. #include "build-config.h"
  16. #include "jisem.hpp"
  17. #include "jsort.hpp"
  18. #include "jregexp.hpp"
  19. #include "ccd.hpp"
  20. #include "ccdquery.hpp"
  21. #include "ccdstate.hpp"
  22. #include "ccdqueue.ipp"
  23. #include "ccdlistener.hpp"
  24. #include "ccdfile.hpp"
  25. #include "ccdsnmp.hpp"
  26. #include "hqlplugins.hpp"
  27. #include "thorplugin.hpp"
  28. #include "eclrtl.hpp"
  29. #include "dafdesc.hpp"
  30. #include "dautils.hpp"
  31. #include "pkgimpl.hpp"
  32. //-------------------------------------------------------------------------------------------
  33. // class CRoxiePluginCtx - provide the environments for plugins loaded by roxie.
  34. // Base class handles making sure memory allocation comes from the right heap.
  35. // implement get/set properties to allow plugin configuration information to be retrieved from Roxie topology file
  36. //-------------------------------------------------------------------------------------------
  37. class CRoxiePluginCtx : public SimplePluginCtx
  38. {
  39. public:
  40. virtual int ctxGetPropInt(const char *propName, int defaultValue) const
  41. {
  42. return topology->getPropInt(propName, defaultValue);
  43. }
  44. virtual const char *ctxQueryProp(const char *propName) const
  45. {
  46. return topology->queryProp(propName);
  47. }
  48. } PluginCtx;
  // Global SafePluginMap pointer. It is only declared here and nothing in this part of the
  // file assigns it - presumably it is populated during roxie start-up (TODO confirm).
  49. SafePluginMap *plugins;
  50. //================================================================================================
  51. // In legacy state files, the original file names passed in _fileName or _indexFileName may have been translated into _superFileName or _superKeyName,
  52. // and then 0 or more (max 1 for subfiles, no limit for subkeys) _fileName or _indexFileName will have been added. This translation will not take place
  53. // if the files resolve to single file/key, or if we are using new embedded wu system
  54. // Basic mode of operation therefore is to get the original name, see if it can be resolved by package into a list of subfiles, and if not, use
  55. // iterator on the xgmml node to get the list.
  56. // These two helper functions will return the original filenames placed in the XGMML by the codegen, regardless of how/if roxieconfig resolved them
  57. const char *queryNodeFileName(const IPropertyTree &graphNode)
  58. {
  59. const char *id = graphNode.queryProp("att[@name='_fileName']/@value");
  60. return id;
  61. }
  62. const char *queryNodeIndexName(const IPropertyTree &graphNode)
  63. {
  64. const char * id = graphNode.queryProp("att[@name='_indexFileName']/@value");
  65. if (!id && !graphNode.hasProp("att[@name='_indexFileName_dynamic']")) // can remove soon
  66. id = graphNode.queryProp("att[@name='_fileName']/@value");
  67. return id;
  68. }
  69. class CSimpleSuperFileArray : public CInterface, implements ISimpleSuperFileEnquiry
  70. {
  71. IArrayOf<IPropertyTree> subFiles;
  72. public:
  73. IMPLEMENT_IINTERFACE;
  74. CSimpleSuperFileArray(IPropertyTreeIterator &_subs)
  75. {
  76. ForEach(_subs)
  77. {
  78. IPropertyTree &sub = _subs.query();
  79. sub.Link();
  80. subFiles.append(sub);
  81. }
  82. }
  83. virtual unsigned numSubFiles() const
  84. {
  85. return subFiles.length();
  86. }
  87. virtual bool getSubFileName(unsigned num, StringBuffer &name) const
  88. {
  89. if (subFiles.isItem(num))
  90. {
  91. name.append(subFiles.item(num).queryProp("@value"));
  92. return true;
  93. }
  94. else
  95. return false;
  96. }
  97. virtual unsigned findSubName(const char *subname) const
  98. {
  99. ForEachItemIn(idx, subFiles)
  100. {
  101. if (stricmp(subFiles.item(idx).queryProp("@value"), subname))
  102. return idx;
  103. }
  104. return NotFound;
  105. }
  106. virtual unsigned getContents(StringArray &contents) const
  107. {
  108. ForEachItemIn(idx, subFiles)
  109. {
  110. contents.append(subFiles.item(idx).queryProp("@value"));
  111. }
  112. return subFiles.length();
  113. }
  114. };
  115. //-------------------------------------------------------------------------------------------
  116. // class CRoxiePackage - provide the environment in which file names and query options are interpreted
  117. // by a roxie query.
  118. // File names are resolved into IResolvedFile objects. A cache is used to ensure that the IResolvedFile is
  119. // shared wherever possible.
  120. // Effective environment is precomputed in mergedEnvironment for efficient recall by queries
  121. // Packages are described using XML files - see documentation for details.
  122. //-------------------------------------------------------------------------------------------
  123. /**
  124. * Packages are hierarchical - they are searched recursively to get the info you want
  125. * A PackageMap defines the entire environment - potentially each query that uses that PackageMap will pick a different package within it
  126. * A particular instantiation of a roxie query (i.e. a IQueryFactory) will have a pointer to the specific IRoxiePackage within the active PackageMap
  127. * that is providing its environment.
  128. *
  129. * A PackageMap can also indicate the name of the QuerySet it applies to. If not specified, it will apply to all QuerySets on the Roxie.
  130. *
  131. * A PackageSet is a list of PackageMap id's, and is used to tell Roxie what PackageMaps to load.
  132. * A Roxie can have multiple PackageMap's active. When updating the data, you might:
  133. * - create a new PackageMap to refer to the new data
  134. * - once it has loaded, mark it active, and mark the previous one as inactive
  135. * - Once sure no queries in flight, unload the previous one
  136. *
  137. * Each Roxie will load all PackageMaps that are in any PackageSet whose @process attribute matches the cluster name.
  138. *
  139. * All package information is stored in Dali (and cached locally)
  140. *
  141. * <PackageSets>
  142. * <PackageSet id = 'ps1' process='*'> # use this packageset for all roxies (same as omitting process)
  143. * <PackageMap id='pm1b' querySet='qs1' active='true'/> # Use the PackageMap pm1b for QuerySet qs1 and make it active
  144. * <PackageMap id='pm1a' querySet='qs1' active='false'/> # Use the PackageMap pm1a for QuerySet qs1 but don't make it active
  145. * <PackageMap id='pm2' querySet='dev*' active='true'/> # Use the PackageMap pm2 for all QuerySets with names starting dev and make it active
  146. * </PackageSet>
  147. * </PackageSets>
  148. *
  149. * <PackageMaps>
  150. * <PackageMap id='pm1a'>
  151. * <Package id='package1'>
  152. * ...
  153. * </Package>
  154. * <Package id='package2'>
  155. * </Package>
  156. * </PackageMap>
  157. * <PackageMap id='pm2'>
  158. * </PackageMap>
  159. * <PackageMap id='pm3'>
  160. * </PackageMap>
  161. * </PackageMaps>
  162. */
  163. class CResolvedFileCache : implements IResolvedFileCache
  164. {
  165. CriticalSection cacheLock;
  166. CopyMapStringToMyClass<IResolvedFile> files;
  167. public:
  168. // Retrieve number of files in cache
  169. inline unsigned count() const
  170. {
  171. return files.count();
  172. }
  173. // Add a filename and the corresponding IResolvedFile to the cache
  174. virtual void addCache(const char *filename, const IResolvedFile *file)
  175. {
  176. CriticalBlock b(cacheLock);
  177. IResolvedFile *add = const_cast<IResolvedFile *>(file);
  178. add->setCache(this);
  179. files.setValue(filename, add);
  180. }
  181. // Remove an IResolvedFile from the cache
  182. virtual void removeCache(const IResolvedFile *file)
  183. {
  184. CriticalBlock b(cacheLock);
  185. // NOTE: it's theoretically possible for the final release to happen after a replacement has been inserted into hash table.
  186. // So only remove from hash table if what we find there matches the item that is being deleted.
  187. IResolvedFile *goer = files.getValue(file->queryFileName());
  188. if (goer == file)
  189. files.remove(file->queryFileName());
  190. // You might want to remove files from the daliServer cache too, but it's not safe to do so here as there may be multiple package caches
  191. }
  192. // Lookup a filename in the cache
  193. virtual IResolvedFile *lookupCache(const char *filename)
  194. {
  195. CriticalBlock b(cacheLock);
  196. IResolvedFile *cache = files.getValue(filename);
  197. if (cache)
  198. {
  199. LINK(cache);
  200. if (cache->isAlive())
  201. return cache;
  202. }
  203. return NULL;
  204. }
  205. };
  206. class CRoxiePackageNode : extends CPackageNode, implements IRoxiePackage
  207. {
  208. protected:
  209. static CResolvedFileCache daliFiles;
  210. mutable CResolvedFileCache fileCache;
  211. virtual aindex_t getBaseCount() const = 0;
  212. virtual const CRoxiePackageNode *getBaseNode(aindex_t pos) const = 0;
  213. virtual bool getSysFieldTranslationEnabled() const {return fieldTranslationEnabled;} //roxie configured value
  214. // Use local package file only to resolve subfile into physical file info
  215. IResolvedFile *resolveLFNusingPackage(const char *fileName) const
  216. {
  217. if (node)
  218. {
  219. StringBuffer xpath;
  220. IPropertyTree *fileInfo = node->queryPropTree(xpath.appendf("File[@id='%s']", fileName).str());
  221. if (fileInfo)
  222. {
  223. Owned <IResolvedFileCreator> result = createResolvedFile(fileName, NULL, false);
  224. result->addSubFile(createFileDescriptorFromRoxieXML(fileInfo), NULL);
  225. return result.getClear();
  226. }
  227. }
  228. return NULL;
  229. }
  230. // Use dali to resolve subfile into physical file info
  231. static IResolvedFile *resolveLFNusingDaliOrLocal(const char *fileName, bool cacheIt, bool writeAccess, bool alwaysCreate)
  232. {
  233. // MORE - look at alwaysCreate... This may be useful to implement earlier locking semantics.
  234. IResolvedFile* result = daliFiles.lookupCache(fileName);
  235. if (result)
  236. return result;
  237. Owned<IRoxieDaliHelper> daliHelper = connectToDali();
  238. if (daliHelper)
  239. {
  240. if (daliHelper->connected())
  241. {
  242. Owned<IDistributedFile> dFile = daliHelper->resolveLFN(fileName, cacheIt, writeAccess);
  243. if (dFile)
  244. result = createResolvedFile(fileName, NULL, dFile.getClear(), daliHelper, cacheIt, writeAccess);
  245. }
  246. else if (!writeAccess) // If we need write access and expect a dali, but don't have one, we should probably fail
  247. {
  248. // we have no dali, we can't lock..
  249. Owned<IFileDescriptor> fd = daliHelper->resolveCachedLFN(fileName);
  250. if (fd)
  251. {
  252. Owned <IResolvedFileCreator> creator = createResolvedFile(fileName, NULL, false);
  253. Owned<IFileDescriptor> remoteFDesc = daliHelper->checkClonedFromRemote(fileName, fd, cacheIt);
  254. creator->addSubFile(fd.getClear(), remoteFDesc.getClear());
  255. result = creator.getClear();
  256. }
  257. }
  258. }
  259. if (!result)
  260. {
  261. StringBuffer useName;
  262. if (strstr(fileName,"::"))
  263. {
  264. bool wasDFS;
  265. makeSinglePhysicalPartName(fileName, useName, true, wasDFS);
  266. }
  267. else
  268. useName.append(fileName);
  269. bool exists = checkFileExists(useName);
  270. if (exists || alwaysCreate)
  271. {
  272. Owned <IResolvedFileCreator> creator = createResolvedFile(fileName, useName, false);
  273. if (exists)
  274. creator->addSubFile(useName);
  275. result = creator.getClear();
  276. }
  277. }
  278. if (result && cacheIt)
  279. daliFiles.addCache(fileName, result);
  280. return result;
  281. }
  282. // Use local package and its bases to resolve existing file into physical file info via all supported resolvers
  283. IResolvedFile *lookupExpandedFileName(const char *fileName, bool cache, bool writeAccess, bool alwaysCreate) const
  284. {
  285. IResolvedFile *result = lookupFile(fileName, cache, writeAccess, alwaysCreate);
  286. if (!result)
  287. result = resolveLFNusingDaliOrLocal(fileName, cache, writeAccess, alwaysCreate);
  288. return result;
  289. }
  290. IResolvedFile *lookupFile(const char *fileName, bool cache, bool writeAccess, bool alwaysCreate) const
  291. {
  292. // Order of resolution:
  293. // 1. Files named in package
  294. // 2. Files named in bases
  295. IResolvedFile* result = fileCache.lookupCache(fileName);
  296. if (result)
  297. return result;
  298. Owned<const ISimpleSuperFileEnquiry> subFileInfo = resolveSuperFile(fileName);
  299. if (subFileInfo)
  300. {
  301. unsigned numSubFiles = subFileInfo->numSubFiles();
  302. // Note: do not try to optimize the common case of a single subfile
  303. // as we still want to report the superfile info from the resolvedFile
  304. Owned<IResolvedFileCreator> super;
  305. for (unsigned idx = 0; idx < numSubFiles; idx++)
  306. {
  307. StringBuffer subFileName;
  308. subFileInfo->getSubFileName(idx, subFileName);
  309. Owned<const IResolvedFile> subFileInfo = lookupExpandedFileName(subFileName, cache, false, false); // NOTE - overwriting a superfile does NOT require write access to subfiles
  310. if (subFileInfo)
  311. {
  312. if (!super)
  313. super.setown(createResolvedFile(fileName, NULL, true));
  314. super->addSubFile(subFileInfo);
  315. }
  316. }
  317. if (super && cache)
  318. fileCache.addCache(fileName, super);
  319. return super.getClear();
  320. }
  321. result = resolveLFNusingPackage(fileName);
  322. if (result)
  323. {
  324. if (cache)
  325. fileCache.addCache(fileName, result);
  326. return result;
  327. }
  328. aindex_t count = getBaseCount();
  329. for (aindex_t i = 0; i < count; i++)
  330. {
  331. const CRoxiePackageNode *basePackage = getBaseNode(i);
  332. if (!basePackage)
  333. continue;
  334. IResolvedFile *result = basePackage->lookupFile(fileName, cache, writeAccess, alwaysCreate);
  335. if (result)
  336. return result;
  337. }
  338. return NULL;
  339. }
  340. // default constructor for derived class use
  341. CRoxiePackageNode()
  342. {
  343. }
  344. public:
  345. IMPLEMENT_IINTERFACE;
  346. CRoxiePackageNode(IPropertyTree *p) : CPackageNode(p)
  347. {
  348. if (!fileNameServiceDali.length())
  349. node->setPropBool("@localFiles", true);
  350. }
  351. ~CRoxiePackageNode()
  352. {
  353. assertex(fileCache.count()==0);
  354. // If it's possible for cached objects to outlive the cache I think there is a problem...
  355. // we could set the cache field to null here for any objects still in cache but there would be a race condition
  356. }
  357. virtual IPropertyTreeIterator *getInMemoryIndexInfo(const IPropertyTree &graphNode) const
  358. {
  359. StringBuffer xpath;
  360. xpath.append("SuperFile[@id='").append(queryNodeFileName(graphNode)).append("']");
  361. return lookupElements(xpath.str(), "MemIndex");
  362. }
  363. virtual const IResolvedFile *lookupFileName(const char *_fileName, bool opt, bool cache, IConstWorkUnit *wu) const
  364. {
  365. StringBuffer fileName;
  366. expandLogicalFilename(fileName, _fileName, wu, false);
  367. if (traceLevel > 5)
  368. DBGLOG("lookupFileName %s", fileName.str());
  369. const IResolvedFile *result = lookupExpandedFileName(fileName, cache, false, false);
  370. if (!result)
  371. {
  372. if (!opt)
  373. throw MakeStringException(ROXIE_FILE_ERROR, "Could not resolve filename %s", fileName.str());
  374. if (traceLevel > 4)
  375. DBGLOG("Could not resolve OPT filename %s", fileName.str());
  376. }
  377. return result;
  378. }
  379. virtual IRoxieWriteHandler *createFileName(const char *_fileName, bool overwrite, bool extend, const StringArray &clusters, IConstWorkUnit *wu) const
  380. {
  381. StringBuffer fileName;
  382. expandLogicalFilename(fileName, _fileName, wu, false);
  383. Owned<IResolvedFile> resolved = lookupFile(fileName, false, true, true);
  384. if (!resolved)
  385. resolved.setown(resolveLFNusingDaliOrLocal(fileName, false, true, true));
  386. if (resolved)
  387. {
  388. if (resolved->exists())
  389. {
  390. if (!overwrite)
  391. throw MakeStringException(99, "Cannot write %s, file already exists (missing OVERWRITE attribute?)", resolved->queryFileName());
  392. if (extend)
  393. UNIMPLEMENTED; // How does extend fit in with the clusterwritemanager stuff? They can't specify cluster and extend together...
  394. resolved->setCache(NULL);
  395. }
  396. if (resolved->queryPhysicalName())
  397. fileName.clear().append(resolved->queryPhysicalName());
  398. resolved.clear();
  399. }
  400. Owned<IRoxieDaliHelper> daliHelper = connectToDali();
  401. bool disconnected = !daliHelper->connected();
  402. // MORE - not sure this is really the right test. If there SHOULD be a dali but is's unavailable, we should fail.
  403. Owned<ILocalOrDistributedFile> ldFile = createLocalOrDistributedFile(fileName, NULL, disconnected, !disconnected, true);
  404. if (!ldFile)
  405. throw MakeStringException(ROXIE_FILE_ERROR, "Cannot write %s", fileName.str());
  406. return createRoxieWriteHandler(daliHelper, ldFile.getClear(), clusters);
  407. }
  408. //map ambiguous IHpccPackage
  409. virtual ISimpleSuperFileEnquiry *resolveSuperFile(const char *superFileName) const
  410. {
  411. return CPackageNode::resolveSuperFile(superFileName);
  412. }
  413. virtual const char *queryEnv(const char *varname) const
  414. {
  415. return CPackageNode::queryEnv(varname);
  416. }
  417. virtual bool getEnableFieldTranslation() const
  418. {
  419. return CPackageNode::getEnableFieldTranslation();
  420. }
  421. virtual const IPropertyTree *queryTree() const
  422. {
  423. return CPackageNode::queryTree();
  424. }
  425. virtual hash64_t queryHash() const
  426. {
  427. return CPackageNode::queryHash();
  428. }
  429. virtual const char *queryId() const
  430. {
  431. return CPackageNode::queryId();
  432. }
  433. };
  // Storage for the static cache that CRoxiePackageNode::resolveLFNusingDaliOrLocal() consults
  // and fills; being static, one instance is shared by every package node.
  434. CResolvedFileCache CRoxiePackageNode::daliFiles;
  // CRoxiePackage is the CResolvedPackage<> instantiation over CRoxiePackageNode; it is the
  // concrete type constructed by createRoxiePackage() and queryRootRoxiePackage().
  435. typedef CResolvedPackage<CRoxiePackageNode> CRoxiePackage;
  436. IRoxiePackage *createRoxiePackage(IPropertyTree *p, IRoxiePackageMap *packages)
  437. {
  438. Owned<CRoxiePackage> pkg = new CRoxiePackage(p);
  439. if (packages)
  440. pkg->resolveBases(packages);
  441. return pkg.getClear();
  442. }
  443. //================================================================================================
  444. // CRoxiePackageMap - an implementation of IRoxiePackageMap built on CPackageMapOf
  445. //================================================================================================
  446. class CRoxiePackageMap : public CPackageMapOf<CRoxiePackageNode, IRoxiePackage>, implements IRoxiePackageMap
  447. {
  448. public:
  449. IMPLEMENT_IINTERFACE;
  450. typedef CPackageMapOf<CRoxiePackageNode, IRoxiePackage> BASE;
  451. CRoxiePackageMap(const char *_packageId, const char *_querySet, bool _active)
  452. : BASE(_packageId, _querySet, _active)
  453. {
  454. }
  455. //map ambiguous IHpccPackageMap interface
  456. virtual const IHpccPackage *queryPackage(const char *name) const
  457. {
  458. return BASE::queryPackage(name);
  459. }
  460. virtual const IHpccPackage *matchPackage(const char *name) const
  461. {
  462. return BASE::matchPackage(name);
  463. }
  464. virtual const char *queryPackageId() const
  465. {
  466. return BASE::queryPackageId();
  467. }
  468. virtual bool isActive() const
  469. {
  470. return BASE::isActive();
  471. }
  472. virtual bool validate(const char *queryid, StringArray &wrn, StringArray &err, StringArray &unmatchedQueries, StringArray &unusedPackages, StringArray &unmatchedFiles) const
  473. {
  474. return BASE::validate(queryid, wrn, err, unmatchedQueries, unusedPackages, unmatchedFiles);
  475. }
  476. virtual void gatherFileMappingForQuery(const char *queryname, IPropertyTree *fileInfo) const
  477. {
  478. BASE::gatherFileMappingForQuery(queryname, fileInfo);
  479. }
  480. virtual const IRoxiePackage *queryRoxiePackage(const char *name) const
  481. {
  482. return queryResolvedPackage(name);
  483. }
  484. virtual const IRoxiePackage *matchRoxiePackage(const char *name) const
  485. {
  486. return matchResolvedPackage(name);
  487. }
  488. };
  // Lazily created singletons handed out by queryEmptyRoxiePackageMap() and
  // queryRootRoxiePackage() respectively; both are released in MODULE_EXIT.
  489. static CRoxiePackageMap *emptyPackageMap;
  490. static CRoxiePackage *rootPackage;
  // Held while either of the two singletons above is checked/created (despite the name it
  // guards rootPackage as well).
  491. static SpinLock emptyPackageMapCrit;
  // Reset in MODULE_INIT and released in MODULE_EXIT; it is not assigned anywhere else in
  // this part of the file.
  492. static IRoxieDebugSessionManager *debugSessionManager;
  493. extern const IRoxiePackage &queryRootRoxiePackage()
  494. {
  495. SpinBlock b(emptyPackageMapCrit);
  496. if (!rootPackage)
  497. {
  498. // Set up the root package. This contains global settings from topology file
  499. rootPackage = new CRoxiePackage(topology); // attributes become control: environment settings. Rest of topology ignored.
  500. rootPackage->resolveBases(NULL);
  501. }
  502. return *rootPackage;
  503. }
  504. extern const IRoxiePackageMap &queryEmptyRoxiePackageMap()
  505. {
  506. SpinBlock b(emptyPackageMapCrit);
  507. if (!emptyPackageMap)
  508. emptyPackageMap = new CRoxiePackageMap("<none>", NULL, true);
  509. return *emptyPackageMap;
  510. }
  // Module start-up hook: clears the empty-package-map and debug-session-manager pointers.
  // rootPackage is not reset here; as a file-static pointer it starts out zero-initialized.
  511. MODULE_INIT(INIT_PRIORITY_STANDARD)
  512. {
  513. emptyPackageMap = NULL;
  514. debugSessionManager = NULL;
  515. return true;
  516. }
  // Module shutdown hook: releases the three file-static singletons explicitly. The pointers
  // are not reset afterwards.
  517. MODULE_EXIT()
  518. {
  519. ::Release(emptyPackageMap); // You can't use static Owned to release anything that may own a IPropertyTree
  520. ::Release(rootPackage);
  521. ::Release(debugSessionManager);
  522. }
  523. // IRoxieQuerySetManager
  524. // - CRoxieQuerySetManager -
  525. // - CRoxieServerQuerySetManager
  526. // - CRoxieSlaveQuerySetManager
  527. //
  528. // Manages a set of instantiated queries and allows us to look them up by queryname or alias
  529. //
  530. // IRoxieQuerySetManagerSet
  531. // - CRoxieSlaveQuerySetManagerSet
  532. //
  533. // Manages the IRoxieQuerySetManager for multiple channels
  534. //
  535. // CRoxieQueryPackageManager
  536. // - CRoxieDaliQueryPackageManager
  537. // - CStandaloneQueryPackageManager
  538. //
  539. // Groups a server resource manager and a set of slave resource managers (one per channel) together.
  540. // There is one per PackageMap
  541. //
  542. // CQueryPackageSetManager at outer level
  543. // There will be exactly one of these. It will reload the CQueryPackageManager's if dali Package info changes
  544. //================================================================================================
  545. // CRoxieQuerySetManager - shared base class for slave and server query set manager classes
  546. // Manages a set of instantiated queries and allows us to look them up by queryname or alias,
  547. // as well as controlling their lifespan
  548. //================================================================================================
  549. class CRoxieQuerySetManager : public CInterface, implements IRoxieQuerySetManager
  550. {
  551. protected:
  552. MapStringToMyClass<IQueryFactory> queries;
  553. MapStringToMyClass<IQueryFactory> aliases; // Do we gain anything by having two tables?
  554. unsigned channelNo;
  555. bool active;
  556. StringAttr querySetName;
  557. void addQuery(const char *id, IQueryFactory *n, hash64_t &hash)
  558. {
  559. hash = rtlHash64Data(sizeof(hash), &hash, n->queryHash());
  560. queries.setValue(id, n);
  561. n->Release(); // setValue links
  562. }
  563. void addAlias(const char *alias, const char *original, hash64_t &hash)
  564. {
  565. if (original && alias)
  566. {
  567. IQueryFactory *orig = queries.getValue(original);
  568. if (orig)
  569. {
  570. hash = rtlHash64VStr(alias, hash);
  571. hash = rtlHash64Data(sizeof(hash), &hash, orig->queryHash());
  572. aliases.setValue(alias, orig);
  573. }
  574. else
  575. throw MakeStringException(ROXIE_UNKNOWN_QUERY, "Unknown query %s", original);
  576. }
  577. else
  578. throw MakeStringException(ROXIE_INTERNAL_ERROR, "Invalid parameters to addAlias");
  579. }
  580. virtual IQueryFactory *loadQueryFromDll(const char *id, const IQueryDll *dll, const IHpccPackage &package, const IPropertyTree *stateInfo) = 0;
  581. public:
  582. IMPLEMENT_IINTERFACE;
  583. CRoxieQuerySetManager(unsigned _channelNo, const char *_querySetName)
  584. : queries(true), aliases(true), active(false), querySetName(_querySetName)
  585. {
  586. channelNo = _channelNo;
  587. }
  588. virtual const char *queryId() const
  589. {
  590. return querySetName;
  591. }
  592. virtual bool isActive() const
  593. {
  594. return active;
  595. }
  596. virtual void load(const IPropertyTree *querySet, const IRoxiePackageMap &packages, hash64_t &hash)
  597. {
  598. Owned<IPropertyTreeIterator> queryNames = querySet->getElements("Query");
  599. ForEach (*queryNames)
  600. {
  601. const IPropertyTree &query = queryNames->query();
  602. const char *id = query.queryProp("@id");
  603. const char *dllName = query.queryProp("@dll");
  604. try
  605. {
  606. if (!id || !*id || !dllName || !*dllName)
  607. throw MakeStringException(ROXIE_QUERY_MODIFICATION, "dll and id must be specified");
  608. Owned<const IQueryDll> queryDll = createQueryDll(dllName);
  609. const IHpccPackage *package = NULL;
  610. const char *packageName = query.queryProp("@package");
  611. if (packageName && *packageName)
  612. {
  613. package = packages.queryPackage(packageName); // if a package is specified, require exact match
  614. if (!package)
  615. throw MakeStringException(ROXIE_QUERY_MODIFICATION, "Package %s specified by query %s not found", packageName, id);
  616. }
  617. else
  618. {
  619. package = packages.queryPackage(id); // Look for an exact match, then a fuzzy match, using query name as the package id
  620. if(!package) package = packages.matchPackage(id);
  621. if (!package) package = &queryRootRoxiePackage();
  622. }
  623. assertex(package);
  624. addQuery(id, loadQueryFromDll(id, queryDll.getClear(), *package, &query), hash);
  625. }
  626. catch (IException *E)
  627. {
  628. // we don't want a single bad query in the set to stop us loading all the others
  629. StringBuffer msg;
  630. msg.appendf("Failed to load query %s from %s", id ? id : "(null)", dllName ? dllName : "(null)");
  631. EXCLOG(E, msg.str());
  632. if (id)
  633. {
  634. StringBuffer emsg;
  635. E->errorMessage(emsg);
  636. Owned<IQueryFactory> dummyQuery = loadQueryFromDll(id, NULL, queryRootRoxiePackage(), NULL);
  637. dummyQuery->suspend(true, emsg.str(), "roxie", true);
  638. addQuery(id, dummyQuery.getClear(), hash);
  639. }
  640. E->Release();
  641. }
  642. }
  643. Owned<IPropertyTreeIterator> a = querySet->getElements("Alias");
  644. ForEach(*a)
  645. {
  646. IPropertyTree &item = a->query();
  647. const char *alias = item.queryProp("@name");
  648. const char *original = item.queryProp("@id");
  649. try
  650. {
  651. addAlias(alias, original, hash);
  652. }
  653. catch (IException *E)
  654. {
  655. // we don't want a single bad alias in the set to stop us loading all the others
  656. VStringBuffer msg("Failed to set alias %s on %s", alias, original);
  657. EXCLOG(E, msg.str());
  658. E->Release();
  659. }
  660. }
  661. active = packages.isActive();
  662. if (active)
  663. hash = rtlHash64VStr("active", hash);
  664. }
  665. virtual void getStats(const char *queryName, const char *graphName, StringBuffer &reply, const IRoxieContextLogger &logctx) const
  666. {
  667. Owned<IQueryFactory> f = getQuery(queryName, logctx);
  668. if (f)
  669. {
  670. reply.appendf("<Query id='%s'>\n", queryName);
  671. f->getStats(reply, graphName);
  672. reply.append("</Query>\n");
  673. }
  674. else
  675. throw MakeStringException(ROXIE_UNKNOWN_QUERY, "Unknown query %s", queryName);
  676. }
  677. virtual void resetQueryTimings(const char *queryName, const IRoxieContextLogger &logctx)
  678. {
  679. Owned<IQueryFactory> f = getQuery(queryName, logctx);
  680. if (f)
  681. f->resetQueryTimings();
  682. else
  683. throw MakeStringException(ROXIE_UNKNOWN_QUERY, "Unknown query %s", queryName);
  684. }
  685. virtual void resetAllQueryTimings()
  686. {
  687. HashIterator elems(queries);
  688. for (elems.first(); elems.isValid(); elems.next())
  689. {
  690. IMapping &cur = elems.query();
  691. queries.mapToValue(&cur)->resetQueryTimings();
  692. }
  693. }
  694. virtual void getActivityMetrics(StringBuffer &reply) const
  695. {
  696. HashIterator elems(queries);
  697. for (elems.first(); elems.isValid(); elems.next())
  698. {
  699. IMapping &cur = elems.query();
  700. queries.mapToValue(&cur)->getActivityMetrics(reply);
  701. }
  702. }
  703. virtual void getAllQueryInfo(StringBuffer &reply, bool full, const IRoxieContextLogger &logctx) const
  704. {
  705. HashIterator elems(queries);
  706. for (elems.first(); elems.isValid(); elems.next())
  707. {
  708. IMapping &cur = elems.query();
  709. IQueryFactory *query = queries.mapToValue(&cur);
  710. query->getQueryInfo(reply, full, logctx);
  711. }
  712. HashIterator aliasIterator(aliases);
  713. for (aliasIterator.first(); aliasIterator.isValid(); aliasIterator.next())
  714. {
  715. IMapping &cur = aliasIterator.query();
  716. reply.appendf(" <Alias id='%s' query='%s'/>\n", (const char *) cur.getKey(), aliases.mapToValue(&cur)->queryQueryName());
  717. }
  718. }
  719. virtual IQueryFactory *getQuery(const char *id, const IRoxieContextLogger &logctx) const
  720. {
  721. IQueryFactory *ret;
  722. ret = aliases.getValue(id);
  723. if (ret && logctx.queryTraceLevel() > 5)
  724. logctx.CTXLOG("Found query alias %s => %s", id, ret->queryQueryName());
  725. if (!ret)
  726. ret = queries.getValue(id);
  727. return LINK(ret);
  728. }
  729. };
  730. //===============================================================================================================
  731. class CRoxieServerQuerySetManager : public CRoxieQuerySetManager
  732. {
  733. public:
  734. IMPLEMENT_IINTERFACE;
  735. CRoxieServerQuerySetManager(const char *_querySetName)
  736. : CRoxieQuerySetManager(0, _querySetName)
  737. {
  738. }
  739. virtual IQueryFactory * loadQueryFromDll(const char *id, const IQueryDll *dll, const IHpccPackage &package, const IPropertyTree *stateInfo)
  740. {
  741. return createServerQueryFactory(id, dll, package, stateInfo);
  742. }
  743. };
  744. extern IRoxieQuerySetManager *createServerManager(const char *querySet)
  745. {
  746. return new CRoxieServerQuerySetManager(querySet);
  747. }
  748. //===============================================================================================================
  749. class CRoxieSlaveQuerySetManager : public CRoxieQuerySetManager
  750. {
  751. public:
  752. IMPLEMENT_IINTERFACE;
  753. CRoxieSlaveQuerySetManager(unsigned _channelNo, const char *_querySetName)
  754. : CRoxieQuerySetManager(_channelNo, _querySetName)
  755. {
  756. channelNo = _channelNo;
  757. }
  758. virtual IQueryFactory *loadQueryFromDll(const char *id, const IQueryDll *dll, const IHpccPackage &package, const IPropertyTree *stateInfo)
  759. {
  760. return createSlaveQueryFactory(id, dll, package, channelNo, stateInfo);
  761. }
  762. };
  763. class CRoxieSlaveQuerySetManagerSet : public CInterface, implements IRoxieQuerySetManagerSet
  764. {
  765. public:
  766. IMPLEMENT_IINTERFACE;
  767. CRoxieSlaveQuerySetManagerSet(unsigned _numChannels, const char *querySetName)
  768. : numChannels(_numChannels)
  769. {
  770. CriticalBlock b(ccdChannelsCrit);
  771. managers = new CRoxieSlaveQuerySetManager *[numChannels];
  772. memset(managers, 0, sizeof(CRoxieSlaveQuerySetManager *) * numChannels);
  773. Owned<IPropertyTreeIterator> it = ccdChannels->getElements("RoxieSlaveProcess");
  774. ForEach(*it)
  775. {
  776. unsigned channelNo = it->query().getPropInt("@channel", 0);
  777. assertex(channelNo>0 && channelNo<=numChannels);
  778. if (managers[channelNo-1] == NULL)
  779. managers[channelNo-1] = new CRoxieSlaveQuerySetManager(channelNo, querySetName);
  780. else
  781. throw MakeStringException(ROXIE_INVALID_TOPOLOGY, "Invalid topology file - channel %d repeated for this slave", channelNo);
  782. }
  783. }
  784. ~CRoxieSlaveQuerySetManagerSet()
  785. {
  786. for (unsigned channel = 0; channel < numChannels; channel++)
  787. ::Release(managers[channel]);
  788. delete [] managers;
  789. }
  790. inline CRoxieSlaveQuerySetManager *item(int idx)
  791. {
  792. return managers[idx];
  793. }
  794. virtual void load(const IPropertyTree *querySets, const IRoxiePackageMap &packages, hash64_t &hash)
  795. {
  796. for (unsigned channel = 0; channel < numChannels; channel++)
  797. if (managers[channel])
  798. managers[channel]->load(querySets, packages, hash); // MORE - this means the hash depends on the number of channels. Is that desirable?
  799. }
  800. private:
  801. unsigned numChannels;
  802. CRoxieSlaveQuerySetManager **managers;
  803. };
  804. //===============================================================================================================
  805. class CRoxieDebugSessionManager : public CInterface, implements IRoxieDebugSessionManager
  806. {
  807. protected:
  808. ReadWriteLock debugLock;
  809. MapStringToMyClass<IDebuggerContext> debuggerContexts;
  810. public:
  811. IMPLEMENT_IINTERFACE;
  812. void getActiveQueries(StringBuffer &reply)
  813. {
  814. HashIterator q(debuggerContexts);
  815. for (q.first(); q.isValid(); q.next())
  816. {
  817. IDebuggerContext *ctx = debuggerContexts.mapToValue(&q.query());
  818. reply.appendf(" <Query id='%s' uid='%s' debug='1'/>\n", ctx->queryQueryName(), ctx->queryDebugId());
  819. }
  820. }
  821. virtual void registerDebugId(const char *id, IDebuggerContext *ctx)
  822. {
  823. WriteLockBlock block(debugLock);
  824. debuggerContexts.setValue(id, ctx);
  825. }
  826. virtual void deregisterDebugId(const char *id)
  827. {
  828. WriteLockBlock block(debugLock);
  829. debuggerContexts.remove(id);
  830. }
  831. virtual IDebuggerContext *lookupDebuggerContext(const char *id)
  832. {
  833. ReadLockBlock block(debugLock);
  834. IDebuggerContext *ctx = debuggerContexts.getValue(id);
  835. if (ctx)
  836. return LINK(ctx);
  837. else
  838. {
  839. #ifdef _DEBUG
  840. // In a debug environment, it is convenient to be able to use '*' to mean 'the only active debug session'...
  841. if (strcmp(id, "*")==0 && debuggerContexts.count()==1)
  842. {
  843. HashIterator q(debuggerContexts);
  844. for (q.first(); q.isValid(); q.next())
  845. {
  846. IDebuggerContext *ctx = debuggerContexts.mapToValue(&q.query());
  847. return LINK(ctx);
  848. }
  849. }
  850. #endif
  851. throw MakeStringException(ROXIE_INTERNAL_ERROR, "Debug context %s not found", id);
  852. }
  853. }
  854. };
  855. //===============================================================================================
  856. /*----------------------------------------------------------------------------------------------
  857. * A CRoxieQueryPackageManager object manages all the queries that are currently runnable via XML.
  858. * There may be more than one in existence, but only one will be active and therefore used to
  859. * look up queries that are received - this corresponds to the currently active package.
  860. *-----------------------------------------------------------------------------------------------*/
  861. class CRoxieQueryPackageManager : public CInterface
  862. {
  863. public:
  864. IMPLEMENT_IINTERFACE;
  865. CRoxieQueryPackageManager(unsigned _numChannels, const char *_querySet, const IRoxiePackageMap *_packages)
  866. : numChannels(_numChannels), packages(_packages), querySet(_querySet)
  867. {
  868. queryHash = 0;
  869. }
  870. ~CRoxieQueryPackageManager()
  871. {
  872. }
  873. inline const char *queryPackageId() const
  874. {
  875. return packages->queryPackageId();
  876. }
  877. virtual void reload()
  878. {
  879. // Default is to do nothing...
  880. }
  881. virtual void load() = 0;
  882. virtual hash64_t getHash()
  883. {
  884. CriticalBlock b2(updateCrit);
  885. return queryHash;
  886. }
  887. IRoxieQuerySetManager* getRoxieServerManager()
  888. {
  889. CriticalBlock b2(updateCrit);
  890. return serverManager.getLink();
  891. }
  892. void getInfo(StringBuffer &reply, const IRoxieContextLogger &logctx) const
  893. {
  894. reply.appendf(" <PackageSet id=\"%s\" querySet=\"%s\"/>\n", queryPackageId(), querySet.get());
  895. }
  896. void resetStats(const char *queryId, const IRoxieContextLogger &logctx)
  897. {
  898. CriticalBlock b(updateCrit);
  899. if (queryId)
  900. {
  901. Owned<IQueryFactory> query = serverManager->getQuery(queryId, logctx);
  902. if (query)
  903. {
  904. const char *id = query->queryQueryName();
  905. serverManager->resetQueryTimings(id, logctx);
  906. for (unsigned channel = 0; channel < numChannels; channel++)
  907. if (slaveManagers->item(channel))
  908. {
  909. slaveManagers->item(channel)->resetQueryTimings(id, logctx);
  910. }
  911. }
  912. }
  913. else
  914. {
  915. serverManager->resetAllQueryTimings();
  916. for (unsigned channel = 0; channel < numChannels; channel++)
  917. if (slaveManagers->item(channel))
  918. slaveManagers->item(channel)->resetAllQueryTimings();
  919. }
  920. }
  921. void getStats(const char *queryId, const char *action, const char *graphName, StringBuffer &reply, const IRoxieContextLogger &logctx) const
  922. {
  923. CriticalBlock b2(updateCrit);
  924. Owned<IQueryFactory> query = serverManager->getQuery(queryId, logctx);
  925. if (query)
  926. {
  927. StringBuffer freply;
  928. serverManager->getStats(queryId, graphName, freply, logctx);
  929. Owned<IPropertyTree> stats = createPTreeFromXMLString(freply.str());
  930. for (unsigned channel = 0; channel < numChannels; channel++)
  931. if (slaveManagers->item(channel))
  932. {
  933. StringBuffer sreply;
  934. slaveManagers->item(channel)->getStats(queryId, graphName, sreply, logctx);
  935. Owned<IPropertyTree> cstats = createPTreeFromXMLString(sreply.str());
  936. mergeStats(stats, cstats, 1);
  937. }
  938. toXML(stats, reply);
  939. }
  940. }
  941. void getActivityMetrics(StringBuffer &reply) const
  942. {
  943. CriticalBlock b2(updateCrit);
  944. serverManager->getActivityMetrics(reply);
  945. for (unsigned channel = 0; channel < numChannels; channel++)
  946. {
  947. if (slaveManagers->item(channel))
  948. {
  949. slaveManagers->item(channel)->getActivityMetrics(reply);
  950. }
  951. }
  952. }
  953. void getAllQueryInfo(StringBuffer &reply, bool full, const IRoxieContextLogger &logctx) const
  954. {
  955. CriticalBlock b2(updateCrit);
  956. serverManager->getAllQueryInfo(reply, full, logctx);
  957. }
  958. protected:
  959. void reloadQueryManagers(CRoxieSlaveQuerySetManagerSet *newSlaveManagers, IRoxieQuerySetManager *newServerManager, hash64_t newHash)
  960. {
  961. Owned<CRoxieSlaveQuerySetManagerSet> oldSlaveManagers;
  962. Owned<IRoxieQuerySetManager> oldServerManager;
  963. {
  964. // Atomically, replace the existing query managers with the new ones
  965. CriticalBlock b2(updateCrit);
  966. oldSlaveManagers.setown(slaveManagers.getClear()); // so that the release happens outside the critblock
  967. oldServerManager.setown(serverManager.getClear()); // so that the release happens outside the critblock
  968. slaveManagers.setown(newSlaveManagers);
  969. serverManager.setown(newServerManager);
  970. queryHash = newHash;
  971. }
  972. }
  973. mutable CriticalSection updateCrit; // protects updates of slaveManagers and serverManager
  974. Owned<CRoxieSlaveQuerySetManagerSet> slaveManagers;
  975. Owned<IRoxieQuerySetManager> serverManager;
  976. Owned<const IRoxiePackageMap> packages;
  977. unsigned numChannels;
  978. hash64_t queryHash;
  979. StringAttr querySet;
  980. };
  981. /**
  982. * class CRoxieDaliQueryPackageManager - manages queries specified in QuerySets, for a given package set.
  983. *
  984. * If the QuerySet is modified, it will be reloaded.
  985. * There is one CRoxieDaliQueryPackageManager for every PackageSet - only one will be active for query lookup
  986. * at a given time (the one associated with the active PackageSet).
  987. *
  988. * To deploy new data, typically we will load a new PackageSet, make it active, then release the old one
  989. * A packageSet is not modified while loaded, to avoid timing issues between slaves and server.
  990. *
  991. * We need to be able to spot a change (in dali) to the active package indicator (and switch the active CRoxieDaliQueryPackageManager)
  992. * We need to be able to spot a change (in dali) that adds a new PackageSet
  993. * We need to decide what to do about a change (in dali) to an existing PackageSet. Maybe we allow it (leave it up to the gui to
  994. * encourage changing in the right sequence). In which case a change to the package info in dali means reload all global package
  995. * managers (and then discard the old ones). Hash-based queries means everything should work ok.
  996. * -> If the active ptr changes, just change what is active
  997. * If any change to any package set, reload all globalResourceManagers and discard prior
  998. * The query caching code should ensure that it is quick enough to do so
  999. *
  1000. **/
  1001. class CRoxieDaliQueryPackageManager : public CRoxieQueryPackageManager, implements ISDSSubscription
  1002. {
  1003. Owned<IRoxieDaliHelper> daliHelper;
  1004. Owned<IDaliPackageWatcher> notifier;
  1005. public:
  1006. IMPLEMENT_IINTERFACE;
  1007. CRoxieDaliQueryPackageManager(unsigned _numChannels, const IRoxiePackageMap *_packages, const char *_querySet)
  1008. : CRoxieQueryPackageManager(_numChannels, _querySet, _packages)
  1009. {
  1010. daliHelper.setown(connectToDali());
  1011. }
  1012. ~CRoxieDaliQueryPackageManager()
  1013. {
  1014. if (notifier)
  1015. daliHelper->releaseSubscription(notifier);
  1016. }
  1017. virtual void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
  1018. {
  1019. reload();
  1020. daliHelper->commitCache();
  1021. }
  1022. virtual void load()
  1023. {
  1024. notifier.setown(daliHelper->getQuerySetSubscription(querySet, this));
  1025. reload();
  1026. }
  1027. virtual void reload()
  1028. {
  1029. hash64_t newHash = numChannels;
  1030. Owned<IPropertyTree> newQuerySet = daliHelper->getQuerySet(querySet);
  1031. Owned<CRoxieSlaveQuerySetManagerSet> newSlaveManagers = new CRoxieSlaveQuerySetManagerSet(numChannels, querySet);
  1032. Owned<IRoxieQuerySetManager> newServerManager = createServerManager(querySet);
  1033. newServerManager->load(newQuerySet, *packages, newHash);
  1034. newSlaveManagers->load(newQuerySet, *packages, newHash);
  1035. reloadQueryManagers(newSlaveManagers.getClear(), newServerManager.getClear(), newHash);
  1036. clearKeyStoreCache(false); // Allows us to fully release files we no longer need because of unloaded queries
  1037. }
  1038. };
  1039. class CStandaloneQueryPackageManager : public CRoxieQueryPackageManager
  1040. {
  1041. Owned<IPropertyTree> standaloneDll;
  1042. public:
  1043. IMPLEMENT_IINTERFACE;
  1044. CStandaloneQueryPackageManager(unsigned _numChannels, const char *_querySet, const IRoxiePackageMap *_packages, IPropertyTree *_standaloneDll)
  1045. : CRoxieQueryPackageManager(_numChannels, _querySet, _packages), standaloneDll(_standaloneDll)
  1046. {
  1047. assertex(standaloneDll);
  1048. }
  1049. ~CStandaloneQueryPackageManager()
  1050. {
  1051. }
  1052. virtual void load()
  1053. {
  1054. hash64_t newHash = numChannels;
  1055. Owned<IPropertyTree> newQuerySet = createPTree("QuerySet");
  1056. newQuerySet->setProp("@name", "_standalone");
  1057. newQuerySet->addPropTree("Query", standaloneDll.getLink());
  1058. Owned<CRoxieSlaveQuerySetManagerSet> newSlaveManagers = new CRoxieSlaveQuerySetManagerSet(numChannels, querySet);
  1059. Owned<IRoxieQuerySetManager> newServerManager = createServerManager(querySet);
  1060. newServerManager->load(newQuerySet, *packages, newHash);
  1061. newSlaveManagers->load(newQuerySet, *packages, newHash);
  1062. reloadQueryManagers(newSlaveManagers.getClear(), newServerManager.getClear(), newHash);
  1063. }
  1064. };
  1065. static SpinLock roxieDebugSessionManagerLock;
  1066. extern IRoxieDebugSessionManager &queryRoxieDebugSessionManager()
  1067. {
  1068. SpinBlock b(roxieDebugSessionManagerLock);
  1069. if (!debugSessionManager)
  1070. debugSessionManager = new CRoxieDebugSessionManager();
  1071. return *debugSessionManager;
  1072. }
  1073. class CRoxiePackageSetWatcher : public CInterface, implements ISDSSubscription
  1074. {
  1075. public:
  1076. IMPLEMENT_IINTERFACE;
  1077. CRoxiePackageSetWatcher(IRoxieDaliHelper *_daliHelper, ISDSSubscription *_owner, unsigned numChannels)
  1078. : stateHash(0), daliHelper(_daliHelper), owner(_owner)
  1079. {
  1080. Owned<IDaliPackageWatcher> notifier = daliHelper->getPackageSetsSubscription(this);
  1081. if (notifier)
  1082. notifiers.append(*notifier.getClear());
  1083. ForEachItemIn(idx, allQuerySetNames)
  1084. {
  1085. createQueryPackageManagers(numChannels, allQuerySetNames.item(idx));
  1086. }
  1087. }
  1088. CRoxiePackageSetWatcher(IRoxieDaliHelper *_daliHelper, ISDSSubscription *_owner, const IQueryDll *standAloneDll, unsigned numChannels, const char *querySet)
  1089. : stateHash(0), daliHelper(_daliHelper), owner(_owner)
  1090. {
  1091. Owned<IPropertyTree> standAloneDllTree;
  1092. standAloneDllTree.setown(createPTree("Query"));
  1093. standAloneDllTree->setProp("@id", "roxie");
  1094. standAloneDllTree->setProp("@dll", standAloneDll->queryDll()->queryName());
  1095. Owned<CRoxieQueryPackageManager> qpm = new CStandaloneQueryPackageManager(numChannels, querySet, LINK(&queryEmptyRoxiePackageMap()), standAloneDllTree.getClear());
  1096. qpm->load();
  1097. stateHash = rtlHash64Data(sizeof(stateHash), &stateHash, qpm->getHash());
  1098. allQueryPackages.append(*qpm.getClear());
  1099. }
  1100. ~CRoxiePackageSetWatcher()
  1101. {
  1102. unsubscribe();
  1103. }
  1104. virtual void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
  1105. {
  1106. owner->notify(id, xpath, flags, valueLen, valueData);
  1107. }
  1108. IQueryFactory *lookupLibrary(const char *libraryName, unsigned expectedInterfaceHash, const IRoxieContextLogger &logctx) const
  1109. {
  1110. ForEachItemIn(idx, allQueryPackages)
  1111. {
  1112. Owned<IRoxieQuerySetManager> sm = allQueryPackages.item(idx).getRoxieServerManager();
  1113. if (sm->isActive())
  1114. {
  1115. Owned<IQueryFactory> library = sm->getQuery(libraryName, logctx);
  1116. if (library)
  1117. {
  1118. if (library->isQueryLibrary())
  1119. {
  1120. unsigned foundInterfaceHash = library->getQueryLibraryInterfaceHash();
  1121. if (!foundInterfaceHash || (foundInterfaceHash == expectedInterfaceHash))
  1122. return library.getClear();
  1123. else
  1124. throw MakeStringException(ROXIE_LIBRARY_ERROR, "The library interface found in %s is not compatible (found %d, expected %d)", libraryName, foundInterfaceHash, expectedInterfaceHash);
  1125. }
  1126. else
  1127. throw MakeStringException(ROXIE_LIBRARY_ERROR, "The query resolved by %s is not a library", libraryName);
  1128. }
  1129. }
  1130. }
  1131. throw MakeStringException(ROXIE_LIBRARY_ERROR, "No library available for %s", libraryName);
  1132. }
  1133. IQueryFactory *getQuery(const char *id, const IRoxieContextLogger &logctx) const
  1134. {
  1135. ForEachItemIn(idx, allQueryPackages)
  1136. {
  1137. Owned<IRoxieQuerySetManager> sm = allQueryPackages.item(idx).getRoxieServerManager();
  1138. if (sm->isActive())
  1139. {
  1140. IQueryFactory *query = sm->getQuery(id, logctx);
  1141. if (query)
  1142. return query;
  1143. }
  1144. }
  1145. return NULL;
  1146. }
  1147. int getActivePackageCount() const
  1148. {
  1149. int count = 0;
  1150. ForEachItemIn(idx, allQueryPackages)
  1151. {
  1152. Owned<IRoxieQuerySetManager> sm = allQueryPackages.item(idx).getRoxieServerManager();
  1153. if (sm->isActive())
  1154. count++;
  1155. }
  1156. return count;
  1157. }
  1158. inline hash64_t queryHash() const
  1159. {
  1160. return stateHash;
  1161. }
  1162. void getAllQueryInfo(StringBuffer &reply, bool full, const IRoxieContextLogger &logctx) const
  1163. {
  1164. ForEachItemIn(idx, allQueryPackages)
  1165. {
  1166. Owned<IRoxieQuerySetManager> sm = allQueryPackages.item(idx).getRoxieServerManager();
  1167. sm->getAllQueryInfo(reply, full, logctx);
  1168. }
  1169. }
  1170. void getActivityMetrics(StringBuffer &reply) const
  1171. {
  1172. ForEachItemIn(idx, allQueryPackages)
  1173. {
  1174. CRoxieQueryPackageManager &qpm = allQueryPackages.item(idx);
  1175. qpm.getActivityMetrics(reply);
  1176. }
  1177. }
  1178. void getInfo(StringBuffer &reply, const IRoxieContextLogger &logctx) const
  1179. {
  1180. reply.append("<PackageSets>\n");
  1181. ForEachItemIn(idx, allQueryPackages)
  1182. {
  1183. allQueryPackages.item(idx).getInfo(reply, logctx);
  1184. }
  1185. reply.append("</PackageSets>\n");
  1186. }
  1187. void getStats(StringBuffer &reply, const char *id, const char *action, const char *graphName, const IRoxieContextLogger &logctx) const
  1188. {
  1189. ForEachItemIn(idx, allQueryPackages)
  1190. {
  1191. allQueryPackages.item(idx).getStats(id, action, graphName, reply, logctx);
  1192. }
  1193. }
  1194. void resetStats(const char *id, const IRoxieContextLogger &logctx) const
  1195. {
  1196. ForEachItemIn(idx, allQueryPackages)
  1197. {
  1198. allQueryPackages.item(idx).resetStats(id, logctx);
  1199. }
  1200. }
  1201. private:
  1202. ISDSSubscription *owner;
  1203. CIArrayOf<CRoxieQueryPackageManager> allQueryPackages;
  1204. IArrayOf<IDaliPackageWatcher> notifiers;
  1205. Linked<IRoxieDaliHelper> daliHelper;
  1206. hash64_t stateHash;
  1207. void createQueryPackageManager(unsigned numChannels, const IRoxiePackageMap *packageMap, const char *querySet)
  1208. {
  1209. Owned<CRoxieQueryPackageManager> qpm = new CRoxieDaliQueryPackageManager(numChannels, packageMap, querySet);
  1210. qpm->load();
  1211. stateHash = rtlHash64Data(sizeof(stateHash), &stateHash, qpm->getHash());
  1212. allQueryPackages.append(*qpm.getClear());
  1213. }
  1214. void createQueryPackageManagers(unsigned numChannels, const char *querySet)
  1215. {
  1216. int loadedPackages = 0;
  1217. int activePackages = 0;
  1218. Owned<IPropertyTree> packageTree = daliHelper->getPackageSets();
  1219. Owned<IPropertyTreeIterator> packageSets = packageTree->getElements("PackageSet");
  1220. ForEach(*packageSets)
  1221. {
  1222. IPropertyTree &ps = packageSets->query();
  1223. const char *packageSetSpec = ps.queryProp("@process");
  1224. if (!packageSetSpec || WildMatch(roxieName, packageSetSpec, false))
  1225. {
  1226. if (traceLevel)
  1227. {
  1228. DBGLOG("Loading package set %s, process spec %s", ps.queryProp("@id") ? ps.queryProp("@id") : "<no-id>",
  1229. packageSetSpec ? packageSetSpec : "<*>");
  1230. }
  1231. Owned<IPropertyTreeIterator> packageMaps = ps.getElements("PackageMap");
  1232. ForEach(*packageMaps)
  1233. {
  1234. IPropertyTree &pm = packageMaps->query();
  1235. const char *packageMapId = pm.queryProp("@id");
  1236. const char *packageMapFilter = pm.queryProp("@querySet");
  1237. if (packageMapId && *packageMapId && (!packageMapFilter || WildMatch(querySet, packageMapFilter, false)))
  1238. {
  1239. bool isActive = pm.getPropBool("@active", true);
  1240. if (traceLevel)
  1241. DBGLOG("Loading package map %s, active %s", packageMapId, isActive ? "true" : "false");
  1242. try
  1243. {
  1244. Owned<CRoxiePackageMap> packageMap = new CRoxiePackageMap(packageMapId, packageMapFilter, isActive);
  1245. Owned<IPropertyTree> xml = daliHelper->getPackageMap(packageMapId);
  1246. packageMap->load(xml);
  1247. createQueryPackageManager(numChannels, packageMap.getLink(), querySet);
  1248. loadedPackages++;
  1249. if (isActive)
  1250. activePackages++;
  1251. notifiers.append(*daliHelper->getPackageMapSubscription(packageMapId, this));
  1252. }
  1253. catch (IException *E)
  1254. {
  1255. StringBuffer msg;
  1256. msg.appendf("Failed to load package map %s", packageMapId);
  1257. EXCLOG(E, msg.str());
  1258. E->Release();
  1259. }
  1260. }
  1261. }
  1262. }
  1263. }
  1264. if (!loadedPackages)
  1265. {
  1266. if (traceLevel)
  1267. DBGLOG("Loading empty package for QuerySet %s", querySet);
  1268. createQueryPackageManager(numChannels, LINK(&queryEmptyRoxiePackageMap()), querySet);
  1269. }
  1270. else if (traceLevel)
  1271. DBGLOG("Loaded %d packages (%d active)", loadedPackages, activePackages);
  1272. }
  1273. void unsubscribe()
  1274. {
  1275. ForEachItemIn(idx, notifiers)
  1276. {
  1277. daliHelper->releaseSubscription(&notifiers.item(idx));
  1278. }
  1279. notifiers.kill();
  1280. }
  1281. };
  1282. class CRoxiePackageSetManager : public CInterface, implements IRoxieQueryPackageManagerSet, implements ISDSSubscription
  1283. {
  1284. public:
  1285. IMPLEMENT_IINTERFACE;
  1286. CRoxiePackageSetManager(const IQueryDll *_standAloneDll) :
  1287. standAloneDll(_standAloneDll)
  1288. {
  1289. daliHelper.setown(connectToDali(ROXIE_DALI_CONNECT_TIMEOUT));
  1290. }
  1291. ~CRoxiePackageSetManager()
  1292. {
  1293. }
  1294. virtual void load()
  1295. {
  1296. try
  1297. {
  1298. reload();
  1299. daliHelper->commitCache();
  1300. controlSem.signal();
  1301. }
  1302. catch(IException *E)
  1303. {
  1304. EXCLOG(E, "No configuration could be loaded");
  1305. controlSem.interrupt();
  1306. throw; // Roxie will refuse to start up if configuration invalid
  1307. }
  1308. }
  1309. virtual void doControlMessage(IPropertyTree *xml, StringBuffer &reply, const IRoxieContextLogger &logctx)
  1310. {
  1311. if (!controlSem.wait(20000))
  1312. throw MakeStringException(ROXIE_TIMEOUT, "Timed out waiting for current control query to complete");
  1313. try
  1314. {
  1315. _doControlMessage(xml, reply, logctx);
  1316. reply.append(" <Status>ok</Status>\n");
  1317. }
  1318. catch(IException *E)
  1319. {
  1320. controlSem.signal();
  1321. EXCLOG(E);
  1322. throw;
  1323. }
  1324. catch(...)
  1325. {
  1326. controlSem.signal();
  1327. throw;
  1328. }
  1329. controlSem.signal();
  1330. }
  1331. virtual IQueryFactory *lookupLibrary(const char *libraryName, unsigned expectedInterfaceHash, const IRoxieContextLogger &logctx) const
  1332. {
  1333. ReadLockBlock b(packageCrit);
  1334. return allQueryPackages->lookupLibrary(libraryName, expectedInterfaceHash, logctx);
  1335. }
  1336. virtual IQueryFactory *getQuery(const char *id, const IRoxieContextLogger &logctx) const
  1337. {
  1338. ReadLockBlock b(packageCrit);
  1339. return allQueryPackages->getQuery(id, logctx);
  1340. }
  1341. virtual int getActivePackageCount() const
  1342. {
  1343. ReadLockBlock b(packageCrit);
  1344. return allQueryPackages->getActivePackageCount();
  1345. }
  1346. virtual void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
  1347. {
  1348. reload();
  1349. daliHelper->commitCache();
  1350. }
  1351. private:
  1352. Owned<const IQueryDll> standAloneDll;
  1353. Owned<CRoxieDebugSessionManager> debugSessionManager;
  1354. Owned<IRoxieDaliHelper> daliHelper;
  1355. mutable ReadWriteLock packageCrit;
  1356. InterruptableSemaphore controlSem;
  1357. Owned<CRoxiePackageSetWatcher> allQueryPackages;
  1358. void reload()
  1359. {
  1360. // We want to kill the old packages, but not until we have created the new ones
  1361. // So that the query/dll caching will work for anything that is not affected by the changes
  1362. Owned<CRoxiePackageSetWatcher> newPackages;
  1363. if (standAloneDll)
  1364. newPackages.setown(new CRoxiePackageSetWatcher(daliHelper, this, standAloneDll, numChannels, "roxie"));
  1365. else
  1366. newPackages.setown(new CRoxiePackageSetWatcher(daliHelper, this, numChannels));
  1367. // Hold the lock for as little time as we can
  1368. // Note that we must NOT hold the lock during the delete of the old object - or we deadlock.
  1369. // Hence the slightly convoluted code below
  1370. Owned<CRoxiePackageSetWatcher> oldPackages; // NB Destroyed outside the WriteLockBlock
  1371. {
  1372. WriteLockBlock b(packageCrit);
  1373. oldPackages.setown(allQueryPackages.getLink()); // To ensure that the setown just below does not delete it
  1374. allQueryPackages.setown(newPackages.getClear());
  1375. }
  1376. }
  1377. // Common code used by control:queries and control:getQueryXrefInfo
  1378. void getQueryInfo(IPropertyTree *control, StringBuffer &reply, bool full, const IRoxieContextLogger &logctx) const
  1379. {
  1380. Owned<IPropertyTreeIterator> ids = control->getElements("Query");
  1381. reply.append("<Queries>\n");
  1382. if (ids->first())
  1383. {
  1384. ForEach(*ids)
  1385. {
  1386. const char *id = ids->query().queryProp("@id");
  1387. if (id)
  1388. {
  1389. Owned<IQueryFactory> query = getQuery(id, logctx);
  1390. if (query)
  1391. query->getQueryInfo(reply, full, logctx);
  1392. else
  1393. reply.appendf(" <Query id=\"%s\" error=\"Query not found\"/>\n", id);
  1394. }
  1395. }
  1396. }
  1397. else
  1398. {
  1399. ReadLockBlock readBlock(packageCrit);
  1400. allQueryPackages->getAllQueryInfo(reply, full, logctx);
  1401. }
  1402. reply.append("</Queries>\n");
  1403. }
  1404. void _doControlMessage(IPropertyTree *control, StringBuffer &reply, const IRoxieContextLogger &logctx)
  1405. {
  1406. const char *queryName = control->queryName();
  1407. logctx.CTXLOG("doControlMessage - %s", queryName);
  1408. assertex(memicmp(queryName, "control:", 8) == 0);
  1409. bool unknown = false;
  1410. switch (_toupper(queryName[8]))
  1411. {
  1412. case 'A':
  1413. if (stricmp(queryName, "control:aclupdate") == 0)
  1414. {
  1415. // MORE - do nothing for now - possibly needed in the future - leave this so no exception is thrown
  1416. }
  1417. else if (stricmp(queryName, "control:activeQueries")==0)
  1418. {
  1419. if (debugSessionManager)
  1420. debugSessionManager->getActiveQueries(reply);
  1421. }
  1422. else if (stricmp(queryName, "control:activitymetrics")==0)
  1423. {
  1424. ReadLockBlock readBlock(packageCrit);
  1425. allQueryPackages->getActivityMetrics(reply);
  1426. }
  1427. else if (stricmp(queryName, "control:alive")==0)
  1428. {
  1429. reply.appendf("<Alive restarts='%d'/>", restarts);
  1430. }
  1431. else
  1432. unknown = true;
  1433. break;
  1434. case 'B':
  1435. if (stricmp(queryName, "control:blobCacheMem")==0)
  1436. {
  1437. blobCacheMB = control->getPropInt("@val", 0);
  1438. topology->setPropInt("@blobCacheMem", blobCacheMB);
  1439. setBlobCacheMem(blobCacheMB * 0x100000);
  1440. }
  1441. else
  1442. unknown = true;
  1443. break;
  1444. case 'C':
  1445. if (stricmp(queryName, "control:checkCompleted")==0)
  1446. {
  1447. checkCompleted = control->getPropBool("@val", true);
  1448. topology->setPropBool("@checkCompleted", checkCompleted );
  1449. }
  1450. else if (stricmp(queryName, "control:checkingHeap")==0)
  1451. {
  1452. defaultCheckingHeap = control->getPropBool("@val", true);
  1453. topology->setPropInt("@checkingHeap", defaultCheckingHeap);
  1454. }
  1455. else if (stricmp(queryName, "control:clearIndexCache")==0)
  1456. {
  1457. bool clearAll = control->getPropBool("@clearAll", true);
  1458. clearKeyStoreCache(clearAll);
  1459. }
  1460. else if (stricmp(queryName, "control:closedown")==0)
  1461. {
  1462. closedown();
  1463. }
  1464. else if (stricmp(queryName, "control:closeExpired")==0)
  1465. {
  1466. queryFileCache().closeExpired(false);
  1467. queryFileCache().closeExpired(true);
  1468. }
  1469. else if (stricmp(queryName, "control:closeLocalExpired")==0)
  1470. {
  1471. queryFileCache().closeExpired(false);
  1472. }
  1473. else if (stricmp(queryName, "control:closeRemoteExpired")==0)
  1474. {
  1475. queryFileCache().closeExpired(true);
  1476. }
  1477. else
  1478. unknown = true;
  1479. break;
  1480. case 'D':
  1481. if (stricmp(queryName, "control:dafilesrvLookupTimeout")==0)
  1482. {
  1483. dafilesrvLookupTimeout = control->getPropInt("@val", 10000);
  1484. topology->setPropInt("@dafilesrvLookupTimeout", dafilesrvLookupTimeout);
  1485. }
  1486. else if (stricmp(queryName, "control:defaultConcatPreload")==0)
  1487. {
  1488. defaultConcatPreload = control->getPropInt("@val", 0);
  1489. topology->setPropInt("@defaultConcatPreload", defaultConcatPreload);
  1490. }
  1491. else if (stricmp(queryName, "control:defaultFetchPreload")==0)
  1492. {
  1493. defaultFetchPreload = control->getPropInt("@val", 0);
  1494. topology->setPropInt("@defaultFetchPreload", defaultFetchPreload);
  1495. }
  1496. else if (stricmp(queryName, "control:defaultFullKeyedJoinPreload")==0)
  1497. {
  1498. defaultFullKeyedJoinPreload = control->getPropInt("@val", 0);
  1499. topology->setPropInt("@defaultFullKeyedJoinPreload", defaultFullKeyedJoinPreload);
  1500. }
  1501. else if (stricmp(queryName, "control:defaultHighPriorityTimeLimit")==0)
  1502. {
  1503. defaultTimeLimit[1] = control->getPropInt("@limit", 0);
  1504. topology->setPropInt("@defaultHighPriorityTimeLimit", defaultTimeLimit[1]);
  1505. }
  1506. else if (stricmp(queryName, "control:defaultHighPriorityTimeWarning")==0)
  1507. {
  1508. defaultWarnTimeLimit[1] = control->getPropInt("@limit", 0);
  1509. topology->setPropInt("@defaultHighPriorityTimeWarning", defaultWarnTimeLimit[1]);
  1510. }
  1511. else if (stricmp(queryName, "control:defaultKeyedJoinPreload")==0)
  1512. {
  1513. defaultKeyedJoinPreload = control->getPropInt("@val", 0);
  1514. topology->setPropInt("@defaultKeyedJoinPreload", defaultKeyedJoinPreload);
  1515. }
  1516. else if (stricmp(queryName, "control:defaultLowPriorityTimeLimit")==0)
  1517. {
  1518. defaultTimeLimit[0] = control->getPropInt("@limit", 0);
  1519. topology->setPropInt("@defaultLowPriorityTimeLimit", defaultTimeLimit[0]);
  1520. }
  1521. else if (stricmp(queryName, "control:defaultLowPriorityTimeWarning")==0)
  1522. {
  1523. defaultWarnTimeLimit[0] = control->getPropInt("@limit", 0);
  1524. topology->setPropInt("@defaultLowPriorityTimeWarning", defaultWarnTimeLimit[0]);
  1525. }
  1526. else if (stricmp(queryName, "control:defaultParallelJoinPreload")==0)
  1527. {
  1528. defaultParallelJoinPreload = control->getPropInt("@val", 0);
  1529. topology->setPropInt("@defaultParallelJoinPreload", defaultParallelJoinPreload);
  1530. }
  1531. else if (stricmp(queryName, "control:defaultSLAPriorityTimeLimit")==0)
  1532. {
  1533. defaultTimeLimit[2] = control->getPropInt("@limit", 0);
  1534. topology->setPropInt("@defaultSLAPriorityTimeLimit", defaultTimeLimit[2]);
  1535. }
  1536. else if (stricmp(queryName, "control:defaultSLAPriorityTimeWarning")==0)
  1537. {
  1538. defaultWarnTimeLimit[2] = control->getPropInt("@limit", 0);
  1539. topology->setPropInt("@defaultSLAPriorityTimeWarning", defaultWarnTimeLimit[2]);
  1540. }
  1541. else if (stricmp(queryName, "control:deleteUnneededPhysicalFiles")==0)
  1542. {
  1543. UNIMPLEMENTED;
  1544. }
  1545. else if (stricmp(queryName, "control:deleteUnneededQueryCacheFiles")==0)
  1546. {
  1547. UNIMPLEMENTED;
  1548. }
  1549. else if (stricmp(queryName, "control:doIbytiDelay")==0)
  1550. { // WARNING: use with extra care only during inactivity on system
  1551. doIbytiDelay = control->getPropBool("@val", true);
  1552. topology->setPropBool("@doIbytiDelay", doIbytiDelay);
  1553. }
  1554. else
  1555. unknown = true;
  1556. break;
  1557. case 'E':
  1558. if (stricmp(queryName, "control:enableKeyDiff")==0)
  1559. {
  1560. enableKeyDiff = control->getPropBool("@val", true);
  1561. topology->setPropBool("@enableKeyDiff", enableKeyDiff);
  1562. }
  1563. else
  1564. unknown = true;
  1565. break;
  1566. case 'F':
  1567. if (stricmp(queryName, "control:fieldTranslationEnabled")==0)
  1568. {
  1569. fieldTranslationEnabled = control->getPropBool("@val", true);
  1570. topology->setPropInt("@fieldTranslationEnabled", fieldTranslationEnabled);
  1571. }
  1572. else if (stricmp(queryName, "control:flushJHtreeCacheOnOOM")==0)
  1573. {
  1574. flushJHtreeCacheOnOOM = control->getPropBool("@val", true);
  1575. topology->setPropInt("@flushJHtreeCacheOnOOM", flushJHtreeCacheOnOOM);
  1576. }
  1577. else
  1578. unknown = true;
  1579. break;
  1580. case 'G':
  1581. if (stricmp(queryName, "control:getACLinfo") == 0)
  1582. {
  1583. // MORE - do nothing for now - possibly needed in the future - leave this so no exception is thrown
  1584. }
  1585. else if (stricmp(queryName, "control:getClusterName")==0)
  1586. {
  1587. reply.appendf("<clusterName id='%s'/>", roxieName.str());
  1588. }
  1589. else if (stricmp(queryName, "control:getKeyInfo")==0)
  1590. {
  1591. reportInMemoryIndexStatistics(reply, control->queryProp("@id"), control->getPropInt("@count", 10));
  1592. }
  1593. else if (stricmp(queryName, "control:getQueryXrefInfo")==0)
  1594. {
  1595. getQueryInfo(control, reply, true, logctx);
  1596. }
  1597. else if (stricmp(queryName, "control:getQuery")==0)
  1598. {
  1599. const char* id = control->queryProp("@id");
  1600. if (!id)
  1601. throw MakeStringException(ROXIE_MISSING_PARAMS, "No query name specified");
  1602. Owned<IQueryFactory> q = getQuery(id, logctx);
  1603. if (q)
  1604. {
  1605. Owned<IPropertyTree> tempTree = q->cloneQueryXGMML();
  1606. tempTree->setProp("@roxieName", roxieName.str());
  1607. toXML(tempTree, reply);
  1608. }
  1609. else
  1610. throw MakeStringException(ROXIE_UNKNOWN_QUERY, "Unknown query %s", id);
  1611. }
  1612. else if (stricmp(queryName, "control:getQueryWarningTime")==0)
  1613. {
  1614. const char *id = control->queryProp("Query/@id");
  1615. if (!id)
  1616. badFormat();
  1617. Owned<IQueryFactory> f = getQuery(id, logctx);
  1618. if (f)
  1619. {
  1620. unsigned warnLimit = f->getWarnTimeLimit();
  1621. reply.appendf("<QueryTimeWarning val='%d'/>", warnLimit);
  1622. }
  1623. }
  1624. else if (stricmp(queryName, "control:getBuildVersion")==0)
  1625. {
  1626. reply.appendf("<version id='%s'/>", BUILD_TAG);
  1627. }
  1628. else
  1629. unknown = true;
  1630. break;
  1631. case 'I':
  1632. if (stricmp(queryName, "control:indexmetrics")==0)
  1633. {
  1634. getIndexMetrics(reply);
  1635. }
  1636. else if (stricmp(queryName, "control:inMemoryKeysEnabled")==0)
  1637. {
  1638. inMemoryKeysEnabled = control->getPropBool("@val", true);
  1639. topology->setPropBool("@inMemoryKeysEnabled", inMemoryKeysEnabled);
  1640. }
  1641. else
  1642. unknown = true;
  1643. break;
  1644. case 'L':
  1645. if (stricmp(queryName, "control:leafCacheMem")==0)
  1646. {
  1647. leafCacheMB = control->getPropInt("@val", 50);
  1648. topology->setPropInt("@leafCacheMem", leafCacheMB);
  1649. setLeafCacheMem(leafCacheMB * 0x100000);
  1650. }
  1651. else if (stricmp(queryName, "control:listFileOpenErrors")==0)
  1652. {
  1653. // this just creates a delta state file to remove references to Keys / Files we now longer have interest in
  1654. StringAttrMapping *mapping = queryFileCache().queryFileErrorList();
  1655. HashIterator iter(*mapping);
  1656. StringBuffer err;
  1657. for (iter.first(); iter.isValid(); iter.next())
  1658. {
  1659. IMapping &cur = iter.query();
  1660. StringAttr *item = mapping->mapToValue(&cur);
  1661. const char *filename = (const char*)cur.getKey();
  1662. const char *filetype = item->get();
  1663. reply.appendf("<file><name>%s</name><type>%s</type></file>", filename, filetype);
  1664. }
  1665. }
  1666. else if (stricmp(queryName, "control:listUnusedFiles")==0)
  1667. {
  1668. UNIMPLEMENTED;
  1669. }
  1670. else if (stricmp(queryName, "control:lockDali")==0)
  1671. {
  1672. topology->setPropBool("@lockDali", true);
  1673. if (daliHelper)
  1674. daliHelper->disconnect();
  1675. saveTopology();
  1676. }
  1677. else if (stricmp(queryName, "control:logfullqueries")==0)
  1678. {
  1679. logFullQueries = control->getPropBool("@val", true);
  1680. topology->setPropBool("@logFullQueries", logFullQueries);
  1681. }
  1682. else
  1683. unknown = true;
  1684. break;
  1685. case 'M':
  1686. if (stricmp(queryName, "control:memoryStatsInterval")==0)
  1687. {
  1688. memoryStatsInterval = (unsigned) control->getPropInt64("@val", 0);
  1689. roxiemem::setMemoryStatsInterval(memoryStatsInterval);
  1690. topology->setPropInt64("@memoryStatsInterval", memoryStatsInterval);
  1691. }
  1692. else if (stricmp(queryName, "control:memtrace")==0)
  1693. {
  1694. roxiemem::memTraceLevel = control->getPropInt("@level", 0);
  1695. topology->setPropInt("@memTraceLevel", roxiemem::memTraceLevel);
  1696. }
  1697. else if (stricmp(queryName, "control:memtracesizelimit")==0)
  1698. {
  1699. roxiemem::memTraceSizeLimit = (memsize_t) control->getPropInt64("@val", control->getPropInt64("@value", 0)); // used to accept @value so coded like this for backward compatibility
  1700. topology->setPropInt64("@memTraceSizeLimit", roxiemem::memTraceSizeLimit);
  1701. }
  1702. else if (stricmp(queryName, "control:metrics")==0)
  1703. {
  1704. roxieMetrics->getMetrics(reply);
  1705. }
  1706. else if (stricmp(queryName, "control:minFreeDiskSpace")==0)
  1707. {
  1708. minFreeDiskSpace = (unsigned) control->getPropInt64("@val", 1048576);
  1709. topology->setPropInt64("@minFreeDiskSpace", minFreeDiskSpace);
  1710. }
  1711. else if (stricmp(queryName, "control:misctrace")==0)
  1712. {
  1713. miscDebugTraceLevel = control->getPropInt("@level", 0);
  1714. topology->setPropInt("@miscDebugTraceLevel", miscDebugTraceLevel);
  1715. }
  1716. else
  1717. unknown = true;
  1718. break;
  1719. case 'N':
  1720. if (stricmp(queryName, "control:nodeCachePreload")==0)
  1721. {
  1722. nodeCachePreload = control->getPropBool("@val", true);
  1723. topology->setPropBool("@nodeCachePreload", nodeCachePreload);
  1724. setNodeCachePreload(nodeCachePreload);
  1725. }
  1726. else if (stricmp(queryName, "control:nodeCacheMem")==0)
  1727. {
  1728. nodeCacheMB = control->getPropInt("@val", 100);
  1729. topology->setPropInt("@nodeCacheMem", nodeCacheMB);
  1730. setNodeCacheMem(nodeCacheMB * 0x100000);
  1731. }
  1732. else if (stricmp(queryName, "control:numFilesToProcess")==0)
  1733. {
  1734. int numFiles = queryFileCache().numFilesToCopy();
  1735. reply.appendf("<FilesToProcess value='%d'/>", numFiles);
  1736. }
  1737. else
  1738. unknown = true;
  1739. break;
  1740. case 'P':
  1741. if (stricmp(queryName, "control:parallelAggregate")==0)
  1742. {
  1743. parallelAggregate = control->getPropInt("@val", 0);
  1744. if (!parallelAggregate)
  1745. parallelAggregate = hdwInfo.numCPUs;
  1746. if (!parallelAggregate)
  1747. parallelAggregate = 1;
  1748. topology->setPropInt("@parallelAggregate", parallelAggregate);
  1749. }
  1750. else if (stricmp(queryName, "control:pingInterval")==0)
  1751. {
  1752. unsigned newInterval = (unsigned) control->getPropInt64("@val", 0);
  1753. if (newInterval && !pingInterval)
  1754. {
  1755. pingInterval = newInterval; // best to set before the start...
  1756. startPingTimer();
  1757. }
  1758. else if (pingInterval && !newInterval)
  1759. stopPingTimer(); // but after the stop
  1760. pingInterval = newInterval;
  1761. topology->setPropInt64("@pingInterval", pingInterval);
  1762. }
  1763. else if (stricmp(queryName, "control:preabortIndexReadsThreshold")==0)
  1764. {
  1765. preabortIndexReadsThreshold = control->getPropInt("@val", 100);
  1766. topology->setPropInt("@preabortIndexReadsThreshold", preabortIndexReadsThreshold);
  1767. }
  1768. else if (stricmp(queryName, "control:preabortKeyedJoinsThreshold")==0)
  1769. {
  1770. preabortKeyedJoinsThreshold = control->getPropInt("@val", 100);
  1771. topology->setPropInt("@preabortKeyedJoinsThreshold", preabortKeyedJoinsThreshold);
  1772. }
  1773. else if (stricmp(queryName, "control:probeAllRows")==0)
  1774. {
  1775. probeAllRows = control->getPropBool("@val", true);
  1776. }
  1777. else
  1778. unknown = true;
  1779. break;
  1780. case 'Q':
  1781. if (stricmp(queryName, "control:queries")==0)
  1782. {
  1783. getQueryInfo(control, reply, false, logctx);
  1784. }
  1785. else if (stricmp(queryName, "control:queryAggregates")==0)
  1786. {
  1787. time_t from;
  1788. const char *fromTime = control->queryProp("@from");
  1789. if (fromTime)
  1790. {
  1791. CDateTime f;
  1792. f.setString(fromTime, NULL, true);
  1793. from = f.getSimple();
  1794. }
  1795. else
  1796. from = startupTime;
  1797. time_t to;
  1798. const char *toTime = control->queryProp("@to");
  1799. if (toTime)
  1800. {
  1801. CDateTime t;
  1802. t.setString(toTime, NULL, true);
  1803. to = t.getSimple();
  1804. }
  1805. else
  1806. time(&to);
  1807. const char *id = control->queryProp("Query/@id");
  1808. if (id)
  1809. {
  1810. Owned<IQueryFactory> f = getQuery(id, logctx);
  1811. if (f)
  1812. {
  1813. Owned<const IPropertyTree> stats = f->getQueryStats(from, to);
  1814. toXML(stats, reply);
  1815. }
  1816. else
  1817. throw MakeStringException(ROXIE_CONTROL_MSG_ERROR, "Unknown query %s", id);
  1818. }
  1819. else
  1820. {
  1821. bool includeAllQueries = control->getPropBool("@all", true);
  1822. Owned<const IPropertyTree> stats = getAllQueryStats(includeAllQueries, from, to);
  1823. toXML(stats, reply);
  1824. }
  1825. }
  1826. else if (stricmp(queryName, "control:queryPackageInfo")==0)
  1827. {
  1828. ReadLockBlock readBlock(packageCrit);
  1829. allQueryPackages->getInfo(reply, logctx);
  1830. }
  1831. else if (stricmp(queryName, "control:queryStats")==0)
  1832. {
  1833. const char *id = control->queryProp("Query/@id");
  1834. if (!id)
  1835. badFormat();
  1836. const char *action = control->queryProp("Query/@action");
  1837. const char *graphName = 0;
  1838. if (action)
  1839. {
  1840. if (stricmp(action, "listGraphNames") == 0)
  1841. {
  1842. Owned<IQueryFactory> query = getQuery(id, logctx);
  1843. if (query)
  1844. {
  1845. reply.appendf("<Query id='%s'>\n", id);
  1846. StringArray graphNames;
  1847. query->getGraphNames(graphNames);
  1848. ForEachItemIn(idx, graphNames)
  1849. {
  1850. const char *graphName = graphNames.item(idx);
  1851. reply.appendf("<Graph id='%s'/>", graphName);
  1852. }
  1853. reply.appendf("</Query>\n");
  1854. }
  1855. return; // done
  1856. }
  1857. else if (stricmp(action, "selectGraph") == 0)
  1858. graphName = control->queryProp("Query/@name");
  1859. else if (stricmp(action, "allGraphs") != 0) // if we get here and its NOT allgraphs - then error
  1860. throw MakeStringException(ROXIE_CONTROL_MSG_ERROR, "invalid action in control:queryStats %s", action);
  1861. }
  1862. ReadLockBlock readBlock(packageCrit);
  1863. allQueryPackages->getStats(reply, id, action, graphName, logctx);
  1864. }
  1865. else if (stricmp(queryName, "control:queryWuid")==0)
  1866. {
  1867. UNIMPLEMENTED;
  1868. }
  1869. else
  1870. unknown = true;
  1871. break;
  1872. case 'R':
  1873. if (stricmp(queryName, "control:reload")==0)
  1874. {
  1875. reload();
  1876. if (daliHelper && daliHelper->connected())
  1877. reply.appendf("<Dali connected='1'/>");
  1878. else
  1879. reply.appendf("<Dali connected='0'/>");
  1880. ReadLockBlock readBlock(packageCrit);
  1881. reply.appendf("<State hash='%"I64F"u'/>", (unsigned __int64) allQueryPackages->queryHash());
  1882. }
  1883. else if (stricmp(queryName, "control:resetindexmetrics")==0)
  1884. {
  1885. resetIndexMetrics();
  1886. }
  1887. else if (stricmp(queryName, "control:resetmetrics")==0)
  1888. {
  1889. roxieMetrics->resetMetrics();
  1890. }
  1891. else if (stricmp(queryName, "control:resetquerystats")==0)
  1892. {
  1893. ReadLockBlock readBlock(packageCrit);
  1894. Owned<IPropertyTreeIterator> queries = control->getElements("Query");
  1895. if (queries->first())
  1896. {
  1897. while (queries->isValid())
  1898. {
  1899. IPropertyTree &query = queries->query();
  1900. const char *id = query.queryProp("@id");
  1901. if (!id)
  1902. badFormat();
  1903. allQueryPackages->resetStats(id, logctx);
  1904. queries->next();
  1905. }
  1906. }
  1907. else
  1908. allQueryPackages->resetStats(NULL, logctx);
  1909. }
  1910. else if (stricmp(queryName, "control:restart")==0)
  1911. {
  1912. FatalError("Roxie process restarted by operator request");
  1913. }
  1914. else if (stricmp(queryName, "control:retrieveActivityDetails")==0)
  1915. {
  1916. UNIMPLEMENTED;
  1917. }
  1918. else if (stricmp(queryName, "control:retrieveFileInfo")==0)
  1919. {
  1920. UNIMPLEMENTED;
  1921. }
  1922. else if (stricmp(queryName, "control:roxiememstats") == 0)
  1923. {
  1924. StringBuffer memStats;
  1925. queryMemoryPoolStats(memStats);
  1926. reply.append("<MemoryStats>").append(memStats.str()).append("</MemoryStats>\n");
  1927. }
  1928. else
  1929. unknown = true;
  1930. break;
  1931. case 'S':
  1932. if (stricmp(queryName, "control:setCopyResources")==0)
  1933. {
  1934. copyResources = control->getPropBool("@val", true);
  1935. topology->setPropBool("@copyResources", copyResources);
  1936. }
  1937. else if (stricmp(queryName, "control:simpleLocalKeyedJoins")==0)
  1938. {
  1939. simpleLocalKeyedJoins = control->getPropBool("@val", true);
  1940. }
  1941. else if (stricmp(queryName, "control:soapInfo")==0)
  1942. {
  1943. UNIMPLEMENTED;
  1944. }
  1945. else if (stricmp(queryName, "control:soapTrace")==0)
  1946. {
  1947. soapTraceLevel = control->getPropInt("@level", 0);
  1948. topology->setPropInt("@soapTraceLevel", soapTraceLevel);
  1949. }
  1950. else if (stricmp(queryName, "control:socketCheckInterval")==0)
  1951. {
  1952. socketCheckInterval = (unsigned) control->getPropInt64("@val", 0);
  1953. topology->setPropInt64("@socketCheckInterval", socketCheckInterval);
  1954. }
  1955. else if (stricmp(queryName, "control:state")==0)
  1956. {
  1957. if (daliHelper && daliHelper->connected())
  1958. reply.appendf("<Dali connected='1'/>");
  1959. else
  1960. reply.appendf("<Dali connected='0'/>");
  1961. ReadLockBlock readBlock(packageCrit);
  1962. reply.appendf("<State hash='%"I64F"u'/>", (unsigned __int64) allQueryPackages->queryHash());
  1963. }
  1964. else if (stricmp(queryName, "control:steppingEnabled")==0)
  1965. {
  1966. steppingEnabled = control->getPropBool("@val", true);
  1967. }
  1968. else if (stricmp(queryName, "control:suspend")==0)
  1969. {
  1970. StringBuffer id(control->queryProp("Query/@id"));
  1971. if (!id.length())
  1972. badFormat();
  1973. {
  1974. Owned<IQueryFactory> f = getQuery(id, logctx);
  1975. if (f)
  1976. id.clear().append(f->queryQueryName()); // use the spelling of the query stored with the query factory
  1977. }
  1978. UNIMPLEMENTED;
  1979. }
  1980. else if (stricmp(queryName, "control:suspendChannel")==0)
  1981. {
  1982. if (control->hasProp("@channel") && control->hasProp("@suspend"))
  1983. {
  1984. unsigned channel = control->getPropInt("@channel", 0);
  1985. bool suspend = control->getPropBool("@suspend", true);
  1986. CriticalBlock b(ccdChannelsCrit);
  1987. if (channel)
  1988. {
  1989. StringBuffer xpath;
  1990. IPropertyTree *slaveNode = ccdChannels->queryPropTree(xpath.appendf("RoxieSlaveProcess[@channel='%u']", channel).str());
  1991. if (slaveNode)
  1992. {
  1993. ROQ->suspendChannel(channel, suspend, logctx);
  1994. slaveNode->setPropBool("@suspended", suspend);
  1995. }
  1996. else
  1997. throw MakeStringException(ROXIE_INVALID_INPUT, "Unknown channel %u", channel);
  1998. }
  1999. else
  2000. {
  2001. Owned<IPropertyTreeIterator> slaves = ccdChannels->getElements("RoxieSlaveProcess");
  2002. ForEach(*slaves)
  2003. {
  2004. IPropertyTree &slaveNode = slaves->query();
  2005. channel = slaveNode.getPropInt("@channel", 0);
  2006. ROQ->suspendChannel(channel, suspend, logctx);
  2007. slaveNode.setPropBool("@suspended", suspend);
  2008. }
  2009. }
  2010. toXML(ccdChannels, reply);
  2011. }
  2012. else
  2013. badFormat();
  2014. }
  2015. else if (stricmp(queryName, "control:suspendServer")==0)
  2016. {
  2017. if (control->hasProp("@port") && control->hasProp("@suspend"))
  2018. {
  2019. unsigned port = control->getPropInt("@port", 0);
  2020. bool suspend = control->getPropBool("@suspend", true);
  2021. CriticalBlock b(ccdChannelsCrit);
  2022. if (port)
  2023. {
  2024. StringBuffer xpath;
  2025. IPropertyTree *serverNode = ccdChannels->queryPropTree(xpath.appendf("RoxieServerProcess[@port='%u']", port).str());
  2026. if (serverNode)
  2027. {
  2028. suspendRoxieListener(port, suspend);
  2029. serverNode->setPropBool("@suspended", suspend);
  2030. }
  2031. else
  2032. throw MakeStringException(ROXIE_INVALID_INPUT, "Unknown Roxie server port %u", port);
  2033. }
  2034. else
  2035. {
  2036. Owned<IPropertyTreeIterator> servers = ccdChannels->getElements("RoxieServerProcess");
  2037. ForEach(*servers)
  2038. {
  2039. IPropertyTree &serverNode = servers->query();
  2040. port = serverNode.getPropInt("@port", 0);
  2041. suspendRoxieListener(port, suspend);
  2042. serverNode.setPropBool("@suspended", suspend);
  2043. }
  2044. }
  2045. toXML(ccdChannels, reply);
  2046. }
  2047. else
  2048. badFormat();
  2049. }
  2050. else if (stricmp(queryName, "control:systemMonitor")==0)
  2051. {
  2052. unsigned interval = control->getPropInt("@interval", 60000);
  2053. bool enable = control->getPropBool("@enable", true);
  2054. if (enable)
  2055. startPerformanceMonitor(interval);
  2056. else
  2057. stopPerformanceMonitor();
  2058. }
  2059. else
  2060. unknown = true;
  2061. break;
  2062. case 'T':
  2063. if (stricmp(queryName, "control:testSlaveFailure")==0)
  2064. {
  2065. testSlaveFailure = control->getPropInt("@val", 20);
  2066. }
  2067. else if (stricmp(queryName, "control:timeActivities")==0)
  2068. {
  2069. timeActivities = control->getPropBool("@val", true);
  2070. topology->setPropInt("@timeActivities", timeActivities);
  2071. }
  2072. else if (stricmp(queryName, "control:timings")==0)
  2073. {
  2074. reply.append("<Timings>");
  2075. timer->getTimings(reply);
  2076. reply.append("</Timings>");
  2077. if (control->getPropBool("@reset", false))
  2078. {
  2079. timer->reset();
  2080. }
  2081. }
  2082. else if (stricmp(queryName, "control:topology")==0)
  2083. {
  2084. toXML(topology, reply);
  2085. }
  2086. else if (stricmp(queryName, "control:trace")==0)
  2087. {
  2088. traceLevel = control->getPropInt("@level", 0);
  2089. if (traceLevel > MAXTRACELEVEL)
  2090. traceLevel = MAXTRACELEVEL;
  2091. topology->setPropInt("@traceLevel", traceLevel);
  2092. }
  2093. else if (stricmp(queryName, "control:traceServerSideCache")==0)
  2094. {
  2095. traceServerSideCache = control->getPropBool("@val", true);
  2096. topology->setPropInt("@traceServerSideCache", traceServerSideCache);
  2097. }
  2098. else if (stricmp(queryName, "control:traceJHtreeAllocations")==0)
  2099. {
  2100. traceJHtreeAllocations = control->getPropBool("@val", true);
  2101. topology->setPropInt("@traceJHtreeAllocations", traceJHtreeAllocations);
  2102. }
  2103. else if (stricmp(queryName, "control:traceSmartStepping")==0)
  2104. {
  2105. traceSmartStepping = control->getPropBool("@val", true);
  2106. topology->setPropInt("@traceSmartStepping", traceSmartStepping);
  2107. }
  2108. else if (stricmp(queryName, "control:traceStartStop")==0)
  2109. {
  2110. traceStartStop = control->getPropBool("@val", true);
  2111. topology->setPropInt("@traceStartStop", traceStartStop);
  2112. }
  2113. else
  2114. unknown = true;
  2115. break;
  2116. case 'U':
  2117. if (stricmp(queryName, "control:udptrace")==0)
  2118. {
  2119. udpTraceLevel = control->getPropInt("@level", 0);
  2120. topology->setPropInt("@udpTraceLevel", udpTraceLevel);
  2121. }
  2122. else if (stricmp(queryName, "control:unlockDali")==0)
  2123. {
  2124. topology->setPropBool("@lockDali", false);
  2125. // Dali will reattach via the timer that checks every so often if can reattach...
  2126. saveTopology();
  2127. }
  2128. else if (stricmp(queryName, "control:unsuspend")==0)
  2129. {
  2130. UNIMPLEMENTED;
  2131. }
  2132. else if (stricmp(queryName, "control:userMetric")==0)
  2133. {
  2134. const char *name = control->queryProp("@name");
  2135. const char *regex= control->queryProp("@regex");
  2136. if (name && regex)
  2137. {
  2138. roxieMetrics->addUserMetric(name, regex);
  2139. // MORE - we could add to topology, we could check for dups, and we could support removing them.
  2140. }
  2141. else
  2142. throw MakeStringException(ROXIE_MISSING_PARAMS, "Metric name or regex missing");
  2143. }
  2144. else if (stricmp(queryName, "control:useTreeCopy")==0)
  2145. {
  2146. useTreeCopy = control->getPropBool("@val", true);
  2147. topology->setPropInt("@useTreeCopy", useTreeCopy);
  2148. }
  2149. else
  2150. unknown = true;
  2151. break;
  2152. case 'W':
  2153. if (stricmp(queryName, "control:watchActivityId")==0)
  2154. {
  2155. watchActivityId = control->getPropInt("@id", true);
  2156. }
  2157. else
  2158. unknown = true;
  2159. break;
  2160. default:
  2161. unknown = true;
  2162. break;
  2163. }
  2164. if (unknown)
  2165. throw MakeStringException(ROXIE_UNKNOWN_QUERY, "Unknown query %s", queryName);
  2166. }
  2167. void badFormat()
  2168. {
  2169. throw MakeStringException(ROXIE_INVALID_INPUT, "Badly formated control query");
  2170. }
  2171. };
  2172. extern IRoxieQueryPackageManagerSet *createRoxiePackageSetManager(const IQueryDll *standAloneDll)
  2173. {
  2174. return new CRoxiePackageSetManager(standAloneDll);
  2175. }
// Global pointer to the package set manager; initialised to NULL here
// (where it is assigned is not visible in this part of the file).
IRoxieQueryPackageManagerSet *globalPackageSetManager = NULL;
  2177. extern void loadPlugins()
  2178. {
  2179. if (pluginDirectory.length() && isDirectory(pluginDirectory.str()))
  2180. {
  2181. plugins = new SafePluginMap(&PluginCtx, traceLevel >= 1);
  2182. plugins->loadFromDirectory(pluginDirectory);
  2183. }
  2184. }
  2185. extern void cleanupPlugins()
  2186. {
  2187. delete plugins;
  2188. plugins = NULL;
  2189. }
  2190. /*=======================================================================================================
  2191. * mergeStats and associated code is used to combine the graph stats from multiple nodes in a cluster into
  2192. * a single aggregate structure
  2193. * It should be moved into ccdquery.cpp really
  2194. *========================================================================================================*/
// Signature shared by the merge routines stored in MergeInfo: two property trees
// plus an unsigned level argument.
typedef void (*mergefunc)(IPropertyTree *s1, IPropertyTree *s2, unsigned level);
// One row of mergeTable, consumed by mergeStats(s1, s2, level).
struct MergeInfo
{
    const char *element;    // name of the child elements selected from each tree
    const char *attribute;  // attribute whose value pairs a child of s1 with a child of s2; NULL to pair on element name alone
    mergefunc f;            // called on each matched pair of children
};
  2202. void mergeSubGraphs(IPropertyTree *s1, IPropertyTree *s2, unsigned);
  2203. void mergeNodes(IPropertyTree *s1, IPropertyTree *s2)
  2204. {
  2205. Owned<IPropertyTreeIterator> elems = s1->getElements("att");
  2206. ForEach(*elems)
  2207. {
  2208. IPropertyTree &e1 = elems->query();
  2209. unsigned __int64 v1 = e1.getPropInt64("@value", 0);
  2210. const char *name = e1.queryProp("@name");
  2211. if (stricmp(name, "_kind")==0 && v1 == TAKsubgraph)
  2212. {
  2213. IPropertyTree *s1child = s1->queryPropTree("att/graph");
  2214. IPropertyTree *s2child = s2->queryPropTree("att[@name='_kind']/graph");
  2215. if (s1child && s2child)
  2216. {
  2217. mergeSubGraphs(s1child, s2child, 0);
  2218. s2->removeProp("att[@name='_kind']");
  2219. }
  2220. }
  2221. else
  2222. {
  2223. StringBuffer xpath;
  2224. xpath.appendf("att[@name='%s']", name);
  2225. const char *type = e1.queryProp("@type");
  2226. if (type)
  2227. {
  2228. IPropertyTree *e2 = s2->queryPropTree(xpath.str());
  2229. if (e2)
  2230. {
  2231. unsigned __int64 v2 = e2->getPropInt64("@value", 0);
  2232. if (strcmp(name, "max")==0)
  2233. {
  2234. if (v2 > v1)
  2235. e1.setPropInt64("@value", v2);
  2236. }
  2237. else if (strcmp(type, "min")==0)
  2238. {
  2239. if (v2 < v1)
  2240. e1.setPropInt64("@value", v2);
  2241. }
  2242. else if (strcmp(type, "sum")==0)
  2243. e1.setPropInt64("@value", v1+v2);
  2244. else
  2245. throw MakeStringException(ROXIE_UNKNOWN_QUERY, "Unknown type %s in graph statistics", type);
  2246. s2->removeTree(e2);
  2247. }
  2248. }
  2249. else
  2250. {
  2251. // remove from s2 any complete dups
  2252. const char *s1val = e1.queryProp("@value");
  2253. Owned<IPropertyTreeIterator> s2elems = s2->getElements(xpath.str());
  2254. IArrayOf<IPropertyTree> goers;
  2255. ForEach(*s2elems)
  2256. {
  2257. IPropertyTree &e2 = s2elems->query();
  2258. const char *s2val = e2.queryProp("@value");
  2259. if ((!s1val && !s2val) || (s1val && s2val && strcmp(s1val, s2val)==0))
  2260. goers.append(*LINK(&e2));
  2261. }
  2262. ForEachItemIn(idx, goers)
  2263. {
  2264. s2->removeTree(&goers.item(idx));
  2265. }
  2266. }
  2267. }
  2268. }
  2269. elems.setown(s2->getElements("*"));
  2270. ForEach(*elems)
  2271. {
  2272. IPropertyTree &e2 = elems->query();
  2273. s1->addPropTree(e2.queryName(), LINK(&e2));
  2274. }
  2275. }
  2276. void mergeSubGraphs(IPropertyTree *s1, IPropertyTree *s2, unsigned)
  2277. {
  2278. Owned<IPropertyTreeIterator> elems = s1->getElements("*");
  2279. ForEach(*elems)
  2280. {
  2281. IPropertyTree &e1 = elems->query();
  2282. const char *elemName = e1.queryName();
  2283. StringBuffer xpath;
  2284. if (strcmp(elemName, "att")==0)
  2285. {
  2286. xpath.appendf("att[@name='%s']", e1.queryProp("@name"));
  2287. IPropertyTree *e2 = s2->queryPropTree(xpath.str());
  2288. if (e2)
  2289. s2->removeTree(e2);
  2290. }
  2291. else
  2292. {
  2293. xpath.appendf("%s[@id='%s']", elemName, e1.queryProp("@id"));
  2294. IPropertyTree *e2 = s2->queryPropTree(xpath.str());
  2295. if (e2)
  2296. {
  2297. mergeNodes(&e1, e2);
  2298. s2->removeTree(e2);
  2299. }
  2300. }
  2301. }
  2302. elems.setown(s2->getElements("*"));
  2303. ForEach(*elems)
  2304. {
  2305. IPropertyTree &e2 = elems->query();
  2306. s1->addPropTree(e2.queryName(), LINK(&e2));
  2307. }
  2308. }
  2309. void mergeNode(IPropertyTree *s1, IPropertyTree *s2, unsigned level);
// Indexed by the 'level' argument of mergeStats(s1, s2, level): entry N gives the child
// element examined at that level, the attribute (if any) used to pair children of the
// two trees, and the function called on each matched pair with level N+1.
MergeInfo mergeTable[] =
{
    {"Query", "@id", mergeStats},
    {"Graph", "@id", mergeStats},
    {"xgmml", NULL, mergeStats},
    {"graph", NULL, mergeStats},
    {"node", "@id", mergeNode},
    {"att", NULL, mergeStats},
    {"graph", NULL, mergeSubGraphs},
};
  2320. void mergeNode(IPropertyTree *s1, IPropertyTree *s2, unsigned level)
  2321. {
  2322. if (s1->hasProp("att/@name"))
  2323. mergeNodes(s1, s2);
  2324. else
  2325. mergeStats(s1, s2, level);
  2326. }
  2327. void mergeStats(IPropertyTree *s1, IPropertyTree *s2, unsigned level)
  2328. {
  2329. MergeInfo & mi = mergeTable[level];
  2330. Owned<IPropertyTreeIterator> elems = s1->getElements(mi.element);
  2331. ForEach(*elems)
  2332. {
  2333. IPropertyTree &e1 = elems->query();
  2334. StringBuffer xpath;
  2335. if (mi.attribute)
  2336. xpath.appendf("%s[%s='%s']", mi.element, mi.attribute, e1.queryProp(mi.attribute));
  2337. else
  2338. xpath.append(mi.element);
  2339. IPropertyTree *e2 = s2->queryPropTree(xpath.str());
  2340. if (e2)
  2341. {
  2342. mi.f(&e1, e2, level+1);
  2343. s2->removeTree(e2);
  2344. }
  2345. }
  2346. elems.setown(s2->getElements(mi.element));
  2347. ForEach(*elems)
  2348. {
  2349. IPropertyTree &e2 = elems->query();
  2350. s1->addPropTree(mi.element, LINK(&e2));
  2351. }
  2352. }
  2353. void mergeStats(IPropertyTree *s1, IPropertyTree *s2)
  2354. {
  2355. Owned<IPropertyTreeIterator> elems = s2->getElements("Exception");
  2356. ForEach(*elems)
  2357. {
  2358. s1->addPropTree("Exception", LINK(&elems->query()));
  2359. }
  2360. mergeStats(s1, s2, 0);
  2361. }
  2362. #ifdef _USE_CPPUNIT
  2363. #include <cppunit/extensions/HelperMacros.h>
// CPPUNIT-only fixture: a <Stats> document for query 'stats', graph 'graph1'.
// Contains nodes 2, 2a (nested subgraph with seeks=15) and 3, plus edges 2_0, 3_0, 5_0 and 5_1.
// Presumably the first input to the mergeStats unit test - the test itself is outside this excerpt.
static const char *g1 =
    "<Stats>"
    "<Query id='stats'>"
    "<Graph id='graph1'>"
    "<xgmml>"
    "<graph>"
    "<node id='1'>"
    "<att>"
    "<graph>"
    "<node id='2' label='Temp Table'>"
    "<att name='name' value='d'/>"
    "<att name='_kind' value='25'/>"
    "<att name='helper' value='f2'/>"
    "</node>"
    "<node id='2a'>"
    " <att name='_kind' value='1'>" // TAKsubgraph
    " <graph>"
    " <node id='7696' label='Nested'>"
    " <att name='seeks' value='15' type='sum'/>"
    " </node>"
    " </graph>"
    " </att>"
    "</node>"
    "<node id='3' label='Filter'>"
    "<att name='name' value='ds'/>"
    "<att name='_kind' value='5'/>"
    "<att name='helper' value='f3'/>"
    "</node>"
    "<att name='rootGraph' value='1'/>"
    "<edge id='2_0' source='2' target='3'>"
    "<att name='count' value='15' type='sum'/>"
    "<att name='started' value='1'/>"
    "<att name='stopped' value='1'/>"
    "</edge>"
    "<edge id='3_0' source='3' target='5'>"
    "<att name='count' value='15' type='sum'/>"
    "<att name='started' value='1'/>"
    "<att name='stopped' value='1'/>"
    "</edge>"
    "<edge id='5_0' source='5' target='6'>"
    "<att name='count' value='3' type='sum'/>"
    "<att name='started' value='1'/>"
    "<att name='stopped' value='1'/>"
    "</edge>"
    "<edge id='5_1' source='5' target='7'>"
    "<att name='_sourceIndex' value='1'/>"
    "<att name='count' value='15' type='sum'/>"
    "<att name='started' value='1'/>"
    "<att name='stopped' value='1'/>"
    "</edge>"
    "</graph>"
    "</att>"
    "</node>"
    "</graph>"
    "</xgmml>"
    "</Graph>"
    "</Query>"
    "</Stats>";
// CPPUNIT-only fixture: a second <Stats> document for query 'stats', graph 'graph1'.
// Contains nodes 2, 2a (nested subgraph with seeks=25) and 4, plus edges 2_0, 3_0 and 5_0.
// Presumably the second input to the mergeStats unit test - the test itself is outside this excerpt.
static const char *g2 =
    "<Stats>"
    "<Query id='stats'>"
    "<Graph id='graph1'>"
    "<xgmml>"
    "<graph>"
    "<node id='1'>"
    "<att>"
    "<graph>"
    "<node id='2' label='Temp Table'>"
    "<att name='name' value='d'/>"
    "<att name='_kind' value='25'/>"
    "<att name='helper' value='f2'/>"
    "</node>"
    "<node id='2a'>"
    " <att name='_kind' value='1'>" // TAKsubgraph
    " <graph>"
    " <node id='7696' label='Nested'>"
    " <att name='seeks' value='25' type='sum'/>"
    " </node>"
    " </graph>"
    " </att>"
    "</node>"
    "<node id='4' label='Filter2'>"
    "<att name='name' value='ds2'/>"
    "<att name='_kind' value='53'/>"
    "<att name='helper' value='f23'/>"
    "</node>"
    "<att name='rootGraph' value='1'/>"
    "<edge id='2_0' source='2' target='3'>"
    "<att name='count' value='15' type='sum'/>"
    "<att name='started' value='1'/>"
    "<att name='stopped' value='1'/>"
    "</edge>"
    "<edge id='3_0' source='3' target='5'>"
    "<att name='count' value='15' type='sum'/>"
    "<att name='started' value='1'/>"
    "<att name='stopped' value='1'/>"
    "</edge>"
    "<edge id='5_0' source='5' target='6'>"
    "<att name='count' value='3' type='sum'/>"
    "<att name='started' value='1'/>"
    "<att name='stopped' value='1'/>"
    "</edge>"
    "</graph>"
    "</att>"
    "</node>"
    "</graph>"
    "</xgmml>"
    "</Graph>"
    "</Query>"
    "</Stats>";
  // expected: reference document for MergeStatsTest. test1() parses it, serializes
  // it with toXML() and compares that text against the serialized result of
  // mergeStats(g1, g2).
  // Edge 'count' stats are the sums of the two inputs: '2_0' and '3_0' are
  // 15 + 15 = 30 and '5_0' is 3 + 3 = 6. Edge '5_1' does not occur in g2, so it
  // keeps g1's value of 15.
  // The nested 'seeks' stat is 40; g2 contributes 25, so g1 presumably contributes
  // 15 (g1's nested node is defined above this point - TODO confirm).
  // NOTE(review): the 'seeks' element lists type before value, unlike g2. Because
  // the comparison is on serialized text, this presumably mirrors the attribute
  // order that mergeStats() produces - confirm before "tidying" the order.
  2474. static const char *expected =
  2475. "<Stats>"
  2476. "<Query id='stats'>"
  2477. "<Graph id='graph1'>"
  2478. "<xgmml>"
  2479. "<graph>"
  2480. "<node id='1'>"
  2481. "<att>"
  2482. "<graph>"
  2483. "<node id='2' label='Temp Table'>"
  2484. "<att name='name' value='d'/>"
  2485. "<att name='_kind' value='25'/>"
  2486. "<att name='helper' value='f2'/>"
  2487. "</node>"
  2488. "<node id='2a'>"
  2489. " <att name='_kind' value='1'>" // TAKsubgraph
  2490. " <graph>"
  2491. " <node id='7696' label='Nested'>"
  2492. " <att name='seeks' type='sum' value='40'/>"
  2493. " </node>"
  2494. " </graph>"
  2495. " </att>"
  2496. "</node>"
  2497. "<node id='3' label='Filter'>"
  2498. "<att name='name' value='ds'/>"
  2499. "<att name='_kind' value='5'/>"
  2500. "<att name='helper' value='f3'/>"
  2501. "</node>"
  2502. "<node id='4' label='Filter2'>"
  2503. "<att name='name' value='ds2'/>"
  2504. "<att name='_kind' value='53'/>"
  2505. "<att name='helper' value='f23'/>"
  2506. "</node>"
  2507. "<att name='rootGraph' value='1'/>"
  2508. "<edge id='2_0' source='2' target='3'>"
  2509. "<att name='count' value='30' type='sum'/>"
  2510. "<att name='started' value='1'/>"
  2511. "<att name='stopped' value='1'/>"
  2512. "</edge>"
  2513. "<edge id='3_0' source='3' target='5'>"
  2514. "<att name='count' value='30' type='sum'/>"
  2515. "<att name='started' value='1'/>"
  2516. "<att name='stopped' value='1'/>"
  2517. "</edge>"
  2518. "<edge id='5_0' source='5' target='6'>"
  2519. "<att name='count' value='6' type='sum'/>"
  2520. "<att name='started' value='1'/>"
  2521. "<att name='stopped' value='1'/>"
  2522. "</edge>"
  2523. "<edge id='5_1' source='5' target='7'>"
  2524. "<att name='_sourceIndex' value='1'/>"
  2525. "<att name='count' value='15' type='sum'/>"
  2526. "<att name='started' value='1'/>"
  2527. "<att name='stopped' value='1'/>"
  2528. "</edge>"
  2529. "</graph>"
  2530. "</att>"
  2531. "</node>"
  2532. "</graph>"
  2533. "</xgmml>"
  2534. "</Graph>"
  2535. "</Query>"
  2536. "</Stats>"
  2537. ;
  2538. class MergeStatsTest : public CppUnit::TestFixture
  2539. {
  2540. CPPUNIT_TEST_SUITE( MergeStatsTest );
  2541. CPPUNIT_TEST(test1);
  2542. CPPUNIT_TEST_SUITE_END();
  2543. protected:
  2544. void test1()
  2545. {
  2546. Owned<IPropertyTree> p1 = createPTreeFromXMLString(g1);
  2547. Owned<IPropertyTree> p2 = createPTreeFromXMLString(g2);
  2548. Owned<IPropertyTree> e = createPTreeFromXMLString(expected);
  2549. mergeStats(p1, p2);
  2550. StringBuffer s1, s2;
  2551. toXML(p1, s1);
  2552. toXML(e, s2);
  2553. CPPUNIT_ASSERT(strcmp(s1, s2)==0);
  2554. }
  2555. };
  // Register the MergeStatsTest suite with CppUnit twice: once in the default
  // (unnamed) factory registry and once in a registry named "MergeStatsTest",
  // presumably so a test runner can select this suite on its own by name.
  2556. CPPUNIT_TEST_SUITE_REGISTRATION( MergeStatsTest );
  2557. CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( MergeStatsTest, "MergeStatsTest" );
  2558. #endif