ccdquery.cpp 86 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include <platform.h>
  14. #include <jlib.hpp>
  15. #include "ccd.hpp"
  16. #include "ccdquery.hpp"
  17. #include "ccdstate.hpp"
  18. #include "ccdsnmp.hpp"
  19. #include "ccdserver.hpp"
  20. #include "ccdcontext.hpp"
  21. #include "thorplugin.hpp"
  22. void ActivityArray::append(IActivityFactory &cur)
  23. {
  24. hash.setValue(cur.queryId(), activities.ordinality());
  25. activities.append(cur);
  26. }
  27. unsigned ActivityArray::findActivityIndex(unsigned id) const
  28. {
  29. unsigned *ret = hash.getValue(id);
  30. if (ret)
  31. return *ret;
  32. return NotFound;
  33. }
  34. unsigned ActivityArray::recursiveFindActivityIndex(unsigned id)
  35. {
  36. // NOTE - this returns the activity index of the PARENT of the specified activity
  37. unsigned *ret = hash.getValue(id);
  38. if (ret)
  39. return *ret;
  40. ForEachItem(idx)
  41. {
  42. IActivityFactory & cur = item(idx);
  43. unsigned childId;
  44. for (unsigned childIdx = 0;;childIdx++)
  45. {
  46. ActivityArray * children = cur.queryChildQuery(childIdx, childId);
  47. if (!children)
  48. break;
  49. if (children->recursiveFindActivityIndex(id) != NotFound)
  50. {
  51. hash.setValue(id, idx);
  52. return idx;
  53. }
  54. }
  55. }
  56. return NotFound;
  57. }
  58. //----------------------------------------------------------------------------------------------
  59. // Class CQueryDll maps dlls into loadable workunits, complete with caching to ensure that a refresh of the QuerySet
  60. // can avoid reloading dlls, and that the same CQueryDll (and the objects it owns) can be shared between server and
  61. // multiple slave channels
  62. //----------------------------------------------------------------------------------------------
  63. class CQueryDll : implements IQueryDll, public CInterface
  64. {
  65. StringAttr dllName;
  66. Owned <ILoadedDllEntry> dll;
  67. Owned <IConstWorkUnit> wu;
  68. static CriticalSection dllCacheLock;
  69. static CopyMapStringToMyClass<CQueryDll> dllCache;
  70. public:
  71. IMPLEMENT_IINTERFACE;
  72. CQueryDll(const char *_dllName, ILoadedDllEntry *_dll) : dllName(_dllName), dll(_dll)
  73. {
  74. StringBuffer wuXML;
  75. if (!selfTestMode && getEmbeddedWorkUnitXML(dll, wuXML))
  76. {
  77. Owned<ILocalWorkUnit> localWU = createLocalWorkUnit(wuXML);
  78. wu.setown(localWU->unlock());
  79. }
  80. CriticalBlock b(dllCacheLock);
  81. dllCache.setValue(dllName, this);
  82. }
  83. virtual void beforeDispose()
  84. {
  85. CriticalBlock b(dllCacheLock);
  86. // NOTE: it's theoretically possible for the final release to happen after a replacement has been inserted into hash table.
  87. // So only remove from hash table if what we find there matches the item that is being deleted.
  88. CQueryDll *goer = dllCache.getValue(dllName);
  89. if (goer == this)
  90. dllCache.remove(dllName);
  91. }
  92. static const CQueryDll *getQueryDll(const char *dllName, bool isExe)
  93. {
  94. CriticalBlock b(dllCacheLock);
  95. CQueryDll *dll = LINK(dllCache.getValue(dllName));
  96. if (dll && dll->isAlive())
  97. return dll;
  98. else
  99. {
  100. Owned<ILoadedDllEntry> dll = isExe ? createExeDllEntry(dllName) : queryRoxieDllServer().loadDll(dllName, DllLocationDirectory);
  101. assertex(dll != NULL);
  102. return new CQueryDll(dllName, dll.getClear());
  103. }
  104. }
  105. static const IQueryDll *getWorkUnitDll(IConstWorkUnit *wu)
  106. {
  107. SCMStringBuffer dllName;
  108. Owned<IConstWUQuery> q = wu->getQuery();
  109. q->getQueryDllName(dllName);
  110. if (dllName.length() == 0)
  111. {
  112. if (wu->getCodeVersion() == 0)
  113. throw makeStringExceptionV(ROXIE_MISSING_DLL, "Attempting to load workunit %s that hasn't been compiled", wu->queryWuid());
  114. else
  115. throw makeStringExceptionV(ROXIE_MISSING_DLL, "Attempting to load workunit %s with no associated dll", wu->queryWuid());
  116. }
  117. return getQueryDll(dllName.str(), false);
  118. }
  119. virtual HelperFactory *getFactory(const char *helperName) const
  120. {
  121. return (HelperFactory *) dll->getEntry(helperName);
  122. }
  123. virtual ILoadedDllEntry *queryDll() const
  124. {
  125. return dll;
  126. }
  127. virtual IConstWorkUnit *queryWorkUnit() const
  128. {
  129. return wu;
  130. }
  131. };
  132. CriticalSection CQueryDll::dllCacheLock;
  133. CopyMapStringToMyClass<CQueryDll> CQueryDll::dllCache;
  134. extern const IQueryDll *createQueryDll(const char *dllName)
  135. {
  136. return CQueryDll::getQueryDll(dllName, false);
  137. }
  138. extern const IQueryDll *createExeQueryDll(const char *exeName)
  139. {
  140. return CQueryDll::getQueryDll(exeName, true);
  141. }
  142. extern const IQueryDll *createWuQueryDll(IConstWorkUnit *wu)
  143. {
  144. return CQueryDll::getWorkUnitDll(wu);
  145. }
  146. // Add information to the xref information to be returned for a control:getQueryXrefInfo request
  147. IPropertyTree * addXrefInfo(IPropertyTree &reply, const char *section, const char *name)
  148. {
  149. VStringBuffer xpath("%s[@name='%s']", section, name);
  150. if (!reply.hasProp(xpath))
  151. {
  152. IPropertyTree *info = reply.addPropTree(section);
  153. info->setProp("@name", name);
  154. return info;
  155. }
  156. return NULL;
  157. }
  158. extern void addXrefFileInfo(IPropertyTree &reply, const IResolvedFile *dataFile)
  159. {
  160. if (dataFile->isSuperFile())
  161. {
  162. IPropertyTree *info = addXrefInfo(reply, "SuperFile", dataFile->queryFileName());
  163. if (info)
  164. {
  165. int numSubs = dataFile->numSubFiles();
  166. for (int i = 0; i < numSubs; i++)
  167. {
  168. StringBuffer subName;
  169. dataFile->getSubFileName(i, subName);
  170. addXrefInfo(*info, "File", subName.str());
  171. }
  172. }
  173. }
  174. else
  175. addXrefInfo(reply, "File", dataFile->queryFileName());
  176. }
  177. extern void addXrefLibraryInfo(IPropertyTree &reply, const char *libraryName)
  178. {
  179. addXrefInfo(reply, "Library", libraryName);
  180. }
  181. //----------------------------------------------------------------------------------------------
  182. // Class CSharedOnceContext manages the context for a query's ONCE code, which is shared between
  183. // all slave and server contexts on a node
  184. //----------------------------------------------------------------------------------------------
  185. class CSharedOnceContext : public CInterfaceOf<ISharedOnceContext>
  186. {
  187. public:
  188. CSharedOnceContext()
  189. {
  190. }
  191. ~CSharedOnceContext()
  192. {
  193. }
  194. virtual IDeserializedResultStore &queryOnceResultStore() const
  195. {
  196. assertex(onceResultStore!= NULL);
  197. return *onceResultStore;
  198. }
  199. virtual IPropertyTree &queryOnceContext(const IQueryFactory *factory, const IRoxieContextLogger &logctx) const
  200. {
  201. checkOnceDone(factory, logctx);
  202. assertex(onceContext != NULL);
  203. return *onceContext;
  204. }
  205. virtual void checkOnceDone(const IQueryFactory *factory, const IRoxieContextLogger &logctx) const
  206. {
  207. CriticalBlock b(onceCrit);
  208. if (!onceContext)
  209. {
  210. onceContext.setown(createPTree(ipt_lowmem));
  211. onceResultStore.setown(createDeserializedResultStore());
  212. Owned <IRoxieServerContext> ctx = createOnceServerContext(factory, logctx);
  213. onceManager.set(&ctx->queryRowManager());
  214. try
  215. {
  216. ctx->process();
  217. ctx->done(false);
  218. }
  219. catch (IException *E)
  220. {
  221. ctx->done(true);
  222. onceException.setown(E);
  223. }
  224. catch (...)
  225. {
  226. ctx->done(true);
  227. onceException.setown(MakeStringException(ROXIE_INTERNAL_ERROR, "Unknown exception in ONCE code"));
  228. }
  229. }
  230. if (onceException)
  231. throw onceException.getLink();
  232. }
  233. protected:
  234. mutable CriticalSection onceCrit;
  235. mutable Owned<roxiemem::IRowManager> onceManager; // release AFTER resultStore
  236. mutable Owned<IPropertyTree> onceContext;
  237. mutable Owned<IDeserializedResultStore> onceResultStore;
  238. mutable Owned<IException> onceException;
  239. };
  240. //----------------------------------------------------------------------------------------------
  241. // Class QueryOptions is used to store options affecting the execution of a query
  242. // These can be set globally, by the query workunit, or by the query XML parameters
  243. //----------------------------------------------------------------------------------------------
  244. QueryOptions::QueryOptions()
  245. {
  246. priority = 0;
  247. timeLimit = defaultTimeLimit[0];
  248. warnTimeLimit = defaultWarnTimeLimit[0];
  249. memoryLimit = defaultMemoryLimit;
  250. parallelJoinPreload = defaultParallelJoinPreload;
  251. fullKeyedJoinPreload = defaultFullKeyedJoinPreload;
  252. keyedJoinPreload = defaultKeyedJoinPreload;
  253. concatPreload = defaultConcatPreload;
  254. fetchPreload = defaultFetchPreload;
  255. prefetchProjectPreload = defaultPrefetchProjectPreload;
  256. bindCores = coresPerQuery;
  257. strandBlockSize = defaultStrandBlockSize;
  258. forceNumStrands = defaultForceNumStrands;
  259. heapFlags = defaultHeapFlags;
  260. checkingHeap = defaultCheckingHeap;
  261. disableLocalOptimizations = defaultDisableLocalOptimizations;
  262. enableFieldTranslation = fieldTranslationEnabled;
  263. skipFileFormatCrcCheck = false;
  264. stripWhitespaceFromStoredDataset = ((ptr_ignoreWhiteSpace & defaultXmlReadFlags) != 0);
  265. timeActivities = defaultTimeActivities;
  266. traceEnabled = defaultTraceEnabled;
  267. traceLimit = defaultTraceLimit;
  268. allSortsMaySpill = false; // No global default for this
  269. failOnLeaks = false;
  270. }
  271. QueryOptions::QueryOptions(const QueryOptions &other)
  272. {
  273. priority = other.priority;
  274. timeLimit = other.timeLimit;
  275. warnTimeLimit = other.warnTimeLimit;
  276. memoryLimit = other.memoryLimit;
  277. parallelJoinPreload = other.parallelJoinPreload;;
  278. fullKeyedJoinPreload = other.fullKeyedJoinPreload;
  279. keyedJoinPreload = other.keyedJoinPreload;
  280. concatPreload = other.concatPreload;
  281. fetchPreload = other.fetchPreload;
  282. prefetchProjectPreload = other.prefetchProjectPreload;
  283. bindCores = other.bindCores;
  284. strandBlockSize = other.strandBlockSize;
  285. forceNumStrands = other.forceNumStrands;
  286. heapFlags = other.heapFlags;
  287. checkingHeap = other.checkingHeap;
  288. disableLocalOptimizations = other.disableLocalOptimizations;
  289. enableFieldTranslation = other.enableFieldTranslation;
  290. skipFileFormatCrcCheck = other.skipFileFormatCrcCheck;
  291. stripWhitespaceFromStoredDataset = other.stripWhitespaceFromStoredDataset;
  292. timeActivities = other.timeActivities;
  293. traceEnabled = other.traceEnabled;
  294. traceLimit = other.traceLimit;
  295. allSortsMaySpill = other.allSortsMaySpill;
  296. failOnLeaks = other.failOnLeaks;
  297. }
  298. void QueryOptions::setFromWorkUnit(IConstWorkUnit &wu, const IPropertyTree *stateInfo)
  299. {
  300. // calculate priority before others since it affects the defaults of others
  301. updateFromWorkUnit(priority, wu, "priority");
  302. if (stateInfo)
  303. updateFromContext(priority, stateInfo, "@priority");
  304. timeLimit = defaultTimeLimit[priority];
  305. warnTimeLimit = defaultWarnTimeLimit[priority];
  306. updateFromWorkUnit(timeLimit, wu, "timeLimit");
  307. updateFromWorkUnit(warnTimeLimit, wu, "warnTimeLimit");
  308. updateFromWorkUnitM(memoryLimit, wu, "memoryLimit");
  309. if (stateInfo)
  310. {
  311. updateFromContext(timeLimit, stateInfo, "@timeLimit");
  312. updateFromContext(warnTimeLimit, stateInfo, "@warnTimeLimit");
  313. updateFromContextM(memoryLimit, stateInfo, "@memoryLimit");
  314. }
  315. updateFromWorkUnit(parallelJoinPreload, wu, "parallelJoinPreload");
  316. updateFromWorkUnit(fullKeyedJoinPreload, wu, "fullKeyedJoinPreload");
  317. updateFromWorkUnit(keyedJoinPreload, wu, "keyedJoinPreload");
  318. updateFromWorkUnit(concatPreload, wu, "concatPreload");
  319. updateFromWorkUnit(fetchPreload, wu, "fetchPreload");
  320. updateFromWorkUnit(prefetchProjectPreload, wu, "prefetchProjectPreload");
  321. updateFromWorkUnit(bindCores, wu, "bindCores");
  322. updateFromWorkUnit(strandBlockSize, wu, "strandBlockSize");
  323. updateFromWorkUnit(forceNumStrands, wu, "forceNumStrands");
  324. updateFromWorkUnit(heapFlags, wu, "heapFlags");
  325. updateFromWorkUnit(checkingHeap, wu, "checkingHeap");
  326. updateFromWorkUnit(disableLocalOptimizations, wu, "disableLocalOptimizations");
  327. updateFromWorkUnit(enableFieldTranslation, wu, "layoutTranslationEnabled"); // Name is different for compatibility reasons
  328. updateFromWorkUnit(skipFileFormatCrcCheck, wu, "skipFileFormatCrcCheck");
  329. updateFromWorkUnit(stripWhitespaceFromStoredDataset, wu, "stripWhitespaceFromStoredDataset");
  330. updateFromWorkUnit(timeActivities, wu, "timeActivities");
  331. updateFromWorkUnit(traceEnabled, wu, "traceEnabled");
  332. updateFromWorkUnit(traceLimit, wu, "traceLimit");
  333. updateFromWorkUnit(allSortsMaySpill, wu, "allSortsMaySpill");
  334. updateFromWorkUnit(failOnLeaks, wu, "failOnLeaks");
  335. }
  336. void QueryOptions::updateFromWorkUnitM(memsize_t &value, IConstWorkUnit &wu, const char *name)
  337. {
  338. value = (memsize_t) wu.getDebugValueInt64(name, value);
  339. }
  340. void QueryOptions::updateFromWorkUnit(int &value, IConstWorkUnit &wu, const char *name)
  341. {
  342. value = wu.getDebugValueInt(name, value);
  343. }
  344. void QueryOptions::updateFromWorkUnit(unsigned &value, IConstWorkUnit &wu, const char *name)
  345. {
  346. value = (unsigned) wu.getDebugValueInt(name, value);
  347. }
  348. void QueryOptions::updateFromWorkUnit(bool &value, IConstWorkUnit &wu, const char *name)
  349. {
  350. value = wu.getDebugValueBool(name, value);
  351. }
  352. void QueryOptions::updateFromWorkUnit(RecordTranslationMode &value, IConstWorkUnit &wu, const char *name)
  353. {
  354. SCMStringBuffer val;
  355. wu.getDebugValue(name, val);
  356. if (val.length())
  357. value = getTranslationMode(val.str());
  358. }
  359. void QueryOptions::setFromContext(const IPropertyTree *ctx)
  360. {
  361. if (ctx)
  362. {
  363. // Note: priority cannot be set at context level
  364. updateFromContext(timeLimit, ctx, "@timeLimit", "_TimeLimit");
  365. updateFromContext(warnTimeLimit, ctx, "@warnTimeLimit", "_WarnTimeLimit");
  366. updateFromContextM(memoryLimit, ctx, "@memoryLimit", "_MemoryLimit");
  367. updateFromContext(parallelJoinPreload, ctx, "@parallelJoinPreload", "_ParallelJoinPreload");
  368. updateFromContext(fullKeyedJoinPreload, ctx, "@fullKeyedJoinPreload", "_FullKeyedJoinPreload");
  369. updateFromContext(keyedJoinPreload, ctx, "@keyedJoinPreload", "_KeyedJoinPreload");
  370. updateFromContext(concatPreload, ctx, "@concatPreload", "_ConcatPreload");
  371. updateFromContext(fetchPreload, ctx, "@fetchPreload", "_FetchPreload");
  372. updateFromContext(prefetchProjectPreload, ctx, "@prefetchProjectPreload", "_PrefetchProjectPreload");
  373. updateFromContext(bindCores, ctx, "@bindCores", "_bindCores");
  374. updateFromContext(strandBlockSize, ctx, "@strandBlockSize", "_strandBlockSize");
  375. updateFromContext(forceNumStrands, ctx, "@forceNumStrands", "_forceNumStrands");
  376. updateFromContext(heapFlags, ctx, "@heapFlags", "_HeapFlags");
  377. updateFromContext(checkingHeap, ctx, "@checkingHeap", "_CheckingHeap");
  378. // Note: disableLocalOptimizations is not permitted at context level (too late)
  379. // Note: enableFieldTranslation is not permitted at context level (generally too late anyway)
  380. // Note: skipFileFormatCrcCheck is not permitted at context level (generally too late anyway)
  381. updateFromContext(stripWhitespaceFromStoredDataset, ctx, "_StripWhitespaceFromStoredDataset", "@stripWhitespaceFromStoredDataset");
  382. updateFromContext(timeActivities, ctx, "@timeActivities", "_TimeActivities");
  383. updateFromContext(traceEnabled, ctx, "@traceEnabled", "_TraceEnabled");
  384. updateFromContext(traceLimit, ctx, "@traceLimit", "_TraceLimit");
  385. // Note: allSortsMaySpill is not permitted at context level (too late anyway, unless I refactored)
  386. updateFromContext(failOnLeaks, ctx, "@failOnLeaks", "_FailOnLeaks");
  387. }
  388. }
  389. const char * QueryOptions::findProp(const IPropertyTree *ctx, const char *name1, const char *name2)
  390. {
  391. if (name1 && ctx->hasProp(name1))
  392. return name1;
  393. else if (name2 && ctx->hasProp(name2))
  394. return name2;
  395. else
  396. return NULL;
  397. }
  398. void QueryOptions::updateFromContextM(memsize_t &value, const IPropertyTree *ctx, const char *name1, const char *name2)
  399. {
  400. const char *name = findProp(ctx, name1, name2);
  401. if (name)
  402. value = (memsize_t) ctx->getPropInt64(name);
  403. }
  404. void QueryOptions::updateFromContext(int &value, const IPropertyTree *ctx, const char *name1, const char *name2)
  405. {
  406. const char *name = findProp(ctx, name1, name2);
  407. if (name)
  408. value = ctx->getPropInt(name);
  409. }
  410. void QueryOptions::updateFromContext(unsigned &value, const IPropertyTree *ctx, const char *name1, const char *name2)
  411. {
  412. const char *name = findProp(ctx, name1, name2);
  413. if (name)
  414. value = (unsigned) ctx->getPropInt(name);
  415. }
  416. void QueryOptions::updateFromContext(bool &value, const IPropertyTree *ctx, const char *name1, const char *name2)
  417. {
  418. const char *name = findProp(ctx, name1, name2);
  419. if (name)
  420. value = ctx->getPropBool(name);
  421. }
  422. void QueryOptions::setFromSlaveLoggingFlags(unsigned loggingFlags)
  423. {
  424. // MORE - priority/timelimit ?
  425. checkingHeap = (loggingFlags & LOGGING_CHECKINGHEAP) != 0;
  426. timeActivities = (loggingFlags & LOGGING_TIMEACTIVITIES) != 0;
  427. }
  428. //----------------------------------------------------------------------------------------------
  429. // Class CQueryFactory is the main implementation of IQueryFactory, combining a IQueryDll and a
  430. // package context into an object that can quickly create the query context that executes a specific
  431. // instance of a Roxie query.
  432. // Caching is used to ensure that only queries that are affected by a package change need to be reloaded.
  433. // Derived classes handle the differences between slave and server side factories
  434. //----------------------------------------------------------------------------------------------
  435. class CQueryFactory : implements IQueryFactory, implements IResourceContext, public CInterface
  436. {
  437. protected:
  438. const IRoxiePackage &package;
  439. Owned<const IQueryDll> dll;
  440. Linked<ISharedOnceContext> sharedOnceContext;
  441. MapStringToActivityArray graphMap;
  442. StringAttr id;
  443. StringBuffer errorMessage;
  444. MapIdToActivityFactory allActivities;
  445. QueryOptions options;
  446. bool dynamic;
  447. bool isSuspended;
  448. bool isLoadFailed;
  449. ClusterType targetClusterType;
  450. unsigned libraryInterfaceHash;
  451. hash64_t hashValue;
  452. static SpinLock queriesCrit;
  453. static CopyMapXToMyClass<hash64_t, hash64_t, CQueryFactory> queryMap;
  454. mutable CIArrayOf<TerminationCallbackInfo> callbacks;
  455. mutable CriticalSection callbacksCrit;
  456. public:
  457. static CriticalSection queryCreateLock;
  458. protected:
  459. IRoxieServerActivityFactory *createActivityFactory(ThorActivityKind kind, unsigned subgraphId, IPropertyTree &node)
  460. {
  461. unsigned id = node.getPropInt("@id", 0);
  462. unsigned rid = id;
  463. if (isSuspended)
  464. return createRoxieServerDummyActivityFactory(id, subgraphId, *this, NULL, TAKnone, node, false); // Is there actually any point?
  465. switch (options.priority)
  466. {
  467. case 1:
  468. rid |= ROXIE_HIGH_PRIORITY;
  469. break;
  470. case 2:
  471. rid |= ROXIE_SLA_PRIORITY;
  472. break;
  473. }
  474. StringBuffer helperName;
  475. node.getProp("att[@name=\"helper\"]/@value", helperName);
  476. if (!helperName.length())
  477. helperName.append("fAc").append(id);
  478. HelperFactory *helperFactory = dll->getFactory(helperName);
  479. if (!helperFactory)
  480. throw MakeStringException(ROXIE_INTERNAL_ERROR, "Internal error: helper function %s not exported", helperName.str());
  481. RemoteActivityId remoteId(rid, hashValue);
  482. RemoteActivityId remoteId2(rid | ROXIE_ACTIVITY_FETCH, hashValue);
  483. switch (kind)
  484. {
  485. case TAKalljoin:
  486. case TAKalldenormalize:
  487. case TAKalldenormalizegroup:
  488. return createRoxieServerAllJoinActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  489. case TAKapply:
  490. return createRoxieServerApplyActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
  491. case TAKaggregate:
  492. case TAKexistsaggregate: // could special case.
  493. case TAKcountaggregate:
  494. return createRoxieServerAggregateActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  495. case TAKcase:
  496. case TAKchildcase:
  497. return createRoxieServerCaseActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isGraphIndependent(node));
  498. case TAKcatch:
  499. case TAKskipcatch:
  500. case TAKcreaterowcatch:
  501. return createRoxieServerCatchActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  502. case TAKchilditerator:
  503. return createRoxieServerChildIteratorActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  504. case TAKchoosesets:
  505. return createRoxieServerChooseSetsActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  506. case TAKchoosesetsenth:
  507. return createRoxieServerChooseSetsEnthActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  508. case TAKchoosesetslast:
  509. return createRoxieServerChooseSetsLastActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  510. case TAKproject:
  511. case TAKcountproject:
  512. return createRoxieServerProjectActivityFactory(id, subgraphId, *this, helperFactory, kind, node); // code is common between Project, CountProject
  513. case TAKfilterproject:
  514. return createRoxieServerFilterProjectActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  515. case TAKdatasetresult:
  516. case TAKrowresult:
  517. return createRoxieServerDatasetResultActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
  518. case TAKdedup:
  519. return createRoxieServerDedupActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  520. case TAKdegroup:
  521. return createRoxieServerDegroupActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  522. case TAKcsvread:
  523. case TAKxmlread:
  524. case TAKjsonread:
  525. case TAKdiskread:
  526. {
  527. if (node.getPropBool("att[@name='_isSpill']/@value", false) || node.getPropBool("att[@name='_isSpillGlobal']/@value", false))
  528. return createRoxieServerSpillReadActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  529. else
  530. return createRoxieServerDiskReadActivityFactory(id, subgraphId, *this, helperFactory, kind, node, remoteId);
  531. }
  532. case TAKmemoryspillread:
  533. return createRoxieServerSpillReadActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  534. case TAKdisknormalize:
  535. case TAKdiskcount:
  536. case TAKdiskaggregate:
  537. case TAKdiskgroupaggregate:
  538. return createRoxieServerDiskReadActivityFactory(id, subgraphId, *this, helperFactory, kind, node, remoteId);
  539. case TAKchildnormalize:
  540. return createRoxieServerNewChildNormalizeActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  541. case TAKchildaggregate:
  542. return createRoxieServerNewChildAggregateActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  543. case TAKchildgroupaggregate:
  544. return createRoxieServerNewChildGroupAggregateActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  545. case TAKchildthroughnormalize:
  546. return createRoxieServerNewChildThroughNormalizeActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  547. case TAKcsvwrite:
  548. case TAKdiskwrite:
  549. case TAKxmlwrite:
  550. case TAKjsonwrite:
  551. case TAKmemoryspillwrite:
  552. return createRoxieServerDiskWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
  553. case TAKindexwrite:
  554. return createRoxieServerIndexWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
  555. case TAKenth:
  556. return createRoxieServerEnthActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  557. case TAKfetch:
  558. case TAKcsvfetch:
  559. case TAKxmlfetch:
  560. case TAKjsonfetch:
  561. return createRoxieServerFetchActivityFactory(id, subgraphId, *this, helperFactory, kind, node, remoteId);
  562. case TAKfilter:
  563. return createRoxieServerFilterActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  564. case TAKfiltergroup:
  565. return createRoxieServerFilterGroupActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  566. case TAKfirstn:
  567. return createRoxieServerFirstNActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  568. case TAKfunnel:
  569. return createRoxieServerConcatActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  570. case TAKgroup:
  571. return createRoxieServerGroupActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  572. case TAKhashaggregate:
  573. return createRoxieServerHashAggregateActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  574. case TAKif:
  575. case TAKchildif:
  576. return createRoxieServerIfActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isGraphIndependent(node));
  577. case TAKifaction:
  578. return createRoxieServerIfActionActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
  579. case TAKparallel:
  580. return createRoxieServerParallelActionActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
  581. case TAKsequential:
  582. return createRoxieServerSequentialActionActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
  583. case TAKindexread:
  584. return createRoxieServerIndexReadActivityFactory(id, subgraphId, *this, helperFactory, kind, node, remoteId);
  585. case TAKindexnormalize:
  586. return createRoxieServerIndexNormalizeActivityFactory(id, subgraphId, *this, helperFactory, kind, node, remoteId);
  587. case TAKindexcount:
  588. return createRoxieServerIndexCountActivityFactory(id, subgraphId, *this, helperFactory, kind, node, remoteId);
  589. case TAKindexaggregate:
  590. return createRoxieServerIndexAggregateActivityFactory(id, subgraphId, *this, helperFactory, kind, node, remoteId);
  591. case TAKindexgroupaggregate:
  592. case TAKindexgroupexists:
  593. case TAKindexgroupcount:
  594. return createRoxieServerIndexGroupAggregateActivityFactory(id, subgraphId, *this, helperFactory, kind, node, remoteId);
  595. case TAKhashdedup:
  596. return createRoxieServerHashDedupActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  597. case TAKhashdenormalize:
  598. case TAKhashdistribute:
  599. case TAKhashdistributemerge:
  600. case TAKhashjoin:
  601. throwUnexpected(); // Code generator should have removed or transformed
  602. case TAKiterate:
  603. return createRoxieServerIterateActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  604. case TAKprocess:
  605. return createRoxieServerProcessActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  606. case TAKjoin:
  607. case TAKjoinlight:
  608. case TAKdenormalize:
  609. case TAKdenormalizegroup:
  610. return createRoxieServerJoinActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  611. case TAKkeyeddistribute:
  612. throwUnexpected(); // Code generator should have removed or transformed
  613. case TAKkeyedjoin:
  614. case TAKkeyeddenormalize:
  615. case TAKkeyeddenormalizegroup:
  616. return createRoxieServerKeyedJoinActivityFactory(id, subgraphId, *this, helperFactory, kind, node, remoteId, remoteId2);
  617. case TAKlimit:
  618. return createRoxieServerLimitActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  619. case TAKlookupjoin:
  620. case TAKlookupdenormalize:
  621. case TAKlookupdenormalizegroup:
  622. case TAKsmartjoin:
  623. case TAKsmartdenormalize:
  624. case TAKsmartdenormalizegroup:
  625. return createRoxieServerLookupJoinActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  626. case TAKmerge:
  627. return createRoxieServerMergeActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  628. case TAKnormalize:
  629. return createRoxieServerNormalizeActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  630. case TAKnormalizechild:
  631. return createRoxieServerNormalizeChildActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  632. case TAKnormalizelinkedchild:
  633. return createRoxieServerNormalizeLinkedChildActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  634. case TAKnull:
  635. return createRoxieServerNullActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  636. case TAKsideeffect:
  637. return createRoxieServerSideEffectActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
  638. case TAKsimpleaction:
  639. return createRoxieServerActionActivityFactory(id, subgraphId, *this, helperFactory, kind, node, usageCount(node), isRootAction(node));
  640. case TAKparse:
  641. return createRoxieServerParseActivityFactory(id, subgraphId, *this, helperFactory, kind, node, this);
  642. case TAKworkunitwrite:
  643. return createRoxieServerWorkUnitWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, node, usageCount(node), isRootAction(node));
  644. case TAKdictionaryworkunitwrite:
  645. return createRoxieServerWorkUnitWriteDictActivityFactory(id, subgraphId, *this, helperFactory, kind, node, usageCount(node), isRootAction(node));
  646. case TAKpiperead:
  647. return createRoxieServerPipeReadActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  648. case TAKpipethrough:
  649. return createRoxieServerPipeThroughActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  650. case TAKpipewrite:
  651. return createRoxieServerPipeWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, node, usageCount(node), isRootAction(node));
  652. case TAKpull:
  653. return createRoxieServerPullActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  654. case TAKtrace:
  655. return createRoxieServerTraceActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  656. case TAKlinkedrawiterator:
  657. return createRoxieServerLinkedRawIteratorActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  658. case TAKremoteresult:
  659. return createRoxieServerRemoteResultActivityFactory(id, subgraphId, *this, helperFactory, kind, node, usageCount(node), isRootAction(node));
  660. case TAKrollup:
  661. return createRoxieServerRollupActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  662. case TAKsample:
  663. return createRoxieServerSampleActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  664. case TAKselectn:
  665. return createRoxieServerSelectNActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  666. case TAKselfjoin:
  667. case TAKselfjoinlight:
  668. return createRoxieServerSelfJoinActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  669. case TAKskiplimit:
  670. case TAKcreaterowlimit:
  671. return createRoxieServerSkipLimitActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  672. case TAKhttp_rowdataset:
  673. case TAKsoap_rowdataset:
  674. return createRoxieServerSoapRowCallActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  675. case TAKsoap_rowaction:
  676. return createRoxieServerSoapRowActionActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
  677. case TAKsoap_datasetdataset:
  678. return createRoxieServerSoapDatasetCallActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  679. case TAKsoap_datasetaction:
  680. return createRoxieServerSoapDatasetActionActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
  681. case TAKsort:
  682. return createRoxieServerSortActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  683. case TAKspill:
  684. case TAKmemoryspillsplit:
  685. return createRoxieServerThroughSpillActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  686. case TAKsplit:
  687. return createRoxieServerSplitActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  688. case TAKstreamediterator:
  689. return createRoxieServerStreamedIteratorActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  690. case TAKinlinetable:
  691. return createRoxieServerInlineTableActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  692. case TAKthroughaggregate:
  693. throwUnexpected(); // Concept of through aggregates has been proven not to work in Roxie - codegen should not be creating them any more.
  694. case TAKtopn:
  695. return createRoxieServerTopNActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  696. case TAKworkunitread:
  697. return createRoxieServerWorkUnitReadActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  698. case TAKxmlparse:
  699. return createRoxieServerXmlParseActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  700. case TAKquantile:
  701. return createRoxieServerQuantileActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  702. case TAKregroup:
  703. return createRoxieServerRegroupActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  704. case TAKcombine:
  705. return createRoxieServerCombineActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  706. case TAKcombinegroup:
  707. return createRoxieServerCombineGroupActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  708. case TAKrollupgroup:
  709. return createRoxieServerRollupGroupActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  710. case TAKlocalresultread:
  711. {
  712. unsigned graphId = getGraphId(node);
  713. return createRoxieServerLocalResultReadActivityFactory(id, subgraphId, *this, helperFactory, kind, node, graphId);
  714. }
  715. case TAKlocalstreamread:
  716. return createRoxieServerLocalResultStreamReadActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  717. case TAKlocalresultwrite:
  718. {
  719. unsigned graphId = getGraphId(node);
  720. return createRoxieServerLocalResultWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, node, usageCount(node), graphId, isRootAction(node));
  721. }
  722. case TAKdictionaryresultwrite:
  723. {
  724. unsigned graphId = getGraphId(node);
  725. return createRoxieServerDictionaryResultWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, node, usageCount(node), graphId, isRootAction(node));
  726. }
  727. case TAKloopcount:
  728. case TAKlooprow:
  729. case TAKloopdataset:
  730. {
  731. unsigned loopId = node.getPropInt("att[@name=\"_loopid\"]/@value", 0);
  732. return createRoxieServerLoopActivityFactory(id, subgraphId, *this, helperFactory, kind, node, loopId);
  733. }
  734. case TAKremotegraph:
  735. return createRoxieServerRemoteActivityFactory(id, subgraphId, *this, helperFactory, kind, node, remoteId, isRootAction(node));
  736. case TAKgraphloopresultread:
  737. {
  738. unsigned graphId = getGraphId(node);
  739. return createRoxieServerGraphLoopResultReadActivityFactory(id, subgraphId, *this, helperFactory, kind, node, graphId);
  740. }
  741. case TAKgraphloopresultwrite:
  742. {
  743. unsigned graphId = getGraphId(node);
  744. return createRoxieServerGraphLoopResultWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, node, usageCount(node), graphId);
  745. }
  746. case TAKnwaygraphloopresultread:
  747. {
  748. unsigned graphId = node.getPropInt("att[@name=\"_graphId\"]/@value", 0);
  749. return createRoxieServerNWayGraphLoopResultReadActivityFactory(id, subgraphId, *this, helperFactory, kind, node, graphId);
  750. }
  751. case TAKnwayinput:
  752. return createRoxieServerNWayInputActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  753. case TAKnwaymerge:
  754. return createRoxieServerNWayMergeActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  755. case TAKnwaymergejoin:
  756. case TAKnwayjoin:
  757. return createRoxieServerNWayMergeJoinActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  758. case TAKsorted:
  759. return createRoxieServerSortedActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  760. case TAKgraphloop:
  761. case TAKparallelgraphloop:
  762. {
  763. unsigned loopId = node.getPropInt("att[@name=\"_loopid\"]/@value", 0);
  764. return createRoxieServerGraphLoopActivityFactory(id, subgraphId, *this, helperFactory, kind, node, loopId);
  765. }
  766. case TAKlibrarycall:
  767. {
  768. LibraryCallFactoryExtra extra;
  769. extra.maxOutputs = node.getPropInt("att[@name=\"_maxOutputs\"]/@value", 0);
  770. extra.graphid = node.getPropInt("att[@name=\"_graphid\"]/@value", 0);
  771. extra.libraryName.set(node.queryProp("att[@name=\"libname\"]/@value"));
  772. extra.interfaceHash = node.getPropInt("att[@name=\"_interfaceHash\"]/@value", 0);
  773. extra.embedded = node.getPropBool("att[@name=\"embedded\"]/@value", false) ;
  774. if (extra.embedded)
  775. {
  776. extra.embeddedGraphName.set(node.queryProp("att[@name=\"graph\"]/@value"));
  777. if (!extra.embeddedGraphName)
  778. extra.embeddedGraphName.set(extra.libraryName);
  779. }
  780. Owned<IPropertyTreeIterator> iter = node.getElements("att[@name=\"_outputUsed\"]");
  781. ForEach(*iter)
  782. extra.outputs.append(iter->query().getPropInt("@value"));
  783. return createRoxieServerLibraryCallActivityFactory(id, subgraphId, *this, helperFactory, kind, node, extra);
  784. }
  785. case TAKnwayselect:
  786. return createRoxieServerNWaySelectActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  787. case TAKnonempty:
  788. return createRoxieServerNonEmptyActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  789. case TAKprefetchproject:
  790. return createRoxieServerPrefetchProjectActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  791. case TAKwhen_dataset:
  792. return createRoxieServerWhenActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  793. case TAKwhen_action:
  794. return createRoxieServerWhenActionActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
  795. case TAKdistribution:
  796. return createRoxieServerDistributionActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
  797. // These are not required in Roxie for the time being - code generator should trap them
  798. case TAKchilddataset:
  799. default:
  800. throw MakeStringException(ROXIE_UNIMPLEMENTED_ERROR, "Unimplemented activity %s required", getActivityText(kind));
  801. break;
  802. }
  803. throwUnexpected(); // unreachable, but some compilers will complain about missing return
  804. }
  805. IActivityFactory *findActivity(unsigned id) const
  806. {
  807. if (id)
  808. {
  809. IActivityFactory **f = allActivities.getValue(id);
  810. if (f)
  811. return *f;
  812. }
  813. return NULL;
  814. }
  815. virtual IRoxieServerActivityFactory *getRoxieServerActivityFactory(unsigned id) const
  816. {
  817. checkSuspended();
  818. return LINK(QUERYINTERFACE(findActivity(id), IRoxieServerActivityFactory));
  819. }
  820. virtual ISlaveActivityFactory *getSlaveActivityFactory(unsigned id) const
  821. {
  822. checkSuspended();
  823. IActivityFactory *f = findActivity(id);
  824. return LINK(QUERYINTERFACE(f, ISlaveActivityFactory)); // MORE - don't dynamic cast yuk
  825. }
  826. ActivityArray *loadChildGraph(IPropertyTree &graph)
  827. {
  828. // MORE - this is starting to look very much like loadGraph (on Roxie server side)
  829. ActivityArray *activities = new ActivityArray(true, graph.getPropBool("@delayed"), graph.getPropBool("@library"), graph.getPropBool("@sequential"));
  830. unsigned subgraphId = graph.getPropInt("@id");
  831. try
  832. {
  833. Owned<IPropertyTreeIterator> nodes = graph.getElements("node");
  834. ForEach(*nodes)
  835. {
  836. IPropertyTree &node = nodes->query();
  837. loadNode(node, subgraphId, activities);
  838. }
  839. Owned<IPropertyTreeIterator> edges = graph.getElements("edge");
  840. ForEach(*edges)
  841. {
  842. IPropertyTree &edge = edges->query();
  843. unsigned source = activities->findActivityIndex(edge.getPropInt("@source",0));
  844. unsigned target = activities->findActivityIndex(edge.getPropInt("@target",0));
  845. unsigned sourceOutput = edge.getPropInt("att[@name=\"_sourceIndex\"]/@value", 0);
  846. unsigned targetInput = edge.getPropInt("att[@name=\"_targetIndex\"]/@value", 0);
  847. int controlId = edge.getPropInt("att[@name=\"_when\"]/@value", 0);
  848. if (controlId != 0)
  849. {
  850. const char * edgeId = edge.queryProp("@id");
  851. addDependency(sourceOutput, source, target, controlId, edgeId, activities);
  852. }
  853. else
  854. activities->serverItem(target).setInput(targetInput, source, sourceOutput);
  855. }
  856. }
  857. catch (...)
  858. {
  859. ::Release(activities);
  860. allActivities.kill();
  861. throw;
  862. }
  863. return activities;
  864. }
  865. void loadNode(IPropertyTree &node, unsigned subgraphId, ActivityArray *activities)
  866. {
  867. ThorActivityKind kind = getActivityKind(node);
  868. if (kind==TAKsubgraph)
  869. {
  870. IPropertyTree * childGraphNode = node.queryPropTree("att/graph");
  871. if (childGraphNode->getPropBool("@child"))
  872. {
  873. loadSubgraph(node, activities);
  874. }
  875. else
  876. {
  877. unsigned parentId = findParentId(node);
  878. assertex(parentId);
  879. unsigned parentIdx = activities->findActivityIndex(parentId);
  880. IActivityFactory &parentFactory = activities->item(parentIdx);
  881. ActivityArray *childQuery = loadChildGraph(*childGraphNode);
  882. parentFactory.addChildQuery(node.getPropInt("@id"), childQuery);
  883. }
  884. }
  885. else if (kind)
  886. {
  887. IRoxieServerActivityFactory *f = createActivityFactory(kind, subgraphId, node);
  888. if (f)
  889. {
  890. activities->append(*f);
  891. allActivities.setValue(f->queryId(), f);
  892. }
  893. }
  894. }
  895. void loadSubgraph(IPropertyTree &graph, ActivityArray *activities)
  896. {
  897. unsigned subgraphId = graph.getPropInt("@id");
  898. Owned<IPropertyTreeIterator> nodes = graph.getElements("att/graph/node");
  899. ForEach(*nodes)
  900. {
  901. IPropertyTree &node = nodes->query();
  902. loadNode(node, subgraphId, activities);
  903. }
  904. if (!isSuspended)
  905. {
  906. Owned<IPropertyTreeIterator> edges = graph.getElements("att/graph/edge");
  907. ForEach(*edges)
  908. {
  909. IPropertyTree &edge = edges->query();
  910. unsigned sourceActivity = edge.getPropInt("@source", 0);
  911. unsigned targetActivity = edge.getPropInt("@target", 0);
  912. unsigned source = activities->findActivityIndex(sourceActivity);
  913. unsigned target = activities->recursiveFindActivityIndex(targetActivity);
  914. unsigned sourceOutput = edge.getPropInt("att[@name=\"_sourceIndex\"]/@value", 0);
  915. unsigned targetInput = edge.getPropInt("att[@name=\"_targetIndex\"]/@value", 0);
  916. int controlId = edge.getPropInt("att[@name=\"_when\"]/@value", 0);
  917. if (controlId != 0)
  918. {
  919. const char * edgeId = edge.queryProp("@id");
  920. addDependency(sourceOutput, sourceActivity, targetActivity, controlId, edgeId, activities);
  921. }
  922. else
  923. activities->serverItem(target).setInput(targetInput, source, sourceOutput);
  924. }
  925. }
  926. }
  927. // loadGraph loads outer level graph. This is virtual as slave is very different from Roxie server
  928. virtual ActivityArray *loadGraph(IPropertyTree &graph, const char *graphName) = 0;
  929. bool doAddDependency(unsigned sourceIdx, unsigned sourceId, unsigned targetId, int controlId, const char *edgeId, ActivityArray * activities)
  930. {
  931. // Note - the dependency is recorded with the target being the parent activity that is at the same level as the source
  932. // (recording it on the child that was actually dependent would mean it happened too late)
  933. unsigned source = activities->findActivityIndex(sourceId);
  934. if (source != NotFound)
  935. {
  936. unsigned target = activities->recursiveFindActivityIndex(targetId);
  937. activities->serverItem(target).addDependency(source, activities->serverItem(source).getKind(), sourceIdx, controlId, edgeId);
  938. activities->serverItem(source).noteDependent(target);
  939. return true;
  940. }
  941. ForEachItemIn(idx, *activities)
  942. {
  943. IActivityFactory & cur = activities->item(idx);
  944. unsigned childId;
  945. for (unsigned childIdx = 0;;childIdx++)
  946. {
  947. ActivityArray * children = cur.queryChildQuery(childIdx, childId);
  948. if (!children)
  949. break;
  950. if (doAddDependency(sourceIdx, sourceId, targetId, controlId, edgeId, children))
  951. return true;
  952. }
  953. }
  954. return false;
  955. }
  956. virtual void addDependency(unsigned sourceIdx, unsigned sourceId, unsigned targetId, int controlId, const char *edgeId, ActivityArray * activities)
  957. {
  958. doAddDependency(sourceIdx, sourceId, targetId, controlId, edgeId, activities);
  959. }
  960. void addDependencies(IPropertyTree &graph, ActivityArray *activities)
  961. {
  962. Owned<IPropertyTreeIterator> dependencies = graph.getElements("edge");
  963. ForEach(*dependencies)
  964. {
  965. IPropertyTree &edge = dependencies->query();
  966. if (!edge.getPropInt("att[@name=\"_childGraph\"]/@value", 0))
  967. {
  968. unsigned sourceIdx = edge.getPropInt("att[@name=\"_sourceIndex\"]/@value", 0);
  969. int controlId = edge.getPropInt("att[@name=\"_when\"]/@value", 0);
  970. addDependency(sourceIdx, edge.getPropInt("att[@name=\"_sourceActivity\"]/@value", 0), edge.getPropInt("att[@name=\"_targetActivity\"]/@value", 0), controlId, edge.queryProp("@id"), activities);
  971. }
  972. }
  973. }
  974. public:
  975. IMPLEMENT_IINTERFACE;
  976. unsigned channelNo;
  977. CQueryFactory(const char *_id, const IQueryDll *_dll, const IRoxiePackage &_package, hash64_t _hashValue, unsigned _channelNo, ISharedOnceContext *_sharedOnceContext, bool _dynamic)
  978. : id(_id), package(_package), dll(_dll), channelNo(_channelNo), hashValue(_hashValue), sharedOnceContext(_sharedOnceContext), dynamic(_dynamic)
  979. {
  980. package.Link();
  981. targetClusterType = RoxieCluster;
  982. isSuspended = false;
  983. isLoadFailed = false;
  984. libraryInterfaceHash = 0;
  985. options.enableFieldTranslation = package.getEnableFieldTranslation(); // NOTE - can be overridden by wu settings
  986. options.allSortsMaySpill = dynamic;
  987. }
  988. ~CQueryFactory()
  989. {
  990. HashIterator graphs(graphMap);
  991. for(graphs.first();graphs.isValid();graphs.next())
  992. {
  993. ActivityArray *a = *graphMap.mapToValue(&graphs.query());
  994. a->Release();
  995. }
  996. package.Release();
  997. }
  998. virtual IQueryFactory *lookupLibrary(const char *libraryName, unsigned expectedInterfaceHash, const IRoxieContextLogger &logctx) const
  999. {
  1000. return globalPackageSetManager->lookupLibrary(libraryName, expectedInterfaceHash, logctx);
  1001. }
  1002. virtual void beforeDispose()
  1003. {
  1004. // NOTE: it's theoretically possible for the final release to happen after a replacement has been inserted into hash table.
  1005. // So only remove from hash table if what we find there matches the item that is being deleted.
  1006. hash64_t hv = rtlHash64Data(sizeof(channelNo), &channelNo, hashValue);
  1007. SpinBlock b(queriesCrit);
  1008. CQueryFactory *goer = queryMap.getValue(hv);
  1009. if (goer == this)
  1010. queryMap.remove(hv);
  1011. }
  1012. static IQueryFactory *getQueryFactory(hash64_t hashValue, unsigned channelNo)
  1013. {
  1014. hash64_t hv = rtlHash64Data(sizeof(channelNo), &channelNo, hashValue);
  1015. SpinBlock b(queriesCrit);
  1016. CQueryFactory *factory = LINK(queryMap.getValue(hv));
  1017. if (factory && factory->isAlive())
  1018. return factory;
  1019. else
  1020. return NULL;
  1021. }
  1022. static hash64_t getQueryHash(const char *id, const IQueryDll *dll, const IRoxiePackage &package, const IPropertyTree *stateInfo, IArrayOf<IResolvedFile> &files, bool isDynamic)
  1023. {
  1024. hash64_t hashValue = package.queryHash();
  1025. if (traceLevel > 8)
  1026. DBGLOG("getQueryHash: %s %" I64F "u from package", id, hashValue);
  1027. if (dll)
  1028. {
  1029. hashValue = rtlHash64VStr(dll->queryDll()->queryName(), hashValue);
  1030. if (traceLevel > 8)
  1031. DBGLOG("getQueryHash: %s %" I64F "u from dll", id, hashValue);
  1032. if (!lockSuperFiles && !allFilesDynamic && !isDynamic && !package.isCompulsory())
  1033. {
  1034. IConstWorkUnit *wu = dll->queryWorkUnit();
  1035. if (wu) // wu may be null in some unit test cases
  1036. {
  1037. SCMStringBuffer bStr;
  1038. // Don't want to include files referenced in thor graphs... in practice isDynamic also likely to be set in such cases
  1039. if (getClusterType(wu->getDebugValue("targetClusterType", bStr).str(), RoxieCluster) == RoxieCluster)
  1040. {
  1041. Owned<IConstWUGraphIterator> graphs = &wu->getGraphs(GraphTypeActivities);
  1042. ForEach(*graphs)
  1043. {
  1044. Owned<IPropertyTree> graphXgmml = graphs->query().getXGMMLTree(false);
  1045. Owned<IPropertyTreeIterator> nodes = graphXgmml->getElements(".//node");
  1046. ForEach(*nodes)
  1047. {
  1048. IPropertyTree &node = nodes->query();
  1049. ThorActivityKind kind = getActivityKind(node);
  1050. if (kind != TAKdiskwrite && kind != TAKindexwrite && kind != TAKpiperead && kind != TAKpipewrite)
  1051. {
  1052. const char *fileName = queryNodeFileName(node, kind);
  1053. const char *indexName = queryNodeIndexName(node, kind);
  1054. // What about packages that resolve everything without dali?
  1055. if (indexName)
  1056. {
  1057. bool isOpt = pretendAllOpt || node.getPropBool("att[@name='_isIndexOpt']/@value");
  1058. const IResolvedFile *indexFile = package.lookupFileName(indexName, isOpt, true, true, wu, true);
  1059. if (indexFile)
  1060. {
  1061. hashValue = indexFile->addHash64(hashValue);
  1062. if (traceLevel > 8)
  1063. DBGLOG("getQueryHash: %s %" I64F "u from index %s", id, hashValue, indexName);
  1064. files.append(*const_cast<IResolvedFile *>(indexFile));
  1065. }
  1066. }
  1067. if (fileName)
  1068. {
  1069. if (!node.getPropBool("att[@name='_isSpill']/@value") && !node.getPropBool("att[@name='_isSpillGlobal']/@value"))
  1070. {
  1071. bool isOpt = pretendAllOpt || node.getPropBool("att[@name='_isOpt']/@value");
  1072. const IResolvedFile *dataFile = package.lookupFileName(fileName, isOpt, true, true, wu, true);
  1073. if (dataFile)
  1074. {
  1075. hashValue = dataFile->addHash64(hashValue);
  1076. if (traceLevel > 8)
  1077. DBGLOG("getQueryHash: %s %" I64F "u from index %s", id, hashValue, fileName);
  1078. files.append(*const_cast<IResolvedFile *>(dataFile));
  1079. }
  1080. }
  1081. }
  1082. }
  1083. }
  1084. }
  1085. }
  1086. }
  1087. }
  1088. }
  1089. if (id)
  1090. hashValue = rtlHash64VStr(id, hashValue);
  1091. hashValue = rtlHash64VStr("Roxie", hashValue); // Adds some noise into the hash - otherwise adjacent wuids tend to hash very close together
  1092. if (traceLevel > 8)
  1093. DBGLOG("getQueryHash: %s %" I64F "u from id", id, hashValue);
  1094. if (stateInfo)
  1095. {
  1096. StringBuffer xml;
  1097. toXML(stateInfo, xml);
  1098. hashValue = rtlHash64Data(xml.length(), xml.str(), hashValue);
  1099. if (traceLevel > 8)
  1100. DBGLOG("getQueryHash: %s %" I64F "u from stateInfo", id, hashValue);
  1101. }
  1102. if (traceLevel > 8)
  1103. DBGLOG("getQueryHash: %s %" I64F "u", id, hashValue);
  1104. return hashValue;
  1105. }
  1106. virtual void load(const IPropertyTree *stateInfo)
  1107. {
  1108. IConstWorkUnit *wu = dll->queryWorkUnit();
  1109. if (wu) // wu may be null in some unit test cases
  1110. {
  1111. libraryInterfaceHash = wu->getApplicationValueInt("LibraryModule", "interfaceHash", 0);
  1112. options.setFromWorkUnit(*wu, stateInfo);
  1113. SCMStringBuffer bStr;
  1114. targetClusterType = getClusterType(wu->getDebugValue("targetClusterType", bStr).str(), RoxieCluster);
  1115. // NOTE: stateinfo overrides package info
  1116. if (stateInfo)
  1117. {
  1118. // info in querySets can override the defaults from workunit for some limits
  1119. isSuspended = stateInfo->getPropBool("@suspended", false);
  1120. }
  1121. if (targetClusterType == RoxieCluster)
  1122. {
  1123. Owned<IConstWUGraphIterator> graphs = &wu->getGraphs(GraphTypeActivities);
  1124. SCMStringBuffer graphNameStr;
  1125. ForEach(*graphs)
  1126. {
  1127. graphs->query().getName(graphNameStr);
  1128. const char *graphName = graphNameStr.s.str();
  1129. Owned<IPropertyTree> graphXgmml = graphs->query().getXGMMLTree(false);
  1130. try
  1131. {
  1132. ActivityArray *activities = loadGraph(*graphXgmml, graphName);
  1133. graphMap.setValue(graphName, activities);
  1134. }
  1135. catch (IException *E)
  1136. {
  1137. StringBuffer m;
  1138. E->errorMessage(m);
  1139. suspend(m.str());
  1140. ERRLOG("Query %s suspended: %s", id.get(), m.str());
  1141. E->Release();
  1142. }
  1143. }
  1144. }
  1145. }
  1146. hash64_t hv = rtlHash64Data(sizeof(channelNo), &channelNo, hashValue);
  1147. SpinBlock b(queriesCrit);
  1148. queryMap.setValue(hv, this);
  1149. }
  1150. virtual unsigned queryChannel() const
  1151. {
  1152. return channelNo;
  1153. }
  1154. virtual hash64_t queryHash() const
  1155. {
  1156. return hashValue;
  1157. }
  1158. virtual ISharedOnceContext *querySharedOnceContext() const
  1159. {
  1160. return sharedOnceContext;
  1161. }
  1162. virtual IDeserializedResultStore &queryOnceResultStore() const
  1163. {
  1164. assertex(sharedOnceContext);
  1165. return sharedOnceContext->queryOnceResultStore();
  1166. }
  1167. virtual IPropertyTree &queryOnceContext(const IRoxieContextLogger &logctx) const
  1168. {
  1169. assertex(sharedOnceContext);
  1170. return sharedOnceContext->queryOnceContext(this, logctx);
  1171. }
  1172. virtual const char *loadResource(unsigned id)
  1173. {
  1174. return (const char *) queryDll()->getResource(id);
  1175. }
  1176. virtual ActivityArray *lookupGraphActivities(const char *name) const
  1177. {
  1178. return *graphMap.getValue(name);
  1179. }
  1180. virtual IActivityGraph *lookupGraph(IRoxieSlaveContext *ctx, const char *name, IProbeManager *probeManager, const IRoxieContextLogger &logctx, IRoxieServerActivity *parentActivity) const
  1181. {
  1182. assertex(name && *name);
  1183. ActivityArrayPtr *graph = graphMap.getValue(name);
  1184. assertex(graph);
  1185. Owned<IActivityGraph> ret = ::createActivityGraph(ctx, name, 0, **graph, parentActivity, probeManager, logctx, 1);
  1186. return ret.getClear();
  1187. }
  1188. void getGraphStats(StringBuffer &reply, const IPropertyTree &thisGraph) const
  1189. {
  1190. Owned<IPropertyTree> graph = createPTreeFromIPT(&thisGraph, ipt_lowmem);
  1191. Owned<IPropertyTreeIterator> edges = graph->getElements(".//edge");
  1192. ForEach(*edges)
  1193. {
  1194. IPropertyTree &edge = edges->query();
  1195. IActivityFactory *a = findActivity(edge.getPropInt("@source", 0));
  1196. if (!a)
  1197. a = findActivity(edge.getPropInt("att[@name=\"_sourceActivity\"]/@value", 0));
  1198. if (a)
  1199. {
  1200. unsigned sourceOutput = edge.getPropInt("att[@name=\"_sourceIndex\"]/@value", 0);
  1201. a->getEdgeProgressInfo(sourceOutput, edge);
  1202. }
  1203. }
  1204. Owned<IPropertyTreeIterator> nodes = graph->getElements(".//node");
  1205. ForEach(*nodes)
  1206. {
  1207. IPropertyTree &node = nodes->query();
  1208. IActivityFactory *a = findActivity(node.getPropInt("@id", 0));
  1209. if (a)
  1210. a->getNodeProgressInfo(node);
  1211. }
  1212. toXML(graph, reply);
  1213. }
  1214. virtual IPropertyTree* cloneQueryXGMML() const
  1215. {
  1216. assertex(dll && dll->queryWorkUnit());
  1217. Owned<IPropertyTree> tree = createPTree("Query", ipt_lowmem);
  1218. Owned<IConstWUGraphIterator> graphs = &dll->queryWorkUnit()->getGraphs(GraphTypeActivities);
  1219. SCMStringBuffer graphNameStr;
  1220. ForEach(*graphs)
  1221. {
  1222. graphs->query().getName(graphNameStr);
  1223. const char *graphName = graphNameStr.s.str();
  1224. Owned<IPropertyTree> graphXgmml = graphs->query().getXGMMLTree(false);
  1225. IPropertyTree *newGraph = tree->addPropTree("Graph");
  1226. newGraph->setProp("@id", graphName);
  1227. newGraph->addPropTree("xgmml")->addPropTree("graph", graphXgmml.getLink());
  1228. }
  1229. return tree.getClear();
  1230. }
  1231. virtual void getStats(StringBuffer &reply, const char *graphName) const
  1232. {
  1233. if (dll)
  1234. {
  1235. assertex(dll->queryWorkUnit());
  1236. Owned<IConstWUGraphIterator> graphs = &dll->queryWorkUnit()->getGraphs(GraphTypeActivities);
  1237. SCMStringBuffer thisGraphNameStr;
  1238. ForEach(*graphs)
  1239. {
  1240. graphs->query().getName(thisGraphNameStr);
  1241. if (graphName)
  1242. {
  1243. if (thisGraphNameStr.length() && (stricmp(graphName, thisGraphNameStr.s.str()) != 0))
  1244. continue; // not interested in this one
  1245. }
  1246. reply.appendf("<Graph id='%s'><xgmml>", thisGraphNameStr.s.str());
  1247. Owned<IPropertyTree> graphXgmml = graphs->query().getXGMMLTree(false);
  1248. getGraphStats(reply, *graphXgmml);
  1249. reply.append("</xgmml></Graph>");
  1250. }
  1251. }
  1252. }
  1253. virtual void getActivityMetrics(StringBuffer &reply) const
  1254. {
  1255. HashIterator i(allActivities);
  1256. StringBuffer myReply;
  1257. ForEach(i)
  1258. {
  1259. IActivityFactory *f = *allActivities.mapToValue(&i.query());
  1260. f->getActivityMetrics(myReply.clear());
  1261. if (myReply.length())
  1262. {
  1263. reply.appendf(" <activity query='%s' id='%d' channel='%d'\n", queryQueryName(), f->queryId(), queryChannel());
  1264. reply.append(myReply);
  1265. reply.append(" </activity>\n");
  1266. }
  1267. }
  1268. }
  1269. virtual void getQueryInfo(StringBuffer &reply, bool full, IArrayOf<IQueryFactory> *slaveQueries, const IRoxieContextLogger &logctx) const
  1270. {
  1271. Owned<IPropertyTree> xref = createPTree("Query", ipt_fast);
  1272. xref->setProp("@id", id);
  1273. if (suspended())
  1274. {
  1275. xref->setPropBool("@suspended", true);
  1276. xref->setProp("@error", errorMessage);
  1277. }
  1278. if (full)
  1279. {
  1280. HashIterator i(allActivities);
  1281. ForEach(i)
  1282. {
  1283. IActivityFactory *f = *allActivities.mapToValue(&i.query());
  1284. f->getXrefInfo(*xref, logctx);
  1285. }
  1286. }
  1287. if (slaveQueries)
  1288. {
  1289. ForEachItemIn(idx, *slaveQueries)
  1290. {
  1291. if (slaveQueries->item(idx).suspended())
  1292. {
  1293. xref->setPropBool("@suspended", true);
  1294. xref->setPropBool("@slaveSuspended", true);
  1295. }
  1296. }
  1297. }
  1298. toXML(xref, reply, 1, XML_Embed|XML_LineBreak|XML_SortTags);
  1299. }
  1300. virtual void resetQueryTimings()
  1301. {
  1302. HashIterator i(allActivities);
  1303. ForEach(i)
  1304. {
  1305. IActivityFactory *f = *allActivities.mapToValue(&i.query());
  1306. f->resetNodeProgressInfo();
  1307. }
  1308. }
  1309. virtual const char *queryErrorMessage() const
  1310. {
  1311. return errorMessage.str();
  1312. }
  1313. virtual const char *queryQueryName() const
  1314. {
  1315. return id;
  1316. }
  1317. virtual bool isQueryLibrary() const
  1318. {
  1319. return libraryInterfaceHash != 0;
  1320. }
  1321. virtual unsigned getQueryLibraryInterfaceHash() const
  1322. {
  1323. return libraryInterfaceHash;
  1324. }
  1325. virtual void suspend(const char* errMsg)
  1326. {
  1327. isSuspended = true;
  1328. isLoadFailed = true;
  1329. errorMessage.append(errMsg);
  1330. }
  1331. virtual bool loadFailed() const
  1332. {
  1333. return isLoadFailed;
  1334. }
  1335. virtual bool suspended() const
  1336. {
  1337. return isSuspended;
  1338. }
  1339. virtual const QueryOptions &queryOptions() const
  1340. {
  1341. return options;
  1342. }
  1343. virtual ILoadedDllEntry *queryDll() const
  1344. {
  1345. assertex(dll);
  1346. return dll->queryDll();
  1347. }
  1348. virtual IConstWorkUnit *queryWorkUnit() const
  1349. {
  1350. assertex(dll);
  1351. return dll->queryWorkUnit();
  1352. }
  1353. virtual const IRoxiePackage &queryPackage() const
  1354. {
  1355. return package;
  1356. }
  1357. virtual CRoxieWorkflowMachine *createWorkflowMachine(IConstWorkUnit *wu, bool isOnce, const IRoxieContextLogger &logctx) const
  1358. {
  1359. throwUnexpected(); // only on server...
  1360. }
  1361. virtual char *getEnv(const char *name, const char *defaultValue) const
  1362. {
  1363. if (!defaultValue)
  1364. defaultValue = "";
  1365. const char *result;
  1366. if (name && *name=='@')
  1367. {
  1368. // @ is shorthand for control: for legacy compatibility reasons
  1369. StringBuffer useName;
  1370. useName.append("control:").append(name+1);
  1371. result = package.queryEnv(useName.str());
  1372. }
  1373. else
  1374. result = package.queryEnv(name);
  1375. if (!result && name)
  1376. result = getenv(name);
  1377. return strdup(result ? result : defaultValue);
  1378. }
  1379. virtual IRoxieSlaveContext *createSlaveContext(const SlaveContextLogger &logctx, IRoxieQueryPacket *packet, bool hasChildren) const
  1380. {
  1381. throwUnexpected(); // only implemented in derived slave class
  1382. }
  1383. virtual IRoxieServerContext *createContext(IPropertyTree *xml, IHpccProtocolResponse *protocol, unsigned flags, const ContextLogger &_logctx, PTreeReaderOptions xmlReadFlags, const char *querySetName) const
  1384. {
  1385. throwUnexpected(); // only implemented in derived server class
  1386. }
  1387. virtual IRoxieServerContext *createContext(IConstWorkUnit *wu, const ContextLogger &_logctx) const
  1388. {
  1389. throwUnexpected(); // only implemented in derived server class
  1390. }
  1391. virtual void noteQuery(time_t startTime, bool failed, unsigned elapsed, unsigned memused, unsigned slavesReplyLen, unsigned bytesOut)
  1392. {
  1393. throwUnexpected(); // only implemented in derived server class
  1394. }
  1395. virtual IPropertyTree *getQueryStats(time_t from, time_t to)
  1396. {
  1397. throwUnexpected(); // only implemented in derived server class
  1398. }
  1399. virtual void getGraphNames(StringArray &ret) const
  1400. {
  1401. Owned<IConstWUGraphIterator> graphs = &dll->queryWorkUnit()->getGraphs(GraphTypeActivities);
  1402. ForEach(*graphs)
  1403. {
  1404. SCMStringBuffer graphName;
  1405. graphs->query().getName(graphName);
  1406. ret.append(graphName.str());
  1407. }
  1408. }
  1409. virtual bool isDynamic() const
  1410. {
  1411. return dynamic;
  1412. }
  1413. protected:
  1414. IPropertyTree *queryWorkflowTree() const
  1415. {
  1416. assertex(dll->queryWorkUnit());
  1417. return dll->queryWorkUnit()->queryWorkflowTree();
  1418. }
  1419. bool hasOnceSection() const
  1420. {
  1421. IPropertyTree *workflow = queryWorkflowTree();
  1422. if (workflow)
  1423. return workflow->hasProp("Item[@mode='once']");
  1424. else
  1425. return false;
  1426. }
  1427. virtual void checkSuspended() const
  1428. {
  1429. if (isSuspended)
  1430. {
  1431. StringBuffer err;
  1432. if (errorMessage.length())
  1433. err.appendf(" because %s", errorMessage.str());
  1434. throw MakeStringException(ROXIE_QUERY_SUSPENDED, "Query %s is suspended%s", id.get(), err.str());
  1435. }
  1436. }
  1437. virtual void onTermination(TerminationCallbackInfo *info) const override
  1438. {
  1439. CriticalBlock b(callbacksCrit);
  1440. callbacks.append(*info);
  1441. }
  1442. };
  1443. CriticalSection CQueryFactory::queryCreateLock;
  1444. SpinLock CQueryFactory::queriesCrit;
  1445. CopyMapXToMyClass<hash64_t, hash64_t, CQueryFactory> CQueryFactory::queryMap;
  1446. extern IQueryFactory *getQueryFactory(hash64_t hashvalue, unsigned channel)
  1447. {
  1448. return CQueryFactory::getQueryFactory(hashvalue, channel);
  1449. }
  1450. class CRoxieServerQueryFactory : public CQueryFactory
  1451. {
  1452. // Parts of query factory is only interesting on the server - workflow support, and tracking of total query times
  1453. protected:
  1454. Owned<IQueryStatsAggregator> queryStats;
  1455. public:
  1456. CRoxieServerQueryFactory(const char *_id, const IQueryDll *_dll, const IRoxiePackage &_package, hash64_t _hashValue, ISharedOnceContext *_sharedOnceContext, bool _dynamic)
  1457. : CQueryFactory(_id, _dll, _package, _hashValue, 0, _sharedOnceContext, _dynamic)
  1458. {
  1459. queryStats.setown(createQueryStatsAggregator(id.get(), statsExpiryTime));
  1460. }
  1461. virtual void noteQuery(time_t startTime, bool failed, unsigned elapsed, unsigned memused, unsigned slavesReplyLen, unsigned bytesOut)
  1462. {
  1463. queryStats->noteQuery(startTime, failed, elapsed, memused, slavesReplyLen, bytesOut);
  1464. queryGlobalQueryStatsAggregator()->noteQuery(startTime, failed, elapsed, memused, slavesReplyLen, bytesOut);
  1465. }
  1466. virtual void addDependency(unsigned sourceIdx, unsigned sourceId, unsigned targetId, int controlId, const char *edgeId, ActivityArray * activities)
  1467. {
  1468. // addDependency is expected to fail occasionally on slave, but never on Roxie server
  1469. if (!doAddDependency(sourceIdx, sourceId, targetId, controlId, edgeId, activities))
  1470. throw MakeStringException(ROXIE_ADDDEPENDENCY_ERROR, "Failed to create dependency from %u on %u", sourceId, targetId);
  1471. }
  1472. virtual ActivityArray *loadGraph(IPropertyTree &graph, const char *graphName)
  1473. {
  1474. bool isLibraryGraph = graph.getPropBool("@library");
  1475. bool isSequential = graph.getPropBool("@sequential");
  1476. ActivityArray *activities = new ActivityArray(isLibraryGraph, false, isLibraryGraph, isSequential);
  1477. if (isLibraryGraph)
  1478. activities->setLibraryGraphId(graph.getPropInt("node/@id"));
  1479. try
  1480. {
  1481. Owned<IPropertyTreeIterator> subgraphs = graph.getElements("node");
  1482. ForEach(*subgraphs)
  1483. {
  1484. IPropertyTree &node = subgraphs->query();
  1485. loadSubgraph(node, activities);
  1486. loadNode(node, 0, activities);
  1487. }
  1488. addDependencies(graph, activities);
  1489. }
  1490. catch (...)
  1491. {
  1492. ::Release(activities);
  1493. allActivities.kill();
  1494. throw;
  1495. }
  1496. return activities;
  1497. }
  1498. virtual IRoxieServerContext *createContext(IPropertyTree *context, IHpccProtocolResponse *protocol, unsigned flags, const ContextLogger &_logctx, PTreeReaderOptions _xmlReadFlags, const char *_querySetName) const
  1499. {
  1500. checkSuspended();
  1501. return createRoxieServerContext(context, protocol, this, flags, _logctx, _xmlReadFlags, _querySetName);
  1502. }
  1503. virtual IRoxieServerContext *createContext(IConstWorkUnit *wu, const ContextLogger &_logctx) const
  1504. {
  1505. checkSuspended();
  1506. return createWorkUnitServerContext(wu, this, _logctx);
  1507. }
  1508. virtual CRoxieWorkflowMachine *createWorkflowMachine(IConstWorkUnit *wu, bool isOnce, const IRoxieContextLogger &logctx) const
  1509. {
  1510. IPropertyTree *workflow = queryWorkflowTree();
  1511. if (workflow)
  1512. {
  1513. return ::createRoxieWorkflowMachine(workflow, wu, isOnce, logctx);
  1514. }
  1515. else
  1516. return NULL;
  1517. }
  1518. virtual IPropertyTree *getQueryStats(time_t from, time_t to)
  1519. {
  1520. return queryStats->getStats(from, to);
  1521. }
  1522. };
  1523. unsigned checkWorkunitVersionConsistency(const IConstWorkUnit *wu)
  1524. {
  1525. assertex(wu);
  1526. unsigned wuVersion = wu->getCodeVersion();
  1527. if (wuVersion == 0)
  1528. throw makeStringException(ROXIE_MISMATCH, "Attempting to execute a workunit that hasn't been compiled");
  1529. if (wuVersion > ACTIVITY_INTERFACE_VERSION || wuVersion < MIN_ACTIVITY_INTERFACE_VERSION)
  1530. throw MakeStringException(ROXIE_MISMATCH, "Workunit was compiled for eclhelper interface version %d, this roxie requires version %d..%d", wuVersion, MIN_ACTIVITY_INTERFACE_VERSION, ACTIVITY_INTERFACE_VERSION);
  1531. return wuVersion;
  1532. }
  1533. static void checkWorkunitVersionConsistency(const IQueryDll *dll)
  1534. {
  1535. unsigned wuVersion = checkWorkunitVersionConsistency(dll->queryWorkUnit());
  1536. EclProcessFactory processFactory = (EclProcessFactory) dll->queryDll()->getEntry("createProcess");
  1537. if (processFactory)
  1538. {
  1539. Owned<IEclProcess> process = processFactory();
  1540. assertex(process);
  1541. if (process->getActivityVersion() != wuVersion)
  1542. throw MakeStringException(ROXIE_MISMATCH, "Inconsistent interface versions. Workunit was created using eclcc for version %u, but the c++ compiler used version %u", wuVersion, process->getActivityVersion());
  1543. }
  1544. else
  1545. throw MakeStringException(ROXIE_MISMATCH, "Workunit did not export createProcess function");
  1546. }
  1547. extern IQueryFactory *createServerQueryFactory(const char *id, const IQueryDll *dll, const IRoxiePackage &package, const IPropertyTree *stateInfo, bool isDynamic, bool forceRetry)
  1548. {
  1549. CriticalBlock b(CQueryFactory::queryCreateLock);
  1550. IArrayOf<IResolvedFile> queryFiles; // Note - these should stay in scope long enough to ensure still cached when (if) query is loaded for real
  1551. hash64_t hashValue = CQueryFactory::getQueryHash(id, dll, package, stateInfo, queryFiles, isDynamic);
  1552. IQueryFactory *cached = getQueryFactory(hashValue, 0);
  1553. if (cached && !(cached->loadFailed() && (reloadRetriesFailed || forceRetry)))
  1554. {
  1555. ::Release(dll);
  1556. return cached;
  1557. }
  1558. if (dll && !selfTestMode)
  1559. {
  1560. checkWorkunitVersionConsistency(dll);
  1561. Owned<ISharedOnceContext> sharedOnceContext;
  1562. IPropertyTree *workflow = dll->queryWorkUnit()->queryWorkflowTree();
  1563. if (workflow && workflow->hasProp("Item[@mode='once']"))
  1564. sharedOnceContext.setown(new CSharedOnceContext);
  1565. Owned<CRoxieServerQueryFactory> newFactory = new CRoxieServerQueryFactory(id, dll, dynamic_cast<const IRoxiePackage&>(package), hashValue, sharedOnceContext, isDynamic);
  1566. newFactory->load(stateInfo);
  1567. if (sharedOnceContext && preloadOnceData)
  1568. {
  1569. Owned<StringContextLogger> logctx = new StringContextLogger(id); // NB may get linked by the onceContext
  1570. sharedOnceContext->checkOnceDone(newFactory, *logctx);
  1571. }
  1572. return newFactory.getClear();
  1573. }
  1574. else
  1575. return new CRoxieServerQueryFactory(id, NULL, dynamic_cast<const IRoxiePackage&>(package), hashValue, NULL, isDynamic);
  1576. }
  1577. extern IQueryFactory *createServerQueryFactoryFromWu(IConstWorkUnit *wu, const IQueryDll *_dll)
  1578. {
  1579. Linked<const IQueryDll> dll = _dll;
  1580. if (!dll)
  1581. dll.setown(createWuQueryDll(wu));
  1582. if (!dll)
  1583. return NULL;
  1584. return createServerQueryFactory(wu->queryWuid(), dll.getClear(), queryRootRoxiePackage(), NULL, true, false); // MORE - if use a constant for id might cache better?
  1585. }
  1586. //==============================================================================================================================================
  1587. class CSlaveQueryFactory : public CQueryFactory
  1588. {
  1589. void addActivity(ISlaveActivityFactory *activity, ActivityArray *activities)
  1590. {
  1591. activities->append(*activity);
  1592. unsigned activityId = activity->queryId();
  1593. allActivities.setValue(activityId, activity);
  1594. }
  1595. void loadSlaveNode(IPropertyTree &node, unsigned subgraphId, ActivityArray *activities)
  1596. {
  1597. ThorActivityKind kind = getActivityKind(node);
  1598. switch (kind)
  1599. {
  1600. case TAKcsvread:
  1601. case TAKxmlread:
  1602. case TAKdiskread:
  1603. case TAKjsonread:
  1604. if (node.getPropBool("att[@name='_isSpill']/@value", false) || node.getPropBool("att[@name='_isSpillGlobal']/@value", false))
  1605. return;
  1606. break;
  1607. case TAKkeyedjoin:
  1608. case TAKkeyeddenormalize:
  1609. case TAKkeyeddenormalizegroup:
  1610. case TAKdisknormalize:
  1611. case TAKdiskcount:
  1612. case TAKdiskaggregate:
  1613. case TAKdiskgroupaggregate:
  1614. case TAKindexread:
  1615. case TAKindexnormalize:
  1616. case TAKindexcount:
  1617. case TAKindexaggregate:
  1618. case TAKindexgroupaggregate:
  1619. case TAKindexgroupexists:
  1620. case TAKindexgroupcount:
  1621. case TAKfetch:
  1622. case TAKcsvfetch:
  1623. case TAKxmlfetch:
  1624. case TAKjsonfetch:
  1625. case TAKremotegraph:
  1626. break;
  1627. case TAKsubgraph:
  1628. break;
  1629. default:
  1630. return;
  1631. }
  1632. ISlaveActivityFactory *newAct = NULL;
  1633. if (kind != TAKsubgraph)
  1634. {
  1635. if (isSuspended)
  1636. newAct = createRoxieDummyActivityFactory(node, subgraphId, *this, false); // MORE - is there any point?
  1637. else
  1638. {
  1639. StringBuffer helperName;
  1640. node.getProp("att[@name=\"helper\"]/@value", helperName);
  1641. if (!helperName.length())
  1642. helperName.append("fAc").append(node.getPropInt("@id", 0));
  1643. HelperFactory *helperFactory = dll->getFactory(helperName.str());
  1644. if (!helperFactory)
  1645. throw MakeStringException(ROXIE_INTERNAL_ERROR, "Internal error: helper function %s not exported", helperName.str());
  1646. switch (kind)
  1647. {
  1648. case TAKdiskread:
  1649. newAct = createRoxieDiskReadActivityFactory(node, subgraphId, *this, helperFactory);
  1650. break;
  1651. case TAKcsvread:
  1652. newAct = createRoxieCsvReadActivityFactory(node, subgraphId, *this, helperFactory);
  1653. break;
  1654. case TAKxmlread:
  1655. case TAKjsonread:
  1656. newAct = createRoxieXmlReadActivityFactory(node, subgraphId, *this, helperFactory);
  1657. break;
  1658. case TAKdisknormalize:
  1659. newAct = createRoxieDiskNormalizeActivityFactory(node, subgraphId, *this, helperFactory);
  1660. break;
  1661. case TAKdiskcount:
  1662. newAct = createRoxieDiskCountActivityFactory(node, subgraphId, *this, helperFactory);
  1663. break;
  1664. case TAKdiskaggregate:
  1665. newAct = createRoxieDiskAggregateActivityFactory(node, subgraphId, *this, helperFactory);
  1666. break;
  1667. case TAKdiskgroupaggregate:
  1668. newAct = createRoxieDiskGroupAggregateActivityFactory(node, subgraphId, *this, helperFactory);
  1669. break;
  1670. case TAKindexread:
  1671. newAct = createRoxieIndexReadActivityFactory(node, subgraphId, *this, helperFactory);
  1672. break;
  1673. case TAKindexnormalize:
  1674. newAct = createRoxieIndexNormalizeActivityFactory(node, subgraphId, *this, helperFactory);
  1675. break;
  1676. case TAKindexcount:
  1677. newAct = createRoxieIndexCountActivityFactory(node, subgraphId, *this, helperFactory);
  1678. break;
  1679. case TAKindexaggregate:
  1680. newAct = createRoxieIndexAggregateActivityFactory(node, subgraphId, *this, helperFactory);
  1681. break;
  1682. case TAKindexgroupaggregate:
  1683. case TAKindexgroupexists:
  1684. case TAKindexgroupcount:
  1685. newAct = createRoxieIndexGroupAggregateActivityFactory(node, subgraphId, *this, helperFactory, kind);
  1686. break;
  1687. case TAKfetch:
  1688. newAct = createRoxieFetchActivityFactory(node, subgraphId, *this, helperFactory);
  1689. break;
  1690. case TAKcsvfetch:
  1691. newAct = createRoxieCSVFetchActivityFactory(node, subgraphId, *this, helperFactory);
  1692. break;
  1693. case TAKxmlfetch:
  1694. case TAKjsonfetch:
  1695. newAct = createRoxieXMLFetchActivityFactory(node, subgraphId, *this, helperFactory);
  1696. break;
  1697. case TAKkeyedjoin:
  1698. case TAKkeyeddenormalize:
  1699. case TAKkeyeddenormalizegroup:
  1700. newAct = createRoxieKeyedJoinIndexActivityFactory(node, subgraphId, *this, helperFactory);
  1701. if (node.getPropBool("att[@name=\"_diskAccessRequired\"]/@value"))
  1702. {
  1703. ISlaveActivityFactory *newAct2 = createRoxieKeyedJoinFetchActivityFactory(node, subgraphId, *this, helperFactory);
  1704. unsigned activityId2 = newAct2->queryId() | ROXIE_ACTIVITY_FETCH;
  1705. activities->append(*newAct2);
  1706. allActivities.setValue(activityId2, newAct2);
  1707. }
  1708. break;
  1709. case TAKremotegraph:
  1710. {
  1711. unsigned graphId = node.getPropInt("att[@name=\"_graphid\"]/@value", 0);
  1712. newAct = createRoxieRemoteActivityFactory(node, subgraphId, *this, helperFactory, graphId);
  1713. break;
  1714. }
  1715. default:
  1716. throwUnexpected();
  1717. }
  1718. }
  1719. if (newAct)
  1720. {
  1721. addActivity(newAct, activities);
  1722. }
  1723. }
  1724. else if (kind == TAKsubgraph)
  1725. {
  1726. // If the subgraph belongs to a remote activity, we need to be able to execute it on the slave...
  1727. IPropertyTree * childGraphNode = node.queryPropTree("att/graph");
  1728. if (!childGraphNode->getPropBool("@child"))
  1729. {
  1730. unsigned parentId = findParentId(node);
  1731. assertex(parentId);
  1732. unsigned parentIndex = activities->findActivityIndex(parentId);
  1733. if (parentIndex != NotFound)
  1734. {
  1735. ActivityArray *childQuery = loadChildGraph(*childGraphNode);
  1736. activities->item(parentIndex).addChildQuery(node.getPropInt("@id"), childQuery);
  1737. }
  1738. }
  1739. // Regardless, we need to make sure we create remote activities as required throughout the graph
  1740. Owned<IPropertyTreeIterator> nodes = node.getElements("att/graph/node");
  1741. unsigned subgraphId = node.getPropInt("@id");
  1742. ForEach(*nodes)
  1743. {
  1744. IPropertyTree &node = nodes->query();
  1745. loadSlaveNode(node, subgraphId, activities);
  1746. }
  1747. }
  1748. }
  1749. void loadOuterSubgraph(IPropertyTree &graph, ActivityArray *activities)
  1750. {
  1751. Owned<IPropertyTreeIterator> nodes = graph.getElements("att/graph/node");
  1752. unsigned subgraphId = graph.getPropInt("@id");
  1753. ForEach(*nodes)
  1754. {
  1755. IPropertyTree &node = nodes->query();
  1756. loadSlaveNode(node, subgraphId, activities);
  1757. }
  1758. loadSlaveNode(graph, subgraphId, activities); // MORE - not really sure why this line is here!
  1759. }
  1760. public:
  1761. CSlaveQueryFactory(const char *_id, const IQueryDll *_dll, const IRoxiePackage &_package, hash64_t _hashValue, unsigned _channelNo, ISharedOnceContext *_sharedOnceContext, bool _dynamic)
  1762. : CQueryFactory(_id, _dll, _package, _hashValue, _channelNo, _sharedOnceContext, _dynamic)
  1763. {
  1764. }
  1765. virtual IRoxieSlaveContext *createSlaveContext(const SlaveContextLogger &logctx, IRoxieQueryPacket *packet, bool hasChildren) const
  1766. {
  1767. return ::createSlaveContext(this, logctx, packet, hasChildren);
  1768. }
  1769. virtual ActivityArray *loadGraph(IPropertyTree &graph, const char *graphName)
  1770. {
  1771. // MORE: common up with loadGraph for the Roxie server..
  1772. bool isLibraryGraph = graph.getPropBool("@library");
  1773. bool isSequential = graph.getPropBool("@sequential");
  1774. ActivityArray *activities = new ActivityArray(isLibraryGraph, false, isLibraryGraph, isSequential);
  1775. if (isLibraryGraph)
  1776. activities->setLibraryGraphId(graph.getPropInt("node/@id"));
  1777. try
  1778. {
  1779. if (false && isLibraryGraph)
  1780. {
  1781. //Really only need to do this if the library is called from a remote activity
  1782. //but it's a bit tricky to work out since the library graph will come before the use.
  1783. //Not a major issue since libraries won't be embedded for production queries.
  1784. // this comment makes little sense...
  1785. Owned<IPropertyTreeIterator> subgraphs = graph.getElements("node");
  1786. ForEach(*subgraphs)
  1787. {
  1788. IPropertyTree &node = subgraphs->query();
  1789. loadSubgraph(node, activities);
  1790. loadNode(node, 0, activities);
  1791. }
  1792. }
  1793. Owned<IPropertyTreeIterator> subgraphs = graph.getElements("node");
  1794. ForEach(*subgraphs)
  1795. {
  1796. IPropertyTree &subgraph = subgraphs->query();
  1797. loadOuterSubgraph(subgraph, activities);
  1798. }
  1799. addDependencies(graph, activities);
  1800. }
  1801. catch (...)
  1802. {
  1803. ::Release(activities);
  1804. allActivities.kill();
  1805. throw;
  1806. }
  1807. return activities;
  1808. }
  1809. };
  1810. IQueryFactory *createSlaveQueryFactory(const char *id, const IQueryDll *dll, const IRoxiePackage &package, unsigned channel, const IPropertyTree *stateInfo, bool isDynamic, bool forceRetry)
  1811. {
  1812. CriticalBlock b(CQueryFactory::queryCreateLock);
  1813. IArrayOf<IResolvedFile> queryFiles; // Note - these should stay in scope long enough to ensure still cached when (if) query is loaded for real
  1814. hash64_t hashValue = CQueryFactory::getQueryHash(id, dll, package, stateInfo, queryFiles, isDynamic);
  1815. IQueryFactory *cached = getQueryFactory(hashValue, channel);
  1816. if (cached)
  1817. {
  1818. ::Release(dll);
  1819. return cached;
  1820. }
  1821. if (dll)
  1822. {
  1823. checkWorkunitVersionConsistency(dll);
  1824. Owned<IQueryFactory> serverFactory = createServerQueryFactory(id, LINK(dll), package, stateInfo, false, forceRetry); // Should always find a cached one
  1825. Owned<CSlaveQueryFactory> newFactory = new CSlaveQueryFactory(id, dll, dynamic_cast<const IRoxiePackage&>(package), hashValue, channel, serverFactory->querySharedOnceContext(), isDynamic);
  1826. newFactory->load(stateInfo);
  1827. return newFactory.getClear();
  1828. }
  1829. else
  1830. return new CSlaveQueryFactory(id, NULL, dynamic_cast<const IRoxiePackage&>(package), hashValue, channel, NULL, isDynamic);
  1831. }
  1832. extern IQueryFactory *createSlaveQueryFactoryFromWu(IConstWorkUnit *wu, unsigned channelNo)
  1833. {
  1834. Owned<const IQueryDll> dll = createWuQueryDll(wu);
  1835. if (!dll)
  1836. return NULL;
  1837. return createSlaveQueryFactory(wu->queryWuid(), dll.getClear(), queryRootRoxiePackage(), channelNo, NULL, true, false); // MORE - if use a constant for id might cache better?
  1838. }