12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977 |
- /*##############################################################################
- HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ############################################################################## */
- #include <platform.h>
- #include <jlib.hpp>
- #include "ccd.hpp"
- #include "ccdquery.hpp"
- #include "ccdstate.hpp"
- #include "ccdsnmp.hpp"
- #include "ccdserver.hpp"
- #include "ccdcontext.hpp"
- #include "thorplugin.hpp"
- void ActivityArray::append(IActivityFactory &cur)
- {
- hash.setValue(cur.queryId(), activities.ordinality());
- activities.append(cur);
- }
- unsigned ActivityArray::findActivityIndex(unsigned id) const
- {
- unsigned *ret = hash.getValue(id);
- if (ret)
- return *ret;
- return NotFound;
- }
- unsigned ActivityArray::recursiveFindActivityIndex(unsigned id)
- {
- // NOTE - this returns the activity index of the PARENT of the specified activity
- unsigned *ret = hash.getValue(id);
- if (ret)
- return *ret;
- ForEachItem(idx)
- {
- IActivityFactory & cur = item(idx);
- unsigned childId;
- for (unsigned childIdx = 0;;childIdx++)
- {
- ActivityArray * children = cur.queryChildQuery(childIdx, childId);
- if (!children)
- break;
- if (children->recursiveFindActivityIndex(id) != NotFound)
- {
- hash.setValue(id, idx);
- return idx;
- }
- }
- }
- return NotFound;
- }
- //----------------------------------------------------------------------------------------------
- // Class CQueryDll maps dlls into loadable workunits, complete with caching to ensure that a refresh of the QuerySet
- // can avoid reloading dlls, and that the same CQueryDll (and the objects it owns) can be shared between server and
- // multiple slave channels
- //----------------------------------------------------------------------------------------------
- class CQueryDll : implements IQueryDll, public CInterface
- {
- StringAttr dllName;
- Owned <ILoadedDllEntry> dll;
- Owned <IConstWorkUnit> wu;
- static CriticalSection dllCacheLock;
- static CopyMapStringToMyClass<CQueryDll> dllCache;
- public:
- IMPLEMENT_IINTERFACE;
- CQueryDll(const char *_dllName, ILoadedDllEntry *_dll) : dllName(_dllName), dll(_dll)
- {
- StringBuffer wuXML;
- if (!selfTestMode && getEmbeddedWorkUnitXML(dll, wuXML))
- {
- Owned<ILocalWorkUnit> localWU = createLocalWorkUnit(wuXML);
- wu.setown(localWU->unlock());
- }
- CriticalBlock b(dllCacheLock);
- dllCache.setValue(dllName, this);
- }
- virtual void beforeDispose()
- {
- CriticalBlock b(dllCacheLock);
- // NOTE: it's theoretically possible for the final release to happen after a replacement has been inserted into hash table.
- // So only remove from hash table if what we find there matches the item that is being deleted.
- CQueryDll *goer = dllCache.getValue(dllName);
- if (goer == this)
- dllCache.remove(dllName);
- }
- static const CQueryDll *getQueryDll(const char *dllName, bool isExe)
- {
- CriticalBlock b(dllCacheLock);
- CQueryDll *dll = LINK(dllCache.getValue(dllName));
- if (dll && dll->isAlive())
- return dll;
- else
- {
- Owned<ILoadedDllEntry> dll = isExe ? createExeDllEntry(dllName) : queryRoxieDllServer().loadDll(dllName, DllLocationDirectory);
- assertex(dll != NULL);
- return new CQueryDll(dllName, dll.getClear());
- }
- }
- static const IQueryDll *getWorkUnitDll(IConstWorkUnit *wu)
- {
- SCMStringBuffer dllName;
- Owned<IConstWUQuery> q = wu->getQuery();
- q->getQueryDllName(dllName);
- if (dllName.length() == 0)
- {
- if (wu->getCodeVersion() == 0)
- throw makeStringExceptionV(ROXIE_MISSING_DLL, "Attempting to load workunit %s that hasn't been compiled", wu->queryWuid());
- else
- throw makeStringExceptionV(ROXIE_MISSING_DLL, "Attempting to load workunit %s with no associated dll", wu->queryWuid());
- }
- return getQueryDll(dllName.str(), false);
- }
- virtual HelperFactory *getFactory(const char *helperName) const
- {
- return (HelperFactory *) dll->getEntry(helperName);
- }
- virtual ILoadedDllEntry *queryDll() const
- {
- return dll;
- }
- virtual IConstWorkUnit *queryWorkUnit() const
- {
- return wu;
- }
- };
- CriticalSection CQueryDll::dllCacheLock;
- CopyMapStringToMyClass<CQueryDll> CQueryDll::dllCache;
- extern const IQueryDll *createQueryDll(const char *dllName)
- {
- return CQueryDll::getQueryDll(dllName, false);
- }
- extern const IQueryDll *createExeQueryDll(const char *exeName)
- {
- return CQueryDll::getQueryDll(exeName, true);
- }
- extern const IQueryDll *createWuQueryDll(IConstWorkUnit *wu)
- {
- return CQueryDll::getWorkUnitDll(wu);
- }
- // Add information to the xref information to be returned for a control:getQueryXrefInfo request
- IPropertyTree * addXrefInfo(IPropertyTree &reply, const char *section, const char *name)
- {
- VStringBuffer xpath("%s[@name='%s']", section, name);
- if (!reply.hasProp(xpath))
- {
- IPropertyTree *info = reply.addPropTree(section);
- info->setProp("@name", name);
- return info;
- }
- return NULL;
- }
- extern void addXrefFileInfo(IPropertyTree &reply, const IResolvedFile *dataFile)
- {
- if (dataFile->isSuperFile())
- {
- IPropertyTree *info = addXrefInfo(reply, "SuperFile", dataFile->queryFileName());
- if (info)
- {
- int numSubs = dataFile->numSubFiles();
- for (int i = 0; i < numSubs; i++)
- {
- StringBuffer subName;
- dataFile->getSubFileName(i, subName);
- addXrefInfo(*info, "File", subName.str());
- }
- }
- }
- else
- addXrefInfo(reply, "File", dataFile->queryFileName());
- }
- extern void addXrefLibraryInfo(IPropertyTree &reply, const char *libraryName)
- {
- addXrefInfo(reply, "Library", libraryName);
- }
- //----------------------------------------------------------------------------------------------
- // Class CSharedOnceContext manages the context for a query's ONCE code, which is shared between
- // all slave and server contexts on a node
- //----------------------------------------------------------------------------------------------
- class CSharedOnceContext : public CInterfaceOf<ISharedOnceContext>
- {
- public:
- CSharedOnceContext()
- {
- }
- ~CSharedOnceContext()
- {
- }
- virtual IDeserializedResultStore &queryOnceResultStore() const
- {
- assertex(onceResultStore!= NULL);
- return *onceResultStore;
- }
- virtual IPropertyTree &queryOnceContext(const IQueryFactory *factory, const IRoxieContextLogger &logctx) const
- {
- checkOnceDone(factory, logctx);
- assertex(onceContext != NULL);
- return *onceContext;
- }
- virtual void checkOnceDone(const IQueryFactory *factory, const IRoxieContextLogger &logctx) const
- {
- CriticalBlock b(onceCrit);
- if (!onceContext)
- {
- onceContext.setown(createPTree(ipt_lowmem));
- onceResultStore.setown(createDeserializedResultStore());
- Owned <IRoxieServerContext> ctx = createOnceServerContext(factory, logctx);
- onceManager.set(&ctx->queryRowManager());
- try
- {
- ctx->process();
- ctx->done(false);
- }
- catch (IException *E)
- {
- ctx->done(true);
- onceException.setown(E);
- }
- catch (...)
- {
- ctx->done(true);
- onceException.setown(MakeStringException(ROXIE_INTERNAL_ERROR, "Unknown exception in ONCE code"));
- }
- }
- if (onceException)
- throw onceException.getLink();
- }
- protected:
- mutable CriticalSection onceCrit;
- mutable Owned<roxiemem::IRowManager> onceManager; // release AFTER resultStore
- mutable Owned<IPropertyTree> onceContext;
- mutable Owned<IDeserializedResultStore> onceResultStore;
- mutable Owned<IException> onceException;
- };
- //----------------------------------------------------------------------------------------------
// Class QueryOptions is used to store options affecting the execution of a query
// These can be set globally, by the query workunit, or by the query XML parameters
- //----------------------------------------------------------------------------------------------
- QueryOptions::QueryOptions()
- {
- priority = 0;
- timeLimit = defaultTimeLimit[0];
- warnTimeLimit = defaultWarnTimeLimit[0];
- memoryLimit = defaultMemoryLimit;
- parallelJoinPreload = defaultParallelJoinPreload;
- fullKeyedJoinPreload = defaultFullKeyedJoinPreload;
- keyedJoinPreload = defaultKeyedJoinPreload;
- concatPreload = defaultConcatPreload;
- fetchPreload = defaultFetchPreload;
- prefetchProjectPreload = defaultPrefetchProjectPreload;
- bindCores = coresPerQuery;
- strandBlockSize = defaultStrandBlockSize;
- forceNumStrands = defaultForceNumStrands;
- heapFlags = defaultHeapFlags;
- checkingHeap = defaultCheckingHeap;
- disableLocalOptimizations = defaultDisableLocalOptimizations;
- enableFieldTranslation = fieldTranslationEnabled;
- skipFileFormatCrcCheck = false;
- stripWhitespaceFromStoredDataset = ((ptr_ignoreWhiteSpace & defaultXmlReadFlags) != 0);
- timeActivities = defaultTimeActivities;
- traceEnabled = defaultTraceEnabled;
- traceLimit = defaultTraceLimit;
- allSortsMaySpill = false; // No global default for this
- failOnLeaks = false;
- }
- QueryOptions::QueryOptions(const QueryOptions &other)
- {
- priority = other.priority;
- timeLimit = other.timeLimit;
- warnTimeLimit = other.warnTimeLimit;
- memoryLimit = other.memoryLimit;
- parallelJoinPreload = other.parallelJoinPreload;;
- fullKeyedJoinPreload = other.fullKeyedJoinPreload;
- keyedJoinPreload = other.keyedJoinPreload;
- concatPreload = other.concatPreload;
- fetchPreload = other.fetchPreload;
- prefetchProjectPreload = other.prefetchProjectPreload;
- bindCores = other.bindCores;
- strandBlockSize = other.strandBlockSize;
- forceNumStrands = other.forceNumStrands;
- heapFlags = other.heapFlags;
- checkingHeap = other.checkingHeap;
- disableLocalOptimizations = other.disableLocalOptimizations;
- enableFieldTranslation = other.enableFieldTranslation;
- skipFileFormatCrcCheck = other.skipFileFormatCrcCheck;
- stripWhitespaceFromStoredDataset = other.stripWhitespaceFromStoredDataset;
- timeActivities = other.timeActivities;
- traceEnabled = other.traceEnabled;
- traceLimit = other.traceLimit;
- allSortsMaySpill = other.allSortsMaySpill;
- failOnLeaks = other.failOnLeaks;
- }
- void QueryOptions::setFromWorkUnit(IConstWorkUnit &wu, const IPropertyTree *stateInfo)
- {
- // calculate priority before others since it affects the defaults of others
- updateFromWorkUnit(priority, wu, "priority");
- if (stateInfo)
- updateFromContext(priority, stateInfo, "@priority");
- timeLimit = defaultTimeLimit[priority];
- warnTimeLimit = defaultWarnTimeLimit[priority];
- updateFromWorkUnit(timeLimit, wu, "timeLimit");
- updateFromWorkUnit(warnTimeLimit, wu, "warnTimeLimit");
- updateFromWorkUnitM(memoryLimit, wu, "memoryLimit");
- if (stateInfo)
- {
- updateFromContext(timeLimit, stateInfo, "@timeLimit");
- updateFromContext(warnTimeLimit, stateInfo, "@warnTimeLimit");
- updateFromContextM(memoryLimit, stateInfo, "@memoryLimit");
- }
- updateFromWorkUnit(parallelJoinPreload, wu, "parallelJoinPreload");
- updateFromWorkUnit(fullKeyedJoinPreload, wu, "fullKeyedJoinPreload");
- updateFromWorkUnit(keyedJoinPreload, wu, "keyedJoinPreload");
- updateFromWorkUnit(concatPreload, wu, "concatPreload");
- updateFromWorkUnit(fetchPreload, wu, "fetchPreload");
- updateFromWorkUnit(prefetchProjectPreload, wu, "prefetchProjectPreload");
- updateFromWorkUnit(bindCores, wu, "bindCores");
- updateFromWorkUnit(strandBlockSize, wu, "strandBlockSize");
- updateFromWorkUnit(forceNumStrands, wu, "forceNumStrands");
- updateFromWorkUnit(heapFlags, wu, "heapFlags");
- updateFromWorkUnit(checkingHeap, wu, "checkingHeap");
- updateFromWorkUnit(disableLocalOptimizations, wu, "disableLocalOptimizations");
- updateFromWorkUnit(enableFieldTranslation, wu, "layoutTranslationEnabled"); // Name is different for compatibility reasons
- updateFromWorkUnit(skipFileFormatCrcCheck, wu, "skipFileFormatCrcCheck");
- updateFromWorkUnit(stripWhitespaceFromStoredDataset, wu, "stripWhitespaceFromStoredDataset");
- updateFromWorkUnit(timeActivities, wu, "timeActivities");
- updateFromWorkUnit(traceEnabled, wu, "traceEnabled");
- updateFromWorkUnit(traceLimit, wu, "traceLimit");
- updateFromWorkUnit(allSortsMaySpill, wu, "allSortsMaySpill");
- updateFromWorkUnit(failOnLeaks, wu, "failOnLeaks");
- }
- void QueryOptions::updateFromWorkUnitM(memsize_t &value, IConstWorkUnit &wu, const char *name)
- {
- value = (memsize_t) wu.getDebugValueInt64(name, value);
- }
- void QueryOptions::updateFromWorkUnit(int &value, IConstWorkUnit &wu, const char *name)
- {
- value = wu.getDebugValueInt(name, value);
- }
- void QueryOptions::updateFromWorkUnit(unsigned &value, IConstWorkUnit &wu, const char *name)
- {
- value = (unsigned) wu.getDebugValueInt(name, value);
- }
- void QueryOptions::updateFromWorkUnit(bool &value, IConstWorkUnit &wu, const char *name)
- {
- value = wu.getDebugValueBool(name, value);
- }
- void QueryOptions::updateFromWorkUnit(RecordTranslationMode &value, IConstWorkUnit &wu, const char *name)
- {
- SCMStringBuffer val;
- wu.getDebugValue(name, val);
- if (val.length())
- value = getTranslationMode(val.str());
- }
- void QueryOptions::setFromContext(const IPropertyTree *ctx)
- {
- if (ctx)
- {
- // Note: priority cannot be set at context level
- updateFromContext(timeLimit, ctx, "@timeLimit", "_TimeLimit");
- updateFromContext(warnTimeLimit, ctx, "@warnTimeLimit", "_WarnTimeLimit");
- updateFromContextM(memoryLimit, ctx, "@memoryLimit", "_MemoryLimit");
- updateFromContext(parallelJoinPreload, ctx, "@parallelJoinPreload", "_ParallelJoinPreload");
- updateFromContext(fullKeyedJoinPreload, ctx, "@fullKeyedJoinPreload", "_FullKeyedJoinPreload");
- updateFromContext(keyedJoinPreload, ctx, "@keyedJoinPreload", "_KeyedJoinPreload");
- updateFromContext(concatPreload, ctx, "@concatPreload", "_ConcatPreload");
- updateFromContext(fetchPreload, ctx, "@fetchPreload", "_FetchPreload");
- updateFromContext(prefetchProjectPreload, ctx, "@prefetchProjectPreload", "_PrefetchProjectPreload");
- updateFromContext(bindCores, ctx, "@bindCores", "_bindCores");
- updateFromContext(strandBlockSize, ctx, "@strandBlockSize", "_strandBlockSize");
- updateFromContext(forceNumStrands, ctx, "@forceNumStrands", "_forceNumStrands");
- updateFromContext(heapFlags, ctx, "@heapFlags", "_HeapFlags");
- updateFromContext(checkingHeap, ctx, "@checkingHeap", "_CheckingHeap");
- // Note: disableLocalOptimizations is not permitted at context level (too late)
- // Note: enableFieldTranslation is not permitted at context level (generally too late anyway)
- // Note: skipFileFormatCrcCheck is not permitted at context level (generally too late anyway)
- updateFromContext(stripWhitespaceFromStoredDataset, ctx, "_StripWhitespaceFromStoredDataset", "@stripWhitespaceFromStoredDataset");
- updateFromContext(timeActivities, ctx, "@timeActivities", "_TimeActivities");
- updateFromContext(traceEnabled, ctx, "@traceEnabled", "_TraceEnabled");
- updateFromContext(traceLimit, ctx, "@traceLimit", "_TraceLimit");
- // Note: allSortsMaySpill is not permitted at context level (too late anyway, unless I refactored)
- updateFromContext(failOnLeaks, ctx, "@failOnLeaks", "_FailOnLeaks");
- }
- }
- const char * QueryOptions::findProp(const IPropertyTree *ctx, const char *name1, const char *name2)
- {
- if (name1 && ctx->hasProp(name1))
- return name1;
- else if (name2 && ctx->hasProp(name2))
- return name2;
- else
- return NULL;
- }
- void QueryOptions::updateFromContextM(memsize_t &value, const IPropertyTree *ctx, const char *name1, const char *name2)
- {
- const char *name = findProp(ctx, name1, name2);
- if (name)
- value = (memsize_t) ctx->getPropInt64(name);
- }
- void QueryOptions::updateFromContext(int &value, const IPropertyTree *ctx, const char *name1, const char *name2)
- {
- const char *name = findProp(ctx, name1, name2);
- if (name)
- value = ctx->getPropInt(name);
- }
- void QueryOptions::updateFromContext(unsigned &value, const IPropertyTree *ctx, const char *name1, const char *name2)
- {
- const char *name = findProp(ctx, name1, name2);
- if (name)
- value = (unsigned) ctx->getPropInt(name);
- }
- void QueryOptions::updateFromContext(bool &value, const IPropertyTree *ctx, const char *name1, const char *name2)
- {
- const char *name = findProp(ctx, name1, name2);
- if (name)
- value = ctx->getPropBool(name);
- }
- void QueryOptions::setFromSlaveLoggingFlags(unsigned loggingFlags)
- {
- // MORE - priority/timelimit ?
- checkingHeap = (loggingFlags & LOGGING_CHECKINGHEAP) != 0;
- timeActivities = (loggingFlags & LOGGING_TIMEACTIVITIES) != 0;
- }
- //----------------------------------------------------------------------------------------------
- // Class CQueryFactory is the main implementation of IQueryFactory, combining a IQueryDll and a
// package context into an object that can quickly create the query context that executes a specific
- // instance of a Roxie query.
- // Caching is used to ensure that only queries that are affected by a package change need to be reloaded.
- // Derived classes handle the differences between slave and server side factories
- //----------------------------------------------------------------------------------------------
- class CQueryFactory : implements IQueryFactory, implements IResourceContext, public CInterface
- {
- protected:
- const IRoxiePackage &package;
- Owned<const IQueryDll> dll;
- Linked<ISharedOnceContext> sharedOnceContext;
- MapStringToActivityArray graphMap;
- StringAttr id;
- StringBuffer errorMessage;
- MapIdToActivityFactory allActivities;
- QueryOptions options;
- bool dynamic;
- bool isSuspended;
- bool isLoadFailed;
- ClusterType targetClusterType;
- unsigned libraryInterfaceHash;
- hash64_t hashValue;
- static SpinLock queriesCrit;
- static CopyMapXToMyClass<hash64_t, hash64_t, CQueryFactory> queryMap;
- mutable CIArrayOf<TerminationCallbackInfo> callbacks;
- mutable CriticalSection callbacksCrit;
- public:
- static CriticalSection queryCreateLock;
- protected:
- IRoxieServerActivityFactory *createActivityFactory(ThorActivityKind kind, unsigned subgraphId, IPropertyTree &node)
- {
- unsigned id = node.getPropInt("@id", 0);
- unsigned rid = id;
- if (isSuspended)
- return createRoxieServerDummyActivityFactory(id, subgraphId, *this, NULL, TAKnone, node, false); // Is there actually any point?
- switch (options.priority)
- {
- case 1:
- rid |= ROXIE_HIGH_PRIORITY;
- break;
- case 2:
- rid |= ROXIE_SLA_PRIORITY;
- break;
- }
- StringBuffer helperName;
- node.getProp("att[@name=\"helper\"]/@value", helperName);
- if (!helperName.length())
- helperName.append("fAc").append(id);
- HelperFactory *helperFactory = dll->getFactory(helperName);
- if (!helperFactory)
- throw MakeStringException(ROXIE_INTERNAL_ERROR, "Internal error: helper function %s not exported", helperName.str());
- RemoteActivityId remoteId(rid, hashValue);
- RemoteActivityId remoteId2(rid | ROXIE_ACTIVITY_FETCH, hashValue);
- switch (kind)
- {
- case TAKalljoin:
- case TAKalldenormalize:
- case TAKalldenormalizegroup:
- return createRoxieServerAllJoinActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKapply:
- return createRoxieServerApplyActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
- case TAKaggregate:
- case TAKexistsaggregate: // could special case.
- case TAKcountaggregate:
- return createRoxieServerAggregateActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKcase:
- case TAKchildcase:
- return createRoxieServerCaseActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isGraphIndependent(node));
- case TAKcatch:
- case TAKskipcatch:
- case TAKcreaterowcatch:
- return createRoxieServerCatchActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKchilditerator:
- return createRoxieServerChildIteratorActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKchoosesets:
- return createRoxieServerChooseSetsActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKchoosesetsenth:
- return createRoxieServerChooseSetsEnthActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKchoosesetslast:
- return createRoxieServerChooseSetsLastActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKproject:
- case TAKcountproject:
- return createRoxieServerProjectActivityFactory(id, subgraphId, *this, helperFactory, kind, node); // code is common between Project, CountProject
- case TAKfilterproject:
- return createRoxieServerFilterProjectActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKdatasetresult:
- case TAKrowresult:
- return createRoxieServerDatasetResultActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
- case TAKdedup:
- return createRoxieServerDedupActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKdegroup:
- return createRoxieServerDegroupActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKcsvread:
- case TAKxmlread:
- case TAKjsonread:
- case TAKdiskread:
- {
- if (node.getPropBool("att[@name='_isSpill']/@value", false) || node.getPropBool("att[@name='_isSpillGlobal']/@value", false))
- return createRoxieServerSpillReadActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- else
- return createRoxieServerDiskReadActivityFactory(id, subgraphId, *this, helperFactory, kind, node, remoteId);
- }
- case TAKmemoryspillread:
- return createRoxieServerSpillReadActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKdisknormalize:
- case TAKdiskcount:
- case TAKdiskaggregate:
- case TAKdiskgroupaggregate:
- return createRoxieServerDiskReadActivityFactory(id, subgraphId, *this, helperFactory, kind, node, remoteId);
- case TAKchildnormalize:
- return createRoxieServerNewChildNormalizeActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKchildaggregate:
- return createRoxieServerNewChildAggregateActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKchildgroupaggregate:
- return createRoxieServerNewChildGroupAggregateActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKchildthroughnormalize:
- return createRoxieServerNewChildThroughNormalizeActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKcsvwrite:
- case TAKdiskwrite:
- case TAKxmlwrite:
- case TAKjsonwrite:
- case TAKmemoryspillwrite:
- return createRoxieServerDiskWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
- case TAKindexwrite:
- return createRoxieServerIndexWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
- case TAKenth:
- return createRoxieServerEnthActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKfetch:
- case TAKcsvfetch:
- case TAKxmlfetch:
- case TAKjsonfetch:
- return createRoxieServerFetchActivityFactory(id, subgraphId, *this, helperFactory, kind, node, remoteId);
- case TAKfilter:
- return createRoxieServerFilterActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKfiltergroup:
- return createRoxieServerFilterGroupActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKfirstn:
- return createRoxieServerFirstNActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKfunnel:
- return createRoxieServerConcatActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKgroup:
- return createRoxieServerGroupActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKhashaggregate:
- return createRoxieServerHashAggregateActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKif:
- case TAKchildif:
- return createRoxieServerIfActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isGraphIndependent(node));
- case TAKifaction:
- return createRoxieServerIfActionActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
- case TAKparallel:
- return createRoxieServerParallelActionActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
- case TAKsequential:
- return createRoxieServerSequentialActionActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
- case TAKindexread:
- return createRoxieServerIndexReadActivityFactory(id, subgraphId, *this, helperFactory, kind, node, remoteId);
- case TAKindexnormalize:
- return createRoxieServerIndexNormalizeActivityFactory(id, subgraphId, *this, helperFactory, kind, node, remoteId);
- case TAKindexcount:
- return createRoxieServerIndexCountActivityFactory(id, subgraphId, *this, helperFactory, kind, node, remoteId);
- case TAKindexaggregate:
- return createRoxieServerIndexAggregateActivityFactory(id, subgraphId, *this, helperFactory, kind, node, remoteId);
- case TAKindexgroupaggregate:
- case TAKindexgroupexists:
- case TAKindexgroupcount:
- return createRoxieServerIndexGroupAggregateActivityFactory(id, subgraphId, *this, helperFactory, kind, node, remoteId);
- case TAKhashdedup:
- return createRoxieServerHashDedupActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKhashdenormalize:
- case TAKhashdistribute:
- case TAKhashdistributemerge:
- case TAKhashjoin:
- throwUnexpected(); // Code generator should have removed or transformed
- case TAKiterate:
- return createRoxieServerIterateActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKprocess:
- return createRoxieServerProcessActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKjoin:
- case TAKjoinlight:
- case TAKdenormalize:
- case TAKdenormalizegroup:
- return createRoxieServerJoinActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKkeyeddistribute:
- throwUnexpected(); // Code generator should have removed or transformed
- case TAKkeyedjoin:
- case TAKkeyeddenormalize:
- case TAKkeyeddenormalizegroup:
- return createRoxieServerKeyedJoinActivityFactory(id, subgraphId, *this, helperFactory, kind, node, remoteId, remoteId2);
- case TAKlimit:
- return createRoxieServerLimitActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKlookupjoin:
- case TAKlookupdenormalize:
- case TAKlookupdenormalizegroup:
- case TAKsmartjoin:
- case TAKsmartdenormalize:
- case TAKsmartdenormalizegroup:
- return createRoxieServerLookupJoinActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKmerge:
- return createRoxieServerMergeActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKnormalize:
- return createRoxieServerNormalizeActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKnormalizechild:
- return createRoxieServerNormalizeChildActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKnormalizelinkedchild:
- return createRoxieServerNormalizeLinkedChildActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKnull:
- return createRoxieServerNullActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKsideeffect:
- return createRoxieServerSideEffectActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
- case TAKsimpleaction:
- return createRoxieServerActionActivityFactory(id, subgraphId, *this, helperFactory, kind, node, usageCount(node), isRootAction(node));
- case TAKparse:
- return createRoxieServerParseActivityFactory(id, subgraphId, *this, helperFactory, kind, node, this);
- case TAKworkunitwrite:
- return createRoxieServerWorkUnitWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, node, usageCount(node), isRootAction(node));
- case TAKdictionaryworkunitwrite:
- return createRoxieServerWorkUnitWriteDictActivityFactory(id, subgraphId, *this, helperFactory, kind, node, usageCount(node), isRootAction(node));
- case TAKpiperead:
- return createRoxieServerPipeReadActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKpipethrough:
- return createRoxieServerPipeThroughActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKpipewrite:
- return createRoxieServerPipeWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, node, usageCount(node), isRootAction(node));
- case TAKpull:
- return createRoxieServerPullActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKtrace:
- return createRoxieServerTraceActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKlinkedrawiterator:
- return createRoxieServerLinkedRawIteratorActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKremoteresult:
- return createRoxieServerRemoteResultActivityFactory(id, subgraphId, *this, helperFactory, kind, node, usageCount(node), isRootAction(node));
- case TAKrollup:
- return createRoxieServerRollupActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKsample:
- return createRoxieServerSampleActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKselectn:
- return createRoxieServerSelectNActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKselfjoin:
- case TAKselfjoinlight:
- return createRoxieServerSelfJoinActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKskiplimit:
- case TAKcreaterowlimit:
- return createRoxieServerSkipLimitActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKhttp_rowdataset:
- case TAKsoap_rowdataset:
- return createRoxieServerSoapRowCallActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKsoap_rowaction:
- return createRoxieServerSoapRowActionActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
- case TAKsoap_datasetdataset:
- return createRoxieServerSoapDatasetCallActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKsoap_datasetaction:
- return createRoxieServerSoapDatasetActionActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
- case TAKsort:
- return createRoxieServerSortActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKspill:
- case TAKmemoryspillsplit:
- return createRoxieServerThroughSpillActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKsplit:
- return createRoxieServerSplitActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKstreamediterator:
- return createRoxieServerStreamedIteratorActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKinlinetable:
- return createRoxieServerInlineTableActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKthroughaggregate:
- throwUnexpected(); // Concept of through aggregates has been proven not to work in Roxie - codegen should not be creating them any more.
- case TAKtopn:
- return createRoxieServerTopNActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKworkunitread:
- return createRoxieServerWorkUnitReadActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKxmlparse:
- return createRoxieServerXmlParseActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKquantile:
- return createRoxieServerQuantileActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKregroup:
- return createRoxieServerRegroupActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKcombine:
- return createRoxieServerCombineActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKcombinegroup:
- return createRoxieServerCombineGroupActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKrollupgroup:
- return createRoxieServerRollupGroupActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKlocalresultread:
- {
- unsigned graphId = getGraphId(node);
- return createRoxieServerLocalResultReadActivityFactory(id, subgraphId, *this, helperFactory, kind, node, graphId);
- }
- case TAKlocalstreamread:
- return createRoxieServerLocalResultStreamReadActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKlocalresultwrite:
- {
- unsigned graphId = getGraphId(node);
- return createRoxieServerLocalResultWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, node, usageCount(node), graphId, isRootAction(node));
- }
- case TAKdictionaryresultwrite:
- {
- unsigned graphId = getGraphId(node);
- return createRoxieServerDictionaryResultWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, node, usageCount(node), graphId, isRootAction(node));
- }
- case TAKloopcount:
- case TAKlooprow:
- case TAKloopdataset:
- {
- unsigned loopId = node.getPropInt("att[@name=\"_loopid\"]/@value", 0);
- return createRoxieServerLoopActivityFactory(id, subgraphId, *this, helperFactory, kind, node, loopId);
- }
- case TAKremotegraph:
- return createRoxieServerRemoteActivityFactory(id, subgraphId, *this, helperFactory, kind, node, remoteId, isRootAction(node));
- case TAKgraphloopresultread:
- {
- unsigned graphId = getGraphId(node);
- return createRoxieServerGraphLoopResultReadActivityFactory(id, subgraphId, *this, helperFactory, kind, node, graphId);
- }
- case TAKgraphloopresultwrite:
- {
- unsigned graphId = getGraphId(node);
- return createRoxieServerGraphLoopResultWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, node, usageCount(node), graphId);
- }
- case TAKnwaygraphloopresultread:
- {
- unsigned graphId = node.getPropInt("att[@name=\"_graphId\"]/@value", 0);
- return createRoxieServerNWayGraphLoopResultReadActivityFactory(id, subgraphId, *this, helperFactory, kind, node, graphId);
- }
- case TAKnwayinput:
- return createRoxieServerNWayInputActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKnwaymerge:
- return createRoxieServerNWayMergeActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKnwaymergejoin:
- case TAKnwayjoin:
- return createRoxieServerNWayMergeJoinActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKsorted:
- return createRoxieServerSortedActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKgraphloop:
- case TAKparallelgraphloop:
- {
- unsigned loopId = node.getPropInt("att[@name=\"_loopid\"]/@value", 0);
- return createRoxieServerGraphLoopActivityFactory(id, subgraphId, *this, helperFactory, kind, node, loopId);
- }
- case TAKlibrarycall:
- {
- LibraryCallFactoryExtra extra;
- extra.maxOutputs = node.getPropInt("att[@name=\"_maxOutputs\"]/@value", 0);
- extra.graphid = node.getPropInt("att[@name=\"_graphid\"]/@value", 0);
- extra.libraryName.set(node.queryProp("att[@name=\"libname\"]/@value"));
- extra.interfaceHash = node.getPropInt("att[@name=\"_interfaceHash\"]/@value", 0);
- extra.embedded = node.getPropBool("att[@name=\"embedded\"]/@value", false) ;
- if (extra.embedded)
- {
- extra.embeddedGraphName.set(node.queryProp("att[@name=\"graph\"]/@value"));
- if (!extra.embeddedGraphName)
- extra.embeddedGraphName.set(extra.libraryName);
- }
- Owned<IPropertyTreeIterator> iter = node.getElements("att[@name=\"_outputUsed\"]");
- ForEach(*iter)
- extra.outputs.append(iter->query().getPropInt("@value"));
- return createRoxieServerLibraryCallActivityFactory(id, subgraphId, *this, helperFactory, kind, node, extra);
- }
- case TAKnwayselect:
- return createRoxieServerNWaySelectActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKnonempty:
- return createRoxieServerNonEmptyActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKprefetchproject:
- return createRoxieServerPrefetchProjectActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKwhen_dataset:
- return createRoxieServerWhenActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
- case TAKwhen_action:
- return createRoxieServerWhenActionActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
- case TAKdistribution:
- return createRoxieServerDistributionActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
- // These are not required in Roxie for the time being - code generator should trap them
- case TAKchilddataset:
- default:
- throw MakeStringException(ROXIE_UNIMPLEMENTED_ERROR, "Unimplemented activity %s required", getActivityText(kind));
- break;
- }
- throwUnexpected(); // unreachable, but some compilers will complain about missing return
- }
- // Looks up the activity factory registered in allActivities under 'id'.
- // Returns NULL when id is 0 or when no entry exists for it.
- IActivityFactory *findActivity(unsigned id) const
- {
- if (!id)
- return NULL;
- IActivityFactory **entry = allActivities.getValue(id);
- if (!entry)
- return NULL;
- return *entry;
- }
- // Calls checkSuspended() first (which throws for a suspended query), then returns the
- // activity registered under 'id', cast to IRoxieServerActivityFactory and passed through LINK.
- virtual IRoxieServerActivityFactory *getRoxieServerActivityFactory(unsigned id) const
- {
- checkSuspended();
- IActivityFactory *found = findActivity(id);
- return LINK(QUERYINTERFACE(found, IRoxieServerActivityFactory));
- }
- // Calls checkSuspended() first (which throws for a suspended query), then returns the
- // activity registered under 'id', cast to ISlaveActivityFactory and passed through LINK.
- virtual ISlaveActivityFactory *getSlaveActivityFactory(unsigned id) const
- {
- checkSuspended();
- return LINK(QUERYINTERFACE(findActivity(id), ISlaveActivityFactory)); // MORE - don't dynamic cast yuk
- }
- // Builds a new ActivityArray for a child graph: loads every "node" element, then wires every
- // "edge" element either as an input (no _when attribute) or as a dependency (_when != 0).
- // On any exception the partially built array is released, allActivities is cleared and the
- // exception is rethrown. The caller owns the returned array.
- ActivityArray *loadChildGraph(IPropertyTree &graph)
- {
- // MORE - this is starting to look very much like loadGraph (on Roxie server side)
- ActivityArray *activities = new ActivityArray(true, graph.getPropBool("@delayed"), graph.getPropBool("@library"), graph.getPropBool("@sequential"));
- unsigned subgraphId = graph.getPropInt("@id");
- try
- {
- Owned<IPropertyTreeIterator> nodes = graph.getElements("node");
- ForEach(*nodes)
- {
- IPropertyTree &node = nodes->query();
- loadNode(node, subgraphId, activities);
- }
- Owned<IPropertyTreeIterator> edges = graph.getElements("edge");
- ForEach(*edges)
- {
- IPropertyTree &edge = edges->query();
- unsigned sourceActivity = edge.getPropInt("@source",0);
- unsigned targetActivity = edge.getPropInt("@target",0);
- unsigned source = activities->findActivityIndex(sourceActivity);
- unsigned target = activities->findActivityIndex(targetActivity);
- unsigned sourceOutput = edge.getPropInt("att[@name=\"_sourceIndex\"]/@value", 0);
- unsigned targetInput = edge.getPropInt("att[@name=\"_targetIndex\"]/@value", 0);
- int controlId = edge.getPropInt("att[@name=\"_when\"]/@value", 0);
- if (controlId != 0)
- {
- const char * edgeId = edge.queryProp("@id");
- // addDependency expects activity ids (it performs its own id -> index lookup),
- // so pass the raw ids rather than the already-resolved indexes.
- addDependency(sourceOutput, sourceActivity, targetActivity, controlId, edgeId, activities);
- }
- else
- activities->serverItem(target).setInput(targetInput, source, sourceOutput);
- }
- }
- catch (...)
- {
- ::Release(activities);
- allActivities.kill();
- throw;
- }
- return activities;
- }
- // Loads a single graph node into 'activities'.
- //  - Ordinary activity (non-zero kind): creates its factory and, when one is returned, appends it
- //    to 'activities' and registers it in allActivities under its id.
- //  - Subgraph flagged @child: loaded in place via loadSubgraph.
- //  - Any other subgraph: loaded as a child query and attached to its parent activity.
- void loadNode(IPropertyTree &node, unsigned subgraphId, ActivityArray *activities)
- {
- ThorActivityKind kind = getActivityKind(node);
- if (kind != TAKsubgraph)
- {
- if (!kind)
- return;
- IRoxieServerActivityFactory *factory = createActivityFactory(kind, subgraphId, node);
- if (factory)
- {
- activities->append(*factory);
- allActivities.setValue(factory->queryId(), factory);
- }
- return;
- }
- IPropertyTree * childGraphNode = node.queryPropTree("att/graph");
- if (childGraphNode->getPropBool("@child"))
- {
- loadSubgraph(node, activities);
- return;
- }
- unsigned parentId = findParentId(node);
- assertex(parentId);
- unsigned parentIdx = activities->findActivityIndex(parentId);
- IActivityFactory &parentFactory = activities->item(parentIdx);
- ActivityArray *childQuery = loadChildGraph(*childGraphNode);
- parentFactory.addChildQuery(node.getPropInt("@id"), childQuery);
- }
- // Loads every node found under att/graph/node of the given subgraph node into 'activities'.
- // Unless the query is suspended, it then processes att/graph/edge: edges with a non-zero _when
- // attribute become dependencies, all others are set as inputs on the target activity.
- void loadSubgraph(IPropertyTree &graph, ActivityArray *activities)
- {
- unsigned subgraphId = graph.getPropInt("@id");
- Owned<IPropertyTreeIterator> nodeIter = graph.getElements("att/graph/node");
- ForEach(*nodeIter)
- {
- loadNode(nodeIter->query(), subgraphId, activities);
- }
- if (isSuspended)
- return;
- Owned<IPropertyTreeIterator> edgeIter = graph.getElements("att/graph/edge");
- ForEach(*edgeIter)
- {
- IPropertyTree &edge = edgeIter->query();
- unsigned sourceId = edge.getPropInt("@source", 0);
- unsigned targetId = edge.getPropInt("@target", 0);
- unsigned sourceIndex = activities->findActivityIndex(sourceId);
- unsigned targetIndex = activities->recursiveFindActivityIndex(targetId);
- unsigned sourceOutput = edge.getPropInt("att[@name=\"_sourceIndex\"]/@value", 0);
- unsigned targetInput = edge.getPropInt("att[@name=\"_targetIndex\"]/@value", 0);
- int controlId = edge.getPropInt("att[@name=\"_when\"]/@value", 0);
- if (controlId == 0)
- {
- activities->serverItem(targetIndex).setInput(targetInput, sourceIndex, sourceOutput);
- continue;
- }
- const char * edgeId = edge.queryProp("@id");
- addDependency(sourceOutput, sourceId, targetId, controlId, edgeId, activities);
- }
- }
- // loadGraph loads the outer-level graph. It is pure virtual because the slave implementation is very different from the Roxie server one.
- virtual ActivityArray *loadGraph(IPropertyTree &graph, const char *graphName) = 0;
- bool doAddDependency(unsigned sourceIdx, unsigned sourceId, unsigned targetId, int controlId, const char *edgeId, ActivityArray * activities) // Records the dependency in 'activities' if sourceId is found there, else searches child queries recursively; returns true once recorded, false if sourceId was not found anywhere
- {
- // Note - the dependency is recorded with the target being the parent activity that is at the same level as the source
- // (recording it on the child that was actually dependent would mean it happened too late)
- unsigned source = activities->findActivityIndex(sourceId);
- if (source != NotFound) // the source activity lives at this level
- {
- unsigned target = activities->recursiveFindActivityIndex(targetId);
- activities->serverItem(target).addDependency(source, activities->serverItem(source).getKind(), sourceIdx, controlId, edgeId);
- activities->serverItem(source).noteDependent(target);
- return true;
- }
- ForEachItemIn(idx, *activities) // not at this level: try the child queries of each activity
- {
- IActivityFactory & cur = activities->item(idx);
- unsigned childId; // filled in by queryChildQuery; not otherwise used here
- for (unsigned childIdx = 0;;childIdx++) // loop ends when queryChildQuery returns NULL
- {
- ActivityArray * children = cur.queryChildQuery(childIdx, childId);
- if (!children)
- break;
- if (doAddDependency(sourceIdx, sourceId, targetId, controlId, edgeId, children))
- return true;
- }
- }
- return false;
- }
- // Virtual entry point for recording a dependency edge. This implementation forwards to
- // doAddDependency and ignores whether the source activity was actually found.
- virtual void addDependency(unsigned sourceIdx, unsigned sourceId, unsigned targetId, int controlId, const char *edgeId, ActivityArray * activities)
- {
- (void) doAddDependency(sourceIdx, sourceId, targetId, controlId, edgeId, activities);
- }
- // Walks the "edge" children of 'graph' and registers a dependency for every edge whose
- // _childGraph attribute is absent or zero, using the edge's _sourceActivity/_targetActivity ids.
- void addDependencies(IPropertyTree &graph, ActivityArray *activities)
- {
- Owned<IPropertyTreeIterator> edgeIter = graph.getElements("edge");
- ForEach(*edgeIter)
- {
- IPropertyTree &edge = edgeIter->query();
- if (edge.getPropInt("att[@name=\"_childGraph\"]/@value", 0))
- continue;
- unsigned sourceIdx = edge.getPropInt("att[@name=\"_sourceIndex\"]/@value", 0);
- int controlId = edge.getPropInt("att[@name=\"_when\"]/@value", 0);
- unsigned sourceId = edge.getPropInt("att[@name=\"_sourceActivity\"]/@value", 0);
- unsigned targetId = edge.getPropInt("att[@name=\"_targetActivity\"]/@value", 0);
- addDependency(sourceIdx, sourceId, targetId, controlId, edge.queryProp("@id"), activities);
- }
- }
- public:
- IMPLEMENT_IINTERFACE;
- unsigned channelNo;
- CQueryFactory(const char *_id, const IQueryDll *_dll, const IRoxiePackage &_package, hash64_t _hashValue, unsigned _channelNo, ISharedOnceContext *_sharedOnceContext, bool _dynamic) // Stores the supplied identity/package/dll/hash/channel and sets the default state; graphs are not loaded here (see load())
- : id(_id), package(_package), dll(_dll), channelNo(_channelNo), hashValue(_hashValue), sharedOnceContext(_sharedOnceContext), dynamic(_dynamic)
- {
- package.Link(); // balanced by package.Release() in the destructor
- targetClusterType = RoxieCluster; // default; load() replaces it with the workunit's targetClusterType when a workunit is present
- isSuspended = false;
- isLoadFailed = false;
- libraryInterfaceHash = 0; // 0 means "not a query library" (see isQueryLibrary)
- options.enableFieldTranslation = package.getEnableFieldTranslation(); // NOTE - can be overridden by wu settings
- options.allSortsMaySpill = dynamic;
- }
- // Releases every ActivityArray held in graphMap, then drops the package link taken in the constructor.
- ~CQueryFactory()
- {
- HashIterator graphIter(graphMap);
- ForEach(graphIter)
- {
- ActivityArray *loaded = *graphMap.mapToValue(&graphIter.query());
- loaded->Release();
- }
- package.Release();
- }
- // Resolves a query library by name and expected interface hash by delegating to globalPackageSetManager.
- virtual IQueryFactory *lookupLibrary(const char *libraryName, unsigned expectedInterfaceHash, const IRoxieContextLogger &logctx) const
- {
- IQueryFactory *library = globalPackageSetManager->lookupLibrary(libraryName, expectedInterfaceHash, logctx);
- return library;
- }
- // Removes this factory from the static queryMap, under queriesCrit, before it is disposed.
- virtual void beforeDispose()
- {
- // NOTE: it's theoretically possible for the final release to happen after a replacement has been inserted into hash table.
- // So only remove from hash table if what we find there matches the item that is being deleted.
- hash64_t key = rtlHash64Data(sizeof(channelNo), &channelNo, hashValue);
- SpinBlock b(queriesCrit);
- if (queryMap.getValue(key) == this)
- queryMap.remove(key);
- }
- // Looks up an already-loaded factory by query hash and channel, under queriesCrit.
- // Returns the LINKed factory when one is registered and isAlive() reports true, otherwise NULL.
- static IQueryFactory *getQueryFactory(hash64_t hashValue, unsigned channelNo)
- {
- hash64_t key = rtlHash64Data(sizeof(channelNo), &channelNo, hashValue);
- SpinBlock b(queriesCrit);
- CQueryFactory *found = LINK(queryMap.getValue(key));
- if (!found || !found->isAlive())
- return NULL;
- return found;
- }
- // Computes the 64-bit hash identifying a query. Starting from the package hash it folds in, in order:
- //  - the dll name (when a dll is supplied);
- //  - the hash of every index/data file the workunit's Roxie graphs reference (only when
- //    lockSuperFiles and allFilesDynamic are off, the query is not dynamic and the package is not
- //    compulsory); each resolved file is also appended to 'files';
- //  - the query id (when non-NULL) and the constant string "Roxie";
- //  - the XML serialization of stateInfo (when supplied).
- // Intermediate values are traced when traceLevel > 8.
- static hash64_t getQueryHash(const char *id, const IQueryDll *dll, const IRoxiePackage &package, const IPropertyTree *stateInfo, IArrayOf<IResolvedFile> &files, bool isDynamic)
- {
- hash64_t hashValue = package.queryHash();
- if (traceLevel > 8)
- DBGLOG("getQueryHash: %s %" I64F "u from package", id, hashValue);
- if (dll)
- {
- hashValue = rtlHash64VStr(dll->queryDll()->queryName(), hashValue);
- if (traceLevel > 8)
- DBGLOG("getQueryHash: %s %" I64F "u from dll", id, hashValue);
- if (!lockSuperFiles && !allFilesDynamic && !isDynamic && !package.isCompulsory())
- {
- IConstWorkUnit *wu = dll->queryWorkUnit();
- if (wu) // wu may be null in some unit test cases
- {
- SCMStringBuffer bStr;
- // Don't want to include files referenced in thor graphs... in practice isDynamic also likely to be set in such cases
- if (getClusterType(wu->getDebugValue("targetClusterType", bStr).str(), RoxieCluster) == RoxieCluster)
- {
- Owned<IConstWUGraphIterator> graphs = &wu->getGraphs(GraphTypeActivities);
- ForEach(*graphs)
- {
- Owned<IPropertyTree> graphXgmml = graphs->query().getXGMMLTree(false);
- Owned<IPropertyTreeIterator> nodes = graphXgmml->getElements(".//node");
- ForEach(*nodes)
- {
- IPropertyTree &node = nodes->query();
- ThorActivityKind kind = getActivityKind(node);
- // Write and pipe activities are excluded from the file scan
- if (kind != TAKdiskwrite && kind != TAKindexwrite && kind != TAKpiperead && kind != TAKpipewrite)
- {
- const char *fileName = queryNodeFileName(node, kind);
- const char *indexName = queryNodeIndexName(node, kind);
- // What about packages that resolve everything without dali?
- if (indexName)
- {
- bool isOpt = pretendAllOpt || node.getPropBool("att[@name='_isIndexOpt']/@value");
- const IResolvedFile *indexFile = package.lookupFileName(indexName, isOpt, true, true, wu, true);
- if (indexFile)
- {
- hashValue = indexFile->addHash64(hashValue);
- if (traceLevel > 8)
- DBGLOG("getQueryHash: %s %" I64F "u from index %s", id, hashValue, indexName);
- files.append(*const_cast<IResolvedFile *>(indexFile));
- }
- }
- if (fileName)
- {
- // Spill files are not hashed
- if (!node.getPropBool("att[@name='_isSpill']/@value") && !node.getPropBool("att[@name='_isSpillGlobal']/@value"))
- {
- bool isOpt = pretendAllOpt || node.getPropBool("att[@name='_isOpt']/@value");
- const IResolvedFile *dataFile = package.lookupFileName(fileName, isOpt, true, true, wu, true);
- if (dataFile)
- {
- hashValue = dataFile->addHash64(hashValue);
- // This branch handles a data file, so the trace says "file" rather than "index"
- if (traceLevel > 8)
- DBGLOG("getQueryHash: %s %" I64F "u from file %s", id, hashValue, fileName);
- files.append(*const_cast<IResolvedFile *>(dataFile));
- }
- }
- }
- }
- }
- }
- }
- }
- }
- }
- if (id)
- hashValue = rtlHash64VStr(id, hashValue);
- hashValue = rtlHash64VStr("Roxie", hashValue); // Adds some noise into the hash - otherwise adjacent wuids tend to hash very close together
- if (traceLevel > 8)
- DBGLOG("getQueryHash: %s %" I64F "u from id", id, hashValue);
- if (stateInfo)
- {
- StringBuffer xml;
- toXML(stateInfo, xml);
- hashValue = rtlHash64Data(xml.length(), xml.str(), hashValue);
- if (traceLevel > 8)
- DBGLOG("getQueryHash: %s %" I64F "u from stateInfo", id, hashValue);
- }
- if (traceLevel > 8)
- DBGLOG("getQueryHash: %s %" I64F "u", id, hashValue);
- return hashValue;
- }
-
- virtual void load(const IPropertyTree *stateInfo) // Reads options from the workunit/stateInfo, loads every activity graph for Roxie-targeted queries, then registers this factory in queryMap
- {
- IConstWorkUnit *wu = dll->queryWorkUnit();
- if (wu) // wu may be null in some unit test cases
- {
- libraryInterfaceHash = wu->getApplicationValueInt("LibraryModule", "interfaceHash", 0);
- options.setFromWorkUnit(*wu, stateInfo);
- SCMStringBuffer bStr;
- targetClusterType = getClusterType(wu->getDebugValue("targetClusterType", bStr).str(), RoxieCluster);
- // NOTE: stateinfo overrides package info
- if (stateInfo)
- {
- // info in querySets can override the defaults from workunit for some limits
- isSuspended = stateInfo->getPropBool("@suspended", false);
- }
- if (targetClusterType == RoxieCluster) // graphs are only loaded for Roxie-targeted queries
- {
- Owned<IConstWUGraphIterator> graphs = &wu->getGraphs(GraphTypeActivities);
- SCMStringBuffer graphNameStr;
- ForEach(*graphs)
- {
- graphs->query().getName(graphNameStr);
- const char *graphName = graphNameStr.s.str();
- Owned<IPropertyTree> graphXgmml = graphs->query().getXGMMLTree(false);
- try
- {
- ActivityArray *activities = loadGraph(*graphXgmml, graphName);
- graphMap.setValue(graphName, activities); // entries are released in the destructor
- }
- catch (IException *E) // a graph that fails to load suspends the query instead of propagating
- {
- StringBuffer m;
- E->errorMessage(m);
- suspend(m.str());
- ERRLOG("Query %s suspended: %s", id.get(), m.str());
- E->Release();
- }
- }
- }
- }
- hash64_t hv = rtlHash64Data(sizeof(channelNo), &channelNo, hashValue); // same key derivation as beforeDispose/getQueryFactory
- SpinBlock b(queriesCrit);
- queryMap.setValue(hv, this);
- }
- // Channel number this factory was constructed for.
- virtual unsigned queryChannel() const { return channelNo; }
- // Query hash value supplied at construction.
- virtual hash64_t queryHash() const { return hashValue; }
- // Shared "once" context supplied at construction (the value is returned as stored, with no null check).
- virtual ISharedOnceContext *querySharedOnceContext() const { return sharedOnceContext; }
- // Result store of the shared once context; asserts that a shared once context is present.
- virtual IDeserializedResultStore &queryOnceResultStore() const
- {
- assertex(sharedOnceContext);
- IDeserializedResultStore &store = sharedOnceContext->queryOnceResultStore();
- return store;
- }
- // Property tree of the shared once context for this factory; asserts that a shared once context is present.
- virtual IPropertyTree &queryOnceContext(const IRoxieContextLogger &logctx) const
- {
- assertex(sharedOnceContext);
- IPropertyTree &onceTree = sharedOnceContext->queryOnceContext(this, logctx);
- return onceTree;
- }
- // Fetches resource 'id' from the query dll and returns it as a C string pointer.
- virtual const char *loadResource(unsigned id)
- {
- ILoadedDllEntry *loaded = queryDll();
- return (const char *) loaded->getResource(id);
- }
- // Returns the ActivityArray loaded for graph 'name'.
- // Asserts (like lookupGraph) when no graph of that name has been loaded, instead of
- // dereferencing the NULL that graphMap.getValue returns for a missing entry.
- virtual ActivityArray *lookupGraphActivities(const char *name) const
- {
- ActivityArrayPtr *graph = graphMap.getValue(name);
- assertex(graph);
- return *graph;
- }
- // Creates an activity graph instance for the named, previously loaded graph.
- // Asserts that 'name' is non-empty and that the graph is present in graphMap. The caller owns the result.
- virtual IActivityGraph *lookupGraph(IRoxieSlaveContext *ctx, const char *name, IProbeManager *probeManager, const IRoxieContextLogger &logctx, IRoxieServerActivity *parentActivity) const
- {
- assertex(name && *name);
- ActivityArrayPtr *loaded = graphMap.getValue(name);
- assertex(loaded);
- return ::createActivityGraph(ctx, name, 0, **loaded, parentActivity, probeManager, logctx, 1);
- }
- void getGraphStats(StringBuffer &reply, const IPropertyTree &thisGraph) const // Appends to 'reply' an XML copy of thisGraph annotated with edge and node progress info from the matching activity factories
- {
- Owned<IPropertyTree> graph = createPTreeFromIPT(&thisGraph, ipt_lowmem); // work on a copy; thisGraph itself is const
- Owned<IPropertyTreeIterator> edges = graph->getElements(".//edge");
- ForEach(*edges)
- {
- IPropertyTree &edge = edges->query();
- IActivityFactory *a = findActivity(edge.getPropInt("@source", 0));
- if (!a) // fall back to the _sourceActivity attribute when @source does not match a known activity
- a = findActivity(edge.getPropInt("att[@name=\"_sourceActivity\"]/@value", 0));
- if (a)
- {
- unsigned sourceOutput = edge.getPropInt("att[@name=\"_sourceIndex\"]/@value", 0);
- a->getEdgeProgressInfo(sourceOutput, edge);
- }
- }
- Owned<IPropertyTreeIterator> nodes = graph->getElements(".//node");
- ForEach(*nodes)
- {
- IPropertyTree &node = nodes->query();
- IActivityFactory *a = findActivity(node.getPropInt("@id", 0));
- if (a) // nodes with no registered activity are left unannotated
- a->getNodeProgressInfo(node);
- }
- toXML(graph, reply);
- }
- // Builds a new "Query" property tree with one Graph/xgmml/graph branch per activity graph in the
- // workunit, each Graph tagged with @id set to the graph name. Asserts that the dll and its workunit
- // exist. The caller owns the returned tree.
- virtual IPropertyTree* cloneQueryXGMML() const
- {
- assertex(dll && dll->queryWorkUnit());
- Owned<IPropertyTree> result = createPTree("Query", ipt_lowmem);
- Owned<IConstWUGraphIterator> graphIter = &dll->queryWorkUnit()->getGraphs(GraphTypeActivities);
- SCMStringBuffer nameBuffer;
- ForEach(*graphIter)
- {
- graphIter->query().getName(nameBuffer);
- const char *graphName = nameBuffer.s.str();
- Owned<IPropertyTree> graphXgmml = graphIter->query().getXGMMLTree(false);
- IPropertyTree *graphBranch = result->addPropTree("Graph");
- graphBranch->setProp("@id", graphName);
- IPropertyTree *xgmmlBranch = graphBranch->addPropTree("xgmml");
- xgmmlBranch->addPropTree("graph", graphXgmml.getLink());
- }
- return result.getClear();
- }
- // Appends a <Graph id=...><xgmml>...</xgmml></Graph> section with progress statistics to 'reply' for
- // each activity graph in the workunit. When graphName is given, graphs with a non-empty name that
- // differs from it (case-insensitively) are skipped. Does nothing when there is no dll.
- virtual void getStats(StringBuffer &reply, const char *graphName) const
- {
- if (!dll)
- return;
- assertex(dll->queryWorkUnit());
- Owned<IConstWUGraphIterator> graphIter = &dll->queryWorkUnit()->getGraphs(GraphTypeActivities);
- SCMStringBuffer currentName;
- ForEach(*graphIter)
- {
- graphIter->query().getName(currentName);
- if (graphName && currentName.length() && (stricmp(graphName, currentName.s.str()) != 0))
- continue; // not interested in this one
- reply.appendf("<Graph id='%s'><xgmml>", currentName.s.str());
- Owned<IPropertyTree> graphXgmml = graphIter->query().getXGMMLTree(false);
- getGraphStats(reply, *graphXgmml);
- reply.append("</xgmml></Graph>");
- }
- }
- // Appends one activity element to 'reply' for every registered activity that reports non-empty metrics.
- virtual void getActivityMetrics(StringBuffer &reply) const
- {
- HashIterator iter(allActivities);
- StringBuffer metrics;
- ForEach(iter)
- {
- IActivityFactory *factory = *allActivities.mapToValue(&iter.query());
- factory->getActivityMetrics(metrics.clear());
- if (!metrics.length())
- continue;
- reply.appendf(" <activity query='%s' id='%d' channel='%d'\n", queryQueryName(), factory->queryId(), queryChannel());
- reply.append(metrics);
- reply.append(" </activity>\n");
- }
- }
- virtual void getQueryInfo(StringBuffer &reply, bool full, IArrayOf<IQueryFactory> *slaveQueries, const IRoxieContextLogger &logctx) const // Appends a "Query" XML element describing this query (id, suspension state, optional per-activity xref info) to 'reply'
- {
- Owned<IPropertyTree> xref = createPTree("Query", ipt_fast);
- xref->setProp("@id", id);
- if (suspended())
- {
- xref->setPropBool("@suspended", true);
- xref->setProp("@error", errorMessage);
- }
- if (full) // 'full' adds xref info from every registered activity
- {
- HashIterator i(allActivities);
- ForEach(i)
- {
- IActivityFactory *f = *allActivities.mapToValue(&i.query());
- f->getXrefInfo(*xref, logctx);
- }
- }
- if (slaveQueries) // optional; may be NULL
- {
- ForEachItemIn(idx, *slaveQueries)
- {
- if (slaveQueries->item(idx).suspended()) // any suspended slave query marks the whole query as suspended
- {
- xref->setPropBool("@suspended", true);
- xref->setPropBool("@slaveSuspended", true);
- }
- }
- }
- toXML(xref, reply, 1, XML_Embed|XML_LineBreak|XML_SortTags);
- }
- // Resets the node progress info of every registered activity factory.
- virtual void resetQueryTimings()
- {
- HashIterator iter(allActivities);
- ForEach(iter)
- {
- IActivityFactory *factory = *allActivities.mapToValue(&iter.query());
- factory->resetNodeProgressInfo();
- }
- }
- // Text accumulated by suspend(); the pointer refers to this object's own errorMessage buffer.
- virtual const char *queryErrorMessage() const { return errorMessage.str(); }
- // Query id supplied at construction.
- virtual const char *queryQueryName() const { return id; }
- // True when a non-zero library interface hash has been recorded for this query.
- virtual bool isQueryLibrary() const { return libraryInterfaceHash != 0; }
- // Library interface hash (0 until load() reads a value from the workunit).
- virtual unsigned getQueryLibraryInterfaceHash() const { return libraryInterfaceHash; }
- virtual void suspend(const char* errMsg)
- {
- isSuspended = true;
- isLoadFailed = true;
- errorMessage.append(errMsg);
- }
- // True once suspend() has been called.
- virtual bool loadFailed() const { return isLoadFailed; }
- // Current suspension flag (set by suspend() or from stateInfo in load()).
- virtual bool suspended() const { return isSuspended; }
- // Options in effect for this query.
- virtual const QueryOptions &queryOptions() const { return options; }
- // Loaded dll entry behind this query; asserts that a dll was supplied.
- virtual ILoadedDllEntry *queryDll() const
- {
- assertex(dll);
- ILoadedDllEntry *entry = dll->queryDll();
- return entry;
- }
- // Workunit attached to the query dll; asserts that a dll was supplied.
- virtual IConstWorkUnit *queryWorkUnit() const
- {
- assertex(dll);
- IConstWorkUnit *workunit = dll->queryWorkUnit();
- return workunit;
- }
- // Package this query was created against.
- virtual const IRoxiePackage &queryPackage() const { return package; }
- // Not supported by this base class: always raises via throwUnexpected().
- virtual CRoxieWorkflowMachine *createWorkflowMachine(IConstWorkUnit *wu, bool isOnce, const IRoxieContextLogger &logctx) const
- {
- throwUnexpected(); // only on server...
- }
- // Resolves an environment value: first from the package (a leading '@' in 'name' is rewritten to the
- // "control:" prefix), then from the process environment, and finally from defaultValue (or "" when
- // defaultValue is NULL). The result is a strdup'd copy.
- virtual char *getEnv(const char *name, const char *defaultValue) const
- {
- const char *fallback = defaultValue ? defaultValue : "";
- const char *found;
- if (name && *name=='@')
- {
- // @ is shorthand for control: for legacy compatibility reasons
- StringBuffer controlName;
- controlName.append("control:").append(name+1);
- found = package.queryEnv(controlName.str());
- }
- else
- {
- found = package.queryEnv(name);
- }
- if (!found && name)
- found = getenv(name);
- if (!found)
- found = fallback;
- return strdup(found);
- }
- // Not supported by this base class: always raises via throwUnexpected().
- virtual IRoxieSlaveContext *createSlaveContext(const SlaveContextLogger &logctx, IRoxieQueryPacket *packet, bool hasChildren) const
- {
- throwUnexpected(); // only implemented in derived slave class
- }
- // Not supported by this base class: always raises via throwUnexpected().
- virtual IRoxieServerContext *createContext(IPropertyTree *xml, IHpccProtocolResponse *protocol, unsigned flags, const ContextLogger &_logctx, PTreeReaderOptions xmlReadFlags, const char *querySetName) const
- {
- throwUnexpected(); // only implemented in derived server class
- }
- // Workunit-based overload; not supported by this base class: always raises via throwUnexpected().
- virtual IRoxieServerContext *createContext(IConstWorkUnit *wu, const ContextLogger &_logctx) const
- {
- throwUnexpected(); // only implemented in derived server class
- }
- // Not supported by this base class: always raises via throwUnexpected().
- virtual void noteQuery(time_t startTime, bool failed, unsigned elapsed, unsigned memused, unsigned slavesReplyLen, unsigned bytesOut)
- {
- throwUnexpected(); // only implemented in derived server class
- }
- // Not supported by this base class: always raises via throwUnexpected().
- virtual IPropertyTree *getQueryStats(time_t from, time_t to)
- {
- throwUnexpected(); // only implemented in derived server class
- }
- virtual void getGraphNames(StringArray &ret) const
- {
- Owned<IConstWUGraphIterator> graphs = &dll->queryWorkUnit()->getGraphs(GraphTypeActivities);
- ForEach(*graphs)
- {
- SCMStringBuffer graphName;
- graphs->query().getName(graphName);
- ret.append(graphName.str());
- }
- }
- virtual bool isDynamic() const
- {
- return dynamic;
- }
- protected:
- IPropertyTree *queryWorkflowTree() const
- {
- assertex(dll->queryWorkUnit());
- return dll->queryWorkUnit()->queryWorkflowTree();
- }
- bool hasOnceSection() const
- {
- IPropertyTree *workflow = queryWorkflowTree();
- if (workflow)
- return workflow->hasProp("Item[@mode='once']");
- else
- return false;
- }
- virtual void checkSuspended() const
- {
- if (isSuspended)
- {
- StringBuffer err;
- if (errorMessage.length())
- err.appendf(" because %s", errorMessage.str());
- throw MakeStringException(ROXIE_QUERY_SUSPENDED, "Query %s is suspended%s", id.get(), err.str());
- }
- }
- virtual void onTermination(TerminationCallbackInfo *info) const override
- {
- CriticalBlock b(callbacksCrit);
- callbacks.append(*info);
- }
- };
- CriticalSection CQueryFactory::queryCreateLock; // Held by createServerQueryFactory() and createSlaveQueryFactory() for the whole of factory lookup/creation
- SpinLock CQueryFactory::queriesCrit; // NOTE(review): presumably protects queryMap - confirm against CQueryFactory::getQueryFactory
- CopyMapXToMyClass<hash64_t, hash64_t, CQueryFactory> CQueryFactory::queryMap; // Static map from a hash64_t key to CQueryFactory instances
- extern IQueryFactory *getQueryFactory(hash64_t hashvalue, unsigned channel)
- {
- return CQueryFactory::getQueryFactory(hashvalue, channel);
- }
- class CRoxieServerQueryFactory : public CQueryFactory
- {
- // Parts of query factory is only interesting on the server - workflow support, and tracking of total query times
- protected:
- Owned<IQueryStatsAggregator> queryStats;
- public:
- CRoxieServerQueryFactory(const char *_id, const IQueryDll *_dll, const IRoxiePackage &_package, hash64_t _hashValue, ISharedOnceContext *_sharedOnceContext, bool _dynamic)
- : CQueryFactory(_id, _dll, _package, _hashValue, 0, _sharedOnceContext, _dynamic)
- {
- queryStats.setown(createQueryStatsAggregator(id.get(), statsExpiryTime));
- }
- virtual void noteQuery(time_t startTime, bool failed, unsigned elapsed, unsigned memused, unsigned slavesReplyLen, unsigned bytesOut)
- {
- queryStats->noteQuery(startTime, failed, elapsed, memused, slavesReplyLen, bytesOut);
- queryGlobalQueryStatsAggregator()->noteQuery(startTime, failed, elapsed, memused, slavesReplyLen, bytesOut);
- }
- virtual void addDependency(unsigned sourceIdx, unsigned sourceId, unsigned targetId, int controlId, const char *edgeId, ActivityArray * activities)
- {
- // addDependency is expected to fail occasionally on slave, but never on Roxie server
- if (!doAddDependency(sourceIdx, sourceId, targetId, controlId, edgeId, activities))
- throw MakeStringException(ROXIE_ADDDEPENDENCY_ERROR, "Failed to create dependency from %u on %u", sourceId, targetId);
- }
- virtual ActivityArray *loadGraph(IPropertyTree &graph, const char *graphName)
- {
- bool isLibraryGraph = graph.getPropBool("@library");
- bool isSequential = graph.getPropBool("@sequential");
- ActivityArray *activities = new ActivityArray(isLibraryGraph, false, isLibraryGraph, isSequential);
- if (isLibraryGraph)
- activities->setLibraryGraphId(graph.getPropInt("node/@id"));
- try
- {
- Owned<IPropertyTreeIterator> subgraphs = graph.getElements("node");
- ForEach(*subgraphs)
- {
- IPropertyTree &node = subgraphs->query();
- loadSubgraph(node, activities);
- loadNode(node, 0, activities);
- }
- addDependencies(graph, activities);
- }
- catch (...)
- {
- ::Release(activities);
- allActivities.kill();
- throw;
- }
- return activities;
- }
- virtual IRoxieServerContext *createContext(IPropertyTree *context, IHpccProtocolResponse *protocol, unsigned flags, const ContextLogger &_logctx, PTreeReaderOptions _xmlReadFlags, const char *_querySetName) const
- {
- checkSuspended();
- return createRoxieServerContext(context, protocol, this, flags, _logctx, _xmlReadFlags, _querySetName);
- }
- virtual IRoxieServerContext *createContext(IConstWorkUnit *wu, const ContextLogger &_logctx) const
- {
- checkSuspended();
- return createWorkUnitServerContext(wu, this, _logctx);
- }
- virtual CRoxieWorkflowMachine *createWorkflowMachine(IConstWorkUnit *wu, bool isOnce, const IRoxieContextLogger &logctx) const
- {
- IPropertyTree *workflow = queryWorkflowTree();
- if (workflow)
- {
- return ::createRoxieWorkflowMachine(workflow, wu, isOnce, logctx);
- }
- else
- return NULL;
- }
- virtual IPropertyTree *getQueryStats(time_t from, time_t to)
- {
- return queryStats->getStats(from, to);
- }
- };
- unsigned checkWorkunitVersionConsistency(const IConstWorkUnit *wu)
- {
- assertex(wu);
- unsigned wuVersion = wu->getCodeVersion();
- if (wuVersion == 0)
- throw makeStringException(ROXIE_MISMATCH, "Attempting to execute a workunit that hasn't been compiled");
- if (wuVersion > ACTIVITY_INTERFACE_VERSION || wuVersion < MIN_ACTIVITY_INTERFACE_VERSION)
- throw MakeStringException(ROXIE_MISMATCH, "Workunit was compiled for eclhelper interface version %d, this roxie requires version %d..%d", wuVersion, MIN_ACTIVITY_INTERFACE_VERSION, ACTIVITY_INTERFACE_VERSION);
- return wuVersion;
- }
- static void checkWorkunitVersionConsistency(const IQueryDll *dll)
- {
- unsigned wuVersion = checkWorkunitVersionConsistency(dll->queryWorkUnit());
- EclProcessFactory processFactory = (EclProcessFactory) dll->queryDll()->getEntry("createProcess");
- if (processFactory)
- {
- Owned<IEclProcess> process = processFactory();
- assertex(process);
- if (process->getActivityVersion() != wuVersion)
- throw MakeStringException(ROXIE_MISMATCH, "Inconsistent interface versions. Workunit was created using eclcc for version %u, but the c++ compiler used version %u", wuVersion, process->getActivityVersion());
- }
- else
- throw MakeStringException(ROXIE_MISMATCH, "Workunit did not export createProcess function");
- }
- extern IQueryFactory *createServerQueryFactory(const char *id, const IQueryDll *dll, const IRoxiePackage &package, const IPropertyTree *stateInfo, bool isDynamic, bool forceRetry)
- {
- CriticalBlock b(CQueryFactory::queryCreateLock);
- IArrayOf<IResolvedFile> queryFiles; // Note - these should stay in scope long enough to ensure still cached when (if) query is loaded for real
- hash64_t hashValue = CQueryFactory::getQueryHash(id, dll, package, stateInfo, queryFiles, isDynamic);
- IQueryFactory *cached = getQueryFactory(hashValue, 0);
- if (cached && !(cached->loadFailed() && (reloadRetriesFailed || forceRetry)))
- {
- ::Release(dll);
- return cached;
- }
- if (dll && !selfTestMode)
- {
- checkWorkunitVersionConsistency(dll);
- Owned<ISharedOnceContext> sharedOnceContext;
- IPropertyTree *workflow = dll->queryWorkUnit()->queryWorkflowTree();
- if (workflow && workflow->hasProp("Item[@mode='once']"))
- sharedOnceContext.setown(new CSharedOnceContext);
- Owned<CRoxieServerQueryFactory> newFactory = new CRoxieServerQueryFactory(id, dll, dynamic_cast<const IRoxiePackage&>(package), hashValue, sharedOnceContext, isDynamic);
- newFactory->load(stateInfo);
- if (sharedOnceContext && preloadOnceData)
- {
- Owned<StringContextLogger> logctx = new StringContextLogger(id); // NB may get linked by the onceContext
- sharedOnceContext->checkOnceDone(newFactory, *logctx);
- }
- return newFactory.getClear();
- }
- else
- return new CRoxieServerQueryFactory(id, NULL, dynamic_cast<const IRoxiePackage&>(package), hashValue, NULL, isDynamic);
- }
- extern IQueryFactory *createServerQueryFactoryFromWu(IConstWorkUnit *wu, const IQueryDll *_dll)
- {
- Linked<const IQueryDll> dll = _dll;
- if (!dll)
- dll.setown(createWuQueryDll(wu));
- if (!dll)
- return NULL;
- return createServerQueryFactory(wu->queryWuid(), dll.getClear(), queryRootRoxiePackage(), NULL, true, false); // MORE - if use a constant for id might cache better?
- }
- //==============================================================================================================================================
- class CSlaveQueryFactory : public CQueryFactory
- {
- void addActivity(ISlaveActivityFactory *activity, ActivityArray *activities)
- {
- activities->append(*activity);
- unsigned activityId = activity->queryId();
- allActivities.setValue(activityId, activity);
- }
- void loadSlaveNode(IPropertyTree &node, unsigned subgraphId, ActivityArray *activities)
- {
- ThorActivityKind kind = getActivityKind(node);
- switch (kind)
- {
- case TAKcsvread:
- case TAKxmlread:
- case TAKdiskread:
- case TAKjsonread:
- if (node.getPropBool("att[@name='_isSpill']/@value", false) || node.getPropBool("att[@name='_isSpillGlobal']/@value", false))
- return;
- break;
- case TAKkeyedjoin:
- case TAKkeyeddenormalize:
- case TAKkeyeddenormalizegroup:
- case TAKdisknormalize:
- case TAKdiskcount:
- case TAKdiskaggregate:
- case TAKdiskgroupaggregate:
- case TAKindexread:
- case TAKindexnormalize:
- case TAKindexcount:
- case TAKindexaggregate:
- case TAKindexgroupaggregate:
- case TAKindexgroupexists:
- case TAKindexgroupcount:
- case TAKfetch:
- case TAKcsvfetch:
- case TAKxmlfetch:
- case TAKjsonfetch:
- case TAKremotegraph:
- break;
- case TAKsubgraph:
- break;
- default:
- return;
- }
- ISlaveActivityFactory *newAct = NULL;
- if (kind != TAKsubgraph)
- {
- if (isSuspended)
- newAct = createRoxieDummyActivityFactory(node, subgraphId, *this, false); // MORE - is there any point?
- else
- {
- StringBuffer helperName;
- node.getProp("att[@name=\"helper\"]/@value", helperName);
- if (!helperName.length())
- helperName.append("fAc").append(node.getPropInt("@id", 0));
- HelperFactory *helperFactory = dll->getFactory(helperName.str());
- if (!helperFactory)
- throw MakeStringException(ROXIE_INTERNAL_ERROR, "Internal error: helper function %s not exported", helperName.str());
- switch (kind)
- {
- case TAKdiskread:
- newAct = createRoxieDiskReadActivityFactory(node, subgraphId, *this, helperFactory);
- break;
- case TAKcsvread:
- newAct = createRoxieCsvReadActivityFactory(node, subgraphId, *this, helperFactory);
- break;
- case TAKxmlread:
- case TAKjsonread:
- newAct = createRoxieXmlReadActivityFactory(node, subgraphId, *this, helperFactory);
- break;
- case TAKdisknormalize:
- newAct = createRoxieDiskNormalizeActivityFactory(node, subgraphId, *this, helperFactory);
- break;
- case TAKdiskcount:
- newAct = createRoxieDiskCountActivityFactory(node, subgraphId, *this, helperFactory);
- break;
- case TAKdiskaggregate:
- newAct = createRoxieDiskAggregateActivityFactory(node, subgraphId, *this, helperFactory);
- break;
- case TAKdiskgroupaggregate:
- newAct = createRoxieDiskGroupAggregateActivityFactory(node, subgraphId, *this, helperFactory);
- break;
- case TAKindexread:
- newAct = createRoxieIndexReadActivityFactory(node, subgraphId, *this, helperFactory);
- break;
- case TAKindexnormalize:
- newAct = createRoxieIndexNormalizeActivityFactory(node, subgraphId, *this, helperFactory);
- break;
- case TAKindexcount:
- newAct = createRoxieIndexCountActivityFactory(node, subgraphId, *this, helperFactory);
- break;
- case TAKindexaggregate:
- newAct = createRoxieIndexAggregateActivityFactory(node, subgraphId, *this, helperFactory);
- break;
- case TAKindexgroupaggregate:
- case TAKindexgroupexists:
- case TAKindexgroupcount:
- newAct = createRoxieIndexGroupAggregateActivityFactory(node, subgraphId, *this, helperFactory, kind);
- break;
- case TAKfetch:
- newAct = createRoxieFetchActivityFactory(node, subgraphId, *this, helperFactory);
- break;
- case TAKcsvfetch:
- newAct = createRoxieCSVFetchActivityFactory(node, subgraphId, *this, helperFactory);
- break;
- case TAKxmlfetch:
- case TAKjsonfetch:
- newAct = createRoxieXMLFetchActivityFactory(node, subgraphId, *this, helperFactory);
- break;
- case TAKkeyedjoin:
- case TAKkeyeddenormalize:
- case TAKkeyeddenormalizegroup:
- newAct = createRoxieKeyedJoinIndexActivityFactory(node, subgraphId, *this, helperFactory);
- if (node.getPropBool("att[@name=\"_diskAccessRequired\"]/@value"))
- {
- ISlaveActivityFactory *newAct2 = createRoxieKeyedJoinFetchActivityFactory(node, subgraphId, *this, helperFactory);
- unsigned activityId2 = newAct2->queryId() | ROXIE_ACTIVITY_FETCH;
- activities->append(*newAct2);
- allActivities.setValue(activityId2, newAct2);
- }
- break;
- case TAKremotegraph:
- {
- unsigned graphId = node.getPropInt("att[@name=\"_graphid\"]/@value", 0);
- newAct = createRoxieRemoteActivityFactory(node, subgraphId, *this, helperFactory, graphId);
- break;
- }
- default:
- throwUnexpected();
- }
- }
- if (newAct)
- {
- addActivity(newAct, activities);
- }
- }
- else if (kind == TAKsubgraph)
- {
- // If the subgraph belongs to a remote activity, we need to be able to execute it on the slave...
- IPropertyTree * childGraphNode = node.queryPropTree("att/graph");
- if (!childGraphNode->getPropBool("@child"))
- {
- unsigned parentId = findParentId(node);
- assertex(parentId);
- unsigned parentIndex = activities->findActivityIndex(parentId);
- if (parentIndex != NotFound)
- {
- ActivityArray *childQuery = loadChildGraph(*childGraphNode);
- activities->item(parentIndex).addChildQuery(node.getPropInt("@id"), childQuery);
- }
- }
- // Regardless, we need to make sure we create remote activities as required throughout the graph
- Owned<IPropertyTreeIterator> nodes = node.getElements("att/graph/node");
- unsigned subgraphId = node.getPropInt("@id");
- ForEach(*nodes)
- {
- IPropertyTree &node = nodes->query();
- loadSlaveNode(node, subgraphId, activities);
- }
- }
- }
- void loadOuterSubgraph(IPropertyTree &graph, ActivityArray *activities)
- {
- Owned<IPropertyTreeIterator> nodes = graph.getElements("att/graph/node");
- unsigned subgraphId = graph.getPropInt("@id");
- ForEach(*nodes)
- {
- IPropertyTree &node = nodes->query();
- loadSlaveNode(node, subgraphId, activities);
- }
- loadSlaveNode(graph, subgraphId, activities); // MORE - not really sure why this line is here!
- }
- public:
- CSlaveQueryFactory(const char *_id, const IQueryDll *_dll, const IRoxiePackage &_package, hash64_t _hashValue, unsigned _channelNo, ISharedOnceContext *_sharedOnceContext, bool _dynamic)
- : CQueryFactory(_id, _dll, _package, _hashValue, _channelNo, _sharedOnceContext, _dynamic)
- {
- }
- virtual IRoxieSlaveContext *createSlaveContext(const SlaveContextLogger &logctx, IRoxieQueryPacket *packet, bool hasChildren) const
- {
- return ::createSlaveContext(this, logctx, packet, hasChildren);
- }
- virtual ActivityArray *loadGraph(IPropertyTree &graph, const char *graphName)
- {
- // MORE: common up with loadGraph for the Roxie server..
- bool isLibraryGraph = graph.getPropBool("@library");
- bool isSequential = graph.getPropBool("@sequential");
- ActivityArray *activities = new ActivityArray(isLibraryGraph, false, isLibraryGraph, isSequential);
- if (isLibraryGraph)
- activities->setLibraryGraphId(graph.getPropInt("node/@id"));
- try
- {
- if (false && isLibraryGraph)
- {
- //Really only need to do this if the library is called from a remote activity
- //but it's a bit tricky to work out since the library graph will come before the use.
- //Not a major issue since libraries won't be embedded for production queries.
- // this comment makes little sense...
- Owned<IPropertyTreeIterator> subgraphs = graph.getElements("node");
- ForEach(*subgraphs)
- {
- IPropertyTree &node = subgraphs->query();
- loadSubgraph(node, activities);
- loadNode(node, 0, activities);
- }
- }
- Owned<IPropertyTreeIterator> subgraphs = graph.getElements("node");
- ForEach(*subgraphs)
- {
- IPropertyTree &subgraph = subgraphs->query();
- loadOuterSubgraph(subgraph, activities);
- }
- addDependencies(graph, activities);
- }
- catch (...)
- {
- ::Release(activities);
- allActivities.kill();
- throw;
- }
- return activities;
- }
- };
- IQueryFactory *createSlaveQueryFactory(const char *id, const IQueryDll *dll, const IRoxiePackage &package, unsigned channel, const IPropertyTree *stateInfo, bool isDynamic, bool forceRetry)
- {
- CriticalBlock b(CQueryFactory::queryCreateLock);
- IArrayOf<IResolvedFile> queryFiles; // Note - these should stay in scope long enough to ensure still cached when (if) query is loaded for real
- hash64_t hashValue = CQueryFactory::getQueryHash(id, dll, package, stateInfo, queryFiles, isDynamic);
- IQueryFactory *cached = getQueryFactory(hashValue, channel);
- if (cached)
- {
- ::Release(dll);
- return cached;
- }
- if (dll)
- {
- checkWorkunitVersionConsistency(dll);
- Owned<IQueryFactory> serverFactory = createServerQueryFactory(id, LINK(dll), package, stateInfo, false, forceRetry); // Should always find a cached one
- Owned<CSlaveQueryFactory> newFactory = new CSlaveQueryFactory(id, dll, dynamic_cast<const IRoxiePackage&>(package), hashValue, channel, serverFactory->querySharedOnceContext(), isDynamic);
- newFactory->load(stateInfo);
- return newFactory.getClear();
- }
- else
- return new CSlaveQueryFactory(id, NULL, dynamic_cast<const IRoxiePackage&>(package), hashValue, channel, NULL, isDynamic);
- }
- extern IQueryFactory *createSlaveQueryFactoryFromWu(IConstWorkUnit *wu, unsigned channelNo)
- {
- Owned<const IQueryDll> dll = createWuQueryDll(wu);
- if (!dll)
- return NULL;
- return createSlaveQueryFactory(wu->queryWuid(), dll.getClear(), queryRootRoxiePackage(), channelNo, NULL, true, false); // MORE - if use a constant for id might cache better?
- }
|