ccdquery.cpp 86 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include <platform.h>
  14. #include <jlib.hpp>
  15. #include "ccd.hpp"
  16. #include "ccdquery.hpp"
  17. #include "ccdstate.hpp"
  18. #include "ccdsnmp.hpp"
  19. #include "ccdserver.hpp"
  20. #include "ccdcontext.hpp"
  21. #include "thorplugin.hpp"
  22. #include "layouttrans.hpp"
  23. void ActivityArray::append(IActivityFactory &cur)
  24. {
  25. hash.setValue(cur.queryId(), activities.ordinality());
  26. activities.append(cur);
  27. }
  28. unsigned ActivityArray::findActivityIndex(unsigned id)
  29. {
  30. unsigned *ret = hash.getValue(id);
  31. if (ret)
  32. return *ret;
  33. return NotFound;
  34. }
  35. unsigned ActivityArray::recursiveFindActivityIndex(unsigned id)
  36. {
  37. // NOTE - this returns the activity index of the PARENT of the specified activity
  38. unsigned *ret = hash.getValue(id);
  39. if (ret)
  40. return *ret;
  41. ForEachItem(idx)
  42. {
  43. IActivityFactory & cur = item(idx);
  44. unsigned childId;
  45. for (unsigned childIdx = 0;;childIdx++)
  46. {
  47. ActivityArray * children = cur.queryChildQuery(childIdx, childId);
  48. if (!children)
  49. break;
  50. if (children->recursiveFindActivityIndex(id) != NotFound)
  51. {
  52. hash.setValue(id, idx);
  53. return idx;
  54. }
  55. }
  56. }
  57. return NotFound;
  58. }
  59. //----------------------------------------------------------------------------------------------
  60. // Class CQueryDll maps dlls into loadable workunits, complete with caching to ensure that a refresh of the QuerySet
  61. // can avoid reloading dlls, and that the same CQueryDll (and the objects it owns) can be shared between server and
  62. // multiple slave channels
  63. //----------------------------------------------------------------------------------------------
  64. class CQueryDll : public CInterface, implements IQueryDll
  65. {
  66. StringAttr dllName;
  67. Owned <ILoadedDllEntry> dll;
  68. Owned <IConstWorkUnit> wu;
  69. static CriticalSection dllCacheLock;
  70. static CopyMapStringToMyClass<CQueryDll> dllCache;
  71. public:
  72. IMPLEMENT_IINTERFACE;
  73. CQueryDll(const char *_dllName, ILoadedDllEntry *_dll) : dllName(_dllName), dll(_dll)
  74. {
  75. StringBuffer wuXML;
  76. if (getEmbeddedWorkUnitXML(dll, wuXML))
  77. {
  78. Owned<ILocalWorkUnit> localWU = createLocalWorkUnit(wuXML);
  79. wu.setown(localWU->unlock());
  80. }
  81. CriticalBlock b(dllCacheLock);
  82. dllCache.setValue(dllName, this);
  83. }
  84. virtual void beforeDispose()
  85. {
  86. CriticalBlock b(dllCacheLock);
  87. // NOTE: it's theoretically possible for the final release to happen after a replacement has been inserted into hash table.
  88. // So only remove from hash table if what we find there matches the item that is being deleted.
  89. CQueryDll *goer = dllCache.getValue(dllName);
  90. if (goer == this)
  91. dllCache.remove(dllName);
  92. }
  93. static const CQueryDll *getQueryDll(const char *dllName, bool isExe)
  94. {
  95. CriticalBlock b(dllCacheLock);
  96. CQueryDll *dll = LINK(dllCache.getValue(dllName));
  97. if (dll && dll->isAlive())
  98. return dll;
  99. else
  100. {
  101. Owned<ILoadedDllEntry> dll = isExe ? createExeDllEntry(dllName) : queryRoxieDllServer().loadDll(dllName, DllLocationDirectory);
  102. assertex(dll != NULL);
  103. return new CQueryDll(dllName, dll.getClear());
  104. }
  105. }
  106. static const IQueryDll *getWorkUnitDll(IConstWorkUnit *wu)
  107. {
  108. SCMStringBuffer dllName;
  109. Owned<IConstWUQuery> q = wu->getQuery();
  110. q->getQueryDllName(dllName);
  111. if (dllName.length() == 0)
  112. {
  113. if (wu->getCodeVersion() == 0)
  114. throw makeStringExceptionV(ROXIE_MISSING_DLL, "Attempting to load workunit %s that hasn't been compiled", wu->queryWuid());
  115. else
  116. throw makeStringExceptionV(ROXIE_MISSING_DLL, "Attempting to load workunit %s with no associated dll", wu->queryWuid());
  117. }
  118. return getQueryDll(dllName.str(), false);
  119. }
  120. virtual HelperFactory *getFactory(const char *helperName) const
  121. {
  122. return (HelperFactory *) dll->getEntry(helperName);
  123. }
  124. virtual ILoadedDllEntry *queryDll() const
  125. {
  126. return dll;
  127. }
  128. virtual IConstWorkUnit *queryWorkUnit() const
  129. {
  130. return wu;
  131. }
  132. };
  133. CriticalSection CQueryDll::dllCacheLock;
  134. CopyMapStringToMyClass<CQueryDll> CQueryDll::dllCache;
  135. extern const IQueryDll *createQueryDll(const char *dllName)
  136. {
  137. return CQueryDll::getQueryDll(dllName, false);
  138. }
  139. extern const IQueryDll *createExeQueryDll(const char *exeName)
  140. {
  141. return CQueryDll::getQueryDll(exeName, true);
  142. }
  143. extern const IQueryDll *createWuQueryDll(IConstWorkUnit *wu)
  144. {
  145. return CQueryDll::getWorkUnitDll(wu);
  146. }
  147. // Add information to the xref information to be returned for a control:getQueryXrefInfo request
  148. IPropertyTree * addXrefInfo(IPropertyTree &reply, const char *section, const char *name)
  149. {
  150. VStringBuffer xpath("%s[@name='%s']", section, name);
  151. if (!reply.hasProp(xpath))
  152. {
  153. IPropertyTree *info = createPTree(section, 0);
  154. info->setProp("@name", name);
  155. return reply.addPropTree(section, info);
  156. }
  157. return NULL;
  158. }
  159. extern void addXrefFileInfo(IPropertyTree &reply, const IResolvedFile *dataFile)
  160. {
  161. if (dataFile->isSuperFile())
  162. {
  163. IPropertyTree *info = addXrefInfo(reply, "SuperFile", dataFile->queryFileName());
  164. if (info)
  165. {
  166. int numSubs = dataFile->numSubFiles();
  167. for (int i = 0; i < numSubs; i++)
  168. {
  169. StringBuffer subName;
  170. dataFile->getSubFileName(i, subName);
  171. addXrefInfo(*info, "File", subName.str());
  172. }
  173. }
  174. }
  175. else
  176. addXrefInfo(reply, "File", dataFile->queryFileName());
  177. }
  178. extern void addXrefLibraryInfo(IPropertyTree &reply, const char *libraryName)
  179. {
  180. addXrefInfo(reply, "Library", libraryName);
  181. }
  182. //----------------------------------------------------------------------------------------------
  183. // Class CSharedOnceContext manages the context for a query's ONCE code, which is shared between
  184. // all slave and server contexts on a node
  185. //----------------------------------------------------------------------------------------------
  186. class CSharedOnceContext : public CInterfaceOf<ISharedOnceContext>
  187. {
  188. public:
  189. CSharedOnceContext()
  190. {
  191. }
  192. ~CSharedOnceContext()
  193. {
  194. }
  195. virtual IDeserializedResultStore &queryOnceResultStore() const
  196. {
  197. assertex(onceResultStore!= NULL);
  198. return *onceResultStore;
  199. }
  200. virtual IPropertyTree &queryOnceContext(const IQueryFactory *factory, const IRoxieContextLogger &logctx) const
  201. {
  202. checkOnceDone(factory, logctx);
  203. assertex(onceContext != NULL);
  204. return *onceContext;
  205. }
  206. virtual void checkOnceDone(const IQueryFactory *factory, const IRoxieContextLogger &logctx) const
  207. {
  208. CriticalBlock b(onceCrit);
  209. if (!onceContext)
  210. {
  211. onceContext.setown(createPTree());
  212. onceResultStore.setown(createDeserializedResultStore());
  213. Owned <IRoxieServerContext> ctx = createOnceServerContext(factory, logctx);
  214. onceManager.set(&ctx->queryRowManager());
  215. try
  216. {
  217. ctx->process();
  218. ctx->done(false);
  219. }
  220. catch (IException *E)
  221. {
  222. ctx->done(true);
  223. onceException.setown(E);
  224. }
  225. catch (...)
  226. {
  227. ctx->done(true);
  228. onceException.setown(MakeStringException(ROXIE_INTERNAL_ERROR, "Unknown exception in ONCE code"));
  229. }
  230. }
  231. if (onceException)
  232. throw onceException.getLink();
  233. }
  234. protected:
  235. mutable CriticalSection onceCrit;
  236. mutable Owned<roxiemem::IRowManager> onceManager; // release AFTER resultStore
  237. mutable Owned<IPropertyTree> onceContext;
  238. mutable Owned<IDeserializedResultStore> onceResultStore;
  239. mutable Owned<IException> onceException;
  240. };
  241. //----------------------------------------------------------------------------------------------
  242. // Class CQueryOptions is used to store options affecting the execution of a query
  243. // These can be set globally, byt he query workunit, or by the query XML parameters
  244. //----------------------------------------------------------------------------------------------
  245. QueryOptions::QueryOptions()
  246. {
  247. priority = 0;
  248. timeLimit = defaultTimeLimit[0];
  249. warnTimeLimit = defaultWarnTimeLimit[0];
  250. memoryLimit = defaultMemoryLimit;
  251. parallelJoinPreload = defaultParallelJoinPreload;
  252. fullKeyedJoinPreload = defaultFullKeyedJoinPreload;
  253. keyedJoinPreload = defaultKeyedJoinPreload;
  254. concatPreload = defaultConcatPreload;
  255. fetchPreload = defaultFetchPreload;
  256. prefetchProjectPreload = defaultPrefetchProjectPreload;
  257. bindCores = coresPerQuery;
  258. strandBlockSize = defaultStrandBlockSize;
  259. forceNumStrands = defaultForceNumStrands;
  260. checkingHeap = defaultCheckingHeap;
  261. disableLocalOptimizations = false; // No global default for this
  262. enableFieldTranslation = fieldTranslationEnabled;
  263. skipFileFormatCrcCheck = false;
  264. stripWhitespaceFromStoredDataset = ((ptr_ignoreWhiteSpace & defaultXmlReadFlags) != 0);
  265. timeActivities = defaultTimeActivities;
  266. traceEnabled = defaultTraceEnabled;
  267. traceLimit = defaultTraceLimit;
  268. allSortsMaySpill = false; // No global default for this
  269. }
  270. QueryOptions::QueryOptions(const QueryOptions &other)
  271. {
  272. priority = other.priority;
  273. timeLimit = other.timeLimit;
  274. warnTimeLimit = other.warnTimeLimit;
  275. memoryLimit = other.memoryLimit;
  276. parallelJoinPreload = other.parallelJoinPreload;;
  277. fullKeyedJoinPreload = other.fullKeyedJoinPreload;
  278. keyedJoinPreload = other.keyedJoinPreload;
  279. concatPreload = other.concatPreload;
  280. fetchPreload = other.fetchPreload;
  281. prefetchProjectPreload = other.prefetchProjectPreload;
  282. bindCores = other.bindCores;
  283. strandBlockSize = other.strandBlockSize;
  284. forceNumStrands = other.forceNumStrands;
  285. checkingHeap = other.checkingHeap;
  286. disableLocalOptimizations = other.disableLocalOptimizations;
  287. enableFieldTranslation = other.enableFieldTranslation;
  288. skipFileFormatCrcCheck = other.skipFileFormatCrcCheck;
  289. stripWhitespaceFromStoredDataset = other.stripWhitespaceFromStoredDataset;
  290. timeActivities = other.timeActivities;
  291. traceEnabled = other.traceEnabled;
  292. traceLimit = other.traceLimit;
  293. allSortsMaySpill = other.allSortsMaySpill;
  294. }
  295. void QueryOptions::setFromWorkUnit(IConstWorkUnit &wu, const IPropertyTree *stateInfo)
  296. {
  297. // calculate priority before others since it affects the defaults of others
  298. updateFromWorkUnit(priority, wu, "priority");
  299. if (stateInfo)
  300. updateFromContext(priority, stateInfo, "@priority");
  301. timeLimit = defaultTimeLimit[priority];
  302. warnTimeLimit = defaultWarnTimeLimit[priority];
  303. updateFromWorkUnit(timeLimit, wu, "timeLimit");
  304. updateFromWorkUnit(warnTimeLimit, wu, "warnTimeLimit");
  305. updateFromWorkUnitM(memoryLimit, wu, "memoryLimit");
  306. if (stateInfo)
  307. {
  308. updateFromContext(timeLimit, stateInfo, "@timeLimit");
  309. updateFromContext(warnTimeLimit, stateInfo, "@warnTimeLimit");
  310. updateFromContextM(memoryLimit, stateInfo, "@memoryLimit");
  311. }
  312. updateFromWorkUnit(parallelJoinPreload, wu, "parallelJoinPreload");
  313. updateFromWorkUnit(fullKeyedJoinPreload, wu, "fullKeyedJoinPreload");
  314. updateFromWorkUnit(keyedJoinPreload, wu, "keyedJoinPreload");
  315. updateFromWorkUnit(concatPreload, wu, "concatPreload");
  316. updateFromWorkUnit(fetchPreload, wu, "fetchPreload");
  317. updateFromWorkUnit(prefetchProjectPreload, wu, "prefetchProjectPreload");
  318. updateFromWorkUnit(bindCores, wu, "bindCores");
  319. updateFromWorkUnit(strandBlockSize, wu, "strandBlockSize");
  320. updateFromWorkUnit(forceNumStrands, wu, "forceNumStrands");
  321. updateFromWorkUnit(checkingHeap, wu, "checkingHeap");
  322. updateFromWorkUnit(disableLocalOptimizations, wu, "disableLocalOptimizations");
  323. updateFromWorkUnit(enableFieldTranslation, wu, "layoutTranslationEnabled"); // Name is different for compatibility reasons
  324. updateFromWorkUnit(skipFileFormatCrcCheck, wu, "skipFileFormatCrcCheck");
  325. updateFromWorkUnit(stripWhitespaceFromStoredDataset, wu, "stripWhitespaceFromStoredDataset");
  326. updateFromWorkUnit(timeActivities, wu, "timeActivities");
  327. updateFromWorkUnit(traceEnabled, wu, "traceEnabled");
  328. updateFromWorkUnit(traceLimit, wu, "traceLimit");
  329. updateFromWorkUnit(allSortsMaySpill, wu, "allSortsMaySpill");
  330. }
  331. void QueryOptions::updateFromWorkUnitM(memsize_t &value, IConstWorkUnit &wu, const char *name)
  332. {
  333. value = (memsize_t) wu.getDebugValueInt64(name, value);
  334. }
  335. void QueryOptions::updateFromWorkUnit(int &value, IConstWorkUnit &wu, const char *name)
  336. {
  337. value = wu.getDebugValueInt(name, value);
  338. }
  339. void QueryOptions::updateFromWorkUnit(unsigned &value, IConstWorkUnit &wu, const char *name)
  340. {
  341. value = (unsigned) wu.getDebugValueInt(name, value);
  342. }
  343. void QueryOptions::updateFromWorkUnit(bool &value, IConstWorkUnit &wu, const char *name)
  344. {
  345. value = wu.getDebugValueBool(name, value);
  346. }
  347. void QueryOptions::setFromContext(const IPropertyTree *ctx)
  348. {
  349. if (ctx)
  350. {
  351. updateFromContext(priority, ctx, "@priority", "_Priority");
  352. updateFromContext(timeLimit, ctx, "@timeLimit", "_TimeLimit");
  353. updateFromContext(warnTimeLimit, ctx, "@warnTimeLimit", "_WarnTimeLimit");
  354. updateFromContextM(memoryLimit, ctx, "@memoryLimit", "_MemoryLimit");
  355. updateFromContext(parallelJoinPreload, ctx, "@parallelJoinPreload", "_ParallelJoinPreload");
  356. updateFromContext(fullKeyedJoinPreload, ctx, "@fullKeyedJoinPreload", "_FullKeyedJoinPreload");
  357. updateFromContext(keyedJoinPreload, ctx, "@keyedJoinPreload", "_KeyedJoinPreload");
  358. updateFromContext(concatPreload, ctx, "@concatPreload", "_ConcatPreload");
  359. updateFromContext(fetchPreload, ctx, "@fetchPreload", "_FetchPreload");
  360. updateFromContext(prefetchProjectPreload, ctx, "@prefetchProjectPreload", "_PrefetchProjectPreload");
  361. updateFromContext(bindCores, ctx, "@bindCores", "_bindCores");
  362. updateFromContext(strandBlockSize, ctx, "@strandBlockSize", "_strandBlockSize");
  363. updateFromContext(forceNumStrands, ctx, "@forceNumStrands", "_forceNumStrands");
  364. updateFromContext(checkingHeap, ctx, "@checkingHeap", "_CheckingHeap");
  365. // Note: disableLocalOptimizations is not permitted at context level (too late)
  366. // Note: enableFieldTranslation is not permitted at context level (generally too late anyway)
  367. updateFromContext(skipFileFormatCrcCheck, ctx, "_SkipFileFormatCrcCheck", "@skipFileFormatCrcCheck");
  368. updateFromContext(stripWhitespaceFromStoredDataset, ctx, "_StripWhitespaceFromStoredDataset", "@stripWhitespaceFromStoredDataset");
  369. updateFromContext(timeActivities, ctx, "@timeActivities", "_TimeActivities");
  370. updateFromContext(traceEnabled, ctx, "@traceEnabled", "_TraceEnabled");
  371. updateFromContext(traceLimit, ctx, "@traceLimit", "_TraceLimit");
  372. // Note: allSortsMaySpill is not permitted at context level (too late anyway, unless I refactored)
  373. }
  374. }
  375. const char * QueryOptions::findProp(const IPropertyTree *ctx, const char *name1, const char *name2)
  376. {
  377. if (name1 && ctx->hasProp(name1))
  378. return name1;
  379. else if (name2 && ctx->hasProp(name2))
  380. return name2;
  381. else
  382. return NULL;
  383. }
  384. void QueryOptions::updateFromContextM(memsize_t &value, const IPropertyTree *ctx, const char *name1, const char *name2)
  385. {
  386. const char *name = findProp(ctx, name1, name2);
  387. if (name)
  388. value = (memsize_t) ctx->getPropInt64(name);
  389. }
  390. void QueryOptions::updateFromContext(int &value, const IPropertyTree *ctx, const char *name1, const char *name2)
  391. {
  392. const char *name = findProp(ctx, name1, name2);
  393. if (name)
  394. value = ctx->getPropInt(name);
  395. }
  396. void QueryOptions::updateFromContext(unsigned &value, const IPropertyTree *ctx, const char *name1, const char *name2)
  397. {
  398. const char *name = findProp(ctx, name1, name2);
  399. if (name)
  400. value = (unsigned) ctx->getPropInt(name);
  401. }
  402. void QueryOptions::updateFromContext(bool &value, const IPropertyTree *ctx, const char *name1, const char *name2)
  403. {
  404. const char *name = findProp(ctx, name1, name2);
  405. if (name)
  406. value = ctx->getPropBool(name);
  407. }
  408. void QueryOptions::setFromSlaveLoggingFlags(unsigned loggingFlags)
  409. {
  410. // MORE - priority/timelimit ?
  411. checkingHeap = (loggingFlags & LOGGING_CHECKINGHEAP) != 0;
  412. timeActivities = (loggingFlags & LOGGING_TIMEACTIVITIES) != 0;
  413. }
  414. //----------------------------------------------------------------------------------------------
  415. // Class CQueryFactory is the main implementation of IQueryFactory, combining a IQueryDll and a
  416. // package context into an object that can quickly create a the query context that executes a specific
  417. // instance of a Roxie query.
  418. // Caching is used to ensure that only queries that are affected by a package change need to be reloaded.
  419. // Derived classes handle the differences between slave and server side factories
  420. //----------------------------------------------------------------------------------------------
  421. class CQueryFactory : public CInterface, implements IQueryFactory, implements IResourceContext
  422. {
  423. protected:
  424. const IRoxiePackage &package;
  425. Owned<const IQueryDll> dll;
  426. Linked<ISharedOnceContext> sharedOnceContext;
  427. MapStringToActivityArray graphMap;
  428. StringAttr id;
  429. StringBuffer errorMessage;
  430. MapIdToActivityFactory allActivities;
  431. QueryOptions options;
  432. bool dynamic;
  433. bool isSuspended;
  434. bool isLoadFailed;
  435. ClusterType targetClusterType;
  436. unsigned libraryInterfaceHash;
  437. hash64_t hashValue;
  438. static SpinLock queriesCrit;
  439. static CopyMapXToMyClass<hash64_t, hash64_t, CQueryFactory> queryMap;
  440. public:
  441. static CriticalSection queryCreateLock;
  442. protected:
  443. IRoxieServerActivityFactory *createActivityFactory(ThorActivityKind kind, unsigned subgraphId, IPropertyTree &node)
  444. {
  445. unsigned id = node.getPropInt("@id", 0);
  446. if (isSuspended)
  447. return createRoxieServerDummyActivityFactory(id, subgraphId, *this, NULL, TAKnone, node, false); // Is there actually any point?
  448. StringBuffer helperName;
  449. node.getProp("att[@name=\"helper\"]/@value", helperName);
  450. if (!helperName.length())
  451. helperName.append("fAc").append(id);
  452. HelperFactory *helperFactory = dll->getFactory(helperName);
  453. if (!helperFactory)
  454. throw MakeStringException(ROXIE_INTERNAL_ERROR, "Internal error: helper function %s not exported", helperName.str());
  455. switch (kind)
  456. {
  457. case TAKalljoin:
  458. case TAKalldenormalize:
  459. case TAKalldenormalizegroup:
  460. return createRoxieServerAllJoinActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  461. case TAKapply:
  462. return createRoxieServerApplyActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
  463. case TAKaggregate:
  464. case TAKexistsaggregate: // could special case.
  465. case TAKcountaggregate:
  466. return createRoxieServerAggregateActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  467. case TAKcase:
  468. case TAKchildcase:
  469. return createRoxieServerCaseActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isGraphIndependent(node));
  470. case TAKcatch:
  471. case TAKskipcatch:
  472. case TAKcreaterowcatch:
  473. return createRoxieServerCatchActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  474. case TAKchilditerator:
  475. return createRoxieServerChildIteratorActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  476. case TAKchoosesets:
  477. return createRoxieServerChooseSetsActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  478. case TAKchoosesetsenth:
  479. return createRoxieServerChooseSetsEnthActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  480. case TAKchoosesetslast:
  481. return createRoxieServerChooseSetsLastActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  482. case TAKproject:
  483. case TAKcountproject:
  484. return createRoxieServerProjectActivityFactory(id, subgraphId, *this, helperFactory, kind, node); // code is common between Project, CountProject
  485. case TAKfilterproject:
  486. return createRoxieServerFilterProjectActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  487. case TAKdatasetresult:
  488. case TAKrowresult:
  489. return createRoxieServerDatasetResultActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
  490. case TAKdedup:
  491. return createRoxieServerDedupActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  492. case TAKdegroup:
  493. return createRoxieServerDegroupActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  494. case TAKcsvread:
  495. case TAKxmlread:
  496. case TAKjsonread:
  497. case TAKdiskread:
  498. {
  499. if (node.getPropBool("att[@name='_isSpill']/@value", false) || node.getPropBool("att[@name='_isSpillGlobal']/@value", false))
  500. return createRoxieServerSpillReadActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  501. else
  502. {
  503. RemoteActivityId remoteId(id, hashValue);
  504. return createRoxieServerDiskReadActivityFactory(id, subgraphId, *this, helperFactory, kind, node, remoteId);
  505. }
  506. }
  507. case TAKmemoryspillread:
  508. return createRoxieServerSpillReadActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  509. case TAKdisknormalize:
  510. case TAKdiskcount:
  511. case TAKdiskaggregate:
  512. case TAKdiskgroupaggregate:
  513. {
  514. RemoteActivityId remoteId(id, hashValue);
  515. return createRoxieServerDiskReadActivityFactory(id, subgraphId, *this, helperFactory, kind, node, remoteId);
  516. }
  517. case TAKchildnormalize:
  518. return createRoxieServerNewChildNormalizeActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  519. case TAKchildaggregate:
  520. return createRoxieServerNewChildAggregateActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  521. case TAKchildgroupaggregate:
  522. return createRoxieServerNewChildGroupAggregateActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  523. case TAKchildthroughnormalize:
  524. return createRoxieServerNewChildThroughNormalizeActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  525. case TAKcsvwrite:
  526. case TAKdiskwrite:
  527. case TAKxmlwrite:
  528. case TAKjsonwrite:
  529. case TAKmemoryspillwrite:
  530. return createRoxieServerDiskWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
  531. case TAKindexwrite:
  532. return createRoxieServerIndexWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
  533. case TAKenth:
  534. return createRoxieServerEnthActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  535. case TAKfetch:
  536. case TAKcsvfetch:
  537. case TAKxmlfetch:
  538. case TAKjsonfetch:
  539. {
  540. RemoteActivityId remoteId(id, hashValue);
  541. return createRoxieServerFetchActivityFactory(id, subgraphId, *this, helperFactory, kind, node, remoteId);
  542. }
  543. case TAKfilter:
  544. return createRoxieServerFilterActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  545. case TAKfiltergroup:
  546. return createRoxieServerFilterGroupActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  547. case TAKfirstn:
  548. return createRoxieServerFirstNActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  549. case TAKfunnel:
  550. return createRoxieServerConcatActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  551. case TAKgroup:
  552. return createRoxieServerGroupActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  553. case TAKhashaggregate:
  554. return createRoxieServerHashAggregateActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  555. case TAKif:
  556. case TAKchildif:
  557. return createRoxieServerIfActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isGraphIndependent(node));
  558. case TAKifaction:
  559. return createRoxieServerIfActionActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
  560. case TAKparallel:
  561. return createRoxieServerParallelActionActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
  562. case TAKsequential:
  563. return createRoxieServerSequentialActionActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
  564. case TAKindexread:
  565. {
  566. RemoteActivityId remoteId(id, hashValue);
  567. return createRoxieServerIndexReadActivityFactory(id, subgraphId, *this, helperFactory, kind, node, remoteId);
  568. }
  569. case TAKindexnormalize:
  570. {
  571. RemoteActivityId remoteId(id, hashValue);
  572. return createRoxieServerIndexNormalizeActivityFactory(id, subgraphId, *this, helperFactory, kind, node, remoteId);
  573. }
  574. case TAKindexcount:
  575. {
  576. RemoteActivityId remoteId(id, hashValue);
  577. return createRoxieServerIndexCountActivityFactory(id, subgraphId, *this, helperFactory, kind, node, remoteId);
  578. }
  579. case TAKindexaggregate:
  580. {
  581. RemoteActivityId remoteId(id, hashValue);
  582. return createRoxieServerIndexAggregateActivityFactory(id, subgraphId, *this, helperFactory, kind, node, remoteId);
  583. }
  584. case TAKindexgroupaggregate:
  585. case TAKindexgroupexists:
  586. case TAKindexgroupcount:
  587. {
  588. RemoteActivityId remoteId(id, hashValue);
  589. return createRoxieServerIndexGroupAggregateActivityFactory(id, subgraphId, *this, helperFactory, kind, node, remoteId);
  590. }
  591. case TAKhashdedup:
  592. return createRoxieServerHashDedupActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  593. case TAKhashdenormalize:
  594. case TAKhashdistribute:
  595. case TAKhashdistributemerge:
  596. case TAKhashjoin:
  597. throwUnexpected(); // Code generator should have removed or transformed
  598. case TAKiterate:
  599. return createRoxieServerIterateActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  600. case TAKprocess:
  601. return createRoxieServerProcessActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  602. case TAKjoin:
  603. case TAKjoinlight:
  604. case TAKdenormalize:
  605. case TAKdenormalizegroup:
  606. return createRoxieServerJoinActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  607. case TAKkeyeddistribute:
  608. throwUnexpected(); // Code generator should have removed or transformed
  609. case TAKkeyedjoin:
  610. case TAKkeyeddenormalize:
  611. case TAKkeyeddenormalizegroup:
  612. {
  613. RemoteActivityId remoteId(id, hashValue);
  614. RemoteActivityId remoteId2(id | ROXIE_ACTIVITY_FETCH, hashValue);
  615. return createRoxieServerKeyedJoinActivityFactory(id, subgraphId, *this, helperFactory, kind, node, remoteId, remoteId2);
  616. }
  617. case TAKlimit:
  618. return createRoxieServerLimitActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  619. case TAKlookupjoin:
  620. case TAKlookupdenormalize:
  621. case TAKlookupdenormalizegroup:
  622. case TAKsmartjoin:
  623. case TAKsmartdenormalize:
  624. case TAKsmartdenormalizegroup:
  625. return createRoxieServerLookupJoinActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  626. case TAKmerge:
  627. return createRoxieServerMergeActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  628. case TAKnormalize:
  629. return createRoxieServerNormalizeActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  630. case TAKnormalizechild:
  631. return createRoxieServerNormalizeChildActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  632. case TAKnormalizelinkedchild:
  633. return createRoxieServerNormalizeLinkedChildActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  634. case TAKnull:
  635. return createRoxieServerNullActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  636. case TAKsideeffect:
  637. return createRoxieServerSideEffectActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
  638. case TAKsimpleaction:
  639. return createRoxieServerActionActivityFactory(id, subgraphId, *this, helperFactory, kind, node, usageCount(node), isRootAction(node));
  640. case TAKparse:
  641. return createRoxieServerParseActivityFactory(id, subgraphId, *this, helperFactory, kind, node, this);
  642. case TAKworkunitwrite:
  643. return createRoxieServerWorkUnitWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, node, usageCount(node), isRootAction(node));
  644. case TAKdictionaryworkunitwrite:
  645. return createRoxieServerWorkUnitWriteDictActivityFactory(id, subgraphId, *this, helperFactory, kind, node, usageCount(node), isRootAction(node));
  646. case TAKpiperead:
  647. return createRoxieServerPipeReadActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  648. case TAKpipethrough:
  649. return createRoxieServerPipeThroughActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  650. case TAKpipewrite:
  651. return createRoxieServerPipeWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, node, usageCount(node), isRootAction(node));
  652. case TAKpull:
  653. return createRoxieServerPullActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  654. case TAKtrace:
  655. return createRoxieServerTraceActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  656. case TAKlinkedrawiterator:
  657. return createRoxieServerLinkedRawIteratorActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  658. case TAKremoteresult:
  659. return createRoxieServerRemoteResultActivityFactory(id, subgraphId, *this, helperFactory, kind, node, usageCount(node), isRootAction(node));
  660. case TAKrollup:
  661. return createRoxieServerRollupActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  662. case TAKsample:
  663. return createRoxieServerSampleActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  664. case TAKselectn:
  665. return createRoxieServerSelectNActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  666. case TAKselfjoin:
  667. case TAKselfjoinlight:
  668. return createRoxieServerSelfJoinActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  669. case TAKskiplimit:
  670. case TAKcreaterowlimit:
  671. return createRoxieServerSkipLimitActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  672. case TAKhttp_rowdataset:
  673. case TAKsoap_rowdataset:
  674. return createRoxieServerSoapRowCallActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  675. case TAKsoap_rowaction:
  676. return createRoxieServerSoapRowActionActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
  677. case TAKsoap_datasetdataset:
  678. return createRoxieServerSoapDatasetCallActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  679. case TAKsoap_datasetaction:
  680. return createRoxieServerSoapDatasetActionActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
  681. case TAKsort:
  682. return createRoxieServerSortActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  683. case TAKspill:
  684. case TAKmemoryspillsplit:
  685. return createRoxieServerThroughSpillActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  686. case TAKsplit:
  687. return createRoxieServerSplitActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  688. case TAKstreamediterator:
  689. return createRoxieServerStreamedIteratorActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  690. case TAKinlinetable:
  691. return createRoxieServerInlineTableActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  692. case TAKthroughaggregate:
  693. throwUnexpected(); // Concept of through aggregates has been proven not to work in Roxie - codegen should not be creating them any more.
  694. case TAKtopn:
  695. return createRoxieServerTopNActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  696. case TAKworkunitread:
  697. return createRoxieServerWorkUnitReadActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  698. case TAKxmlparse:
  699. return createRoxieServerXmlParseActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  700. case TAKquantile:
  701. return createRoxieServerQuantileActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  702. case TAKregroup:
  703. return createRoxieServerRegroupActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  704. case TAKcombine:
  705. return createRoxieServerCombineActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  706. case TAKcombinegroup:
  707. return createRoxieServerCombineGroupActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  708. case TAKrollupgroup:
  709. return createRoxieServerRollupGroupActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  710. case TAKlocalresultread:
  711. {
  712. unsigned graphId = getGraphId(node);
  713. return createRoxieServerLocalResultReadActivityFactory(id, subgraphId, *this, helperFactory, kind, node, graphId);
  714. }
  715. case TAKlocalstreamread:
  716. return createRoxieServerLocalResultStreamReadActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  717. case TAKlocalresultwrite:
  718. {
  719. unsigned graphId = getGraphId(node);
  720. return createRoxieServerLocalResultWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, node, usageCount(node), graphId, isRootAction(node));
  721. }
  722. case TAKdictionaryresultwrite:
  723. {
  724. unsigned graphId = getGraphId(node);
  725. return createRoxieServerDictionaryResultWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, node, usageCount(node), graphId, isRootAction(node));
  726. }
  727. case TAKloopcount:
  728. case TAKlooprow:
  729. case TAKloopdataset:
  730. {
  731. unsigned loopId = node.getPropInt("att[@name=\"_loopid\"]/@value", 0);
  732. return createRoxieServerLoopActivityFactory(id, subgraphId, *this, helperFactory, kind, node, loopId);
  733. }
  734. case TAKremotegraph:
  735. {
  736. RemoteActivityId remoteId(id, hashValue);
  737. return createRoxieServerRemoteActivityFactory(id, subgraphId, *this, helperFactory, kind, node, remoteId, isRootAction(node));
  738. }
  739. case TAKgraphloopresultread:
  740. {
  741. unsigned graphId = getGraphId(node);
  742. return createRoxieServerGraphLoopResultReadActivityFactory(id, subgraphId, *this, helperFactory, kind, node, graphId);
  743. }
  744. case TAKgraphloopresultwrite:
  745. {
  746. unsigned graphId = getGraphId(node);
  747. return createRoxieServerGraphLoopResultWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, node, usageCount(node), graphId);
  748. }
  749. case TAKnwaygraphloopresultread:
  750. {
  751. unsigned graphId = node.getPropInt("att[@name=\"_graphId\"]/@value", 0);
  752. return createRoxieServerNWayGraphLoopResultReadActivityFactory(id, subgraphId, *this, helperFactory, kind, node, graphId);
  753. }
  754. case TAKnwayinput:
  755. return createRoxieServerNWayInputActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  756. case TAKnwaymerge:
  757. return createRoxieServerNWayMergeActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  758. case TAKnwaymergejoin:
  759. case TAKnwayjoin:
  760. return createRoxieServerNWayMergeJoinActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  761. case TAKsorted:
  762. return createRoxieServerSortedActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  763. case TAKgraphloop:
  764. case TAKparallelgraphloop:
  765. {
  766. unsigned loopId = node.getPropInt("att[@name=\"_loopid\"]/@value", 0);
  767. return createRoxieServerGraphLoopActivityFactory(id, subgraphId, *this, helperFactory, kind, node, loopId);
  768. }
  769. case TAKlibrarycall:
  770. {
  771. LibraryCallFactoryExtra extra;
  772. extra.maxOutputs = node.getPropInt("att[@name=\"_maxOutputs\"]/@value", 0);
  773. extra.graphid = node.getPropInt("att[@name=\"_graphid\"]/@value", 0);
  774. extra.libraryName.set(node.queryProp("att[@name=\"libname\"]/@value"));
  775. extra.interfaceHash = node.getPropInt("att[@name=\"_interfaceHash\"]/@value", 0);
  776. extra.embedded = node.getPropBool("att[@name=\"embedded\"]/@value", false) ;
  777. if (extra.embedded)
  778. {
  779. extra.embeddedGraphName.set(node.queryProp("att[@name=\"graph\"]/@value"));
  780. if (!extra.embeddedGraphName)
  781. extra.embeddedGraphName.set(extra.libraryName);
  782. }
  783. Owned<IPropertyTreeIterator> iter = node.getElements("att[@name=\"_outputUsed\"]");
  784. ForEach(*iter)
  785. extra.outputs.append(iter->query().getPropInt("@value"));
  786. return createRoxieServerLibraryCallActivityFactory(id, subgraphId, *this, helperFactory, kind, node, extra);
  787. }
  788. case TAKnwayselect:
  789. return createRoxieServerNWaySelectActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  790. case TAKnonempty:
  791. return createRoxieServerNonEmptyActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  792. case TAKprefetchproject:
  793. return createRoxieServerPrefetchProjectActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  794. case TAKwhen_dataset:
  795. return createRoxieServerWhenActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  796. case TAKwhen_action:
  797. return createRoxieServerWhenActionActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
  798. case TAKdistribution:
  799. return createRoxieServerDistributionActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
  800. // These are not required in Roxie for the time being - code generator should trap them
  801. case TAKchilddataset:
  802. default:
  803. throw MakeStringException(ROXIE_UNIMPLEMENTED_ERROR, "Unimplemented activity %s required", getActivityText(kind));
  804. break;
  805. }
  806. throwUnexpected(); // unreachable, but some compilers will complain about missing return
  807. }
  808. IActivityFactory *findActivity(unsigned id) const
  809. {
  810. if (id)
  811. {
  812. IActivityFactory **f = allActivities.getValue(id);
  813. if (f)
  814. return *f;
  815. }
  816. return NULL;
  817. }
  818. virtual IRoxieServerActivityFactory *getRoxieServerActivityFactory(unsigned id) const
  819. {
  820. checkSuspended();
  821. return LINK(QUERYINTERFACE(findActivity(id), IRoxieServerActivityFactory));
  822. }
  823. virtual ISlaveActivityFactory *getSlaveActivityFactory(unsigned id) const
  824. {
  825. checkSuspended();
  826. IActivityFactory *f = findActivity(id);
  827. return LINK(QUERYINTERFACE(f, ISlaveActivityFactory)); // MORE - don't dynamic cast yuk
  828. }
  829. ActivityArray *loadChildGraph(IPropertyTree &graph)
  830. {
  831. // MORE - this is starting to look very much like loadGraph (on Roxie server side)
  832. ActivityArray *activities = new ActivityArray(true, graph.getPropBool("@delayed"), graph.getPropBool("@library"), graph.getPropBool("@sequential"));
  833. unsigned subgraphId = graph.getPropInt("@id");
  834. try
  835. {
  836. Owned<IPropertyTreeIterator> nodes = graph.getElements("node");
  837. ForEach(*nodes)
  838. {
  839. IPropertyTree &node = nodes->query();
  840. loadNode(node, subgraphId, activities);
  841. }
  842. Owned<IPropertyTreeIterator> edges = graph.getElements("edge");
  843. ForEach(*edges)
  844. {
  845. IPropertyTree &edge = edges->query();
  846. unsigned source = activities->findActivityIndex(edge.getPropInt("@source",0));
  847. unsigned target = activities->findActivityIndex(edge.getPropInt("@target",0));
  848. unsigned sourceOutput = edge.getPropInt("att[@name=\"_sourceIndex\"]/@value", 0);
  849. unsigned targetInput = edge.getPropInt("att[@name=\"_targetIndex\"]/@value", 0);
  850. int controlId = edge.getPropInt("att[@name=\"_when\"]/@value", 0);
  851. if (controlId != 0)
  852. {
  853. const char * edgeId = edge.queryProp("@id");
  854. addDependency(sourceOutput, source, target, controlId, edgeId, activities);
  855. }
  856. else
  857. activities->serverItem(target).setInput(targetInput, source, sourceOutput);
  858. }
  859. }
  860. catch (...)
  861. {
  862. ::Release(activities);
  863. allActivities.kill();
  864. throw;
  865. }
  866. return activities;
  867. }
  868. void loadNode(IPropertyTree &node, unsigned subgraphId, ActivityArray *activities)
  869. {
  870. ThorActivityKind kind = getActivityKind(node);
  871. if (kind==TAKsubgraph)
  872. {
  873. IPropertyTree * childGraphNode = node.queryPropTree("att/graph");
  874. if (childGraphNode->getPropBool("@child"))
  875. {
  876. loadSubgraph(node, activities);
  877. }
  878. else
  879. {
  880. unsigned parentId = findParentId(node);
  881. assertex(parentId);
  882. unsigned parentIdx = activities->findActivityIndex(parentId);
  883. IActivityFactory &parentFactory = activities->item(parentIdx);
  884. ActivityArray *childQuery = loadChildGraph(*childGraphNode);
  885. parentFactory.addChildQuery(node.getPropInt("@id"), childQuery);
  886. }
  887. }
  888. else if (kind)
  889. {
  890. IRoxieServerActivityFactory *f = createActivityFactory(kind, subgraphId, node);
  891. if (f)
  892. {
  893. activities->append(*f);
  894. allActivities.setValue(f->queryId(), f);
  895. }
  896. }
  897. }
  898. void loadSubgraph(IPropertyTree &graph, ActivityArray *activities)
  899. {
  900. unsigned subgraphId = graph.getPropInt("@id");
  901. Owned<IPropertyTreeIterator> nodes = graph.getElements("att/graph/node");
  902. ForEach(*nodes)
  903. {
  904. IPropertyTree &node = nodes->query();
  905. loadNode(node, subgraphId, activities);
  906. }
  907. if (!isSuspended)
  908. {
  909. Owned<IPropertyTreeIterator> edges = graph.getElements("att/graph/edge");
  910. ForEach(*edges)
  911. {
  912. IPropertyTree &edge = edges->query();
  913. unsigned sourceActivity = edge.getPropInt("@source", 0);
  914. unsigned targetActivity = edge.getPropInt("@target", 0);
  915. unsigned source = activities->findActivityIndex(sourceActivity);
  916. unsigned target = activities->recursiveFindActivityIndex(targetActivity);
  917. unsigned sourceOutput = edge.getPropInt("att[@name=\"_sourceIndex\"]/@value", 0);
  918. unsigned targetInput = edge.getPropInt("att[@name=\"_targetIndex\"]/@value", 0);
  919. int controlId = edge.getPropInt("att[@name=\"_when\"]/@value", 0);
  920. if (controlId != 0)
  921. {
  922. const char * edgeId = edge.queryProp("@id");
  923. addDependency(sourceOutput, sourceActivity, targetActivity, controlId, edgeId, activities);
  924. }
  925. else
  926. activities->serverItem(target).setInput(targetInput, source, sourceOutput);
  927. }
  928. }
  929. }
  930. // loadGraph loads outer level graph. This is virtual as slave is very different from Roxie server
  931. virtual ActivityArray *loadGraph(IPropertyTree &graph, const char *graphName) = 0;
  932. bool doAddDependency(unsigned sourceIdx, unsigned sourceId, unsigned targetId, int controlId, const char *edgeId, ActivityArray * activities)
  933. {
  934. // Note - the dependency is recorded with the target being the parent activity that is at the same level as the source
  935. // (recording it on the child that was actually dependent would mean it happened too late)
  936. unsigned source = activities->findActivityIndex(sourceId);
  937. if (source != NotFound)
  938. {
  939. unsigned target = activities->recursiveFindActivityIndex(targetId);
  940. activities->serverItem(target).addDependency(source, activities->serverItem(source).getKind(), sourceIdx, controlId, edgeId);
  941. activities->serverItem(source).noteDependent(target);
  942. return true;
  943. }
  944. ForEachItemIn(idx, *activities)
  945. {
  946. IActivityFactory & cur = activities->item(idx);
  947. unsigned childId;
  948. for (unsigned childIdx = 0;;childIdx++)
  949. {
  950. ActivityArray * children = cur.queryChildQuery(childIdx, childId);
  951. if (!children)
  952. break;
  953. if (doAddDependency(sourceIdx, sourceId, targetId, controlId, edgeId, children))
  954. return true;
  955. }
  956. }
  957. return false;
  958. }
  959. virtual void addDependency(unsigned sourceIdx, unsigned sourceId, unsigned targetId, int controlId, const char *edgeId, ActivityArray * activities)
  960. {
  961. doAddDependency(sourceIdx, sourceId, targetId, controlId, edgeId, activities);
  962. }
  963. void addDependencies(IPropertyTree &graph, ActivityArray *activities)
  964. {
  965. Owned<IPropertyTreeIterator> dependencies = graph.getElements("edge");
  966. ForEach(*dependencies)
  967. {
  968. IPropertyTree &edge = dependencies->query();
  969. if (!edge.getPropInt("att[@name=\"_childGraph\"]/@value", 0))
  970. {
  971. unsigned sourceIdx = edge.getPropInt("att[@name=\"_sourceIndex\"]/@value", 0);
  972. int controlId = edge.getPropInt("att[@name=\"_when\"]/@value", 0);
  973. addDependency(sourceIdx, edge.getPropInt("att[@name=\"_sourceActivity\"]/@value", 0), edge.getPropInt("att[@name=\"_targetActivity\"]/@value", 0), controlId, edge.queryProp("@id"), activities);
  974. }
  975. }
  976. }
  977. public:
  978. IMPLEMENT_IINTERFACE;
  979. unsigned channelNo;
  980. CQueryFactory(const char *_id, const IQueryDll *_dll, const IRoxiePackage &_package, hash64_t _hashValue, unsigned _channelNo, ISharedOnceContext *_sharedOnceContext, bool _dynamic)
  981. : id(_id), package(_package), dll(_dll), channelNo(_channelNo), hashValue(_hashValue), sharedOnceContext(_sharedOnceContext), dynamic(_dynamic)
  982. {
  983. package.Link();
  984. isSuspended = false;
  985. isLoadFailed = false;
  986. libraryInterfaceHash = 0;
  987. options.enableFieldTranslation = package.getEnableFieldTranslation(); // NOTE - can be overridden by wu settings
  988. options.allSortsMaySpill = dynamic;
  989. }
  990. ~CQueryFactory()
  991. {
  992. HashIterator graphs(graphMap);
  993. for(graphs.first();graphs.isValid();graphs.next())
  994. {
  995. ActivityArray *a = *graphMap.mapToValue(&graphs.query());
  996. a->Release();
  997. }
  998. package.Release();
  999. }
  1000. virtual IQueryFactory *lookupLibrary(const char *libraryName, unsigned expectedInterfaceHash, const IRoxieContextLogger &logctx) const
  1001. {
  1002. return globalPackageSetManager->lookupLibrary(libraryName, expectedInterfaceHash, logctx);
  1003. }
  1004. virtual void beforeDispose()
  1005. {
  1006. SpinBlock b(queriesCrit);
  1007. // NOTE: it's theoretically possible for the final release to happen after a replacement has been inserted into hash table.
  1008. // So only remove from hash table if what we find there matches the item that is being deleted.
  1009. CQueryFactory *goer = queryMap.getValue(hashValue+channelNo);
  1010. if (goer == this)
  1011. queryMap.remove(hashValue+channelNo);
  1012. }
  1013. static IQueryFactory *getQueryFactory(hash64_t hashValue, unsigned channelNo)
  1014. {
  1015. SpinBlock b(queriesCrit);
  1016. CQueryFactory *factory = LINK(queryMap.getValue(hashValue+channelNo));
  1017. if (factory && factory->isAlive())
  1018. return factory;
  1019. else
  1020. return NULL;
  1021. }
  1022. static hash64_t getQueryHash(const char *id, const IQueryDll *dll, const IRoxiePackage &package, const IPropertyTree *stateInfo, IArrayOf<IResolvedFile> &files, bool isDynamic)
  1023. {
  1024. hash64_t hashValue = package.queryHash();
  1025. if (traceLevel > 8)
  1026. DBGLOG("getQueryHash: %s %" I64F "u from package", id, hashValue);
  1027. if (dll)
  1028. {
  1029. hashValue = rtlHash64VStr(dll->queryDll()->queryName(), hashValue);
  1030. if (traceLevel > 8)
  1031. DBGLOG("getQueryHash: %s %" I64F "u from dll", id, hashValue);
  1032. if (!lockSuperFiles && !allFilesDynamic && !isDynamic && !package.isCompulsory())
  1033. {
  1034. IConstWorkUnit *wu = dll->queryWorkUnit();
  1035. if (wu) // wu may be null in some unit test cases
  1036. {
  1037. SCMStringBuffer bStr;
  1038. // Don't want to include files referenced in thor graphs... in practice isDynamic also likely to be set in such cases
  1039. if (getClusterType(wu->getDebugValue("targetClusterType", bStr).str(), RoxieCluster) == RoxieCluster)
  1040. {
  1041. Owned<IConstWUGraphIterator> graphs = &wu->getGraphs(GraphTypeActivities);
  1042. ForEach(*graphs)
  1043. {
  1044. Owned<IPropertyTree> graphXgmml = graphs->query().getXGMMLTree(false);
  1045. Owned<IPropertyTreeIterator> nodes = graphXgmml->getElements(".//node");
  1046. ForEach(*nodes)
  1047. {
  1048. IPropertyTree &node = nodes->query();
  1049. ThorActivityKind kind = getActivityKind(node);
  1050. if (kind != TAKdiskwrite && kind != TAKindexwrite && kind != TAKpiperead && kind != TAKpipewrite)
  1051. {
  1052. const char *fileName = queryNodeFileName(node, kind);
  1053. const char *indexName = queryNodeIndexName(node, kind);
  1054. // What about packages that resolve everything without dali?
  1055. if (indexName)
  1056. {
  1057. bool isOpt = pretendAllOpt || node.getPropBool("att[@name='_isIndexOpt']/@value");
  1058. const IResolvedFile *indexFile = package.lookupFileName(indexName, isOpt, true, true, wu, true);
  1059. if (indexFile)
  1060. {
  1061. hashValue = indexFile->addHash64(hashValue);
  1062. if (traceLevel > 8)
  1063. DBGLOG("getQueryHash: %s %" I64F "u from index %s", id, hashValue, indexName);
  1064. files.append(*const_cast<IResolvedFile *>(indexFile));
  1065. }
  1066. }
  1067. if (fileName)
  1068. {
  1069. if (!node.getPropBool("att[@name='_isSpill']/@value") && !node.getPropBool("att[@name='_isSpillGlobal']/@value"))
  1070. {
  1071. bool isOpt = pretendAllOpt || node.getPropBool("att[@name='_isOpt']/@value");
  1072. const IResolvedFile *dataFile = package.lookupFileName(fileName, isOpt, true, true, wu, true);
  1073. if (dataFile)
  1074. {
  1075. hashValue = dataFile->addHash64(hashValue);
  1076. if (traceLevel > 8)
  1077. DBGLOG("getQueryHash: %s %" I64F "u from index %s", id, hashValue, fileName);
  1078. files.append(*const_cast<IResolvedFile *>(dataFile));
  1079. }
  1080. }
  1081. }
  1082. }
  1083. }
  1084. }
  1085. }
  1086. }
  1087. }
  1088. }
  1089. if (id)
  1090. hashValue = rtlHash64VStr(id, hashValue);
  1091. if (traceLevel > 8)
  1092. DBGLOG("getQueryHash: %s %" I64F "u from id", id, hashValue);
  1093. if (stateInfo)
  1094. {
  1095. StringBuffer xml;
  1096. toXML(stateInfo, xml);
  1097. hashValue = rtlHash64Data(xml.length(), xml.str(), hashValue);
  1098. if (traceLevel > 8)
  1099. DBGLOG("getQueryHash: %s %" I64F "u from stateInfo", id, hashValue);
  1100. }
  1101. if (traceLevel > 8)
  1102. DBGLOG("getQueryHash: %s %" I64F "u", id, hashValue);
  1103. return hashValue;
  1104. }
  1105. virtual void load(const IPropertyTree *stateInfo)
  1106. {
  1107. IConstWorkUnit *wu = dll->queryWorkUnit();
  1108. if (wu) // wu may be null in some unit test cases
  1109. {
  1110. libraryInterfaceHash = wu->getApplicationValueInt("LibraryModule", "interfaceHash", 0);
  1111. options.setFromWorkUnit(*wu, stateInfo);
  1112. SCMStringBuffer bStr;
  1113. targetClusterType = getClusterType(wu->getDebugValue("targetClusterType", bStr).str(), RoxieCluster);
  1114. // NOTE: stateinfo overrides package info
  1115. if (stateInfo)
  1116. {
  1117. // info in querySets can override the defaults from workunit for some limits
  1118. isSuspended = stateInfo->getPropBool("@suspended", false);
  1119. }
  1120. if (targetClusterType == RoxieCluster)
  1121. {
  1122. Owned<IConstWUGraphIterator> graphs = &wu->getGraphs(GraphTypeActivities);
  1123. SCMStringBuffer graphNameStr;
  1124. ForEach(*graphs)
  1125. {
  1126. graphs->query().getName(graphNameStr);
  1127. const char *graphName = graphNameStr.s.str();
  1128. Owned<IPropertyTree> graphXgmml = graphs->query().getXGMMLTree(false);
  1129. try
  1130. {
  1131. ActivityArray *activities = loadGraph(*graphXgmml, graphName);
  1132. graphMap.setValue(graphName, activities);
  1133. }
  1134. catch (IException *E)
  1135. {
  1136. StringBuffer m;
  1137. E->errorMessage(m);
  1138. suspend(m.str());
  1139. ERRLOG("Query %s suspended: %s", id.get(), m.str());
  1140. E->Release();
  1141. }
  1142. }
  1143. }
  1144. }
  1145. SpinBlock b(queriesCrit);
  1146. queryMap.setValue(hashValue+channelNo, this);
  1147. }
  1148. virtual unsigned queryChannel() const
  1149. {
  1150. return channelNo;
  1151. }
  1152. virtual hash64_t queryHash() const
  1153. {
  1154. return hashValue;
  1155. }
  1156. virtual ISharedOnceContext *querySharedOnceContext() const
  1157. {
  1158. return sharedOnceContext;
  1159. }
  1160. virtual IDeserializedResultStore &queryOnceResultStore() const
  1161. {
  1162. assertex(sharedOnceContext);
  1163. return sharedOnceContext->queryOnceResultStore();
  1164. }
  1165. virtual IPropertyTree &queryOnceContext(const IRoxieContextLogger &logctx) const
  1166. {
  1167. assertex(sharedOnceContext);
  1168. return sharedOnceContext->queryOnceContext(this, logctx);
  1169. }
  1170. virtual const char *loadResource(unsigned id)
  1171. {
  1172. return (const char *) queryDll()->getResource(id);
  1173. }
  1174. virtual ActivityArray *lookupGraphActivities(const char *name) const
  1175. {
  1176. return *graphMap.getValue(name);
  1177. }
  1178. virtual IActivityGraph *lookupGraph(IRoxieSlaveContext *ctx, const char *name, IProbeManager *probeManager, const IRoxieContextLogger &logctx, IRoxieServerActivity *parentActivity) const
  1179. {
  1180. assertex(name && *name);
  1181. ActivityArrayPtr *graph = graphMap.getValue(name);
  1182. assertex(graph);
  1183. Owned<IActivityGraph> ret = ::createActivityGraph(ctx, name, 0, **graph, parentActivity, probeManager, logctx);
  1184. return ret.getClear();
  1185. }
  1186. void getGraphStats(StringBuffer &reply, const IPropertyTree &thisGraph) const
  1187. {
  1188. Owned<IPropertyTree> graph = createPTreeFromIPT(&thisGraph);
  1189. Owned<IPropertyTreeIterator> edges = graph->getElements(".//edge");
  1190. ForEach(*edges)
  1191. {
  1192. IPropertyTree &edge = edges->query();
  1193. IActivityFactory *a = findActivity(edge.getPropInt("@source", 0));
  1194. if (!a)
  1195. a = findActivity(edge.getPropInt("att[@name=\"_sourceActivity\"]/@value", 0));
  1196. if (a)
  1197. {
  1198. unsigned sourceOutput = edge.getPropInt("att[@name=\"_sourceIndex\"]/@value", 0);
  1199. a->getEdgeProgressInfo(sourceOutput, edge);
  1200. }
  1201. }
  1202. Owned<IPropertyTreeIterator> nodes = graph->getElements(".//node");
  1203. ForEach(*nodes)
  1204. {
  1205. IPropertyTree &node = nodes->query();
  1206. IActivityFactory *a = findActivity(node.getPropInt("@id", 0));
  1207. if (a)
  1208. a->getNodeProgressInfo(node);
  1209. }
  1210. toXML(graph, reply);
  1211. }
  1212. virtual IPropertyTree* cloneQueryXGMML() const
  1213. {
  1214. assertex(dll && dll->queryWorkUnit());
  1215. Owned<IPropertyTree> tree = createPTree("Query");
  1216. Owned<IConstWUGraphIterator> graphs = &dll->queryWorkUnit()->getGraphs(GraphTypeActivities);
  1217. SCMStringBuffer graphNameStr;
  1218. ForEach(*graphs)
  1219. {
  1220. graphs->query().getName(graphNameStr);
  1221. const char *graphName = graphNameStr.s.str();
  1222. Owned<IPropertyTree> graphXgmml = graphs->query().getXGMMLTree(false);
  1223. IPropertyTree *newGraph = createPTree();
  1224. newGraph->setProp("@id", graphName);
  1225. IPropertyTree *newXGMML = createPTree();
  1226. newXGMML->addPropTree("graph", graphXgmml.getLink());
  1227. newGraph->addPropTree("xgmml", newXGMML);
  1228. tree->addPropTree("Graph", newGraph);
  1229. }
  1230. return tree.getClear();
  1231. }
  1232. virtual void getStats(StringBuffer &reply, const char *graphName) const
  1233. {
  1234. if (dll)
  1235. {
  1236. assertex(dll->queryWorkUnit());
  1237. Owned<IConstWUGraphIterator> graphs = &dll->queryWorkUnit()->getGraphs(GraphTypeActivities);
  1238. SCMStringBuffer thisGraphNameStr;
  1239. ForEach(*graphs)
  1240. {
  1241. graphs->query().getName(thisGraphNameStr);
  1242. if (graphName)
  1243. {
  1244. if (thisGraphNameStr.length() && (stricmp(graphName, thisGraphNameStr.s.str()) != 0))
  1245. continue; // not interested in this one
  1246. }
  1247. reply.appendf("<Graph id='%s'><xgmml>", thisGraphNameStr.s.str());
  1248. Owned<IPropertyTree> graphXgmml = graphs->query().getXGMMLTree(false);
  1249. getGraphStats(reply, *graphXgmml);
  1250. reply.append("</xgmml></Graph>");
  1251. }
  1252. }
  1253. }
  1254. virtual void getActivityMetrics(StringBuffer &reply) const
  1255. {
  1256. HashIterator i(allActivities);
  1257. StringBuffer myReply;
  1258. ForEach(i)
  1259. {
  1260. IActivityFactory *f = *allActivities.mapToValue(&i.query());
  1261. f->getActivityMetrics(myReply.clear());
  1262. if (myReply.length())
  1263. {
  1264. reply.appendf(" <activity query='%s' id='%d' channel='%d'\n", queryQueryName(), f->queryId(), queryChannel());
  1265. reply.append(myReply);
  1266. reply.append(" </activity>\n");
  1267. }
  1268. }
  1269. }
  1270. virtual void getQueryInfo(StringBuffer &reply, bool full, IArrayOf<IQueryFactory> *slaveQueries, const IRoxieContextLogger &logctx) const
  1271. {
  1272. Owned<IPropertyTree> xref = createPTree("Query", 0);
  1273. xref->setProp("@id", id);
  1274. if (suspended())
  1275. {
  1276. xref->setPropBool("@suspended", true);
  1277. xref->setProp("@error", errorMessage);
  1278. }
  1279. if (full)
  1280. {
  1281. HashIterator i(allActivities);
  1282. ForEach(i)
  1283. {
  1284. IActivityFactory *f = *allActivities.mapToValue(&i.query());
  1285. f->getXrefInfo(*xref, logctx);
  1286. }
  1287. }
  1288. if (slaveQueries)
  1289. {
  1290. ForEachItemIn(idx, *slaveQueries)
  1291. {
  1292. if (slaveQueries->item(idx).suspended())
  1293. {
  1294. xref->setPropBool("@suspended", true);
  1295. xref->setPropBool("@slaveSuspended", true);
  1296. }
  1297. }
  1298. }
  1299. toXML(xref, reply, 1, XML_Embed|XML_LineBreak|XML_SortTags);
  1300. }
  1301. virtual void resetQueryTimings()
  1302. {
  1303. HashIterator i(allActivities);
  1304. ForEach(i)
  1305. {
  1306. IActivityFactory *f = *allActivities.mapToValue(&i.query());
  1307. f->resetNodeProgressInfo();
  1308. }
  1309. }
  1310. virtual const char *queryErrorMessage() const
  1311. {
  1312. return errorMessage.str();
  1313. }
  1314. virtual const char *queryQueryName() const
  1315. {
  1316. return id;
  1317. }
  1318. virtual bool isQueryLibrary() const
  1319. {
  1320. return libraryInterfaceHash != 0;
  1321. }
  1322. virtual unsigned getQueryLibraryInterfaceHash() const
  1323. {
  1324. return libraryInterfaceHash;
  1325. }
  1326. virtual void suspend(const char* errMsg)
  1327. {
  1328. isSuspended = true;
  1329. isLoadFailed = true;
  1330. errorMessage.append(errMsg);
  1331. }
  1332. virtual bool loadFailed() const
  1333. {
  1334. return isLoadFailed;
  1335. }
  1336. virtual bool suspended() const
  1337. {
  1338. return isSuspended;
  1339. }
  1340. virtual const QueryOptions &queryOptions() const
  1341. {
  1342. return options;
  1343. }
  1344. virtual ILoadedDllEntry *queryDll() const
  1345. {
  1346. assertex(dll);
  1347. return dll->queryDll();
  1348. }
  1349. virtual IConstWorkUnit *queryWorkUnit() const
  1350. {
  1351. assertex(dll);
  1352. return dll->queryWorkUnit();
  1353. }
  1354. virtual const IRoxiePackage &queryPackage() const
  1355. {
  1356. return package;
  1357. }
  1358. virtual CRoxieWorkflowMachine *createWorkflowMachine(IConstWorkUnit *wu, bool isOnce, const IRoxieContextLogger &logctx) const
  1359. {
  1360. throwUnexpected(); // only on server...
  1361. }
  1362. virtual char *getEnv(const char *name, const char *defaultValue) const
  1363. {
  1364. if (!defaultValue)
  1365. defaultValue = "";
  1366. const char *result;
  1367. if (name && *name=='@')
  1368. {
  1369. // @ is shorthand for control: for legacy compatibility reasons
  1370. StringBuffer useName;
  1371. useName.append("control:").append(name+1);
  1372. result = package.queryEnv(useName.str());
  1373. }
  1374. else
  1375. result = package.queryEnv(name);
  1376. if (!result)
  1377. result = getenv(name);
  1378. return strdup(result ? result : defaultValue);
  1379. }
  1380. virtual IRoxieSlaveContext *createSlaveContext(const SlaveContextLogger &logctx, IRoxieQueryPacket *packet, bool hasChildren) const
  1381. {
  1382. throwUnexpected(); // only implemented in derived slave class
  1383. }
  1384. virtual IRoxieServerContext *createContext(IPropertyTree *xml, IHpccProtocolResponse *protocol, unsigned flags, const ContextLogger &_logctx, PTreeReaderOptions xmlReadFlags, const char *querySetName) const
  1385. {
  1386. throwUnexpected(); // only implemented in derived server class
  1387. }
  1388. virtual IRoxieServerContext *createContext(IConstWorkUnit *wu, const ContextLogger &_logctx) const
  1389. {
  1390. throwUnexpected(); // only implemented in derived server class
  1391. }
  1392. virtual void noteQuery(time_t startTime, bool failed, unsigned elapsed, unsigned memused, unsigned slavesReplyLen, unsigned bytesOut)
  1393. {
  1394. throwUnexpected(); // only implemented in derived server class
  1395. }
  1396. virtual IPropertyTree *getQueryStats(time_t from, time_t to)
  1397. {
  1398. throwUnexpected(); // only implemented in derived server class
  1399. }
  1400. virtual void getGraphNames(StringArray &ret) const
  1401. {
  1402. Owned<IConstWUGraphIterator> graphs = &dll->queryWorkUnit()->getGraphs(GraphTypeActivities);
  1403. ForEach(*graphs)
  1404. {
  1405. SCMStringBuffer graphName;
  1406. graphs->query().getName(graphName);
  1407. ret.append(graphName.str());
  1408. }
  1409. }
  1410. virtual bool isDynamic() const
  1411. {
  1412. return dynamic;
  1413. }
  1414. protected:
  1415. IPropertyTree *queryWorkflowTree() const
  1416. {
  1417. assertex(dll->queryWorkUnit());
  1418. return dll->queryWorkUnit()->queryWorkflowTree();
  1419. }
  1420. bool hasOnceSection() const
  1421. {
  1422. IPropertyTree *workflow = queryWorkflowTree();
  1423. if (workflow)
  1424. return workflow->hasProp("Item[@mode='once']");
  1425. else
  1426. return false;
  1427. }
  1428. virtual void checkSuspended() const
  1429. {
  1430. if (isSuspended)
  1431. {
  1432. StringBuffer err;
  1433. if (errorMessage.length())
  1434. err.appendf(" because %s", errorMessage.str());
  1435. throw MakeStringException(ROXIE_QUERY_SUSPENDED, "Query %s is suspended%s", id.get(), err.str());
  1436. }
  1437. }
  1438. };
  1439. CriticalSection CQueryFactory::queryCreateLock;
  1440. SpinLock CQueryFactory::queriesCrit;
  1441. CopyMapXToMyClass<hash64_t, hash64_t, CQueryFactory> CQueryFactory::queryMap;
  1442. extern IQueryFactory *getQueryFactory(hash64_t hashvalue, unsigned channel)
  1443. {
  1444. return CQueryFactory::getQueryFactory(hashvalue, channel);
  1445. }
  1446. class CRoxieServerQueryFactory : public CQueryFactory
  1447. {
  1448. // Parts of query factory is only interesting on the server - workflow support, and tracking of total query times
  1449. protected:
  1450. Owned<IQueryStatsAggregator> queryStats;
  1451. public:
  1452. CRoxieServerQueryFactory(const char *_id, const IQueryDll *_dll, const IRoxiePackage &_package, hash64_t _hashValue, ISharedOnceContext *_sharedOnceContext, bool _dynamic)
  1453. : CQueryFactory(_id, _dll, _package, _hashValue, 0, _sharedOnceContext, _dynamic)
  1454. {
  1455. queryStats.setown(createQueryStatsAggregator(id.get(), statsExpiryTime));
  1456. }
  1457. virtual void noteQuery(time_t startTime, bool failed, unsigned elapsed, unsigned memused, unsigned slavesReplyLen, unsigned bytesOut)
  1458. {
  1459. queryStats->noteQuery(startTime, failed, elapsed, memused, slavesReplyLen, bytesOut);
  1460. queryGlobalQueryStatsAggregator()->noteQuery(startTime, failed, elapsed, memused, slavesReplyLen, bytesOut);
  1461. }
  1462. virtual void addDependency(unsigned sourceIdx, unsigned sourceId, unsigned targetId, int controlId, const char *edgeId, ActivityArray * activities)
  1463. {
  1464. // addDependency is expected to fail occasionally on slave, but never on Roxie server
  1465. if (!doAddDependency(sourceIdx, sourceId, targetId, controlId, edgeId, activities))
  1466. throw MakeStringException(ROXIE_ADDDEPENDENCY_ERROR, "Failed to create dependency from %u on %u", sourceId, targetId);
  1467. }
  1468. virtual ActivityArray *loadGraph(IPropertyTree &graph, const char *graphName)
  1469. {
  1470. bool isLibraryGraph = graph.getPropBool("@library");
  1471. bool isSequential = graph.getPropBool("@sequential");
  1472. ActivityArray *activities = new ActivityArray(isLibraryGraph, false, isLibraryGraph, isSequential);
  1473. if (isLibraryGraph)
  1474. activities->setLibraryGraphId(graph.getPropInt("node/@id"));
  1475. try
  1476. {
  1477. Owned<IPropertyTreeIterator> subgraphs = graph.getElements("node");
  1478. ForEach(*subgraphs)
  1479. {
  1480. IPropertyTree &node = subgraphs->query();
  1481. loadSubgraph(node, activities);
  1482. loadNode(node, 0, activities);
  1483. }
  1484. addDependencies(graph, activities);
  1485. }
  1486. catch (...)
  1487. {
  1488. ::Release(activities);
  1489. allActivities.kill();
  1490. throw;
  1491. }
  1492. return activities;
  1493. }
  1494. virtual IRoxieServerContext *createContext(IPropertyTree *context, IHpccProtocolResponse *protocol, unsigned flags, const ContextLogger &_logctx, PTreeReaderOptions _xmlReadFlags, const char *_querySetName) const
  1495. {
  1496. checkSuspended();
  1497. return createRoxieServerContext(context, protocol, this, flags, _logctx, _xmlReadFlags, _querySetName);
  1498. }
  1499. virtual IRoxieServerContext *createContext(IConstWorkUnit *wu, const ContextLogger &_logctx) const
  1500. {
  1501. checkSuspended();
  1502. return createWorkUnitServerContext(wu, this, _logctx);
  1503. }
  1504. virtual CRoxieWorkflowMachine *createWorkflowMachine(IConstWorkUnit *wu, bool isOnce, const IRoxieContextLogger &logctx) const
  1505. {
  1506. IPropertyTree *workflow = queryWorkflowTree();
  1507. if (workflow)
  1508. {
  1509. return ::createRoxieWorkflowMachine(workflow, wu, isOnce, logctx);
  1510. }
  1511. else
  1512. return NULL;
  1513. }
  1514. virtual IPropertyTree *getQueryStats(time_t from, time_t to)
  1515. {
  1516. return queryStats->getStats(from, to);
  1517. }
  1518. };
  1519. unsigned checkWorkunitVersionConsistency(const IConstWorkUnit *wu)
  1520. {
  1521. assertex(wu);
  1522. unsigned wuVersion = wu->getCodeVersion();
  1523. if (wuVersion == 0)
  1524. throw makeStringException(ROXIE_MISMATCH, "Attempting to execute a workunit that hasn't been compiled");
  1525. if (wuVersion > ACTIVITY_INTERFACE_VERSION || wuVersion < MIN_ACTIVITY_INTERFACE_VERSION)
  1526. throw MakeStringException(ROXIE_MISMATCH, "Workunit was compiled for eclhelper interface version %d, this roxie requires version %d..%d", wuVersion, MIN_ACTIVITY_INTERFACE_VERSION, ACTIVITY_INTERFACE_VERSION);
  1527. return wuVersion;
  1528. }
  1529. static void checkWorkunitVersionConsistency(const IQueryDll *dll)
  1530. {
  1531. unsigned wuVersion = checkWorkunitVersionConsistency(dll->queryWorkUnit());
  1532. EclProcessFactory processFactory = (EclProcessFactory) dll->queryDll()->getEntry("createProcess");
  1533. if (processFactory)
  1534. {
  1535. Owned<IEclProcess> process = processFactory();
  1536. assertex(process);
  1537. if (process->getActivityVersion() != wuVersion)
  1538. throw MakeStringException(ROXIE_MISMATCH, "Inconsistent interface versions. Workunit was created using eclcc for version %u, but the c++ compiler used version %u", wuVersion, process->getActivityVersion());
  1539. }
  1540. else
  1541. throw MakeStringException(ROXIE_MISMATCH, "Workunit did not export createProcess function");
  1542. }
  1543. extern IQueryFactory *createServerQueryFactory(const char *id, const IQueryDll *dll, const IRoxiePackage &package, const IPropertyTree *stateInfo, bool isDynamic, bool forceRetry)
  1544. {
  1545. CriticalBlock b(CQueryFactory::queryCreateLock);
  1546. IArrayOf<IResolvedFile> queryFiles; // Note - these should stay in scope long enough to ensure still cached when (if) query is loaded for real
  1547. hash64_t hashValue = CQueryFactory::getQueryHash(id, dll, package, stateInfo, queryFiles, isDynamic);
  1548. IQueryFactory *cached = getQueryFactory(hashValue, 0);
  1549. if (cached && !(cached->loadFailed() && (reloadRetriesFailed || forceRetry)))
  1550. {
  1551. ::Release(dll);
  1552. return cached;
  1553. }
  1554. if (dll && !selfTestMode)
  1555. {
  1556. checkWorkunitVersionConsistency(dll);
  1557. Owned<ISharedOnceContext> sharedOnceContext;
  1558. IPropertyTree *workflow = dll->queryWorkUnit()->queryWorkflowTree();
  1559. if (workflow && workflow->hasProp("Item[@mode='once']"))
  1560. sharedOnceContext.setown(new CSharedOnceContext);
  1561. Owned<CRoxieServerQueryFactory> newFactory = new CRoxieServerQueryFactory(id, dll, dynamic_cast<const IRoxiePackage&>(package), hashValue, sharedOnceContext, isDynamic);
  1562. newFactory->load(stateInfo);
  1563. if (sharedOnceContext && preloadOnceData)
  1564. {
  1565. Owned<StringContextLogger> logctx = new StringContextLogger(id); // NB may get linked by the onceContext
  1566. sharedOnceContext->checkOnceDone(newFactory, *logctx);
  1567. }
  1568. return newFactory.getClear();
  1569. }
  1570. else
  1571. return new CRoxieServerQueryFactory(id, NULL, dynamic_cast<const IRoxiePackage&>(package), hashValue, NULL, isDynamic);
  1572. }
  1573. extern IQueryFactory *createServerQueryFactoryFromWu(IConstWorkUnit *wu)
  1574. {
  1575. Owned<const IQueryDll> dll = createWuQueryDll(wu);
  1576. if (!dll)
  1577. return NULL;
  1578. return createServerQueryFactory(wu->queryWuid(), dll.getClear(), queryRootRoxiePackage(), NULL, true, false); // MORE - if use a constant for id might cache better?
  1579. }
  1580. //==============================================================================================================================================
  1581. class CSlaveQueryFactory : public CQueryFactory
  1582. {
  1583. void addActivity(ISlaveActivityFactory *activity, ActivityArray *activities)
  1584. {
  1585. activities->append(*activity);
  1586. unsigned activityId = activity->queryId();
  1587. allActivities.setValue(activityId, activity);
  1588. }
  1589. void loadSlaveNode(IPropertyTree &node, unsigned subgraphId, ActivityArray *activities)
  1590. {
  1591. ThorActivityKind kind = getActivityKind(node);
  1592. switch (kind)
  1593. {
  1594. case TAKcsvread:
  1595. case TAKxmlread:
  1596. case TAKdiskread:
  1597. case TAKjsonread:
  1598. if (node.getPropBool("att[@name='_isSpill']/@value", false) || node.getPropBool("att[@name='_isSpillGlobal']/@value", false))
  1599. return;
  1600. break;
  1601. case TAKkeyedjoin:
  1602. case TAKkeyeddenormalize:
  1603. case TAKkeyeddenormalizegroup:
  1604. case TAKdisknormalize:
  1605. case TAKdiskcount:
  1606. case TAKdiskaggregate:
  1607. case TAKdiskgroupaggregate:
  1608. case TAKindexread:
  1609. case TAKindexnormalize:
  1610. case TAKindexcount:
  1611. case TAKindexaggregate:
  1612. case TAKindexgroupaggregate:
  1613. case TAKindexgroupexists:
  1614. case TAKindexgroupcount:
  1615. case TAKfetch:
  1616. case TAKcsvfetch:
  1617. case TAKxmlfetch:
  1618. case TAKjsonfetch:
  1619. case TAKremotegraph:
  1620. break;
  1621. case TAKsubgraph:
  1622. break;
  1623. default:
  1624. return;
  1625. }
  1626. ISlaveActivityFactory *newAct = NULL;
  1627. if (kind != TAKsubgraph)
  1628. {
  1629. if (isSuspended)
  1630. newAct = createRoxieDummyActivityFactory(node, subgraphId, *this, false); // MORE - is there any point?
  1631. else
  1632. {
  1633. StringBuffer helperName;
  1634. node.getProp("att[@name=\"helper\"]/@value", helperName);
  1635. if (!helperName.length())
  1636. helperName.append("fAc").append(node.getPropInt("@id", 0));
  1637. HelperFactory *helperFactory = dll->getFactory(helperName.str());
  1638. if (!helperFactory)
  1639. throw MakeStringException(ROXIE_INTERNAL_ERROR, "Internal error: helper function %s not exported", helperName.str());
  1640. switch (kind)
  1641. {
  1642. case TAKdiskread:
  1643. newAct = createRoxieDiskReadActivityFactory(node, subgraphId, *this, helperFactory);
  1644. break;
  1645. case TAKcsvread:
  1646. newAct = createRoxieCsvReadActivityFactory(node, subgraphId, *this, helperFactory);
  1647. break;
  1648. case TAKxmlread:
  1649. case TAKjsonread:
  1650. newAct = createRoxieXmlReadActivityFactory(node, subgraphId, *this, helperFactory);
  1651. break;
  1652. case TAKdisknormalize:
  1653. newAct = createRoxieDiskNormalizeActivityFactory(node, subgraphId, *this, helperFactory);
  1654. break;
  1655. case TAKdiskcount:
  1656. newAct = createRoxieDiskCountActivityFactory(node, subgraphId, *this, helperFactory);
  1657. break;
  1658. case TAKdiskaggregate:
  1659. newAct = createRoxieDiskAggregateActivityFactory(node, subgraphId, *this, helperFactory);
  1660. break;
  1661. case TAKdiskgroupaggregate:
  1662. newAct = createRoxieDiskGroupAggregateActivityFactory(node, subgraphId, *this, helperFactory);
  1663. break;
  1664. case TAKindexread:
  1665. newAct = createRoxieIndexReadActivityFactory(node, subgraphId, *this, helperFactory);
  1666. break;
  1667. case TAKindexnormalize:
  1668. newAct = createRoxieIndexNormalizeActivityFactory(node, subgraphId, *this, helperFactory);
  1669. break;
  1670. case TAKindexcount:
  1671. newAct = createRoxieIndexCountActivityFactory(node, subgraphId, *this, helperFactory);
  1672. break;
  1673. case TAKindexaggregate:
  1674. newAct = createRoxieIndexAggregateActivityFactory(node, subgraphId, *this, helperFactory);
  1675. break;
  1676. case TAKindexgroupaggregate:
  1677. case TAKindexgroupexists:
  1678. case TAKindexgroupcount:
  1679. newAct = createRoxieIndexGroupAggregateActivityFactory(node, subgraphId, *this, helperFactory, kind);
  1680. break;
  1681. case TAKfetch:
  1682. newAct = createRoxieFetchActivityFactory(node, subgraphId, *this, helperFactory);
  1683. break;
  1684. case TAKcsvfetch:
  1685. newAct = createRoxieCSVFetchActivityFactory(node, subgraphId, *this, helperFactory);
  1686. break;
  1687. case TAKxmlfetch:
  1688. case TAKjsonfetch:
  1689. newAct = createRoxieXMLFetchActivityFactory(node, subgraphId, *this, helperFactory);
  1690. break;
  1691. case TAKkeyedjoin:
  1692. case TAKkeyeddenormalize:
  1693. case TAKkeyeddenormalizegroup:
  1694. newAct = createRoxieKeyedJoinIndexActivityFactory(node, subgraphId, *this, helperFactory);
  1695. if (node.getPropBool("att[@name=\"_diskAccessRequired\"]/@value"))
  1696. {
  1697. ISlaveActivityFactory *newAct2 = createRoxieKeyedJoinFetchActivityFactory(node, subgraphId, *this, helperFactory);
  1698. unsigned activityId2 = newAct2->queryId() | ROXIE_ACTIVITY_FETCH;
  1699. activities->append(*newAct2);
  1700. allActivities.setValue(activityId2, newAct2);
  1701. }
  1702. break;
  1703. case TAKremotegraph:
  1704. {
  1705. unsigned graphId = node.getPropInt("att[@name=\"_graphid\"]/@value", 0);
  1706. newAct = createRoxieRemoteActivityFactory(node, subgraphId, *this, helperFactory, graphId);
  1707. break;
  1708. }
  1709. default:
  1710. throwUnexpected();
  1711. }
  1712. }
  1713. if (newAct)
  1714. {
  1715. addActivity(newAct, activities);
  1716. }
  1717. }
  1718. else if (kind == TAKsubgraph)
  1719. {
  1720. // If the subgraph belongs to a remote activity, we need to be able to execute it on the slave...
  1721. IPropertyTree * childGraphNode = node.queryPropTree("att/graph");
  1722. if (!childGraphNode->getPropBool("@child"))
  1723. {
  1724. unsigned parentId = findParentId(node);
  1725. assertex(parentId);
  1726. unsigned parentIndex = activities->findActivityIndex(parentId);
  1727. if (parentIndex != NotFound)
  1728. {
  1729. ActivityArray *childQuery = loadChildGraph(*childGraphNode);
  1730. activities->item(parentIndex).addChildQuery(node.getPropInt("@id"), childQuery);
  1731. }
  1732. }
  1733. // Regardless, we need to make sure we create remote activities as required throughout the graph
  1734. Owned<IPropertyTreeIterator> nodes = node.getElements("att/graph/node");
  1735. unsigned subgraphId = node.getPropInt("@id");
  1736. ForEach(*nodes)
  1737. {
  1738. IPropertyTree &node = nodes->query();
  1739. loadSlaveNode(node, subgraphId, activities);
  1740. }
  1741. }
  1742. }
  1743. void loadOuterSubgraph(IPropertyTree &graph, ActivityArray *activities)
  1744. {
  1745. Owned<IPropertyTreeIterator> nodes = graph.getElements("att/graph/node");
  1746. unsigned subgraphId = graph.getPropInt("@id");
  1747. ForEach(*nodes)
  1748. {
  1749. IPropertyTree &node = nodes->query();
  1750. loadSlaveNode(node, subgraphId, activities);
  1751. }
  1752. loadSlaveNode(graph, subgraphId, activities); // MORE - not really sure why this line is here!
  1753. }
  1754. public:
  1755. CSlaveQueryFactory(const char *_id, const IQueryDll *_dll, const IRoxiePackage &_package, hash64_t _hashValue, unsigned _channelNo, ISharedOnceContext *_sharedOnceContext, bool _dynamic)
  1756. : CQueryFactory(_id, _dll, _package, _hashValue, _channelNo, _sharedOnceContext, _dynamic)
  1757. {
  1758. }
  1759. virtual IRoxieSlaveContext *createSlaveContext(const SlaveContextLogger &logctx, IRoxieQueryPacket *packet, bool hasChildren) const
  1760. {
  1761. return ::createSlaveContext(this, logctx, packet, hasChildren);
  1762. }
  1763. virtual ActivityArray *loadGraph(IPropertyTree &graph, const char *graphName)
  1764. {
  1765. // MORE: common up with loadGraph for the Roxie server..
  1766. bool isLibraryGraph = graph.getPropBool("@library");
  1767. bool isSequential = graph.getPropBool("@sequential");
  1768. ActivityArray *activities = new ActivityArray(isLibraryGraph, false, isLibraryGraph, isSequential);
  1769. if (isLibraryGraph)
  1770. activities->setLibraryGraphId(graph.getPropInt("node/@id"));
  1771. try
  1772. {
  1773. if (false && isLibraryGraph)
  1774. {
  1775. //Really only need to do this if the library is called from a remote activity
  1776. //but it's a bit tricky to work out since the library graph will come before the use.
  1777. //Not a major issue since libraries won't be embedded for production queries.
  1778. // this comment makes little sense...
  1779. Owned<IPropertyTreeIterator> subgraphs = graph.getElements("node");
  1780. ForEach(*subgraphs)
  1781. {
  1782. IPropertyTree &node = subgraphs->query();
  1783. loadSubgraph(node, activities);
  1784. loadNode(node, 0, activities);
  1785. }
  1786. }
  1787. Owned<IPropertyTreeIterator> subgraphs = graph.getElements("node");
  1788. ForEach(*subgraphs)
  1789. {
  1790. IPropertyTree &subgraph = subgraphs->query();
  1791. loadOuterSubgraph(subgraph, activities);
  1792. }
  1793. addDependencies(graph, activities);
  1794. }
  1795. catch (...)
  1796. {
  1797. ::Release(activities);
  1798. allActivities.kill();
  1799. throw;
  1800. }
  1801. return activities;
  1802. }
  1803. };
  1804. IQueryFactory *createSlaveQueryFactory(const char *id, const IQueryDll *dll, const IRoxiePackage &package, unsigned channel, const IPropertyTree *stateInfo, bool isDynamic, bool forceRetry)
  1805. {
  1806. CriticalBlock b(CQueryFactory::queryCreateLock);
  1807. IArrayOf<IResolvedFile> queryFiles; // Note - these should stay in scope long enough to ensure still cached when (if) query is loaded for real
  1808. hash64_t hashValue = CQueryFactory::getQueryHash(id, dll, package, stateInfo, queryFiles, isDynamic);
  1809. IQueryFactory *cached = getQueryFactory(hashValue, channel);
  1810. if (cached)
  1811. {
  1812. ::Release(dll);
  1813. return cached;
  1814. }
  1815. if (dll)
  1816. {
  1817. checkWorkunitVersionConsistency(dll);
  1818. Owned<IQueryFactory> serverFactory = createServerQueryFactory(id, LINK(dll), package, stateInfo, false, forceRetry); // Should always find a cached one
  1819. Owned<CSlaveQueryFactory> newFactory = new CSlaveQueryFactory(id, dll, dynamic_cast<const IRoxiePackage&>(package), hashValue, channel, serverFactory->querySharedOnceContext(), isDynamic);
  1820. newFactory->load(stateInfo);
  1821. return newFactory.getClear();
  1822. }
  1823. else
  1824. return new CSlaveQueryFactory(id, NULL, dynamic_cast<const IRoxiePackage&>(package), hashValue, channel, NULL, isDynamic);
  1825. }
  1826. extern IQueryFactory *createSlaveQueryFactoryFromWu(IConstWorkUnit *wu, unsigned channelNo)
  1827. {
  1828. Owned<const IQueryDll> dll = createWuQueryDll(wu);
  1829. if (!dll)
  1830. return NULL;
  1831. return createSlaveQueryFactory(wu->queryWuid(), dll.getClear(), queryRootRoxiePackage(), channelNo, NULL, true, false); // MORE - if use a constant for id might cache better?
  1832. }
  1833. IRecordLayoutTranslator * createRecordLayoutTranslator(const char *logicalName, IDefRecordMeta const * diskMeta, IDefRecordMeta const * activityMeta)
  1834. {
  1835. try
  1836. {
  1837. return ::createRecordLayoutTranslator(diskMeta, activityMeta);
  1838. }
  1839. catch (IException *E)
  1840. {
  1841. StringBuffer q, d;
  1842. getRecordMetaAsString(q, activityMeta);
  1843. getRecordMetaAsString(d, diskMeta);
  1844. DBGLOG("Activity: %s", q.str());
  1845. DBGLOG("Disk: %s", d.str());
  1846. StringBuffer m;
  1847. m.appendf("In index %s:", logicalName);
  1848. E->errorMessage(m);
  1849. E->Release();
  1850. DBGLOG("%s", m.str());
  1851. throw MakeStringException(ROXIE_RCD_LAYOUT_TRANSLATOR, "%s", m.str());
  1852. }
  1853. }