123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604260526062607260826092610261126122613261426152616261726182619262026212622262326242625262626272628262926302631263226332634263526362637263826392640264126422643264426452646264726482649265026512652265326542655265626572658265926602661266226632664266526662667266826692670267126722673267426752676267726782679268026812682268326842685268626872688268926902691269226932694269526962697269826992700270127022703270427052706270727082709271027112712271327142715271627172718271927202721272227232724272527262727272827292730273127322733273427352736273727382739274027412742274327442745274627472748274927502751275227532754275527562757275827592760276127622763276427652766276727682769277027712772277327742775277627772778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084 |
- /*##############################################################################
- HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ############################################################################## */
- #include <platform.h>
- #include <jlib.hpp>
- #include "build-config.h"
- #include "jisem.hpp"
- #include "jsort.hpp"
- #include "jregexp.hpp"
- #include "ccd.hpp"
- #include "ccdquery.hpp"
- #include "ccdstate.hpp"
- #include "ccdqueue.ipp"
- #include "ccdlistener.hpp"
- #include "ccdfile.hpp"
- #include "ccdsnmp.hpp"
- #include "hqlplugins.hpp"
- #include "thorplugin.hpp"
- #include "eclrtl.hpp"
- #include "dafdesc.hpp"
- #include "dautils.hpp"
- #include "pkgimpl.hpp"
- #include "roxiehelper.hpp"
- //-------------------------------------------------------------------------------------------
- // class CRoxiePluginCtx - provide the environments for plugins loaded by roxie.
- // Base class handles making sure memory allocation comes from the right heap.
- // implement get/set properties to allow plugin configuration information to be retrieved from Roxie topology file
- //-------------------------------------------------------------------------------------------
- class CRoxiePluginCtx : public SimplePluginCtx
- {
- public:
- virtual int ctxGetPropInt(const char *propName, int defaultValue) const
- {
- return topology->getPropInt(propName, defaultValue);
- }
- virtual const char *ctxQueryProp(const char *propName) const
- {
- return topology->queryProp(propName);
- }
- } PluginCtx;
- SafePluginMap *plugins;
- //================================================================================================
- // In legacy state files, the original file names passed in _fileName or _indexFileName may have been translated into _superFileName or _superKeyName,
- // and then 0 or more (max 1 for subfiles, no limit for subkeys) _fileName or _indexFileName will have been added. This translation will not take place
- // if the files resolve to single file/key, or if we are using new embedded wu system
- // Basic mode of operation therefore is to get the original name, see if it can be resolved by package into a list of subfiles, and if not, use
- // iterator on the xgmml node to get the list.
- // These two helper functions will return the original filenames placed in the XGMML by the codegen, regardless of how/if roxieconfig resolved them
- static const char *_queryNodeFileName(const IPropertyTree &graphNode)
- {
- if (graphNode.hasProp("att[@name='_file_dynamic']"))
- return NULL;
- else
- return graphNode.queryProp("att[@name='_fileName']/@value");
- }
- static const char *_queryNodeIndexName(const IPropertyTree &graphNode)
- {
- if (graphNode.hasProp("att[@name='_indexFile_dynamic']"))
- return NULL;
- else
- return graphNode.queryProp("att[@name='_indexFileName']/@value");
- }
- static bool isSimpleIndexActivity(ThorActivityKind kind)
- {
- switch (kind)
- {
- case TAKindexaggregate:
- case TAKindexcount:
- case TAKindexexists:
- case TAKindexgroupaggregate:
- case TAKindexgroupcount:
- case TAKindexgroupexists:
- case TAKindexnormalize:
- case TAKindexread:
- return true;
- default:
- return false;
- }
- }
- const char *queryNodeFileName(const IPropertyTree &graphNode, ThorActivityKind kind)
- {
- if (isSimpleIndexActivity(kind))
- return NULL;
- else
- return _queryNodeFileName(graphNode);
- }
- const char *queryNodeIndexName(const IPropertyTree &graphNode, ThorActivityKind kind)
- {
- if (isSimpleIndexActivity(kind))
- return _queryNodeFileName(graphNode);
- else
- return _queryNodeIndexName(graphNode);
- }
- // DelayedReleaser mechanism hangs on to a link to an object for a while...
- class DelayedReleaseQueueItem : public CInterfaceOf<IInterface>
- {
- Owned<IInterface> goer;
- time_t goTime;
- public:
- DelayedReleaseQueueItem(IInterface *_goer, unsigned delaySeconds)
- : goer(_goer)
- {
- time(&goTime);
- goTime += delaySeconds;
- }
- unsigned remaining()
- {
- time_t now;
- time(&now);
- if (now > goTime)
- return 0;
- else
- return goTime - now;
- }
- };
- class DelayedReleaserThread : public Thread
- {
- private:
- bool closing;
- bool started;
- CriticalSection lock;
- IArrayOf<DelayedReleaseQueueItem> queue;
- Semaphore sem;
- public:
- DelayedReleaserThread() : Thread("DelayedReleaserThread")
- {
- closing = false;
- started = false;
- }
- ~DelayedReleaserThread()
- {
- stop();
- }
- virtual int run()
- {
- if (traceLevel)
- DBGLOG("DelayedReleaserThread %p starting", this);
- unsigned nextTimeout = INFINITE;
- while (!closing)
- {
- sem.wait(nextTimeout);
- CriticalBlock b(lock);
- nextTimeout = INFINITE;
- ForEachItemInRev(idx, queue)
- {
- DelayedReleaseQueueItem &goer = queue.item(idx);
- unsigned timeRemaining = goer.remaining();
- if (!timeRemaining)
- queue.remove(idx);
- else if (timeRemaining < nextTimeout)
- nextTimeout = timeRemaining;
- }
- if (nextTimeout != INFINITE)
- nextTimeout = nextTimeout * 1000;
- }
- if (traceLevel)
- DBGLOG("DelayedReleaserThread %p exiting", this);
- return 0;
- }
- void stop()
- {
- if (started)
- {
- closing = true;
- sem.signal();
- join();
- }
- }
- void delayedRelease(IInterface *goer, unsigned delaySeconds)
- {
- if (goer)
- {
- CriticalBlock b(lock);
- if (!started)
- {
- start();
- started = true;
- }
- queue.append(*new DelayedReleaseQueueItem(goer, delaySeconds));
- sem.signal();
- }
- }
- };
- Owned<DelayedReleaserThread> delayedReleaser;
- void createDelayedReleaser()
- {
- delayedReleaser.setown(new DelayedReleaserThread);
- }
- void stopDelayedReleaser()
- {
- if (delayedReleaser)
- delayedReleaser->stop();
- delayedReleaser.clear();
- }
- //-------------------------------------------------------------------------
- class CSimpleSuperFileArray : public CInterface, implements ISimpleSuperFileEnquiry
- {
- IArrayOf<IPropertyTree> subFiles;
- public:
- IMPLEMENT_IINTERFACE;
- CSimpleSuperFileArray(IPropertyTreeIterator &_subs)
- {
- ForEach(_subs)
- {
- IPropertyTree &sub = _subs.query();
- sub.Link();
- subFiles.append(sub);
- }
- }
- virtual unsigned numSubFiles() const
- {
- return subFiles.length();
- }
- virtual bool getSubFileName(unsigned num, StringBuffer &name) const
- {
- if (subFiles.isItem(num))
- {
- name.append(subFiles.item(num).queryProp("@value"));
- return true;
- }
- else
- return false;
- }
- virtual unsigned findSubName(const char *subname) const
- {
- ForEachItemIn(idx, subFiles)
- {
- if (stricmp(subFiles.item(idx).queryProp("@value"), subname))
- return idx;
- }
- return NotFound;
- }
- virtual unsigned getContents(StringArray &contents) const
- {
- ForEachItemIn(idx, subFiles)
- {
- contents.append(subFiles.item(idx).queryProp("@value"));
- }
- return subFiles.length();
- }
- };
- //-------------------------------------------------------------------------------------------
- // class CRoxiePackage - provide the environment in which file names and query options are interpreted
- // by a roxie query.
- // File names are resolved into IResolvedFile objects. A cache is used to ensure that the IResolvedFile is
- // shared wherever possible.
- // Effective environment is precomputed in mergedEnvironment for efficient recall by queries
- // Packages are described using XML files - see documentation for details.
- //-------------------------------------------------------------------------------------------
- /**
- * Packages are hierarchical - they are searched recursively to get the info you want
- * A PackageMap defines the entire environment - potentially each query that uses that PackageMap will pick a different package within it
- * A particular instantiation of a roxie query (i.e. a IQueryFactory) will have a pointer to the specific IRoxiePackage within the active PackageMap
- * that is providing its environment.
- *
- * A PackageMap can also indicate the name of the QuerySet it applies to. If not specified, at will apply to all QuerySets on the Roxie.
- *
- * A PackageSet is a list of PackageMap id's, and is used to tell Roxie what PackageMaps to load.
- * A Roxie can have multiple PackageMap's active. When updating the data, you might:
- * - create a new PackageMap to refer to the new data
- * - once it has loaded, mark it active, and mark the previous one as inactive
- * - Once sure no queries in flight, unload the previous one
- *
- * Each Roxie will load all PackageMaps that are in any PackageSet whose @process attribute matches the cluster name.
- *
- * All package information is stored in Dali (and cached locally)
- *
- * <PackageSets>
- * <PackageSet id = 'ps1' process='*'> # use this packageset for all roxies (same as omitting process)
- * <PackageMap id='pm1b' querySet='qs1' active='true'/> # Use the PackageMap pm1b for QuerySet qs1 and make it active
- * <PackageMap id='pm1a' querySet='qs1' active='false'/> # Use the PackageMap pm1a for QuerySet qs1 but don't make it active
- * <PackageMap id='pm2' querySet='dev*' active='true'/> # Use the PackageMap pm1a for all QuerySets with names starting dev and make it active
- * </PackageMapSet>
- * </PackageSets>
- *
- * <PackageMaps>
- * <PackageMap id='pm1a'>
- * <Package id='package1'>
- * ...
- * </Package>
- * <Package id='package2'>
- * </Package>
- * </PackageMap>
- * <PackageMap id='pm2'>
- * </PackageMap>
- * <PackageMap id='pm3'>
- * </PackageMap>
- * </PackageMaps>
- */
- class CResolvedFileCache : implements IResolvedFileCache
- {
- CriticalSection cacheLock;
- CopyMapStringToMyClass<IResolvedFile> files;
- public:
- // Retrieve number of files in cache
- inline unsigned count() const
- {
- return files.count();
- }
- // Add a filename and the corresponding IResolvedFile to the cache
- virtual void addCache(const char *filename, const IResolvedFile *file)
- {
- CriticalBlock b(cacheLock);
- IResolvedFile *add = const_cast<IResolvedFile *>(file);
- add->setCache(this);
- files.setValue(filename, add);
- }
- // Remove an IResolvedFile from the cache
- virtual void removeCache(const IResolvedFile *file)
- {
- CriticalBlock b(cacheLock);
- if (traceLevel > 9)
- DBGLOG("removeCache %s", file->queryFileName());
- // NOTE: it's theoretically possible for the final release to happen after a replacement has been inserted into hash table.
- // So only remove from hash table if what we find there matches the item that is being deleted.
- IResolvedFile *goer = files.getValue(file->queryFileName());
- if (goer == file)
- files.remove(file->queryFileName());
- // You might want to remove files from the daliServer cache too, but it's not safe to do so here as there may be multiple package caches
- }
- // Lookup a filename in the cache
- virtual IResolvedFile *lookupCache(const char *filename)
- {
- CriticalBlock b(cacheLock);
- IResolvedFile *cache = files.getValue(filename);
- if (cache)
- {
- LINK(cache);
- if (cache->isAlive())
- return cache;
- if (traceLevel)
- DBGLOG("Not returning %s from cache as isAlive() returned false", filename);
- }
- return NULL;
- }
- };
- // Note - we use a separate cache for the misses rather than any clever attempts to overload
- // the one cache with a "special" value, since (among other reasons) the misses are cleared
- // prior to a package reload, but the hits need not be (as the file will be locked as long as it
- // is in the cache)
- static CriticalSection daliMissesCrit;
- static Owned<KeptLowerCaseAtomTable> daliMisses;
- static void noteDaliMiss(const char *filename)
- {
- CriticalBlock b(daliMissesCrit);
- if (traceLevel > 9)
- DBGLOG("noteDaliMiss %s", filename);
- daliMisses->addAtom(filename);
- }
- static bool checkCachedDaliMiss(const char *filename)
- {
- CriticalBlock b(daliMissesCrit);
- bool ret = daliMisses->find(filename) != NULL;
- if (traceLevel > 9)
- DBGLOG("checkCachedDaliMiss %s returns %d", filename, ret);
- return ret;
- }
- static void clearDaliMisses()
- {
- CriticalBlock b(daliMissesCrit);
- if (traceLevel)
- DBGLOG("Clearing dali misses cache");
- daliMisses.setown(new KeptLowerCaseAtomTable);
- }
- class CRoxiePackageNode : extends CPackageNode, implements IRoxiePackage
- {
- protected:
- static CResolvedFileCache daliFiles;
- mutable CResolvedFileCache fileCache;
- virtual aindex_t getBaseCount() const = 0;
- virtual const CRoxiePackageNode *getBaseNode(aindex_t pos) const = 0;
- virtual bool getSysFieldTranslationEnabled() const {return fieldTranslationEnabled;} //roxie configured value
- // Use local package file only to resolve subfile into physical file info
- IResolvedFile *resolveLFNusingPackage(const char *fileName) const
- {
- if (node)
- {
- StringBuffer xpath;
- IPropertyTree *fileInfo = node->queryPropTree(xpath.appendf("File[@id='%s']", fileName).str());
- if (fileInfo)
- {
- Owned <IResolvedFileCreator> result = createResolvedFile(fileName, NULL, false);
- result->addSubFile(createFileDescriptorFromRoxieXML(fileInfo), NULL);
- return result.getClear();
- }
- }
- return NULL;
- }
- // Use dali to resolve subfile into physical file info
- static IResolvedFile *resolveLFNusingDaliOrLocal(const char *fileName, bool useCache, bool cacheResult, bool writeAccess, bool alwaysCreate)
- {
- // MORE - look at alwaysCreate... This may be useful to implement earlier locking semantics.
- if (traceLevel > 9)
- DBGLOG("resolveLFNusingDaliOrLocal %s %d %d %d %d", fileName, useCache, cacheResult, writeAccess, alwaysCreate);
- IResolvedFile* result = NULL;
- if (useCache)
- {
- result = daliFiles.lookupCache(fileName);
- if (result)
- {
- if (traceLevel > 9)
- DBGLOG("resolveLFNusingDaliOrLocal %s - cache hit", fileName);
- return result;
- }
- }
- if (alwaysCreate || !useCache || !checkCachedDaliMiss(fileName))
- {
- Owned<IRoxieDaliHelper> daliHelper = connectToDali();
- if (daliHelper)
- {
- if (daliHelper->connected())
- {
- Owned<IDistributedFile> dFile = daliHelper->resolveLFN(fileName, cacheResult, writeAccess);
- if (dFile)
- result = createResolvedFile(fileName, NULL, dFile.getClear(), daliHelper, !useCache, cacheResult, writeAccess);
- }
- else if (!writeAccess) // If we need write access and expect a dali, but don't have one, we should probably fail
- {
- // we have no dali, we can't lock..
- Owned<IFileDescriptor> fd = daliHelper->resolveCachedLFN(fileName);
- if (fd)
- {
- Owned <IResolvedFileCreator> creator = createResolvedFile(fileName, NULL, false);
- Owned<IFileDescriptor> remoteFDesc = daliHelper->checkClonedFromRemote(fileName, fd, cacheResult);
- creator->addSubFile(fd.getClear(), remoteFDesc.getClear());
- result = creator.getClear();
- }
- }
- }
- if (!result)
- {
- StringBuffer useName;
- bool wasDFS = false;
- if (strstr(fileName,"::"))
- {
- makeSinglePhysicalPartName(fileName, useName, true, wasDFS);
- }
- else
- useName.append(fileName);
- bool exists = checkFileExists(useName);
- if (exists || alwaysCreate)
- {
- Owned <IResolvedFileCreator> creator = createResolvedFile(fileName, wasDFS ? NULL : useName.str(), false);
- if (exists)
- creator->addSubFile(useName);
- result = creator.getClear();
- }
- }
- }
- if (cacheResult)
- {
- if (traceLevel > 9)
- DBGLOG("resolveLFNusingDaliOrLocal %s - cache add %d", fileName, result != NULL);
- if (result)
- daliFiles.addCache(fileName, result);
- else
- noteDaliMiss(fileName);
- }
- return result;
- }
- // Use local package and its bases to resolve existing file into physical file info via all supported resolvers
- IResolvedFile *lookupExpandedFileName(const char *fileName, bool useCache, bool cacheResult, bool writeAccess, bool alwaysCreate, bool checkCompulsory) const
- {
- IResolvedFile *result = lookupFile(fileName, useCache, cacheResult, writeAccess, alwaysCreate);
- if (!result && (!checkCompulsory || !isCompulsory()))
- result = resolveLFNusingDaliOrLocal(fileName, useCache, cacheResult, writeAccess, alwaysCreate);
- return result;
- }
- IResolvedFile *lookupFile(const char *fileName, bool useCache, bool cacheResult, bool writeAccess, bool alwaysCreate) const
- {
- // Order of resolution:
- // 1. Files named in package
- // 2. Files named in bases
- IResolvedFile* result = useCache ? fileCache.lookupCache(fileName) : NULL;
- if (result)
- return result;
- Owned<const ISimpleSuperFileEnquiry> subFileInfo = resolveSuperFile(fileName);
- if (subFileInfo)
- {
- unsigned numSubFiles = subFileInfo->numSubFiles();
- // Note: do not try to optimize the common case of a single subfile
- // as we still want to report the superfile info from the resolvedFile
- Owned<IResolvedFileCreator> super;
- for (unsigned idx = 0; idx < numSubFiles; idx++)
- {
- StringBuffer subFileName;
- subFileInfo->getSubFileName(idx, subFileName);
- if (subFileName.length()) // Empty subfile names can come from package file - just ignore
- {
- if (subFileName.charAt(0)=='~')
- {
- // implies that a package file had ~ in subfile names - shouldn't really, but we allow it (and just strip the ~)
- subFileName.remove(0,1);
- }
- if (traceLevel > 9)
- DBGLOG("Looking up subfile %s", subFileName.str());
- Owned<const IResolvedFile> subFileInfo = lookupExpandedFileName(subFileName, useCache, cacheResult, false, false, false); // NOTE - overwriting a superfile does NOT require write access to subfiles
- if (subFileInfo)
- {
- if (!super)
- super.setown(createResolvedFile(fileName, NULL, true));
- super->addSubFile(subFileInfo);
- }
- }
- }
- if (super && cacheResult)
- fileCache.addCache(fileName, super);
- return super.getClear();
- }
- result = resolveLFNusingPackage(fileName);
- if (result)
- {
- if (cacheResult)
- fileCache.addCache(fileName, result);
- return result;
- }
- aindex_t count = getBaseCount();
- for (aindex_t i = 0; i < count; i++)
- {
- const CRoxiePackageNode *basePackage = getBaseNode(i);
- if (!basePackage)
- continue;
- IResolvedFile *result = basePackage->lookupFile(fileName, useCache, cacheResult, writeAccess, alwaysCreate);
- if (result)
- return result;
- }
- return NULL;
- }
- // default constructor for derived class use
- CRoxiePackageNode()
- {
- }
- public:
- IMPLEMENT_IINTERFACE;
- CRoxiePackageNode(IPropertyTree *p) : CPackageNode(p)
- {
- }
- ~CRoxiePackageNode()
- {
- assertex(fileCache.count()==0);
- // If it's possible for cached objects to outlive the cache I think there is a problem...
- // we could set the cache field to null here for any objects still in cache but there would be a race condition
- }
- virtual IPropertyTreeIterator *getInMemoryIndexInfo(const IPropertyTree &graphNode) const
- {
- StringBuffer xpath;
- xpath.append("SuperFile[@id='").append(queryNodeFileName(graphNode, getActivityKind(graphNode))).append("']");
- return lookupElements(xpath.str(), "MemIndex");
- }
- virtual const IResolvedFile *lookupFileName(const char *_fileName, bool opt, bool useCache, bool cacheResult, IConstWorkUnit *wu) const
- {
- StringBuffer fileName;
- expandLogicalFilename(fileName, _fileName, wu, false);
- if (traceLevel > 5)
- DBGLOG("lookupFileName %s", fileName.str());
- const IResolvedFile *result = lookupExpandedFileName(fileName, useCache, cacheResult, false, false, true);
- if (!result)
- {
- StringBuffer compulsoryMsg;
- if (isCompulsory())
- compulsoryMsg.append(" (Package is compulsory)");
- if (!opt)
- throw MakeStringException(ROXIE_FILE_ERROR, "Could not resolve filename %s%s", fileName.str(), compulsoryMsg.str());
- if (traceLevel > 4)
- DBGLOG("Could not resolve OPT filename %s%s", fileName.str(), compulsoryMsg.str());
- }
- return result;
- }
- virtual IRoxieWriteHandler *createFileName(const char *_fileName, bool overwrite, bool extend, const StringArray &clusters, IConstWorkUnit *wu) const
- {
- StringBuffer fileName;
- expandLogicalFilename(fileName, _fileName, wu, false);
- Owned<IResolvedFile> resolved = lookupFile(fileName, false, false, true, true);
- if (!resolved)
- resolved.setown(resolveLFNusingDaliOrLocal(fileName, false, false, true, true));
- if (resolved)
- {
- if (resolved->exists())
- {
- if (!overwrite)
- throw MakeStringException(99, "Cannot write %s, file already exists (missing OVERWRITE attribute?)", resolved->queryFileName());
- if (extend)
- UNIMPLEMENTED; // How does extend fit in with the clusterwritemanager stuff? They can't specify cluster and extend together...
- resolved->setCache(NULL);
- resolved->remove();
- }
- if (resolved->queryPhysicalName())
- fileName.clear().append(resolved->queryPhysicalName()); // if it turned out to be a local file
- resolved.clear();
- }
- else
- throw MakeStringException(ROXIE_FILE_ERROR, "Cannot write %s", fileName.str());
- // filename by now may be a local filename, or a dali one
- Owned<IRoxieDaliHelper> daliHelper = connectToDali();
- Owned<ILocalOrDistributedFile> ldFile = createLocalOrDistributedFile(fileName, NULL, false, false, true);
- if (!ldFile)
- throw MakeStringException(ROXIE_FILE_ERROR, "Cannot write %s", fileName.str());
- return createRoxieWriteHandler(daliHelper, ldFile.getClear(), clusters);
- }
- //map ambiguous IHpccPackage
- virtual ISimpleSuperFileEnquiry *resolveSuperFile(const char *superFileName) const
- {
- return CPackageNode::resolveSuperFile(superFileName);
- }
- virtual const char *queryEnv(const char *varname) const
- {
- return CPackageNode::queryEnv(varname);
- }
- virtual bool getEnableFieldTranslation() const
- {
- return CPackageNode::getEnableFieldTranslation();
- }
- virtual const IPropertyTree *queryTree() const
- {
- return CPackageNode::queryTree();
- }
- virtual hash64_t queryHash() const
- {
- return CPackageNode::queryHash();
- }
- virtual const char *queryId() const
- {
- return CPackageNode::queryId();
- }
- };
- CResolvedFileCache CRoxiePackageNode::daliFiles;
- typedef CResolvedPackage<CRoxiePackageNode> CRoxiePackage;
- IRoxiePackage *createRoxiePackage(IPropertyTree *p, IRoxiePackageMap *packages)
- {
- Owned<CRoxiePackage> pkg = new CRoxiePackage(p);
- if (packages)
- pkg->resolveBases(packages);
- return pkg.getClear();
- }
- //================================================================================================
- // CPackageMap - an implementation of IPackageMap using a string map
- //================================================================================================
- class CRoxiePackageMap : public CPackageMapOf<CRoxiePackageNode, IRoxiePackage>, implements IRoxiePackageMap
- {
- public:
- IMPLEMENT_IINTERFACE;
- typedef CPackageMapOf<CRoxiePackageNode, IRoxiePackage> BASE;
- CRoxiePackageMap(const char *_packageId, const char *_querySet, bool _active)
- : BASE(_packageId, _querySet, _active)
- {
- }
- //map ambiguous IHpccPackageMap interface
- virtual const IHpccPackage *queryPackage(const char *name) const
- {
- return BASE::queryPackage(name);
- }
- virtual const IHpccPackage *matchPackage(const char *name) const
- {
- return BASE::matchPackage(name);
- }
- virtual const char *queryPackageId() const
- {
- return BASE::queryPackageId();
- }
- virtual bool isActive() const
- {
- return BASE::isActive();
- }
- virtual bool validate(StringArray &queryids, StringArray &wrn, StringArray &err, StringArray &unmatchedQueries, StringArray &unusedPackages, StringArray &unmatchedFiles) const
- {
- return BASE::validate(queryids, wrn, err, unmatchedQueries, unusedPackages, unmatchedFiles);
- }
- virtual void gatherFileMappingForQuery(const char *queryname, IPropertyTree *fileInfo) const
- {
- BASE::gatherFileMappingForQuery(queryname, fileInfo);
- }
- virtual const IRoxiePackage *queryRoxiePackage(const char *name) const
- {
- return queryResolvedPackage(name);
- }
- virtual const IRoxiePackage *matchRoxiePackage(const char *name) const
- {
- return matchResolvedPackage(name);
- }
- };
- static CRoxiePackageMap *emptyPackageMap;
- static CRoxiePackage *rootPackage;
- static SpinLock emptyPackageMapCrit;
- static IRoxieDebugSessionManager *debugSessionManager;
- extern const IRoxiePackage &queryRootRoxiePackage()
- {
- SpinBlock b(emptyPackageMapCrit);
- if (!rootPackage)
- {
- // Set up the root package. This contains global settings from topology file
- rootPackage = new CRoxiePackage(topology); // attributes become control: environment settings. Rest of topology ignored.
- rootPackage->resolveBases(NULL);
- }
- return *rootPackage;
- }
- extern const IRoxiePackageMap &queryEmptyRoxiePackageMap()
- {
- SpinBlock b(emptyPackageMapCrit);
- if (!emptyPackageMap)
- emptyPackageMap = new CRoxiePackageMap("<none>", NULL, true);
- return *emptyPackageMap;
- }
- MODULE_INIT(INIT_PRIORITY_STANDARD)
- {
- emptyPackageMap = NULL;
- debugSessionManager = NULL;
- return true;
- }
- MODULE_EXIT()
- {
- ::Release(emptyPackageMap); // You can't use static Owned to release anything that may own a IPropertyTree
- ::Release(rootPackage);
- ::Release(debugSessionManager);
- }
- // IRoxieQuerySetManager
- // - CRoxieQuerySetManager -
- // - CRoxieServerQuerySetManager
- // - CRoxieSlaveQuerySetManager
- //
- // Manages a set of instantiated queries and allows us to look them up by queryname or alias
- //
- // IRoxieQuerySetManagerSet
- // - CRoxieSlaveQuerySetManagerSet
- //
- // Manages the IRoxieQuerySetManager for multiple channels
- //
- // CRoxieQueryPackageManager
- // - CRoxieDaliQueryPackageManager
- // - CStandaloneQueryPackageManager
- //
- // Groups a server resource manager and a set of slave resource managers (one per channel) together.
- // There is one per PackageMap
- //
- // CQueryPackageSetManager at outer level
- // There will be exactly one of these. It will reload the CQueryPackageManager's if dali Package info changes
- //================================================================================================
- // CRoxieQuerySetManager - shared base class for slave and server query set manager classes
- // Manages a set of instantiated queries and allows us to look them up by queryname or alias,
- // as well as controlling their lifespan
- //================================================================================================
- class CRoxieQuerySetManager : public CInterface, implements IRoxieQuerySetManager
- {
- protected:
- MapStringToMyClass<IQueryFactory> queries;
- MapStringToMyClass<IQueryFactory> aliases; // Do we gain anything by having two tables?
- unsigned channelNo;
- bool active;
- StringAttr querySetName;
- void addQuery(const char *id, IQueryFactory *n, hash64_t &hash)
- {
- hash = rtlHash64Data(sizeof(hash), &hash, n->queryHash());
- queries.setValue(id, n);
- n->Release(); // setValue links
- }
- void addAlias(const char *alias, const char *original, hash64_t &hash)
- {
- if (original && alias)
- {
- IQueryFactory *orig = queries.getValue(original);
- if (orig)
- {
- hash = rtlHash64VStr(alias, hash);
- hash = rtlHash64Data(sizeof(hash), &hash, orig->queryHash());
- aliases.setValue(alias, orig);
- }
- else
- throw MakeStringException(ROXIE_UNKNOWN_QUERY, "Unknown query %s", original);
- }
- else
- throw MakeStringException(ROXIE_INTERNAL_ERROR, "Invalid parameters to addAlias");
- }
- virtual IQueryFactory *loadQueryFromDll(const char *id, const IQueryDll *dll, const IRoxiePackage &package, const IPropertyTree *stateInfo, bool forceRetry) = 0;
- public:
- IMPLEMENT_IINTERFACE;
- CRoxieQuerySetManager(unsigned _channelNo, const char *_querySetName)
- : queries(true), aliases(true), active(false), querySetName(_querySetName)
- {
- channelNo = _channelNo;
- }
- virtual const char *queryId() const
- {
- return querySetName;
- }
- virtual bool isActive() const
- {
- return active;
- }
- virtual void load(const IPropertyTree *querySet, const IRoxiePackageMap &packages, hash64_t &hash, bool forceRetry)
- {
- Owned<IPropertyTreeIterator> queryNames = querySet->getElements("Query");
- ForEach (*queryNames)
- {
- const IPropertyTree &query = queryNames->query();
- const char *id = query.queryProp("@id");
- const char *dllName = query.queryProp("@dll");
- try
- {
- if (!id || !*id || !dllName || !*dllName)
- throw MakeStringException(ROXIE_QUERY_MODIFICATION, "dll and id must be specified");
- Owned<const IQueryDll> queryDll = createQueryDll(dllName);
- const IHpccPackage *package = NULL;
- const char *packageName = query.queryProp("@package");
- if (packageName && *packageName)
- {
- package = packages.queryPackage(packageName); // if a package is specified, require exact match
- if (!package)
- throw MakeStringException(ROXIE_QUERY_MODIFICATION, "Package %s specified by query %s not found", packageName, id);
- }
- else
- {
- package = packages.queryPackage(id); // Look for an exact match, then a fuzzy match, using query name as the package id
- if(!package) package = packages.matchPackage(id);
- if (!package) package = &queryRootRoxiePackage();
- }
- assertex(package && QUERYINTERFACE(package, const IRoxiePackage));
- addQuery(id, loadQueryFromDll(id, queryDll.getClear(), *QUERYINTERFACE(package, const IRoxiePackage), &query, forceRetry), hash);
- }
- catch (IException *E)
- {
- // we don't want a single bad query in the set to stop us loading all the others
- StringBuffer msg;
- msg.appendf("Failed to load query %s from %s", id ? id : "(null)", dllName ? dllName : "(null)");
- EXCLOG(E, msg.str());
- if (id)
- {
- StringBuffer emsg;
- E->errorMessage(emsg);
- Owned<IQueryFactory> dummyQuery = loadQueryFromDll(id, NULL, queryRootRoxiePackage(), NULL, false);
- dummyQuery->suspend(emsg.str());
- addQuery(id, dummyQuery.getClear(), hash);
- }
- E->Release();
- }
- }
- Owned<IPropertyTreeIterator> a = querySet->getElements("Alias");
- ForEach(*a)
- {
- IPropertyTree &item = a->query();
- const char *alias = item.queryProp("@name");
- const char *original = item.queryProp("@id");
- try
- {
- addAlias(alias, original, hash);
- }
- catch (IException *E)
- {
- // we don't want a single bad alias in the set to stop us loading all the others
- VStringBuffer msg("Failed to set alias %s on %s", alias, original);
- EXCLOG(E, msg.str());
- E->Release();
- }
- }
- active = packages.isActive();
- if (active)
- hash = rtlHash64VStr("active", hash);
- }
- virtual void getStats(const char *queryName, const char *graphName, StringBuffer &reply, const IRoxieContextLogger &logctx) const
- {
- Owned<IQueryFactory> f = getQuery(queryName, NULL, logctx);
- if (f)
- {
- reply.appendf("<Query id='%s'>\n", queryName);
- f->getStats(reply, graphName);
- reply.append("</Query>\n");
- }
- else
- throw MakeStringException(ROXIE_UNKNOWN_QUERY, "Unknown query %s", queryName);
- }
- virtual void resetQueryTimings(const char *queryName, const IRoxieContextLogger &logctx)
- {
- Owned<IQueryFactory> f = getQuery(queryName, NULL, logctx);
- if (f)
- f->resetQueryTimings();
- else
- throw MakeStringException(ROXIE_UNKNOWN_QUERY, "Unknown query %s", queryName);
- }
- virtual void resetAllQueryTimings()
- {
- HashIterator elems(queries);
- for (elems.first(); elems.isValid(); elems.next())
- {
- IMapping &cur = elems.query();
- queries.mapToValue(&cur)->resetQueryTimings();
- }
- }
- virtual void getActivityMetrics(StringBuffer &reply) const
- {
- HashIterator elems(queries);
- for (elems.first(); elems.isValid(); elems.next())
- {
- IMapping &cur = elems.query();
- queries.mapToValue(&cur)->getActivityMetrics(reply);
- }
- }
- virtual void getAllQueryInfo(StringBuffer &reply, bool full, const IRoxieContextLogger &logctx) const
- {
- HashIterator elems(queries);
- for (elems.first(); elems.isValid(); elems.next())
- {
- IMapping &cur = elems.query();
- IQueryFactory *query = queries.mapToValue(&cur);
- query->getQueryInfo(reply, full, logctx);
- }
- HashIterator aliasIterator(aliases);
- for (aliasIterator.first(); aliasIterator.isValid(); aliasIterator.next())
- {
- IMapping &cur = aliasIterator.query();
- reply.appendf(" <Alias id='%s' query='%s'/>\n", (const char *) cur.getKey(), aliases.mapToValue(&cur)->queryQueryName());
- }
- }
- virtual IQueryFactory *getQuery(const char *id, StringBuffer *querySet, const IRoxieContextLogger &logctx) const
- {
- IQueryFactory *ret;
- ret = aliases.getValue(id);
- if (ret && logctx.queryTraceLevel() > 5)
- logctx.CTXLOG("Found query alias %s => %s", id, ret->queryQueryName());
- if (!ret)
- ret = queries.getValue(id);
- if (ret && querySet)
- querySet->set(querySetName);
- return LINK(ret);
- }
- };
- //===============================================================================================================
- class CRoxieServerQuerySetManager : public CRoxieQuerySetManager
- {
- public:
- IMPLEMENT_IINTERFACE;
- CRoxieServerQuerySetManager(const char *_querySetName)
- : CRoxieQuerySetManager(0, _querySetName)
- {
- }
- virtual IQueryFactory * loadQueryFromDll(const char *id, const IQueryDll *dll, const IRoxiePackage &package, const IPropertyTree *stateInfo, bool forceRetry)
- {
- return createServerQueryFactory(id, dll, package, stateInfo, false, forceRetry);
- }
- };
- extern IRoxieQuerySetManager *createServerManager(const char *querySet)
- {
- return new CRoxieServerQuerySetManager(querySet);
- }
- //===============================================================================================================
- class CRoxieSlaveQuerySetManager : public CRoxieQuerySetManager
- {
- public:
- IMPLEMENT_IINTERFACE;
- CRoxieSlaveQuerySetManager(unsigned _channelNo, const char *_querySetName)
- : CRoxieQuerySetManager(_channelNo, _querySetName)
- {
- channelNo = _channelNo;
- }
- virtual IQueryFactory *loadQueryFromDll(const char *id, const IQueryDll *dll, const IRoxiePackage &package, const IPropertyTree *stateInfo, bool forceRetry)
- {
- return createSlaveQueryFactory(id, dll, package, channelNo, stateInfo, false, forceRetry);
- }
- };
- class CRoxieSlaveQuerySetManagerSet : public CInterface, implements IRoxieQuerySetManagerSet
- {
- public:
- IMPLEMENT_IINTERFACE;
- CRoxieSlaveQuerySetManagerSet(unsigned _numChannels, const char *querySetName)
- : numChannels(_numChannels)
- {
- CriticalBlock b(ccdChannelsCrit);
- managers = new CRoxieSlaveQuerySetManager *[numChannels];
- memset(managers, 0, sizeof(CRoxieSlaveQuerySetManager *) * numChannels);
- Owned<IPropertyTreeIterator> it = ccdChannels->getElements("RoxieSlaveProcess");
- ForEach(*it)
- {
- unsigned channelNo = it->query().getPropInt("@channel", 0);
- assertex(channelNo>0 && channelNo<=numChannels);
- if (managers[channelNo-1] == NULL)
- managers[channelNo-1] = new CRoxieSlaveQuerySetManager(channelNo, querySetName);
- else
- throw MakeStringException(ROXIE_INVALID_TOPOLOGY, "Invalid topology file - channel %d repeated for this slave", channelNo);
- }
- }
- ~CRoxieSlaveQuerySetManagerSet()
- {
- for (unsigned channel = 0; channel < numChannels; channel++)
- ::Release(managers[channel]);
- delete [] managers;
- }
- inline CRoxieSlaveQuerySetManager *item(int idx)
- {
- return managers[idx];
- }
- virtual void load(const IPropertyTree *querySets, const IRoxiePackageMap &packages, hash64_t &hash, bool forceRetry)
- {
- for (unsigned channel = 0; channel < numChannels; channel++)
- if (managers[channel])
- managers[channel]->load(querySets, packages, hash, forceRetry); // MORE - this means the hash depends on the number of channels. Is that desirable?
- }
- private:
- unsigned numChannels;
- CRoxieSlaveQuerySetManager **managers;
- };
- //===============================================================================================================
- class CRoxieDebugSessionManager : public CInterface, implements IRoxieDebugSessionManager
- {
- protected:
- ReadWriteLock debugLock;
- MapStringToMyClass<IDebuggerContext> debuggerContexts;
- public:
- IMPLEMENT_IINTERFACE;
- void getActiveQueries(StringBuffer &reply)
- {
- HashIterator q(debuggerContexts);
- for (q.first(); q.isValid(); q.next())
- {
- IDebuggerContext *ctx = debuggerContexts.mapToValue(&q.query());
- reply.appendf(" <Query id='%s' uid='%s' debug='1'/>\n", ctx->queryQueryName(), ctx->queryDebugId());
- }
- }
- virtual void registerDebugId(const char *id, IDebuggerContext *ctx)
- {
- WriteLockBlock block(debugLock);
- debuggerContexts.setValue(id, ctx);
- }
- virtual void deregisterDebugId(const char *id)
- {
- WriteLockBlock block(debugLock);
- debuggerContexts.remove(id);
- }
- virtual IDebuggerContext *lookupDebuggerContext(const char *id)
- {
- ReadLockBlock block(debugLock);
- IDebuggerContext *ctx = debuggerContexts.getValue(id);
- if (ctx)
- return LINK(ctx);
- else
- {
- #ifdef _DEBUG
- // In a debug environment, it is convenient to be able to use '*' to mean 'the only active debug session'...
- if (strcmp(id, "*")==0 && debuggerContexts.count()==1)
- {
- HashIterator q(debuggerContexts);
- for (q.first(); q.isValid(); q.next())
- {
- IDebuggerContext *ctx = debuggerContexts.mapToValue(&q.query());
- return LINK(ctx);
- }
- }
- #endif
- throw MakeStringException(ROXIE_INTERNAL_ERROR, "Debug context %s not found", id);
- }
- }
- };
- //===============================================================================================
- /*----------------------------------------------------------------------------------------------
- * A CRoxieQueryPackageManager object manages all the queries that are currently runnable via XML.
- * There may be more than one in existence, but only one will be active and therefore used to
- * look up queries that are received - this corresponds to the currently active package.
- *-----------------------------------------------------------------------------------------------*/
- class CRoxieQueryPackageManager : public CInterface
- {
- public:
- IMPLEMENT_IINTERFACE;
- CRoxieQueryPackageManager(unsigned _numChannels, const char *_querySet, const IRoxiePackageMap *_packages)
- : numChannels(_numChannels), packages(_packages), querySet(_querySet)
- {
- queryHash = 0;
- }
- ~CRoxieQueryPackageManager()
- {
- }
- inline const char *queryPackageId() const
- {
- return packages->queryPackageId();
- }
- virtual void reload()
- {
- // Default is to do nothing...
- }
- virtual void load(bool forceReload) = 0;
- virtual hash64_t getHash()
- {
- CriticalBlock b2(updateCrit);
- return queryHash;
- }
- IRoxieQuerySetManager* getRoxieServerManager()
- {
- CriticalBlock b2(updateCrit);
- return serverManager.getLink();
- }
- void getInfo(StringBuffer &reply, const IRoxieContextLogger &logctx) const
- {
- reply.appendf(" <PackageSet id=\"%s\" querySet=\"%s\"/>\n", queryPackageId(), querySet.get());
- }
- void resetStats(const char *queryId, const IRoxieContextLogger &logctx)
- {
- CriticalBlock b(updateCrit);
- if (queryId)
- {
- Owned<IQueryFactory> query = serverManager->getQuery(queryId, NULL, logctx);
- if (query)
- {
- const char *id = query->queryQueryName();
- serverManager->resetQueryTimings(id, logctx);
- for (unsigned channel = 0; channel < numChannels; channel++)
- if (slaveManagers->item(channel))
- {
- slaveManagers->item(channel)->resetQueryTimings(id, logctx);
- }
- }
- }
- else
- {
- serverManager->resetAllQueryTimings();
- for (unsigned channel = 0; channel < numChannels; channel++)
- if (slaveManagers->item(channel))
- slaveManagers->item(channel)->resetAllQueryTimings();
- }
- }
- void getStats(const char *queryId, const char *action, const char *graphName, StringBuffer &reply, const IRoxieContextLogger &logctx) const
- {
- CriticalBlock b2(updateCrit);
- Owned<IQueryFactory> query = serverManager->getQuery(queryId, NULL, logctx);
- if (query)
- {
- StringBuffer freply;
- serverManager->getStats(queryId, graphName, freply, logctx);
- Owned<IPropertyTree> stats = createPTreeFromXMLString(freply.str());
- for (unsigned channel = 0; channel < numChannels; channel++)
- if (slaveManagers->item(channel))
- {
- StringBuffer sreply;
- slaveManagers->item(channel)->getStats(queryId, graphName, sreply, logctx);
- Owned<IPropertyTree> cstats = createPTreeFromXMLString(sreply.str());
- mergeStats(stats, cstats, 1);
- }
- toXML(stats, reply);
- }
- }
- void getActivityMetrics(StringBuffer &reply) const
- {
- CriticalBlock b2(updateCrit);
- serverManager->getActivityMetrics(reply);
- for (unsigned channel = 0; channel < numChannels; channel++)
- {
- if (slaveManagers->item(channel))
- {
- slaveManagers->item(channel)->getActivityMetrics(reply);
- }
- }
- }
- void getAllQueryInfo(StringBuffer &reply, bool full, const IRoxieContextLogger &logctx) const
- {
- CriticalBlock b2(updateCrit);
- serverManager->getAllQueryInfo(reply, full, logctx);
- }
- protected:
- void reloadQueryManagers(CRoxieSlaveQuerySetManagerSet *newSlaveManagers, IRoxieQuerySetManager *newServerManager, hash64_t newHash)
- {
- Owned<CRoxieSlaveQuerySetManagerSet> oldSlaveManagers;
- Owned<IRoxieQuerySetManager> oldServerManager;
- {
- // Atomically, replace the existing query managers with the new ones
- CriticalBlock b2(updateCrit);
- oldSlaveManagers.setown(slaveManagers.getClear()); // so that the release happens outside the critblock
- oldServerManager.setown(serverManager.getClear()); // so that the release happens outside the critblock
- slaveManagers.setown(newSlaveManagers);
- serverManager.setown(newServerManager);
- queryHash = newHash;
- }
- if (slaveQueryReleaseDelaySeconds)
- delayedReleaser->delayedRelease(oldSlaveManagers.getClear(), slaveQueryReleaseDelaySeconds);
- }
- mutable CriticalSection updateCrit; // protects updates of slaveManagers and serverManager
- Owned<CRoxieSlaveQuerySetManagerSet> slaveManagers;
- Owned<IRoxieQuerySetManager> serverManager;
- Owned<const IRoxiePackageMap> packages;
- unsigned numChannels;
- hash64_t queryHash;
- StringAttr querySet;
- };
- /**
- * class CRoxieDaliQueryPackageManager - manages queries specified in QuerySets, for a given package set.
- *
- * If the QuerySet is modified, it will be reloaded.
- * There is one CRoxieDaliQueryPackageManager for every PackageSet - only one will be active for query lookup
- * at a given time (the one associated with the active PackageSet).
- *
- * To deploy new data, typically we will load a new PackageSet, make it active, then release the old one
- * A packageSet is not modified while loaded, to avoid timing issues between slaves and server.
- *
- * We need to be able to spot a change (in dali) to the active package indicator (and switch the active CRoxieDaliQueryPackageManager)
- * We need to be able to spot a change (in dali) that adds a new PackageSet
- * We need to decide what to do about a change (in dali) to an existing PackageSet. Maybe we allow it (leave it up to the gui to
- * encourage changing in the right sequence). In which case a change to the package info in dali means reload all global package
- * managers (and then discard the old ones). Hash-based queries means everything should work ok.
- * -> If the active ptr changes, just change what is active
- * If any change to any package set, reload all globalResourceManagers and discard prior
- * The query caching code should ensure that it is quick enough to do so
- *
- **/
- class CRoxieDaliQueryPackageManager : public CRoxieQueryPackageManager, implements ISDSSubscription
- {
- Owned<IRoxieDaliHelper> daliHelper;
- Owned<IDaliPackageWatcher> notifier;
- public:
- IMPLEMENT_IINTERFACE;
- CRoxieDaliQueryPackageManager(unsigned _numChannels, const IRoxiePackageMap *_packages, const char *_querySet)
- : CRoxieQueryPackageManager(_numChannels, _querySet, _packages)
- {
- daliHelper.setown(connectToDali());
- }
- ~CRoxieDaliQueryPackageManager()
- {
- if (notifier)
- daliHelper->releaseSubscription(notifier);
- }
- virtual void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
- {
- reload(false);
- daliHelper->commitCache();
- }
- virtual void load(bool forceReload)
- {
- notifier.setown(daliHelper->getQuerySetSubscription(querySet, this));
- reload(forceReload);
- }
- virtual void reload(bool forceRetry)
- {
- hash64_t newHash = numChannels;
- Owned<IPropertyTree> newQuerySet = daliHelper->getQuerySet(querySet);
- Owned<CRoxieSlaveQuerySetManagerSet> newSlaveManagers = new CRoxieSlaveQuerySetManagerSet(numChannels, querySet);
- Owned<IRoxieQuerySetManager> newServerManager = createServerManager(querySet);
- newServerManager->load(newQuerySet, *packages, newHash, forceRetry);
- newSlaveManagers->load(newQuerySet, *packages, newHash, forceRetry);
- reloadQueryManagers(newSlaveManagers.getClear(), newServerManager.getClear(), newHash);
- clearKeyStoreCache(false); // Allows us to fully release files we no longer need because of unloaded queries
- }
- };
- class CStandaloneQueryPackageManager : public CRoxieQueryPackageManager
- {
- Owned<IPropertyTree> standaloneDll;
- public:
- IMPLEMENT_IINTERFACE;
- CStandaloneQueryPackageManager(unsigned _numChannels, const char *_querySet, const IRoxiePackageMap *_packages, IPropertyTree *_standaloneDll)
- : CRoxieQueryPackageManager(_numChannels, _querySet, _packages), standaloneDll(_standaloneDll)
- {
- assertex(standaloneDll);
- }
- ~CStandaloneQueryPackageManager()
- {
- }
- virtual void load(bool forceReload)
- {
- hash64_t newHash = numChannels;
- Owned<IPropertyTree> newQuerySet = createPTree("QuerySet");
- newQuerySet->setProp("@name", "_standalone");
- newQuerySet->addPropTree("Query", standaloneDll.getLink());
- Owned<CRoxieSlaveQuerySetManagerSet> newSlaveManagers = new CRoxieSlaveQuerySetManagerSet(numChannels, querySet);
- Owned<IRoxieQuerySetManager> newServerManager = createServerManager(querySet);
- newServerManager->load(newQuerySet, *packages, newHash, forceReload);
- newSlaveManagers->load(newQuerySet, *packages, newHash, forceReload);
- reloadQueryManagers(newSlaveManagers.getClear(), newServerManager.getClear(), newHash);
- }
- };
- static SpinLock roxieDebugSessionManagerLock;
- extern IRoxieDebugSessionManager &queryRoxieDebugSessionManager()
- {
- SpinBlock b(roxieDebugSessionManagerLock);
- if (!debugSessionManager)
- debugSessionManager = new CRoxieDebugSessionManager();
- return *debugSessionManager;
- }
- class CRoxiePackageSetWatcher : public CInterface, implements ISDSSubscription
- {
- public:
- IMPLEMENT_IINTERFACE;
- CRoxiePackageSetWatcher(IRoxieDaliHelper *_daliHelper, ISDSSubscription *_owner, unsigned numChannels, bool forceReload)
- : stateHash(0), daliHelper(_daliHelper), owner(_owner)
- {
- Owned<IDaliPackageWatcher> notifier = daliHelper->getPackageSetsSubscription(this);
- if (notifier)
- notifiers.append(*notifier.getClear());
- ForEachItemIn(idx, allQuerySetNames)
- {
- createQueryPackageManagers(numChannels, allQuerySetNames.item(idx), forceReload);
- }
- }
- CRoxiePackageSetWatcher(IRoxieDaliHelper *_daliHelper, ISDSSubscription *_owner, const IQueryDll *standAloneDll, unsigned numChannels, const char *querySet, bool forceReload)
- : stateHash(0), daliHelper(_daliHelper), owner(_owner)
- {
- Owned<IPropertyTree> standAloneDllTree;
- standAloneDllTree.setown(createPTree("Query"));
- standAloneDllTree->setProp("@id", "roxie");
- standAloneDllTree->setProp("@dll", standAloneDll->queryDll()->queryName());
- Owned<CRoxieQueryPackageManager> qpm = new CStandaloneQueryPackageManager(numChannels, querySet, LINK(&queryEmptyRoxiePackageMap()), standAloneDllTree.getClear());
- qpm->load(forceReload);
- stateHash = rtlHash64Data(sizeof(stateHash), &stateHash, qpm->getHash());
- allQueryPackages.append(*qpm.getClear());
- }
- ~CRoxiePackageSetWatcher()
- {
- unsubscribe();
- }
- virtual void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
- {
- owner->notify(id, xpath, flags, valueLen, valueData);
- }
- IQueryFactory *lookupLibrary(const char *libraryName, unsigned expectedInterfaceHash, const IRoxieContextLogger &logctx) const
- {
- ForEachItemIn(idx, allQueryPackages)
- {
- Owned<IRoxieQuerySetManager> sm = allQueryPackages.item(idx).getRoxieServerManager();
- if (sm->isActive())
- {
- Owned<IQueryFactory> library = sm->getQuery(libraryName, NULL, logctx);
- if (library)
- {
- if (library->isQueryLibrary())
- {
- unsigned foundInterfaceHash = library->getQueryLibraryInterfaceHash();
- if (!foundInterfaceHash || (foundInterfaceHash == expectedInterfaceHash))
- return library.getClear();
- else
- throw MakeStringException(ROXIE_LIBRARY_ERROR, "The library interface found in %s is not compatible (found %d, expected %d)", libraryName, foundInterfaceHash, expectedInterfaceHash);
- }
- else
- throw MakeStringException(ROXIE_LIBRARY_ERROR, "The query resolved by %s is not a library", libraryName);
- }
- }
- }
- throw MakeStringException(ROXIE_LIBRARY_ERROR, "No library available for %s", libraryName);
- }
- IQueryFactory *getQuery(const char *id, StringBuffer *querySet, const IRoxieContextLogger &logctx) const
- {
- ForEachItemIn(idx, allQueryPackages)
- {
- Owned<IRoxieQuerySetManager> sm = allQueryPackages.item(idx).getRoxieServerManager();
- if (sm->isActive())
- {
- IQueryFactory *query = sm->getQuery(id, querySet, logctx);
- if (query)
- return query;
- }
- }
- return NULL;
- }
- int getActivePackageCount() const
- {
- int count = 0;
- ForEachItemIn(idx, allQueryPackages)
- {
- Owned<IRoxieQuerySetManager> sm = allQueryPackages.item(idx).getRoxieServerManager();
- if (sm->isActive())
- count++;
- }
- return count;
- }
- inline hash64_t queryHash() const
- {
- return stateHash;
- }
- void getAllQueryInfo(StringBuffer &reply, bool full, const IRoxieContextLogger &logctx) const
- {
- ForEachItemIn(idx, allQueryPackages)
- {
- Owned<IRoxieQuerySetManager> sm = allQueryPackages.item(idx).getRoxieServerManager();
- sm->getAllQueryInfo(reply, full, logctx);
- }
- }
- void getActivityMetrics(StringBuffer &reply) const
- {
- ForEachItemIn(idx, allQueryPackages)
- {
- CRoxieQueryPackageManager &qpm = allQueryPackages.item(idx);
- qpm.getActivityMetrics(reply);
- }
- }
- void getInfo(StringBuffer &reply, const IRoxieContextLogger &logctx) const
- {
- reply.append("<PackageSets>\n");
- ForEachItemIn(idx, allQueryPackages)
- {
- allQueryPackages.item(idx).getInfo(reply, logctx);
- }
- reply.append("</PackageSets>\n");
- }
- void getStats(StringBuffer &reply, const char *id, const char *action, const char *graphName, const IRoxieContextLogger &logctx) const
- {
- ForEachItemIn(idx, allQueryPackages)
- {
- allQueryPackages.item(idx).getStats(id, action, graphName, reply, logctx);
- }
- }
- void resetStats(const char *id, const IRoxieContextLogger &logctx) const
- {
- ForEachItemIn(idx, allQueryPackages)
- {
- allQueryPackages.item(idx).resetStats(id, logctx);
- }
- }
- private:
- ISDSSubscription *owner;
- CIArrayOf<CRoxieQueryPackageManager> allQueryPackages;
- IArrayOf<IDaliPackageWatcher> notifiers;
- Linked<IRoxieDaliHelper> daliHelper;
- hash64_t stateHash;
- void createQueryPackageManager(unsigned numChannels, const IRoxiePackageMap *packageMap, const char *querySet, bool forceReload)
- {
- Owned<CRoxieQueryPackageManager> qpm = new CRoxieDaliQueryPackageManager(numChannels, packageMap, querySet);
- qpm->load(forceReload);
- stateHash = rtlHash64Data(sizeof(stateHash), &stateHash, qpm->getHash());
- allQueryPackages.append(*qpm.getClear());
- }
- void createQueryPackageManagers(unsigned numChannels, const char *querySet, bool forceReload)
- {
- int loadedPackages = 0;
- int activePackages = 0;
- Owned<IPropertyTree> packageTree = daliHelper->getPackageSets();
- Owned<IPropertyTreeIterator> packageSets = packageTree->getElements("PackageSet");
- ForEach(*packageSets)
- {
- IPropertyTree &ps = packageSets->query();
- const char *packageSetSpec = ps.queryProp("@process");
- if (!packageSetSpec || WildMatch(roxieName, packageSetSpec, false))
- {
- if (traceLevel)
- {
- DBGLOG("Loading package set %s, process spec %s", ps.queryProp("@id") ? ps.queryProp("@id") : "<no-id>",
- packageSetSpec ? packageSetSpec : "<*>");
- }
- Owned<IPropertyTreeIterator> packageMaps = ps.getElements("PackageMap");
- ForEach(*packageMaps)
- {
- IPropertyTree &pm = packageMaps->query();
- const char *packageMapId = pm.queryProp("@id");
- const char *packageMapFilter = pm.queryProp("@querySet");
- if (packageMapId && *packageMapId && (!packageMapFilter || WildMatch(querySet, packageMapFilter, false)))
- {
- bool isActive = pm.getPropBool("@active", true);
- if (traceLevel)
- DBGLOG("Loading package map %s, active %s", packageMapId, isActive ? "true" : "false");
- try
- {
- Owned<CRoxiePackageMap> packageMap = new CRoxiePackageMap(packageMapId, packageMapFilter, isActive);
- Owned<IPropertyTree> xml = daliHelper->getPackageMap(packageMapId);
- packageMap->load(xml);
- createQueryPackageManager(numChannels, packageMap.getLink(), querySet, forceReload);
- loadedPackages++;
- if (isActive)
- activePackages++;
- notifiers.append(*daliHelper->getPackageMapSubscription(packageMapId, this));
- }
- catch (IException *E)
- {
- StringBuffer msg;
- msg.appendf("Failed to load package map %s", packageMapId);
- EXCLOG(E, msg.str());
- E->Release();
- }
- }
- }
- }
- }
- if (!loadedPackages)
- {
- if (traceLevel)
- DBGLOG("Loading empty package for QuerySet %s", querySet);
- createQueryPackageManager(numChannels, LINK(&queryEmptyRoxiePackageMap()), querySet, forceReload);
- }
- else if (traceLevel)
- DBGLOG("Loaded %d packages (%d active)", loadedPackages, activePackages);
- }
- void unsubscribe()
- {
- ForEachItemIn(idx, notifiers)
- {
- daliHelper->releaseSubscription(¬ifiers.item(idx));
- }
- notifiers.kill();
- }
- };
- class CRoxiePackageSetManager : public CInterface, implements IRoxieQueryPackageManagerSet, implements ISDSSubscription
- {
- public:
- IMPLEMENT_IINTERFACE;
- CRoxiePackageSetManager(const IQueryDll *_standAloneDll) :
- autoReloadThread(*this), standAloneDll(_standAloneDll)
- {
- daliHelper.setown(connectToDali(ROXIE_DALI_CONNECT_TIMEOUT));
- atomic_set(&autoPending, 0);
- autoReloadThread.start();
- }
- ~CRoxiePackageSetManager()
- {
- autoReloadThread.stop();
- autoReloadThread.join();
- }
- virtual void requestReload()
- {
- atomic_inc(&autoPending);
- autoReloadTrigger.signal();
- }
- virtual void load()
- {
- try
- {
- reload(false);
- daliHelper->commitCache();
- controlSem.signal();
- }
- catch(IException *E)
- {
- EXCLOG(E, "No configuration could be loaded");
- controlSem.interrupt();
- throw; // Roxie will refuse to start up if configuration invalid
- }
- }
- virtual void doControlMessage(IPropertyTree *xml, StringBuffer &reply, const IRoxieContextLogger &logctx)
- {
- if (!controlSem.wait(20000))
- throw MakeStringException(ROXIE_TIMEOUT, "Timed out waiting for current control query to complete");
- try
- {
- _doControlMessage(xml, reply, logctx);
- reply.append(" <Status>ok</Status>\n");
- }
- catch(IException *E)
- {
- controlSem.signal();
- EXCLOG(E);
- throw;
- }
- catch(...)
- {
- controlSem.signal();
- throw;
- }
- controlSem.signal();
- }
- virtual IQueryFactory *lookupLibrary(const char *libraryName, unsigned expectedInterfaceHash, const IRoxieContextLogger &logctx) const
- {
- ReadLockBlock b(packageCrit);
- return allQueryPackages->lookupLibrary(libraryName, expectedInterfaceHash, logctx);
- }
- virtual IQueryFactory *getQuery(const char *id, StringBuffer *querySet, const IRoxieContextLogger &logctx) const
- {
- ReadLockBlock b(packageCrit);
- return allQueryPackages->getQuery(id, querySet, logctx);
- }
- virtual int getActivePackageCount() const
- {
- ReadLockBlock b(packageCrit);
- return allQueryPackages->getActivePackageCount();
- }
- virtual void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
- {
- requestReload();
- }
- private:
- Owned<const IQueryDll> standAloneDll;
- Owned<CRoxieDebugSessionManager> debugSessionManager;
- Owned<IRoxieDaliHelper> daliHelper;
- mutable ReadWriteLock packageCrit;
- InterruptableSemaphore controlSem;
- Owned<CRoxiePackageSetWatcher> allQueryPackages;
- Semaphore autoReloadTrigger;
- atomic_t autoPending;
- class AutoReloadThread : public Thread
- {
- bool closing;
- CRoxiePackageSetManager &owner;
- public:
- AutoReloadThread(CRoxiePackageSetManager &_owner)
- : owner(_owner), Thread("AutoReloadThread")
- {
- closing = false;
- }
- virtual int run()
- {
- if (traceLevel)
- DBGLOG("AutoReloadThread %p starting", this);
- while (!closing)
- {
- owner.autoReloadTrigger.wait();
- if (atomic_read(&owner.autoPending))
- {
- atomic_set(&owner.autoPending, 0);
- try
- {
- owner.reload(false); // Arguably true should be better...
- }
- catch (IException *E)
- {
- if (!closing)
- EXCLOG(MCoperatorError, E, "AutoReloadThread: ");
- E->Release();
- }
- catch (...)
- {
- DBGLOG("Unknown exception in AutoReloadThread");
- }
- }
- }
- if (traceLevel)
- DBGLOG("AutoReloadThread %p exiting", this);
- return 0;
- }
- void stop()
- {
- closing = true;
- owner.autoReloadTrigger.signal();
- }
- } autoReloadThread;
- void reload(bool forceRetry)
- {
- clearDaliMisses();
- // We want to kill the old packages, but not until we have created the new ones
- // So that the query/dll caching will work for anything that is not affected by the changes
- Owned<CRoxiePackageSetWatcher> newPackages;
- if (standAloneDll)
- newPackages.setown(new CRoxiePackageSetWatcher(daliHelper, this, standAloneDll, numChannels, "roxie", forceRetry));
- else
- newPackages.setown(new CRoxiePackageSetWatcher(daliHelper, this, numChannels, forceRetry));
- // Hold the lock for as little time as we can
- // Note that we must NOT hold the lock during the delete of the old object - or we deadlock.
- // Hence the slightly convoluted code below
- Owned<CRoxiePackageSetWatcher> oldPackages; // NB Destroyed outside the WriteLockBlock
- {
- WriteLockBlock b(packageCrit);
- oldPackages.setown(allQueryPackages.getLink()); // To ensure that the setown just below does not delete it
- allQueryPackages.setown(newPackages.getClear());
- }
- daliHelper->commitCache();
- }
- // Common code used by control:queries and control:getQueryXrefInfo
- void getQueryInfo(IPropertyTree *control, StringBuffer &reply, bool full, const IRoxieContextLogger &logctx) const
- {
- Owned<IPropertyTreeIterator> ids = control->getElements("Query");
- reply.append("<Queries reporting='1'>\n");
- if (ids->first())
- {
- ForEach(*ids)
- {
- const char *id = ids->query().queryProp("@id");
- if (id)
- {
- Owned<IQueryFactory> query = getQuery(id, NULL, logctx);
- if (query)
- query->getQueryInfo(reply, full, logctx);
- else
- reply.appendf(" <Query id=\"%s\" error=\"Query not found\"/>\n", id);
- }
- }
- }
- else
- {
- ReadLockBlock readBlock(packageCrit);
- allQueryPackages->getAllQueryInfo(reply, full, logctx);
- }
- reply.append("</Queries>\n");
- }
- void _doControlMessage(IPropertyTree *control, StringBuffer &reply, const IRoxieContextLogger &logctx)
- {
- const char *queryName = control->queryName();
- logctx.CTXLOG("doControlMessage - %s", queryName);
- assertex(memicmp(queryName, "control:", 8) == 0);
- bool unknown = false;
- switch (_toupper(queryName[8]))
- {
- case 'A':
- if (stricmp(queryName, "control:aclupdate") == 0)
- {
- // MORE - do nothing for now - possibly needed in the future - leave this so no exception is thrown
- }
- else if (stricmp(queryName, "control:activeQueries")==0)
- {
- if (debugSessionManager)
- debugSessionManager->getActiveQueries(reply);
- }
- else if (stricmp(queryName, "control:activitymetrics")==0)
- {
- ReadLockBlock readBlock(packageCrit);
- allQueryPackages->getActivityMetrics(reply);
- }
- else if (stricmp(queryName, "control:alive")==0)
- {
- reply.appendf("<Alive restarts='%d'/>", restarts);
- }
- else
- unknown = true;
- break;
- case 'B':
- if (stricmp(queryName, "control:blobCacheMem")==0)
- {
- blobCacheMB = control->getPropInt("@val", 0);
- topology->setPropInt("@blobCacheMem", blobCacheMB);
- setBlobCacheMem(blobCacheMB * 0x100000);
- }
- else
- unknown = true;
- break;
- case 'C':
- if (stricmp(queryName, "control:checkCompleted")==0)
- {
- checkCompleted = control->getPropBool("@val", true);
- topology->setPropBool("@checkCompleted", checkCompleted );
- }
- else if (stricmp(queryName, "control:checkingHeap")==0)
- {
- defaultCheckingHeap = control->getPropBool("@val", true);
- topology->setPropInt("@checkingHeap", defaultCheckingHeap);
- }
- else if (stricmp(queryName, "control:clearIndexCache")==0)
- {
- bool clearAll = control->getPropBool("@clearAll", true);
- clearKeyStoreCache(clearAll);
- }
- else if (stricmp(queryName, "control:closedown")==0)
- {
- closedown();
- }
- else if (stricmp(queryName, "control:closeExpired")==0)
- {
- queryFileCache().closeExpired(false);
- queryFileCache().closeExpired(true);
- }
- else if (stricmp(queryName, "control:closeLocalExpired")==0)
- {
- queryFileCache().closeExpired(false);
- }
- else if (stricmp(queryName, "control:closeRemoteExpired")==0)
- {
- queryFileCache().closeExpired(true);
- }
- else
- unknown = true;
- break;
- case 'D':
- if (stricmp(queryName, "control:dafilesrvLookupTimeout")==0)
- {
- dafilesrvLookupTimeout = control->getPropInt("@val", 10000);
- topology->setPropInt("@dafilesrvLookupTimeout", dafilesrvLookupTimeout);
- }
- else if (stricmp(queryName, "control:defaultConcatPreload")==0)
- {
- defaultConcatPreload = control->getPropInt("@val", 0);
- topology->setPropInt("@defaultConcatPreload", defaultConcatPreload);
- }
- else if (stricmp(queryName, "control:defaultFetchPreload")==0)
- {
- defaultFetchPreload = control->getPropInt("@val", 0);
- topology->setPropInt("@defaultFetchPreload", defaultFetchPreload);
- }
- else if (stricmp(queryName, "control:defaultFullKeyedJoinPreload")==0)
- {
- defaultFullKeyedJoinPreload = control->getPropInt("@val", 0);
- topology->setPropInt("@defaultFullKeyedJoinPreload", defaultFullKeyedJoinPreload);
- }
- else if (stricmp(queryName, "control:defaultHighPriorityTimeLimit")==0)
- {
- defaultTimeLimit[1] = control->getPropInt("@limit", 0);
- topology->setPropInt("@defaultHighPriorityTimeLimit", defaultTimeLimit[1]);
- }
- else if (stricmp(queryName, "control:defaultHighPriorityTimeWarning")==0)
- {
- defaultWarnTimeLimit[1] = control->getPropInt("@limit", 0);
- topology->setPropInt("@defaultHighPriorityTimeWarning", defaultWarnTimeLimit[1]);
- }
- else if (stricmp(queryName, "control:defaultKeyedJoinPreload")==0)
- {
- defaultKeyedJoinPreload = control->getPropInt("@val", 0);
- topology->setPropInt("@defaultKeyedJoinPreload", defaultKeyedJoinPreload);
- }
- else if (stricmp(queryName, "control:defaultLowPriorityTimeLimit")==0)
- {
- defaultTimeLimit[0] = control->getPropInt("@limit", 0);
- topology->setPropInt("@defaultLowPriorityTimeLimit", defaultTimeLimit[0]);
- }
- else if (stricmp(queryName, "control:defaultLowPriorityTimeWarning")==0)
- {
- defaultWarnTimeLimit[0] = control->getPropInt("@limit", 0);
- topology->setPropInt("@defaultLowPriorityTimeWarning", defaultWarnTimeLimit[0]);
- }
- else if (stricmp(queryName, "control:defaultParallelJoinPreload")==0)
- {
- defaultParallelJoinPreload = control->getPropInt("@val", 0);
- topology->setPropInt("@defaultParallelJoinPreload", defaultParallelJoinPreload);
- }
- else if (stricmp(queryName, "control:defaultSLAPriorityTimeLimit")==0)
- {
- defaultTimeLimit[2] = control->getPropInt("@limit", 0);
- topology->setPropInt("@defaultSLAPriorityTimeLimit", defaultTimeLimit[2]);
- }
- else if (stricmp(queryName, "control:defaultSLAPriorityTimeWarning")==0)
- {
- defaultWarnTimeLimit[2] = control->getPropInt("@limit", 0);
- topology->setPropInt("@defaultSLAPriorityTimeWarning", defaultWarnTimeLimit[2]);
- }
- else if (stricmp(queryName, "control:deleteUnneededPhysicalFiles")==0)
- {
- UNIMPLEMENTED;
- }
- else if (stricmp(queryName, "control:deleteUnneededQueryCacheFiles")==0)
- {
- UNIMPLEMENTED;
- }
- else if (stricmp(queryName, "control:doIbytiDelay")==0)
- { // WARNING: use with extra care only during inactivity on system
- doIbytiDelay = control->getPropBool("@val", true);
- topology->setPropBool("@doIbytiDelay", doIbytiDelay);
- }
- else
- unknown = true;
- break;
- case 'E':
- if (stricmp(queryName, "control:enableKeyDiff")==0)
- {
- enableKeyDiff = control->getPropBool("@val", true);
- topology->setPropBool("@enableKeyDiff", enableKeyDiff);
- }
- else
- unknown = true;
- break;
-
- case 'F':
- if (stricmp(queryName, "control:fieldTranslationEnabled")==0)
- {
- fieldTranslationEnabled = control->getPropBool("@val", true);
- topology->setPropInt("@fieldTranslationEnabled", fieldTranslationEnabled);
- }
- else if (stricmp(queryName, "control:flushJHtreeCacheOnOOM")==0)
- {
- flushJHtreeCacheOnOOM = control->getPropBool("@val", true);
- topology->setPropInt("@flushJHtreeCacheOnOOM", flushJHtreeCacheOnOOM);
- }
- else
- unknown = true;
- break;
- case 'G':
- if (stricmp(queryName, "control:getACLinfo") == 0)
- {
- // MORE - do nothing for now - possibly needed in the future - leave this so no exception is thrown
- }
- else if (stricmp(queryName, "control:getClusterName")==0)
- {
- reply.appendf("<clusterName id='%s'/>", roxieName.str());
- }
- else if (stricmp(queryName, "control:getKeyInfo")==0)
- {
- reportInMemoryIndexStatistics(reply, control->queryProp("@id"), control->getPropInt("@count", 10));
- }
- else if (stricmp(queryName, "control:getQueryXrefInfo")==0)
- {
- getQueryInfo(control, reply, true, logctx);
- }
- else if (stricmp(queryName, "control:getQuery")==0)
- {
- const char* id = control->queryProp("@id");
- if (!id)
- throw MakeStringException(ROXIE_MISSING_PARAMS, "No query name specified");
- Owned<IQueryFactory> q = getQuery(id, NULL, logctx);
- if (q)
- {
- Owned<IPropertyTree> tempTree = q->cloneQueryXGMML();
- tempTree->setProp("@roxieName", roxieName.str());
- toXML(tempTree, reply);
- }
- else
- throw MakeStringException(ROXIE_UNKNOWN_QUERY, "Unknown query %s", id);
- }
- else if (stricmp(queryName, "control:getQueryWarningTime")==0)
- {
- const char *id = control->queryProp("Query/@id");
- if (!id)
- badFormat();
- Owned<IQueryFactory> f = getQuery(id, NULL, logctx);
- if (f)
- {
- unsigned warnLimit = f->getWarnTimeLimit();
- reply.appendf("<QueryTimeWarning val='%d'/>", warnLimit);
- }
- }
- else if (stricmp(queryName, "control:getBuildVersion")==0)
- {
- reply.appendf("<version id='%s'/>", BUILD_TAG);
- }
- else
- unknown = true;
- break;
- case 'I':
- if (stricmp(queryName, "control:indexmetrics")==0)
- {
- getIndexMetrics(reply);
- }
- else if (stricmp(queryName, "control:inMemoryKeysEnabled")==0)
- {
- inMemoryKeysEnabled = control->getPropBool("@val", true);
- topology->setPropBool("@inMemoryKeysEnabled", inMemoryKeysEnabled);
- }
- else
- unknown = true;
- break;
- case 'L':
- if (stricmp(queryName, "control:leafCacheMem")==0)
- {
- leafCacheMB = control->getPropInt("@val", 50);
- topology->setPropInt("@leafCacheMem", leafCacheMB);
- setLeafCacheMem(leafCacheMB * 0x100000);
- }
- else if (stricmp(queryName, "control:listFileOpenErrors")==0)
- {
- // this just creates a delta state file to remove references to Keys / Files we now longer have interest in
- StringAttrMapping *mapping = queryFileCache().queryFileErrorList();
- HashIterator iter(*mapping);
- StringBuffer err;
- for (iter.first(); iter.isValid(); iter.next())
- {
- IMapping &cur = iter.query();
- StringAttr *item = mapping->mapToValue(&cur);
- const char *filename = (const char*)cur.getKey();
- const char *filetype = item->get();
- reply.appendf("<file><name>%s</name><type>%s</type></file>", filename, filetype);
- }
- }
- else if (stricmp(queryName, "control:listUnusedFiles")==0)
- {
- UNIMPLEMENTED;
- }
- else if (stricmp(queryName, "control:lockDali")==0)
- {
- topology->setPropBool("@lockDali", true);
- if (daliHelper)
- daliHelper->disconnect();
- saveTopology();
- }
- else if (stricmp(queryName, "control:logfullqueries")==0)
- {
- logFullQueries = control->getPropBool("@val", true);
- topology->setPropBool("@logFullQueries", logFullQueries);
- }
- else
- unknown = true;
- break;
- case 'M':
- if (stricmp(queryName, "control:memoryStatsInterval")==0)
- {
- memoryStatsInterval = (unsigned) control->getPropInt64("@val", 0);
- roxiemem::setMemoryStatsInterval(memoryStatsInterval);
- topology->setPropInt64("@memoryStatsInterval", memoryStatsInterval);
- }
- else if (stricmp(queryName, "control:memtrace")==0)
- {
- roxiemem::memTraceLevel = control->getPropInt("@level", 0);
- topology->setPropInt("@memTraceLevel", roxiemem::memTraceLevel);
- }
- else if (stricmp(queryName, "control:memtracesizelimit")==0)
- {
- roxiemem::memTraceSizeLimit = (memsize_t) control->getPropInt64("@val", control->getPropInt64("@value", 0)); // used to accept @value so coded like this for backward compatibility
- topology->setPropInt64("@memTraceSizeLimit", roxiemem::memTraceSizeLimit);
- }
- else if (stricmp(queryName, "control:metrics")==0)
- {
- roxieMetrics->getMetrics(reply);
- }
- else if (stricmp(queryName, "control:minFreeDiskSpace")==0)
- {
- minFreeDiskSpace = (unsigned) control->getPropInt64("@val", 1048576);
- topology->setPropInt64("@minFreeDiskSpace", minFreeDiskSpace);
- }
- else if (stricmp(queryName, "control:misctrace")==0)
- {
- miscDebugTraceLevel = control->getPropInt("@level", 0);
- topology->setPropInt("@miscDebugTraceLevel", miscDebugTraceLevel);
- }
- else
- unknown = true;
- break;
- case 'N':
- if (stricmp(queryName, "control:nodeCachePreload")==0)
- {
- nodeCachePreload = control->getPropBool("@val", true);
- topology->setPropBool("@nodeCachePreload", nodeCachePreload);
- setNodeCachePreload(nodeCachePreload);
- }
- else if (stricmp(queryName, "control:nodeCacheMem")==0)
- {
- nodeCacheMB = control->getPropInt("@val", 100);
- topology->setPropInt("@nodeCacheMem", nodeCacheMB);
- setNodeCacheMem(nodeCacheMB * 0x100000);
- }
- else if (stricmp(queryName, "control:numFilesToProcess")==0)
- {
- int numFiles = queryFileCache().numFilesToCopy();
- reply.appendf("<FilesToProcess value='%d'/>", numFiles);
- }
- else
- unknown = true;
- break;
- case 'P':
- if (stricmp(queryName, "control:parallelAggregate")==0)
- {
- parallelAggregate = control->getPropInt("@val", 0);
- if (!parallelAggregate)
- parallelAggregate = hdwInfo.numCPUs;
- if (!parallelAggregate)
- parallelAggregate = 1;
- topology->setPropInt("@parallelAggregate", parallelAggregate);
- }
- else if (stricmp(queryName, "control:pingInterval")==0)
- {
- unsigned newInterval = (unsigned) control->getPropInt64("@val", 0);
- if (newInterval && !pingInterval)
- {
- pingInterval = newInterval; // best to set before the start...
- startPingTimer();
- }
- else if (pingInterval && !newInterval)
- stopPingTimer(); // but after the stop
- pingInterval = newInterval;
- topology->setPropInt64("@pingInterval", pingInterval);
- }
- else if (stricmp(queryName, "control:preabortIndexReadsThreshold")==0)
- {
- preabortIndexReadsThreshold = control->getPropInt("@val", 100);
- topology->setPropInt("@preabortIndexReadsThreshold", preabortIndexReadsThreshold);
- }
- else if (stricmp(queryName, "control:preabortKeyedJoinsThreshold")==0)
- {
- preabortKeyedJoinsThreshold = control->getPropInt("@val", 100);
- topology->setPropInt("@preabortKeyedJoinsThreshold", preabortKeyedJoinsThreshold);
- }
- else if (stricmp(queryName, "control:probeAllRows")==0)
- {
- probeAllRows = control->getPropBool("@val", true);
- }
- else
- unknown = true;
- break;
- case 'Q':
- if (stricmp(queryName, "control:queries")==0)
- {
- getQueryInfo(control, reply, false, logctx);
- }
- else if (stricmp(queryName, "control:queryAggregates")==0)
- {
- time_t from;
- const char *fromTime = control->queryProp("@from");
- if (fromTime)
- {
- CDateTime f;
- f.setString(fromTime, NULL, true);
- from = f.getSimple();
- }
- else
- from = startupTime;
- time_t to;
- const char *toTime = control->queryProp("@to");
- if (toTime)
- {
- CDateTime t;
- t.setString(toTime, NULL, true);
- to = t.getSimple();
- }
- else
- time(&to);
- const char *id = control->queryProp("Query/@id");
- if (id)
- {
- Owned<IQueryFactory> f = getQuery(id, NULL, logctx);
- if (f)
- {
- Owned<const IPropertyTree> stats = f->getQueryStats(from, to);
- toXML(stats, reply);
- }
- else
- throw MakeStringException(ROXIE_CONTROL_MSG_ERROR, "Unknown query %s", id);
- }
- else
- {
- bool includeAllQueries = control->getPropBool("@all", true);
- Owned<const IPropertyTree> stats = getAllQueryStats(includeAllQueries, from, to);
- toXML(stats, reply);
- }
- }
- else if (stricmp(queryName, "control:queryPackageInfo")==0)
- {
- ReadLockBlock readBlock(packageCrit);
- allQueryPackages->getInfo(reply, logctx);
- }
- else if (stricmp(queryName, "control:queryStats")==0)
- {
- const char *id = control->queryProp("Query/@id");
- if (!id)
- badFormat();
- const char *action = control->queryProp("Query/@action");
- const char *graphName = 0;
- if (action)
- {
- if (stricmp(action, "listGraphNames") == 0)
- {
- Owned<IQueryFactory> query = getQuery(id, NULL, logctx);
- if (query)
- {
- reply.appendf("<Query id='%s'>\n", id);
- StringArray graphNames;
- query->getGraphNames(graphNames);
- ForEachItemIn(idx, graphNames)
- {
- const char *graphName = graphNames.item(idx);
- reply.appendf("<Graph id='%s'/>", graphName);
- }
- reply.appendf("</Query>\n");
- }
- return; // done
- }
- else if (stricmp(action, "selectGraph") == 0)
- graphName = control->queryProp("Query/@name");
- else if (stricmp(action, "allGraphs") != 0) // if we get here and its NOT allgraphs - then error
- throw MakeStringException(ROXIE_CONTROL_MSG_ERROR, "invalid action in control:queryStats %s", action);
- }
- ReadLockBlock readBlock(packageCrit);
- allQueryPackages->getStats(reply, id, action, graphName, logctx);
- }
- else if (stricmp(queryName, "control:queryWuid")==0)
- {
- UNIMPLEMENTED;
- }
- else
- unknown = true;
- break;
- case 'R':
- if (stricmp(queryName, "control:reload")==0)
- {
- reload(control->getPropBool("@forceRetry", false));
- if (daliHelper && daliHelper->connected())
- reply.appendf("<Dali connected='1'/>");
- else
- reply.appendf("<Dali connected='0'/>");
- ReadLockBlock readBlock(packageCrit);
- reply.appendf("<State hash='%"I64F"u'/>", (unsigned __int64) allQueryPackages->queryHash());
- }
- else if (stricmp(queryName, "control:resetindexmetrics")==0)
- {
- resetIndexMetrics();
- }
- else if (stricmp(queryName, "control:resetmetrics")==0)
- {
- roxieMetrics->resetMetrics();
- }
- else if (stricmp(queryName, "control:resetquerystats")==0)
- {
- ReadLockBlock readBlock(packageCrit);
- Owned<IPropertyTreeIterator> queries = control->getElements("Query");
- if (queries->first())
- {
- while (queries->isValid())
- {
- IPropertyTree &query = queries->query();
- const char *id = query.queryProp("@id");
- if (!id)
- badFormat();
- allQueryPackages->resetStats(id, logctx);
- queries->next();
- }
- }
- else
- allQueryPackages->resetStats(NULL, logctx);
- }
- else if (stricmp(queryName, "control:resetremotedalicache")==0)
- {
- queryNamedGroupStore().resetCache();
- }
- else if (stricmp(queryName, "control:restart")==0)
- {
- FatalError("Roxie process restarted by operator request");
- }
- else if (stricmp(queryName, "control:retrieveActivityDetails")==0)
- {
- UNIMPLEMENTED;
- }
- else if (stricmp(queryName, "control:retrieveFileInfo")==0)
- {
- UNIMPLEMENTED;
- }
- else if (stricmp(queryName, "control:roxiememstats") == 0)
- {
- StringBuffer memStats;
- queryMemoryPoolStats(memStats);
- reply.append("<MemoryStats>").append(memStats.str()).append("</MemoryStats>\n");
- }
- else
- unknown = true;
- break;
- case 'S':
- if (stricmp(queryName, "control:setCopyResources")==0)
- {
- copyResources = control->getPropBool("@val", true);
- topology->setPropBool("@copyResources", copyResources);
- }
- else if (stricmp(queryName, "control:simpleLocalKeyedJoins")==0)
- {
- simpleLocalKeyedJoins = control->getPropBool("@val", true);
- }
- else if (stricmp(queryName, "control:soapInfo")==0)
- {
- UNIMPLEMENTED;
- }
- else if (stricmp(queryName, "control:soapTrace")==0)
- {
- soapTraceLevel = control->getPropInt("@level", 0);
- topology->setPropInt("@soapTraceLevel", soapTraceLevel);
- }
- else if (stricmp(queryName, "control:socketCheckInterval")==0)
- {
- socketCheckInterval = (unsigned) control->getPropInt64("@val", 0);
- topology->setPropInt64("@socketCheckInterval", socketCheckInterval);
- }
- else if (stricmp(queryName, "control:state")==0)
- {
- if (daliHelper && daliHelper->connected())
- reply.appendf("<Dali connected='1'/>");
- else
- reply.appendf("<Dali connected='0'/>");
- ReadLockBlock readBlock(packageCrit);
- reply.appendf("<State hash='%"I64F"u'/>", (unsigned __int64) allQueryPackages->queryHash());
- }
- else if (stricmp(queryName, "control:steppingEnabled")==0)
- {
- steppingEnabled = control->getPropBool("@val", true);
- }
- else if (stricmp(queryName, "control:suspend")==0)
- {
- StringBuffer id(control->queryProp("Query/@id"));
- if (!id.length())
- badFormat();
- {
- Owned<IQueryFactory> f = getQuery(id, NULL, logctx);
- if (f)
- id.clear().append(f->queryQueryName()); // use the spelling of the query stored with the query factory
- }
- UNIMPLEMENTED;
- }
- else if (stricmp(queryName, "control:suspendChannel")==0)
- {
- if (control->hasProp("@channel") && control->hasProp("@suspend"))
- {
- unsigned channel = control->getPropInt("@channel", 0);
- bool suspend = control->getPropBool("@suspend", true);
- CriticalBlock b(ccdChannelsCrit);
- if (channel)
- {
- StringBuffer xpath;
- IPropertyTree *slaveNode = ccdChannels->queryPropTree(xpath.appendf("RoxieSlaveProcess[@channel='%u']", channel).str());
- if (slaveNode)
- {
- ROQ->suspendChannel(channel, suspend, logctx);
- slaveNode->setPropBool("@suspended", suspend);
- }
- else
- throw MakeStringException(ROXIE_INVALID_INPUT, "Unknown channel %u", channel);
- }
- else
- {
- Owned<IPropertyTreeIterator> slaves = ccdChannels->getElements("RoxieSlaveProcess");
- ForEach(*slaves)
- {
- IPropertyTree &slaveNode = slaves->query();
- channel = slaveNode.getPropInt("@channel", 0);
- ROQ->suspendChannel(channel, suspend, logctx);
- slaveNode.setPropBool("@suspended", suspend);
- }
- }
- toXML(ccdChannels, reply);
- }
- else
- badFormat();
- }
- else if (stricmp(queryName, "control:suspendServer")==0)
- {
- if (control->hasProp("@port") && control->hasProp("@suspend"))
- {
- unsigned port = control->getPropInt("@port", 0);
- bool suspend = control->getPropBool("@suspend", true);
- CriticalBlock b(ccdChannelsCrit);
- if (port)
- {
- StringBuffer xpath;
- IPropertyTree *serverNode = ccdChannels->queryPropTree(xpath.appendf("RoxieServerProcess[@port='%u']", port).str());
- if (serverNode)
- {
- suspendRoxieListener(port, suspend);
- serverNode->setPropBool("@suspended", suspend);
- }
- else
- throw MakeStringException(ROXIE_INVALID_INPUT, "Unknown Roxie server port %u", port);
- }
- else
- {
- Owned<IPropertyTreeIterator> servers = ccdChannels->getElements("RoxieServerProcess");
- ForEach(*servers)
- {
- IPropertyTree &serverNode = servers->query();
- port = serverNode.getPropInt("@port", 0);
- suspendRoxieListener(port, suspend);
- serverNode.setPropBool("@suspended", suspend);
- }
- }
- toXML(ccdChannels, reply);
- }
- else
- badFormat();
- }
- else if (stricmp(queryName, "control:systemMonitor")==0)
- {
- unsigned interval = control->getPropInt("@interval", 60000);
- bool enable = control->getPropBool("@enable", true);
- if (enable)
- startPerformanceMonitor(interval);
- else
- stopPerformanceMonitor();
- }
- //MORE: control:stats??
- else
- unknown = true;
- break;
- case 'T':
- if (stricmp(queryName, "control:testSlaveFailure")==0)
- {
- testSlaveFailure = control->getPropInt("@val", 20);
- }
- else if (stricmp(queryName, "control:timeActivities")==0)
- {
- defaultTimeActivities = control->getPropBool("@val", true);
- topology->setPropInt("@timeActivities", defaultTimeActivities);
- }
- else if (stricmp(queryName, "control:timings")==0)
- {
- reply.append("<Timings>");
- timer->getTimings(reply);
- reply.append("</Timings>");
- if (control->getPropBool("@reset", false))
- {
- timer->reset();
- }
- }
- else if (stricmp(queryName, "control:topology")==0)
- {
- toXML(topology, reply);
- }
- else if (stricmp(queryName, "control:trace")==0)
- {
- traceLevel = control->getPropInt("@level", 0);
- if (traceLevel > MAXTRACELEVEL)
- traceLevel = MAXTRACELEVEL;
- topology->setPropInt("@traceLevel", traceLevel);
- }
- else if (stricmp(queryName, "control:traceServerSideCache")==0)
- {
- traceServerSideCache = control->getPropBool("@val", true);
- topology->setPropInt("@traceServerSideCache", traceServerSideCache);
- }
- else if (stricmp(queryName, "control:traceJHtreeAllocations")==0)
- {
- traceJHtreeAllocations = control->getPropBool("@val", true);
- topology->setPropInt("@traceJHtreeAllocations", traceJHtreeAllocations);
- }
- else if (stricmp(queryName, "control:traceSmartStepping")==0)
- {
- traceSmartStepping = control->getPropBool("@val", true);
- topology->setPropInt("@traceSmartStepping", traceSmartStepping);
- }
- else if (stricmp(queryName, "control:traceStartStop")==0)
- {
- traceStartStop = control->getPropBool("@val", true);
- topology->setPropInt("@traceStartStop", traceStartStop);
- }
- else
- unknown = true;
- break;
- case 'U':
- if (stricmp(queryName, "control:udptrace")==0)
- {
- udpTraceLevel = control->getPropInt("@level", 0);
- topology->setPropInt("@udpTraceLevel", udpTraceLevel);
- }
- else if (stricmp(queryName, "control:unlockDali")==0)
- {
- topology->setPropBool("@lockDali", false);
- // Dali will reattach via the timer that checks every so often if can reattach...
- saveTopology();
- }
- else if (stricmp(queryName, "control:unsuspend")==0)
- {
- UNIMPLEMENTED;
- }
- else if (stricmp(queryName, "control:userMetric")==0)
- {
- const char *name = control->queryProp("@name");
- const char *regex= control->queryProp("@regex");
- if (name && regex)
- {
- roxieMetrics->addUserMetric(name, regex);
- // MORE - we could add to topology, we could check for dups, and we could support removing them.
- }
- else
- throw MakeStringException(ROXIE_MISSING_PARAMS, "Metric name or regex missing");
- }
- else if (stricmp(queryName, "control:useTreeCopy")==0)
- {
- useTreeCopy = control->getPropBool("@val", true);
- topology->setPropInt("@useTreeCopy", useTreeCopy);
- }
- else
- unknown = true;
- break;
- case 'W':
- if (stricmp(queryName, "control:watchActivityId")==0)
- {
- watchActivityId = control->getPropInt("@id", true);
- }
- else
- unknown = true;
- break;
- default:
- unknown = true;
- break;
- }
- if (unknown)
- throw MakeStringException(ROXIE_UNKNOWN_QUERY, "Unknown query %s", queryName);
- }
- void badFormat()
- {
- throw MakeStringException(ROXIE_INVALID_INPUT, "Badly formated control query");
- }
- };
- extern IRoxieQueryPackageManagerSet *createRoxiePackageSetManager(const IQueryDll *standAloneDll)
- {
- return new CRoxiePackageSetManager(standAloneDll);
- }
- IRoxieQueryPackageManagerSet *globalPackageSetManager = NULL;
- extern void loadPlugins()
- {
- if (pluginDirectory.length() && isDirectory(pluginDirectory.str()))
- {
- plugins = new SafePluginMap(&PluginCtx, traceLevel >= 1);
- plugins->loadFromDirectory(pluginDirectory);
- }
- }
- extern void cleanupPlugins()
- {
- delete plugins;
- plugins = NULL;
- }
- /*=======================================================================================================
- * mergeStats and associated code is used to combine the graph stats from multiple nodes in a cluster into
- * a single aggregate structure
- * It should be moved into ccdquery.cpp really
- *========================================================================================================*/
- typedef void (*mergefunc)(IPropertyTree *s1, IPropertyTree *s2, unsigned level);
- struct MergeInfo
- {
- const char *element;
- const char *attribute;
- mergefunc f;
- };
- void mergeSubGraphs(IPropertyTree *s1, IPropertyTree *s2, unsigned);
- void mergeNodes(IPropertyTree *s1, IPropertyTree *s2)
- {
- Owned<IPropertyTreeIterator> elems = s1->getElements("att");
- ForEach(*elems)
- {
- IPropertyTree &e1 = elems->query();
- unsigned __int64 v1 = e1.getPropInt64("@value", 0);
- const char *name = e1.queryProp("@name");
- if (stricmp(name, "_kind")==0 && v1 == TAKsubgraph)
- {
- IPropertyTree *s1child = s1->queryPropTree("att/graph");
- IPropertyTree *s2child = s2->queryPropTree("att[@name='_kind']/graph");
- if (s1child && s2child)
- {
- mergeSubGraphs(s1child, s2child, 0);
- s2->removeProp("att[@name='_kind']");
- }
- }
- else
- {
- StringBuffer xpath;
- xpath.appendf("att[@name='%s']", name);
- const char *type = e1.queryProp("@type");
- if (type)
- {
- IPropertyTree *e2 = s2->queryPropTree(xpath.str());
- if (e2)
- {
- unsigned __int64 v2 = e2->getPropInt64("@value", 0);
- if (strcmp(name, "max")==0)
- {
- if (v2 > v1)
- e1.setPropInt64("@value", v2);
- }
- else if (strcmp(type, "min")==0)
- {
- if (v2 < v1)
- e1.setPropInt64("@value", v2);
- }
- else if (strcmp(type, "sum")==0)
- e1.setPropInt64("@value", v1+v2);
- else
- throw MakeStringException(ROXIE_UNKNOWN_QUERY, "Unknown type %s in graph statistics", type);
- s2->removeTree(e2);
- }
- }
- else
- {
- // remove from s2 any complete dups
- const char *s1val = e1.queryProp("@value");
- Owned<IPropertyTreeIterator> s2elems = s2->getElements(xpath.str());
- IArrayOf<IPropertyTree> goers;
- ForEach(*s2elems)
- {
- IPropertyTree &e2 = s2elems->query();
- const char *s2val = e2.queryProp("@value");
- if ((!s1val && !s2val) || (s1val && s2val && strcmp(s1val, s2val)==0))
- goers.append(*LINK(&e2));
- }
- ForEachItemIn(idx, goers)
- {
- s2->removeTree(&goers.item(idx));
- }
- }
- }
- }
- elems.setown(s2->getElements("*"));
- ForEach(*elems)
- {
- IPropertyTree &e2 = elems->query();
- s1->addPropTree(e2.queryName(), LINK(&e2));
- }
- }
- void mergeSubGraphs(IPropertyTree *s1, IPropertyTree *s2, unsigned)
- {
- Owned<IPropertyTreeIterator> elems = s1->getElements("*");
- ForEach(*elems)
- {
- IPropertyTree &e1 = elems->query();
- const char *elemName = e1.queryName();
- StringBuffer xpath;
- if (strcmp(elemName, "att")==0)
- {
- xpath.appendf("att[@name='%s']", e1.queryProp("@name"));
- IPropertyTree *e2 = s2->queryPropTree(xpath.str());
- if (e2)
- s2->removeTree(e2);
- }
- else
- {
- xpath.appendf("%s[@id='%s']", elemName, e1.queryProp("@id"));
- IPropertyTree *e2 = s2->queryPropTree(xpath.str());
- if (e2)
- {
- mergeNodes(&e1, e2);
- s2->removeTree(e2);
- }
- }
- }
- elems.setown(s2->getElements("*"));
- ForEach(*elems)
- {
- IPropertyTree &e2 = elems->query();
- s1->addPropTree(e2.queryName(), LINK(&e2));
- }
- }
- void mergeNode(IPropertyTree *s1, IPropertyTree *s2, unsigned level);
- MergeInfo mergeTable[] =
- {
- {"Query", "@id", mergeStats},
- {"Graph", "@id", mergeStats},
- {"xgmml", NULL, mergeStats},
- {"graph", NULL, mergeStats},
- {"node", "@id", mergeNode},
- {"att", NULL, mergeStats},
- {"graph", NULL, mergeSubGraphs},
- };
- void mergeNode(IPropertyTree *s1, IPropertyTree *s2, unsigned level)
- {
- if (s1->hasProp("att/@name"))
- mergeNodes(s1, s2);
- else
- mergeStats(s1, s2, level);
- }
- void mergeStats(IPropertyTree *s1, IPropertyTree *s2, unsigned level)
- {
- MergeInfo & mi = mergeTable[level];
- Owned<IPropertyTreeIterator> elems = s1->getElements(mi.element);
- ForEach(*elems)
- {
- IPropertyTree &e1 = elems->query();
- StringBuffer xpath;
- if (mi.attribute)
- xpath.appendf("%s[%s='%s']", mi.element, mi.attribute, e1.queryProp(mi.attribute));
- else
- xpath.append(mi.element);
- IPropertyTree *e2 = s2->queryPropTree(xpath.str());
- if (e2)
- {
- mi.f(&e1, e2, level+1);
- s2->removeTree(e2);
- }
- }
- elems.setown(s2->getElements(mi.element));
- ForEach(*elems)
- {
- IPropertyTree &e2 = elems->query();
- s1->addPropTree(mi.element, LINK(&e2));
- }
- }
- void mergeStats(IPropertyTree *s1, IPropertyTree *s2)
- {
- Owned<IPropertyTreeIterator> elems = s2->getElements("Exception");
- ForEach(*elems)
- {
- s1->addPropTree("Exception", LINK(&elems->query()));
- }
- mergeStats(s1, s2, 0);
- }
- void mergeQueries(IPropertyTree *dest, IPropertyTree *src)
- {
- IPropertyTree *destQueries = ensurePTree(dest, "Queries");
- IPropertyTree *srcQueries = src->queryPropTree("Queries");
- if (!srcQueries)
- return;
- destQueries->setPropInt("@reporting", destQueries->getPropInt("@reporting") + srcQueries->getPropInt("@reporting"));
- Owned<IPropertyTreeIterator> it = srcQueries->getElements("Query");
- ForEach(*it)
- {
- IPropertyTree *srcQuery = &it->query();
- const char *id = srcQuery->queryProp("@id");
- if (!id || !*id)
- continue;
- VStringBuffer xpath("Query[@id='%s']", id);
- IPropertyTree *destQuery = destQueries->queryPropTree(xpath);
- if (!destQuery)
- {
- destQueries->addPropTree("Query", LINK(srcQuery));
- continue;
- }
- int suspended = destQuery->getPropInt("@suspended") + srcQuery->getPropInt("@suspended"); //keep count to recognize "partially suspended" queries
- mergePTree(destQuery, srcQuery);
- if (suspended)
- destQuery->setPropInt("@suspended", suspended);
- }
- }
- #ifdef _USE_CPPUNIT
- #include <cppunit/extensions/HelperMacros.h>
- static const char *g1 =
- "<Stats>"
- "<Query id='stats'>"
- "<Graph id='graph1'>"
- "<xgmml>"
- "<graph>"
- "<node id='1'>"
- "<att>"
- "<graph>"
- "<node id='2' label='Temp Table'>"
- "<att name='name' value='d'/>"
- "<att name='_kind' value='25'/>"
- "<att name='helper' value='f2'/>"
- "</node>"
- "<node id='2a'>"
- " <att name='_kind' value='1'>" // TAKsubgraph
- " <graph>"
- " <node id='7696' label='Nested'>"
- " <att name='seeks' value='15' type='sum'/>"
- " </node>"
- " </graph>"
- " </att>"
- "</node>"
- "<node id='3' label='Filter'>"
- "<att name='name' value='ds'/>"
- "<att name='_kind' value='5'/>"
- "<att name='helper' value='f3'/>"
- "</node>"
- "<att name='rootGraph' value='1'/>"
- "<edge id='2_0' source='2' target='3'>"
- "<att name='count' value='15' type='sum'/>"
- "<att name='started' value='1'/>"
- "<att name='stopped' value='1'/>"
- "</edge>"
- "<edge id='3_0' source='3' target='5'>"
- "<att name='count' value='15' type='sum'/>"
- "<att name='started' value='1'/>"
- "<att name='stopped' value='1'/>"
- "</edge>"
- "<edge id='5_0' source='5' target='6'>"
- "<att name='count' value='3' type='sum'/>"
- "<att name='started' value='1'/>"
- "<att name='stopped' value='1'/>"
- "</edge>"
- "<edge id='5_1' source='5' target='7'>"
- "<att name='_sourceIndex' value='1'/>"
- "<att name='count' value='15' type='sum'/>"
- "<att name='started' value='1'/>"
- "<att name='stopped' value='1'/>"
- "</edge>"
- "</graph>"
- "</att>"
- "</node>"
- "</graph>"
- "</xgmml>"
- "</Graph>"
- "</Query>"
- "</Stats>";
- static const char *g2 =
- "<Stats>"
- "<Query id='stats'>"
- "<Graph id='graph1'>"
- "<xgmml>"
- "<graph>"
- "<node id='1'>"
- "<att>"
- "<graph>"
- "<node id='2' label='Temp Table'>"
- "<att name='name' value='d'/>"
- "<att name='_kind' value='25'/>"
- "<att name='helper' value='f2'/>"
- "</node>"
- "<node id='2a'>"
- " <att name='_kind' value='1'>" // TAKsubgraph
- " <graph>"
- " <node id='7696' label='Nested'>"
- " <att name='seeks' value='25' type='sum'/>"
- " </node>"
- " </graph>"
- " </att>"
- "</node>"
- "<node id='4' label='Filter2'>"
- "<att name='name' value='ds2'/>"
- "<att name='_kind' value='53'/>"
- "<att name='helper' value='f23'/>"
- "</node>"
- "<att name='rootGraph' value='1'/>"
- "<edge id='2_0' source='2' target='3'>"
- "<att name='count' value='15' type='sum'/>"
- "<att name='started' value='1'/>"
- "<att name='stopped' value='1'/>"
- "</edge>"
- "<edge id='3_0' source='3' target='5'>"
- "<att name='count' value='15' type='sum'/>"
- "<att name='started' value='1'/>"
- "<att name='stopped' value='1'/>"
- "</edge>"
- "<edge id='5_0' source='5' target='6'>"
- "<att name='count' value='3' type='sum'/>"
- "<att name='started' value='1'/>"
- "<att name='stopped' value='1'/>"
- "</edge>"
- "</graph>"
- "</att>"
- "</node>"
- "</graph>"
- "</xgmml>"
- "</Graph>"
- "</Query>"
- "</Stats>";
- static const char *expected =
- "<Stats>"
- "<Query id='stats'>"
- "<Graph id='graph1'>"
- "<xgmml>"
- "<graph>"
- "<node id='1'>"
- "<att>"
- "<graph>"
- "<node id='2' label='Temp Table'>"
- "<att name='name' value='d'/>"
- "<att name='_kind' value='25'/>"
- "<att name='helper' value='f2'/>"
- "</node>"
- "<node id='2a'>"
- " <att name='_kind' value='1'>" // TAKsubgraph
- " <graph>"
- " <node id='7696' label='Nested'>"
- " <att name='seeks' type='sum' value='40'/>"
- " </node>"
- " </graph>"
- " </att>"
- "</node>"
- "<node id='3' label='Filter'>"
- "<att name='name' value='ds'/>"
- "<att name='_kind' value='5'/>"
- "<att name='helper' value='f3'/>"
- "</node>"
- "<node id='4' label='Filter2'>"
- "<att name='name' value='ds2'/>"
- "<att name='_kind' value='53'/>"
- "<att name='helper' value='f23'/>"
- "</node>"
- "<att name='rootGraph' value='1'/>"
- "<edge id='2_0' source='2' target='3'>"
- "<att name='count' value='30' type='sum'/>"
- "<att name='started' value='1'/>"
- "<att name='stopped' value='1'/>"
- "</edge>"
- "<edge id='3_0' source='3' target='5'>"
- "<att name='count' value='30' type='sum'/>"
- "<att name='started' value='1'/>"
- "<att name='stopped' value='1'/>"
- "</edge>"
- "<edge id='5_0' source='5' target='6'>"
- "<att name='count' value='6' type='sum'/>"
- "<att name='started' value='1'/>"
- "<att name='stopped' value='1'/>"
- "</edge>"
- "<edge id='5_1' source='5' target='7'>"
- "<att name='_sourceIndex' value='1'/>"
- "<att name='count' value='15' type='sum'/>"
- "<att name='started' value='1'/>"
- "<att name='stopped' value='1'/>"
- "</edge>"
- "</graph>"
- "</att>"
- "</node>"
- "</graph>"
- "</xgmml>"
- "</Graph>"
- "</Query>"
- "</Stats>"
- ;
- class MergeStatsTest : public CppUnit::TestFixture
- {
- CPPUNIT_TEST_SUITE( MergeStatsTest );
- CPPUNIT_TEST(test1);
- CPPUNIT_TEST_SUITE_END();
- protected:
- void test1()
- {
- Owned<IPropertyTree> p1 = createPTreeFromXMLString(g1);
- Owned<IPropertyTree> p2 = createPTreeFromXMLString(g2);
- Owned<IPropertyTree> e = createPTreeFromXMLString(expected);
- mergeStats(p1, p2);
- StringBuffer s1, s2;
- toXML(p1, s1);
- toXML(e, s2);
- CPPUNIT_ASSERT(strcmp(s1, s2)==0);
- }
- };
- CPPUNIT_TEST_SUITE_REGISTRATION( MergeStatsTest );
- CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( MergeStatsTest, "MergeStatsTest" );
- #endif
|