ccdquery.cpp 91 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include <platform.h>
  14. #include <jlib.hpp>
  15. #include "ccd.hpp"
  16. #include "ccdquery.hpp"
  17. #include "ccdstate.hpp"
  18. #include "ccdsnmp.hpp"
  19. #include "ccdserver.hpp"
  20. #include "ccdcontext.hpp"
  21. #include "thorplugin.hpp"
  22. #include <thread>
  23. #include <mutex>
  24. void ActivityArray::append(IActivityFactory &cur)
  25. {
  26. hash.setValue(cur.queryId(), activities.ordinality());
  27. activities.append(cur);
  28. }
  29. unsigned ActivityArray::findActivityIndex(unsigned id) const
  30. {
  31. unsigned *ret = hash.getValue(id);
  32. if (ret)
  33. return *ret;
  34. return NotFound;
  35. }
  36. unsigned ActivityArray::recursiveFindActivityIndex(unsigned id)
  37. {
  38. // NOTE - this returns the activity index of the PARENT of the specified activity
  39. unsigned *ret = hash.getValue(id);
  40. if (ret)
  41. return *ret;
  42. ForEachItem(idx)
  43. {
  44. IActivityFactory & cur = item(idx);
  45. unsigned childId;
  46. for (unsigned childIdx = 0;;childIdx++)
  47. {
  48. ActivityArray * children = cur.queryChildQuery(childIdx, childId);
  49. if (!children)
  50. break;
  51. if (children->recursiveFindActivityIndex(id) != NotFound)
  52. {
  53. hash.setValue(id, idx);
  54. return idx;
  55. }
  56. }
  57. }
  58. return NotFound;
  59. }
  60. //----------------------------------------------------------------------------------------------
  61. // Class CQueryDll maps dlls into loadable workunits, complete with caching to ensure that a refresh of the QuerySet
  62. // can avoid reloading dlls, and that the same CQueryDll (and the objects it owns) can be shared between server and
  63. // multiple agent channels
  64. //----------------------------------------------------------------------------------------------
  65. class CQueryDll : implements IQueryDll, public CInterface
  66. {
  67. StringAttr dllName;
  68. Owned <ILoadedDllEntry> dll;
  69. Owned <IConstWorkUnit> wu;
  70. std::once_flag started;
  71. Owned<IException> e;
  72. static CriticalSection dllCacheLock;
  73. static CopyMapStringToMyClass<CQueryDll> dllCache;
  74. public:
  75. IMPLEMENT_IINTERFACE;
  76. CQueryDll(const char *_dllName) : dllName(_dllName)
  77. {
  78. }
  79. private:
  80. void init(bool isExe)
  81. {
  82. std::call_once(started, [this, isExe]()
  83. {
  84. try
  85. {
  86. dll.setown(isExe ? createExeDllEntry(dllName) : queryRoxieDllServer().loadDll(dllName, DllLocationDirectory));
  87. StringBuffer wuXML;
  88. if (!selfTestMode && getEmbeddedWorkUnitXML(dll, wuXML))
  89. {
  90. Owned<ILocalWorkUnit> localWU = createLocalWorkUnit(wuXML);
  91. wu.setown(localWU->unlock());
  92. }
  93. }
  94. catch (IException *E)
  95. {
  96. EXCLOG(E);
  97. e.setown(E);
  98. }
  99. });
  100. if (e)
  101. throw e.getLink();
  102. }
  103. public:
  104. virtual void beforeDispose()
  105. {
  106. CriticalBlock b(dllCacheLock);
  107. // NOTE: it's theoretically possible for the final release to happen after a replacement has been inserted into hash table.
  108. // So only remove from hash table if what we find there matches the item that is being deleted.
  109. CQueryDll *goer = dllCache.getValue(dllName);
  110. if (goer == this)
  111. dllCache.remove(dllName);
  112. }
  113. static const CQueryDll *getQueryDll(const char *dllName, bool isExe)
  114. {
  115. CQueryDll *cached = nullptr;
  116. {
  117. CriticalBlock b(dllCacheLock);
  118. cached = dllCache.getValue(dllName);
  119. if (!cached || !cached->isAliveAndLink())
  120. {
  121. cached = new CQueryDll(dllName);
  122. dllCache.setValue(dllName, cached);
  123. }
  124. }
  125. Owned<CQueryDll> dll(cached); // Make sure it's released if init() throws an exception
  126. dll->init(isExe);
  127. return dll.getClear();
  128. }
  129. static const IQueryDll *getWorkUnitDll(IConstWorkUnit *wu)
  130. {
  131. SCMStringBuffer dllName;
  132. Owned<IConstWUQuery> q = wu->getQuery();
  133. q->getQueryDllName(dllName);
  134. if (dllName.length() == 0)
  135. {
  136. if (wu->getCodeVersion() == 0)
  137. throw makeStringExceptionV(ROXIE_MISSING_DLL, "Attempting to load workunit %s that hasn't been compiled", wu->queryWuid());
  138. else
  139. throw makeStringExceptionV(ROXIE_MISSING_DLL, "Attempting to load workunit %s with no associated dll", wu->queryWuid());
  140. }
  141. return getQueryDll(dllName.str(), false);
  142. }
  143. virtual HelperFactory *getFactory(const char *helperName) const
  144. {
  145. return (HelperFactory *) dll->getEntry(helperName);
  146. }
  147. virtual ILoadedDllEntry *queryDll() const
  148. {
  149. return dll;
  150. }
  151. virtual IConstWorkUnit *queryWorkUnit() const
  152. {
  153. return wu;
  154. }
  155. };
  156. CriticalSection CQueryDll::dllCacheLock;
  157. CopyMapStringToMyClass<CQueryDll> CQueryDll::dllCache;
  158. extern const IQueryDll *createQueryDll(const char *dllName)
  159. {
  160. return CQueryDll::getQueryDll(dllName, false);
  161. }
  162. extern const IQueryDll *createExeQueryDll(const char *exeName)
  163. {
  164. return CQueryDll::getQueryDll(exeName, true);
  165. }
  166. extern const IQueryDll *createWuQueryDll(IConstWorkUnit *wu)
  167. {
  168. return CQueryDll::getWorkUnitDll(wu);
  169. }
  170. // Add information to the xref information to be returned for a control:getQueryXrefInfo request
  171. IPropertyTree * addXrefInfo(IPropertyTree &reply, const char *section, const char *name)
  172. {
  173. VStringBuffer xpath("%s[@name='%s']", section, name);
  174. if (!reply.hasProp(xpath))
  175. {
  176. IPropertyTree *info = reply.addPropTree(section);
  177. info->setProp("@name", name);
  178. return info;
  179. }
  180. return NULL;
  181. }
  182. extern void addXrefFileInfo(IPropertyTree &reply, const IResolvedFile *dataFile)
  183. {
  184. if (dataFile->isSuperFile())
  185. {
  186. IPropertyTree *info = addXrefInfo(reply, "SuperFile", dataFile->queryFileName());
  187. if (info)
  188. {
  189. int numSubs = dataFile->numSubFiles();
  190. for (int i = 0; i < numSubs; i++)
  191. {
  192. StringBuffer subName;
  193. dataFile->getSubFileName(i, subName);
  194. addXrefInfo(*info, "File", subName.str());
  195. }
  196. }
  197. }
  198. else
  199. addXrefInfo(reply, "File", dataFile->queryFileName());
  200. }
  201. extern void addXrefLibraryInfo(IPropertyTree &reply, const char *libraryName)
  202. {
  203. addXrefInfo(reply, "Library", libraryName);
  204. }
  205. //----------------------------------------------------------------------------------------------
  206. // Class CSharedOnceContext manages the context for a query's ONCE code, which is shared between
  207. // all agent and server contexts on a node
  208. //----------------------------------------------------------------------------------------------
  209. class CSharedOnceContext : public CInterfaceOf<ISharedOnceContext>
  210. {
  211. public:
  212. CSharedOnceContext()
  213. {
  214. }
  215. ~CSharedOnceContext()
  216. {
  217. }
  218. virtual IDeserializedResultStore &queryOnceResultStore() const
  219. {
  220. assertex(onceResultStore!= NULL);
  221. return *onceResultStore;
  222. }
  223. virtual IPropertyTree &queryOnceContext(const IQueryFactory *factory, const IRoxieContextLogger &logctx) const
  224. {
  225. checkOnceDone(factory, logctx);
  226. assertex(onceContext != NULL);
  227. return *onceContext;
  228. }
  229. virtual void checkOnceDone(const IQueryFactory *factory, const IRoxieContextLogger &logctx) const
  230. {
  231. if (calculatingOnce) // NOTE - this must be outside the critsec or you deadlock. It is still effectively protected by the critsec
  232. return;
  233. CriticalBlock b(onceCrit);
  234. if (!onceContext)
  235. {
  236. calculatingOnce = true;
  237. onceContext.setown(createPTree(ipt_lowmem));
  238. onceResultStore.setown(createDeserializedResultStore());
  239. Owned <IRoxieServerContext> ctx = createOnceServerContext(factory, logctx);
  240. onceManager.set(&ctx->queryRowManager());
  241. try
  242. {
  243. ctx->process();
  244. ctx->done(false);
  245. }
  246. catch (IException *E)
  247. {
  248. ctx->done(true);
  249. onceException.setown(E);
  250. }
  251. catch (...)
  252. {
  253. ctx->done(true);
  254. onceException.setown(MakeStringException(ROXIE_INTERNAL_ERROR, "Unknown exception in ONCE code"));
  255. }
  256. calculatingOnce = false;
  257. }
  258. if (onceException)
  259. throw onceException.getLink();
  260. }
  261. protected:
  262. mutable CriticalSection onceCrit;
  263. mutable Owned<roxiemem::IRowManager> onceManager; // release AFTER resultStore
  264. mutable Owned<IPropertyTree> onceContext;
  265. mutable Owned<IDeserializedResultStore> onceResultStore;
  266. mutable Owned<IException> onceException;
  267. mutable bool calculatingOnce = false;
  268. };
  269. //----------------------------------------------------------------------------------------------
  270. // Class CQueryOptions is used to store options affecting the execution of a query
  271. // These can be set globally, by the query workunit, or by the query XML parameters
  272. //----------------------------------------------------------------------------------------------
  273. QueryOptions::QueryOptions()
  274. {
  275. priority = 0;
  276. timeLimit = defaultTimeLimit[0];
  277. warnTimeLimit = defaultWarnTimeLimit[0];
  278. memoryLimit = defaultMemoryLimit;
  279. parallelJoinPreload = defaultParallelJoinPreload;
  280. fullKeyedJoinPreload = defaultFullKeyedJoinPreload;
  281. keyedJoinPreload = defaultKeyedJoinPreload;
  282. concatPreload = defaultConcatPreload;
  283. fetchPreload = defaultFetchPreload;
  284. prefetchProjectPreload = defaultPrefetchProjectPreload;
  285. bindCores = coresPerQuery;
  286. strandBlockSize = defaultStrandBlockSize;
  287. forceNumStrands = defaultForceNumStrands;
  288. heapFlags = defaultHeapFlags;
  289. checkingHeap = defaultCheckingHeap;
  290. disableLocalOptimizations = defaultDisableLocalOptimizations;
  291. enableFieldTranslation = fieldTranslationEnabled;
  292. stripWhitespaceFromStoredDataset = ((ptr_ignoreWhiteSpace & defaultXmlReadFlags) != 0);
  293. timeActivities = defaultTimeActivities;
  294. traceEnabled = defaultTraceEnabled;
  295. traceLimit = defaultTraceLimit;
  296. noSeekBuildIndex = defaultNoSeekBuildIndex;
  297. allSortsMaySpill = false; // No global default for this
  298. failOnLeaks = false;
  299. collectFactoryStatistics = defaultCollectFactoryStatistics;
  300. parallelWorkflow = false;
  301. numWorkflowThreads = 1;
  302. }
  303. QueryOptions::QueryOptions(const QueryOptions &other)
  304. {
  305. priority = other.priority;
  306. timeLimit = other.timeLimit;
  307. warnTimeLimit = other.warnTimeLimit;
  308. memoryLimit = other.memoryLimit;
  309. parallelJoinPreload = other.parallelJoinPreload;;
  310. fullKeyedJoinPreload = other.fullKeyedJoinPreload;
  311. keyedJoinPreload = other.keyedJoinPreload;
  312. concatPreload = other.concatPreload;
  313. fetchPreload = other.fetchPreload;
  314. prefetchProjectPreload = other.prefetchProjectPreload;
  315. bindCores = other.bindCores;
  316. strandBlockSize = other.strandBlockSize;
  317. forceNumStrands = other.forceNumStrands;
  318. heapFlags = other.heapFlags;
  319. checkingHeap = other.checkingHeap;
  320. disableLocalOptimizations = other.disableLocalOptimizations;
  321. enableFieldTranslation = other.enableFieldTranslation;
  322. stripWhitespaceFromStoredDataset = other.stripWhitespaceFromStoredDataset;
  323. timeActivities = other.timeActivities;
  324. traceEnabled = other.traceEnabled;
  325. traceLimit = other.traceLimit;
  326. noSeekBuildIndex = other.noSeekBuildIndex;
  327. allSortsMaySpill = other.allSortsMaySpill;
  328. failOnLeaks = other.failOnLeaks;
  329. collectFactoryStatistics = other.collectFactoryStatistics;
  330. parallelWorkflow = other.parallelWorkflow;
  331. numWorkflowThreads = other.numWorkflowThreads;
  332. }
  333. void QueryOptions::setFromWorkUnit(IConstWorkUnit &wu, const IPropertyTree *stateInfo)
  334. {
  335. // calculate priority before others since it affects the defaults of others
  336. updateFromWorkUnit(priority, wu, "priority");
  337. if (stateInfo)
  338. updateFromContext(priority, stateInfo, "@priority");
  339. timeLimit = defaultTimeLimit[priority];
  340. warnTimeLimit = defaultWarnTimeLimit[priority];
  341. updateFromWorkUnit(timeLimit, wu, "timeLimit");
  342. updateFromWorkUnit(warnTimeLimit, wu, "warnTimeLimit");
  343. updateFromWorkUnitM(memoryLimit, wu, "memoryLimit");
  344. if (stateInfo)
  345. {
  346. updateFromContext(timeLimit, stateInfo, "@timeLimit");
  347. updateFromContext(warnTimeLimit, stateInfo, "@warnTimeLimit");
  348. updateFromContextM(memoryLimit, stateInfo, "@memoryLimit");
  349. }
  350. updateFromWorkUnit(parallelJoinPreload, wu, "parallelJoinPreload");
  351. updateFromWorkUnit(fullKeyedJoinPreload, wu, "fullKeyedJoinPreload");
  352. updateFromWorkUnit(keyedJoinPreload, wu, "keyedJoinPreload");
  353. updateFromWorkUnit(concatPreload, wu, "concatPreload");
  354. updateFromWorkUnit(fetchPreload, wu, "fetchPreload");
  355. updateFromWorkUnit(prefetchProjectPreload, wu, "prefetchProjectPreload");
  356. updateFromWorkUnit(bindCores, wu, "bindCores");
  357. updateFromWorkUnit(strandBlockSize, wu, "strandBlockSize");
  358. updateFromWorkUnit(forceNumStrands, wu, "forceNumStrands");
  359. updateFromWorkUnit(heapFlags, wu, "heapFlags");
  360. updateFromWorkUnit(checkingHeap, wu, "checkingHeap");
  361. updateFromWorkUnit(disableLocalOptimizations, wu, "disableLocalOptimizations");
  362. updateFromWorkUnit(enableFieldTranslation, wu, "layoutTranslation"); // Name is different for compatibility reasons
  363. updateFromWorkUnit(stripWhitespaceFromStoredDataset, wu, "stripWhitespaceFromStoredDataset");
  364. updateFromWorkUnit(timeActivities, wu, "timeActivities");
  365. updateFromWorkUnit(traceEnabled, wu, "traceEnabled");
  366. updateFromWorkUnit(traceLimit, wu, "traceLimit");
  367. updateFromWorkUnit(allSortsMaySpill, wu, "allSortsMaySpill");
  368. updateFromWorkUnit(failOnLeaks, wu, "failOnLeaks");
  369. updateFromWorkUnit(noSeekBuildIndex, wu, "noSeekBuildIndex");
  370. updateFromWorkUnit(collectFactoryStatistics, wu, "collectFactoryStatistics");
  371. updateFromWorkUnit(parallelWorkflow, wu, "parallelWorkflow");
  372. updateFromWorkUnit(numWorkflowThreads, wu, "numWorkflowthreads");
  373. }
  374. void QueryOptions::updateFromWorkUnitM(memsize_t &value, IConstWorkUnit &wu, const char *name)
  375. {
  376. value = (memsize_t) wu.getDebugValueInt64(name, value);
  377. }
  378. void QueryOptions::updateFromWorkUnit(int &value, IConstWorkUnit &wu, const char *name)
  379. {
  380. value = wu.getDebugValueInt(name, value);
  381. }
  382. void QueryOptions::updateFromWorkUnit(unsigned &value, IConstWorkUnit &wu, const char *name)
  383. {
  384. value = (unsigned) wu.getDebugValueInt(name, value);
  385. }
  386. void QueryOptions::updateFromWorkUnit(bool &value, IConstWorkUnit &wu, const char *name)
  387. {
  388. value = wu.getDebugValueBool(name, value);
  389. }
  390. void QueryOptions::updateFromWorkUnit(RecordTranslationMode &value, IConstWorkUnit &wu, const char *name)
  391. {
  392. SCMStringBuffer val;
  393. wu.getDebugValue(name, val);
  394. if (val.length())
  395. value = getTranslationMode(val.str(), false);
  396. }
  397. void QueryOptions::setFromContext(const IPropertyTree *ctx)
  398. {
  399. if (ctx)
  400. {
  401. // Note: priority cannot be set at context level
  402. updateFromContext(timeLimit, ctx, "@timeLimit", "_TimeLimit");
  403. updateFromContext(warnTimeLimit, ctx, "@warnTimeLimit", "_WarnTimeLimit");
  404. updateFromContextM(memoryLimit, ctx, "@memoryLimit", "_MemoryLimit");
  405. updateFromContext(parallelJoinPreload, ctx, "@parallelJoinPreload", "_ParallelJoinPreload");
  406. updateFromContext(fullKeyedJoinPreload, ctx, "@fullKeyedJoinPreload", "_FullKeyedJoinPreload");
  407. updateFromContext(keyedJoinPreload, ctx, "@keyedJoinPreload", "_KeyedJoinPreload");
  408. updateFromContext(concatPreload, ctx, "@concatPreload", "_ConcatPreload");
  409. updateFromContext(fetchPreload, ctx, "@fetchPreload", "_FetchPreload");
  410. updateFromContext(prefetchProjectPreload, ctx, "@prefetchProjectPreload", "_PrefetchProjectPreload");
  411. updateFromContext(bindCores, ctx, "@bindCores", "_bindCores");
  412. updateFromContext(strandBlockSize, ctx, "@strandBlockSize", "_strandBlockSize");
  413. updateFromContext(forceNumStrands, ctx, "@forceNumStrands", "_forceNumStrands");
  414. updateFromContext(heapFlags, ctx, "@heapFlags", "_HeapFlags");
  415. updateFromContext(checkingHeap, ctx, "@checkingHeap", "_CheckingHeap");
  416. // Note: disableLocalOptimizations is not permitted at context level (too late)
  417. // Note: enableFieldTranslation is not permitted at context level (generally too late anyway)
  418. updateFromContext(stripWhitespaceFromStoredDataset, ctx, "_StripWhitespaceFromStoredDataset", "@stripWhitespaceFromStoredDataset");
  419. updateFromContext(timeActivities, ctx, "@timeActivities", "_TimeActivities");
  420. updateFromContext(traceEnabled, ctx, "@traceEnabled", "_TraceEnabled");
  421. updateFromContext(traceLimit, ctx, "@traceLimit", "_TraceLimit");
  422. updateFromContext(noSeekBuildIndex, ctx, "@noSeekBuildIndex", "_NoSeekBuildIndex");
  423. // Note: allSortsMaySpill is not permitted at context level (too late anyway, unless I refactored)
  424. updateFromContext(failOnLeaks, ctx, "@failOnLeaks", "_FailOnLeaks");
  425. updateFromContext(collectFactoryStatistics, ctx, "@collectFactoryStatistics", "_CollectFactoryStatistics");
  426. updateFromContext(parallelWorkflow, ctx, "@parallelWorkflow", "_parallelWorkflow");
  427. updateFromContext(numWorkflowThreads, ctx, "@numWorkflowThreads", "_numWorkflowThreads");
  428. }
  429. }
  430. const char * QueryOptions::findProp(const IPropertyTree *ctx, const char *name1, const char *name2)
  431. {
  432. if (name1 && ctx->hasProp(name1))
  433. return name1;
  434. else if (name2 && ctx->hasProp(name2))
  435. return name2;
  436. else
  437. return NULL;
  438. }
  439. void QueryOptions::updateFromContextM(memsize_t &value, const IPropertyTree *ctx, const char *name1, const char *name2)
  440. {
  441. const char *name = findProp(ctx, name1, name2);
  442. if (name)
  443. value = (memsize_t) ctx->getPropInt64(name);
  444. }
  445. void QueryOptions::updateFromContext(int &value, const IPropertyTree *ctx, const char *name1, const char *name2)
  446. {
  447. const char *name = findProp(ctx, name1, name2);
  448. if (name)
  449. value = ctx->getPropInt(name);
  450. }
  451. void QueryOptions::updateFromContext(unsigned &value, const IPropertyTree *ctx, const char *name1, const char *name2)
  452. {
  453. const char *name = findProp(ctx, name1, name2);
  454. if (name)
  455. value = (unsigned) ctx->getPropInt(name);
  456. }
  457. void QueryOptions::updateFromContext(bool &value, const IPropertyTree *ctx, const char *name1, const char *name2)
  458. {
  459. const char *name = findProp(ctx, name1, name2);
  460. if (name)
  461. value = ctx->getPropBool(name);
  462. }
  463. void QueryOptions::setFromAgentLoggingFlags(unsigned loggingFlags)
  464. {
  465. // MORE - priority/timelimit ?
  466. checkingHeap = (loggingFlags & LOGGING_CHECKINGHEAP) != 0;
  467. timeActivities = (loggingFlags & LOGGING_TIMEACTIVITIES) != 0;
  468. }
  469. //----------------------------------------------------------------------------------------------
  470. // Class CQueryFactory is the main implementation of IQueryFactory, combining a IQueryDll and a
  471. // package context into an object that can quickly create the query context that executes a specific
  472. // instance of a Roxie query.
  473. // Caching is used to ensure that only queries that are affected by a package change need to be reloaded.
  474. // Derived classes handle the differences between agent and server side factories
  475. //----------------------------------------------------------------------------------------------
  476. class CQueryFactory : implements IQueryFactory, implements IResourceContext, public CInterface
  477. {
  478. protected:
  479. const IRoxiePackage &package;
  480. Owned<const IQueryDll> dll;
  481. Linked<ISharedOnceContext> sharedOnceContext;
  482. MapStringToActivityArray graphMap;
  483. StringAttr id;
  484. StringBuffer errorMessage;
  485. MapIdToActivityFactory allActivities;
  486. QueryOptions options;
  487. bool dynamic;
  488. bool isSuspended;
  489. bool isLoadFailed;
  490. ClusterType targetClusterType;
  491. unsigned libraryInterfaceHash;
  492. hash64_t hashValue;
  493. static CriticalSection activeQueriesCrit;
  494. static CopyMapXToMyClass<hash64_t, hash64_t, CQueryFactory> activeQueries; // Active queries
  495. static CopyMapXToMyClass<hash64_t, hash64_t, CQueryFactory> queryCache; // Active and loading queries
  496. mutable CIArrayOf<TerminationCallbackInfo> callbacks;
  497. mutable CriticalSection callbacksCrit;
  498. public:
  499. static CriticalSection queryCacheCrit;
  500. protected:
  501. IRoxieServerActivityFactory *createActivityFactory(ThorActivityKind kind, unsigned subgraphId, IPropertyTree &node)
  502. {
  503. unsigned id = node.getPropInt("@id", 0);
  504. unsigned rid = id;
  505. if (isSuspended)
  506. return createRoxieServerDummyActivityFactory(id, subgraphId, *this, NULL, TAKnone, node, false); // Is there actually any point?
  507. switch (options.priority)
  508. {
  509. case 1:
  510. rid |= ROXIE_HIGH_PRIORITY;
  511. break;
  512. case 2:
  513. rid |= ROXIE_SLA_PRIORITY;
  514. break;
  515. }
  516. StringBuffer helperName;
  517. node.getProp("att[@name=\"helper\"]/@value", helperName);
  518. if (!helperName.length())
  519. helperName.append("fAc").append(id);
  520. HelperFactory *helperFactory = dll->getFactory(helperName);
  521. if (!helperFactory)
  522. throw MakeStringException(ROXIE_INTERNAL_ERROR, "Internal error: helper function %s not exported", helperName.str());
  523. RemoteActivityId remoteId(rid, hashValue);
  524. RemoteActivityId remoteId2(rid | ROXIE_ACTIVITY_FETCH, hashValue);
  525. switch (kind)
  526. {
  527. case TAKalljoin:
  528. case TAKalldenormalize:
  529. case TAKalldenormalizegroup:
  530. return createRoxieServerAllJoinActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  531. case TAKapply:
  532. return createRoxieServerApplyActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
  533. case TAKaggregate:
  534. case TAKexistsaggregate: // could special case.
  535. case TAKcountaggregate:
  536. return createRoxieServerAggregateActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  537. case TAKcase:
  538. case TAKchildcase:
  539. return createRoxieServerCaseActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isGraphIndependent(node));
  540. case TAKcatch:
  541. case TAKskipcatch:
  542. case TAKcreaterowcatch:
  543. return createRoxieServerCatchActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  544. case TAKchilditerator:
  545. return createRoxieServerChildIteratorActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  546. case TAKchoosesets:
  547. return createRoxieServerChooseSetsActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  548. case TAKchoosesetsenth:
  549. return createRoxieServerChooseSetsEnthActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  550. case TAKchoosesetslast:
  551. return createRoxieServerChooseSetsLastActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  552. case TAKproject:
  553. case TAKcountproject:
  554. return createRoxieServerProjectActivityFactory(id, subgraphId, *this, helperFactory, kind, node); // code is common between Project, CountProject
  555. case TAKfilterproject:
  556. return createRoxieServerFilterProjectActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  557. case TAKdatasetresult:
  558. case TAKrowresult:
  559. return createRoxieServerDatasetResultActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
  560. case TAKdedup:
  561. return createRoxieServerDedupActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  562. case TAKdegroup:
  563. return createRoxieServerDegroupActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  564. case TAKcsvread:
  565. case TAKxmlread:
  566. case TAKjsonread:
  567. case TAKdiskread:
  568. {
  569. if (node.getPropBool("att[@name='_isSpill']/@value", false) || node.getPropBool("att[@name='_isSpillGlobal']/@value", false))
  570. return createRoxieServerSpillReadActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  571. else
  572. return createRoxieServerDiskReadActivityFactory(id, subgraphId, *this, helperFactory, kind, node, remoteId);
  573. }
  574. case TAKspillread:
  575. case TAKmemoryspillread:
  576. return createRoxieServerSpillReadActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  577. case TAKdisknormalize:
  578. case TAKdiskcount:
  579. case TAKdiskaggregate:
  580. case TAKdiskgroupaggregate:
  581. return createRoxieServerDiskReadActivityFactory(id, subgraphId, *this, helperFactory, kind, node, remoteId);
  582. case TAKchildnormalize:
  583. return createRoxieServerNewChildNormalizeActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  584. case TAKchildaggregate:
  585. return createRoxieServerNewChildAggregateActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  586. case TAKchildgroupaggregate:
  587. return createRoxieServerNewChildGroupAggregateActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  588. case TAKchildthroughnormalize:
  589. return createRoxieServerNewChildThroughNormalizeActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  590. case TAKcsvwrite:
  591. case TAKdiskwrite:
  592. case TAKxmlwrite:
  593. case TAKjsonwrite:
  594. case TAKmemoryspillwrite:
  595. case TAKspillwrite:
  596. return createRoxieServerDiskWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  597. case TAKindexwrite:
  598. return createRoxieServerIndexWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
  599. case TAKenth:
  600. return createRoxieServerEnthActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  601. case TAKfetch:
  602. case TAKcsvfetch:
  603. case TAKxmlfetch:
  604. case TAKjsonfetch:
  605. return createRoxieServerFetchActivityFactory(id, subgraphId, *this, helperFactory, kind, node, remoteId);
  606. case TAKfilter:
  607. return createRoxieServerFilterActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  608. case TAKfiltergroup:
  609. return createRoxieServerFilterGroupActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  610. case TAKfirstn:
  611. return createRoxieServerFirstNActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  612. case TAKfunnel:
  613. return createRoxieServerConcatActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  614. case TAKgroup:
  615. return createRoxieServerGroupActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  616. case TAKhashaggregate:
  617. return createRoxieServerHashAggregateActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  618. case TAKif:
  619. case TAKchildif:
  620. return createRoxieServerIfActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isGraphIndependent(node));
  621. case TAKifaction:
  622. return createRoxieServerIfActionActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
  623. case TAKparallel:
  624. return createRoxieServerParallelActionActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
  625. case TAKsequential:
  626. return createRoxieServerSequentialActionActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
  627. case TAKindexread:
  628. return createRoxieServerIndexReadActivityFactory(id, subgraphId, *this, helperFactory, kind, node, remoteId);
  629. case TAKindexnormalize:
  630. return createRoxieServerIndexNormalizeActivityFactory(id, subgraphId, *this, helperFactory, kind, node, remoteId);
  631. case TAKindexcount:
  632. return createRoxieServerIndexCountActivityFactory(id, subgraphId, *this, helperFactory, kind, node, remoteId);
  633. case TAKindexaggregate:
  634. return createRoxieServerIndexAggregateActivityFactory(id, subgraphId, *this, helperFactory, kind, node, remoteId);
  635. case TAKindexgroupaggregate:
  636. case TAKindexgroupexists:
  637. case TAKindexgroupcount:
  638. return createRoxieServerIndexGroupAggregateActivityFactory(id, subgraphId, *this, helperFactory, kind, node, remoteId);
  639. case TAKhashdedup:
  640. return createRoxieServerHashDedupActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  641. case TAKhashdenormalize:
  642. case TAKhashdistribute:
  643. case TAKhashdistributemerge:
  644. case TAKhashjoin:
  645. throwUnexpected(); // Code generator should have removed or transformed
  646. case TAKiterate:
  647. return createRoxieServerIterateActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  648. case TAKprocess:
  649. return createRoxieServerProcessActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  650. case TAKjoin:
  651. case TAKjoinlight:
  652. case TAKdenormalize:
  653. case TAKdenormalizegroup:
  654. return createRoxieServerJoinActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  655. case TAKkeyeddistribute:
  656. throwUnexpected(); // Code generator should have removed or transformed
  657. case TAKkeyedjoin:
  658. case TAKkeyeddenormalize:
  659. case TAKkeyeddenormalizegroup:
  660. return createRoxieServerKeyedJoinActivityFactory(id, subgraphId, *this, helperFactory, kind, node, remoteId, remoteId2);
  661. case TAKlimit:
  662. return createRoxieServerLimitActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  663. case TAKlookupjoin:
  664. case TAKlookupdenormalize:
  665. case TAKlookupdenormalizegroup:
  666. case TAKsmartjoin:
  667. case TAKsmartdenormalize:
  668. case TAKsmartdenormalizegroup:
  669. return createRoxieServerLookupJoinActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  670. case TAKmerge:
  671. return createRoxieServerMergeActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  672. case TAKnormalize:
  673. return createRoxieServerNormalizeActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  674. case TAKnormalizechild:
  675. return createRoxieServerNormalizeChildActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  676. case TAKnormalizelinkedchild:
  677. return createRoxieServerNormalizeLinkedChildActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  678. case TAKnull:
  679. return createRoxieServerNullActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  680. case TAKsideeffect:
  681. return createRoxieServerSideEffectActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
  682. case TAKsimpleaction:
  683. return createRoxieServerActionActivityFactory(id, subgraphId, *this, helperFactory, kind, node, usageCount(node), isRootAction(node));
  684. case TAKparse:
  685. return createRoxieServerParseActivityFactory(id, subgraphId, *this, helperFactory, kind, node, this);
  686. case TAKworkunitwrite:
  687. return createRoxieServerWorkUnitWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, node, usageCount(node), isRootAction(node));
  688. case TAKdictionaryworkunitwrite:
  689. return createRoxieServerWorkUnitWriteDictActivityFactory(id, subgraphId, *this, helperFactory, kind, node, usageCount(node), isRootAction(node));
  690. case TAKpiperead:
  691. return createRoxieServerPipeReadActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  692. case TAKpipethrough:
  693. return createRoxieServerPipeThroughActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  694. case TAKpipewrite:
  695. return createRoxieServerPipeWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, node, usageCount(node), isRootAction(node));
  696. case TAKpull:
  697. return createRoxieServerPullActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  698. case TAKtrace:
  699. return createRoxieServerTraceActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  700. case TAKlinkedrawiterator:
  701. return createRoxieServerLinkedRawIteratorActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  702. case TAKremoteresult:
  703. return createRoxieServerRemoteResultActivityFactory(id, subgraphId, *this, helperFactory, kind, node, usageCount(node), isRootAction(node));
  704. case TAKrollup:
  705. return createRoxieServerRollupActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  706. case TAKsample:
  707. return createRoxieServerSampleActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  708. case TAKselectn:
  709. return createRoxieServerSelectNActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  710. case TAKselfjoin:
  711. case TAKselfjoinlight:
  712. return createRoxieServerSelfJoinActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  713. case TAKskiplimit:
  714. case TAKcreaterowlimit:
  715. return createRoxieServerSkipLimitActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  716. case TAKhttp_rowdataset:
  717. case TAKsoap_rowdataset:
  718. return createRoxieServerSoapRowCallActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  719. case TAKsoap_rowaction:
  720. return createRoxieServerSoapRowActionActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
  721. case TAKsoap_datasetdataset:
  722. return createRoxieServerSoapDatasetCallActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  723. case TAKsoap_datasetaction:
  724. return createRoxieServerSoapDatasetActionActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
  725. case TAKsort:
  726. return createRoxieServerSortActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  727. case TAKspill:
  728. case TAKmemoryspillsplit:
  729. return createRoxieServerThroughSpillActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  730. case TAKsplit:
  731. return createRoxieServerSplitActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  732. case TAKstreamediterator:
  733. return createRoxieServerStreamedIteratorActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  734. case TAKinlinetable:
  735. return createRoxieServerInlineTableActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  736. case TAKthroughaggregate:
  737. throwUnexpected(); // Concept of through aggregates has been proven not to work in Roxie - codegen should not be creating them any more.
  738. case TAKtopn:
  739. return createRoxieServerTopNActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  740. case TAKworkunitread:
  741. return createRoxieServerWorkUnitReadActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  742. case TAKxmlparse:
  743. return createRoxieServerXmlParseActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  744. case TAKquantile:
  745. return createRoxieServerQuantileActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  746. case TAKregroup:
  747. return createRoxieServerRegroupActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  748. case TAKcombine:
  749. return createRoxieServerCombineActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  750. case TAKcombinegroup:
  751. return createRoxieServerCombineGroupActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  752. case TAKrollupgroup:
  753. return createRoxieServerRollupGroupActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  754. case TAKlocalresultread:
  755. {
  756. unsigned graphId = getGraphId(node);
  757. return createRoxieServerLocalResultReadActivityFactory(id, subgraphId, *this, helperFactory, kind, node, graphId);
  758. }
  759. case TAKlocalstreamread:
  760. return createRoxieServerLocalResultStreamReadActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  761. case TAKlocalresultwrite:
  762. {
  763. unsigned graphId = getGraphId(node);
  764. return createRoxieServerLocalResultWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, node, usageCount(node), graphId, isRootAction(node));
  765. }
  766. case TAKdictionaryresultwrite:
  767. {
  768. unsigned graphId = getGraphId(node);
  769. return createRoxieServerDictionaryResultWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, node, usageCount(node), graphId, isRootAction(node));
  770. }
  771. case TAKloopcount:
  772. case TAKlooprow:
  773. case TAKloopdataset:
  774. {
  775. unsigned loopId = node.getPropInt("att[@name=\"_loopid\"]/@value", 0);
  776. return createRoxieServerLoopActivityFactory(id, subgraphId, *this, helperFactory, kind, node, loopId);
  777. }
  778. case TAKremotegraph:
  779. return createRoxieServerRemoteActivityFactory(id, subgraphId, *this, helperFactory, kind, node, remoteId, isRootAction(node));
  780. case TAKgraphloopresultread:
  781. {
  782. unsigned graphId = getGraphId(node);
  783. return createRoxieServerGraphLoopResultReadActivityFactory(id, subgraphId, *this, helperFactory, kind, node, graphId);
  784. }
  785. case TAKgraphloopresultwrite:
  786. {
  787. unsigned graphId = getGraphId(node);
  788. return createRoxieServerGraphLoopResultWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, node, usageCount(node), graphId);
  789. }
  790. case TAKnwaygraphloopresultread:
  791. {
  792. unsigned graphId = node.getPropInt("att[@name=\"_graphId\"]/@value", 0);
  793. return createRoxieServerNWayGraphLoopResultReadActivityFactory(id, subgraphId, *this, helperFactory, kind, node, graphId);
  794. }
  795. case TAKnwayinput:
  796. return createRoxieServerNWayInputActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  797. case TAKnwaymerge:
  798. return createRoxieServerNWayMergeActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  799. case TAKnwaymergejoin:
  800. case TAKnwayjoin:
  801. return createRoxieServerNWayMergeJoinActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  802. case TAKsorted:
  803. return createRoxieServerSortedActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  804. case TAKgraphloop:
  805. case TAKparallelgraphloop:
  806. {
  807. unsigned loopId = node.getPropInt("att[@name=\"_loopid\"]/@value", 0);
  808. return createRoxieServerGraphLoopActivityFactory(id, subgraphId, *this, helperFactory, kind, node, loopId);
  809. }
  810. case TAKlibrarycall:
  811. {
  812. LibraryCallFactoryExtra extra;
  813. extra.maxOutputs = node.getPropInt("att[@name=\"_maxOutputs\"]/@value", 0);
  814. extra.graphid = node.getPropInt("att[@name=\"_libraryGraphId\"]/@value", 0);
  815. extra.libraryName.set(node.queryProp("att[@name=\"libname\"]/@value"));
  816. extra.interfaceHash = node.getPropInt("att[@name=\"_interfaceHash\"]/@value", 0);
  817. extra.embedded = node.getPropBool("att[@name=\"embedded\"]/@value", false) ;
  818. if (extra.embedded)
  819. {
  820. extra.embeddedGraphName.set(node.queryProp("att[@name=\"graph\"]/@value"));
  821. if (!extra.embeddedGraphName)
  822. extra.embeddedGraphName.set(extra.libraryName);
  823. }
  824. Owned<IPropertyTreeIterator> iter = node.getElements("att[@name=\"_outputUsed\"]");
  825. ForEach(*iter)
  826. extra.outputs.append(iter->query().getPropInt("@value"));
  827. return createRoxieServerLibraryCallActivityFactory(id, subgraphId, *this, helperFactory, kind, node, extra);
  828. }
  829. case TAKnwayselect:
  830. return createRoxieServerNWaySelectActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  831. case TAKnonempty:
  832. return createRoxieServerNonEmptyActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  833. case TAKprefetchproject:
  834. return createRoxieServerPrefetchProjectActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  835. case TAKwhen_dataset:
  836. return createRoxieServerWhenActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  837. case TAKwhen_action:
  838. return createRoxieServerWhenActionActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
  839. case TAKdistribution:
  840. return createRoxieServerDistributionActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
  841. case TAKexternalprocess:
  842. case TAKexternalsink:
  843. case TAKexternalsource:
  844. return createRoxieServerExternalActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
  845. // These are not required in Roxie for the time being - code generator should trap them
  846. case TAKchilddataset:
  847. default:
  848. throw MakeStringException(ROXIE_UNIMPLEMENTED_ERROR, "Unimplemented activity %s required", getActivityText(kind));
  849. break;
  850. }
  851. throwUnexpected(); // unreachable, but some compilers will complain about missing return
  852. }
  853. IActivityFactory *findActivity(unsigned id) const
  854. {
  855. if (id)
  856. {
  857. IActivityFactory **f = allActivities.getValue(id);
  858. if (f)
  859. return *f;
  860. }
  861. return NULL;
  862. }
  863. virtual IRoxieServerActivityFactory *getRoxieServerActivityFactory(unsigned id) const override
  864. {
  865. checkSuspended();
  866. return LINK(QUERYINTERFACE(findActivity(id), IRoxieServerActivityFactory));
  867. }
  868. virtual IAgentActivityFactory *getAgentActivityFactory(unsigned id) const override
  869. {
  870. checkSuspended();
  871. IActivityFactory *f = findActivity(id);
  872. return LINK(QUERYINTERFACE(f, IAgentActivityFactory)); // MORE - don't dynamic cast yuk
  873. }
  874. ActivityArray *loadChildGraph(IPropertyTree &graph)
  875. {
  876. // MORE - this is starting to look very much like loadGraph (on Roxie server side)
  877. ActivityArray *activities = new ActivityArray(true, graph.getPropBool("@delayed"), graph.getPropBool("@library"), graph.getPropBool("@sequential"), graph.getPropBool("@wfid"));
  878. unsigned subgraphId = graph.getPropInt("@id");
  879. try
  880. {
  881. Owned<IPropertyTreeIterator> nodes = graph.getElements("node");
  882. ForEach(*nodes)
  883. {
  884. IPropertyTree &node = nodes->query();
  885. loadNode(node, subgraphId, activities);
  886. }
  887. Owned<IPropertyTreeIterator> edges = graph.getElements("edge");
  888. ForEach(*edges)
  889. {
  890. IPropertyTree &edge = edges->query();
  891. unsigned source = activities->findActivityIndex(edge.getPropInt("@source",0));
  892. unsigned target = activities->findActivityIndex(edge.getPropInt("@target",0));
  893. unsigned sourceOutput = edge.getPropInt("att[@name=\"_sourceIndex\"]/@value", 0);
  894. unsigned targetInput = edge.getPropInt("att[@name=\"_targetIndex\"]/@value", 0);
  895. int controlId = edge.getPropInt("att[@name=\"_when\"]/@value", 0);
  896. if (controlId != 0)
  897. {
  898. const char * edgeId = edge.queryProp("@id");
  899. addDependency(sourceOutput, source, target, controlId, edgeId, activities);
  900. }
  901. else
  902. activities->serverItem(target).setInput(targetInput, source, sourceOutput);
  903. }
  904. }
  905. catch (...)
  906. {
  907. ::Release(activities);
  908. allActivities.kill();
  909. throw;
  910. }
  911. return activities;
  912. }
  913. void loadNode(IPropertyTree &node, unsigned subgraphId, ActivityArray *activities)
  914. {
  915. ThorActivityKind kind = getActivityKind(node);
  916. if (kind==TAKsubgraph)
  917. {
  918. IPropertyTree * childGraphNode = node.queryPropTree("att/graph");
  919. if (childGraphNode->getPropBool("@child"))
  920. {
  921. loadSubgraph(node, activities);
  922. }
  923. else
  924. {
  925. unsigned parentId = findParentId(node);
  926. assertex(parentId);
  927. unsigned parentIdx = activities->findActivityIndex(parentId);
  928. IActivityFactory &parentFactory = activities->item(parentIdx);
  929. ActivityArray *childQuery = loadChildGraph(*childGraphNode);
  930. parentFactory.addChildQuery(node.getPropInt("@id"), childQuery);
  931. }
  932. }
  933. else if (kind)
  934. {
  935. IRoxieServerActivityFactory *f = createActivityFactory(kind, subgraphId, node);
  936. if (f)
  937. {
  938. activities->append(*f);
  939. allActivities.setValue(f->queryId(), f);
  940. }
  941. }
  942. }
  943. void loadSubgraph(IPropertyTree &graph, ActivityArray *activities)
  944. {
  945. unsigned subgraphId = graph.getPropInt("@id");
  946. Owned<IPropertyTreeIterator> nodes = graph.getElements("att/graph/node");
  947. ForEach(*nodes)
  948. {
  949. IPropertyTree &node = nodes->query();
  950. loadNode(node, subgraphId, activities);
  951. }
  952. if (!isSuspended)
  953. {
  954. Owned<IPropertyTreeIterator> edges = graph.getElements("att/graph/edge");
  955. ForEach(*edges)
  956. {
  957. IPropertyTree &edge = edges->query();
  958. //Ignore edges that represent dependencies from parent activities to child activities.
  959. if (edge.getPropInt("att[@name=\"_childGraph\"]/@value", 0))
  960. continue;
  961. unsigned sourceActivity = edge.getPropInt("@source", 0);
  962. unsigned targetActivity = edge.getPropInt("@target", 0);
  963. unsigned source = activities->findActivityIndex(sourceActivity);
  964. unsigned target = activities->recursiveFindActivityIndex(targetActivity);
  965. unsigned sourceOutput = edge.getPropInt("att[@name=\"_sourceIndex\"]/@value", 0);
  966. unsigned targetInput = edge.getPropInt("att[@name=\"_targetIndex\"]/@value", 0);
  967. int controlId = edge.getPropInt("att[@name=\"_when\"]/@value", 0);
  968. if (controlId != 0)
  969. {
  970. const char * edgeId = edge.queryProp("@id");
  971. addDependency(sourceOutput, sourceActivity, targetActivity, controlId, edgeId, activities);
  972. }
  973. else
  974. activities->serverItem(target).setInput(targetInput, source, sourceOutput);
  975. }
  976. }
  977. }
  978. // loadGraph loads outer level graph. This is virtual as agent is very different from Roxie server
  979. virtual ActivityArray *loadGraph(IPropertyTree &graph, const char *graphName) = 0;
  980. bool doAddDependency(unsigned sourceIdx, unsigned sourceId, unsigned targetId, int controlId, const char *edgeId, ActivityArray * activities)
  981. {
  982. // Note - the dependency is recorded with the target being the parent activity that is at the same level as the source
  983. // (recording it on the child that was actually dependent would mean it happened too late)
  984. unsigned source = activities->findActivityIndex(sourceId);
  985. if (source != NotFound)
  986. {
  987. unsigned target = activities->recursiveFindActivityIndex(targetId);
  988. activities->serverItem(target).addDependency(source, activities->serverItem(source).getKind(), sourceIdx, controlId, edgeId);
  989. activities->serverItem(source).noteDependent(target);
  990. return true;
  991. }
  992. ForEachItemIn(idx, *activities)
  993. {
  994. IActivityFactory & cur = activities->item(idx);
  995. unsigned childId;
  996. for (unsigned childIdx = 0;;childIdx++)
  997. {
  998. ActivityArray * children = cur.queryChildQuery(childIdx, childId);
  999. if (!children)
  1000. break;
  1001. if (doAddDependency(sourceIdx, sourceId, targetId, controlId, edgeId, children))
  1002. return true;
  1003. }
  1004. }
  1005. return false;
  1006. }
  1007. virtual void addDependency(unsigned sourceIdx, unsigned sourceId, unsigned targetId, int controlId, const char *edgeId, ActivityArray * activities)
  1008. {
  1009. doAddDependency(sourceIdx, sourceId, targetId, controlId, edgeId, activities);
  1010. }
  1011. void addDependencies(IPropertyTree &graph, ActivityArray *activities)
  1012. {
  1013. Owned<IPropertyTreeIterator> dependencies = graph.getElements("edge");
  1014. ForEach(*dependencies)
  1015. {
  1016. IPropertyTree &edge = dependencies->query();
  1017. //Ignore edges that represent dependencies from parent activities to child activities.
  1018. if (!edge.getPropInt("att[@name=\"_childGraph\"]/@value", 0))
  1019. {
  1020. unsigned sourceIdx = edge.getPropInt("att[@name=\"_sourceIndex\"]/@value", 0);
  1021. int controlId = edge.getPropInt("att[@name=\"_when\"]/@value", 0);
  1022. addDependency(sourceIdx, edge.getPropInt("att[@name=\"_sourceActivity\"]/@value", 0), edge.getPropInt("att[@name=\"_targetActivity\"]/@value", 0), controlId, edge.queryProp("@id"), activities);
  1023. }
  1024. }
  1025. }
  1026. public:
  1027. IMPLEMENT_IINTERFACE;
  1028. unsigned channelNo;
  1029. CQueryFactory(const char *_id, const IQueryDll *_dll, const IRoxiePackage &_package, hash64_t _hashValue, unsigned _channelNo, ISharedOnceContext *_sharedOnceContext, bool _dynamic)
  1030. : package(_package), dll(_dll), sharedOnceContext(_sharedOnceContext), id(_id), dynamic(_dynamic), hashValue(_hashValue), channelNo(_channelNo)
  1031. {
  1032. package.Link();
  1033. targetClusterType = RoxieCluster;
  1034. isSuspended = false;
  1035. isLoadFailed = false;
  1036. libraryInterfaceHash = 0;
  1037. options.enableFieldTranslation = package.getEnableFieldTranslation(); // NOTE - can be overridden by wu settings
  1038. options.allSortsMaySpill = dynamic;
  1039. addToCache();
  1040. }
  1041. ~CQueryFactory()
  1042. {
  1043. HashIterator graphs(graphMap);
  1044. for(graphs.first();graphs.isValid();graphs.next())
  1045. {
  1046. ActivityArray *a = *graphMap.mapToValue(&graphs.query());
  1047. a->Release();
  1048. }
  1049. package.Release();
  1050. }
  1051. private:
  1052. std::once_flag started;
  1053. Owned<IException> e;
  1054. public:
  1055. void init(const IPropertyTree *stateInfo)
  1056. {
  1057. std::call_once(started, [this, stateInfo]()
  1058. {
  1059. try
  1060. {
  1061. load(stateInfo);
  1062. if (sharedOnceContext && preloadOnceData)
  1063. {
  1064. Owned<StringContextLogger> logctx = new StringContextLogger(id); // NB may get linked by the onceContext
  1065. sharedOnceContext->checkOnceDone(this, *logctx);
  1066. }
  1067. addToMap(); // Publishes for agents to see
  1068. }
  1069. catch (IException *E)
  1070. {
  1071. EXCLOG(E);
  1072. e.setown(E);
  1073. }
  1074. });
  1075. if (e)
  1076. throw e.getLink();
  1077. }
  1078. virtual IQueryFactory *lookupLibrary(const char *libraryName, unsigned expectedInterfaceHash, const IRoxieContextLogger &logctx) const override
  1079. {
  1080. return globalPackageSetManager->lookupLibrary(libraryName, expectedInterfaceHash, logctx);
  1081. }
  1082. virtual void beforeDispose() override
  1083. {
  1084. // NOTE: it's theoretically possible for the final release to happen after a replacement has been inserted into hash table.
  1085. // So only remove from hash table if what we find there matches the item that is being deleted.
  1086. hash64_t hv = rtlHash64Data(sizeof(channelNo), &channelNo, hashValue);
  1087. {
  1088. CriticalBlock b(queryCacheCrit);
  1089. CQueryFactory *goer = queryCache.getValue(hv);
  1090. if (goer == this)
  1091. queryCache.remove(hv);
  1092. }
  1093. {
  1094. CriticalBlock b(activeQueriesCrit);
  1095. CQueryFactory *goer = activeQueries.getValue(hv);
  1096. if (goer == this)
  1097. activeQueries.remove(hv);
  1098. }
  1099. }
  1100. // There are two very similar-looking maps of queries - they have slightly different lifetimes and characteristics
  1101. // One has fully-constructed queries suitable for use responding to an agent request.
  1102. // The other has potentially partially-constructed queries, and is used for ensuring we only build them once
  1103. // while allowing for parallelizing package loads.
  1104. static CQueryFactory *getQueryFactory(hash64_t hashValue, unsigned channelNo)
  1105. {
  1106. hash64_t hv = rtlHash64Data(sizeof(channelNo), &channelNo, hashValue);
  1107. CriticalBlock b(activeQueriesCrit);
  1108. CQueryFactory *factory = activeQueries.getValue(hv);
  1109. if (factory && factory->isAliveAndLink())
  1110. return factory;
  1111. else
  1112. return NULL;
  1113. }
  1114. void addToMap()
  1115. {
  1116. hash64_t hv = rtlHash64Data(sizeof(channelNo), &channelNo, hashValue);
  1117. CriticalBlock b(activeQueriesCrit);
  1118. activeQueries.setValue(hv, this);
  1119. }
  1120. static CQueryFactory *getCachedQuery(hash64_t hashValue, unsigned channelNo)
  1121. {
  1122. // NOTE - this must be called within an allQueriesCrit block
  1123. queryCacheCrit.assertLocked();
  1124. hash64_t hv = rtlHash64Data(sizeof(channelNo), &channelNo, hashValue);
  1125. CQueryFactory *factory = queryCache.getValue(hv);
  1126. if (factory && factory->isAliveAndLink())
  1127. return factory;
  1128. else
  1129. return nullptr;
  1130. }
  1131. void addToCache()
  1132. {
  1133. // NOTE - this must be called within an allQueriesCrit block
  1134. queryCacheCrit.assertLocked();
  1135. hash64_t hv = rtlHash64Data(sizeof(channelNo), &channelNo, hashValue);
  1136. queryCache.setValue(hv, this);
  1137. }
  1138. static hash64_t getQueryHash(const char *id, const IQueryDll *dll, const IRoxiePackage &package, const IPropertyTree *stateInfo, IArrayOf<IResolvedFile> &files, bool isDynamic)
  1139. {
  1140. hash64_t hashValue = package.queryHash();
  1141. if (traceLevel > 8)
  1142. DBGLOG("getQueryHash: %s %" I64F "u from package", id, hashValue);
  1143. if (dll)
  1144. {
  1145. hashValue = rtlHash64VStr(dll->queryDll()->queryName(), hashValue);
  1146. if (traceLevel > 8)
  1147. DBGLOG("getQueryHash: %s %" I64F "u from dll", id, hashValue);
  1148. if (!lockSuperFiles && !allFilesDynamic && !isDynamic && !package.isCompulsory())
  1149. {
  1150. IConstWorkUnit *wu = dll->queryWorkUnit();
  1151. if (wu) // wu may be null in some unit test cases
  1152. {
  1153. SCMStringBuffer bStr;
  1154. // Don't want to include files referenced in thor graphs... in practice isDynamic also likely to be set in such cases
  1155. if (getClusterType(wu->getDebugValue("targetClusterType", bStr).str(), RoxieCluster) == RoxieCluster)
  1156. {
  1157. Owned<IConstWUGraphIterator> graphs = &wu->getGraphs(GraphTypeActivities);
  1158. ForEach(*graphs)
  1159. {
  1160. Owned<IPropertyTree> graphXgmml = graphs->query().getXGMMLTree(false);
  1161. Owned<IPropertyTreeIterator> nodes = graphXgmml->getElements(".//node");
  1162. ForEach(*nodes)
  1163. {
  1164. IPropertyTree &node = nodes->query();
  1165. ThorActivityKind kind = getActivityKind(node);
  1166. if (kind != TAKdiskwrite && kind != TAKspillwrite && kind != TAKindexwrite && kind != TAKpiperead && kind != TAKpipewrite)
  1167. {
  1168. const char *fileName = queryNodeFileName(node, kind);
  1169. const char *indexName = queryNodeIndexName(node, kind);
  1170. // What about packages that resolve everything without dali?
  1171. if (indexName)
  1172. {
  1173. bool isOpt = pretendAllOpt || node.getPropBool("att[@name='_isIndexOpt']/@value");
  1174. bool isCodeSigned = isActivityCodeSigned(node);
  1175. const IResolvedFile *indexFile = package.lookupFileName(indexName, isOpt, true, true, wu, true, isCodeSigned);
  1176. if (indexFile)
  1177. {
  1178. hashValue = indexFile->addHash64(hashValue);
  1179. if (traceLevel > 8)
  1180. DBGLOG("getQueryHash: %s %" I64F "u from index %s", id, hashValue, indexName);
  1181. files.append(*const_cast<IResolvedFile *>(indexFile));
  1182. }
  1183. }
  1184. if (fileName)
  1185. {
  1186. if (!node.getPropBool("att[@name='_isSpill']/@value") && !node.getPropBool("att[@name='_isSpillGlobal']/@value"))
  1187. {
  1188. bool isCodeSigned = isActivityCodeSigned(node);
  1189. bool isOpt = pretendAllOpt || node.getPropBool("att[@name='_isOpt']/@value");
  1190. const IResolvedFile *dataFile = package.lookupFileName(fileName, isOpt, true, true, wu, true, isCodeSigned);
  1191. if (dataFile)
  1192. {
  1193. hashValue = dataFile->addHash64(hashValue);
  1194. if (traceLevel > 8)
  1195. DBGLOG("getQueryHash: %s %" I64F "u from index %s", id, hashValue, fileName);
  1196. files.append(*const_cast<IResolvedFile *>(dataFile));
  1197. }
  1198. }
  1199. }
  1200. }
  1201. }
  1202. }
  1203. }
  1204. }
  1205. }
  1206. }
  1207. if (id)
  1208. hashValue = rtlHash64VStr(id, hashValue);
  1209. hashValue = rtlHash64VStr("Roxie", hashValue); // Adds some noise into the hash - otherwise adjacent wuids tend to hash very close together
  1210. if (traceLevel > 8)
  1211. DBGLOG("getQueryHash: %s %" I64F "u from id", id, hashValue);
  1212. if (stateInfo)
  1213. {
  1214. StringBuffer xml;
  1215. toXML(stateInfo, xml);
  1216. hashValue = rtlHash64Data(xml.length(), xml.str(), hashValue);
  1217. if (traceLevel > 8)
  1218. DBGLOG("getQueryHash: %s %" I64F "u from stateInfo", id, hashValue);
  1219. }
  1220. if (traceLevel > 8)
  1221. DBGLOG("getQueryHash: %s %" I64F "u", id, hashValue);
  1222. return hashValue;
  1223. }
  1224. virtual void load(const IPropertyTree *stateInfo)
  1225. {
  1226. if (!dll)
  1227. return;
  1228. IConstWorkUnit *wu = dll->queryWorkUnit();
  1229. if (wu) // wu and dll may be null in some unit test cases
  1230. {
  1231. libraryInterfaceHash = wu->getApplicationValueInt("LibraryModule", "interfaceHash", 0);
  1232. options.setFromWorkUnit(*wu, stateInfo);
  1233. SCMStringBuffer bStr;
  1234. targetClusterType = getClusterType(wu->getDebugValue("targetClusterType", bStr).str(), RoxieCluster);
  1235. // NOTE: stateinfo overrides package info
  1236. if (stateInfo)
  1237. {
  1238. // info in querySets can override the defaults from workunit for some limits
  1239. isSuspended = stateInfo->getPropBool("@suspended", false);
  1240. }
  1241. if (targetClusterType == RoxieCluster)
  1242. {
  1243. Owned<IConstWUGraphIterator> graphs = &wu->getGraphs(GraphTypeActivities);
  1244. SCMStringBuffer graphNameStr;
  1245. ForEach(*graphs)
  1246. {
  1247. graphs->query().getName(graphNameStr);
  1248. const char *graphName = graphNameStr.s.str();
  1249. Owned<IPropertyTree> graphXgmml = graphs->query().getXGMMLTree(false);
  1250. try
  1251. {
  1252. ActivityArray *activities = loadGraph(*graphXgmml, graphName);
  1253. graphMap.setValue(graphName, activities);
  1254. }
  1255. catch (IException *E)
  1256. {
  1257. StringBuffer m;
  1258. E->errorMessage(m);
  1259. suspend(m.str());
  1260. OERRLOG("Query %s suspended: %s", id.get(), m.str());
  1261. E->Release();
  1262. }
  1263. }
  1264. }
  1265. }
  1266. }
  1267. virtual unsigned queryChannel() const override
  1268. {
  1269. return channelNo;
  1270. }
  1271. virtual hash64_t queryHash() const override
  1272. {
  1273. return hashValue;
  1274. }
  1275. virtual ISharedOnceContext *querySharedOnceContext() const override
  1276. {
  1277. return sharedOnceContext;
  1278. }
  1279. virtual IDeserializedResultStore &queryOnceResultStore() const override
  1280. {
  1281. assertex(sharedOnceContext);
  1282. return sharedOnceContext->queryOnceResultStore();
  1283. }
  1284. virtual IPropertyTree &queryOnceContext(const IRoxieContextLogger &logctx) const override
  1285. {
  1286. assertex(sharedOnceContext);
  1287. return sharedOnceContext->queryOnceContext(this, logctx);
  1288. }
  1289. virtual const char *loadResource(unsigned id)
  1290. {
  1291. return (const char *) queryDll()->getResource(id);
  1292. }
  1293. virtual ActivityArray *lookupGraphActivities(const char *name) const override
  1294. {
  1295. return *graphMap.getValue(name);
  1296. }
  1297. virtual IActivityGraph *lookupGraph(IRoxieAgentContext *ctx, const char *name, IProbeManager *probeManager, const IRoxieContextLogger &logctx, IRoxieServerActivity *parentActivity) const override
  1298. {
  1299. assertex(name && *name);
  1300. ActivityArrayPtr *graph = graphMap.getValue(name);
  1301. assertex(graph);
  1302. Owned<IActivityGraph> ret = ::createActivityGraph(ctx, name, 0, **graph, parentActivity, probeManager, logctx, 1);
  1303. return ret.getClear();
  1304. }
  1305. void getGraphStats(StringBuffer &reply, const IPropertyTree &thisGraph) const
  1306. {
  1307. Owned<IPropertyTree> graph = createPTreeFromIPT(&thisGraph, ipt_lowmem);
  1308. Owned<IPropertyTreeIterator> edges = graph->getElements(".//edge");
  1309. ForEach(*edges)
  1310. {
  1311. IPropertyTree &edge = edges->query();
  1312. IActivityFactory *a = findActivity(edge.getPropInt("@source", 0));
  1313. if (!a)
  1314. a = findActivity(edge.getPropInt("att[@name=\"_sourceActivity\"]/@value", 0));
  1315. if (a)
  1316. {
  1317. unsigned sourceOutput = edge.getPropInt("att[@name=\"_sourceIndex\"]/@value", 0);
  1318. a->getEdgeProgressInfo(sourceOutput, edge);
  1319. }
  1320. }
  1321. Owned<IPropertyTreeIterator> nodes = graph->getElements(".//node");
  1322. ForEach(*nodes)
  1323. {
  1324. IPropertyTree &node = nodes->query();
  1325. IActivityFactory *a = findActivity(node.getPropInt("@id", 0));
  1326. if (a)
  1327. a->getNodeProgressInfo(node);
  1328. }
  1329. toXML(graph, reply);
  1330. }
  1331. virtual IPropertyTree* cloneQueryXGMML() const override
  1332. {
  1333. assertex(dll && dll->queryWorkUnit());
  1334. Owned<IPropertyTree> tree = createPTree("Query", ipt_lowmem);
  1335. Owned<IConstWUGraphIterator> graphs = &dll->queryWorkUnit()->getGraphs(GraphTypeActivities);
  1336. SCMStringBuffer graphNameStr;
  1337. ForEach(*graphs)
  1338. {
  1339. graphs->query().getName(graphNameStr);
  1340. const char *graphName = graphNameStr.s.str();
  1341. Owned<IPropertyTree> graphXgmml = graphs->query().getXGMMLTree(false);
  1342. IPropertyTree *newGraph = tree->addPropTree("Graph");
  1343. newGraph->setProp("@id", graphName);
  1344. newGraph->addPropTree("xgmml")->addPropTree("graph", graphXgmml.getLink());
  1345. }
  1346. return tree.getClear();
  1347. }
  1348. virtual void getStats(StringBuffer &reply, const char *graphName) const override
  1349. {
  1350. if (dll)
  1351. {
  1352. assertex(dll->queryWorkUnit());
  1353. Owned<IConstWUGraphIterator> graphs = &dll->queryWorkUnit()->getGraphs(GraphTypeActivities);
  1354. SCMStringBuffer thisGraphNameStr;
  1355. ForEach(*graphs)
  1356. {
  1357. graphs->query().getName(thisGraphNameStr);
  1358. if (graphName)
  1359. {
  1360. if (thisGraphNameStr.length() && (stricmp(graphName, thisGraphNameStr.s.str()) != 0))
  1361. continue; // not interested in this one
  1362. }
  1363. reply.appendf("<Graph id='%s'><xgmml>", thisGraphNameStr.s.str());
  1364. Owned<IPropertyTree> graphXgmml = graphs->query().getXGMMLTree(false);
  1365. getGraphStats(reply, *graphXgmml);
  1366. reply.append("</xgmml></Graph>");
  1367. }
  1368. }
  1369. }
  1370. virtual void getActivityMetrics(StringBuffer &reply) const override
  1371. {
  1372. HashIterator i(allActivities);
  1373. StringBuffer myReply;
  1374. ForEach(i)
  1375. {
  1376. IActivityFactory *f = *allActivities.mapToValue(&i.query());
  1377. f->getActivityMetrics(myReply.clear());
  1378. if (myReply.length())
  1379. {
  1380. reply.appendf(" <activity query='%s' id='%d' channel='%d'\n", queryQueryName(), f->queryId(), queryChannel());
  1381. reply.append(myReply);
  1382. reply.append(" </activity>\n");
  1383. }
  1384. }
  1385. }
  1386. virtual void getQueryInfo(StringBuffer &reply, bool full, IArrayOf<IQueryFactory> *agentQueries, const IRoxieContextLogger &logctx) const override
  1387. {
  1388. Owned<IPropertyTree> xref = createPTree("Query", ipt_fast);
  1389. xref->setProp("@id", id);
  1390. if (suspended())
  1391. {
  1392. xref->setPropBool("@suspended", true);
  1393. xref->setProp("@error", errorMessage);
  1394. }
  1395. if (full)
  1396. {
  1397. HashIterator i(allActivities);
  1398. ForEach(i)
  1399. {
  1400. IActivityFactory *f = *allActivities.mapToValue(&i.query());
  1401. f->getXrefInfo(*xref, logctx);
  1402. }
  1403. }
  1404. if (agentQueries)
  1405. {
  1406. ForEachItemIn(idx, *agentQueries)
  1407. {
  1408. if (agentQueries->item(idx).suspended())
  1409. {
  1410. xref->setPropBool("@suspended", true);
  1411. xref->setPropBool("@agentSuspended", true);
  1412. xref->setPropBool("@slaveSuspended", true); // legacy name
  1413. }
  1414. }
  1415. }
  1416. toXML(xref, reply, 1, XML_Embed|XML_LineBreak|XML_SortTags);
  1417. }
  1418. virtual void resetQueryTimings() override
  1419. {
  1420. HashIterator i(allActivities);
  1421. ForEach(i)
  1422. {
  1423. IActivityFactory *f = *allActivities.mapToValue(&i.query());
  1424. f->resetNodeProgressInfo();
  1425. }
  1426. }
  1427. virtual const char *queryErrorMessage() const override
  1428. {
  1429. return errorMessage.str();
  1430. }
  1431. virtual const char *queryQueryName() const override
  1432. {
  1433. return id;
  1434. }
  1435. virtual bool isQueryLibrary() const override
  1436. {
  1437. return libraryInterfaceHash != 0;
  1438. }
  1439. virtual unsigned getQueryLibraryInterfaceHash() const override
  1440. {
  1441. return libraryInterfaceHash;
  1442. }
  1443. virtual void suspend(const char* errMsg) override
  1444. {
  1445. isSuspended = true;
  1446. isLoadFailed = true;
  1447. errorMessage.append(errMsg);
  1448. }
  1449. virtual bool loadFailed() const override
  1450. {
  1451. return isLoadFailed;
  1452. }
  1453. virtual bool suspended() const override
  1454. {
  1455. return isSuspended;
  1456. }
  1457. virtual const QueryOptions &queryOptions() const override
  1458. {
  1459. return options;
  1460. }
  1461. virtual ILoadedDllEntry *queryDll() const override
  1462. {
  1463. assertex(dll);
  1464. return dll->queryDll();
  1465. }
  1466. virtual IConstWorkUnit *queryWorkUnit() const override
  1467. {
  1468. assertex(dll);
  1469. return dll->queryWorkUnit();
  1470. }
  1471. virtual const IRoxiePackage &queryPackage() const override
  1472. {
  1473. return package;
  1474. }
  1475. virtual CRoxieWorkflowMachine *createWorkflowMachine(IConstWorkUnit *wu, bool isOnce, const IRoxieContextLogger &logctx, const QueryOptions & options) const override
  1476. {
  1477. throwUnexpected(); // only on server...
  1478. }
  1479. virtual char *getEnv(const char *name, const char *defaultValue) const
  1480. {
  1481. if (!defaultValue)
  1482. defaultValue = "";
  1483. const char *result;
  1484. if (name && *name=='@')
  1485. {
  1486. // @ is shorthand for control: for legacy compatibility reasons
  1487. StringBuffer useName;
  1488. useName.append("control:").append(name+1);
  1489. result = package.queryEnv(useName.str());
  1490. }
  1491. else
  1492. result = package.queryEnv(name);
  1493. if (!result && name)
  1494. result = getenv(name);
  1495. return strdup(result ? result : defaultValue);
  1496. }
  1497. virtual IRoxieAgentContext *createAgentContext(const AgentContextLogger &logctx, IRoxieQueryPacket *packet, bool hasChildren) const override
  1498. {
  1499. throwUnexpected(); // only implemented in derived agent class
  1500. }
  1501. virtual IRoxieServerContext *createContext(IPropertyTree *xml, IHpccProtocolResponse *protocol, unsigned flags, const ContextLogger &_logctx, PTreeReaderOptions xmlReadFlags, const char *querySetName) const override
  1502. {
  1503. throwUnexpected(); // only implemented in derived server class
  1504. }
  1505. virtual IRoxieServerContext *createContext(IConstWorkUnit *wu, const ContextLogger &_logctx) const override
  1506. {
  1507. throwUnexpected(); // only implemented in derived server class
  1508. }
  1509. virtual void noteQuery(time_t startTime, bool failed, unsigned elapsed, unsigned memused, unsigned agentsReplyLen, unsigned bytesOut) override
  1510. {
  1511. throwUnexpected(); // only implemented in derived server class
  1512. }
  1513. virtual IPropertyTree *getQueryStats(time_t from, time_t to) override
  1514. {
  1515. throwUnexpected(); // only implemented in derived server class
  1516. }
  1517. virtual void getGraphNames(StringArray &ret) const override
  1518. {
  1519. Owned<IConstWUGraphIterator> graphs = &dll->queryWorkUnit()->getGraphs(GraphTypeActivities);
  1520. ForEach(*graphs)
  1521. {
  1522. SCMStringBuffer graphName;
  1523. graphs->query().getName(graphName);
  1524. ret.append(graphName.str());
  1525. }
  1526. }
  1527. virtual bool isDynamic() const override
  1528. {
  1529. return dynamic;
  1530. }
  1531. protected:
  1532. IPropertyTree *queryWorkflowTree() const
  1533. {
  1534. assertex(dll->queryWorkUnit());
  1535. return dll->queryWorkUnit()->queryWorkflowTree();
  1536. }
  1537. bool hasOnceSection() const
  1538. {
  1539. IPropertyTree *workflow = queryWorkflowTree();
  1540. if (workflow)
  1541. return workflow->hasProp("Item[@mode='once']");
  1542. else
  1543. return false;
  1544. }
  1545. virtual void checkSuspended() const override
  1546. {
  1547. if (isSuspended)
  1548. {
  1549. StringBuffer err;
  1550. if (errorMessage.length())
  1551. err.appendf(" because %s", errorMessage.str());
  1552. throw MakeStringException(ROXIE_QUERY_SUSPENDED, "Query %s is suspended%s", id.get(), err.str());
  1553. }
  1554. }
  1555. virtual void onTermination(TerminationCallbackInfo *info) const override
  1556. {
  1557. CriticalBlock b(callbacksCrit);
  1558. callbacks.append(*info);
  1559. }
  1560. };
  1561. CriticalSection CQueryFactory::activeQueriesCrit;
  1562. CopyMapXToMyClass<hash64_t, hash64_t, CQueryFactory> CQueryFactory::activeQueries; // Used to map hashes in packets to query factories
  1563. CriticalSection CQueryFactory::queryCacheCrit;
  1564. CopyMapXToMyClass<hash64_t, hash64_t, CQueryFactory> CQueryFactory::queryCache; // Used to ensure a given query is only created once
  1565. extern IQueryFactory *getQueryFactory(hash64_t hashvalue, unsigned channel)
  1566. {
  1567. return CQueryFactory::getQueryFactory(hashvalue, channel);
  1568. }
  1569. class CRoxieServerQueryFactory : public CQueryFactory
  1570. {
  1571. // Parts of query factory is only interesting on the server - workflow support, and tracking of total query times
  1572. protected:
  1573. Owned<IQueryStatsAggregator> queryStats;
  1574. public:
  1575. CRoxieServerQueryFactory(const char *_id, const IQueryDll *_dll, const IRoxiePackage &_package, hash64_t _hashValue, ISharedOnceContext *_sharedOnceContext, bool _dynamic)
  1576. : CQueryFactory(_id, _dll, _package, _hashValue, 0, _sharedOnceContext, _dynamic)
  1577. {
  1578. queryStats.setown(createQueryStatsAggregator(id.get(), statsExpiryTime));
  1579. }
  1580. virtual void noteQuery(time_t startTime, bool failed, unsigned elapsed, unsigned memused, unsigned agentsReplyLen, unsigned bytesOut)
  1581. {
  1582. queryStats->noteQuery(startTime, failed, elapsed, memused, agentsReplyLen, bytesOut);
  1583. queryGlobalQueryStatsAggregator()->noteQuery(startTime, failed, elapsed, memused, agentsReplyLen, bytesOut);
  1584. }
  1585. virtual void addDependency(unsigned sourceIdx, unsigned sourceId, unsigned targetId, int controlId, const char *edgeId, ActivityArray * activities) override
  1586. {
  1587. // addDependency is expected to fail occasionally on agent, but never on Roxie server
  1588. if (!doAddDependency(sourceIdx, sourceId, targetId, controlId, edgeId, activities))
  1589. throw MakeStringException(ROXIE_ADDDEPENDENCY_ERROR, "Failed to create dependency from %u on %u", sourceId, targetId);
  1590. }
  1591. virtual ActivityArray *loadGraph(IPropertyTree &graph, const char *graphName)
  1592. {
  1593. bool isLibraryGraph = graph.getPropBool("@library");
  1594. bool isSequential = graph.getPropBool("@sequential");
  1595. unsigned wfid = graph.getPropInt("@wfid");
  1596. ActivityArray *activities = new ActivityArray(isLibraryGraph, false, isLibraryGraph, isSequential, wfid);
  1597. if (isLibraryGraph)
  1598. activities->setLibraryGraphId(graph.getPropInt("node/@id"));
  1599. try
  1600. {
  1601. Owned<IPropertyTreeIterator> subgraphs = graph.getElements("node");
  1602. ForEach(*subgraphs)
  1603. {
  1604. IPropertyTree &node = subgraphs->query();
  1605. loadSubgraph(node, activities);
  1606. loadNode(node, 0, activities);
  1607. }
  1608. addDependencies(graph, activities);
  1609. }
  1610. catch (...)
  1611. {
  1612. ::Release(activities);
  1613. allActivities.kill();
  1614. throw;
  1615. }
  1616. return activities;
  1617. }
  1618. virtual IRoxieServerContext *createContext(IPropertyTree *context, IHpccProtocolResponse *protocol, unsigned flags, const ContextLogger &_logctx, PTreeReaderOptions _xmlReadFlags, const char *_querySetName) const
  1619. {
  1620. checkSuspended();
  1621. return createRoxieServerContext(context, protocol, this, flags, _logctx, _xmlReadFlags, _querySetName);
  1622. }
  1623. virtual IRoxieServerContext *createContext(IConstWorkUnit *wu, const ContextLogger &_logctx) const
  1624. {
  1625. checkSuspended();
  1626. return createWorkUnitServerContext(wu, this, _logctx);
  1627. }
  1628. virtual CRoxieWorkflowMachine *createWorkflowMachine(IConstWorkUnit *wu, bool isOnce, const IRoxieContextLogger &logctx, const QueryOptions & options) const
  1629. {
  1630. IPropertyTree *workflow = queryWorkflowTree();
  1631. if (workflow)
  1632. {
  1633. return ::createRoxieWorkflowMachine(workflow, wu, isOnce, options.parallelWorkflow, options.numWorkflowThreads, logctx);
  1634. }
  1635. else
  1636. return NULL;
  1637. }
  1638. virtual IPropertyTree *getQueryStats(time_t from, time_t to)
  1639. {
  1640. return queryStats->getStats(from, to);
  1641. }
  1642. };
  1643. unsigned checkWorkunitVersionConsistency(const IConstWorkUnit *wu)
  1644. {
  1645. assertex(wu);
  1646. unsigned wuVersion = wu->getCodeVersion();
  1647. if (wuVersion == 0)
  1648. throw makeStringException(ROXIE_MISMATCH, "Attempting to execute a workunit that hasn't been compiled");
  1649. if (wuVersion > ACTIVITY_INTERFACE_VERSION || wuVersion < MIN_ACTIVITY_INTERFACE_VERSION)
  1650. throw MakeStringException(ROXIE_MISMATCH, "Workunit was compiled for eclhelper interface version %d, this roxie requires version %d..%d", wuVersion, MIN_ACTIVITY_INTERFACE_VERSION, ACTIVITY_INTERFACE_VERSION);
  1651. return wuVersion;
  1652. }
  1653. static void checkWorkunitVersionConsistency(const IQueryDll *dll)
  1654. {
  1655. unsigned wuVersion = checkWorkunitVersionConsistency(dll->queryWorkUnit());
  1656. EclProcessFactory processFactory = (EclProcessFactory) dll->queryDll()->getEntry("createProcess");
  1657. if (processFactory)
  1658. {
  1659. Owned<IEclProcess> process = processFactory();
  1660. assertex(process);
  1661. if (process->getActivityVersion() != wuVersion)
  1662. throw MakeStringException(ROXIE_MISMATCH, "Inconsistent interface versions. Workunit was created using eclcc for version %u, but the c++ compiler used version %u", wuVersion, process->getActivityVersion());
  1663. }
  1664. else
  1665. throw MakeStringException(ROXIE_MISMATCH, "Workunit did not export createProcess function");
  1666. }
  1667. extern IQueryFactory *createServerQueryFactory(const char *id, const IQueryDll *dll, const IRoxiePackage &package, const IPropertyTree *stateInfo, bool isDynamic, bool forceRetry)
  1668. {
  1669. IArrayOf<IResolvedFile> queryFiles; // Note - these should stay in scope long enough to ensure still cached when (if) query is loaded for real
  1670. hash64_t hashValue = CQueryFactory::getQueryHash(id, dll, package, stateInfo, queryFiles, isDynamic);
  1671. Owned<CQueryFactory> ret;
  1672. {
  1673. CriticalBlock b(CQueryFactory::queryCacheCrit);
  1674. ret.setown(CQueryFactory::getCachedQuery(hashValue, 0));
  1675. if (ret && ret->loadFailed() && (reloadRetriesFailed || forceRetry)) // MORE - is there a race on loadFailed?
  1676. {
  1677. ret.clear();
  1678. }
  1679. if (ret)
  1680. ::Release(dll);
  1681. else
  1682. {
  1683. if (dll && !selfTestMode)
  1684. {
  1685. checkWorkunitVersionConsistency(dll);
  1686. Owned<ISharedOnceContext> sharedOnceContext;
  1687. IPropertyTree *workflow = dll->queryWorkUnit()->queryWorkflowTree();
  1688. if (workflow && workflow->hasProp("Item[@mode='once']"))
  1689. sharedOnceContext.setown(new CSharedOnceContext);
  1690. ret.setown(new CRoxieServerQueryFactory(id, dll, package, hashValue, sharedOnceContext, isDynamic));
  1691. }
  1692. else
  1693. ret.setown(new CRoxieServerQueryFactory(id, NULL, package, hashValue, NULL, isDynamic));
  1694. }
  1695. }
  1696. ret->init(stateInfo);
  1697. return ret.getClear();
  1698. }
  1699. extern IQueryFactory *createServerQueryFactoryFromWu(IConstWorkUnit *wu, const IQueryDll *_dll)
  1700. {
  1701. Linked<const IQueryDll> dll = _dll;
  1702. if (!dll)
  1703. dll.setown(createWuQueryDll(wu));
  1704. if (!dll)
  1705. return NULL;
  1706. return createServerQueryFactory(wu->queryWuid(), dll.getClear(), queryRootRoxiePackage(), NULL, true, false); // MORE - if use a constant for id might cache better?
  1707. }
  1708. //==============================================================================================================================================
  1709. class CAgentQueryFactory : public CQueryFactory
  1710. {
  1711. void addActivity(IAgentActivityFactory *activity, ActivityArray *activities)
  1712. {
  1713. activities->append(*activity);
  1714. unsigned activityId = activity->queryId();
  1715. allActivities.setValue(activityId, activity);
  1716. }
  1717. void loadAgentNode(IPropertyTree &node, unsigned subgraphId, ActivityArray *activities)
  1718. {
  1719. ThorActivityKind kind = getActivityKind(node);
  1720. switch (kind)
  1721. {
  1722. case TAKcsvread:
  1723. case TAKxmlread:
  1724. case TAKdiskread:
  1725. case TAKjsonread:
  1726. if (node.getPropBool("att[@name='_isSpill']/@value", false) || node.getPropBool("att[@name='_isSpillGlobal']/@value", false))
  1727. return;
  1728. break;
  1729. case TAKkeyedjoin:
  1730. case TAKkeyeddenormalize:
  1731. case TAKkeyeddenormalizegroup:
  1732. case TAKdisknormalize:
  1733. case TAKdiskcount:
  1734. case TAKdiskaggregate:
  1735. case TAKdiskgroupaggregate:
  1736. case TAKindexread:
  1737. case TAKindexnormalize:
  1738. case TAKindexcount:
  1739. case TAKindexaggregate:
  1740. case TAKindexgroupaggregate:
  1741. case TAKindexgroupexists:
  1742. case TAKindexgroupcount:
  1743. case TAKfetch:
  1744. case TAKcsvfetch:
  1745. case TAKxmlfetch:
  1746. case TAKjsonfetch:
  1747. case TAKremotegraph:
  1748. break;
  1749. case TAKsubgraph:
  1750. break;
  1751. default:
  1752. return;
  1753. }
  1754. IAgentActivityFactory *newAct = NULL;
  1755. if (kind != TAKsubgraph)
  1756. {
  1757. if (isSuspended)
  1758. newAct = createRoxieDummyActivityFactory(node, subgraphId, *this, false); // MORE - is there any point?
  1759. else
  1760. {
  1761. StringBuffer helperName;
  1762. node.getProp("att[@name=\"helper\"]/@value", helperName);
  1763. if (!helperName.length())
  1764. helperName.append("fAc").append(node.getPropInt("@id", 0));
  1765. HelperFactory *helperFactory = dll->getFactory(helperName.str());
  1766. if (!helperFactory)
  1767. throw MakeStringException(ROXIE_INTERNAL_ERROR, "Internal error: helper function %s not exported", helperName.str());
  1768. switch (kind)
  1769. {
  1770. case TAKdiskread:
  1771. newAct = createRoxieDiskReadActivityFactory(node, subgraphId, *this, helperFactory);
  1772. break;
  1773. case TAKcsvread:
  1774. newAct = createRoxieCsvReadActivityFactory(node, subgraphId, *this, helperFactory);
  1775. break;
  1776. case TAKxmlread:
  1777. case TAKjsonread:
  1778. newAct = createRoxieXmlReadActivityFactory(node, subgraphId, *this, helperFactory);
  1779. break;
  1780. case TAKdisknormalize:
  1781. newAct = createRoxieDiskNormalizeActivityFactory(node, subgraphId, *this, helperFactory);
  1782. break;
  1783. case TAKdiskcount:
  1784. newAct = createRoxieDiskCountActivityFactory(node, subgraphId, *this, helperFactory);
  1785. break;
  1786. case TAKdiskaggregate:
  1787. newAct = createRoxieDiskAggregateActivityFactory(node, subgraphId, *this, helperFactory);
  1788. break;
  1789. case TAKdiskgroupaggregate:
  1790. newAct = createRoxieDiskGroupAggregateActivityFactory(node, subgraphId, *this, helperFactory);
  1791. break;
  1792. case TAKindexread:
  1793. newAct = createRoxieIndexReadActivityFactory(node, subgraphId, *this, helperFactory);
  1794. break;
  1795. case TAKindexnormalize:
  1796. newAct = createRoxieIndexNormalizeActivityFactory(node, subgraphId, *this, helperFactory);
  1797. break;
  1798. case TAKindexcount:
  1799. newAct = createRoxieIndexCountActivityFactory(node, subgraphId, *this, helperFactory);
  1800. break;
  1801. case TAKindexaggregate:
  1802. newAct = createRoxieIndexAggregateActivityFactory(node, subgraphId, *this, helperFactory);
  1803. break;
  1804. case TAKindexgroupaggregate:
  1805. case TAKindexgroupexists:
  1806. case TAKindexgroupcount:
  1807. newAct = createRoxieIndexGroupAggregateActivityFactory(node, subgraphId, *this, helperFactory, kind);
  1808. break;
  1809. case TAKfetch:
  1810. newAct = createRoxieFetchActivityFactory(node, subgraphId, *this, helperFactory);
  1811. break;
  1812. case TAKcsvfetch:
  1813. newAct = createRoxieCSVFetchActivityFactory(node, subgraphId, *this, helperFactory);
  1814. break;
  1815. case TAKxmlfetch:
  1816. case TAKjsonfetch:
  1817. newAct = createRoxieXMLFetchActivityFactory(node, subgraphId, *this, helperFactory);
  1818. break;
  1819. case TAKkeyedjoin:
  1820. case TAKkeyeddenormalize:
  1821. case TAKkeyeddenormalizegroup:
  1822. newAct = createRoxieKeyedJoinIndexActivityFactory(node, subgraphId, *this, helperFactory);
  1823. if (node.getPropBool("att[@name=\"_diskAccessRequired\"]/@value"))
  1824. {
  1825. IAgentActivityFactory *newAct2 = createRoxieKeyedJoinFetchActivityFactory(node, subgraphId, *this, helperFactory);
  1826. unsigned activityId2 = newAct2->queryId() | ROXIE_ACTIVITY_FETCH;
  1827. activities->append(*newAct2);
  1828. allActivities.setValue(activityId2, newAct2);
  1829. }
  1830. break;
  1831. case TAKremotegraph:
  1832. {
  1833. unsigned graphId = node.getPropInt("att[@name=\"_remoteSubGraph\"]/@value", 0);
  1834. newAct = createRoxieRemoteActivityFactory(node, subgraphId, *this, helperFactory, graphId);
  1835. break;
  1836. }
  1837. default:
  1838. throwUnexpected();
  1839. }
  1840. }
  1841. if (newAct)
  1842. {
  1843. addActivity(newAct, activities);
  1844. }
  1845. }
  1846. else if (kind == TAKsubgraph)
  1847. {
  1848. // If the subgraph belongs to a remote activity, we need to be able to execute it on the agent...
  1849. IPropertyTree * childGraphNode = node.queryPropTree("att/graph");
  1850. if (!childGraphNode->getPropBool("@child"))
  1851. {
  1852. unsigned parentId = findParentId(node);
  1853. assertex(parentId);
  1854. unsigned parentIndex = activities->findActivityIndex(parentId);
  1855. if (parentIndex != NotFound)
  1856. {
  1857. ActivityArray *childQuery = loadChildGraph(*childGraphNode);
  1858. activities->item(parentIndex).addChildQuery(node.getPropInt("@id"), childQuery);
  1859. }
  1860. }
  1861. // Regardless, we need to make sure we create remote activities as required throughout the graph
  1862. Owned<IPropertyTreeIterator> nodes = node.getElements("att/graph/node");
  1863. unsigned subgraphId = node.getPropInt("@id");
  1864. ForEach(*nodes)
  1865. {
  1866. IPropertyTree &node = nodes->query();
  1867. loadAgentNode(node, subgraphId, activities);
  1868. }
  1869. }
  1870. }
  1871. void loadOuterSubgraph(IPropertyTree &graph, ActivityArray *activities)
  1872. {
  1873. Owned<IPropertyTreeIterator> nodes = graph.getElements("att/graph/node");
  1874. unsigned subgraphId = graph.getPropInt("@id");
  1875. ForEach(*nodes)
  1876. {
  1877. IPropertyTree &node = nodes->query();
  1878. loadAgentNode(node, subgraphId, activities);
  1879. }
  1880. loadAgentNode(graph, subgraphId, activities); // MORE - not really sure why this line is here!
  1881. }
  1882. public:
  1883. CAgentQueryFactory(const char *_id, const IQueryDll *_dll, const IRoxiePackage &_package, hash64_t _hashValue, unsigned _channelNo, ISharedOnceContext *_sharedOnceContext, bool _dynamic)
  1884. : CQueryFactory(_id, _dll, _package, _hashValue, _channelNo, _sharedOnceContext, _dynamic)
  1885. {
  1886. }
  1887. virtual IRoxieAgentContext *createAgentContext(const AgentContextLogger &logctx, IRoxieQueryPacket *packet, bool hasChildren) const
  1888. {
  1889. return ::createAgentContext(this, logctx, packet, hasChildren);
  1890. }
  1891. virtual ActivityArray *loadGraph(IPropertyTree &graph, const char *graphName)
  1892. {
  1893. // MORE: common up with loadGraph for the Roxie server..
  1894. bool isLibraryGraph = graph.getPropBool("@library");
  1895. bool isSequential = graph.getPropBool("@sequential");
  1896. unsigned wfid = graph.getPropInt("@wfid");
  1897. ActivityArray *activities = new ActivityArray(isLibraryGraph, false, isLibraryGraph, isSequential, wfid);
  1898. if (isLibraryGraph)
  1899. activities->setLibraryGraphId(graph.getPropInt("node/@id"));
  1900. try
  1901. {
  1902. if (false && isLibraryGraph)
  1903. {
  1904. //Really only need to do this if the library is called from a remote activity
  1905. //but it's a bit tricky to work out since the library graph will come before the use.
  1906. //Not a major issue since libraries won't be embedded for production queries.
  1907. // this comment makes little sense...
  1908. Owned<IPropertyTreeIterator> subgraphs = graph.getElements("node");
  1909. ForEach(*subgraphs)
  1910. {
  1911. IPropertyTree &node = subgraphs->query();
  1912. loadSubgraph(node, activities);
  1913. loadNode(node, 0, activities);
  1914. }
  1915. }
  1916. Owned<IPropertyTreeIterator> subgraphs = graph.getElements("node");
  1917. ForEach(*subgraphs)
  1918. {
  1919. IPropertyTree &subgraph = subgraphs->query();
  1920. loadOuterSubgraph(subgraph, activities);
  1921. }
  1922. addDependencies(graph, activities);
  1923. }
  1924. catch (...)
  1925. {
  1926. ::Release(activities);
  1927. allActivities.kill();
  1928. throw;
  1929. }
  1930. return activities;
  1931. }
  1932. };
  1933. IQueryFactory *createAgentQueryFactory(const char *id, const IQueryDll *dll, const IRoxiePackage &package, unsigned channel, const IPropertyTree *stateInfo, bool isDynamic, bool forceRetry)
  1934. {
  1935. IArrayOf<IResolvedFile> queryFiles; // Note - these should stay in scope long enough to ensure still cached when (if) query is loaded for real
  1936. Owned<CQueryFactory> ret;
  1937. hash64_t hashValue = CQueryFactory::getQueryHash(id, dll, package, stateInfo, queryFiles, isDynamic);
  1938. {
  1939. CriticalBlock b(CQueryFactory::queryCacheCrit);
  1940. ret.setown(CQueryFactory::getCachedQuery(hashValue, channel));
  1941. if (ret)
  1942. {
  1943. ::Release(dll);
  1944. }
  1945. else if (dll)
  1946. {
  1947. checkWorkunitVersionConsistency(dll);
  1948. Owned<IQueryFactory> serverFactory = CQueryFactory::getCachedQuery(hashValue, 0);
  1949. assertex(serverFactory);
  1950. ret.setown(new CAgentQueryFactory(id, dll, package, hashValue, channel, serverFactory->querySharedOnceContext(), isDynamic));
  1951. }
  1952. else
  1953. ret.setown(new CAgentQueryFactory(id, NULL, package, hashValue, channel, NULL, isDynamic));
  1954. }
  1955. ret->init(stateInfo);
  1956. return ret.getClear();
  1957. }
  1958. extern IQueryFactory *createAgentQueryFactoryFromWu(IConstWorkUnit *wu, unsigned channelNo)
  1959. {
  1960. Owned<const IQueryDll> dll = createWuQueryDll(wu);
  1961. if (!dll)
  1962. return NULL;
  1963. return createAgentQueryFactory(wu->queryWuid(), dll.getClear(), queryRootRoxiePackage(), channelNo, NULL, true, false); // MORE - if use a constant for id might cache better?
  1964. }