ccdquery.cpp 66 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include <platform.h>
  14. #include <jlib.hpp>
  15. #include "ccd.hpp"
  16. #include "ccdquery.hpp"
  17. #include "ccdstate.hpp"
  18. #include "ccdsnmp.hpp"
  19. #include "thorplugin.hpp"
  20. #include "layouttrans.hpp"
  21. void ActivityArray::append(IActivityFactory &cur)
  22. {
  23. hash.setValue(cur.queryId(), activities.ordinality());
  24. activities.append(cur);
  25. }
  26. unsigned ActivityArray::findActivityIndex(unsigned id)
  27. {
  28. unsigned *ret = hash.getValue(id);
  29. if (ret)
  30. return *ret;
  31. return NotFound;
  32. }
  33. unsigned ActivityArray::recursiveFindActivityIndex(unsigned id)
  34. {
  35. // NOTE - this returns the activity index of the PARENT of the specified activity
  36. unsigned *ret = hash.getValue(id);
  37. if (ret)
  38. return *ret;
  39. ForEachItem(idx)
  40. {
  41. IActivityFactory & cur = item(idx);
  42. unsigned childId;
  43. for (unsigned childIdx = 0;;childIdx++)
  44. {
  45. ActivityArray * children = cur.queryChildQuery(childIdx, childId);
  46. if (!children)
  47. break;
  48. if (children->recursiveFindActivityIndex(id) != NotFound)
  49. {
  50. hash.setValue(id, idx);
  51. return idx;
  52. }
  53. }
  54. }
  55. return NotFound;
  56. }
  57. //----------------------------------------------------------------------------------------------
  58. // Class CQueryDll maps dlls into loadable workunits, complete with caching to ensure that a refresh of the QuerySet
  59. // can avoid reloading dlls, and that the same CQueryDll (and the objects it owns) can be shared between server and
  60. // multiple slave channels
  61. //----------------------------------------------------------------------------------------------
  62. class CQueryDll : public CInterface, implements IQueryDll
  63. {
  64. StringAttr dllName;
  65. Owned <ILoadedDllEntry> dll;
  66. Owned <IConstWorkUnit> wu;
  67. static CriticalSection dllCacheLock;
  68. static CopyMapStringToMyClass<CQueryDll> dllCache;
  69. public:
  70. IMPLEMENT_IINTERFACE;
  71. CQueryDll(const char *_dllName, ILoadedDllEntry *_dll) : dllName(_dllName), dll(_dll)
  72. {
  73. StringBuffer wuXML;
  74. if (getEmbeddedWorkUnitXML(dll, wuXML))
  75. {
  76. Owned<ILocalWorkUnit> localWU = createLocalWorkUnit();
  77. localWU->loadXML(wuXML);
  78. wu.setown(localWU->unlock());
  79. }
  80. CriticalBlock b(dllCacheLock);
  81. dllCache.setValue(dllName, this);
  82. }
  83. virtual void beforeDispose()
  84. {
  85. CriticalBlock b(dllCacheLock);
  86. // NOTE: it's theoretically possible for the final release to happen after a replacement has been inserted into hash table.
  87. // So only remove from hash table if what we find there matches the item that is being deleted.
  88. CQueryDll *goer = dllCache.getValue(dllName);
  89. if (goer == this)
  90. dllCache.remove(dllName);
  91. }
  92. static const CQueryDll *getQueryDll(const char *dllName, bool isExe)
  93. {
  94. CriticalBlock b(dllCacheLock);
  95. CQueryDll *dll = LINK(dllCache.getValue(dllName));
  96. if (dll && dll->isAlive())
  97. return dll;
  98. else
  99. {
  100. Owned<ILoadedDllEntry> dll = isExe ? createExeDllEntry(dllName) : queryRoxieDllServer().loadDll(dllName, DllLocationDirectory);
  101. assertex(dll != NULL);
  102. return new CQueryDll(dllName, dll.getClear());
  103. }
  104. }
  105. static const IQueryDll *getWorkUnitDll(IConstWorkUnit *wu)
  106. {
  107. SCMStringBuffer dllName;
  108. Owned<IConstWUQuery> q = wu->getQuery();
  109. q->getQueryDllName(dllName);
  110. return getQueryDll(dllName.str(), false);
  111. }
  112. virtual HelperFactory *getFactory(const char *helperName) const
  113. {
  114. return (HelperFactory *) dll->getEntry(helperName);
  115. }
  116. virtual ILoadedDllEntry *queryDll() const
  117. {
  118. return dll;
  119. }
  120. virtual IConstWorkUnit *queryWorkUnit() const
  121. {
  122. return wu;
  123. }
  124. };
  125. CriticalSection CQueryDll::dllCacheLock;
  126. CopyMapStringToMyClass<CQueryDll> CQueryDll::dllCache;
  127. extern const IQueryDll *createQueryDll(const char *dllName)
  128. {
  129. return CQueryDll::getQueryDll(dllName, false);
  130. }
  131. extern const IQueryDll *createExeQueryDll(const char *exeName)
  132. {
  133. return CQueryDll::getQueryDll(exeName, true);
  134. }
  135. extern const IQueryDll *createWuQueryDll(IConstWorkUnit *wu)
  136. {
  137. return CQueryDll::getWorkUnitDll(wu);
  138. }
  139. // Add information to the xref information to be returned for a control:getQueryXrefInfo request
  140. void addXrefInfo(IPropertyTree &reply, const char *section, const char *name)
  141. {
  142. VStringBuffer xpath("%s[@name='%s']", section, name);
  143. if (!reply.hasProp(xpath))
  144. {
  145. IPropertyTree *info = createPTree(section, 0);
  146. info->setProp("@name", name);
  147. reply.addPropTree(section, info);
  148. }
  149. }
  150. extern void addXrefFileInfo(IPropertyTree &reply, const IResolvedFile *dataFile)
  151. {
  152. addXrefInfo(reply, "File", dataFile->queryFileName());
  153. }
  154. extern void addXrefLibraryInfo(IPropertyTree &reply, const char *libraryName)
  155. {
  156. addXrefInfo(reply, "Library", libraryName);
  157. }
  158. //----------------------------------------------------------------------------------------------
  159. // Class CQueryFactory is the main implementation of IQueryFactory, combining a IQueryDll and a
  160. // package context into an object that can quickly create a the query context that executes a specific
  161. // instance of a Roxie query.
  162. // Caching is used to ensure that only queries that are affected by a package change need to be reloaded.
  163. // Derived classes handle the differences between slave and server side factories
  164. //----------------------------------------------------------------------------------------------
  165. class CQueryFactory : public CInterface, implements IQueryFactory, implements IResourceContext
  166. {
  167. protected:
  168. const IRoxiePackage &package;
  169. Owned<const IQueryDll> dll;
  170. MapStringToActivityArray graphMap;
  171. StringAttr id;
  172. StringBuffer errorMessage;
  173. MapIdToActivityFactory allActivities;
  174. bool isSuspended;
  175. bool enableFieldTranslation;
  176. unsigned timeLimit;
  177. unsigned warnTimeLimit;
  178. memsize_t memoryLimit;
  179. unsigned priority;
  180. unsigned libraryInterfaceHash;
  181. hash64_t hashValue;
  182. static SpinLock queriesCrit;
  183. static CopyMapXToMyClass<hash64_t, hash64_t, CQueryFactory> queryMap;
  184. public:
  185. static CriticalSection queryCreateLock;
  186. protected:
  187. IRoxieServerActivityFactory *createActivityFactory(ThorActivityKind kind, unsigned subgraphId, IPropertyTree &node)
  188. {
  189. unsigned id = node.getPropInt("@id", 0);
  190. if (isSuspended)
  191. return createRoxieServerDummyActivityFactory(id, subgraphId, *this, NULL, TAKnone, node, false); // Is there actually any point?
  192. StringBuffer helperName;
  193. node.getProp("att[@name=\"helper\"]/@value", helperName);
  194. if (!helperName.length())
  195. helperName.append("fAc").append(id);
  196. HelperFactory *helperFactory = dll->getFactory(helperName);
  197. if (!helperFactory)
  198. throw MakeStringException(ROXIE_INTERNAL_ERROR, "Internal error: helper function %s not exported", helperName.str());
  199. switch (kind)
  200. {
  201. case TAKalljoin:
  202. case TAKalldenormalize:
  203. case TAKalldenormalizegroup:
  204. return createRoxieServerAllJoinActivityFactory(id, subgraphId, *this, helperFactory, kind);
  205. case TAKapply:
  206. return createRoxieServerApplyActivityFactory(id, subgraphId, *this, helperFactory, kind, isRootAction(node));
  207. case TAKaggregate:
  208. case TAKexistsaggregate: // could special case.
  209. case TAKcountaggregate:
  210. return createRoxieServerAggregateActivityFactory(id, subgraphId, *this, helperFactory, kind);
  211. case TAKcase:
  212. case TAKchildcase:
  213. return createRoxieServerCaseActivityFactory(id, subgraphId, *this, helperFactory, kind, isGraphIndependent(node));
  214. case TAKcatch:
  215. case TAKskipcatch:
  216. case TAKcreaterowcatch:
  217. return createRoxieServerCatchActivityFactory(id, subgraphId, *this, helperFactory, kind);
  218. case TAKchilditerator:
  219. return createRoxieServerChildIteratorActivityFactory(id, subgraphId, *this, helperFactory, kind);
  220. case TAKchoosesets:
  221. return createRoxieServerChooseSetsActivityFactory(id, subgraphId, *this, helperFactory, kind);
  222. case TAKchoosesetsenth:
  223. return createRoxieServerChooseSetsEnthActivityFactory(id, subgraphId, *this, helperFactory, kind);
  224. case TAKchoosesetslast:
  225. return createRoxieServerChooseSetsLastActivityFactory(id, subgraphId, *this, helperFactory, kind);
  226. case TAKproject:
  227. case TAKcountproject:
  228. return createRoxieServerProjectActivityFactory(id, subgraphId, *this, helperFactory, kind); // code is common between Project, CountProject
  229. case TAKfilterproject:
  230. return createRoxieServerFilterProjectActivityFactory(id, subgraphId, *this, helperFactory, kind);
  231. case TAKdatasetresult:
  232. case TAKrowresult:
  233. return createRoxieServerDatasetResultActivityFactory(id, subgraphId, *this, helperFactory, kind, isRootAction(node));
  234. case TAKdedup:
  235. return createRoxieServerDedupActivityFactory(id, subgraphId, *this, helperFactory, kind);
  236. case TAKdegroup:
  237. return createRoxieServerDegroupActivityFactory(id, subgraphId, *this, helperFactory, kind);
  238. case TAKcsvread:
  239. case TAKxmlread:
  240. case TAKdiskread:
  241. {
  242. if (node.getPropBool("att[@name='_isSpill']/@value", false) || node.getPropBool("att[@name='_isSpillGlobal']/@value", false))
  243. return createRoxieServerSpillReadActivityFactory(id, subgraphId, *this, helperFactory, kind);
  244. else
  245. {
  246. RemoteActivityId remoteId(id, hashValue);
  247. return createRoxieServerDiskReadActivityFactory(id, subgraphId, *this, helperFactory, kind, remoteId, node);
  248. }
  249. }
  250. case TAKmemoryspillread:
  251. return createRoxieServerSpillReadActivityFactory(id, subgraphId, *this, helperFactory, kind);
  252. case TAKdisknormalize:
  253. case TAKdiskcount:
  254. case TAKdiskaggregate:
  255. case TAKdiskgroupaggregate:
  256. {
  257. RemoteActivityId remoteId(id, hashValue);
  258. return createRoxieServerDiskReadActivityFactory(id, subgraphId, *this, helperFactory, kind, remoteId, node);
  259. }
  260. case TAKchildnormalize:
  261. return createRoxieServerNewChildNormalizeActivityFactory(id, subgraphId, *this, helperFactory, kind);
  262. case TAKchildaggregate:
  263. return createRoxieServerNewChildAggregateActivityFactory(id, subgraphId, *this, helperFactory, kind);
  264. case TAKchildgroupaggregate:
  265. return createRoxieServerNewChildGroupAggregateActivityFactory(id, subgraphId, *this, helperFactory, kind);
  266. case TAKchildthroughnormalize:
  267. return createRoxieServerNewChildThroughNormalizeActivityFactory(id, subgraphId, *this, helperFactory, kind);
  268. case TAKcsvwrite:
  269. case TAKdiskwrite:
  270. case TAKxmlwrite:
  271. case TAKmemoryspillwrite:
  272. return createRoxieServerDiskWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, isRootAction(node));
  273. case TAKindexwrite:
  274. return createRoxieServerIndexWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, isRootAction(node));
  275. case TAKenth:
  276. return createRoxieServerEnthActivityFactory(id, subgraphId, *this, helperFactory, kind);
  277. case TAKfetch:
  278. case TAKcsvfetch:
  279. case TAKxmlfetch:
  280. {
  281. RemoteActivityId remoteId(id, hashValue);
  282. return createRoxieServerFetchActivityFactory(id, subgraphId, *this, helperFactory, kind, remoteId, node);
  283. }
  284. case TAKfilter:
  285. return createRoxieServerFilterActivityFactory(id, subgraphId, *this, helperFactory, kind);
  286. case TAKfiltergroup:
  287. return createRoxieServerFilterGroupActivityFactory(id, subgraphId, *this, helperFactory, kind);
  288. case TAKfirstn:
  289. return createRoxieServerFirstNActivityFactory(id, subgraphId, *this, helperFactory, kind);
  290. case TAKfunnel:
  291. return createRoxieServerConcatActivityFactory(id, subgraphId, *this, helperFactory, kind);
  292. case TAKgroup:
  293. return createRoxieServerGroupActivityFactory(id, subgraphId, *this, helperFactory, kind);
  294. case TAKhashaggregate:
  295. return createRoxieServerHashAggregateActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  296. case TAKif:
  297. case TAKchildif:
  298. return createRoxieServerIfActivityFactory(id, subgraphId, *this, helperFactory, kind, isGraphIndependent(node));
  299. case TAKifaction:
  300. return createRoxieServerIfActionActivityFactory(id, subgraphId, *this, helperFactory, kind, isRootAction(node));
  301. case TAKparallel:
  302. return createRoxieServerParallelActionActivityFactory(id, subgraphId, *this, helperFactory, kind, isRootAction(node));
  303. case TAKsequential:
  304. return createRoxieServerSequentialActionActivityFactory(id, subgraphId, *this, helperFactory, kind, isRootAction(node));
  305. case TAKindexread:
  306. {
  307. RemoteActivityId remoteId(id, hashValue);
  308. return createRoxieServerIndexReadActivityFactory(id, subgraphId, *this, helperFactory, kind, remoteId, node);
  309. }
  310. case TAKindexnormalize:
  311. {
  312. RemoteActivityId remoteId(id, hashValue);
  313. return createRoxieServerIndexNormalizeActivityFactory(id, subgraphId, *this, helperFactory, kind, remoteId, node);
  314. }
  315. case TAKindexcount:
  316. {
  317. RemoteActivityId remoteId(id, hashValue);
  318. return createRoxieServerIndexCountActivityFactory(id, subgraphId, *this, helperFactory, kind, remoteId, node);
  319. }
  320. case TAKindexaggregate:
  321. {
  322. RemoteActivityId remoteId(id, hashValue);
  323. return createRoxieServerIndexAggregateActivityFactory(id, subgraphId, *this, helperFactory, kind, remoteId, node);
  324. }
  325. case TAKindexgroupaggregate:
  326. case TAKindexgroupexists:
  327. case TAKindexgroupcount:
  328. {
  329. RemoteActivityId remoteId(id, hashValue);
  330. return createRoxieServerIndexGroupAggregateActivityFactory(id, subgraphId, *this, helperFactory, kind, remoteId, node);
  331. }
  332. case TAKcountdisk:
  333. return createRoxieServerDiskCountActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  334. case TAKhashdedup:
  335. return createRoxieServerHashDedupActivityFactory(id, subgraphId, *this, helperFactory, kind);
  336. case TAKhashdenormalize:
  337. case TAKhashdistribute:
  338. case TAKhashdistributemerge:
  339. case TAKhashjoin:
  340. throwUnexpected(); // Code generator should have removed or transformed
  341. case TAKiterate:
  342. return createRoxieServerIterateActivityFactory(id, subgraphId, *this, helperFactory, kind);
  343. case TAKprocess:
  344. return createRoxieServerProcessActivityFactory(id, subgraphId, *this, helperFactory, kind);
  345. case TAKjoin:
  346. case TAKjoinlight:
  347. case TAKdenormalize:
  348. case TAKdenormalizegroup:
  349. return createRoxieServerJoinActivityFactory(id, subgraphId, *this, helperFactory, kind);
  350. case TAKkeyeddistribute:
  351. throwUnexpected(); // Code generator should have removed or transformed
  352. case TAKkeyedjoin:
  353. case TAKkeyeddenormalize:
  354. case TAKkeyeddenormalizegroup:
  355. {
  356. RemoteActivityId remoteId(id, hashValue);
  357. RemoteActivityId remoteId2(id | ROXIE_ACTIVITY_FETCH, hashValue);
  358. return createRoxieServerKeyedJoinActivityFactory(id, subgraphId, *this, helperFactory, kind, remoteId, remoteId2, node);
  359. }
  360. case TAKlimit:
  361. return createRoxieServerLimitActivityFactory(id, subgraphId, *this, helperFactory, kind);
  362. case TAKlookupjoin:
  363. case TAKlookupdenormalize:
  364. case TAKlookupdenormalizegroup:
  365. return createRoxieServerLookupJoinActivityFactory(id, subgraphId, *this, helperFactory, kind);
  366. case TAKmerge:
  367. return createRoxieServerMergeActivityFactory(id, subgraphId, *this, helperFactory, kind);
  368. case TAKnormalize:
  369. return createRoxieServerNormalizeActivityFactory(id, subgraphId, *this, helperFactory, kind);
  370. case TAKnormalizechild:
  371. return createRoxieServerNormalizeChildActivityFactory(id, subgraphId, *this, helperFactory, kind);
  372. case TAKnormalizelinkedchild:
  373. return createRoxieServerNormalizeLinkedChildActivityFactory(id, subgraphId, *this, helperFactory, kind);
  374. case TAKnull:
  375. return createRoxieServerNullActivityFactory(id, subgraphId, *this, helperFactory, kind);
  376. case TAKsideeffect:
  377. return createRoxieServerSideEffectActivityFactory(id, subgraphId, *this, helperFactory, kind, isRootAction(node));
  378. case TAKsimpleaction:
  379. return createRoxieServerActionActivityFactory(id, subgraphId, *this, helperFactory, kind, usageCount(node), isRootAction(node));
  380. case TAKparse:
  381. return createRoxieServerParseActivityFactory(id, subgraphId, *this, helperFactory, kind, this);
  382. case TAKworkunitwrite:
  383. return createRoxieServerWorkUnitWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, usageCount(node), isRootAction(node));
  384. case TAKpiperead:
  385. return createRoxieServerPipeReadActivityFactory(id, subgraphId, *this, helperFactory, kind);
  386. case TAKpipethrough:
  387. return createRoxieServerPipeThroughActivityFactory(id, subgraphId, *this, helperFactory, kind);
  388. case TAKpipewrite:
  389. return createRoxieServerPipeWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, usageCount(node), isRootAction(node));
  390. case TAKpull:
  391. throwUnexpected(); //code generator strips for non-thor
  392. case TAKrawiterator:
  393. return createRoxieServerRawIteratorActivityFactory(id, subgraphId, *this, helperFactory, kind);
  394. case TAKlinkedrawiterator:
  395. return createRoxieServerLinkedRawIteratorActivityFactory(id, subgraphId, *this, helperFactory, kind);
  396. case TAKremoteresult:
  397. return createRoxieServerRemoteResultActivityFactory(id, subgraphId, *this, helperFactory, kind, usageCount(node), isRootAction(node));
  398. case TAKrollup:
  399. return createRoxieServerRollupActivityFactory(id, subgraphId, *this, helperFactory, kind);
  400. case TAKsample:
  401. return createRoxieServerSampleActivityFactory(id, subgraphId, *this, helperFactory, kind);
  402. case TAKselectn:
  403. return createRoxieServerSelectNActivityFactory(id, subgraphId, *this, helperFactory, kind);
  404. case TAKselfjoin:
  405. case TAKselfjoinlight:
  406. return createRoxieServerSelfJoinActivityFactory(id, subgraphId, *this, helperFactory, kind);
  407. case TAKskiplimit:
  408. case TAKcreaterowlimit:
  409. return createRoxieServerSkipLimitActivityFactory(id, subgraphId, *this, helperFactory, kind);
  410. case TAKhttp_rowdataset:
  411. case TAKsoap_rowdataset:
  412. return createRoxieServerSoapRowCallActivityFactory(id, subgraphId, *this, helperFactory, kind);
  413. case TAKsoap_rowaction:
  414. return createRoxieServerSoapRowActionActivityFactory(id, subgraphId, *this, helperFactory, kind, isRootAction(node));
  415. case TAKsoap_datasetdataset:
  416. return createRoxieServerSoapDatasetCallActivityFactory(id, subgraphId, *this, helperFactory, kind);
  417. case TAKsoap_datasetaction:
  418. return createRoxieServerSoapDatasetActionActivityFactory(id, subgraphId, *this, helperFactory, kind, isRootAction(node));
  419. case TAKsort:
  420. return createRoxieServerSortActivityFactory(id, subgraphId, *this, helperFactory, kind);
  421. case TAKspill:
  422. case TAKmemoryspillsplit:
  423. return createRoxieServerThroughSpillActivityFactory(id, subgraphId, *this, helperFactory, kind);
  424. case TAKsplit:
  425. return createRoxieServerSplitActivityFactory(id, subgraphId, *this, helperFactory, kind);
  426. case TAKstreamediterator:
  427. return createRoxieServerStreamedIteratorActivityFactory(id, subgraphId, *this, helperFactory, kind);
  428. case TAKtemptable:
  429. case TAKtemprow:
  430. return createRoxieServerTempTableActivityFactory(id, subgraphId, *this, helperFactory, kind);
  431. case TAKinlinetable:
  432. return createRoxieServerInlineTableActivityFactory(id, subgraphId, *this, helperFactory, kind);
  433. case TAKthroughaggregate:
  434. throwUnexpected(); // Concept of through aggregates has been proven not to work in Roxie - codegen should not be creating them any more.
  435. case TAKtopn:
  436. return createRoxieServerTopNActivityFactory(id, subgraphId, *this, helperFactory, kind);
  437. case TAKworkunitread:
  438. return createRoxieServerWorkUnitReadActivityFactory(id, subgraphId, *this, helperFactory, kind);
  439. case TAKxmlparse:
  440. return createRoxieServerXmlParseActivityFactory(id, subgraphId, *this, helperFactory, kind);
  441. case TAKregroup:
  442. return createRoxieServerRegroupActivityFactory(id, subgraphId, *this, helperFactory, kind);
  443. case TAKcombine:
  444. return createRoxieServerCombineActivityFactory(id, subgraphId, *this, helperFactory, kind);
  445. case TAKcombinegroup:
  446. return createRoxieServerCombineGroupActivityFactory(id, subgraphId, *this, helperFactory, kind);
  447. case TAKrollupgroup:
  448. return createRoxieServerRollupGroupActivityFactory(id, subgraphId, *this, helperFactory, kind);
  449. case TAKlocalresultread:
  450. {
  451. unsigned graphId = getGraphId(node);
  452. return createRoxieServerLocalResultReadActivityFactory(id, subgraphId, *this, helperFactory, kind, graphId);
  453. }
  454. case TAKlocalstreamread:
  455. return createRoxieServerLocalResultStreamReadActivityFactory(id, subgraphId, *this, helperFactory, kind);
  456. case TAKlocalresultwrite:
  457. {
  458. unsigned graphId = getGraphId(node);
  459. return createRoxieServerLocalResultWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, usageCount(node), graphId, isRootAction(node));
  460. }
  461. case TAKloopcount:
  462. case TAKlooprow:
  463. case TAKloopdataset:
  464. {
  465. unsigned loopId = node.getPropInt("att[@name=\"_loopid\"]/@value", 0);
  466. return createRoxieServerLoopActivityFactory(id, subgraphId, *this, helperFactory, kind, loopId);
  467. }
  468. case TAKremotegraph:
  469. {
  470. RemoteActivityId remoteId(id, hashValue);
  471. return createRoxieServerRemoteActivityFactory(id, subgraphId, *this, helperFactory, kind, remoteId, isRootAction(node));
  472. }
  473. case TAKgraphloopresultread:
  474. {
  475. unsigned graphId = getGraphId(node);
  476. return createRoxieServerGraphLoopResultReadActivityFactory(id, subgraphId, *this, helperFactory, kind, graphId);
  477. }
  478. case TAKgraphloopresultwrite:
  479. {
  480. unsigned graphId = getGraphId(node);
  481. return createRoxieServerGraphLoopResultWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, usageCount(node), graphId);
  482. }
  483. case TAKnwaygraphloopresultread:
  484. {
  485. unsigned graphId = node.getPropInt("att[@name=\"_graphId\"]/@value", 0);
  486. return createRoxieServerNWayGraphLoopResultReadActivityFactory(id, subgraphId, *this, helperFactory, kind, graphId);
  487. }
  488. case TAKnwayinput:
  489. return createRoxieServerNWayInputActivityFactory(id, subgraphId, *this, helperFactory, kind);
  490. case TAKnwaymerge:
  491. return createRoxieServerNWayMergeActivityFactory(id, subgraphId, *this, helperFactory, kind);
  492. case TAKnwaymergejoin:
  493. case TAKnwayjoin:
  494. return createRoxieServerNWayMergeJoinActivityFactory(id, subgraphId, *this, helperFactory, kind);
  495. case TAKsorted:
  496. return createRoxieServerSortedActivityFactory(id, subgraphId, *this, helperFactory, kind);
  497. case TAKgraphloop:
  498. case TAKparallelgraphloop:
  499. {
  500. unsigned loopId = node.getPropInt("att[@name=\"_loopid\"]/@value", 0);
  501. return createRoxieServerGraphLoopActivityFactory(id, subgraphId, *this, helperFactory, kind, loopId);
  502. }
  503. case TAKlibrarycall:
  504. {
  505. LibraryCallFactoryExtra extra;
  506. extra.maxOutputs = node.getPropInt("att[@name=\"_maxOutputs\"]/@value", 0);
  507. extra.graphid = node.getPropInt("att[@name=\"_graphid\"]/@value", 0);
  508. extra.libraryName.set(node.queryProp("att[@name=\"libname\"]/@value"));
  509. extra.interfaceHash = node.getPropInt("att[@name=\"_interfaceHash\"]/@value", 0);
  510. extra.embedded = node.getPropBool("att[@name=\"embedded\"]/@value", false) ;
  511. Owned<IPropertyTreeIterator> iter = node.getElements("att[@name=\"_outputUsed\"]");
  512. ForEach(*iter)
  513. extra.outputs.append(iter->query().getPropInt("@value"));
  514. return createRoxieServerLibraryCallActivityFactory(id, subgraphId, *this, helperFactory, kind, extra);
  515. }
  516. case TAKnwayselect:
  517. return createRoxieServerNWaySelectActivityFactory(id, subgraphId, *this, helperFactory, kind);
  518. case TAKnonempty:
  519. return createRoxieServerNonEmptyActivityFactory(id, subgraphId, *this, helperFactory, kind);
  520. case TAKprefetchproject:
  521. return createRoxieServerPrefetchProjectActivityFactory(id, subgraphId, *this, helperFactory, kind);
  522. case TAKwhen_dataset:
  523. return createRoxieServerWhenActivityFactory(id, subgraphId, *this, helperFactory, kind);
  524. case TAKwhen_action:
  525. return createRoxieServerWhenActionActivityFactory(id, subgraphId, *this, helperFactory, kind, isRootAction(node));
  526. // These are not required in Roxie for the time being - code generator should trap them
  527. case TAKdistribution:
  528. case TAKchilddataset:
  529. default:
  530. throw MakeStringException(ROXIE_UNIMPLEMENTED_ERROR, "Unimplemented activity %s required", getActivityText(kind));
  531. break;
  532. }
  533. throwUnexpected(); // unreachable, but some compilers will complain about missing return
  534. }
  535. IActivityFactory *findActivity(unsigned id) const
  536. {
  537. if (id)
  538. {
  539. IActivityFactory **f = allActivities.getValue(id);
  540. if (f)
  541. return *f;
  542. }
  543. return NULL;
  544. }
  545. virtual IRoxieServerActivityFactory *getRoxieServerActivityFactory(unsigned id) const
  546. {
  547. checkSuspended();
  548. return LINK(QUERYINTERFACE(findActivity(id), IRoxieServerActivityFactory));
  549. }
  550. virtual ISlaveActivityFactory *getSlaveActivityFactory(unsigned id) const
  551. {
  552. checkSuspended();
  553. IActivityFactory *f = findActivity(id);
  554. return LINK(QUERYINTERFACE(f, ISlaveActivityFactory)); // MORE - don't dynamic cast yuk
  555. }
  556. ActivityArray *loadChildGraph(IPropertyTree &graph)
  557. {
  558. // MORE - this is starting to look very much like loadGraph (on Roxie server side)
  559. ActivityArray *activities = new ActivityArray(true, graph.getPropBool("@delayed"), graph.getPropBool("@library"));
  560. unsigned subgraphId = graph.getPropInt("@id");
  561. try
  562. {
  563. Owned<IPropertyTreeIterator> nodes = graph.getElements("node");
  564. ForEach(*nodes)
  565. {
  566. IPropertyTree &node = nodes->query();
  567. loadNode(node, subgraphId, activities);
  568. }
  569. Owned<IPropertyTreeIterator> edges = graph.getElements("edge");
  570. ForEach(*edges)
  571. {
  572. IPropertyTree &edge = edges->query();
  573. unsigned source = activities->findActivityIndex(edge.getPropInt("@source",0));
  574. unsigned target = activities->findActivityIndex(edge.getPropInt("@target",0));
  575. unsigned sourceOutput = edge.getPropInt("att[@name=\"_sourceIndex\"]/@value", 0);
  576. unsigned targetInput = edge.getPropInt("att[@name=\"_targetIndex\"]/@value", 0);
  577. activities->serverItem(target).setInput(targetInput, source, sourceOutput);
  578. }
  579. }
  580. catch (...)
  581. {
  582. ::Release(activities);
  583. throw;
  584. }
  585. return activities;
  586. }
  587. void loadNode(IPropertyTree &node, unsigned subgraphId, ActivityArray *activities)
  588. {
  589. ThorActivityKind kind = getActivityKind(node);
  590. if (kind==TAKsubgraph)
  591. {
  592. IPropertyTree * childGraphNode = node.queryPropTree("att/graph");
  593. if (childGraphNode->getPropBool("@child"))
  594. {
  595. loadSubgraph(node, activities);
  596. }
  597. else
  598. {
  599. unsigned parentId = findParentId(node);
  600. assertex(parentId);
  601. unsigned parentIdx = activities->findActivityIndex(parentId);
  602. IActivityFactory &parentFactory = activities->item(parentIdx);
  603. ActivityArray *childQuery = loadChildGraph(*childGraphNode);
  604. parentFactory.addChildQuery(node.getPropInt("@id"), childQuery);
  605. }
  606. }
  607. else if (kind)
  608. {
  609. IRoxieServerActivityFactory *f = createActivityFactory(kind, subgraphId, node);
  610. if (f)
  611. {
  612. activities->append(*f);
  613. allActivities.setValue(f->queryId(), f);
  614. }
  615. }
  616. }
  617. void loadSubgraph(IPropertyTree &graph, ActivityArray *activities)
  618. {
  619. unsigned subgraphId = graph.getPropInt("@id");
  620. Owned<IPropertyTreeIterator> nodes = graph.getElements("att/graph/node");
  621. ForEach(*nodes)
  622. {
  623. IPropertyTree &node = nodes->query();
  624. loadNode(node, subgraphId, activities);
  625. }
  626. if (!isSuspended)
  627. {
  628. Owned<IPropertyTreeIterator> edges = graph.getElements("att/graph/edge");
  629. ForEach(*edges)
  630. {
  631. IPropertyTree &edge = edges->query();
  632. unsigned source = activities->findActivityIndex(edge.getPropInt("@source", 0));
  633. unsigned target = activities->recursiveFindActivityIndex(edge.getPropInt("@target", 0));
  634. unsigned sourceOutput = edge.getPropInt("att[@name=\"_sourceIndex\"]/@value", 0);
  635. unsigned targetInput = edge.getPropInt("att[@name=\"_targetIndex\"]/@value", 0);
  636. activities->serverItem(target).setInput(targetInput, source, sourceOutput);
  637. }
  638. }
  639. }
  640. // loadGraph loads outer level graph. This is virtual as slave is very different from Roxie server
  641. virtual ActivityArray *loadGraph(IPropertyTree &graph, const char *graphName) = 0;
  642. bool doAddDependency(unsigned sourceIdx, unsigned sourceId, unsigned targetId, int controlId, const char *edgeId, ActivityArray * activities)
  643. {
  644. // Note - the dependency is recorded with the target being the parent activity that is at the same level as the source
  645. // (recording it on the child that was actually dependent would mean it happened too late)
  646. unsigned source = activities->findActivityIndex(sourceId);
  647. if (source != NotFound)
  648. {
  649. unsigned target = activities->recursiveFindActivityIndex(targetId);
  650. activities->serverItem(target).addDependency(source, activities->serverItem(source).getKind(), sourceIdx, controlId, edgeId);
  651. activities->serverItem(source).noteDependent(target);
  652. return true;
  653. }
  654. ForEachItemIn(idx, *activities)
  655. {
  656. IActivityFactory & cur = activities->item(idx);
  657. unsigned childId;
  658. for (unsigned childIdx = 0;;childIdx++)
  659. {
  660. ActivityArray * children = cur.queryChildQuery(childIdx, childId);
  661. if (!children)
  662. break;
  663. if (doAddDependency(sourceIdx, sourceId, targetId, controlId, edgeId, children))
  664. return true;
  665. }
  666. }
  667. return false;
  668. }
  669. virtual void addDependency(unsigned sourceIdx, unsigned sourceId, unsigned targetId, int controlId, const char *edgeId, ActivityArray * activities)
  670. {
  671. doAddDependency(sourceIdx, sourceId, targetId, controlId, edgeId, activities);
  672. }
  673. void addDependencies(IPropertyTree &graph, ActivityArray *activities)
  674. {
  675. Owned<IPropertyTreeIterator> dependencies = graph.getElements("edge");
  676. ForEach(*dependencies)
  677. {
  678. IPropertyTree &edge = dependencies->query();
  679. if (!edge.getPropInt("att[@name=\"_childGraph\"]/@value", 0))
  680. {
  681. unsigned sourceIdx = edge.getPropInt("att[@name=\"_sourceIndex\"]/@value", 0);
  682. int controlId = edge.getPropInt("att[@name=\"_when\"]/@value", 0);
  683. addDependency(sourceIdx, edge.getPropInt("att[@name=\"_sourceActivity\"]/@value", 0), edge.getPropInt("att[@name=\"_targetActivity\"]/@value", 0), controlId, edge.queryProp("@id"), activities);
  684. }
  685. }
  686. }
  687. public:
  688. IMPLEMENT_IINTERFACE;
  689. unsigned channelNo;
  690. CQueryFactory(const char *_id, const IQueryDll *_dll, const IRoxiePackage &_package, hash64_t _hashValue, unsigned _channelNo)
  691. : id(_id), package(_package), dll(_dll), channelNo(_channelNo), hashValue(_hashValue)
  692. {
  693. package.Link();
  694. isSuspended = false;
  695. libraryInterfaceHash = 0;
  696. priority = 0;
  697. memoryLimit = defaultMemoryLimit;
  698. timeLimit = defaultTimeLimit[priority];
  699. warnTimeLimit = 0;
  700. enableFieldTranslation = fieldTranslationEnabled;
  701. }
  702. ~CQueryFactory()
  703. {
  704. HashIterator graphs(graphMap);
  705. for(graphs.first();graphs.isValid();graphs.next())
  706. {
  707. ActivityArray *a = *graphMap.mapToValue(&graphs.query());
  708. a->Release();
  709. }
  710. package.Release();
  711. }
  712. virtual IQueryFactory *lookupLibrary(const char *libraryName, unsigned expectedInterfaceHash, const IRoxieContextLogger &logctx) const
  713. {
  714. return globalPackageSetManager->lookupLibrary(package, libraryName, expectedInterfaceHash, logctx);
  715. }
  716. virtual void beforeDispose()
  717. {
  718. SpinBlock b(queriesCrit);
  719. // NOTE: it's theoretically possible for the final release to happen after a replacement has been inserted into hash table.
  720. // So only remove from hash table if what we find there matches the item that is being deleted.
  721. CQueryFactory *goer = queryMap.getValue(hashValue+channelNo);
  722. if (goer == this)
  723. queryMap.remove(hashValue+channelNo);
  724. }
  725. static IQueryFactory *getQueryFactory(hash64_t hashValue, unsigned channelNo)
  726. {
  727. SpinBlock b(queriesCrit);
  728. CQueryFactory *factory = LINK(queryMap.getValue(hashValue+channelNo));
  729. if (factory && factory->isAlive())
  730. return factory;
  731. else
  732. return NULL;
  733. }
  734. static hash64_t getQueryHash(const char *id, const IQueryDll *dll, const IRoxiePackage &package, const IPropertyTree *stateInfo)
  735. {
  736. hash64_t hashValue = rtlHash64VStr(dll->queryDll()->queryName(), package.queryHash());
  737. hashValue = rtlHash64VStr(id, hashValue);
  738. if (stateInfo)
  739. {
  740. StringBuffer xml;
  741. toXML(stateInfo, xml);
  742. hashValue = rtlHash64Data(xml.length(), xml.str(), hashValue);
  743. }
  744. return hashValue;
  745. }
  746. virtual void load(const IPropertyTree *stateInfo)
  747. {
  748. IConstWorkUnit *wu = dll->queryWorkUnit();
  749. if (wu) // wu may be null in some unit test cases
  750. {
  751. libraryInterfaceHash = wu->getApplicationValueInt("LibraryModule", "interfaceHash", 0);
  752. // calculate priority before others since it affects the defaults of others
  753. priority = wu->getDebugValueInt("@priority", 0);
  754. if (stateInfo)
  755. priority = stateInfo->getPropInt("@priority", priority);
  756. memoryLimit = (memsize_t) wu->getDebugValueInt64("memoryLimit", defaultMemoryLimit);
  757. timeLimit = (unsigned) wu->getDebugValueInt("timeLimit", defaultTimeLimit[priority]);
  758. warnTimeLimit = (unsigned) wu->getDebugValueInt("warnTimeLimit", 0);
  759. SCMStringBuffer bStr;
  760. enableFieldTranslation = strToBool(wu->getDebugValue("layoutTranslationEnabled", bStr).str());
  761. // MORE - does package override stateInfo, or vice versa?
  762. if (stateInfo)
  763. {
  764. // info in querySets can override the defaults from workunit for some limits
  765. isSuspended = stateInfo->getPropBool("@suspended", false);
  766. memoryLimit = (memsize_t) stateInfo->getPropInt64("@memoryLimit", memoryLimit);
  767. timeLimit = (unsigned) stateInfo->getPropInt("@timeLimit", timeLimit);
  768. warnTimeLimit = (unsigned) stateInfo->getPropInt("@warnTimeLimit", warnTimeLimit);
  769. }
  770. Owned<IConstWUGraphIterator> graphs = &wu->getGraphs(GraphTypeActivities);
  771. SCMStringBuffer graphNameStr;
  772. ForEach(*graphs)
  773. {
  774. graphs->query().getName(graphNameStr);
  775. const char *graphName = graphNameStr.s.str();
  776. Owned<IPropertyTree> graphXgmml = graphs->query().getXGMMLTree(false);
  777. try
  778. {
  779. ActivityArray *activities = loadGraph(*graphXgmml, graphName);
  780. graphMap.setValue(graphName, activities);
  781. }
  782. catch (IException *E)
  783. {
  784. StringBuffer m;
  785. E->errorMessage(m);
  786. suspend(true, m.str(), NULL, false);
  787. ERRLOG("Query %s suspended: %s", id.get(), m.str());
  788. E->Release();
  789. }
  790. }
  791. }
  792. SpinBlock b(queriesCrit);
  793. queryMap.setValue(hashValue+channelNo, this);
  794. }
  795. virtual unsigned queryChannel() const
  796. {
  797. return channelNo;
  798. }
  799. virtual hash64_t queryHash() const
  800. {
  801. return hashValue;
  802. }
  803. virtual const char *loadResource(unsigned id)
  804. {
  805. return (const char *) queryDll()->getResource(id);
  806. }
  807. virtual ActivityArray *lookupGraphActivities(const char *name) const
  808. {
  809. return *graphMap.getValue(name);
  810. }
  811. virtual IActivityGraph *lookupGraph(const char *name, IProbeManager *probeManager, const IRoxieContextLogger &logctx, IRoxieServerActivity *parentActivity) const
  812. {
  813. ActivityArrayPtr *graph = graphMap.getValue(name);
  814. assertex(graph);
  815. Owned<IActivityGraph> ret = ::createActivityGraph(name, 0, **graph, parentActivity, probeManager, logctx);
  816. return ret.getClear();
  817. }
  818. void getGraphStats(StringBuffer &reply, const IPropertyTree &thisGraph) const
  819. {
  820. Owned<IPropertyTree> graph = createPTreeFromIPT(&thisGraph);
  821. Owned<IPropertyTreeIterator> edges = graph->getElements(".//edge");
  822. ForEach(*edges)
  823. {
  824. IPropertyTree &edge = edges->query();
  825. IActivityFactory *a = findActivity(edge.getPropInt("@source", 0));
  826. if (!a)
  827. a = findActivity(edge.getPropInt("att[@name=\"_sourceActivity\"]/@value", 0));
  828. if (a)
  829. {
  830. unsigned sourceOutput = edge.getPropInt("att[@name=\"_sourceIndex\"]/@value", 0);
  831. a->getEdgeProgressInfo(sourceOutput, edge);
  832. }
  833. }
  834. Owned<IPropertyTreeIterator> nodes = graph->getElements(".//node");
  835. ForEach(*nodes)
  836. {
  837. IPropertyTree &node = nodes->query();
  838. IActivityFactory *a = findActivity(node.getPropInt("@id", 0));
  839. if (a)
  840. a->getNodeProgressInfo(node);
  841. }
  842. toXML(graph, reply);
  843. }
  844. virtual IPropertyTree* cloneQueryXGMML() const
  845. {
  846. assertex(dll->queryWorkUnit());
  847. Owned<IPropertyTree> tree = createPTree("Query");
  848. Owned<IConstWUGraphIterator> graphs = &dll->queryWorkUnit()->getGraphs(GraphTypeActivities);
  849. SCMStringBuffer graphNameStr;
  850. ForEach(*graphs)
  851. {
  852. graphs->query().getName(graphNameStr);
  853. const char *graphName = graphNameStr.s.str();
  854. Owned<IPropertyTree> graphXgmml = graphs->query().getXGMMLTree(false);
  855. IPropertyTree *newGraph = createPTree();
  856. newGraph->setProp("@id", graphName);
  857. IPropertyTree *newXGMML = createPTree();
  858. newXGMML->addPropTree("graph", graphXgmml.getLink());
  859. newGraph->addPropTree("xgmml", newXGMML);
  860. tree->addPropTree("Graph", newGraph);
  861. }
  862. return tree.getClear();
  863. }
  864. virtual void getStats(StringBuffer &reply, const char *graphName) const
  865. {
  866. assertex(dll->queryWorkUnit());
  867. Owned<IConstWUGraphIterator> graphs = &dll->queryWorkUnit()->getGraphs(GraphTypeActivities);
  868. SCMStringBuffer thisGraphNameStr;
  869. ForEach(*graphs)
  870. {
  871. graphs->query().getName(thisGraphNameStr);
  872. if (graphName)
  873. {
  874. if (thisGraphNameStr.length() && (stricmp(graphName, thisGraphNameStr.s.str()) != 0))
  875. continue; // not interested in this one
  876. }
  877. reply.appendf("<Graph id='%s'><xgmml><graph>", thisGraphNameStr.s.str());
  878. Owned<IPropertyTree> graphXgmml = graphs->query().getXGMMLTree(false);
  879. getGraphStats(reply, *graphXgmml);
  880. reply.append("</graph></xgmml></Graph>");
  881. }
  882. }
  883. virtual void getActivityMetrics(StringBuffer &reply) const
  884. {
  885. HashIterator i(allActivities);
  886. StringBuffer myReply;
  887. ForEach(i)
  888. {
  889. IActivityFactory *f = *allActivities.mapToValue(&i.query());
  890. f->getActivityMetrics(myReply.clear());
  891. if (myReply.length())
  892. {
  893. reply.appendf(" <activity query='%s' id='%d' channel='%d'\n", queryQueryName(), f->queryId(), queryChannel());
  894. reply.append(myReply);
  895. reply.append(" </activity>\n");
  896. }
  897. }
  898. }
  899. virtual void getQueryInfo(StringBuffer &reply, bool full, const IRoxieContextLogger &logctx) const
  900. {
  901. Owned<IPropertyTree> xref = createPTree("Query", 0);
  902. xref->setProp("@id", id);
  903. if (suspended())
  904. {
  905. xref->setPropBool("@suspended", true);
  906. xref->setProp("@error", errorMessage);
  907. }
  908. if (full)
  909. {
  910. HashIterator i(allActivities);
  911. ForEach(i)
  912. {
  913. IActivityFactory *f = *allActivities.mapToValue(&i.query());
  914. f->getXrefInfo(*xref, logctx);
  915. }
  916. }
  917. toXML(xref, reply);
  918. }
  919. virtual void resetQueryTimings()
  920. {
  921. HashIterator i(allActivities);
  922. ForEach(i)
  923. {
  924. IActivityFactory *f = *allActivities.mapToValue(&i.query());
  925. f->resetNodeProgressInfo();
  926. }
  927. }
  928. virtual const char *queryErrorMessage() const
  929. {
  930. return errorMessage.str();
  931. }
  932. virtual const char *queryQueryName() const
  933. {
  934. return id;
  935. }
  936. virtual bool isQueryLibrary() const
  937. {
  938. return libraryInterfaceHash != 0;
  939. }
  940. virtual unsigned getQueryLibraryInterfaceHash() const
  941. {
  942. return libraryInterfaceHash;
  943. }
  944. virtual void suspend(bool suspendit, const char* errMsg, const char *userId, bool appendIfNewError)
  945. {
  946. // MORE - should wait until no queries active before returning
  947. isSuspended = suspendit; // Atomic enough for our purposes I think - at least until the wait stuff is in place
  948. if (appendIfNewError)
  949. {
  950. if (errorMessage.length())
  951. {
  952. // MORE - not the most efficient code, but this error condition should not occur in production
  953. if (strstr(errorMessage.str(), errMsg) == 0)
  954. errorMessage.appendf(", %s", errMsg);
  955. }
  956. else
  957. errorMessage.append(errMsg);
  958. }
  959. else
  960. errorMessage.clear().append(errMsg);
  961. }
  962. virtual bool suspended() const
  963. {
  964. return isSuspended;
  965. }
  966. virtual memsize_t getMemoryLimit() const
  967. {
  968. return memoryLimit;
  969. }
  970. virtual unsigned getTimeLimit() const
  971. {
  972. return timeLimit;
  973. }
  974. virtual ILoadedDllEntry *queryDll() const
  975. {
  976. return dll->queryDll();
  977. }
  978. virtual IConstWorkUnit *queryWorkUnit() const
  979. {
  980. return dll->queryWorkUnit();
  981. }
  982. virtual const IRoxiePackage &queryPackage() const
  983. {
  984. return package;
  985. }
  986. virtual WorkflowMachine *createWorkflowMachine(bool isOnce, const IRoxieContextLogger &logctx) const
  987. {
  988. throwUnexpected(); // only on server...
  989. }
  990. virtual char *getEnv(const char *name, const char *defaultValue) const
  991. {
  992. if (!defaultValue)
  993. defaultValue = "";
  994. const char *result;
  995. if (name && *name=='@')
  996. {
  997. // @ is shorthand for control: for legacy compatibility reasons
  998. StringBuffer useName;
  999. useName.append("control:").append(name+1);
  1000. result = package.queryEnv(useName.str());
  1001. }
  1002. else
  1003. result = package.queryEnv(name);
  1004. if (!result)
  1005. result = getenv(name);
  1006. return strdup(result ? result : defaultValue);
  1007. }
  1008. virtual unsigned getPriority() const
  1009. {
  1010. return priority;
  1011. }
  1012. virtual unsigned getWarnTimeLimit() const
  1013. {
  1014. return warnTimeLimit;
  1015. }
  1016. virtual int getDebugValueInt(const char * propname, int defVal) const
  1017. {
  1018. assertex(dll->queryWorkUnit());
  1019. return dll->queryWorkUnit()->getDebugValueInt(propname, defVal);
  1020. }
  1021. virtual bool getDebugValueBool(const char * propname, bool defVal) const
  1022. {
  1023. assertex(dll->queryWorkUnit());
  1024. return dll->queryWorkUnit()->getDebugValueBool(propname, defVal);
  1025. }
  1026. bool getEnableFieldTranslation() const
  1027. {
  1028. return enableFieldTranslation;
  1029. }
  1030. virtual IRoxieSlaveContext *createSlaveContext(const SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
  1031. {
  1032. throwUnexpected(); // only implemented in derived slave class
  1033. }
  1034. virtual IPropertyTree &queryOnceContext() const
  1035. {
  1036. throwUnexpected(); // only implemented in derived server class
  1037. }
  1038. virtual IDeserializedResultStore &queryOnceResultStore() const
  1039. {
  1040. throwUnexpected(); // only implemented in derived server class
  1041. }
  1042. virtual IRoxieServerContext *createContext(IPropertyTree *xml, SafeSocket &client, bool isXml, bool isRaw, bool isBlocked, HttpHelper &httpHelper, bool trim, const IRoxieContextLogger &_logctx, XmlReaderOptions xmlReadFlags) const
  1043. {
  1044. throwUnexpected(); // only implemented in derived server class
  1045. }
  1046. virtual IRoxieServerContext *createContext(IConstWorkUnit *wu, const IRoxieContextLogger &_logctx) const
  1047. {
  1048. throwUnexpected(); // only implemented in derived server class
  1049. }
  1050. virtual void noteQuery(time_t startTime, bool failed, unsigned elapsed, unsigned memused, unsigned slavesReplyLen, unsigned bytesOut)
  1051. {
  1052. throwUnexpected(); // only implemented in derived server class
  1053. }
  1054. virtual IPropertyTree *getQueryStats(time_t from, time_t to)
  1055. {
  1056. throwUnexpected(); // only implemented in derived server class
  1057. }
  1058. virtual void getGraphNames(StringArray &ret) const
  1059. {
  1060. Owned<IConstWUGraphIterator> graphs = &dll->queryWorkUnit()->getGraphs(GraphTypeActivities);
  1061. ForEach(*graphs)
  1062. {
  1063. SCMStringBuffer graphName;
  1064. graphs->query().getName(graphName);
  1065. ret.append(graphName.str());
  1066. }
  1067. }
  1068. protected:
  1069. void checkSuspended() const
  1070. {
  1071. if (isSuspended)
  1072. {
  1073. StringBuffer err;
  1074. if (errorMessage.length())
  1075. err.appendf(" because %s", errorMessage.str());
  1076. throw MakeStringException(ROXIE_QUERY_SUSPENDED, "Query %s is suspended%s", id.get(), err.str());
  1077. }
  1078. }
  1079. };
  1080. CriticalSection CQueryFactory::queryCreateLock;
  1081. SpinLock CQueryFactory::queriesCrit;
  1082. CopyMapXToMyClass<hash64_t, hash64_t, CQueryFactory> CQueryFactory::queryMap;
  1083. extern IQueryFactory *getQueryFactory(hash64_t hashvalue, unsigned channel)
  1084. {
  1085. return CQueryFactory::getQueryFactory(hashvalue, channel);
  1086. }
  1087. class CRoxieServerQueryFactory : public CQueryFactory
  1088. {
  1089. // Parts of query factory is only interesting on the server - once support, workflow support, and tracking of total query times
  1090. protected:
  1091. mutable CriticalSection onceCrit;
  1092. mutable Owned<roxiemem::IRowManager> onceManager; // release AFTER resultStore
  1093. mutable Owned<IPropertyTree> onceContext;
  1094. mutable Owned<IDeserializedResultStore> onceResultStore;
  1095. mutable Owned<IException> onceException;
  1096. Owned<IQueryStatsAggregator> queryStats;
  1097. IPropertyTree *queryWorkflowTree() const
  1098. {
  1099. assertex(dll->queryWorkUnit());
  1100. return dll->queryWorkUnit()->queryWorkflowTree();
  1101. }
  1102. bool hasOnceSection() const
  1103. {
  1104. IPropertyTree *workflow = queryWorkflowTree();
  1105. if (workflow)
  1106. return workflow->hasProp("Item[@mode='once']");
  1107. else
  1108. return false;
  1109. }
  1110. public:
  1111. CRoxieServerQueryFactory(const char *_id, const IQueryDll *_dll, const IRoxiePackage &_package, hash64_t _hashValue)
  1112. : CQueryFactory(_id, _dll, _package, _hashValue, 0)
  1113. {
  1114. queryStats.setown(createQueryStatsAggregator(id.get(), statsExpiryTime));
  1115. }
  1116. virtual void noteQuery(time_t startTime, bool failed, unsigned elapsed, unsigned memused, unsigned slavesReplyLen, unsigned bytesOut)
  1117. {
  1118. queryStats->noteQuery(startTime, failed, elapsed, memused, slavesReplyLen, bytesOut);
  1119. }
  1120. virtual void addDependency(unsigned sourceIdx, unsigned sourceId, unsigned targetId, int controlId, const char *edgeId, ActivityArray * activities)
  1121. {
  1122. // addDependency is expected to fail occasionally on slave, but never on Roxie server
  1123. if (!doAddDependency(sourceIdx, sourceId, targetId, controlId, edgeId, activities))
  1124. throw MakeStringException(ROXIE_ADDDEPENDENCY_ERROR, "Failed to create dependency from %u on %u", sourceId, targetId);
  1125. }
  1126. virtual ActivityArray *loadGraph(IPropertyTree &graph, const char *graphName)
  1127. {
  1128. bool isLibraryGraph = graph.getPropBool("@library");
  1129. ActivityArray *activities = new ActivityArray(isLibraryGraph, false, isLibraryGraph);
  1130. if (isLibraryGraph)
  1131. activities->setLibraryGraphId(graph.getPropInt("node/@id"));
  1132. try
  1133. {
  1134. Owned<IPropertyTreeIterator> subgraphs = graph.getElements("node");
  1135. ForEach(*subgraphs)
  1136. {
  1137. IPropertyTree &node = subgraphs->query();
  1138. loadSubgraph(node, activities);
  1139. loadNode(node, 0, activities);
  1140. }
  1141. addDependencies(graph, activities);
  1142. }
  1143. catch (...)
  1144. {
  1145. ::Release(activities);
  1146. throw;
  1147. }
  1148. return activities;
  1149. }
  1150. virtual IDeserializedResultStore &queryOnceResultStore() const
  1151. {
  1152. assertex(onceResultStore!= NULL);
  1153. return *onceResultStore;
  1154. }
  1155. virtual IPropertyTree &queryOnceContext() const
  1156. {
  1157. assertex(onceContext != NULL);
  1158. return *onceContext;
  1159. }
  1160. virtual void checkOnceDone(const IRoxieContextLogger &_logctx) const
  1161. {
  1162. if (hasOnceSection())
  1163. {
  1164. CriticalBlock b(onceCrit);
  1165. if (!onceContext)
  1166. {
  1167. onceContext.setown(createPTree());
  1168. onceResultStore.setown(createDeserializedResultStore());
  1169. Owned <IRoxieServerContext> ctx = createOnceServerContext(this, _logctx);
  1170. onceManager.set(&ctx->queryRowManager());
  1171. try
  1172. {
  1173. ctx->process();
  1174. ctx->done(false);
  1175. }
  1176. catch (IException *E)
  1177. {
  1178. ctx->done(true);
  1179. onceException.setown(E);
  1180. }
  1181. catch (...)
  1182. {
  1183. ctx->done(true);
  1184. onceException.setown(MakeStringException(ROXIE_INTERNAL_ERROR, "Unknown exception in ONCE code"));
  1185. }
  1186. }
  1187. if (onceException)
  1188. throw onceException.getLink();
  1189. }
  1190. }
  1191. virtual IRoxieServerContext *createContext(IPropertyTree *context, SafeSocket &client, bool isXml, bool isRaw, bool isBlocked, HttpHelper &httpHelper, bool trim, const IRoxieContextLogger &_logctx, XmlReaderOptions _xmlReadFlags) const
  1192. {
  1193. checkSuspended();
  1194. checkOnceDone(_logctx);
  1195. return createRoxieServerContext(context, this, client, isXml, isRaw, isBlocked, httpHelper, trim, priority, _logctx, _xmlReadFlags);
  1196. }
  1197. virtual IRoxieServerContext *createContext(IConstWorkUnit *wu, const IRoxieContextLogger &_logctx) const
  1198. {
  1199. checkSuspended();
  1200. checkOnceDone(_logctx);
  1201. return createWorkUnitServerContext(wu, this, _logctx);
  1202. }
  1203. virtual WorkflowMachine *createWorkflowMachine(bool isOnce, const IRoxieContextLogger &logctx) const
  1204. {
  1205. IPropertyTree *workflow = queryWorkflowTree();
  1206. if (workflow)
  1207. {
  1208. return ::createRoxieWorkflowMachine(workflow, isOnce, logctx);
  1209. }
  1210. else
  1211. return NULL;
  1212. }
  1213. virtual IPropertyTree *getQueryStats(time_t from, time_t to)
  1214. {
  1215. return queryStats->getStats(from, to);
  1216. }
  1217. };
  1218. extern IQueryFactory *createServerQueryFactory(const char *id, const IQueryDll *dll, const IRoxiePackage &package, const IPropertyTree *stateInfo)
  1219. {
  1220. CriticalBlock b(CQueryFactory::queryCreateLock);
  1221. hash64_t hashValue = CQueryFactory::getQueryHash(id, dll, package, stateInfo);
  1222. IQueryFactory *cached = getQueryFactory(hashValue, 0);
  1223. if (cached)
  1224. {
  1225. ::Release(dll);
  1226. return cached;
  1227. }
  1228. Owned<CRoxieServerQueryFactory> newFactory = new CRoxieServerQueryFactory(id, dll, package, hashValue);
  1229. newFactory->load(stateInfo);
  1230. return newFactory.getClear();
  1231. }
  1232. extern IQueryFactory *createServerQueryFactoryFromWu(IConstWorkUnit *wu)
  1233. {
  1234. Owned<const IQueryDll> dll = createWuQueryDll(wu);
  1235. if (!dll)
  1236. return NULL;
  1237. SCMStringBuffer wuid;
  1238. return createServerQueryFactory(wu->getWuid(wuid).str(), dll.getClear(), queryRootPackage(), NULL); // MORE - if use a constant for id might cache better?
  1239. }
  1240. //==============================================================================================================================================
  1241. class CSlaveQueryFactory : public CQueryFactory
  1242. {
  1243. void addActivity(ISlaveActivityFactory *activity, ActivityArray *activities)
  1244. {
  1245. activities->append(*activity);
  1246. unsigned activityId = activity->queryId();
  1247. allActivities.setValue(activityId, activity);
  1248. }
  1249. void loadSlaveNode(IPropertyTree &node, unsigned subgraphId, ActivityArray *activities)
  1250. {
  1251. ThorActivityKind kind = getActivityKind(node);
  1252. switch (kind)
  1253. {
  1254. case TAKcsvread:
  1255. case TAKxmlread:
  1256. case TAKdiskread:
  1257. if (node.getPropBool("att[@name='_isSpill']/@value", false) || node.getPropBool("att[@name='_isSpillGlobal']/@value", false))
  1258. return;
  1259. break;
  1260. case TAKkeyedjoin:
  1261. case TAKkeyeddenormalize:
  1262. case TAKkeyeddenormalizegroup:
  1263. case TAKdisknormalize:
  1264. case TAKdiskcount:
  1265. case TAKdiskaggregate:
  1266. case TAKdiskgroupaggregate:
  1267. case TAKindexread:
  1268. case TAKindexnormalize:
  1269. case TAKindexcount:
  1270. case TAKindexaggregate:
  1271. case TAKindexgroupaggregate:
  1272. case TAKindexgroupexists:
  1273. case TAKindexgroupcount:
  1274. case TAKfetch:
  1275. case TAKcsvfetch:
  1276. case TAKxmlfetch:
  1277. case TAKremotegraph:
  1278. break;
  1279. case TAKsubgraph:
  1280. break;
  1281. default:
  1282. return;
  1283. }
  1284. ISlaveActivityFactory *newAct = NULL;
  1285. if (kind != TAKsubgraph)
  1286. {
  1287. if (isSuspended)
  1288. newAct = createRoxieDummyActivityFactory(node, subgraphId, *this, false); // MORE - is there any point?
  1289. else
  1290. {
  1291. StringBuffer helperName;
  1292. node.getProp("att[@name=\"helper\"]/@value", helperName);
  1293. if (!helperName.length())
  1294. helperName.append("fAc").append(node.getPropInt("@id", 0));
  1295. HelperFactory *helperFactory = dll->getFactory(helperName.str());
  1296. if (!helperFactory)
  1297. throw MakeStringException(ROXIE_INTERNAL_ERROR, "Internal error: helper function %s not exported", helperName.str());
  1298. switch (kind)
  1299. {
  1300. case TAKdiskread:
  1301. newAct = createRoxieDiskReadActivityFactory(node, subgraphId, *this, helperFactory);
  1302. break;
  1303. case TAKcsvread:
  1304. newAct = createRoxieCsvReadActivityFactory(node, subgraphId, *this, helperFactory);
  1305. break;
  1306. case TAKxmlread:
  1307. newAct = createRoxieXmlReadActivityFactory(node, subgraphId, *this, helperFactory);
  1308. break;
  1309. case TAKdisknormalize:
  1310. newAct = createRoxieDiskNormalizeActivityFactory(node, subgraphId, *this, helperFactory);
  1311. break;
  1312. case TAKdiskcount:
  1313. newAct = createRoxieDiskCountActivityFactory(node, subgraphId, *this, helperFactory);
  1314. break;
  1315. case TAKdiskaggregate:
  1316. newAct = createRoxieDiskAggregateActivityFactory(node, subgraphId, *this, helperFactory);
  1317. break;
  1318. case TAKdiskgroupaggregate:
  1319. newAct = createRoxieDiskGroupAggregateActivityFactory(node, subgraphId, *this, helperFactory);
  1320. break;
  1321. case TAKindexread:
  1322. newAct = createRoxieIndexReadActivityFactory(node, subgraphId, *this, helperFactory);
  1323. break;
  1324. case TAKindexnormalize:
  1325. newAct = createRoxieIndexNormalizeActivityFactory(node, subgraphId, *this, helperFactory);
  1326. break;
  1327. case TAKindexcount:
  1328. newAct = createRoxieIndexCountActivityFactory(node, subgraphId, *this, helperFactory);
  1329. break;
  1330. case TAKindexaggregate:
  1331. newAct = createRoxieIndexAggregateActivityFactory(node, subgraphId, *this, helperFactory);
  1332. break;
  1333. case TAKindexgroupaggregate:
  1334. case TAKindexgroupexists:
  1335. case TAKindexgroupcount:
  1336. newAct = createRoxieIndexGroupAggregateActivityFactory(node, subgraphId, *this, helperFactory, kind);
  1337. break;
  1338. case TAKfetch:
  1339. newAct = createRoxieFetchActivityFactory(node, subgraphId, *this, helperFactory);
  1340. break;
  1341. case TAKcsvfetch:
  1342. newAct = createRoxieCSVFetchActivityFactory(node, subgraphId, *this, helperFactory);
  1343. break;
  1344. case TAKxmlfetch:
  1345. newAct = createRoxieXMLFetchActivityFactory(node, subgraphId, *this, helperFactory);
  1346. break;
  1347. case TAKkeyedjoin:
  1348. case TAKkeyeddenormalize:
  1349. case TAKkeyeddenormalizegroup:
  1350. newAct = createRoxieKeyedJoinIndexActivityFactory(node, subgraphId, *this, helperFactory);
  1351. if (node.getPropBool("att[@name=\"_diskAccessRequired\"]/@value"))
  1352. {
  1353. ISlaveActivityFactory *newAct2 = createRoxieKeyedJoinFetchActivityFactory(node, subgraphId, *this, helperFactory);
  1354. unsigned activityId2 = newAct2->queryId() | ROXIE_ACTIVITY_FETCH;
  1355. activities->append(*newAct2);
  1356. allActivities.setValue(activityId2, newAct2);
  1357. }
  1358. break;
  1359. case TAKremotegraph:
  1360. {
  1361. unsigned graphId = node.getPropInt("att[@name=\"_graphid\"]/@value", 0);
  1362. newAct = createRoxieRemoteActivityFactory(node, subgraphId, *this, helperFactory, graphId);
  1363. break;
  1364. }
  1365. default:
  1366. throwUnexpected();
  1367. }
  1368. }
  1369. if (newAct)
  1370. {
  1371. addActivity(newAct, activities);
  1372. }
  1373. }
  1374. else if (kind == TAKsubgraph)
  1375. {
  1376. // If the subgraph belongs to a remote activity, we need to be able to execute it on the slave...
  1377. IPropertyTree * childGraphNode = node.queryPropTree("att/graph");
  1378. if (!childGraphNode->getPropBool("@child"))
  1379. {
  1380. unsigned parentId = findParentId(node);
  1381. assertex(parentId);
  1382. unsigned parentIndex = activities->findActivityIndex(parentId);
  1383. if (parentIndex != NotFound)
  1384. {
  1385. ActivityArray *childQuery = loadChildGraph(*childGraphNode);
  1386. activities->item(parentIndex).addChildQuery(node.getPropInt("@id"), childQuery);
  1387. }
  1388. }
  1389. // Regardless, we need to make sure we create remote activities as required throughout the graph
  1390. Owned<IPropertyTreeIterator> nodes = node.getElements("att/graph/node");
  1391. unsigned subgraphId = node.getPropInt("@id");
  1392. ForEach(*nodes)
  1393. {
  1394. IPropertyTree &node = nodes->query();
  1395. loadSlaveNode(node, subgraphId, activities);
  1396. }
  1397. }
  1398. }
  1399. void loadOuterSubgraph(IPropertyTree &graph, ActivityArray *activities)
  1400. {
  1401. Owned<IPropertyTreeIterator> nodes = graph.getElements("att/graph/node");
  1402. unsigned subgraphId = graph.getPropInt("@id");
  1403. ForEach(*nodes)
  1404. {
  1405. IPropertyTree &node = nodes->query();
  1406. loadSlaveNode(node, subgraphId, activities);
  1407. }
  1408. loadSlaveNode(graph, subgraphId, activities); // MORE - not really sure why this line is here!
  1409. }
  1410. public:
  1411. CSlaveQueryFactory(const char *_id, const IQueryDll *_dll, const IRoxiePackage &_package, hash64_t _hashValue, unsigned _channelNo)
  1412. : CQueryFactory(_id, _dll, _package, _hashValue, _channelNo)
  1413. {
  1414. }
  1415. virtual IRoxieSlaveContext *createSlaveContext(const SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
  1416. {
  1417. return ::createSlaveContext(this, logctx, timeLimit, memoryLimit, packet);
  1418. }
  1419. virtual ActivityArray *loadGraph(IPropertyTree &graph, const char *graphName)
  1420. {
  1421. // MORE: common up with loadGraph for the Roxie server..
  1422. bool isLibraryGraph = graph.getPropBool("@library");
  1423. ActivityArray *activities = new ActivityArray(isLibraryGraph, false, isLibraryGraph);
  1424. if (isLibraryGraph)
  1425. activities->setLibraryGraphId(graph.getPropInt("node/@id"));
  1426. try
  1427. {
  1428. if (false && isLibraryGraph)
  1429. {
  1430. //Really only need to do this if the library is called from a remote activity
  1431. //but it's a bit tricky to work out since the library graph will come before the use.
  1432. //Not a major issue since libraries won't be embedded for production queries.
  1433. // this comment makes little sense...
  1434. Owned<IPropertyTreeIterator> subgraphs = graph.getElements("node");
  1435. ForEach(*subgraphs)
  1436. {
  1437. IPropertyTree &node = subgraphs->query();
  1438. loadSubgraph(node, activities);
  1439. loadNode(node, 0, activities);
  1440. }
  1441. }
  1442. Owned<IPropertyTreeIterator> subgraphs = graph.getElements("node");
  1443. ForEach(*subgraphs)
  1444. {
  1445. IPropertyTree &subgraph = subgraphs->query();
  1446. loadOuterSubgraph(subgraph, activities);
  1447. }
  1448. addDependencies(graph, activities);
  1449. }
  1450. catch (...)
  1451. {
  1452. ::Release(activities);
  1453. throw;
  1454. }
  1455. return activities;
  1456. }
  1457. };
  1458. IQueryFactory *createSlaveQueryFactory(const char *id, const IQueryDll *dll, const IRoxiePackage &package, unsigned channel, const IPropertyTree *stateInfo)
  1459. {
  1460. CriticalBlock b(CQueryFactory::queryCreateLock);
  1461. hash64_t hashValue = CQueryFactory::getQueryHash(id, dll, package, stateInfo);
  1462. IQueryFactory *cached = getQueryFactory(hashValue, channel);
  1463. if (cached)
  1464. {
  1465. ::Release(dll);
  1466. return cached;
  1467. }
  1468. Owned<CSlaveQueryFactory> newFactory = new CSlaveQueryFactory(id, dll, package, hashValue, channel);
  1469. newFactory->load(stateInfo);
  1470. return newFactory.getClear();
  1471. }
  1472. extern IQueryFactory *createSlaveQueryFactoryFromWu(IConstWorkUnit *wu, unsigned channelNo)
  1473. {
  1474. Owned<const IQueryDll> dll = createWuQueryDll(wu);
  1475. if (!dll)
  1476. return NULL;
  1477. SCMStringBuffer wuid;
  1478. return createSlaveQueryFactory(wu->getWuid(wuid).str(), dll.getClear(), queryRootPackage(), channelNo, NULL); // MORE - if use a constant for id might cache better?
  1479. }
  1480. IRecordLayoutTranslator * createRecordLayoutTranslator(const char *logicalName, IDefRecordMeta const * diskMeta, IDefRecordMeta const * activityMeta)
  1481. {
  1482. try
  1483. {
  1484. return ::createRecordLayoutTranslator(diskMeta, activityMeta);
  1485. }
  1486. catch (IException *E)
  1487. {
  1488. StringBuffer q, d;
  1489. getRecordMetaAsString(q, activityMeta);
  1490. getRecordMetaAsString(d, diskMeta);
  1491. DBGLOG("Activity: %s", q.str());
  1492. DBGLOG("Disk: %s", d.str());
  1493. StringBuffer m;
  1494. m.appendf("In index %s:", logicalName);
  1495. E->errorMessage(m);
  1496. E->Release();
  1497. DBGLOG("%s", m.str());
  1498. throw MakeStringException(ROXIE_RCD_LAYOUT_TRANSLATOR, "%s", m.str());
  1499. }
  1500. }