ccdstate.cpp 129 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604260526062607260826092610261126122613261426152616261726182619262026212622262326242625262626272628262926302631263226332634263526362637263826392640264126422643264426452646264726482649265026512652265326542655265626572658265926602661266226632664266526662667266826692670267126722673267426752676267726782679268026812682268326842685268626872688268926902691269226932694269526962697269826992700270127022703270427052706270727082709271027112712271327142715271627172718271927202721272227232724272527262727272827292730273127322733273427352736273727382739274027412742274327442745274627472748274927502751275227532754275527562757275827592760276127622763276427652766276727682769277027712772277327742775277627772778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327732783279328032813282328332843285328632873288328932903291329232933294329532963297329832993300330133023303330433053306330733083309331033113312331333143315331633173318331933203321332233233324332533263327332833293330333133323333333433353336333733383339334033413342334333443345334633473348334933503351335233533354335533563357335833593360336133623363336433653366336733683369337033713372337333743375337633773378337933803381338233833384338533863387338833893390339133923393339433953396339733983399340034013402340334043405
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include <platform.h>
  14. #include <jlib.hpp>
  15. #include "jisem.hpp"
  16. #include "jhash.hpp"
  17. #include "jsort.hpp"
  18. #include "jregexp.hpp"
  19. #include "udptopo.hpp"
  20. #include "ccd.hpp"
  21. #include "ccdquery.hpp"
  22. #include "ccddali.hpp"
  23. #include "ccdstate.hpp"
  24. #include "ccdqueue.ipp"
  25. #include "ccdlistener.hpp"
  26. #include "ccdfile.hpp"
  27. #include "ccdsnmp.hpp"
  28. #include "hqlplugins.hpp"
  29. #include "thorplugin.hpp"
  30. #include "eclrtl.hpp"
  31. #include "dafdesc.hpp"
  32. #include "dautils.hpp"
  33. #include "rmtfile.hpp"
  34. #include "pkgimpl.hpp"
  35. #include "roxiehelper.hpp"
  36. #include "ws_dfsclient.hpp"
  37. //-------------------------------------------------------------------------------------------
  38. // class CRoxiePluginCtx - provide the environments for plugins loaded by roxie.
  39. // Base class handles making sure memory allocation comes from the right heap.
  40. // implement get/set properties to allow plugin configuration information to be retrieved from Roxie topology file
  41. //-------------------------------------------------------------------------------------------
  42. class CRoxiePluginCtx : public SimplePluginCtx
  43. {
  44. public:
  45. virtual int ctxGetPropInt(const char *propName, int defaultValue) const
  46. {
  47. return topology->getPropInt(propName, defaultValue);
  48. }
  49. virtual const char *ctxQueryProp(const char *propName) const
  50. {
  51. return topology->queryProp(propName);
  52. }
  53. } PluginCtx;
  54. SafePluginMap *plugins;
  55. //================================================================================================
  56. // In legacy state files, the original file names passed in _fileName or _indexFileName may have been translated into _superFileName or _superKeyName,
  57. // and then 0 or more (max 1 for subfiles, no limit for subkeys) _fileName or _indexFileName will have been added. This translation will not take place
  58. // if the files resolve to single file/key, or if we are using new embedded wu system
  59. // Basic mode of operation therefore is to get the original name, see if it can be resolved by package into a list of subfiles, and if not, use
  60. // iterator on the xgmml node to get the list.
  61. // These two helper functions will return the original filenames placed in the XGMML by the codegen, regardless of how/if roxieconfig resolved them
  62. static const char *_queryNodeFileName(const IPropertyTree &graphNode)
  63. {
  64. if (graphNode.hasProp("att[@name='_file_dynamic']"))
  65. return NULL;
  66. else
  67. return graphNode.queryProp("att[@name='_fileName']/@value");
  68. }
  69. static const char *_queryNodeIndexName(const IPropertyTree &graphNode)
  70. {
  71. if (graphNode.hasProp("att[@name='_indexFile_dynamic']"))
  72. return NULL;
  73. else
  74. return graphNode.queryProp("att[@name='_indexFileName']/@value");
  75. }
  76. static bool isSimpleIndexActivity(ThorActivityKind kind)
  77. {
  78. switch (kind)
  79. {
  80. case TAKindexaggregate:
  81. case TAKindexcount:
  82. case TAKindexexists:
  83. case TAKindexgroupaggregate:
  84. case TAKindexgroupcount:
  85. case TAKindexgroupexists:
  86. case TAKindexnormalize:
  87. case TAKindexread:
  88. return true;
  89. default:
  90. return false;
  91. }
  92. }
  93. const char *queryNodeFileName(const IPropertyTree &graphNode, ThorActivityKind kind)
  94. {
  95. if (isSimpleIndexActivity(kind))
  96. return NULL;
  97. else
  98. return _queryNodeFileName(graphNode);
  99. }
  100. const char *queryNodeIndexName(const IPropertyTree &graphNode, ThorActivityKind kind)
  101. {
  102. if (isSimpleIndexActivity(kind))
  103. return _queryNodeFileName(graphNode);
  104. else
  105. return _queryNodeIndexName(graphNode);
  106. }
  107. // DelayedReleaser mechanism hangs on to a link to an object for a while...
  108. class DelayedReleaseQueueItem : public CInterfaceOf<IInterface>
  109. {
  110. Owned<IInterface> goer;
  111. time_t goTime;
  112. public:
  113. DelayedReleaseQueueItem(IInterface *_goer, unsigned delaySeconds)
  114. : goer(_goer)
  115. {
  116. time(&goTime);
  117. goTime += delaySeconds;
  118. }
  119. unsigned remaining()
  120. {
  121. time_t now;
  122. time(&now);
  123. if (now > goTime)
  124. return 0;
  125. else
  126. return (unsigned)(goTime - now);
  127. }
  128. };
  129. class DelayedReleaserThread : public Thread
  130. {
  131. private:
  132. std::atomic<bool> closing;
  133. bool started;
  134. CriticalSection lock;
  135. IArrayOf<DelayedReleaseQueueItem> queue;
  136. Semaphore sem;
  137. public:
  138. DelayedReleaserThread() : Thread("DelayedReleaserThread")
  139. {
  140. closing = false;
  141. started = false;
  142. }
  143. ~DelayedReleaserThread()
  144. {
  145. stop();
  146. }
  147. virtual int run()
  148. {
  149. if (traceLevel)
  150. DBGLOG("DelayedReleaserThread %p starting", this);
  151. unsigned nextTimeout = INFINITE;
  152. while (!closing || queue.length())
  153. {
  154. sem.wait(nextTimeout);
  155. CriticalBlock b(lock);
  156. nextTimeout = INFINITE;
  157. ForEachItemInRev(idx, queue)
  158. {
  159. DelayedReleaseQueueItem &goer = queue.item(idx);
  160. unsigned timeRemaining = goer.remaining();
  161. if (!timeRemaining)
  162. queue.remove(idx);
  163. else if (timeRemaining < nextTimeout)
  164. nextTimeout = timeRemaining;
  165. }
  166. if (nextTimeout != INFINITE)
  167. nextTimeout = nextTimeout * 1000;
  168. clearKeyStoreCache(false); // Allows us to fully release files we no longer need because of unloaded queries
  169. }
  170. if (traceLevel)
  171. DBGLOG("DelayedReleaserThread %p exiting", this);
  172. return 0;
  173. }
  174. void stop()
  175. {
  176. if (started)
  177. {
  178. closing = true;
  179. sem.signal();
  180. join();
  181. }
  182. }
  183. void delayedRelease(IInterface *goer, unsigned delaySeconds)
  184. {
  185. if (goer)
  186. {
  187. CriticalBlock b(lock);
  188. if (!started)
  189. {
  190. start();
  191. started = true;
  192. }
  193. queue.append(*new DelayedReleaseQueueItem(goer, delaySeconds));
  194. sem.signal();
  195. }
  196. }
  197. };
  198. Owned<DelayedReleaserThread> delayedReleaser;
  199. void createDelayedReleaser()
  200. {
  201. delayedReleaser.setown(new DelayedReleaserThread);
  202. }
  203. void stopDelayedReleaser()
  204. {
  205. if (delayedReleaser)
  206. delayedReleaser->stop();
  207. delayedReleaser.clear();
  208. }
  209. //-------------------------------------------------------------------------
  210. class CSimpleSuperFileArray : implements ISimpleSuperFileEnquiry, public CInterface
  211. {
  212. IArrayOf<IPropertyTree> subFiles;
  213. public:
  214. IMPLEMENT_IINTERFACE;
  215. CSimpleSuperFileArray(IPropertyTreeIterator &_subs)
  216. {
  217. ForEach(_subs)
  218. {
  219. IPropertyTree &sub = _subs.query();
  220. sub.Link();
  221. subFiles.append(sub);
  222. }
  223. }
  224. virtual unsigned numSubFiles() const
  225. {
  226. return subFiles.length();
  227. }
  228. virtual bool getSubFileName(unsigned num, StringBuffer &name) const
  229. {
  230. if (subFiles.isItem(num))
  231. {
  232. name.append(subFiles.item(num).queryProp("@value"));
  233. return true;
  234. }
  235. else
  236. return false;
  237. }
  238. virtual unsigned findSubName(const char *subname) const
  239. {
  240. ForEachItemIn(idx, subFiles)
  241. {
  242. if (strieq(subFiles.item(idx).queryProp("@value"), subname))
  243. return idx;
  244. }
  245. return NotFound;
  246. }
  247. virtual unsigned getContents(StringArray &contents) const
  248. {
  249. ForEachItemIn(idx, subFiles)
  250. {
  251. contents.append(subFiles.item(idx).queryProp("@value"));
  252. }
  253. return subFiles.length();
  254. }
  255. };
  256. //-------------------------------------------------------------------------------------------
  257. // class CRoxiePackage - provide the environment in which file names and query options are interpreted
  258. // by a roxie query.
  259. // File names are resolved into IResolvedFile objects. A cache is used to ensure that the IResolvedFile is
  260. // shared wherever possible.
  261. // Effective environment is precomputed in mergedEnvironment for efficient recall by queries
  262. // Packages are described using XML files - see documentation for details.
  263. //-------------------------------------------------------------------------------------------
  264. /**
  265. * Packages are hierarchical - they are searched recursively to get the info you want
  266. * A PackageMap defines the entire environment - potentially each query that uses that PackageMap will pick a different package within it
  267. * A particular instantiation of a roxie query (i.e. a IQueryFactory) will have a pointer to the specific IRoxiePackage within the active PackageMap
  268. * that is providing its environment.
  269. *
  270. * A PackageMap can also indicate the name of the QuerySet it applies to. If not specified, at will apply to all QuerySets on the Roxie.
  271. *
  272. * A PackageSet is a list of PackageMap id's, and is used to tell Roxie what PackageMaps to load.
  273. * A Roxie can have multiple PackageMap's active. When updating the data, you might:
  274. * - create a new PackageMap to refer to the new data
  275. * - once it has loaded, mark it active, and mark the previous one as inactive
  276. * - Once sure no queries in flight, unload the previous one
  277. *
  278. * Each Roxie will load all PackageMaps that are in any PackageSet whose @process attribute matches the cluster name.
  279. *
  280. * All package information is stored in Dali (and cached locally)
  281. *
  282. * <PackageSets>
  283. * <PackageSet id = 'ps1' process='*'> # use this packageset for all roxies (same as omitting process)
  284. * <PackageMap id='pm1b' querySet='qs1' active='true'/> # Use the PackageMap pm1b for QuerySet qs1 and make it active
  285. * <PackageMap id='pm1a' querySet='qs1' active='false'/> # Use the PackageMap pm1a for QuerySet qs1 but don't make it active
  286. * <PackageMap id='pm2' querySet='dev*' active='true'/> # Use the PackageMap pm1a for all QuerySets with names starting dev and make it active
  287. * </PackageMapSet>
  288. * </PackageSets>
  289. *
  290. * <PackageMaps>
  291. * <PackageMap id='pm1a'>
  292. * <Package id='package1'>
  293. * ...
  294. * </Package>
  295. * <Package id='package2'>
  296. * </Package>
  297. * </PackageMap>
  298. * <PackageMap id='pm2'>
  299. * </PackageMap>
  300. * <PackageMap id='pm3'>
  301. * </PackageMap>
  302. * </PackageMaps>
  303. */
  304. class CResolvedFileCache : implements IResolvedFileCache
  305. {
  306. CriticalSection cacheLock;
  307. CopyMapStringToMyClass<IResolvedFile> files;
  308. public:
  309. // Retrieve number of files in cache
  310. inline unsigned count() const
  311. {
  312. return files.count();
  313. }
  314. // Add a filename and the corresponding IResolvedFile to the cache
  315. virtual void addCache(const char *filename, const IResolvedFile *file)
  316. {
  317. CriticalBlock b(cacheLock);
  318. IResolvedFile *add = const_cast<IResolvedFile *>(file);
  319. add->setCache(this);
  320. files.setValue(filename, add);
  321. }
  322. // Remove an IResolvedFile from the cache
  323. virtual void removeCache(const IResolvedFile *file)
  324. {
  325. CriticalBlock b(cacheLock);
  326. if (traceLevel > 9)
  327. DBGLOG("removeCache %s", file->queryFileName());
  328. // NOTE: it's theoretically possible for the final release to happen after a replacement has been inserted into hash table.
  329. // So only remove from hash table if what we find there matches the item that is being deleted.
  330. IResolvedFile *goer = files.getValue(file->queryFileName());
  331. if (goer == file)
  332. files.remove(file->queryFileName());
  333. // You might want to remove files from the daliServer cache too, but it's not safe to do so here as there may be multiple package caches
  334. }
  335. // Lookup a filename in the cache
  336. virtual IResolvedFile *lookupCache(const char *filename)
  337. {
  338. CriticalBlock b(cacheLock);
  339. IResolvedFile *cache = files.getValue(filename);
  340. if (cache)
  341. {
  342. if (cache->isAliveAndLink())
  343. return cache;
  344. if (traceLevel)
  345. DBGLOG("Not returning %s from cache as isAlive() returned false", filename);
  346. }
  347. return NULL;
  348. }
  349. };
  350. // Note - we use a separate cache for the misses rather than any clever attempts to overload
  351. // the one cache with a "special" value, since (among other reasons) the misses are cleared
  352. // prior to a package reload, but the hits need not be (as the file will be locked as long as it
  353. // is in the cache)
  354. static CriticalSection daliMissesCrit;
  355. static Owned<KeptLowerCaseAtomTable> daliMisses;
  356. static void noteDaliMiss(const char *filename)
  357. {
  358. CriticalBlock b(daliMissesCrit);
  359. if (traceLevel > 9)
  360. DBGLOG("noteDaliMiss %s", filename);
  361. daliMisses->addAtom(filename);
  362. }
  363. static bool checkCachedDaliMiss(const char *filename)
  364. {
  365. CriticalBlock b(daliMissesCrit);
  366. bool ret = daliMisses->find(filename) != NULL;
  367. if (traceLevel > 9)
  368. DBGLOG("checkCachedDaliMiss %s returns %d", filename, ret);
  369. return ret;
  370. }
  371. static void clearDaliMisses()
  372. {
  373. CriticalBlock b(daliMissesCrit);
  374. if (traceLevel)
  375. DBGLOG("Clearing dali misses cache");
  376. daliMisses.setown(new KeptLowerCaseAtomTable);
  377. }
  378. class CRoxiePackageNode : extends CPackageNode, implements IRoxiePackage
  379. {
  380. protected:
  381. static CResolvedFileCache daliFiles;
  382. static CriticalSection daliLookupCrits[NUM_DALI_CRITS];
  383. mutable CResolvedFileCache fileCache;
  384. IArrayOf<IResolvedFile> files; // Used when preload set
  385. IArrayOf<IKeyArray> keyArrays; // Used when preload set
  386. IArrayOf<IFileIOArray> fileArrays; // Used when preload set
  387. virtual aindex_t getBaseCount() const = 0;
  388. virtual const CRoxiePackageNode *getBaseNode(aindex_t pos) const = 0;
  389. virtual RecordTranslationMode getSysFieldTranslationEnabled() const override { return fieldTranslationEnabled; } //roxie configured value
  390. // Use local package file only to resolve subfile into physical file info
  391. IResolvedFile *resolveLFNusingPackage(const char *fileName) const
  392. {
  393. if (node)
  394. {
  395. StringBuffer xpath;
  396. IPropertyTree *fileInfo = node->queryPropTree(xpath.appendf("File[@id='%s']", fileName).str());
  397. if (fileInfo)
  398. {
  399. Owned <IResolvedFileCreator> result = createResolvedFile(fileName, NULL, false);
  400. result->addSubFile(createFileDescriptorFromRoxieXML(fileInfo), NULL);
  401. return result.getClear();
  402. }
  403. }
  404. return NULL;
  405. }
  406. // Use dali to resolve subfile into physical file info
  407. static IResolvedFile *resolveLFNusingDaliOrLocal(const char *fileName, bool useCache, bool cacheResult, bool writeAccess, bool alwaysCreate, bool resolveLocal, bool isPrivilegedUser)
  408. {
  409. unsigned hash = hashc((const unsigned char *) fileName, strlen(fileName), 0x811C9DC5);
  410. CriticalBlock b(daliLookupCrits[hash % NUM_DALI_CRITS]);
  411. // MORE - look at alwaysCreate... This may be useful to implement earlier locking semantics.
  412. if (traceLevel > 9)
  413. DBGLOG("resolveLFNusingDaliOrLocal %s %d %d %d %d", fileName, useCache, cacheResult, writeAccess, alwaysCreate);
  414. IResolvedFile* result = NULL;
  415. if (useCache)
  416. {
  417. result = daliFiles.lookupCache(fileName);
  418. if (result)
  419. {
  420. if (traceLevel > 9)
  421. DBGLOG("resolveLFNusingDaliOrLocal %s - cache hit", fileName);
  422. return result;
  423. }
  424. }
  425. if (alwaysCreate || !useCache || !checkCachedDaliMiss(fileName))
  426. {
  427. Owned<IRoxieDaliHelper> daliHelper = connectToDali();
  428. if (daliHelper)
  429. {
  430. if (daliHelper->connected())
  431. {
  432. Owned<IDistributedFile> dFile = daliHelper->resolveLFN(fileName, cacheResult, writeAccess, isPrivilegedUser);
  433. if (dFile)
  434. result = createResolvedFile(fileName, NULL, dFile.getClear(), daliHelper, !useCache, cacheResult, writeAccess);
  435. }
  436. else if (!writeAccess) // If we need write access and expect a dali, but don't have one, we should probably fail
  437. {
  438. // we have no dali, we can't lock..
  439. Owned<IFileDescriptor> fd = daliHelper->resolveCachedLFN(fileName);
  440. if (fd)
  441. {
  442. Owned <IResolvedFileCreator> creator = createResolvedFile(fileName, NULL, false);
  443. Owned<IFileDescriptor> remoteFDesc = daliHelper->checkClonedFromRemote(fileName, fd, cacheResult, isPrivilegedUser);
  444. creator->addSubFile(fd.getClear(), remoteFDesc.getClear());
  445. result = creator.getClear();
  446. }
  447. }
  448. }
  449. if (!result && (resolveLocal || alwaysCreate))
  450. {
  451. StringBuffer useName;
  452. bool wasDFS = false;
  453. if (!resolveLocal || strstr(fileName,"::") != NULL)
  454. {
  455. makeSinglePhysicalPartName(fileName, useName, true, wasDFS);
  456. }
  457. else
  458. useName.append(fileName);
  459. bool exists = checkFileExists(useName);
  460. if (exists || alwaysCreate)
  461. {
  462. Owned <IResolvedFileCreator> creator = createResolvedFile(fileName, wasDFS ? NULL : useName.str(), false);
  463. if (exists)
  464. creator->addSubFile(useName);
  465. result = creator.getClear();
  466. }
  467. }
  468. }
  469. if (cacheResult)
  470. {
  471. if (traceLevel > 9)
  472. DBGLOG("resolveLFNusingDaliOrLocal %s - cache add %d", fileName, result != NULL);
  473. if (result)
  474. daliFiles.addCache(fileName, result);
  475. else
  476. noteDaliMiss(fileName);
  477. }
  478. return result;
  479. }
  480. // Use local package and its bases to resolve existing file into physical file info via all supported resolvers
  481. IResolvedFile *lookupExpandedFileName(const char *fileName, bool useCache, bool cacheResult, bool writeAccess, bool alwaysCreate, bool checkCompulsory, bool isPrivilegedUser) const
  482. {
  483. IResolvedFile *result = lookupFile(fileName, useCache, cacheResult, writeAccess, alwaysCreate, isPrivilegedUser);
  484. if (!result && (!checkCompulsory || !isCompulsory()))
  485. result = resolveLFNusingDaliOrLocal(fileName, useCache, cacheResult, writeAccess, alwaysCreate, resolveLocally(), isPrivilegedUser);
  486. return result;
  487. }
  488. IResolvedFile *lookupFile(const char *fileName, bool useCache, bool cacheResult, bool writeAccess, bool alwaysCreate, bool isPrivilegedUser) const
  489. {
  490. // Order of resolution:
  491. // 1. Files named in package
  492. // 2. Files named in bases
  493. IResolvedFile* result = useCache ? fileCache.lookupCache(fileName) : NULL;
  494. if (result)
  495. return result;
  496. Owned<const ISimpleSuperFileEnquiry> subFileInfo = resolveSuperFile(fileName);
  497. if (subFileInfo)
  498. {
  499. unsigned numSubFiles = subFileInfo->numSubFiles();
  500. // Note: do not try to optimize the common case of a single subfile
  501. // as we still want to report the superfile info from the resolvedFile
  502. Owned<IResolvedFileCreator> super;
  503. for (unsigned idx = 0; idx < numSubFiles; idx++)
  504. {
  505. StringBuffer subFileName;
  506. subFileInfo->getSubFileName(idx, subFileName);
  507. if (subFileName.length()) // Empty subfile names can come from package file - just ignore
  508. {
  509. if (subFileName.charAt(0)=='~')
  510. {
  511. // implies that a package file had ~ in subfile names - shouldn't really, but we allow it (and just strip the ~)
  512. subFileName.remove(0,1);
  513. }
  514. if (traceLevel > 9)
  515. DBGLOG("Looking up subfile %s", subFileName.str());
  516. Owned<const IResolvedFile> subFileInfo = lookupExpandedFileName(subFileName, useCache, cacheResult, false, false, false, isPrivilegedUser); // NOTE - overwriting a superfile does NOT require write access to subfiles
  517. if (subFileInfo)
  518. {
  519. if (!super)
  520. super.setown(createResolvedFile(fileName, NULL, true));
  521. super->addSubFile(subFileInfo);
  522. }
  523. }
  524. }
  525. if (super && cacheResult)
  526. fileCache.addCache(fileName, super);
  527. return super.getClear();
  528. }
  529. result = resolveLFNusingPackage(fileName);
  530. if (result)
  531. {
  532. if (cacheResult)
  533. fileCache.addCache(fileName, result);
  534. return result;
  535. }
  536. aindex_t count = getBaseCount();
  537. for (aindex_t i = 0; i < count; i++)
  538. {
  539. const CRoxiePackageNode *basePackage = getBaseNode(i);
  540. if (!basePackage)
  541. continue;
  542. IResolvedFile *result = basePackage->lookupFile(fileName, useCache, cacheResult, writeAccess, alwaysCreate, isPrivilegedUser);
  543. if (result)
  544. return result;
  545. }
  546. return NULL;
  547. }
  548. void doPreload(unsigned channel, const IResolvedFile *resolved)
  549. {
  550. if (resolved->isKey())
  551. keyArrays.append(*resolved->getKeyArray(false, channel));
  552. else
  553. fileArrays.append(*resolved->getIFileIOArray(false, channel));
  554. }
  555. void checkPreload()
  556. {
  557. if (isPreload())
  558. {
  559. // Look through all files and resolve them now
  560. Owned<IPropertyTreeIterator> supers = node->getElements("SuperFile");
  561. const bool isCodeSigned = isActivityCodeSigned(*node);
  562. ForEach(*supers)
  563. {
  564. IPropertyTree &super = supers->query();
  565. const char *name = super.queryProp("@id");
  566. if (name)
  567. {
  568. try
  569. {
  570. const IResolvedFile *resolved = lookupFileName(name, false, true, true, NULL, true, isCodeSigned);
  571. if (resolved)
  572. {
  573. files.append(*const_cast<IResolvedFile *>(resolved));
  574. Owned<const ITopologyServer> topology = getTopology();
  575. for (unsigned channel : topology->queryChannels())
  576. {
  577. assertex(channel);
  578. doPreload(channel, resolved);
  579. }
  580. }
  581. }
  582. catch (IException *E)
  583. {
  584. VStringBuffer msg("Failed to preload file %s for package node %s", name, queryId());
  585. EXCLOG(E, msg.str());
  586. E->Release();
  587. }
  588. }
  589. }
  590. }
  591. }
  592. // default constructor for derived class use
  593. CRoxiePackageNode()
  594. {
  595. }
  596. public:
  597. IMPLEMENT_IINTERFACE;
  598. CRoxiePackageNode(IPropertyTree *p) : CPackageNode(p)
  599. {
  600. }
  601. ~CRoxiePackageNode()
  602. {
  603. keyArrays.kill();
  604. fileArrays.kill();
  605. files.kill();
  606. assertex(fileCache.count()==0);
  607. // If it's possible for cached objects to outlive the cache I think there is a problem...
  608. // we could set the cache field to null here for any objects still in cache but there would be a race condition
  609. }
  610. virtual void setHash(hash64_t newhash)
  611. {
  612. hash = newhash;
  613. }
  614. virtual IPropertyTreeIterator *getInMemoryIndexInfo(const IPropertyTree &graphNode) const
  615. {
  616. StringBuffer xpath;
  617. xpath.append("SuperFile[@id='").append(queryNodeFileName(graphNode, getActivityKind(graphNode))).append("']");
  618. return lookupElements(xpath.str(), "MemIndex");
  619. }
  620. virtual const IResolvedFile *lookupFileName(const char *_fileName, bool opt, bool useCache, bool cacheResult, IConstWorkUnit *wu, bool ignoreForeignPrefix, bool isPrivilegedUser) const
  621. {
  622. StringBuffer fileName;
  623. expandLogicalFilename(fileName, _fileName, wu, false, ignoreForeignPrefix);
  624. if (traceLevel > 5)
  625. DBGLOG("lookupFileName %s", fileName.str());
  626. const IResolvedFile *result = lookupExpandedFileName(fileName, useCache, cacheResult, false, false, true, isPrivilegedUser);
  627. if (!result)
  628. {
  629. StringBuffer compulsoryMsg;
  630. if (isCompulsory())
  631. compulsoryMsg.append(" (Package is compulsory)");
  632. if (!opt && !pretendAllOpt)
  633. throw MakeStringException(ROXIE_FILE_ERROR, "Could not resolve filename %s%s", fileName.str(), compulsoryMsg.str());
  634. if (traceLevel > 4)
  635. DBGLOG("Could not resolve OPT filename %s%s", fileName.str(), compulsoryMsg.str());
  636. }
  637. return result;
  638. }
  639. virtual IRoxieWriteHandler *createFileName(const char *_fileName, bool overwrite, bool extend, const StringArray &clusters, IConstWorkUnit *wu, bool isPrivilegedUser) const
  640. {
  641. StringBuffer fileName;
  642. expandLogicalFilename(fileName, _fileName, wu, false, false);
  643. Owned<IResolvedFile> resolved = lookupFile(fileName, false, false, true, true, isPrivilegedUser);
  644. if (!resolved)
  645. resolved.setown(resolveLFNusingDaliOrLocal(fileName, false, false, true, true, resolveLocally(), isPrivilegedUser));
  646. if (resolved)
  647. {
  648. if (resolved->exists())
  649. {
  650. if (!overwrite)
  651. throw MakeStringException(99, "Cannot write %s, file already exists (missing OVERWRITE attribute?)", resolved->queryFileName());
  652. if (extend)
  653. UNIMPLEMENTED; // How does extend fit in with the clusterwritemanager stuff? They can't specify cluster and extend together...
  654. resolved->setCache(NULL);
  655. resolved->remove();
  656. }
  657. if (resolved->queryPhysicalName())
  658. fileName.clear().append(resolved->queryPhysicalName()); // if it turned out to be a local file
  659. resolved.clear();
  660. }
  661. else
  662. throw MakeStringException(ROXIE_FILE_ERROR, "Cannot write %s", fileName.str());
  663. // filename by now may be a local filename, or a dali one
  664. Owned<IRoxieDaliHelper> daliHelper = connectToDali();
  665. bool onlyLocal = fileNameServiceDali.isEmpty();
  666. bool onlyDFS = !resolveLocally() && !onlyLocal;
  667. IUserDescriptor *user = NULL;
  668. if (wu)
  669. user = wu->queryUserDescriptor();//ad-hoc mode
  670. else if (daliHelper)
  671. user = daliHelper->queryUserDescriptor();//predeployed query mode
  672. Owned<ILocalOrDistributedFile> ldFile = createLocalOrDistributedFile(fileName, user, onlyLocal, onlyDFS, true, isPrivilegedUser, &clusters);
  673. if (!ldFile)
  674. throw MakeStringException(ROXIE_FILE_ERROR, "Cannot write %s", fileName.str());
  675. return createRoxieWriteHandler(daliHelper, ldFile.getClear(), clusters);
  676. }
  677. //map ambiguous IHpccPackage
  678. virtual ISimpleSuperFileEnquiry *resolveSuperFile(const char *superFileName) const
  679. {
  680. return CPackageNode::resolveSuperFile(superFileName);
  681. }
  682. virtual const char *queryEnv(const char *varname) const
  683. {
  684. return CPackageNode::queryEnv(varname);
  685. }
  686. virtual RecordTranslationMode getEnableFieldTranslation() const override
  687. {
  688. return CPackageNode::getEnableFieldTranslation();
  689. }
  690. virtual bool isCompulsory() const
  691. {
  692. return CPackageNode::isCompulsory();
  693. }
  694. virtual bool isPreload() const
  695. {
  696. return CPackageNode::isPreload();
  697. }
  698. virtual const IPropertyTree *queryTree() const
  699. {
  700. return CPackageNode::queryTree();
  701. }
  702. virtual hash64_t queryHash() const
  703. {
  704. return CPackageNode::queryHash();
  705. }
  706. virtual const char *queryId() const
  707. {
  708. return CPackageNode::queryId();
  709. }
  710. virtual bool resolveLocally() const
  711. {
  712. return CPackageNode::resolveLocally();
  713. }
  714. };
  715. CResolvedFileCache CRoxiePackageNode::daliFiles;
  716. CriticalSection CRoxiePackageNode::daliLookupCrits[NUM_DALI_CRITS];
  717. typedef CResolvedPackage<CRoxiePackageNode> CRoxiePackage;
  718. IRoxiePackage *createRoxiePackage(IPropertyTree *p, IRoxiePackageMap *packages)
  719. {
  720. Owned<CRoxiePackage> pkg = new CRoxiePackage(p);
  721. pkg->resolveBases(packages);
  722. return pkg.getClear();
  723. }
  724. //================================================================================================
  725. // CPackageMap - an implementation of IPackageMap using a string map
  726. //================================================================================================
  727. class CRoxiePackageMap : public CPackageMapOf<CRoxiePackageNode, IRoxiePackage>, implements IRoxiePackageMap
  728. {
  729. public:
  730. IMPLEMENT_IINTERFACE;
  731. typedef CPackageMapOf<CRoxiePackageNode, IRoxiePackage> BASE;
  732. CRoxiePackageMap(const char *_packageId, const char *_querySet, bool _active)
  733. : BASE(_packageId, _querySet, _active)
  734. {
  735. }
  736. //map ambiguous IHpccPackageMap interface
  737. virtual const IHpccPackage *queryPackage(const char *name) const
  738. {
  739. return BASE::queryPackage(name);
  740. }
  741. virtual const IHpccPackage *matchPackage(const char *name) const
  742. {
  743. return BASE::matchPackage(name);
  744. }
  745. virtual const char *queryPackageId() const
  746. {
  747. return BASE::queryPackageId();
  748. }
  749. virtual bool isActive() const
  750. {
  751. return BASE::isActive();
  752. }
  753. virtual const StringArray &getPartIds() const
  754. {
  755. return BASE::getPartIds();
  756. }
  757. virtual bool validate(const StringArray &queryids, const StringArray &queriesToIgnore, StringArray &wrn, StringArray &err, StringArray &unmatchedQueries, StringArray &unusedPackages, StringArray &unmatchedFiles, bool ignoreOptionalFiles) const
  758. {
  759. return BASE::validate(queryids, queriesToIgnore, wrn, err, unmatchedQueries, unusedPackages, unmatchedFiles, ignoreOptionalFiles);
  760. }
  761. virtual void gatherFileMappingForQuery(const char *queryname, IPropertyTree *fileInfo) const
  762. {
  763. BASE::gatherFileMappingForQuery(queryname, fileInfo);
  764. }
  765. virtual const IRoxiePackage *queryRoxiePackage(const char *name) const
  766. {
  767. return queryResolvedPackage(name);
  768. }
  769. virtual const IRoxiePackage *matchRoxiePackage(const char *name) const
  770. {
  771. return matchResolvedPackage(name);
  772. }
  773. };
  774. static CRoxiePackageMap *emptyPackageMap;
  775. static CRoxiePackage *rootPackage;
  776. static SpinLock emptyPackageMapCrit;
  777. static IRoxieDebugSessionManager *debugSessionManager;
  778. extern const IRoxiePackage &queryRootRoxiePackage()
  779. {
  780. SpinBlock b(emptyPackageMapCrit);
  781. if (!rootPackage)
  782. {
  783. // Set up the root package. This contains global settings from topology file
  784. rootPackage = new CRoxiePackage(topology); // attributes become control: environment settings. Rest of topology ignored.
  785. rootPackage->setHash(0); // we don't include the topology in the package hashes...
  786. rootPackage->resolveBases(NULL);
  787. }
  788. return *rootPackage;
  789. }
  790. extern const IRoxiePackageMap &queryEmptyRoxiePackageMap()
  791. {
  792. SpinBlock b(emptyPackageMapCrit);
  793. if (!emptyPackageMap)
  794. emptyPackageMap = new CRoxiePackageMap("<none>", NULL, true);
  795. return *emptyPackageMap;
  796. }
  797. MODULE_INIT(INIT_PRIORITY_STANDARD)
  798. {
  799. emptyPackageMap = NULL;
  800. debugSessionManager = NULL;
  801. return true;
  802. }
  803. MODULE_EXIT()
  804. {
  805. ::Release(emptyPackageMap); // You can't use static Owned to release anything that may own a IPropertyTree
  806. ::Release(rootPackage);
  807. ::Release(debugSessionManager);
  808. }
  809. // IRoxieQuerySetManager
  810. // - CRoxieQuerySetManager -
  811. // - CRoxieServerQuerySetManager
  812. // - CRoxieAgentQuerySetManager
  813. //
  814. // Manages a set of instantiated queries and allows us to look them up by queryname or alias
  815. //
  816. // IRoxieQuerySetManagerSet
  817. // - CRoxieAgentQuerySetManagerSet
  818. //
  819. // Manages the IRoxieQuerySetManager for multiple channels
  820. //
  821. // CRoxieQueryPackageManager
  822. // - CRoxieDaliQueryPackageManager
  823. // - CStandaloneQueryPackageManager
  824. //
  825. // Groups a server resource manager and a set of agent resource managers (one per channel) together.
  826. // There is one per PackageMap
  827. //
  828. // CQueryPackageSetManager at outer level
  829. // There will be exactly one of these. It will reload the CQueryPackageManager's if dali Package info changes
  830. //================================================================================================
  831. // CRoxieQuerySetManager - shared base class for agent and server query set manager classes
  832. // Manages a set of instantiated queries and allows us to look them up by queryname or alias,
  833. // as well as controlling their lifespan
  834. //================================================================================================
  835. class CRoxieQuerySetManager : public CInterface, implements IRoxieQuerySetManager
  836. {
  837. protected:
  838. MapStringToMyClass<IQueryFactory> queries;
  839. MapStringToMyClass<IQueryFactory> aliases; // Do we gain anything by having two tables?
  840. unsigned channelNo;
  841. bool active;
  842. StringAttr querySetName;
  843. CriticalSection crit; // For parallel load
  844. void addQuery(const char *id, IQueryFactory *n)
  845. {
  846. {
  847. CriticalBlock b(crit);
  848. queries.setValue(id, n);
  849. }
  850. n->Release(); // setValue links
  851. }
  852. void addAlias(const char *alias, const char *original)
  853. {
  854. if (original && alias)
  855. {
  856. IQueryFactory *orig = queries.getValue(original);
  857. if (orig)
  858. {
  859. CriticalBlock b(crit);
  860. aliases.setValue(alias, orig);
  861. }
  862. else
  863. throw MakeStringException(ROXIE_UNKNOWN_QUERY, "Unknown query %s", original);
  864. }
  865. else
  866. throw MakeStringException(ROXIE_INTERNAL_ERROR, "Invalid parameters to addAlias");
  867. }
  868. virtual IQueryFactory *loadQueryFromDll(const char *id, const IQueryDll *dll, const IRoxiePackage &package, const IPropertyTree *stateInfo, bool forceRetry) = 0;
  869. public:
  870. IMPLEMENT_IINTERFACE;
  871. CRoxieQuerySetManager(unsigned _channelNo, const char *_querySetName)
  872. : queries(true), aliases(true), active(false), querySetName(_querySetName)
  873. {
  874. channelNo = _channelNo;
  875. }
  876. virtual const char *queryId() const
  877. {
  878. return querySetName;
  879. }
  880. virtual bool isActive() const
  881. {
  882. return active;
  883. }
  884. virtual void load(const IPropertyTree *querySet, const IRoxiePackageMap &packages, hash64_t &hash, bool forceRetry)
  885. {
  886. unsigned numQueries = const_cast<IPropertyTree *>(querySet)->getCount("Query");
  887. if (numQueries)
  888. {
  889. std::vector<hash64_t> queryHashes(numQueries);
  890. asyncFor(numQueries, parallelQueryLoadThreads, [this, querySet, &packages, &queryHashes, forceRetry](unsigned i)
  891. {
  892. queryHashes[i] = 0;
  893. VStringBuffer xpath("Query[%u]", i+1);
  894. const IPropertyTree *query = querySet->queryPropTree(xpath);
  895. assertex(query);
  896. const char *id = query->queryProp("@id");
  897. const char *dllName = query->queryProp("@dll");
  898. try
  899. {
  900. if (!id || !*id || !dllName || !*dllName)
  901. throw MakeStringException(ROXIE_QUERY_MODIFICATION, "dll and id must be specified");
  902. Owned<const IQueryDll> queryDll = createQueryDll(dllName);
  903. const IHpccPackage *package = NULL;
  904. const char *packageName = query->queryProp("@package");
  905. if (packageName && *packageName)
  906. {
  907. package = packages.queryPackage(packageName); // if a package is specified, require exact match
  908. if (!package)
  909. throw MakeStringException(ROXIE_QUERY_MODIFICATION, "Package %s specified by query %s not found", packageName, id);
  910. }
  911. else
  912. {
  913. package = packages.queryPackage(id); // Look for an exact match, then a fuzzy match, using query name as the package id
  914. if(!package) package = packages.matchPackage(id);
  915. if (!package) package = &queryRootRoxiePackage();
  916. }
  917. assertex(package && QUERYINTERFACE(package, const IRoxiePackage));
  918. IQueryFactory *qf = loadQueryFromDll(id, queryDll.getClear(), *QUERYINTERFACE(package, const IRoxiePackage), query, forceRetry);
  919. queryHashes[i] = qf->queryHash();
  920. addQuery(id, qf);
  921. }
  922. catch (IException *E)
  923. {
  924. // we don't want a single bad query in the set to stop us loading all the others
  925. StringBuffer msg;
  926. msg.appendf("Failed to load query %s from %s", id ? id : "(null)", dllName ? dllName : "(null)");
  927. EXCLOG(E, msg.str());
  928. if (id)
  929. {
  930. StringBuffer emsg;
  931. E->errorMessage(emsg);
  932. Owned<IPropertyTree> stateInfo = createPTree();
  933. stateInfo->setPropBool("@suspended", true);
  934. stateInfo->setProp("@loadFailedReason", emsg);
  935. Owned<IQueryFactory> dummyQuery = loadQueryFromDll(id, NULL, queryRootRoxiePackage(), stateInfo, false);
  936. queryHashes[i] = dummyQuery->queryHash();
  937. addQuery(id, dummyQuery.getClear());
  938. }
  939. E->Release();
  940. }
  941. });
  942. for (auto h : queryHashes)
  943. hash = rtlHash64Data(sizeof(h), &h, hash);
  944. }
  945. unsigned numAliases = const_cast<IPropertyTree *>(querySet)->getCount("Alias");
  946. if (numAliases)
  947. {
  948. std::vector<hash64_t> aliasHashes(numAliases);
  949. asyncFor(numAliases, parallelQueryLoadThreads, [this, querySet, &aliasHashes](unsigned i)
  950. {
  951. aliasHashes[i] = 0;
  952. VStringBuffer xpath("Alias[%u]", i+1);
  953. IPropertyTree *item = querySet->queryPropTree(xpath);
  954. assertex(item);
  955. const char *alias = item->queryProp("@name");
  956. const char *original = item->queryProp("@id");
  957. try
  958. {
  959. addAlias(alias, original);
  960. hash64_t hash = rtlHash64VStr(alias, 0);
  961. aliasHashes[i] = rtlHash64VStr(original, hash);
  962. }
  963. catch (IException *E)
  964. {
  965. // we don't want a single bad alias in the set to stop us loading all the others
  966. VStringBuffer msg("Failed to set alias %s on %s", alias, original);
  967. EXCLOG(E, msg.str());
  968. E->Release();
  969. aliasHashes[i] = 0;
  970. }
  971. });
  972. for (auto h : aliasHashes)
  973. hash = rtlHash64Data(sizeof(h), &h, hash);
  974. }
  975. active = packages.isActive();
  976. if (active)
  977. hash = rtlHash64VStr("active", hash);
  978. }
  979. virtual void getStats(const char *queryName, const char *graphName, IConstWorkUnit *statsWu, unsigned channel, bool reset, const IRoxieContextLogger &logctx) const override
  980. {
  981. Owned<IQueryFactory> f = getQuery(queryName, NULL, logctx);
  982. if (f)
  983. {
  984. f->gatherStats(statsWu, channel, reset);
  985. }
  986. else
  987. throw MakeStringException(ROXIE_UNKNOWN_QUERY, "Unknown query %s", queryName);
  988. }
  989. virtual void resetQueryTimings(const char *queryName, const IRoxieContextLogger &logctx)
  990. {
  991. Owned<IQueryFactory> f = getQuery(queryName, NULL, logctx);
  992. if (f)
  993. f->resetQueryTimings();
  994. else
  995. throw MakeStringException(ROXIE_UNKNOWN_QUERY, "Unknown query %s", queryName);
  996. }
  997. virtual void resetAllQueryTimings()
  998. {
  999. HashIterator elems(queries);
  1000. for (elems.first(); elems.isValid(); elems.next())
  1001. {
  1002. IMapping &cur = elems.query();
  1003. queries.mapToValue(&cur)->resetQueryTimings();
  1004. }
  1005. }
  1006. virtual void getActivityMetrics(StringBuffer &reply) const
  1007. {
  1008. HashIterator elems(queries);
  1009. for (elems.first(); elems.isValid(); elems.next())
  1010. {
  1011. IMapping &cur = elems.query();
  1012. queries.mapToValue(&cur)->getActivityMetrics(reply);
  1013. }
  1014. }
  1015. virtual void getAllQueryInfo(StringBuffer &reply, bool full, const IRoxieQuerySetManagerSet *agents, const IRoxieContextLogger &logctx) const
  1016. {
  1017. HashIterator elems(queries);
  1018. for (elems.first(); elems.isValid(); elems.next())
  1019. {
  1020. IMapping &cur = elems.query();
  1021. IQueryFactory *query = queries.mapToValue(&cur);
  1022. IArrayOf<IQueryFactory> agentQueries;
  1023. agents->getQueries(query->queryQueryName(), agentQueries, logctx);
  1024. query->getQueryInfo(reply, full, &agentQueries, logctx);
  1025. }
  1026. HashIterator aliasIterator(aliases);
  1027. for (aliasIterator.first(); aliasIterator.isValid(); aliasIterator.next())
  1028. {
  1029. IMapping &cur = aliasIterator.query();
  1030. reply.appendf(" <Alias id='%s' query='%s'/>\n", (const char *) cur.getKey(), aliases.mapToValue(&cur)->queryQueryName());
  1031. }
  1032. }
  1033. virtual IQueryFactory *getQuery(const char *id, StringBuffer *querySet, const IRoxieContextLogger &logctx) const
  1034. {
  1035. if (querySet && querySet->length() && !streq(querySet->str(), querySetName))
  1036. return NULL;
  1037. IQueryFactory *ret;
  1038. ret = aliases.getValue(id);
  1039. if (ret && logctx.queryTraceLevel() > 5)
  1040. logctx.CTXLOG("Found query alias %s => %s", id, ret->queryQueryName());
  1041. if (!ret)
  1042. ret = queries.getValue(id);
  1043. if (ret && querySet)
  1044. querySet->set(querySetName);
  1045. return LINK(ret);
  1046. }
  1047. };
  1048. //===============================================================================================================
  1049. class CRoxieServerQuerySetManager : public CRoxieQuerySetManager
  1050. {
  1051. public:
  1052. CRoxieServerQuerySetManager(const char *_querySetName)
  1053. : CRoxieQuerySetManager(0, _querySetName)
  1054. {
  1055. }
  1056. virtual IQueryFactory * loadQueryFromDll(const char *id, const IQueryDll *dll, const IRoxiePackage &package, const IPropertyTree *stateInfo, bool forceRetry)
  1057. {
  1058. return createServerQueryFactory(id, dll, package, stateInfo, false, forceRetry);
  1059. }
  1060. };
  1061. extern IRoxieQuerySetManager *createServerManager(const char *querySet)
  1062. {
  1063. return new CRoxieServerQuerySetManager(querySet);
  1064. }
  1065. //===============================================================================================================
  1066. class CRoxieAgentQuerySetManager : public CRoxieQuerySetManager
  1067. {
  1068. public:
  1069. CRoxieAgentQuerySetManager(unsigned _channelNo, const char *_querySetName)
  1070. : CRoxieQuerySetManager(_channelNo, _querySetName)
  1071. {
  1072. channelNo = _channelNo;
  1073. }
  1074. virtual IQueryFactory *loadQueryFromDll(const char *id, const IQueryDll *dll, const IRoxiePackage &package, const IPropertyTree *stateInfo, bool forceRetry)
  1075. {
  1076. return createAgentQueryFactory(id, dll, package, channelNo, stateInfo, false, forceRetry);
  1077. }
  1078. };
  1079. class CRoxieAgentQuerySetManagerSet : public CInterface, implements IRoxieQuerySetManagerSet
  1080. {
  1081. public:
  1082. IMPLEMENT_IINTERFACE;
  1083. CRoxieAgentQuerySetManagerSet(unsigned _numChannels, const char *querySetName)
  1084. : numChannels(_numChannels)
  1085. {
  1086. managers = new CRoxieAgentQuerySetManager *[numChannels];
  1087. memset(managers, 0, sizeof(CRoxieAgentQuerySetManager *) * numChannels);
  1088. Owned<const ITopologyServer> topology = getTopology();
  1089. for (unsigned channelNo : topology->queryChannels())
  1090. {
  1091. assertex(channelNo>0 && channelNo<=numChannels);
  1092. if (managers[channelNo-1] == NULL)
  1093. managers[channelNo-1] = new CRoxieAgentQuerySetManager(channelNo, querySetName);
  1094. else
  1095. throw MakeStringException(ROXIE_INVALID_TOPOLOGY, "Invalid topology file - channel %d repeated for this agent", channelNo);
  1096. }
  1097. }
  1098. ~CRoxieAgentQuerySetManagerSet()
  1099. {
  1100. for (unsigned channel = 0; channel < numChannels; channel++)
  1101. ::Release(managers[channel]);
  1102. delete [] managers;
  1103. }
  1104. inline CRoxieAgentQuerySetManager *item(int idx)
  1105. {
  1106. return managers[idx];
  1107. }
  1108. virtual void load(const IPropertyTree *querySets, const IRoxiePackageMap &packages, hash64_t &hash, bool forceRetry)
  1109. {
  1110. for (unsigned channel = 0; channel < numChannels; channel++)
  1111. if (managers[channel])
  1112. managers[channel]->load(querySets, packages, hash, forceRetry); // MORE - this means the hash depends on the number of channels. Is that desirable?
  1113. }
  1114. virtual void getQueries(const char *id, IArrayOf<IQueryFactory> &queries, const IRoxieContextLogger &logctx) const
  1115. {
  1116. for (unsigned channel = 0; channel < numChannels; channel++)
  1117. if (managers[channel])
  1118. {
  1119. IQueryFactory *query = managers[channel]->getQuery(id, NULL, logctx);
  1120. if (query)
  1121. queries.append(*query);
  1122. }
  1123. }
  1124. private:
  1125. unsigned numChannels;
  1126. CRoxieAgentQuerySetManager **managers;
  1127. };
  1128. //===============================================================================================================
  1129. class CRoxieDebugSessionManager : implements IRoxieDebugSessionManager, public CInterface
  1130. {
  1131. protected:
  1132. ReadWriteLock debugLock;
  1133. MapStringToMyClass<IDebuggerContext> debuggerContexts;
  1134. public:
  1135. IMPLEMENT_IINTERFACE;
  1136. void getActiveQueries(StringBuffer &reply)
  1137. {
  1138. HashIterator q(debuggerContexts);
  1139. for (q.first(); q.isValid(); q.next())
  1140. {
  1141. IDebuggerContext *ctx = debuggerContexts.mapToValue(&q.query());
  1142. reply.appendf(" <Query id='%s' uid='%s' debug='1'/>\n", ctx->queryQueryName(), ctx->queryDebugId());
  1143. }
  1144. }
  1145. virtual void registerDebugId(const char *id, IDebuggerContext *ctx)
  1146. {
  1147. WriteLockBlock block(debugLock);
  1148. debuggerContexts.setValue(id, ctx);
  1149. }
  1150. virtual void deregisterDebugId(const char *id)
  1151. {
  1152. WriteLockBlock block(debugLock);
  1153. debuggerContexts.remove(id);
  1154. }
  1155. virtual IDebuggerContext *lookupDebuggerContext(const char *id)
  1156. {
  1157. ReadLockBlock block(debugLock);
  1158. IDebuggerContext *ctx = debuggerContexts.getValue(id);
  1159. if (ctx)
  1160. return LINK(ctx);
  1161. else
  1162. {
  1163. #ifdef _DEBUG
  1164. // In a debug environment, it is convenient to be able to use '*' to mean 'the only active debug session'...
  1165. if (strcmp(id, "*")==0 && debuggerContexts.count()==1)
  1166. {
  1167. HashIterator q(debuggerContexts);
  1168. for (q.first(); q.isValid(); q.next())
  1169. {
  1170. IDebuggerContext *ctx = debuggerContexts.mapToValue(&q.query());
  1171. return LINK(ctx);
  1172. }
  1173. }
  1174. #endif
  1175. throw MakeStringException(ROXIE_INTERNAL_ERROR, "Debug context %s not found", id);
  1176. }
  1177. }
  1178. };
  1179. //===============================================================================================
  1180. /*----------------------------------------------------------------------------------------------
  1181. * A CRoxieQueryPackageManager object manages all the queries that are currently runnable via XML.
  1182. * There may be more than one in existence, but only one will be active and therefore used to
  1183. * look up queries that are received - this corresponds to the currently active package.
  1184. *-----------------------------------------------------------------------------------------------*/
  1185. static hash64_t hashXML(const IPropertyTree *tree)
  1186. {
  1187. StringBuffer xml;
  1188. toXML(tree, xml, 0, XML_SortTags);
  1189. return rtlHash64Data(xml.length(), xml.str(), 877029);
  1190. }
  1191. class CRoxieQueryPackageManagerBase : public CInterface
  1192. {
  1193. public:
  1194. virtual hash64_t getHash()
  1195. {
  1196. CriticalBlock b2(updateCrit);
  1197. return queryHash;
  1198. }
  1199. IRoxieQuerySetManager* getRoxieServerManager() const
  1200. {
  1201. CriticalBlock b2(updateCrit);
  1202. return serverManager.getLink();
  1203. }
  1204. IRoxieQuerySetManagerSet* getRoxieAgentManagers() const
  1205. {
  1206. CriticalBlock b2(updateCrit);
  1207. return agentManagers.getLink();
  1208. }
  1209. protected:
  1210. ~CRoxieQueryPackageManagerBase()
  1211. {
  1212. if (agentQueryReleaseDelaySeconds)
  1213. delayedReleaser->delayedRelease(agentManagers.getClear(), agentQueryReleaseDelaySeconds);
  1214. }
  1215. // Derived classes wanting to read serverManager or agentManagers must call this function to safely obtain their current values
  1216. void getQueryManagers(Owned<IRoxieQuerySetManager> &_serverManager, Owned<CRoxieAgentQuerySetManagerSet> &_agentManagers) const
  1217. {
  1218. CriticalBlock b2(updateCrit);
  1219. _serverManager.set(serverManager);
  1220. _agentManagers.set(agentManagers);
  1221. }
  1222. void reloadQueryManagers(CRoxieAgentQuerySetManagerSet *newAgentManagers, IRoxieQuerySetManager *newServerManager, hash64_t newHash)
  1223. {
  1224. Owned<CRoxieAgentQuerySetManagerSet> oldAgentManagers;
  1225. Owned<IRoxieQuerySetManager> oldServerManager;
  1226. {
  1227. // Atomically, replace the existing query managers with the new ones
  1228. CriticalBlock b2(updateCrit);
  1229. oldAgentManagers.setown(agentManagers.getClear()); // so that the release happens outside the critblock
  1230. oldServerManager.setown(serverManager.getClear()); // so that the release happens outside the critblock
  1231. agentManagers.setown(newAgentManagers);
  1232. serverManager.setown(newServerManager);
  1233. queryHash = newHash;
  1234. }
  1235. if (agentQueryReleaseDelaySeconds)
  1236. delayedReleaser->delayedRelease(oldAgentManagers.getClear(), agentQueryReleaseDelaySeconds);
  1237. }
  1238. private:
  1239. mutable CriticalSection updateCrit; // protects updates of agentManagers and serverManager, and queryHash. Must be held ONLY to link, release, or overwrite these values.
  1240. Owned<CRoxieAgentQuerySetManagerSet> agentManagers;
  1241. Owned<IRoxieQuerySetManager> serverManager;
  1242. hash64_t queryHash = 0;
  1243. };
  1244. class CRoxieQueryPackageManager : public CRoxieQueryPackageManagerBase
  1245. {
  1246. public:
  1247. CRoxieQueryPackageManager(unsigned _numChannels, const char *_querySet, const IRoxiePackageMap *_packages, hash64_t _xmlHash)
  1248. : packages(_packages), numChannels(_numChannels), xmlHash(_xmlHash), querySet(_querySet)
  1249. {
  1250. }
  1251. ~CRoxieQueryPackageManager()
  1252. {
  1253. }
  1254. inline const char *queryPackageId() const
  1255. {
  1256. return packages->queryPackageId();
  1257. }
  1258. virtual void reload()
  1259. {
  1260. // Default is to do nothing...
  1261. }
  1262. virtual void load(bool forceReload) = 0;
  1263. bool matches(hash64_t _xmlHash, bool _active) const
  1264. {
  1265. return _xmlHash == xmlHash && _active==packages->isActive();
  1266. }
  1267. void getInfo(StringBuffer &reply, const IRoxieContextLogger &logctx) const
  1268. {
  1269. reply.appendf(" <PackageSet id=\"%s\" querySet=\"%s\"", queryPackageId(), querySet.get());
  1270. if (!packages || !packages->getPartIds().ordinality())
  1271. {
  1272. reply.append("/>\n");
  1273. return;
  1274. }
  1275. reply.append(">\n");
  1276. const StringArray &parts = packages->getPartIds();
  1277. ForEachItemIn(i, parts)
  1278. reply.appendf(" <Part id='%s'/>\n", parts.item(i));
  1279. reply.append(" </PackageSet>\n");
  1280. }
  1281. bool resetStats(const char *queryId, const IRoxieContextLogger &logctx)
  1282. {
  1283. Owned<IRoxieQuerySetManager> serverManager;
  1284. Owned<CRoxieAgentQuerySetManagerSet> agentManagers;
  1285. getQueryManagers(serverManager, agentManagers);
  1286. if (queryId)
  1287. {
  1288. Owned<IQueryFactory> query = serverManager->getQuery(queryId, NULL, logctx);
  1289. if (!query)
  1290. return false;
  1291. const char *id = query->queryQueryName();
  1292. serverManager->resetQueryTimings(id, logctx);
  1293. for (unsigned channel = 0; channel < numChannels; channel++)
  1294. if (agentManagers->item(channel))
  1295. {
  1296. agentManagers->item(channel)->resetQueryTimings(id, logctx);
  1297. }
  1298. }
  1299. else
  1300. {
  1301. serverManager->resetAllQueryTimings();
  1302. for (unsigned channel = 0; channel < numChannels; channel++)
  1303. if (agentManagers->item(channel))
  1304. agentManagers->item(channel)->resetAllQueryTimings();
  1305. }
  1306. return true;
  1307. }
  1308. bool getStats(const char *queryId, const char *graphName, StringBuffer &reply, const char *wuid, const IRoxieContextLogger &logctx) const
  1309. {
  1310. Owned<IRoxieQuerySetManager> serverManager;
  1311. Owned<CRoxieAgentQuerySetManagerSet> agentManagers;
  1312. getQueryManagers(serverManager, agentManagers);
  1313. if (serverManager->isActive())
  1314. {
  1315. Owned<IQueryFactory> query = serverManager->getQuery(queryId, NULL, logctx);
  1316. if (query)
  1317. {
  1318. bool reset = false; // MORE - tidy up around here.
  1319. Owned<IConstWorkUnit> statsWu;
  1320. if (wuid)
  1321. {
  1322. Owned<IRoxieDaliHelper> daliHelper = ::connectToDali();
  1323. if (!daliHelper->connected())
  1324. throw makeStringException(ROXIE_CONTROL_MSG_ERROR, "Can't create stats WU - dali not connected");
  1325. statsWu.setown(daliHelper->createStatsWorkUnit(wuid, query->queryDll()->queryName()));
  1326. }
  1327. else
  1328. {
  1329. statsWu.setown(createLocalWorkUnitFromPTree(createPTreeFromIPT(queryExtendedWU(query->queryWorkUnit())->queryPTree())));
  1330. }
  1331. query->gatherStats(statsWu, -1, reset);
  1332. for (unsigned channel = 0; channel < numChannels; channel++)
  1333. if (agentManagers->item(channel))
  1334. agentManagers->item(channel)->getStats(queryId, graphName, statsWu, channel+1, reset, logctx);
  1335. if (!wuid || *wuid=='*')
  1336. {
  1337. WorkunitUpdate wu(&statsWu->lock());
  1338. wu->setState(WUStateCompleted); // We don't set the state when updating existing workunits
  1339. }
  1340. reply.appendf("<Query id='%s'>\n", queryId);
  1341. if (wuid)
  1342. reply.appendf(" <wuid>%s</wuid>\n", statsWu->queryWuid());
  1343. else
  1344. {
  1345. Owned<IConstWUGraphIterator> graphs = &statsWu->getGraphs(GraphTypeActivities);
  1346. ForEach(*graphs)
  1347. {
  1348. IConstWUGraph &graph = graphs->query();
  1349. SCMStringBuffer s;
  1350. reply.appendf("<Graph id='%s'>\n <xgmml>\n", graph.getName(s).str());
  1351. Owned<IPropertyTree> xgmml = graph.getXGMMLTree(true, false); // We can't merge between nodes if we format the values
  1352. toXML(xgmml, reply, 2);
  1353. reply.append(" </xgmml>\n</Graph>\n");
  1354. }
  1355. }
  1356. reply.append("</Query>\n");
  1357. return true;
  1358. }
  1359. }
  1360. return false;
  1361. }
  1362. void getActivityMetrics(StringBuffer &reply) const
  1363. {
  1364. Owned<IRoxieQuerySetManager> serverManager;
  1365. Owned<CRoxieAgentQuerySetManagerSet> agentManagers;
  1366. getQueryManagers(serverManager, agentManagers);
  1367. serverManager->getActivityMetrics(reply);
  1368. for (unsigned channel = 0; channel < numChannels; channel++)
  1369. {
  1370. if (agentManagers->item(channel))
  1371. {
  1372. agentManagers->item(channel)->getActivityMetrics(reply);
  1373. }
  1374. }
  1375. }
  1376. void getAllQueryInfo(StringBuffer &reply, bool full, const IRoxieContextLogger &logctx) const
  1377. {
  1378. Owned<IRoxieQuerySetManager> serverManager;
  1379. Owned<CRoxieAgentQuerySetManagerSet> agentManagers;
  1380. getQueryManagers(serverManager, agentManagers);
  1381. serverManager->getAllQueryInfo(reply, full, agentManagers, logctx);
  1382. }
  1383. const char *queryQuerySetName()
  1384. {
  1385. return querySet;
  1386. }
  1387. // These are set at construction and not changed for the lifetime of the object
  1388. const Owned<const IRoxiePackageMap> packages;
  1389. const unsigned numChannels;
  1390. const hash64_t xmlHash;
  1391. const StringAttr querySet;
  1392. };
  1393. /**
  1394. * class CRoxieDaliQueryPackageManager - manages queries specified in QuerySets, for a given package set.
  1395. *
  1396. * If the QuerySet is modified, it will be reloaded.
  1397. * There is one CRoxieDaliQueryPackageManager for every PackageSet - only one will be active for query lookup
  1398. * at a given time (the one associated with the active PackageSet).
  1399. *
  1400. * To deploy new data, typically we will load a new PackageSet, make it active, then release the old one
  1401. * A packageSet is not modified while loaded, to avoid timing issues between agents and server.
  1402. *
  1403. * We need to be able to spot a change (in dali) to the active package indicator (and switch the active CRoxieDaliQueryPackageManager)
  1404. * We need to be able to spot a change (in dali) that adds a new PackageSet
  1405. * We need to decide what to do about a change (in dali) to an existing PackageSet. Maybe we allow it (leave it up to the gui to
  1406. * encourage changing in the right sequence). In which case a change to the package info in dali means reload all global package
  1407. * managers (and then discard the old ones). Hash-based queries means everything should work ok.
  1408. * -> If the active ptr changes, just change what is active
  1409. * If any change to any package set, reload all globalResourceManagers and discard prior
  1410. * The query caching code should ensure that it is quick enough to do so
  1411. *
  1412. **/
  1413. class CRoxieDaliQueryPackageManager : public CRoxieQueryPackageManager, implements ISafeSDSSubscription
  1414. {
  1415. Owned<IRoxieDaliHelper> daliHelper;
  1416. Owned<IDaliPackageWatcher> notifier;
  1417. public:
  1418. IMPLEMENT_IINTERFACE;
  1419. CRoxieDaliQueryPackageManager(unsigned _numChannels, const IRoxiePackageMap *_packages, const char *_querySet, hash64_t _xmlHash)
  1420. : CRoxieQueryPackageManager(_numChannels, _querySet, _packages, _xmlHash)
  1421. {
  1422. daliHelper.setown(connectToDali());
  1423. }
  1424. ~CRoxieDaliQueryPackageManager()
  1425. {
  1426. if (notifier)
  1427. daliHelper->releaseSubscription(notifier);
  1428. }
  1429. virtual ISafeSDSSubscription *linkIfAlive() override { return isAliveAndLink() ? this : nullptr; }
  1430. virtual void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
  1431. {
  1432. reload(false);
  1433. daliHelper->commitCache();
  1434. }
  1435. virtual void load(bool forceReload)
  1436. {
  1437. notifier.setown(daliHelper->getQuerySetSubscription(querySet, this));
  1438. reload(forceReload);
  1439. }
  1440. virtual void reload(bool forceRetry)
  1441. {
  1442. hash64_t newHash = numChannels;
  1443. Owned<IPropertyTree> newQuerySet = daliHelper->getQuerySet(querySet);
  1444. Owned<CRoxieAgentQuerySetManagerSet> newAgentManagers = new CRoxieAgentQuerySetManagerSet(numChannels, querySet);
  1445. Owned<IRoxieQuerySetManager> newServerManager = createServerManager(querySet);
  1446. newServerManager->load(newQuerySet, *packages, newHash, forceRetry);
  1447. newAgentManagers->load(newQuerySet, *packages, newHash, forceRetry);
  1448. reloadQueryManagers(newAgentManagers.getClear(), newServerManager.getClear(), newHash);
  1449. clearKeyStoreCache(false); // Allows us to fully release files we no longer need because of unloaded queries
  1450. }
  1451. };
  1452. class CStandaloneQueryPackageManager : public CRoxieQueryPackageManager
  1453. {
  1454. Owned<IPropertyTree> standaloneDll;
  1455. public:
  1456. CStandaloneQueryPackageManager(unsigned _numChannels, const char *_querySet, const IRoxiePackageMap *_packages, IPropertyTree *_standaloneDll)
  1457. : CRoxieQueryPackageManager(_numChannels, _querySet, _packages, 0), standaloneDll(_standaloneDll)
  1458. {
  1459. assertex(standaloneDll);
  1460. }
  1461. ~CStandaloneQueryPackageManager()
  1462. {
  1463. }
  1464. virtual void load(bool forceReload)
  1465. {
  1466. hash64_t newHash = numChannels;
  1467. Owned<IPropertyTree> newQuerySet = createPTree("QuerySet", ipt_lowmem);
  1468. newQuerySet->setProp("@name", "_standalone");
  1469. newQuerySet->addPropTree("Query", standaloneDll.getLink());
  1470. Owned<CRoxieAgentQuerySetManagerSet> newAgentManagers = new CRoxieAgentQuerySetManagerSet(numChannels, querySet);
  1471. Owned<IRoxieQuerySetManager> newServerManager = createServerManager(querySet);
  1472. newServerManager->load(newQuerySet, *packages, newHash, forceReload);
  1473. newAgentManagers->load(newQuerySet, *packages, newHash, forceReload);
  1474. reloadQueryManagers(newAgentManagers.getClear(), newServerManager.getClear(), newHash);
  1475. }
  1476. };
  1477. static SpinLock roxieDebugSessionManagerLock;
  1478. extern IRoxieDebugSessionManager &queryRoxieDebugSessionManager()
  1479. {
  1480. SpinBlock b(roxieDebugSessionManagerLock);
  1481. if (!debugSessionManager)
  1482. debugSessionManager = new CRoxieDebugSessionManager();
  1483. return *debugSessionManager;
  1484. }
  1485. class CRoxiePackageSetWatcher : public CInterface
  1486. {
  1487. public:
  1488. CRoxiePackageSetWatcher(IRoxieDaliHelper *_daliHelper, unsigned numChannels, CRoxiePackageSetWatcher *oldPackages, bool forceReload)
  1489. : daliHelper(_daliHelper), stateHash(0)
  1490. {
  1491. ForEachItemIn(idx, allQuerySetNames)
  1492. {
  1493. createQueryPackageManagers(numChannels, allQuerySetNames.item(idx), oldPackages, forceReload);
  1494. }
  1495. }
  1496. CRoxiePackageSetWatcher(IRoxieDaliHelper *_daliHelper, const IQueryDll *standAloneDll, unsigned numChannels, const char *querySet, bool forceReload)
  1497. : daliHelper(_daliHelper), stateHash(0)
  1498. {
  1499. Owned<IPropertyTree> standAloneDllTree;
  1500. standAloneDllTree.setown(createPTree("Query", ipt_lowmem));
  1501. standAloneDllTree->setProp("@id", "roxie");
  1502. standAloneDllTree->setProp("@dll", standAloneDll->queryDll()->queryName());
  1503. Owned<CRoxieQueryPackageManager> qpm = new CStandaloneQueryPackageManager(numChannels, querySet, LINK(&queryEmptyRoxiePackageMap()), standAloneDllTree.getClear());
  1504. qpm->load(forceReload);
  1505. stateHash = rtlHash64Data(sizeof(stateHash), &stateHash, qpm->getHash());
  1506. allQueryPackages.append(*qpm.getClear());
  1507. }
  1508. IQueryFactory *lookupLibrary(const char *libraryName, unsigned expectedInterfaceHash, const IRoxieContextLogger &logctx) const
  1509. {
  1510. ForEachItemIn(idx, allQueryPackages)
  1511. {
  1512. Owned<IRoxieQuerySetManager> sm = allQueryPackages.item(idx).getRoxieServerManager();
  1513. if (sm->isActive())
  1514. {
  1515. Owned<IQueryFactory> library = sm->getQuery(libraryName, NULL, logctx);
  1516. if (library)
  1517. {
  1518. if (library->isQueryLibrary())
  1519. {
  1520. unsigned foundInterfaceHash = library->getQueryLibraryInterfaceHash();
  1521. if (!foundInterfaceHash || (foundInterfaceHash == expectedInterfaceHash))
  1522. return library.getClear();
  1523. else
  1524. throw MakeStringException(ROXIE_LIBRARY_ERROR, "The library interface found in %s is not compatible (found %d, expected %d)", libraryName, foundInterfaceHash, expectedInterfaceHash);
  1525. }
  1526. else
  1527. throw MakeStringException(ROXIE_LIBRARY_ERROR, "The query resolved by %s is not a library", libraryName);
  1528. }
  1529. }
  1530. }
  1531. throw MakeStringException(ROXIE_LIBRARY_ERROR, "No library available for %s", libraryName);
  1532. }
  1533. IQueryFactory *getQuery(const char *id, StringBuffer *querySet, IArrayOf<IQueryFactory> *agentQueries, const IRoxieContextLogger &logctx) const
  1534. {
  1535. if (querySet && querySet->length() && !allQuerySetNames.contains(querySet->str()))
  1536. throw MakeStringException(ROXIE_INVALID_TARGET, "Target %s not found", querySet->str());
  1537. ForEachItemIn(idx, allQueryPackages)
  1538. {
  1539. Owned<IRoxieQuerySetManager> sm = allQueryPackages.item(idx).getRoxieServerManager();
  1540. if (sm->isActive())
  1541. {
  1542. IQueryFactory *query = sm->getQuery(id, querySet, logctx);
  1543. if (query)
  1544. {
  1545. if (agentQueries)
  1546. {
  1547. Owned<IRoxieQuerySetManagerSet> agentManagers = allQueryPackages.item(idx).getRoxieAgentManagers();
  1548. agentManagers->getQueries(id, *agentQueries, logctx);
  1549. }
  1550. return query;
  1551. }
  1552. }
  1553. }
  1554. return NULL;
  1555. }
  1556. int getActivePackageCount() const
  1557. {
  1558. int count = 0;
  1559. ForEachItemIn(idx, allQueryPackages)
  1560. {
  1561. Owned<IRoxieQuerySetManager> sm = allQueryPackages.item(idx).getRoxieServerManager();
  1562. if (sm->isActive())
  1563. count++;
  1564. }
  1565. return count;
  1566. }
  1567. inline hash64_t queryHash() const
  1568. {
  1569. return stateHash;
  1570. }
  1571. void getAllQueryInfo(StringBuffer &reply, bool full, const IRoxieContextLogger &logctx) const
  1572. {
  1573. ForEachItemIn(idx, allQueryPackages)
  1574. {
  1575. Owned<IRoxieQuerySetManager> serverManager = allQueryPackages.item(idx).getRoxieServerManager();
  1576. if (serverManager->isActive())
  1577. {
  1578. Owned<IRoxieQuerySetManagerSet> agentManagers = allQueryPackages.item(idx).getRoxieAgentManagers();
  1579. serverManager->getAllQueryInfo(reply, full, agentManagers, logctx);
  1580. }
  1581. }
  1582. }
  1583. void getActivityMetrics(StringBuffer &reply) const
  1584. {
  1585. ForEachItemIn(idx, allQueryPackages)
  1586. {
  1587. CRoxieQueryPackageManager &qpm = allQueryPackages.item(idx);
  1588. qpm.getActivityMetrics(reply);
  1589. }
  1590. }
  1591. void getInfo(StringBuffer &reply, const IRoxieContextLogger &logctx) const
  1592. {
  1593. reply.append("<PackageSets>\n");
  1594. ForEachItemIn(idx, allQueryPackages)
  1595. {
  1596. allQueryPackages.item(idx).getInfo(reply, logctx);
  1597. }
  1598. reply.append("</PackageSets>\n");
  1599. }
  1600. void getStats(StringBuffer &reply, const char *id, const char *graphName, const char *wuid, const IRoxieContextLogger &logctx) const
  1601. {
  1602. ForEachItemIn(idx, allQueryPackages)
  1603. {
  1604. if (allQueryPackages.item(idx).getStats(id, graphName, reply, wuid, logctx))
  1605. return;
  1606. }
  1607. }
  1608. void resetStats(const char *target, const char *id, const IRoxieContextLogger &logctx) const
  1609. {
  1610. bool matched = false;
  1611. ForEachItemIn(idx, allQueryPackages)
  1612. {
  1613. CRoxieQueryPackageManager &queryPackage = allQueryPackages.item(idx);
  1614. if (target && *target && !strieq(queryPackage.queryQuerySetName(), target))
  1615. continue;
  1616. if (allQueryPackages.item(idx).resetStats(id, logctx))
  1617. {
  1618. if (target && *target)
  1619. return;
  1620. matched = true;
  1621. }
  1622. }
  1623. if (!matched && id && *id)
  1624. throw MakeStringException(ROXIE_UNKNOWN_QUERY, "Unknown query %s", id);
  1625. }
  1626. private:
  1627. CIArrayOf<CRoxieQueryPackageManager> allQueryPackages;
  1628. Linked<IRoxieDaliHelper> daliHelper;
  1629. hash64_t stateHash;
  1630. CRoxieQueryPackageManager *getPackageManager(const char *id)
  1631. {
  1632. ForEachItemIn(idx, allQueryPackages)
  1633. {
  1634. CRoxieQueryPackageManager &pm = allQueryPackages.item(idx);
  1635. if (strcmp(pm.queryPackageId(), id)==0)
  1636. return LINK(&pm);
  1637. }
  1638. return NULL;
  1639. }
  1640. void createQueryPackageManager(unsigned numChannels, const IRoxiePackageMap *packageMap, const char *querySet, hash64_t xmlHash, bool forceReload)
  1641. {
  1642. Owned<CRoxieQueryPackageManager> qpm = new CRoxieDaliQueryPackageManager(numChannels, packageMap, querySet, xmlHash);
  1643. qpm->load(forceReload);
  1644. stateHash = rtlHash64Data(sizeof(stateHash), &stateHash, qpm->getHash());
  1645. allQueryPackages.append(*qpm.getClear());
  1646. }
  1647. void createQueryPackageManagers(unsigned numChannels, const char *querySet, CRoxiePackageSetWatcher *oldPackages, bool forceReload)
  1648. {
  1649. int loadedPackages = 0;
  1650. int activePackages = 0;
  1651. Owned<IPropertyTree> packageTree = daliHelper->getPackageSets();
  1652. Owned<IPropertyTreeIterator> packageSets = packageTree->getElements("PackageSet");
  1653. ForEach(*packageSets)
  1654. {
  1655. IPropertyTree &ps = packageSets->query();
  1656. const char *packageSetSpec = ps.queryProp("@process");
  1657. if (!packageSetSpec || WildMatch(roxieName, packageSetSpec, false))
  1658. {
  1659. if (traceLevel)
  1660. {
  1661. DBGLOG("Loading package set %s, process spec %s", ps.queryProp("@id") ? ps.queryProp("@id") : "<no-id>",
  1662. packageSetSpec ? packageSetSpec : "<*>");
  1663. }
  1664. Owned<IPropertyTreeIterator> packageMaps = ps.getElements("PackageMap");
  1665. ForEach(*packageMaps)
  1666. {
  1667. IPropertyTree &pm = packageMaps->query();
  1668. const char *packageMapId = pm.queryProp("@id");
  1669. const char *packageMapFilter = pm.queryProp("@querySet");
  1670. if (packageMapId && *packageMapId && (!packageMapFilter || WildMatch(querySet, packageMapFilter, false)))
  1671. {
  1672. try
  1673. {
  1674. bool isActive = pm.getPropBool("@active", true);
  1675. Owned<IPropertyTree> xml = daliHelper->getPackageMap(packageMapId);
  1676. hash64_t xmlHash = hashXML(xml);
  1677. Owned<CRoxieQueryPackageManager> oldPackageManager;
  1678. if (oldPackages)
  1679. {
  1680. oldPackageManager.setown(oldPackages->getPackageManager(packageMapId));
  1681. }
  1682. if (oldPackageManager && oldPackageManager->matches(xmlHash, isActive))
  1683. {
  1684. if (traceLevel)
  1685. DBGLOG("Package map %s, active %s already loaded", packageMapId, isActive ? "true" : "false");
  1686. stateHash = rtlHash64Data(sizeof(stateHash), &stateHash, oldPackageManager->getHash());
  1687. allQueryPackages.append(*oldPackageManager.getClear());
  1688. }
  1689. else
  1690. {
  1691. if (traceLevel)
  1692. DBGLOG("Loading package map %s, active %s", packageMapId, isActive ? "true" : "false");
  1693. Owned<CRoxiePackageMap> packageMap = new CRoxiePackageMap(packageMapId, packageMapFilter, isActive);
  1694. packageMap->load(xml);
  1695. createQueryPackageManager(numChannels, packageMap.getLink(), querySet, xmlHash, forceReload);
  1696. }
  1697. loadedPackages++;
  1698. if (isActive)
  1699. activePackages++;
  1700. }
  1701. catch (IException *E)
  1702. {
  1703. StringBuffer msg;
  1704. msg.appendf("Failed to load package map %s", packageMapId);
  1705. EXCLOG(E, msg.str());
  1706. E->Release();
  1707. }
  1708. }
  1709. }
  1710. }
  1711. }
  1712. if (!loadedPackages)
  1713. {
  1714. if (traceLevel)
  1715. DBGLOG("Loading empty package for QuerySet %s", querySet);
  1716. createQueryPackageManager(numChannels, LINK(&queryEmptyRoxiePackageMap()), querySet, 0, forceReload);
  1717. }
  1718. else if (traceLevel)
  1719. DBGLOG("Loaded %d packages (%d active)", loadedPackages, activePackages);
  1720. }
  1721. };
  1722. class CRoxiePackageSetManager : implements IRoxieQueryPackageManagerSet, implements ISafeSDSSubscription, public CInterface
  1723. {
  1724. public:
  1725. IMPLEMENT_IINTERFACE;
  1726. CRoxiePackageSetManager(const IQueryDll *_standAloneDll) :
  1727. standAloneDll(_standAloneDll), autoReloadThread(*this)
  1728. {
  1729. if (topology && topology->getPropBool("@lockDali", false))
  1730. daliHelper.setown(connectToDali());
  1731. else
  1732. daliHelper.setown(connectToDali(ROXIE_DALI_CONNECT_TIMEOUT));
  1733. autoPending = 0;
  1734. autoSignalsPending = 0;
  1735. forcePending = false;
  1736. pSetsNotifier.setown(daliHelper->getPackageSetsSubscription(this));
  1737. pMapsNotifier.setown(daliHelper->getPackageMapsSubscription(this));
  1738. }
  1739. ~CRoxiePackageSetManager()
  1740. {
  1741. autoReloadThread.stop();
  1742. autoReloadThread.join();
  1743. if (pSetsNotifier)
  1744. daliHelper->releaseSubscription(pSetsNotifier);
  1745. if (pMapsNotifier)
  1746. daliHelper->releaseSubscription(pMapsNotifier);
  1747. }
  1748. virtual ISafeSDSSubscription *linkIfAlive() override { return isAliveAndLink() ? this : nullptr; }
  1749. void requestReload(bool signal, bool force)
  1750. {
  1751. if (force)
  1752. forcePending = true;
  1753. if (signal)
  1754. ++autoSignalsPending;
  1755. ++autoPending;
  1756. autoReloadTrigger.signal();
  1757. if (signal)
  1758. autoReloadComplete.wait();
  1759. }
  1760. virtual void load()
  1761. {
  1762. try
  1763. {
  1764. reload(false);
  1765. daliHelper->commitCache();
  1766. controlSem.signal();
  1767. autoReloadThread.start(); // Don't want to overlap auto-reloads with the initial load
  1768. }
  1769. catch(IException *E)
  1770. {
  1771. EXCLOG(E, "No configuration could be loaded");
  1772. controlSem.interrupt();
  1773. throw; // Roxie will refuse to start up if configuration invalid
  1774. }
  1775. }
  1776. virtual void doControlMessage(IPropertyTree *xml, StringBuffer &reply, const IRoxieContextLogger &logctx)
  1777. {
  1778. if (!controlSem.wait(20000))
  1779. throw MakeStringException(ROXIE_TIMEOUT, "Timed out waiting for current control query to complete");
  1780. try
  1781. {
  1782. _doControlMessage(xml, reply, logctx);
  1783. reply.append(" <Status>ok</Status>\n");
  1784. }
  1785. catch(IException *E)
  1786. {
  1787. controlSem.signal();
  1788. EXCLOG(E);
  1789. throw;
  1790. }
  1791. catch(...)
  1792. {
  1793. controlSem.signal();
  1794. throw;
  1795. }
  1796. controlSem.signal();
  1797. }
  1798. virtual IQueryFactory *lookupLibrary(const char *libraryName, unsigned expectedInterfaceHash, const IRoxieContextLogger &logctx) const
  1799. {
  1800. ReadLockBlock b(packageCrit);
  1801. return allQueryPackages->lookupLibrary(libraryName, expectedInterfaceHash, logctx);
  1802. }
  1803. virtual IQueryFactory *getQuery(const char *id, StringBuffer *querySet, IArrayOf<IQueryFactory> *agentQueries, const IRoxieContextLogger &logctx) const
  1804. {
  1805. ReadLockBlock b(packageCrit);
  1806. return allQueryPackages->getQuery(id, querySet, agentQueries, logctx);
  1807. }
  1808. virtual int getActivePackageCount() const
  1809. {
  1810. ReadLockBlock b(packageCrit);
  1811. return allQueryPackages->getActivePackageCount();
  1812. }
  1813. virtual void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
  1814. {
  1815. requestReload(false, false);
  1816. }
  1817. private:
  1818. Owned<const IQueryDll> standAloneDll;
  1819. Owned<CRoxieDebugSessionManager> debugSessionManager;
  1820. Owned<IRoxieDaliHelper> daliHelper;
  1821. Owned<IDaliPackageWatcher> pSetsNotifier;
  1822. Owned<IDaliPackageWatcher> pMapsNotifier;
  1823. mutable ReadWriteLock packageCrit;
  1824. InterruptableSemaphore controlSem;
  1825. Owned<CRoxiePackageSetWatcher> allQueryPackages;
  1826. Semaphore autoReloadTrigger;
  1827. Semaphore autoReloadComplete;
  1828. std::atomic<unsigned> autoSignalsPending;
  1829. std::atomic<unsigned> autoPending;
  1830. bool forcePending;
  1831. class AutoReloadThread : public Thread
  1832. {
  1833. std::atomic<bool> closing;
  1834. CRoxiePackageSetManager &owner;
  1835. public:
  1836. AutoReloadThread(CRoxiePackageSetManager &_owner)
  1837. : Thread("AutoReloadThread"), owner(_owner)
  1838. {
  1839. closing = false;
  1840. }
  1841. virtual int run()
  1842. {
  1843. if (traceLevel)
  1844. DBGLOG("AutoReloadThread %p starting", this);
  1845. while (!closing)
  1846. {
  1847. owner.autoReloadTrigger.wait();
  1848. if (closing)
  1849. break;
  1850. unsigned signalsPending = owner.autoSignalsPending;
  1851. if (!signalsPending)
  1852. Sleep(500); // Typically notifications come in clumps - this avoids reloading too often
  1853. if (owner.autoPending)
  1854. {
  1855. owner.autoPending = 0;
  1856. try
  1857. {
  1858. owner.reload(owner.forcePending);
  1859. }
  1860. catch (IException *E)
  1861. {
  1862. if (!closing)
  1863. EXCLOG(MCoperatorError, E, "AutoReloadThread: ");
  1864. E->Release();
  1865. }
  1866. catch (...)
  1867. {
  1868. IERRLOG("Unknown exception in AutoReloadThread");
  1869. }
  1870. owner.forcePending = false;
  1871. }
  1872. if (signalsPending)
  1873. {
  1874. owner.autoSignalsPending--;
  1875. owner.autoReloadComplete.signal();
  1876. }
  1877. }
  1878. if (traceLevel)
  1879. DBGLOG("AutoReloadThread %p exiting", this);
  1880. return 0;
  1881. }
  1882. void stop()
  1883. {
  1884. closing = true;
  1885. owner.autoReloadTrigger.signal();
  1886. }
  1887. } autoReloadThread;
  1888. void reload(bool forceRetry)
  1889. {
  1890. clearDaliMisses();
  1891. // We want to kill the old packages, but not until we have created the new ones
  1892. // So that the query/dll caching will work for anything that is not affected by the changes
  1893. Owned<CRoxiePackageSetWatcher> newPackages;
  1894. if (standAloneDll)
  1895. newPackages.setown(new CRoxiePackageSetWatcher(daliHelper, standAloneDll, numChannels, "roxie", forceRetry));
  1896. else
  1897. {
  1898. Owned<CRoxiePackageSetWatcher> currentPackages;
  1899. {
  1900. ReadLockBlock b(packageCrit);
  1901. currentPackages.setown(allQueryPackages.getLink());
  1902. }
  1903. newPackages.setown(new CRoxiePackageSetWatcher(daliHelper, numChannels, currentPackages, forceRetry));
  1904. }
  1905. // Hold the lock for as little time as we can
  1906. // Note that we must NOT hold the lock during the delete of the old object - or we deadlock.
  1907. // Hence the slightly convoluted code below
  1908. Owned<CRoxiePackageSetWatcher> oldPackages; // NB Destroyed outside the WriteLockBlock
  1909. {
  1910. WriteLockBlock b(packageCrit);
  1911. oldPackages.setown(allQueryPackages.getLink()); // Ensure we don't delete the old packages until after we have loaded the new
  1912. allQueryPackages.setown(newPackages.getClear());
  1913. }
  1914. daliHelper->commitCache();
  1915. }
  1916. // Common code used by control:queries and control:getQueryXrefInfo
  1917. void getQueryInfo(IPropertyTree *control, StringBuffer &reply, bool full, const IRoxieContextLogger &logctx) const
  1918. {
  1919. Owned<IPropertyTreeIterator> ids = control->getElements("Query");
  1920. reply.append("<Queries reporting='1'>\n");
  1921. if (ids->first())
  1922. {
  1923. ForEach(*ids)
  1924. {
  1925. const char *id = ids->query().queryProp("@id");
  1926. if (id)
  1927. {
  1928. IArrayOf<IQueryFactory> agentQueries;
  1929. Owned<IQueryFactory> query = getQuery(id, NULL, &agentQueries, logctx);
  1930. if (query)
  1931. query->getQueryInfo(reply, full, &agentQueries, logctx);
  1932. else
  1933. reply.appendf(" <Query id=\"%s\" error=\"Query not found\"/>\n", id);
  1934. }
  1935. }
  1936. }
  1937. else
  1938. {
  1939. ReadLockBlock readBlock(packageCrit);
  1940. allQueryPackages->getAllQueryInfo(reply, full, logctx);
  1941. }
  1942. reply.append("</Queries>\n");
  1943. }
  1944. void _doControlMessage(IPropertyTree *control, StringBuffer &reply, const IRoxieContextLogger &logctx)
  1945. {
  1946. const char *queryName = control->queryName();
  1947. logctx.CTXLOG("doControlMessage - %s", queryName);
  1948. assertex(memicmp(queryName, "control:", 8) == 0);
  1949. bool unknown = false;
  1950. switch (_toupper(queryName[8]))
  1951. {
  1952. case 'A':
  1953. if (stricmp(queryName, "control:aclupdate") == 0)
  1954. {
  1955. // MORE - do nothing for now - possibly needed in the future - leave this so no exception is thrown
  1956. }
  1957. else if (stricmp(queryName, "control:activeQueries")==0)
  1958. {
  1959. if (debugSessionManager)
  1960. debugSessionManager->getActiveQueries(reply);
  1961. }
  1962. else if (stricmp(queryName, "control:activitymetrics")==0)
  1963. {
  1964. ReadLockBlock readBlock(packageCrit);
  1965. allQueryPackages->getActivityMetrics(reply);
  1966. }
  1967. else if (stricmp(queryName, "control:alive")==0)
  1968. {
  1969. reply.appendf("<Alive restarts='%d'/>", restarts.load());
  1970. }
  1971. else
  1972. unknown = true;
  1973. break;
  1974. case 'B':
  1975. if (stricmp(queryName, "control:blobCacheMem")==0)
  1976. {
  1977. blobCacheMB = control->getPropInt("@val", 0);
  1978. topology->setPropInt("@blobCacheMem", blobCacheMB);
  1979. setBlobCacheMem(blobCacheMB * 0x100000);
  1980. }
  1981. else
  1982. unknown = true;
  1983. break;
  1984. case 'C':
  1985. if (stricmp(queryName, "control:cacheInfo")==0)
  1986. {
  1987. bool clear = control->getPropBool("@clear", false);
  1988. unsigned channel = control->getPropInt("@channel", -1);
  1989. if (clear)
  1990. queryFileCache().clearOsCache();
  1991. else
  1992. {
  1993. reply.append(" <CacheInfo");
  1994. if (channel != (unsigned) -1)
  1995. reply.appendf(" channel='%u'", channel);
  1996. reply.append(">\n");
  1997. queryFileCache().reportOsCache(reply, channel);
  1998. reply.appendf(" </CacheInfo>\n");
  1999. }
  2000. }
  2001. else if (stricmp(queryName, "control:checkCompleted")==0)
  2002. {
  2003. checkCompleted = control->getPropBool("@val", true);
  2004. topology->setPropBool("@checkCompleted", checkCompleted );
  2005. }
  2006. else if (stricmp(queryName, "control:checkingHeap")==0)
  2007. {
  2008. defaultCheckingHeap = control->getPropBool("@val", true);
  2009. topology->setPropInt("@checkingHeap", defaultCheckingHeap);
  2010. }
  2011. else if (stricmp(queryName, "control:clearIndexCache")==0)
  2012. {
  2013. bool clearAll = control->getPropBool("@clearAll", true);
  2014. clearKeyStoreCache(clearAll);
  2015. }
  2016. else if (stricmp(queryName, "control:closedown")==0)
  2017. {
  2018. closedown();
  2019. }
  2020. else if (stricmp(queryName, "control:closeExpired")==0)
  2021. {
  2022. queryFileCache().closeExpired(false);
  2023. queryFileCache().closeExpired(true);
  2024. }
  2025. else if (stricmp(queryName, "control:closeLocalExpired")==0)
  2026. {
  2027. queryFileCache().closeExpired(false);
  2028. }
  2029. else if (stricmp(queryName, "control:closeRemoteExpired")==0)
  2030. {
  2031. queryFileCache().closeExpired(true);
  2032. }
  2033. else
  2034. unknown = true;
  2035. break;
  2036. case 'D':
  2037. if (stricmp(queryName, "control:dafilesrvLookupTimeout")==0)
  2038. {
  2039. dafilesrvLookupTimeout = control->getPropInt("@val", 10000);
  2040. topology->setPropInt("@dafilesrvLookupTimeout", dafilesrvLookupTimeout);
  2041. setRemoteFileTimeouts(dafilesrvLookupTimeout, 0);
  2042. }
  2043. else if (stricmp(queryName, "control:defaultConcatPreload")==0)
  2044. {
  2045. defaultConcatPreload = control->getPropInt("@val", 0);
  2046. topology->setPropInt("@defaultConcatPreload", defaultConcatPreload);
  2047. }
  2048. else if (stricmp(queryName, "control:defaultFetchPreload")==0)
  2049. {
  2050. defaultFetchPreload = control->getPropInt("@val", 0);
  2051. topology->setPropInt("@defaultFetchPreload", defaultFetchPreload);
  2052. }
  2053. else if (stricmp(queryName, "control:defaultFullKeyedJoinPreload")==0)
  2054. {
  2055. defaultFullKeyedJoinPreload = control->getPropInt("@val", 0);
  2056. topology->setPropInt("@defaultFullKeyedJoinPreload", defaultFullKeyedJoinPreload);
  2057. }
  2058. else if (stricmp(queryName, "control:defaultHighPriorityTimeLimit")==0)
  2059. {
  2060. defaultTimeLimit[1] = control->getPropInt("@limit", 0);
  2061. topology->setPropInt("@defaultHighPriorityTimeLimit", defaultTimeLimit[1]);
  2062. }
  2063. else if (stricmp(queryName, "control:defaultHighPriorityTimeWarning")==0)
  2064. {
  2065. defaultWarnTimeLimit[1] = control->getPropInt("@limit", 0);
  2066. topology->setPropInt("@defaultHighPriorityTimeWarning", defaultWarnTimeLimit[1]);
  2067. }
  2068. else if (stricmp(queryName, "control:defaultKeyedJoinPreload")==0)
  2069. {
  2070. defaultKeyedJoinPreload = control->getPropInt("@val", 0);
  2071. topology->setPropInt("@defaultKeyedJoinPreload", defaultKeyedJoinPreload);
  2072. }
  2073. else if (stricmp(queryName, "control:defaultLowPriorityTimeLimit")==0)
  2074. {
  2075. defaultTimeLimit[0] = control->getPropInt("@limit", 0);
  2076. topology->setPropInt("@defaultLowPriorityTimeLimit", defaultTimeLimit[0]);
  2077. }
  2078. else if (stricmp(queryName, "control:defaultLowPriorityTimeWarning")==0)
  2079. {
  2080. defaultWarnTimeLimit[0] = control->getPropInt("@limit", 0);
  2081. topology->setPropInt("@defaultLowPriorityTimeWarning", defaultWarnTimeLimit[0]);
  2082. }
  2083. else if (stricmp(queryName, "control:defaultParallelJoinPreload")==0)
  2084. {
  2085. defaultParallelJoinPreload = control->getPropInt("@val", 0);
  2086. topology->setPropInt("@defaultParallelJoinPreload", defaultParallelJoinPreload);
  2087. }
  2088. else if (stricmp(queryName, "control:defaultSLAPriorityTimeLimit")==0)
  2089. {
  2090. defaultTimeLimit[2] = control->getPropInt("@limit", 0);
  2091. topology->setPropInt("@defaultSLAPriorityTimeLimit", defaultTimeLimit[2]);
  2092. }
  2093. else if (stricmp(queryName, "control:defaultSLAPriorityTimeWarning")==0)
  2094. {
  2095. defaultWarnTimeLimit[2] = control->getPropInt("@limit", 0);
  2096. topology->setPropInt("@defaultSLAPriorityTimeWarning", defaultWarnTimeLimit[2]);
  2097. }
  2098. else if (stricmp(queryName, "control:deleteUnneededPhysicalFiles")==0)
  2099. {
  2100. UNIMPLEMENTED;
  2101. }
  2102. else if (stricmp(queryName, "control:deleteUnneededQueryCacheFiles")==0)
  2103. {
  2104. UNIMPLEMENTED;
  2105. }
  2106. else if (stricmp(queryName, "control:doIbytiDelay")==0)
  2107. { // WARNING: use with extra care only during inactivity on system
  2108. doIbytiDelay = control->getPropBool("@val", true);
  2109. topology->setPropBool("@doIbytiDelay", doIbytiDelay);
  2110. }
  2111. else
  2112. unknown = true;
  2113. break;
  2114. case 'F':
  2115. if (stricmp(queryName, "control:fieldTranslationEnabled")==0)
  2116. {
  2117. const char *val = control->queryProp("@val");
  2118. if (val)
  2119. fieldTranslationEnabled = getTranslationMode(val, false);
  2120. else
  2121. fieldTranslationEnabled = RecordTranslationMode::Payload;
  2122. val = getTranslationModeText(fieldTranslationEnabled);
  2123. topology->setProp("@fieldTranslationEnabled", val);
  2124. }
  2125. else if (stricmp(queryName, "control:flushJHtreeCacheOnOOM")==0)
  2126. {
  2127. flushJHtreeCacheOnOOM = control->getPropBool("@val", true);
  2128. topology->setPropInt("@flushJHtreeCacheOnOOM", flushJHtreeCacheOnOOM);
  2129. }
  2130. else
  2131. unknown = true;
  2132. break;
  2133. case 'G':
  2134. if (stricmp(queryName, "control:getACLinfo") == 0)
  2135. {
  2136. // MORE - do nothing for now - possibly needed in the future - leave this so no exception is thrown
  2137. }
  2138. else if (stricmp(queryName, "control:getClusterName")==0)
  2139. {
  2140. reply.appendf("<clusterName id='%s'/>", roxieName.str());
  2141. }
  2142. else if (stricmp(queryName, "control:getQueryXrefInfo")==0)
  2143. {
  2144. getQueryInfo(control, reply, true, logctx);
  2145. }
  2146. else if (stricmp(queryName, "control:getQuery")==0)
  2147. {
  2148. const char* id = control->queryProp("@id");
  2149. if (!id)
  2150. throw MakeStringException(ROXIE_MISSING_PARAMS, "No query name specified");
  2151. Owned<IQueryFactory> q = getQuery(id, NULL, NULL, logctx);
  2152. if (q)
  2153. {
  2154. Owned<IPropertyTree> tempTree = q->cloneQueryXGMML();
  2155. tempTree->setProp("@roxieName", roxieName.str());
  2156. toXML(tempTree, reply);
  2157. }
  2158. else
  2159. throw MakeStringException(ROXIE_UNKNOWN_QUERY, "Unknown query %s", id);
  2160. }
  2161. else if (stricmp(queryName, "control:getQueryWarningTime")==0)
  2162. {
  2163. const char *id = control->queryProp("Query/@id");
  2164. if (!id)
  2165. badFormat();
  2166. Owned<IQueryFactory> f = getQuery(id, NULL, NULL, logctx);
  2167. if (f)
  2168. {
  2169. unsigned warnLimit = f->queryOptions().warnTimeLimit;
  2170. reply.appendf("<QueryTimeWarning val='%d'/>", warnLimit);
  2171. }
  2172. }
  2173. else if (stricmp(queryName, "control:getBuildVersion")==0)
  2174. {
  2175. reply.appendf("<version id='%s'/>", hpccBuildInfo.buildTag);
  2176. }
  2177. else
  2178. unknown = true;
  2179. break;
  2180. case 'I':
  2181. if (stricmp(queryName, "control:indexmetrics")==0)
  2182. {
  2183. getIndexMetrics(reply);
  2184. }
  2185. else if (stricmp(queryName, "control:inMemoryKeysEnabled")==0)
  2186. {
  2187. inMemoryKeysEnabled = control->getPropBool("@val", true);
  2188. topology->setPropBool("@inMemoryKeysEnabled", inMemoryKeysEnabled);
  2189. }
  2190. else
  2191. unknown = true;
  2192. break;
  2193. case 'L':
  2194. if (stricmp(queryName, "control:leafCacheMem")==0)
  2195. {
  2196. leafCacheMB = control->getPropInt("@val", 50);
  2197. topology->setPropInt("@leafCacheMem", leafCacheMB);
  2198. setLeafCacheMem(leafCacheMB * 0x100000);
  2199. }
  2200. else if (stricmp(queryName, "control:legacyNodeCache")==0)
  2201. {
  2202. bool legacyNodeCache = control->getPropBool("@val", true);
  2203. topology->setPropInt("@legacyNodeCache", legacyNodeCache);
  2204. setLegacyNodeCache(legacyNodeCache);
  2205. }
  2206. else if (stricmp(queryName, "control:listFileOpenErrors")==0)
  2207. {
  2208. // this just creates a delta state file to remove references to Keys / Files we now longer have interest in
  2209. StringAttrMapping *mapping = queryFileCache().queryFileErrorList();
  2210. HashIterator iter(*mapping);
  2211. StringBuffer err;
  2212. for (iter.first(); iter.isValid(); iter.next())
  2213. {
  2214. IMapping &cur = iter.query();
  2215. StringAttr *item = mapping->mapToValue(&cur);
  2216. const char *filename = (const char*)cur.getKey();
  2217. const char *filetype = item->get();
  2218. reply.appendf("<file><name>%s</name><type>%s</type></file>", filename, filetype);
  2219. }
  2220. }
  2221. else if (stricmp(queryName, "control:listUnusedFiles")==0)
  2222. {
  2223. UNIMPLEMENTED;
  2224. }
  2225. else if (stricmp(queryName, "control:lockDali")==0)
  2226. {
  2227. if (adhocRoxie)
  2228. throw MakeStringException(ROXIE_CONTROL_MSG_ERROR, "Cannot lock dali when listening for workunits");
  2229. topology->setPropBool("@lockDali", true);
  2230. if (daliHelper)
  2231. daliHelper->disconnect();
  2232. saveTopology();
  2233. }
  2234. else if (stricmp(queryName, "control:logfullqueries")==0)
  2235. {
  2236. logFullQueries = control->getPropBool("@val", true);
  2237. topology->setPropBool("@logFullQueries", logFullQueries);
  2238. }
  2239. else
  2240. unknown = true;
  2241. break;
  2242. case 'M':
  2243. if (stricmp(queryName, "control:memoryStatsInterval")==0)
  2244. {
  2245. memoryStatsInterval = (unsigned) control->getPropInt64("@val", 0);
  2246. roxiemem::setMemoryStatsInterval(memoryStatsInterval);
  2247. topology->setPropInt64("@memoryStatsInterval", memoryStatsInterval);
  2248. }
  2249. else if (stricmp(queryName, "control:memtrace")==0)
  2250. {
  2251. unsigned memTraceLevel = control->getPropInt("@level", 0);
  2252. roxiemem::setMemTraceLevel(memTraceLevel);
  2253. topology->setPropInt("@memTraceLevel", memTraceLevel);
  2254. }
  2255. else if (stricmp(queryName, "control:memtracesizelimit")==0)
  2256. {
  2257. memsize_t memTraceSizeLimit = (memsize_t) control->getPropInt64("@val", control->getPropInt64("@value", 0)); // used to accept @value so coded like this for backward compatibility
  2258. roxiemem::setMemTraceSizeLimit(memTraceSizeLimit);
  2259. topology->setPropInt64("@memTraceSizeLimit", memTraceSizeLimit);
  2260. }
  2261. else if (stricmp(queryName, "control:metrics")==0)
  2262. {
  2263. roxieMetrics->getMetrics(reply);
  2264. }
  2265. else if (stricmp(queryName, "control:minFreeDiskSpace")==0)
  2266. {
  2267. minFreeDiskSpace = (unsigned) control->getPropInt64("@val", 1048576);
  2268. topology->setPropInt64("@minFreeDiskSpace", minFreeDiskSpace);
  2269. }
  2270. else if (stricmp(queryName, "control:misctrace")==0)
  2271. {
  2272. miscDebugTraceLevel = control->getPropInt("@level", 0);
  2273. topology->setPropInt("@miscDebugTraceLevel", miscDebugTraceLevel);
  2274. }
  2275. else
  2276. unknown = true;
  2277. break;
  2278. case 'N':
  2279. if (stricmp(queryName, "control:nodeCacheMem")==0)
  2280. {
  2281. nodeCacheMB = control->getPropInt("@val", 100);
  2282. topology->setPropInt("@nodeCacheMem", nodeCacheMB);
  2283. setNodeCacheMem(nodeCacheMB * 0x100000);
  2284. }
  2285. else if (stricmp(queryName, "control:numFilesToProcess")==0)
  2286. {
  2287. int numFiles = queryFileCache().numFilesToCopy();
  2288. reply.appendf("<FilesToProcess value='%d'/>", numFiles);
  2289. }
  2290. else
  2291. unknown = true;
  2292. break;
  2293. case 'P':
  2294. if (stricmp(queryName, "control:parallelAggregate")==0)
  2295. {
  2296. parallelAggregate = control->getPropInt("@val", 0);
  2297. if (!parallelAggregate)
  2298. parallelAggregate = hdwInfo.numCPUs;
  2299. if (!parallelAggregate)
  2300. parallelAggregate = 1;
  2301. topology->setPropInt("@parallelAggregate", parallelAggregate);
  2302. }
  2303. else if (stricmp(queryName, "control:pingInterval")==0)
  2304. {
  2305. unsigned newInterval = (unsigned) control->getPropInt64("@val", 0);
  2306. if (newInterval && !pingInterval)
  2307. {
  2308. pingInterval = newInterval; // best to set before the start...
  2309. startPingTimer();
  2310. }
  2311. else if (pingInterval && !newInterval)
  2312. stopPingTimer(); // but after the stop
  2313. pingInterval = newInterval;
  2314. topology->setPropInt64("@pingInterval", pingInterval);
  2315. }
  2316. else if (stricmp(queryName, "control:preabortIndexReadsThreshold")==0)
  2317. {
  2318. preabortIndexReadsThreshold = control->getPropInt("@val", 100);
  2319. topology->setPropInt("@preabortIndexReadsThreshold", preabortIndexReadsThreshold);
  2320. }
  2321. else if (stricmp(queryName, "control:preabortKeyedJoinsThreshold")==0)
  2322. {
  2323. preabortKeyedJoinsThreshold = control->getPropInt("@val", 100);
  2324. topology->setPropInt("@preabortKeyedJoinsThreshold", preabortKeyedJoinsThreshold);
  2325. }
  2326. else
  2327. unknown = true;
  2328. break;
  2329. case 'Q':
  2330. if (stricmp(queryName, "control:queries")==0)
  2331. {
  2332. getQueryInfo(control, reply, false, logctx);
  2333. }
  2334. else if (stricmp(queryName, "control:queryAggregates")==0)
  2335. {
  2336. time_t from;
  2337. const char *fromTime = control->queryProp("@from");
  2338. if (fromTime)
  2339. {
  2340. CDateTime f;
  2341. f.setString(fromTime, NULL, true);
  2342. from = f.getSimple();
  2343. }
  2344. else
  2345. from = startupTime;
  2346. time_t to;
  2347. const char *toTime = control->queryProp("@to");
  2348. if (toTime)
  2349. {
  2350. CDateTime t;
  2351. t.setString(toTime, NULL, true);
  2352. to = t.getSimple();
  2353. }
  2354. else
  2355. time(&to);
  2356. const char *id = control->queryProp("Query/@id");
  2357. bool rawStats = control->getPropBool("@rawStats", false);
  2358. if (id)
  2359. {
  2360. if (!rawStats)
  2361. {
  2362. Owned<IQueryFactory> f = getQuery(id, NULL, NULL, logctx);
  2363. if (f)
  2364. {
  2365. Owned<const IPropertyTree> stats = f->getQueryStats(from, to);
  2366. toXML(stats, reply);
  2367. }
  2368. else
  2369. throw MakeStringException(ROXIE_CONTROL_MSG_ERROR, "Unknown query %s", id);
  2370. }
  2371. else
  2372. {
  2373. Owned<const IPropertyTree> stats = getQueryRawStats(id, from, to);
  2374. toXML(stats, reply);
  2375. }
  2376. }
  2377. else
  2378. {
  2379. bool includeAllQueries = control->getPropBool("@all", true);
  2380. Owned<const IPropertyTree> stats = getAllQueryStats(includeAllQueries, rawStats, from, to);
  2381. toXML(stats, reply);
  2382. }
  2383. }
  2384. else if (stricmp(queryName, "control:queryPackageInfo")==0)
  2385. {
  2386. ReadLockBlock readBlock(packageCrit);
  2387. allQueryPackages->getInfo(reply, logctx);
  2388. }
  2389. else if (stricmp(queryName, "control:queryStats")==0)
  2390. {
  2391. const char *id = control->queryProp("Query/@id");
  2392. if (!id)
  2393. badFormat();
  2394. const char *action = control->queryProp("Query/@action");
  2395. const char *wuid = control->queryProp("Query/@wuid");
  2396. const char *graphName = 0;
  2397. if (action)
  2398. {
  2399. if (stricmp(action, "listGraphNames") == 0)
  2400. {
  2401. Owned<IQueryFactory> query = getQuery(id, NULL, NULL, logctx);
  2402. if (query)
  2403. {
  2404. reply.appendf("<Query id='%s'>\n", id);
  2405. StringArray graphNames;
  2406. query->getGraphNames(graphNames);
  2407. ForEachItemIn(idx, graphNames)
  2408. {
  2409. const char *graphName = graphNames.item(idx);
  2410. reply.appendf("<Graph id='%s'/>", graphName);
  2411. }
  2412. reply.appendf("</Query>\n");
  2413. }
  2414. return; // done
  2415. }
  2416. else if (stricmp(action, "selectGraph") == 0)
  2417. graphName = control->queryProp("Query/@name");
  2418. else if (stricmp(action, "allGraphs") != 0) // if we get here and its NOT allgraphs - then error
  2419. throw MakeStringException(ROXIE_CONTROL_MSG_ERROR, "invalid action in control:queryStats %s", action);
  2420. }
  2421. ReadLockBlock readBlock(packageCrit);
  2422. allQueryPackages->getStats(reply, id, graphName, wuid, logctx);
  2423. }
  2424. else if (stricmp(queryName, "control:queryWuid")==0)
  2425. {
  2426. UNIMPLEMENTED;
  2427. }
  2428. else
  2429. unknown = true;
  2430. break;
  2431. case 'R':
  2432. if (stricmp(queryName, "control:reload")==0)
  2433. {
  2434. requestReload(true, control->getPropBool("@forceRetry", false));
  2435. if (daliHelper && daliHelper->connected())
  2436. reply.appendf("<Dali connected='1'/>");
  2437. else
  2438. reply.appendf("<Dali connected='0'/>");
  2439. unsigned __int64 thash = getTopologyHash();
  2440. unsigned __int64 shash;
  2441. {
  2442. ReadLockBlock readBlock(packageCrit);
  2443. shash = allQueryPackages->queryHash();
  2444. }
  2445. reply.appendf("<State hash='%" I64F "u' topologyHash='%" I64F "u'/>", shash, thash);
  2446. }
  2447. else if (stricmp(queryName, "control:resetcache")==0)
  2448. {
  2449. releaseAgentDynamicFileCache();
  2450. }
  2451. else if (stricmp(queryName, "control:resetindexmetrics")==0)
  2452. {
  2453. resetIndexMetrics();
  2454. }
  2455. else if (stricmp(queryName, "control:resetmetrics")==0)
  2456. {
  2457. roxieMetrics->resetMetrics();
  2458. }
  2459. else if (stricmp(queryName, "control:resetquerystats")==0)
  2460. {
  2461. ReadLockBlock readBlock(packageCrit);
  2462. Owned<IPropertyTreeIterator> queries = control->getElements("Query");
  2463. if (queries->first())
  2464. {
  2465. while (queries->isValid())
  2466. {
  2467. IPropertyTree &query = queries->query();
  2468. const char *id = query.queryProp("@id");
  2469. const char *target = query.queryProp("@target");
  2470. if (!id)
  2471. badFormat();
  2472. allQueryPackages->resetStats(target, id, logctx);
  2473. queries->next();
  2474. }
  2475. }
  2476. else
  2477. allQueryPackages->resetStats(NULL, NULL, logctx);
  2478. }
  2479. else if (stricmp(queryName, "control:resetremotedalicache")==0)
  2480. {
  2481. queryNamedGroupStore().resetCache();
  2482. }
  2483. else if (stricmp(queryName, "control:restart")==0)
  2484. {
  2485. FatalError("Roxie process restarted by operator request");
  2486. }
  2487. else if (stricmp(queryName, "control:retrieveActivityDetails")==0)
  2488. {
  2489. UNIMPLEMENTED;
  2490. }
  2491. else if (stricmp(queryName, "control:retrieveFileInfo")==0)
  2492. {
  2493. UNIMPLEMENTED;
  2494. }
  2495. else if (stricmp(queryName, "control:roxiememstats") == 0)
  2496. {
  2497. StringBuffer memStats;
  2498. queryMemoryPoolStats(memStats);
  2499. reply.append("<MemoryStats>").append(memStats.str()).append("</MemoryStats>\n");
  2500. }
  2501. else
  2502. unknown = true;
  2503. break;
  2504. case 'S':
  2505. if (stricmp(queryName, "control:setAffinity")==0)
  2506. {
  2507. __uint64 affinity = control->getPropInt64("@val", 0); // by default just refresh cached settings
  2508. topology->setPropInt64("@affinity", affinity);
  2509. updateAffinity(affinity);
  2510. }
  2511. else if (stricmp(queryName, "control:setCacheInfo")==0)
  2512. {
  2513. Owned<IPTreeIterator> infos = control->getElements(".//CacheInfo");
  2514. ForEach(*infos)
  2515. {
  2516. IPropertyTree &info = infos->query();
  2517. queryFileCache().warmOsCache(info.queryProp(""));
  2518. }
  2519. }
  2520. else if (stricmp(queryName, "control:setCopyResources")==0)
  2521. {
  2522. copyResources = control->getPropBool("@val", true);
  2523. topology->setPropBool("@copyResources", copyResources);
  2524. }
  2525. else if (stricmp(queryName, "control:simpleLocalKeyedJoins")==0)
  2526. {
  2527. simpleLocalKeyedJoins = control->getPropBool("@val", true);
  2528. }
  2529. else if (stricmp(queryName, "control:soapInfo")==0)
  2530. {
  2531. UNIMPLEMENTED;
  2532. }
  2533. else if (stricmp(queryName, "control:soapTrace")==0)
  2534. {
  2535. soapTraceLevel = control->getPropInt("@level", 0);
  2536. topology->setPropInt("@soapTraceLevel", soapTraceLevel);
  2537. }
  2538. else if (stricmp(queryName, "control:socketCheckInterval")==0)
  2539. {
  2540. socketCheckInterval = (unsigned) control->getPropInt64("@val", 0);
  2541. topology->setPropInt64("@socketCheckInterval", socketCheckInterval);
  2542. }
  2543. else if (stricmp(queryName, "control:state")==0)
  2544. {
  2545. if (daliHelper && daliHelper->connected())
  2546. reply.appendf("<Dali connected='1'/>");
  2547. else
  2548. reply.appendf("<Dali connected='0'/>");
  2549. unsigned __int64 thash = getTopologyHash();
  2550. unsigned __int64 shash;
  2551. {
  2552. ReadLockBlock readBlock(packageCrit);
  2553. shash = allQueryPackages->queryHash();
  2554. }
  2555. reply.appendf("<State hash='%" I64F "u' topologyHash='%" I64F "u'/>", shash, thash);
  2556. }
  2557. else if (stricmp(queryName, "control:steppingEnabled")==0)
  2558. {
  2559. steppingEnabled = control->getPropBool("@val", true);
  2560. }
  2561. else if (stricmp(queryName, "control:systemMonitor")==0)
  2562. {
  2563. #ifndef _CONTAINERIZED
  2564. unsigned interval = control->getPropInt("@interval", 60000);
  2565. bool enable = control->getPropBool("@enable", true);
  2566. if (enable)
  2567. startPerformanceMonitor(interval, PerfMonStandard, perfMonHook);
  2568. else
  2569. stopPerformanceMonitor();
  2570. #else
  2571. UNIMPLEMENTED; //better than ignoring 'control:systemMonitor' in containerized mode
  2572. #endif
  2573. }
  2574. //MORE: control:stats??
  2575. else
  2576. unknown = true;
  2577. break;
  2578. case 'T':
  2579. if (stricmp(queryName, "control:testAgentFailure")==0)
  2580. {
  2581. testAgentFailure = control->getPropInt("@val", 20);
  2582. }
  2583. else if (stricmp(queryName, "control:timeActivities")==0)
  2584. {
  2585. defaultTimeActivities = control->getPropBool("@val", true);
  2586. topology->setPropInt("@timeActivities", defaultTimeActivities);
  2587. }
  2588. else if (stricmp(queryName, "control:timings")==0)
  2589. {
  2590. reply.append("<Timings>");
  2591. queryActiveTimer()->getTimings(reply);
  2592. reply.append("</Timings>");
  2593. if (control->getPropBool("@reset", false))
  2594. {
  2595. queryActiveTimer()->reset();
  2596. }
  2597. }
  2598. else if (stricmp(queryName, "control:topology")==0)
  2599. {
  2600. toXML(topology, reply);
  2601. }
  2602. else if (stricmp(queryName, "control:toposerver")==0)
  2603. {
  2604. if (control->hasProp("@freeze"))
  2605. {
  2606. freezeTopology(control->getPropBool("@freeze"));
  2607. }
  2608. else
  2609. {
  2610. reply.append("<Toposerver>");
  2611. getTopology()->report(reply);
  2612. reply.append("</Toposerver>");
  2613. }
  2614. }
  2615. else if (stricmp(queryName, "control:trace")==0)
  2616. {
  2617. traceLevel = control->getPropInt("@level", 0);
  2618. if (traceLevel > MAXTRACELEVEL)
  2619. traceLevel = MAXTRACELEVEL;
  2620. topology->setPropInt("@traceLevel", traceLevel);
  2621. }
  2622. else if (stricmp(queryName, "control:traceSmartStepping")==0)
  2623. {
  2624. traceSmartStepping = control->getPropBool("@val", true);
  2625. topology->setPropInt("@traceSmartStepping", traceSmartStepping);
  2626. }
  2627. else if (stricmp(queryName, "control:traceStartStop")==0)
  2628. {
  2629. traceStartStop = control->getPropBool("@val", true);
  2630. topology->setPropInt("@traceStartStop", traceStartStop);
  2631. }
  2632. else
  2633. unknown = true;
  2634. break;
  2635. case 'U':
  2636. if (stricmp(queryName, "control:udptrace")==0)
  2637. {
  2638. udpTraceLevel = control->getPropInt("@level", 0);
  2639. topology->setPropInt("@udpTraceLevel", udpTraceLevel);
  2640. }
  2641. else if (stricmp(queryName, "control:unlockDali")==0)
  2642. {
  2643. topology->setPropBool("@lockDali", false);
  2644. // Dali will reattach via the timer that checks every so often if can reattach...
  2645. saveTopology();
  2646. }
  2647. else if (stricmp(queryName, "control:unsuspend")==0)
  2648. {
  2649. UNIMPLEMENTED;
  2650. }
  2651. else if (stricmp(queryName, "control:userMetric")==0)
  2652. {
  2653. const char *name = control->queryProp("@name");
  2654. const char *regex= control->queryProp("@regex");
  2655. if (name && regex)
  2656. {
  2657. roxieMetrics->addUserMetric(name, regex);
  2658. // MORE - we could add to topology, we could check for dups, and we could support removing them.
  2659. }
  2660. else
  2661. throw MakeStringException(ROXIE_MISSING_PARAMS, "Metric name or regex missing");
  2662. }
  2663. else
  2664. unknown = true;
  2665. break;
  2666. case 'W':
  2667. if (stricmp(queryName, "control:watchActivityId")==0)
  2668. {
  2669. watchActivityId = control->getPropInt("@id", true);
  2670. }
  2671. else
  2672. unknown = true;
  2673. break;
  2674. default:
  2675. unknown = true;
  2676. break;
  2677. }
  2678. if (unknown)
  2679. throw MakeStringException(ROXIE_UNKNOWN_QUERY, "Unknown query %s", queryName);
  2680. }
  2681. void badFormat()
  2682. {
  2683. throw MakeStringException(ROXIE_INVALID_INPUT, "Badly formated control query");
  2684. }
  2685. hash64_t getTopologyHash()
  2686. {
  2687. StringBuffer xml;
  2688. toXML(topology, xml, 0, XML_SortTags);
  2689. return rtlHash64Data(xml.length(), xml.str(), 707018);
  2690. }
  2691. };
  2692. extern IRoxieQueryPackageManagerSet *createRoxiePackageSetManager(const IQueryDll *standAloneDll)
  2693. {
  2694. return new CRoxiePackageSetManager(standAloneDll);
  2695. }
  2696. IRoxieQueryPackageManagerSet *globalPackageSetManager = NULL;
  2697. extern void loadPlugins()
  2698. {
  2699. DBGLOG("Preloading plugins from %s", pluginDirectory.str());
  2700. if (pluginDirectory.length())
  2701. {
  2702. plugins = new SafePluginMap(&PluginCtx, traceLevel >= 1);
  2703. if (topology->hasProp("preload"))
  2704. {
  2705. Owned<IPropertyTreeIterator> preloads = topology->getElements("preload");
  2706. ForEach(*preloads)
  2707. {
  2708. const char *preload = preloads->query().queryProp(".");
  2709. if (!streq(preload, "none"))
  2710. {
  2711. VStringBuffer soname(SharedObjectPrefix "%s" SharedObjectExtension, preload);
  2712. if (!plugins->loadNamed(pluginDirectory, soname))
  2713. DBGLOG("Could not preload plugin %s at any of the following locations: %s", soname.str(), pluginDirectory.str());
  2714. }
  2715. }
  2716. }
  2717. else
  2718. plugins->loadFromList(pluginDirectory);
  2719. }
  2720. }
  2721. extern void cleanupPlugins()
  2722. {
  2723. delete plugins;
  2724. plugins = NULL;
  2725. }
  2726. /*=======================================================================================================
  2727. * mergeStats and associated code is used to combine the graph stats from multiple nodes in a cluster into
  2728. * a single aggregate structure
  2729. * It should be moved into ccdquery.cpp really
  2730. *========================================================================================================*/
  2731. typedef void (*mergefunc)(IPropertyTree *s1, IPropertyTree *s2, unsigned level);
  2732. struct MergeInfo
  2733. {
  2734. const char *element;
  2735. const char *attribute;
  2736. mergefunc f;
  2737. };
  2738. void mergeSubGraphs(IPropertyTree *s1, IPropertyTree *s2, unsigned);
  2739. void mergeNodes(IPropertyTree *s1, IPropertyTree *s2)
  2740. {
  2741. Owned<IPropertyTreeIterator> elems = s1->getElements("att");
  2742. ForEach(*elems)
  2743. {
  2744. IPropertyTree &e1 = elems->query();
  2745. unsigned __int64 v1 = e1.getPropInt64("@value", 0);
  2746. const char *name = e1.queryProp("@name");
  2747. if (stricmp(name, "_kind")==0 && v1 == TAKsubgraph)
  2748. {
  2749. IPropertyTree *s1child = s1->queryPropTree("att/graph");
  2750. IPropertyTree *s2child = s2->queryPropTree("att[@name='_kind']/graph");
  2751. if (s1child && s2child)
  2752. {
  2753. mergeSubGraphs(s1child, s2child, 0);
  2754. s2->removeProp("att[@name='_kind']");
  2755. }
  2756. }
  2757. else
  2758. {
  2759. StringBuffer xpath;
  2760. xpath.appendf("att[@name='%s']", name);
  2761. if (startsWith(name, "SizeMax"))
  2762. {
  2763. IPropertyTree *e2 = s2->queryPropTree(xpath.str());
  2764. if (e2)
  2765. {
  2766. unsigned __int64 v2 = e2->getPropInt64("@value", 0);
  2767. if (v2 > v1)
  2768. e1.setPropInt64("@value", v2);
  2769. s2->removeTree(e2);
  2770. }
  2771. }
  2772. else if (startsWith(name, "Size") || startsWith(name, "Time") || startsWith(name, "Num"))
  2773. {
  2774. IPropertyTree *e2 = s2->queryPropTree(xpath.str());
  2775. if (e2)
  2776. {
  2777. unsigned __int64 v2 = e2->getPropInt64("@value", 0);
  2778. e1.setPropInt64("@value", v1+v2);
  2779. s2->removeTree(e2);
  2780. }
  2781. }
  2782. else
  2783. {
  2784. // remove from s2 any complete dups
  2785. const char *s1val = e1.queryProp("@value");
  2786. Owned<IPropertyTreeIterator> s2elems = s2->getElements(xpath.str());
  2787. IArrayOf<IPropertyTree> goers;
  2788. ForEach(*s2elems)
  2789. {
  2790. IPropertyTree &e2 = s2elems->query();
  2791. const char *s2val = e2.queryProp("@value");
  2792. if ((!s1val && !s2val) || (s1val && s2val && strcmp(s1val, s2val)==0))
  2793. goers.append(*LINK(&e2));
  2794. }
  2795. ForEachItemIn(idx, goers)
  2796. {
  2797. s2->removeTree(&goers.item(idx));
  2798. }
  2799. }
  2800. }
  2801. }
  2802. elems.setown(s2->getElements("*"));
  2803. ForEach(*elems)
  2804. {
  2805. IPropertyTree &e2 = elems->query();
  2806. s1->addPropTree(e2.queryName(), LINK(&e2));
  2807. }
  2808. }
  2809. void mergeSubGraphs(IPropertyTree *s1, IPropertyTree *s2, unsigned)
  2810. {
  2811. Owned<IPropertyTreeIterator> elems = s1->getElements("*");
  2812. ForEach(*elems)
  2813. {
  2814. IPropertyTree &e1 = elems->query();
  2815. const char *elemName = e1.queryName();
  2816. StringBuffer xpath;
  2817. if (strcmp(elemName, "att")==0)
  2818. {
  2819. xpath.appendf("att[@name='%s']", e1.queryProp("@name"));
  2820. IPropertyTree *e2 = s2->queryPropTree(xpath.str());
  2821. if (e2)
  2822. s2->removeTree(e2);
  2823. }
  2824. else
  2825. {
  2826. xpath.appendf("%s[@id='%s']", elemName, e1.queryProp("@id"));
  2827. IPropertyTree *e2 = s2->queryPropTree(xpath.str());
  2828. if (e2)
  2829. {
  2830. mergeNodes(&e1, e2);
  2831. s2->removeTree(e2);
  2832. }
  2833. }
  2834. }
  2835. elems.setown(s2->getElements("*"));
  2836. ForEach(*elems)
  2837. {
  2838. IPropertyTree &e2 = elems->query();
  2839. s1->addPropTree(e2.queryName(), LINK(&e2));
  2840. }
  2841. }
  2842. void mergeNode(IPropertyTree *s1, IPropertyTree *s2, unsigned level);
  2843. MergeInfo mergeTable[] =
  2844. {
  2845. {"Query", "@id", mergeStats},
  2846. {"Graph", "@id", mergeStats},
  2847. {"xgmml", NULL, mergeStats},
  2848. {"graph", NULL, mergeStats},
  2849. {"node", "@id", mergeNode},
  2850. {"att", NULL, mergeStats},
  2851. {"graph", NULL, mergeSubGraphs},
  2852. };
  2853. void mergeNode(IPropertyTree *s1, IPropertyTree *s2, unsigned level)
  2854. {
  2855. if (s1->hasProp("att/@name"))
  2856. mergeNodes(s1, s2);
  2857. else
  2858. mergeStats(s1, s2, level);
  2859. }
  2860. void mergeStats(IPropertyTree *s1, IPropertyTree *s2, unsigned level)
  2861. {
  2862. MergeInfo & mi = mergeTable[level];
  2863. Owned<IPropertyTreeIterator> elems = s2->getElements(mi.element);
  2864. ForEach(*elems)
  2865. {
  2866. IPropertyTree &e2 = elems->query();
  2867. StringBuffer xpath;
  2868. if (mi.attribute)
  2869. xpath.appendf("%s[%s='%s']", mi.element, mi.attribute, e2.queryProp(mi.attribute));
  2870. else
  2871. xpath.append(mi.element);
  2872. IPropertyTree *e1 = s1->queryPropTree(xpath.str());
  2873. if (e1)
  2874. {
  2875. mi.f(e1, &e2, level+1);
  2876. }
  2877. else
  2878. s1->addPropTree(mi.element, LINK(&e2));
  2879. }
  2880. }
  2881. void mergeStats(IPropertyTree *s1, IPropertyTree *s2)
  2882. {
  2883. Owned<IPropertyTreeIterator> elems = s2->getElements("Exception");
  2884. ForEach(*elems)
  2885. {
  2886. s1->addPropTree("Exception", LINK(&elems->query()));
  2887. }
  2888. mergeStats(s1, s2, 0);
  2889. }
  2890. void mergeQueries(IPropertyTree *dest, IPropertyTree *src)
  2891. {
  2892. Owned<IPropertyTreeIterator> elems = src->getElements("Exception");
  2893. ForEach(*elems)
  2894. {
  2895. dest->addPropTree("Exception", LINK(&elems->query()));
  2896. }
  2897. IPropertyTree *destQueries = ensurePTree(dest, "Queries");
  2898. IPropertyTree *srcQueries = src->queryPropTree("Queries");
  2899. if (!srcQueries)
  2900. return;
  2901. destQueries->setPropInt("@reporting", destQueries->getPropInt("@reporting") + srcQueries->getPropInt("@reporting"));
  2902. Owned<IPropertyTreeIterator> queries = srcQueries->getElements("Query");
  2903. ForEach(*queries)
  2904. {
  2905. IPropertyTree *srcQuery = &queries->query();
  2906. const char *id = srcQuery->queryProp("@id");
  2907. if (!id || !*id)
  2908. continue;
  2909. VStringBuffer xpath("Query[@id='%s']", id);
  2910. IPropertyTree *destQuery = destQueries->queryPropTree(xpath);
  2911. if (!destQuery)
  2912. {
  2913. destQueries->addPropTree("Query", LINK(srcQuery));
  2914. continue;
  2915. }
  2916. int suspended = destQuery->getPropInt("@suspended") + srcQuery->getPropInt("@suspended"); //keep count to recognize "partially suspended" queries
  2917. mergePTree(destQuery, srcQuery);
  2918. if (suspended)
  2919. destQuery->setPropInt("@suspended", suspended);
  2920. }
  2921. Owned<IPropertyTreeIterator> aliases = srcQueries->getElements("Alias");
  2922. ForEach(*aliases)
  2923. {
  2924. IPropertyTree *srcQuery = &aliases->query();
  2925. const char *id = srcQuery->queryProp("@id");
  2926. if (!id || !*id)
  2927. continue;
  2928. VStringBuffer xpath("Alias[@id='%s']", id);
  2929. IPropertyTree *destQuery = destQueries->queryPropTree(xpath);
  2930. if (!destQuery)
  2931. {
  2932. destQueries->addPropTree("Alias", LINK(srcQuery));
  2933. continue;
  2934. }
  2935. mergePTree(destQuery, srcQuery);
  2936. }
  2937. }
  2938. #ifdef _USE_CPPUNIT
  2939. #include <cppunit/extensions/HelperMacros.h>
  2940. static const char *g1 =
  2941. "<Stats>"
  2942. "<Query id='stats'>"
  2943. "<Graph id='graph1'>"
  2944. "<xgmml>"
  2945. "<graph>"
  2946. "<node id='1'>"
  2947. "<att>"
  2948. "<graph>"
  2949. "<node id='2' label='Temp Table'>"
  2950. "<att name='name' value='d'/>"
  2951. "<att name='_kind' value='25'/>"
  2952. "<att name='helper' value='f2'/>"
  2953. "</node>"
  2954. "<node id='2a'>"
  2955. " <att name='_kind' value='1'>" // TAKsubgraph
  2956. " <graph>"
  2957. " <node id='7696' label='Nested'>"
  2958. " <att name='NumSeeks' value='15'/>"
  2959. " </node>"
  2960. " </graph>"
  2961. " </att>"
  2962. "</node>"
  2963. "<node id='3' label='Filter'>"
  2964. "<att name='name' value='ds'/>"
  2965. "<att name='_kind' value='5'/>"
  2966. "<att name='helper' value='f3'/>"
  2967. "</node>"
  2968. "<att name='rootGraph' value='1'/>"
  2969. "<edge id='2_0' source='2' target='3'>"
  2970. "<att name='NumRows' value='15'/>"
  2971. "<att name='started' value='1'/>"
  2972. "<att name='stopped' value='1'/>"
  2973. "</edge>"
  2974. "<edge id='3_0' source='3' target='5'>"
  2975. "<att name='NumRows' value='15'/>"
  2976. "<att name='started' value='1'/>"
  2977. "<att name='stopped' value='1'/>"
  2978. "</edge>"
  2979. "<edge id='5_0' source='5' target='6'>"
  2980. "<att name='NumRows' value='3'/>"
  2981. "<att name='started' value='1'/>"
  2982. "<att name='stopped' value='1'/>"
  2983. "</edge>"
  2984. "<edge id='5_1' source='5' target='7'>"
  2985. "<att name='_sourceIndex' value='1'/>"
  2986. "<att name='NumRows' value='15'/>"
  2987. "<att name='started' value='1'/>"
  2988. "<att name='stopped' value='1'/>"
  2989. "</edge>"
  2990. "</graph>"
  2991. "</att>"
  2992. "</node>"
  2993. "</graph>"
  2994. "</xgmml>"
  2995. "</Graph>"
  2996. "</Query>"
  2997. "</Stats>";
  2998. static const char *g2 =
  2999. "<Stats>"
  3000. "<Query id='stats'>"
  3001. "<Graph id='graph1'>"
  3002. "<xgmml>"
  3003. "<graph>"
  3004. "<node id='1'>"
  3005. "<att>"
  3006. "<graph>"
  3007. "<node id='2' label='Temp Table'>"
  3008. "<att name='name' value='d'/>"
  3009. "<att name='_kind' value='25'/>"
  3010. "<att name='helper' value='f2'/>"
  3011. "</node>"
  3012. "<node id='2a'>"
  3013. " <att name='_kind' value='1'>" // TAKsubgraph
  3014. " <graph>"
  3015. " <node id='7696' label='Nested'>"
  3016. " <att name='NumSeeks' value='25'/>"
  3017. " </node>"
  3018. " </graph>"
  3019. " </att>"
  3020. "</node>"
  3021. "<node id='4' label='Filter2'>"
  3022. "<att name='name' value='ds2'/>"
  3023. "<att name='_kind' value='53'/>"
  3024. "<att name='helper' value='f23'/>"
  3025. "</node>"
  3026. "<att name='rootGraph' value='1'/>"
  3027. "<edge id='2_0' source='2' target='3'>"
  3028. "<att name='NumRows' value='15'/>"
  3029. "<att name='started' value='1'/>"
  3030. "<att name='stopped' value='1'/>"
  3031. "</edge>"
  3032. "<edge id='3_0' source='3' target='5'>"
  3033. "<att name='NumRows' value='15'/>"
  3034. "<att name='started' value='1'/>"
  3035. "<att name='stopped' value='1'/>"
  3036. "</edge>"
  3037. "<edge id='5_0' source='5' target='6'>"
  3038. "<att name='NumRows' value='3'/>"
  3039. "<att name='started' value='1'/>"
  3040. "<att name='stopped' value='1'/>"
  3041. "</edge>"
  3042. "</graph>"
  3043. "</att>"
  3044. "</node>"
  3045. "</graph>"
  3046. "</xgmml>"
  3047. "</Graph>"
  3048. "</Query>"
  3049. "</Stats>";
  3050. static const char *expected =
  3051. "<Stats>"
  3052. "<Query id='stats'>"
  3053. "<Graph id='graph1'>"
  3054. "<xgmml>"
  3055. "<graph>"
  3056. "<node id='1'>"
  3057. "<att>"
  3058. "<graph>"
  3059. "<node id='2' label='Temp Table'>"
  3060. "<att name='name' value='d'/>"
  3061. "<att name='_kind' value='25'/>"
  3062. "<att name='helper' value='f2'/>"
  3063. "</node>"
  3064. "<node id='2a'>"
  3065. " <att name='_kind' value='1'>" // TAKsubgraph
  3066. " <graph>"
  3067. " <node id='7696' label='Nested'>"
  3068. " <att name='NumSeeks' value='40'/>"
  3069. " </node>"
  3070. " </graph>"
  3071. " </att>"
  3072. "</node>"
  3073. "<node id='3' label='Filter'>"
  3074. "<att name='name' value='ds'/>"
  3075. "<att name='_kind' value='5'/>"
  3076. "<att name='helper' value='f3'/>"
  3077. "</node>"
  3078. "<node id='4' label='Filter2'>"
  3079. "<att name='name' value='ds2'/>"
  3080. "<att name='_kind' value='53'/>"
  3081. "<att name='helper' value='f23'/>"
  3082. "</node>"
  3083. "<att name='rootGraph' value='1'/>"
  3084. "<edge id='2_0' source='2' target='3'>"
  3085. "<att name='NumRows' value='30'/>"
  3086. "<att name='started' value='1'/>"
  3087. "<att name='stopped' value='1'/>"
  3088. "</edge>"
  3089. "<edge id='3_0' source='3' target='5'>"
  3090. "<att name='NumRows' value='30'/>"
  3091. "<att name='started' value='1'/>"
  3092. "<att name='stopped' value='1'/>"
  3093. "</edge>"
  3094. "<edge id='5_0' source='5' target='6'>"
  3095. "<att name='NumRows' value='6'/>"
  3096. "<att name='started' value='1'/>"
  3097. "<att name='stopped' value='1'/>"
  3098. "</edge>"
  3099. "<edge id='5_1' source='5' target='7'>"
  3100. "<att name='_sourceIndex' value='1'/>"
  3101. "<att name='NumRows' value='15'/>"
  3102. "<att name='started' value='1'/>"
  3103. "<att name='stopped' value='1'/>"
  3104. "</edge>"
  3105. "</graph>"
  3106. "</att>"
  3107. "</node>"
  3108. "</graph>"
  3109. "</xgmml>"
  3110. "</Graph>"
  3111. "</Query>"
  3112. "</Stats>"
  3113. ;
  3114. class MergeStatsTest : public CppUnit::TestFixture
  3115. {
  3116. CPPUNIT_TEST_SUITE( MergeStatsTest );
  3117. CPPUNIT_TEST(test1);
  3118. // CPPUNIT_TEST(test2); Handy for debugging problem cases...
  3119. CPPUNIT_TEST_SUITE_END();
  3120. protected:
  3121. void test1()
  3122. {
  3123. Owned<IPropertyTree> p1 = createPTreeFromXMLString(g1);
  3124. Owned<IPropertyTree> p2 = createPTreeFromXMLString(g2);
  3125. Owned<IPropertyTree> e = createPTreeFromXMLString(expected);
  3126. mergeStats(p1, p2);
  3127. StringBuffer s1, s2;
  3128. toXML(p1, s1);
  3129. toXML(e, s2);
  3130. CPPUNIT_ASSERT(strcmp(s1, s2)==0);
  3131. }
  3132. void test2()
  3133. {
  3134. Owned<IPropertyTree> mergedReply = createPTree("Merged");
  3135. Owned<IPropertyTree> p1 = createPTreeFromXMLFile("stats1.xml");
  3136. Owned<IPropertyTreeIterator> meat = p1->getElements("Endpoint");
  3137. ForEach(*meat)
  3138. {
  3139. if (mergedReply)
  3140. {
  3141. mergeStats(mergedReply, &meat->query());
  3142. }
  3143. }
  3144. Owned<IPropertyTree> p2 = createPTreeFromXMLFile("stats2.xml");
  3145. meat.setown(p2->getElements("Endpoint"));
  3146. ForEach(*meat)
  3147. {
  3148. if (mergedReply)
  3149. {
  3150. mergeStats(mergedReply, &meat->query());
  3151. }
  3152. }
  3153. StringBuffer s1;
  3154. toXML(mergedReply, s1);
  3155. //toXML(e, s2);
  3156. //CPPUNIT_ASSERT(strcmp(s1, s2)==0);
  3157. printf("%s", s1.str());
  3158. }
  3159. };
  3160. CPPUNIT_TEST_SUITE_REGISTRATION( MergeStatsTest );
  3161. CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( MergeStatsTest, "MergeStatsTest" );
  3162. #endif