ccdquery.cpp 76 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include <platform.h>
  14. #include <jlib.hpp>
  15. #include "ccd.hpp"
  16. #include "ccdquery.hpp"
  17. #include "ccdstate.hpp"
  18. #include "ccdsnmp.hpp"
  19. #include "ccdserver.hpp"
  20. #include "ccdcontext.hpp"
  21. #include "thorplugin.hpp"
  22. #include "layouttrans.hpp"
  23. void ActivityArray::append(IActivityFactory &cur)
  24. {
  25. hash.setValue(cur.queryId(), activities.ordinality());
  26. activities.append(cur);
  27. }
  28. unsigned ActivityArray::findActivityIndex(unsigned id)
  29. {
  30. unsigned *ret = hash.getValue(id);
  31. if (ret)
  32. return *ret;
  33. return NotFound;
  34. }
  35. unsigned ActivityArray::recursiveFindActivityIndex(unsigned id)
  36. {
  37. // NOTE - this returns the activity index of the PARENT of the specified activity
  38. unsigned *ret = hash.getValue(id);
  39. if (ret)
  40. return *ret;
  41. ForEachItem(idx)
  42. {
  43. IActivityFactory & cur = item(idx);
  44. unsigned childId;
  45. for (unsigned childIdx = 0;;childIdx++)
  46. {
  47. ActivityArray * children = cur.queryChildQuery(childIdx, childId);
  48. if (!children)
  49. break;
  50. if (children->recursiveFindActivityIndex(id) != NotFound)
  51. {
  52. hash.setValue(id, idx);
  53. return idx;
  54. }
  55. }
  56. }
  57. return NotFound;
  58. }
  59. //----------------------------------------------------------------------------------------------
  60. // Class CQueryDll maps dlls into loadable workunits, complete with caching to ensure that a refresh of the QuerySet
  61. // can avoid reloading dlls, and that the same CQueryDll (and the objects it owns) can be shared between server and
  62. // multiple slave channels
  63. //----------------------------------------------------------------------------------------------
  64. class CQueryDll : public CInterface, implements IQueryDll
  65. {
  66. StringAttr dllName;
  67. Owned <ILoadedDllEntry> dll;
  68. Owned <IConstWorkUnit> wu;
  69. static CriticalSection dllCacheLock;
  70. static CopyMapStringToMyClass<CQueryDll> dllCache;
  71. public:
  72. IMPLEMENT_IINTERFACE;
  73. CQueryDll(const char *_dllName, ILoadedDllEntry *_dll) : dllName(_dllName), dll(_dll)
  74. {
  75. StringBuffer wuXML;
  76. if (getEmbeddedWorkUnitXML(dll, wuXML))
  77. {
  78. Owned<ILocalWorkUnit> localWU = createLocalWorkUnit();
  79. localWU->loadXML(wuXML);
  80. wu.setown(localWU->unlock());
  81. }
  82. CriticalBlock b(dllCacheLock);
  83. dllCache.setValue(dllName, this);
  84. }
  85. virtual void beforeDispose()
  86. {
  87. CriticalBlock b(dllCacheLock);
  88. // NOTE: it's theoretically possible for the final release to happen after a replacement has been inserted into hash table.
  89. // So only remove from hash table if what we find there matches the item that is being deleted.
  90. CQueryDll *goer = dllCache.getValue(dllName);
  91. if (goer == this)
  92. dllCache.remove(dllName);
  93. }
  94. static const CQueryDll *getQueryDll(const char *dllName, bool isExe)
  95. {
  96. CriticalBlock b(dllCacheLock);
  97. CQueryDll *dll = LINK(dllCache.getValue(dllName));
  98. if (dll && dll->isAlive())
  99. return dll;
  100. else
  101. {
  102. Owned<ILoadedDllEntry> dll = isExe ? createExeDllEntry(dllName) : queryRoxieDllServer().loadDll(dllName, DllLocationDirectory);
  103. assertex(dll != NULL);
  104. return new CQueryDll(dllName, dll.getClear());
  105. }
  106. }
  107. static const IQueryDll *getWorkUnitDll(IConstWorkUnit *wu)
  108. {
  109. SCMStringBuffer dllName;
  110. Owned<IConstWUQuery> q = wu->getQuery();
  111. q->getQueryDllName(dllName);
  112. return getQueryDll(dllName.str(), false);
  113. }
  114. virtual HelperFactory *getFactory(const char *helperName) const
  115. {
  116. return (HelperFactory *) dll->getEntry(helperName);
  117. }
  118. virtual ILoadedDllEntry *queryDll() const
  119. {
  120. return dll;
  121. }
  122. virtual IConstWorkUnit *queryWorkUnit() const
  123. {
  124. return wu;
  125. }
  126. };
  127. CriticalSection CQueryDll::dllCacheLock;
  128. CopyMapStringToMyClass<CQueryDll> CQueryDll::dllCache;
  129. extern const IQueryDll *createQueryDll(const char *dllName)
  130. {
  131. return CQueryDll::getQueryDll(dllName, false);
  132. }
  133. extern const IQueryDll *createExeQueryDll(const char *exeName)
  134. {
  135. return CQueryDll::getQueryDll(exeName, true);
  136. }
  137. extern const IQueryDll *createWuQueryDll(IConstWorkUnit *wu)
  138. {
  139. return CQueryDll::getWorkUnitDll(wu);
  140. }
  141. // Add information to the xref information to be returned for a control:getQueryXrefInfo request
  142. IPropertyTree * addXrefInfo(IPropertyTree &reply, const char *section, const char *name)
  143. {
  144. VStringBuffer xpath("%s[@name='%s']", section, name);
  145. if (!reply.hasProp(xpath))
  146. {
  147. IPropertyTree *info = createPTree(section, 0);
  148. info->setProp("@name", name);
  149. return reply.addPropTree(section, info);
  150. }
  151. return NULL;
  152. }
  153. extern void addXrefFileInfo(IPropertyTree &reply, const IResolvedFile *dataFile)
  154. {
  155. if (dataFile->isSuperFile())
  156. {
  157. IPropertyTree *info = addXrefInfo(reply, "SuperFile", dataFile->queryFileName());
  158. if (info)
  159. {
  160. int numSubs = dataFile->numSubFiles();
  161. for (int i = 0; i < numSubs; i++)
  162. {
  163. StringBuffer subName;
  164. dataFile->getSubFileName(i, subName);
  165. addXrefInfo(*info, "File", subName.str());
  166. }
  167. }
  168. }
  169. else
  170. addXrefInfo(reply, "File", dataFile->queryFileName());
  171. }
  172. extern void addXrefLibraryInfo(IPropertyTree &reply, const char *libraryName)
  173. {
  174. addXrefInfo(reply, "Library", libraryName);
  175. }
  176. //----------------------------------------------------------------------------------------------
  177. // Class CSharedOnceContext manages the context for a query's ONCE code, which is shared between
  178. // all slave and server contexts on a node
  179. //----------------------------------------------------------------------------------------------
  180. class CSharedOnceContext : public CInterfaceOf<ISharedOnceContext>
  181. {
  182. public:
  183. CSharedOnceContext()
  184. {
  185. }
  186. ~CSharedOnceContext()
  187. {
  188. }
  189. virtual IDeserializedResultStore &queryOnceResultStore() const
  190. {
  191. assertex(onceResultStore!= NULL);
  192. return *onceResultStore;
  193. }
  194. virtual IPropertyTree &queryOnceContext(const IQueryFactory *factory, const ContextLogger &logctx) const
  195. {
  196. checkOnceDone(factory, logctx);
  197. assertex(onceContext != NULL);
  198. return *onceContext;
  199. }
  200. virtual void checkOnceDone(const IQueryFactory *factory, const ContextLogger &logctx) const
  201. {
  202. CriticalBlock b(onceCrit);
  203. if (!onceContext)
  204. {
  205. onceContext.setown(createPTree());
  206. onceResultStore.setown(createDeserializedResultStore());
  207. Owned <IRoxieServerContext> ctx = createOnceServerContext(factory, logctx);
  208. onceManager.set(&ctx->queryRowManager());
  209. try
  210. {
  211. ctx->process();
  212. ctx->done(false);
  213. }
  214. catch (IException *E)
  215. {
  216. ctx->done(true);
  217. onceException.setown(E);
  218. }
  219. catch (...)
  220. {
  221. ctx->done(true);
  222. onceException.setown(MakeStringException(ROXIE_INTERNAL_ERROR, "Unknown exception in ONCE code"));
  223. }
  224. }
  225. if (onceException)
  226. throw onceException.getLink();
  227. }
  228. protected:
  229. mutable CriticalSection onceCrit;
  230. mutable Owned<roxiemem::IRowManager> onceManager; // release AFTER resultStore
  231. mutable Owned<IPropertyTree> onceContext;
  232. mutable Owned<IDeserializedResultStore> onceResultStore;
  233. mutable Owned<IException> onceException;
  234. };
  235. //----------------------------------------------------------------------------------------------
  236. // Class CQueryFactory is the main implementation of IQueryFactory, combining a IQueryDll and a
  237. // package context into an object that can quickly create a the query context that executes a specific
  238. // instance of a Roxie query.
  239. // Caching is used to ensure that only queries that are affected by a package change need to be reloaded.
  240. // Derived classes handle the differences between slave and server side factories
  241. //----------------------------------------------------------------------------------------------
  242. class CQueryFactory : public CInterface, implements IQueryFactory, implements IResourceContext
  243. {
  244. protected:
  245. const IRoxiePackage &package;
  246. Owned<const IQueryDll> dll;
  247. Linked<ISharedOnceContext> sharedOnceContext;
  248. MapStringToActivityArray graphMap;
  249. StringAttr id;
  250. StringBuffer errorMessage;
  251. MapIdToActivityFactory allActivities;
  252. bool dynamic;
  253. bool isSuspended;
  254. bool isLoadFailed;
  255. bool enableFieldTranslation;
  256. ClusterType targetClusterType;
  257. unsigned timeLimit;
  258. unsigned warnTimeLimit;
  259. memsize_t memoryLimit;
  260. unsigned priority;
  261. unsigned libraryInterfaceHash;
  262. hash64_t hashValue;
  263. static SpinLock queriesCrit;
  264. static CopyMapXToMyClass<hash64_t, hash64_t, CQueryFactory> queryMap;
  265. public:
  266. static CriticalSection queryCreateLock;
  267. protected:
  268. IRoxieServerActivityFactory *createActivityFactory(ThorActivityKind kind, unsigned subgraphId, IPropertyTree &node)
  269. {
  270. unsigned id = node.getPropInt("@id", 0);
  271. if (isSuspended)
  272. return createRoxieServerDummyActivityFactory(id, subgraphId, *this, NULL, TAKnone, node, false); // Is there actually any point?
  273. StringBuffer helperName;
  274. node.getProp("att[@name=\"helper\"]/@value", helperName);
  275. if (!helperName.length())
  276. helperName.append("fAc").append(id);
  277. HelperFactory *helperFactory = dll->getFactory(helperName);
  278. if (!helperFactory)
  279. throw MakeStringException(ROXIE_INTERNAL_ERROR, "Internal error: helper function %s not exported", helperName.str());
  280. switch (kind)
  281. {
  282. case TAKalljoin:
  283. case TAKalldenormalize:
  284. case TAKalldenormalizegroup:
  285. return createRoxieServerAllJoinActivityFactory(id, subgraphId, *this, helperFactory, kind);
  286. case TAKapply:
  287. return createRoxieServerApplyActivityFactory(id, subgraphId, *this, helperFactory, kind, isRootAction(node));
  288. case TAKaggregate:
  289. case TAKexistsaggregate: // could special case.
  290. case TAKcountaggregate:
  291. return createRoxieServerAggregateActivityFactory(id, subgraphId, *this, helperFactory, kind);
  292. case TAKcase:
  293. case TAKchildcase:
  294. return createRoxieServerCaseActivityFactory(id, subgraphId, *this, helperFactory, kind, isGraphIndependent(node));
  295. case TAKcatch:
  296. case TAKskipcatch:
  297. case TAKcreaterowcatch:
  298. return createRoxieServerCatchActivityFactory(id, subgraphId, *this, helperFactory, kind);
  299. case TAKchilditerator:
  300. return createRoxieServerChildIteratorActivityFactory(id, subgraphId, *this, helperFactory, kind);
  301. case TAKchoosesets:
  302. return createRoxieServerChooseSetsActivityFactory(id, subgraphId, *this, helperFactory, kind);
  303. case TAKchoosesetsenth:
  304. return createRoxieServerChooseSetsEnthActivityFactory(id, subgraphId, *this, helperFactory, kind);
  305. case TAKchoosesetslast:
  306. return createRoxieServerChooseSetsLastActivityFactory(id, subgraphId, *this, helperFactory, kind);
  307. case TAKproject:
  308. case TAKcountproject:
  309. return createRoxieServerProjectActivityFactory(id, subgraphId, *this, helperFactory, kind); // code is common between Project, CountProject
  310. case TAKfilterproject:
  311. return createRoxieServerFilterProjectActivityFactory(id, subgraphId, *this, helperFactory, kind);
  312. case TAKdatasetresult:
  313. case TAKrowresult:
  314. return createRoxieServerDatasetResultActivityFactory(id, subgraphId, *this, helperFactory, kind, isRootAction(node));
  315. case TAKdedup:
  316. return createRoxieServerDedupActivityFactory(id, subgraphId, *this, helperFactory, kind);
  317. case TAKdegroup:
  318. return createRoxieServerDegroupActivityFactory(id, subgraphId, *this, helperFactory, kind);
  319. case TAKcsvread:
  320. case TAKxmlread:
  321. case TAKdiskread:
  322. {
  323. if (node.getPropBool("att[@name='_isSpill']/@value", false) || node.getPropBool("att[@name='_isSpillGlobal']/@value", false))
  324. return createRoxieServerSpillReadActivityFactory(id, subgraphId, *this, helperFactory, kind);
  325. else
  326. {
  327. RemoteActivityId remoteId(id, hashValue);
  328. return createRoxieServerDiskReadActivityFactory(id, subgraphId, *this, helperFactory, kind, remoteId, node);
  329. }
  330. }
  331. case TAKmemoryspillread:
  332. return createRoxieServerSpillReadActivityFactory(id, subgraphId, *this, helperFactory, kind);
  333. case TAKdisknormalize:
  334. case TAKdiskcount:
  335. case TAKdiskaggregate:
  336. case TAKdiskgroupaggregate:
  337. {
  338. RemoteActivityId remoteId(id, hashValue);
  339. return createRoxieServerDiskReadActivityFactory(id, subgraphId, *this, helperFactory, kind, remoteId, node);
  340. }
  341. case TAKchildnormalize:
  342. return createRoxieServerNewChildNormalizeActivityFactory(id, subgraphId, *this, helperFactory, kind);
  343. case TAKchildaggregate:
  344. return createRoxieServerNewChildAggregateActivityFactory(id, subgraphId, *this, helperFactory, kind);
  345. case TAKchildgroupaggregate:
  346. return createRoxieServerNewChildGroupAggregateActivityFactory(id, subgraphId, *this, helperFactory, kind);
  347. case TAKchildthroughnormalize:
  348. return createRoxieServerNewChildThroughNormalizeActivityFactory(id, subgraphId, *this, helperFactory, kind);
  349. case TAKcsvwrite:
  350. case TAKdiskwrite:
  351. case TAKxmlwrite:
  352. case TAKmemoryspillwrite:
  353. return createRoxieServerDiskWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, isRootAction(node));
  354. case TAKindexwrite:
  355. return createRoxieServerIndexWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, isRootAction(node));
  356. case TAKenth:
  357. return createRoxieServerEnthActivityFactory(id, subgraphId, *this, helperFactory, kind);
  358. case TAKfetch:
  359. case TAKcsvfetch:
  360. case TAKxmlfetch:
  361. {
  362. RemoteActivityId remoteId(id, hashValue);
  363. return createRoxieServerFetchActivityFactory(id, subgraphId, *this, helperFactory, kind, remoteId, node);
  364. }
  365. case TAKfilter:
  366. return createRoxieServerFilterActivityFactory(id, subgraphId, *this, helperFactory, kind);
  367. case TAKfiltergroup:
  368. return createRoxieServerFilterGroupActivityFactory(id, subgraphId, *this, helperFactory, kind);
  369. case TAKfirstn:
  370. return createRoxieServerFirstNActivityFactory(id, subgraphId, *this, helperFactory, kind);
  371. case TAKfunnel:
  372. return createRoxieServerConcatActivityFactory(id, subgraphId, *this, helperFactory, kind);
  373. case TAKgroup:
  374. return createRoxieServerGroupActivityFactory(id, subgraphId, *this, helperFactory, kind);
  375. case TAKhashaggregate:
  376. return createRoxieServerHashAggregateActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  377. case TAKif:
  378. case TAKchildif:
  379. return createRoxieServerIfActivityFactory(id, subgraphId, *this, helperFactory, kind, isGraphIndependent(node));
  380. case TAKifaction:
  381. return createRoxieServerIfActionActivityFactory(id, subgraphId, *this, helperFactory, kind, isRootAction(node));
  382. case TAKparallel:
  383. return createRoxieServerParallelActionActivityFactory(id, subgraphId, *this, helperFactory, kind, isRootAction(node));
  384. case TAKsequential:
  385. return createRoxieServerSequentialActionActivityFactory(id, subgraphId, *this, helperFactory, kind, isRootAction(node));
  386. case TAKindexread:
  387. {
  388. RemoteActivityId remoteId(id, hashValue);
  389. return createRoxieServerIndexReadActivityFactory(id, subgraphId, *this, helperFactory, kind, remoteId, node);
  390. }
  391. case TAKindexnormalize:
  392. {
  393. RemoteActivityId remoteId(id, hashValue);
  394. return createRoxieServerIndexNormalizeActivityFactory(id, subgraphId, *this, helperFactory, kind, remoteId, node);
  395. }
  396. case TAKindexcount:
  397. {
  398. RemoteActivityId remoteId(id, hashValue);
  399. return createRoxieServerIndexCountActivityFactory(id, subgraphId, *this, helperFactory, kind, remoteId, node);
  400. }
  401. case TAKindexaggregate:
  402. {
  403. RemoteActivityId remoteId(id, hashValue);
  404. return createRoxieServerIndexAggregateActivityFactory(id, subgraphId, *this, helperFactory, kind, remoteId, node);
  405. }
  406. case TAKindexgroupaggregate:
  407. case TAKindexgroupexists:
  408. case TAKindexgroupcount:
  409. {
  410. RemoteActivityId remoteId(id, hashValue);
  411. return createRoxieServerIndexGroupAggregateActivityFactory(id, subgraphId, *this, helperFactory, kind, remoteId, node);
  412. }
  413. case TAKhashdedup:
  414. return createRoxieServerHashDedupActivityFactory(id, subgraphId, *this, helperFactory, kind);
  415. case TAKhashdenormalize:
  416. case TAKhashdistribute:
  417. case TAKhashdistributemerge:
  418. case TAKhashjoin:
  419. throwUnexpected(); // Code generator should have removed or transformed
  420. case TAKiterate:
  421. return createRoxieServerIterateActivityFactory(id, subgraphId, *this, helperFactory, kind);
  422. case TAKprocess:
  423. return createRoxieServerProcessActivityFactory(id, subgraphId, *this, helperFactory, kind);
  424. case TAKjoin:
  425. case TAKjoinlight:
  426. case TAKdenormalize:
  427. case TAKdenormalizegroup:
  428. return createRoxieServerJoinActivityFactory(id, subgraphId, *this, helperFactory, kind);
  429. case TAKkeyeddistribute:
  430. throwUnexpected(); // Code generator should have removed or transformed
  431. case TAKkeyedjoin:
  432. case TAKkeyeddenormalize:
  433. case TAKkeyeddenormalizegroup:
  434. {
  435. RemoteActivityId remoteId(id, hashValue);
  436. RemoteActivityId remoteId2(id | ROXIE_ACTIVITY_FETCH, hashValue);
  437. return createRoxieServerKeyedJoinActivityFactory(id, subgraphId, *this, helperFactory, kind, remoteId, remoteId2, node);
  438. }
  439. case TAKlimit:
  440. return createRoxieServerLimitActivityFactory(id, subgraphId, *this, helperFactory, kind);
  441. case TAKlookupjoin:
  442. case TAKlookupdenormalize:
  443. case TAKlookupdenormalizegroup:
  444. case TAKsmartjoin:
  445. case TAKsmartdenormalize:
  446. case TAKsmartdenormalizegroup:
  447. return createRoxieServerLookupJoinActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
  448. case TAKmerge:
  449. return createRoxieServerMergeActivityFactory(id, subgraphId, *this, helperFactory, kind);
  450. case TAKnormalize:
  451. return createRoxieServerNormalizeActivityFactory(id, subgraphId, *this, helperFactory, kind);
  452. case TAKnormalizechild:
  453. return createRoxieServerNormalizeChildActivityFactory(id, subgraphId, *this, helperFactory, kind);
  454. case TAKnormalizelinkedchild:
  455. return createRoxieServerNormalizeLinkedChildActivityFactory(id, subgraphId, *this, helperFactory, kind);
  456. case TAKnull:
  457. return createRoxieServerNullActivityFactory(id, subgraphId, *this, helperFactory, kind);
  458. case TAKsideeffect:
  459. return createRoxieServerSideEffectActivityFactory(id, subgraphId, *this, helperFactory, kind, isRootAction(node));
  460. case TAKsimpleaction:
  461. return createRoxieServerActionActivityFactory(id, subgraphId, *this, helperFactory, kind, usageCount(node), isRootAction(node));
  462. case TAKparse:
  463. return createRoxieServerParseActivityFactory(id, subgraphId, *this, helperFactory, kind, this);
  464. case TAKworkunitwrite:
  465. return createRoxieServerWorkUnitWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, usageCount(node), isRootAction(node));
  466. case TAKdictionaryworkunitwrite:
  467. return createRoxieServerWorkUnitWriteDictActivityFactory(id, subgraphId, *this, helperFactory, kind, usageCount(node), isRootAction(node));
  468. case TAKpiperead:
  469. return createRoxieServerPipeReadActivityFactory(id, subgraphId, *this, helperFactory, kind);
  470. case TAKpipethrough:
  471. return createRoxieServerPipeThroughActivityFactory(id, subgraphId, *this, helperFactory, kind);
  472. case TAKpipewrite:
  473. return createRoxieServerPipeWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, usageCount(node), isRootAction(node));
  474. case TAKpull:
  475. throwUnexpected(); //code generator strips for non-thor
  476. case TAKlinkedrawiterator:
  477. return createRoxieServerLinkedRawIteratorActivityFactory(id, subgraphId, *this, helperFactory, kind);
  478. case TAKremoteresult:
  479. return createRoxieServerRemoteResultActivityFactory(id, subgraphId, *this, helperFactory, kind, usageCount(node), isRootAction(node));
  480. case TAKrollup:
  481. return createRoxieServerRollupActivityFactory(id, subgraphId, *this, helperFactory, kind);
  482. case TAKsample:
  483. return createRoxieServerSampleActivityFactory(id, subgraphId, *this, helperFactory, kind);
  484. case TAKselectn:
  485. return createRoxieServerSelectNActivityFactory(id, subgraphId, *this, helperFactory, kind);
  486. case TAKselfjoin:
  487. case TAKselfjoinlight:
  488. return createRoxieServerSelfJoinActivityFactory(id, subgraphId, *this, helperFactory, kind);
  489. case TAKskiplimit:
  490. case TAKcreaterowlimit:
  491. return createRoxieServerSkipLimitActivityFactory(id, subgraphId, *this, helperFactory, kind);
  492. case TAKhttp_rowdataset:
  493. case TAKsoap_rowdataset:
  494. return createRoxieServerSoapRowCallActivityFactory(id, subgraphId, *this, helperFactory, kind);
  495. case TAKsoap_rowaction:
  496. return createRoxieServerSoapRowActionActivityFactory(id, subgraphId, *this, helperFactory, kind, isRootAction(node));
  497. case TAKsoap_datasetdataset:
  498. return createRoxieServerSoapDatasetCallActivityFactory(id, subgraphId, *this, helperFactory, kind);
  499. case TAKsoap_datasetaction:
  500. return createRoxieServerSoapDatasetActionActivityFactory(id, subgraphId, *this, helperFactory, kind, isRootAction(node));
  501. case TAKsort:
  502. return createRoxieServerSortActivityFactory(id, subgraphId, *this, helperFactory, kind);
  503. case TAKspill:
  504. case TAKmemoryspillsplit:
  505. return createRoxieServerThroughSpillActivityFactory(id, subgraphId, *this, helperFactory, kind);
  506. case TAKsplit:
  507. return createRoxieServerSplitActivityFactory(id, subgraphId, *this, helperFactory, kind);
  508. case TAKstreamediterator:
  509. return createRoxieServerStreamedIteratorActivityFactory(id, subgraphId, *this, helperFactory, kind);
  510. case TAKinlinetable:
  511. return createRoxieServerInlineTableActivityFactory(id, subgraphId, *this, helperFactory, kind);
  512. case TAKthroughaggregate:
  513. throwUnexpected(); // Concept of through aggregates has been proven not to work in Roxie - codegen should not be creating them any more.
  514. case TAKtopn:
  515. return createRoxieServerTopNActivityFactory(id, subgraphId, *this, helperFactory, kind);
  516. case TAKworkunitread:
  517. return createRoxieServerWorkUnitReadActivityFactory(id, subgraphId, *this, helperFactory, kind);
  518. case TAKxmlparse:
  519. return createRoxieServerXmlParseActivityFactory(id, subgraphId, *this, helperFactory, kind);
  520. case TAKregroup:
  521. return createRoxieServerRegroupActivityFactory(id, subgraphId, *this, helperFactory, kind);
  522. case TAKcombine:
  523. return createRoxieServerCombineActivityFactory(id, subgraphId, *this, helperFactory, kind);
  524. case TAKcombinegroup:
  525. return createRoxieServerCombineGroupActivityFactory(id, subgraphId, *this, helperFactory, kind);
  526. case TAKrollupgroup:
  527. return createRoxieServerRollupGroupActivityFactory(id, subgraphId, *this, helperFactory, kind);
  528. case TAKlocalresultread:
  529. {
  530. unsigned graphId = getGraphId(node);
  531. return createRoxieServerLocalResultReadActivityFactory(id, subgraphId, *this, helperFactory, kind, graphId);
  532. }
  533. case TAKlocalstreamread:
  534. return createRoxieServerLocalResultStreamReadActivityFactory(id, subgraphId, *this, helperFactory, kind);
  535. case TAKlocalresultwrite:
  536. {
  537. unsigned graphId = getGraphId(node);
  538. return createRoxieServerLocalResultWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, usageCount(node), graphId, isRootAction(node));
  539. }
  540. case TAKdictionaryresultwrite:
  541. {
  542. unsigned graphId = getGraphId(node);
  543. return createRoxieServerDictionaryResultWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, usageCount(node), graphId, isRootAction(node));
  544. }
  545. case TAKloopcount:
  546. case TAKlooprow:
  547. case TAKloopdataset:
  548. {
  549. unsigned loopId = node.getPropInt("att[@name=\"_loopid\"]/@value", 0);
  550. return createRoxieServerLoopActivityFactory(id, subgraphId, *this, helperFactory, kind, loopId);
  551. }
  552. case TAKremotegraph:
  553. {
  554. RemoteActivityId remoteId(id, hashValue);
  555. return createRoxieServerRemoteActivityFactory(id, subgraphId, *this, helperFactory, kind, remoteId, isRootAction(node));
  556. }
  557. case TAKgraphloopresultread:
  558. {
  559. unsigned graphId = getGraphId(node);
  560. return createRoxieServerGraphLoopResultReadActivityFactory(id, subgraphId, *this, helperFactory, kind, graphId);
  561. }
  562. case TAKgraphloopresultwrite:
  563. {
  564. unsigned graphId = getGraphId(node);
  565. return createRoxieServerGraphLoopResultWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, usageCount(node), graphId);
  566. }
  567. case TAKnwaygraphloopresultread:
  568. {
  569. unsigned graphId = node.getPropInt("att[@name=\"_graphId\"]/@value", 0);
  570. return createRoxieServerNWayGraphLoopResultReadActivityFactory(id, subgraphId, *this, helperFactory, kind, graphId);
  571. }
  572. case TAKnwayinput:
  573. return createRoxieServerNWayInputActivityFactory(id, subgraphId, *this, helperFactory, kind);
  574. case TAKnwaymerge:
  575. return createRoxieServerNWayMergeActivityFactory(id, subgraphId, *this, helperFactory, kind);
  576. case TAKnwaymergejoin:
  577. case TAKnwayjoin:
  578. return createRoxieServerNWayMergeJoinActivityFactory(id, subgraphId, *this, helperFactory, kind);
  579. case TAKsorted:
  580. return createRoxieServerSortedActivityFactory(id, subgraphId, *this, helperFactory, kind);
  581. case TAKgraphloop:
  582. case TAKparallelgraphloop:
  583. {
  584. unsigned loopId = node.getPropInt("att[@name=\"_loopid\"]/@value", 0);
  585. return createRoxieServerGraphLoopActivityFactory(id, subgraphId, *this, helperFactory, kind, loopId);
  586. }
  587. case TAKlibrarycall:
  588. {
  589. LibraryCallFactoryExtra extra;
  590. extra.maxOutputs = node.getPropInt("att[@name=\"_maxOutputs\"]/@value", 0);
  591. extra.graphid = node.getPropInt("att[@name=\"_graphid\"]/@value", 0);
  592. extra.libraryName.set(node.queryProp("att[@name=\"libname\"]/@value"));
  593. extra.interfaceHash = node.getPropInt("att[@name=\"_interfaceHash\"]/@value", 0);
  594. extra.embedded = node.getPropBool("att[@name=\"embedded\"]/@value", false) ;
  595. Owned<IPropertyTreeIterator> iter = node.getElements("att[@name=\"_outputUsed\"]");
  596. ForEach(*iter)
  597. extra.outputs.append(iter->query().getPropInt("@value"));
  598. return createRoxieServerLibraryCallActivityFactory(id, subgraphId, *this, helperFactory, kind, extra);
  599. }
  600. case TAKnwayselect:
  601. return createRoxieServerNWaySelectActivityFactory(id, subgraphId, *this, helperFactory, kind);
  602. case TAKnonempty:
  603. return createRoxieServerNonEmptyActivityFactory(id, subgraphId, *this, helperFactory, kind);
  604. case TAKprefetchproject:
  605. return createRoxieServerPrefetchProjectActivityFactory(id, subgraphId, *this, helperFactory, kind);
  606. case TAKwhen_dataset:
  607. return createRoxieServerWhenActivityFactory(id, subgraphId, *this, helperFactory, kind);
  608. case TAKwhen_action:
  609. return createRoxieServerWhenActionActivityFactory(id, subgraphId, *this, helperFactory, kind, isRootAction(node));
  610. case TAKdistribution:
  611. return createRoxieServerDistributionActivityFactory(id, subgraphId, *this, helperFactory, kind, isRootAction(node));
  612. // These are not required in Roxie for the time being - code generator should trap them
  613. case TAKchilddataset:
  614. default:
  615. throw MakeStringException(ROXIE_UNIMPLEMENTED_ERROR, "Unimplemented activity %s required", getActivityText(kind));
  616. break;
  617. }
  618. throwUnexpected(); // unreachable, but some compilers will complain about missing return
  619. }
  620. IActivityFactory *findActivity(unsigned id) const
  621. {
  622. if (id)
  623. {
  624. IActivityFactory **f = allActivities.getValue(id);
  625. if (f)
  626. return *f;
  627. }
  628. return NULL;
  629. }
  630. virtual IRoxieServerActivityFactory *getRoxieServerActivityFactory(unsigned id) const
  631. {
  632. checkSuspended();
  633. return LINK(QUERYINTERFACE(findActivity(id), IRoxieServerActivityFactory));
  634. }
  635. virtual ISlaveActivityFactory *getSlaveActivityFactory(unsigned id) const
  636. {
  637. checkSuspended();
  638. IActivityFactory *f = findActivity(id);
  639. return LINK(QUERYINTERFACE(f, ISlaveActivityFactory)); // MORE - don't dynamic cast yuk
  640. }
  641. ActivityArray *loadChildGraph(IPropertyTree &graph)
  642. {
  643. // MORE - this is starting to look very much like loadGraph (on Roxie server side)
  644. ActivityArray *activities = new ActivityArray(true, graph.getPropBool("@delayed"), graph.getPropBool("@library"), graph.getPropBool("@sequential"));
  645. unsigned subgraphId = graph.getPropInt("@id");
  646. try
  647. {
  648. Owned<IPropertyTreeIterator> nodes = graph.getElements("node");
  649. ForEach(*nodes)
  650. {
  651. IPropertyTree &node = nodes->query();
  652. loadNode(node, subgraphId, activities);
  653. }
  654. Owned<IPropertyTreeIterator> edges = graph.getElements("edge");
  655. ForEach(*edges)
  656. {
  657. IPropertyTree &edge = edges->query();
  658. unsigned source = activities->findActivityIndex(edge.getPropInt("@source",0));
  659. unsigned target = activities->findActivityIndex(edge.getPropInt("@target",0));
  660. unsigned sourceOutput = edge.getPropInt("att[@name=\"_sourceIndex\"]/@value", 0);
  661. unsigned targetInput = edge.getPropInt("att[@name=\"_targetIndex\"]/@value", 0);
  662. int controlId = edge.getPropInt("att[@name=\"_when\"]/@value", 0);
  663. if (controlId != 0)
  664. {
  665. const char * edgeId = edge.queryProp("@id");
  666. addDependency(sourceOutput, source, target, controlId, edgeId, activities);
  667. }
  668. else
  669. activities->serverItem(target).setInput(targetInput, source, sourceOutput);
  670. }
  671. }
  672. catch (...)
  673. {
  674. ::Release(activities);
  675. allActivities.kill();
  676. throw;
  677. }
  678. return activities;
  679. }
  680. void loadNode(IPropertyTree &node, unsigned subgraphId, ActivityArray *activities)
  681. {
  682. ThorActivityKind kind = getActivityKind(node);
  683. if (kind==TAKsubgraph)
  684. {
  685. IPropertyTree * childGraphNode = node.queryPropTree("att/graph");
  686. if (childGraphNode->getPropBool("@child"))
  687. {
  688. loadSubgraph(node, activities);
  689. }
  690. else
  691. {
  692. unsigned parentId = findParentId(node);
  693. assertex(parentId);
  694. unsigned parentIdx = activities->findActivityIndex(parentId);
  695. IActivityFactory &parentFactory = activities->item(parentIdx);
  696. ActivityArray *childQuery = loadChildGraph(*childGraphNode);
  697. parentFactory.addChildQuery(node.getPropInt("@id"), childQuery);
  698. }
  699. }
  700. else if (kind)
  701. {
  702. IRoxieServerActivityFactory *f = createActivityFactory(kind, subgraphId, node);
  703. if (f)
  704. {
  705. activities->append(*f);
  706. allActivities.setValue(f->queryId(), f);
  707. }
  708. }
  709. }
  710. void loadSubgraph(IPropertyTree &graph, ActivityArray *activities)
  711. {
  712. unsigned subgraphId = graph.getPropInt("@id");
  713. Owned<IPropertyTreeIterator> nodes = graph.getElements("att/graph/node");
  714. ForEach(*nodes)
  715. {
  716. IPropertyTree &node = nodes->query();
  717. loadNode(node, subgraphId, activities);
  718. }
  719. if (!isSuspended)
  720. {
  721. Owned<IPropertyTreeIterator> edges = graph.getElements("att/graph/edge");
  722. ForEach(*edges)
  723. {
  724. IPropertyTree &edge = edges->query();
  725. unsigned sourceActivity = edge.getPropInt("@source", 0);
  726. unsigned targetActivity = edge.getPropInt("@target", 0);
  727. unsigned source = activities->findActivityIndex(sourceActivity);
  728. unsigned target = activities->recursiveFindActivityIndex(targetActivity);
  729. unsigned sourceOutput = edge.getPropInt("att[@name=\"_sourceIndex\"]/@value", 0);
  730. unsigned targetInput = edge.getPropInt("att[@name=\"_targetIndex\"]/@value", 0);
  731. int controlId = edge.getPropInt("att[@name=\"_when\"]/@value", 0);
  732. if (controlId != 0)
  733. {
  734. const char * edgeId = edge.queryProp("@id");
  735. addDependency(sourceOutput, sourceActivity, targetActivity, controlId, edgeId, activities);
  736. }
  737. else
  738. activities->serverItem(target).setInput(targetInput, source, sourceOutput);
  739. }
  740. }
  741. }
  742. // loadGraph loads outer level graph. This is virtual as slave is very different from Roxie server
  743. virtual ActivityArray *loadGraph(IPropertyTree &graph, const char *graphName) = 0;
  744. bool doAddDependency(unsigned sourceIdx, unsigned sourceId, unsigned targetId, int controlId, const char *edgeId, ActivityArray * activities)
  745. {
  746. // Note - the dependency is recorded with the target being the parent activity that is at the same level as the source
  747. // (recording it on the child that was actually dependent would mean it happened too late)
  748. unsigned source = activities->findActivityIndex(sourceId);
  749. if (source != NotFound)
  750. {
  751. unsigned target = activities->recursiveFindActivityIndex(targetId);
  752. activities->serverItem(target).addDependency(source, activities->serverItem(source).getKind(), sourceIdx, controlId, edgeId);
  753. activities->serverItem(source).noteDependent(target);
  754. return true;
  755. }
  756. ForEachItemIn(idx, *activities)
  757. {
  758. IActivityFactory & cur = activities->item(idx);
  759. unsigned childId;
  760. for (unsigned childIdx = 0;;childIdx++)
  761. {
  762. ActivityArray * children = cur.queryChildQuery(childIdx, childId);
  763. if (!children)
  764. break;
  765. if (doAddDependency(sourceIdx, sourceId, targetId, controlId, edgeId, children))
  766. return true;
  767. }
  768. }
  769. return false;
  770. }
  771. virtual void addDependency(unsigned sourceIdx, unsigned sourceId, unsigned targetId, int controlId, const char *edgeId, ActivityArray * activities)
  772. {
  773. doAddDependency(sourceIdx, sourceId, targetId, controlId, edgeId, activities);
  774. }
  775. void addDependencies(IPropertyTree &graph, ActivityArray *activities)
  776. {
  777. Owned<IPropertyTreeIterator> dependencies = graph.getElements("edge");
  778. ForEach(*dependencies)
  779. {
  780. IPropertyTree &edge = dependencies->query();
  781. if (!edge.getPropInt("att[@name=\"_childGraph\"]/@value", 0))
  782. {
  783. unsigned sourceIdx = edge.getPropInt("att[@name=\"_sourceIndex\"]/@value", 0);
  784. int controlId = edge.getPropInt("att[@name=\"_when\"]/@value", 0);
  785. addDependency(sourceIdx, edge.getPropInt("att[@name=\"_sourceActivity\"]/@value", 0), edge.getPropInt("att[@name=\"_targetActivity\"]/@value", 0), controlId, edge.queryProp("@id"), activities);
  786. }
  787. }
  788. }
  789. public:
  790. IMPLEMENT_IINTERFACE;
  791. unsigned channelNo;
  792. CQueryFactory(const char *_id, const IQueryDll *_dll, const IRoxiePackage &_package, hash64_t _hashValue, unsigned _channelNo, ISharedOnceContext *_sharedOnceContext, bool _dynamic)
  793. : id(_id), package(_package), dll(_dll), channelNo(_channelNo), hashValue(_hashValue), sharedOnceContext(_sharedOnceContext), dynamic(_dynamic)
  794. {
  795. package.Link();
  796. isSuspended = false;
  797. isLoadFailed = false;
  798. libraryInterfaceHash = 0;
  799. priority = 0;
  800. memoryLimit = defaultMemoryLimit;
  801. timeLimit = defaultTimeLimit[priority];
  802. warnTimeLimit = 0;
  803. enableFieldTranslation = package.getEnableFieldTranslation();
  804. }
  805. ~CQueryFactory()
  806. {
  807. HashIterator graphs(graphMap);
  808. for(graphs.first();graphs.isValid();graphs.next())
  809. {
  810. ActivityArray *a = *graphMap.mapToValue(&graphs.query());
  811. a->Release();
  812. }
  813. package.Release();
  814. }
  815. virtual IQueryFactory *lookupLibrary(const char *libraryName, unsigned expectedInterfaceHash, const IRoxieContextLogger &logctx) const
  816. {
  817. return globalPackageSetManager->lookupLibrary(libraryName, expectedInterfaceHash, logctx);
  818. }
  819. virtual void beforeDispose()
  820. {
  821. SpinBlock b(queriesCrit);
  822. // NOTE: it's theoretically possible for the final release to happen after a replacement has been inserted into hash table.
  823. // So only remove from hash table if what we find there matches the item that is being deleted.
  824. CQueryFactory *goer = queryMap.getValue(hashValue+channelNo);
  825. if (goer == this)
  826. queryMap.remove(hashValue+channelNo);
  827. }
  828. static IQueryFactory *getQueryFactory(hash64_t hashValue, unsigned channelNo)
  829. {
  830. SpinBlock b(queriesCrit);
  831. CQueryFactory *factory = LINK(queryMap.getValue(hashValue+channelNo));
  832. if (factory && factory->isAlive())
  833. return factory;
  834. else
  835. return NULL;
  836. }
  837. static hash64_t getQueryHash(const char *id, const IQueryDll *dll, const IRoxiePackage &package, const IPropertyTree *stateInfo, IArrayOf<IResolvedFile> &files, bool isDynamic)
  838. {
  839. hash64_t hashValue = package.queryHash();
  840. if (dll)
  841. {
  842. hashValue = rtlHash64VStr(dll->queryDll()->queryName(), hashValue);
  843. if (!allFilesDynamic && !isDynamic)
  844. {
  845. IConstWorkUnit *wu = dll->queryWorkUnit();
  846. if (wu) // wu may be null in some unit test cases
  847. {
  848. SCMStringBuffer bStr;
  849. // Don't want to include files referenced in thor graphs... in practice isDynamic also likely to be set in such cases
  850. if (getClusterType(wu->getDebugValue("targetClusterType", bStr).str(), RoxieCluster) == RoxieCluster)
  851. {
  852. Owned<IConstWUGraphIterator> graphs = &wu->getGraphs(GraphTypeActivities);
  853. ForEach(*graphs)
  854. {
  855. Owned<IPropertyTree> graphXgmml = graphs->query().getXGMMLTree(false);
  856. Owned<IPropertyTreeIterator> nodes = graphXgmml->getElements(".//node");
  857. ForEach(*nodes)
  858. {
  859. IPropertyTree &node = nodes->query();
  860. ThorActivityKind kind = getActivityKind(node);
  861. if (kind != TAKdiskwrite && kind != TAKindexwrite && kind != TAKpiperead && kind != TAKpipewrite)
  862. {
  863. const char *fileName = queryNodeFileName(node, kind);
  864. const char *indexName = queryNodeIndexName(node, kind);
  865. // What about packages that resolve everything without dali?
  866. if (indexName)
  867. {
  868. bool isOpt = pretendAllOpt || node.getPropBool("att[@name='_isIndexOpt']/@value");
  869. const IResolvedFile *indexFile = package.lookupFileName(indexName, isOpt, true, true, wu);
  870. if (indexFile)
  871. {
  872. hashValue = indexFile->addHash64(hashValue);
  873. files.append(*const_cast<IResolvedFile *>(indexFile));
  874. }
  875. }
  876. if (fileName)
  877. {
  878. if (!node.getPropBool("att[@name='_isSpill']/@value") && !node.getPropBool("att[@name='_isSpillGlobal']/@value"))
  879. {
  880. bool isOpt = pretendAllOpt || node.getPropBool("att[@name='_isOpt']/@value");
  881. const IResolvedFile *dataFile = package.lookupFileName(fileName, isOpt, true, true, wu);
  882. if (dataFile)
  883. {
  884. hashValue = dataFile->addHash64(hashValue);
  885. files.append(*const_cast<IResolvedFile *>(dataFile));
  886. }
  887. }
  888. }
  889. }
  890. }
  891. }
  892. }
  893. }
  894. }
  895. }
  896. if (id)
  897. hashValue = rtlHash64VStr(id, hashValue);
  898. if (stateInfo)
  899. {
  900. StringBuffer xml;
  901. toXML(stateInfo, xml);
  902. hashValue = rtlHash64Data(xml.length(), xml.str(), hashValue);
  903. }
  904. return hashValue;
  905. }
  906. virtual void load(const IPropertyTree *stateInfo)
  907. {
  908. IConstWorkUnit *wu = dll->queryWorkUnit();
  909. if (wu) // wu may be null in some unit test cases
  910. {
  911. libraryInterfaceHash = wu->getApplicationValueInt("LibraryModule", "interfaceHash", 0);
  912. // calculate priority before others since it affects the defaults of others
  913. priority = wu->getDebugValueInt("@priority", 0);
  914. if (stateInfo)
  915. priority = stateInfo->getPropInt("@priority", priority);
  916. memoryLimit = (memsize_t) wu->getDebugValueInt64("memoryLimit", defaultMemoryLimit);
  917. timeLimit = (unsigned) wu->getDebugValueInt("timeLimit", defaultTimeLimit[priority]);
  918. warnTimeLimit = (unsigned) wu->getDebugValueInt("warnTimeLimit", 0);
  919. enableFieldTranslation = wu->getDebugValueBool("layoutTranslationEnabled", enableFieldTranslation);
  920. SCMStringBuffer bStr;
  921. targetClusterType = getClusterType(wu->getDebugValue("targetClusterType", bStr).str(), RoxieCluster);
  922. // MORE - does package override stateInfo, or vice versa?
  923. if (stateInfo)
  924. {
  925. // info in querySets can override the defaults from workunit for some limits
  926. isSuspended = stateInfo->getPropBool("@suspended", false);
  927. memoryLimit = (memsize_t) stateInfo->getPropInt64("@memoryLimit", memoryLimit);
  928. timeLimit = (unsigned) stateInfo->getPropInt("@timeLimit", timeLimit);
  929. warnTimeLimit = (unsigned) stateInfo->getPropInt("@warnTimeLimit", warnTimeLimit);
  930. }
  931. if (targetClusterType == RoxieCluster)
  932. {
  933. Owned<IConstWUGraphIterator> graphs = &wu->getGraphs(GraphTypeActivities);
  934. SCMStringBuffer graphNameStr;
  935. ForEach(*graphs)
  936. {
  937. graphs->query().getName(graphNameStr);
  938. const char *graphName = graphNameStr.s.str();
  939. Owned<IPropertyTree> graphXgmml = graphs->query().getXGMMLTree(false);
  940. try
  941. {
  942. ActivityArray *activities = loadGraph(*graphXgmml, graphName);
  943. graphMap.setValue(graphName, activities);
  944. }
  945. catch (IException *E)
  946. {
  947. StringBuffer m;
  948. E->errorMessage(m);
  949. suspend(m.str());
  950. ERRLOG("Query %s suspended: %s", id.get(), m.str());
  951. E->Release();
  952. }
  953. }
  954. }
  955. }
  956. SpinBlock b(queriesCrit);
  957. queryMap.setValue(hashValue+channelNo, this);
  958. }
  959. virtual unsigned queryChannel() const
  960. {
  961. return channelNo;
  962. }
  963. virtual hash64_t queryHash() const
  964. {
  965. return hashValue;
  966. }
  967. virtual ISharedOnceContext *querySharedOnceContext() const
  968. {
  969. return sharedOnceContext;
  970. }
  971. virtual IDeserializedResultStore &queryOnceResultStore() const
  972. {
  973. assertex(sharedOnceContext);
  974. return sharedOnceContext->queryOnceResultStore();
  975. }
  976. virtual IPropertyTree &queryOnceContext(const ContextLogger &logctx) const
  977. {
  978. assertex(sharedOnceContext);
  979. return sharedOnceContext->queryOnceContext(this, logctx);
  980. }
  981. virtual const char *loadResource(unsigned id)
  982. {
  983. return (const char *) queryDll()->getResource(id);
  984. }
  985. virtual ActivityArray *lookupGraphActivities(const char *name) const
  986. {
  987. return *graphMap.getValue(name);
  988. }
  989. virtual IActivityGraph *lookupGraph(const char *name, IProbeManager *probeManager, const IRoxieContextLogger &logctx, IRoxieServerActivity *parentActivity) const
  990. {
  991. ActivityArrayPtr *graph = graphMap.getValue(name);
  992. assertex(graph);
  993. Owned<IActivityGraph> ret = ::createActivityGraph(name, 0, **graph, parentActivity, probeManager, logctx);
  994. return ret.getClear();
  995. }
  996. void getGraphStats(StringBuffer &reply, const IPropertyTree &thisGraph) const
  997. {
  998. Owned<IPropertyTree> graph = createPTreeFromIPT(&thisGraph);
  999. Owned<IPropertyTreeIterator> edges = graph->getElements(".//edge");
  1000. ForEach(*edges)
  1001. {
  1002. IPropertyTree &edge = edges->query();
  1003. IActivityFactory *a = findActivity(edge.getPropInt("@source", 0));
  1004. if (!a)
  1005. a = findActivity(edge.getPropInt("att[@name=\"_sourceActivity\"]/@value", 0));
  1006. if (a)
  1007. {
  1008. unsigned sourceOutput = edge.getPropInt("att[@name=\"_sourceIndex\"]/@value", 0);
  1009. a->getEdgeProgressInfo(sourceOutput, edge);
  1010. }
  1011. }
  1012. Owned<IPropertyTreeIterator> nodes = graph->getElements(".//node");
  1013. ForEach(*nodes)
  1014. {
  1015. IPropertyTree &node = nodes->query();
  1016. IActivityFactory *a = findActivity(node.getPropInt("@id", 0));
  1017. if (a)
  1018. a->getNodeProgressInfo(node);
  1019. }
  1020. toXML(graph, reply);
  1021. }
  1022. virtual IPropertyTree* cloneQueryXGMML() const
  1023. {
  1024. assertex(dll && dll->queryWorkUnit());
  1025. Owned<IPropertyTree> tree = createPTree("Query");
  1026. Owned<IConstWUGraphIterator> graphs = &dll->queryWorkUnit()->getGraphs(GraphTypeActivities);
  1027. SCMStringBuffer graphNameStr;
  1028. ForEach(*graphs)
  1029. {
  1030. graphs->query().getName(graphNameStr);
  1031. const char *graphName = graphNameStr.s.str();
  1032. Owned<IPropertyTree> graphXgmml = graphs->query().getXGMMLTree(false);
  1033. IPropertyTree *newGraph = createPTree();
  1034. newGraph->setProp("@id", graphName);
  1035. IPropertyTree *newXGMML = createPTree();
  1036. newXGMML->addPropTree("graph", graphXgmml.getLink());
  1037. newGraph->addPropTree("xgmml", newXGMML);
  1038. tree->addPropTree("Graph", newGraph);
  1039. }
  1040. return tree.getClear();
  1041. }
  1042. virtual void getStats(StringBuffer &reply, const char *graphName) const
  1043. {
  1044. if (dll)
  1045. {
  1046. assertex(dll->queryWorkUnit());
  1047. Owned<IConstWUGraphIterator> graphs = &dll->queryWorkUnit()->getGraphs(GraphTypeActivities);
  1048. SCMStringBuffer thisGraphNameStr;
  1049. ForEach(*graphs)
  1050. {
  1051. graphs->query().getName(thisGraphNameStr);
  1052. if (graphName)
  1053. {
  1054. if (thisGraphNameStr.length() && (stricmp(graphName, thisGraphNameStr.s.str()) != 0))
  1055. continue; // not interested in this one
  1056. }
  1057. reply.appendf("<Graph id='%s'><xgmml>", thisGraphNameStr.s.str());
  1058. Owned<IPropertyTree> graphXgmml = graphs->query().getXGMMLTree(false);
  1059. getGraphStats(reply, *graphXgmml);
  1060. reply.append("</xgmml></Graph>");
  1061. }
  1062. }
  1063. }
  1064. virtual void getActivityMetrics(StringBuffer &reply) const
  1065. {
  1066. HashIterator i(allActivities);
  1067. StringBuffer myReply;
  1068. ForEach(i)
  1069. {
  1070. IActivityFactory *f = *allActivities.mapToValue(&i.query());
  1071. f->getActivityMetrics(myReply.clear());
  1072. if (myReply.length())
  1073. {
  1074. reply.appendf(" <activity query='%s' id='%d' channel='%d'\n", queryQueryName(), f->queryId(), queryChannel());
  1075. reply.append(myReply);
  1076. reply.append(" </activity>\n");
  1077. }
  1078. }
  1079. }
  1080. virtual void getQueryInfo(StringBuffer &reply, bool full, const IRoxieContextLogger &logctx) const
  1081. {
  1082. Owned<IPropertyTree> xref = createPTree("Query", 0);
  1083. xref->setProp("@id", id);
  1084. if (suspended())
  1085. {
  1086. xref->setPropBool("@suspended", true);
  1087. xref->setProp("@error", errorMessage);
  1088. }
  1089. if (full)
  1090. {
  1091. HashIterator i(allActivities);
  1092. ForEach(i)
  1093. {
  1094. IActivityFactory *f = *allActivities.mapToValue(&i.query());
  1095. f->getXrefInfo(*xref, logctx);
  1096. }
  1097. }
  1098. toXML(xref, reply, 1);
  1099. }
  1100. virtual void resetQueryTimings()
  1101. {
  1102. HashIterator i(allActivities);
  1103. ForEach(i)
  1104. {
  1105. IActivityFactory *f = *allActivities.mapToValue(&i.query());
  1106. f->resetNodeProgressInfo();
  1107. }
  1108. }
  1109. virtual const char *queryErrorMessage() const
  1110. {
  1111. return errorMessage.str();
  1112. }
  1113. virtual const char *queryQueryName() const
  1114. {
  1115. return id;
  1116. }
  1117. virtual bool isQueryLibrary() const
  1118. {
  1119. return libraryInterfaceHash != 0;
  1120. }
  1121. virtual unsigned getQueryLibraryInterfaceHash() const
  1122. {
  1123. return libraryInterfaceHash;
  1124. }
  1125. virtual void suspend(const char* errMsg)
  1126. {
  1127. isSuspended = true;
  1128. isLoadFailed = true;
  1129. errorMessage.append(errMsg);
  1130. }
  1131. virtual bool loadFailed() const
  1132. {
  1133. return isLoadFailed;
  1134. }
  1135. virtual bool suspended() const
  1136. {
  1137. return isSuspended;
  1138. }
  1139. virtual memsize_t getMemoryLimit() const
  1140. {
  1141. return memoryLimit;
  1142. }
  1143. virtual unsigned getTimeLimit() const
  1144. {
  1145. return timeLimit;
  1146. }
  1147. virtual ILoadedDllEntry *queryDll() const
  1148. {
  1149. assertex(dll);
  1150. return dll->queryDll();
  1151. }
  1152. virtual IConstWorkUnit *queryWorkUnit() const
  1153. {
  1154. assertex(dll);
  1155. return dll->queryWorkUnit();
  1156. }
  1157. virtual const IRoxiePackage &queryPackage() const
  1158. {
  1159. return package;
  1160. }
  1161. virtual CRoxieWorkflowMachine *createWorkflowMachine(IConstWorkUnit *wu, bool isOnce, const ContextLogger &logctx) const
  1162. {
  1163. throwUnexpected(); // only on server...
  1164. }
  1165. virtual char *getEnv(const char *name, const char *defaultValue) const
  1166. {
  1167. if (!defaultValue)
  1168. defaultValue = "";
  1169. const char *result;
  1170. if (name && *name=='@')
  1171. {
  1172. // @ is shorthand for control: for legacy compatibility reasons
  1173. StringBuffer useName;
  1174. useName.append("control:").append(name+1);
  1175. result = package.queryEnv(useName.str());
  1176. }
  1177. else
  1178. result = package.queryEnv(name);
  1179. if (!result)
  1180. result = getenv(name);
  1181. return strdup(result ? result : defaultValue);
  1182. }
  1183. virtual unsigned getPriority() const
  1184. {
  1185. return priority;
  1186. }
  1187. virtual unsigned getWarnTimeLimit() const
  1188. {
  1189. return warnTimeLimit;
  1190. }
  1191. virtual int getDebugValueInt(const char * propname, int defVal) const
  1192. {
  1193. assertex(dll->queryWorkUnit());
  1194. return dll->queryWorkUnit()->getDebugValueInt(propname, defVal);
  1195. }
  1196. virtual bool getDebugValueBool(const char * propname, bool defVal) const
  1197. {
  1198. assertex(dll->queryWorkUnit());
  1199. return dll->queryWorkUnit()->getDebugValueBool(propname, defVal);
  1200. }
  1201. bool getEnableFieldTranslation() const
  1202. {
  1203. return enableFieldTranslation;
  1204. }
  1205. virtual IRoxieSlaveContext *createSlaveContext(const SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
  1206. {
  1207. throwUnexpected(); // only implemented in derived slave class
  1208. }
  1209. virtual IRoxieServerContext *createContext(IPropertyTree *xml, SafeSocket &client, TextMarkupFormat mlFmt, bool isRaw, bool isBlocked, HttpHelper &httpHelper, bool trim, const ContextLogger &_logctx, PTreeReaderOptions xmlReadFlags, const char *querySetName) const
  1210. {
  1211. throwUnexpected(); // only implemented in derived server class
  1212. }
  1213. virtual IRoxieServerContext *createContext(IConstWorkUnit *wu, const ContextLogger &_logctx) const
  1214. {
  1215. throwUnexpected(); // only implemented in derived server class
  1216. }
  1217. virtual void noteQuery(time_t startTime, bool failed, unsigned elapsed, unsigned memused, unsigned slavesReplyLen, unsigned bytesOut)
  1218. {
  1219. throwUnexpected(); // only implemented in derived server class
  1220. }
  1221. virtual IPropertyTree *getQueryStats(time_t from, time_t to)
  1222. {
  1223. throwUnexpected(); // only implemented in derived server class
  1224. }
  1225. virtual void getGraphNames(StringArray &ret) const
  1226. {
  1227. Owned<IConstWUGraphIterator> graphs = &dll->queryWorkUnit()->getGraphs(GraphTypeActivities);
  1228. ForEach(*graphs)
  1229. {
  1230. SCMStringBuffer graphName;
  1231. graphs->query().getName(graphName);
  1232. ret.append(graphName.str());
  1233. }
  1234. }
  1235. virtual bool isDynamic() const
  1236. {
  1237. return dynamic;
  1238. }
  1239. protected:
  1240. IPropertyTree *queryWorkflowTree() const
  1241. {
  1242. assertex(dll->queryWorkUnit());
  1243. return dll->queryWorkUnit()->queryWorkflowTree();
  1244. }
  1245. bool hasOnceSection() const
  1246. {
  1247. IPropertyTree *workflow = queryWorkflowTree();
  1248. if (workflow)
  1249. return workflow->hasProp("Item[@mode='once']");
  1250. else
  1251. return false;
  1252. }
  1253. void checkSuspended() const
  1254. {
  1255. if (isSuspended)
  1256. {
  1257. StringBuffer err;
  1258. if (errorMessage.length())
  1259. err.appendf(" because %s", errorMessage.str());
  1260. throw MakeStringException(ROXIE_QUERY_SUSPENDED, "Query %s is suspended%s", id.get(), err.str());
  1261. }
  1262. }
  1263. };
  1264. CriticalSection CQueryFactory::queryCreateLock;
  1265. SpinLock CQueryFactory::queriesCrit;
  1266. CopyMapXToMyClass<hash64_t, hash64_t, CQueryFactory> CQueryFactory::queryMap;
  1267. extern IQueryFactory *getQueryFactory(hash64_t hashvalue, unsigned channel)
  1268. {
  1269. return CQueryFactory::getQueryFactory(hashvalue, channel);
  1270. }
  1271. class CRoxieServerQueryFactory : public CQueryFactory
  1272. {
  1273. // Parts of query factory is only interesting on the server - workflow support, and tracking of total query times
  1274. protected:
  1275. Owned<IQueryStatsAggregator> queryStats;
  1276. public:
  1277. CRoxieServerQueryFactory(const char *_id, const IQueryDll *_dll, const IRoxiePackage &_package, hash64_t _hashValue, ISharedOnceContext *_sharedOnceContext, bool _dynamic)
  1278. : CQueryFactory(_id, _dll, _package, _hashValue, 0, _sharedOnceContext, _dynamic)
  1279. {
  1280. queryStats.setown(createQueryStatsAggregator(id.get(), statsExpiryTime));
  1281. }
  1282. virtual void noteQuery(time_t startTime, bool failed, unsigned elapsed, unsigned memused, unsigned slavesReplyLen, unsigned bytesOut)
  1283. {
  1284. queryStats->noteQuery(startTime, failed, elapsed, memused, slavesReplyLen, bytesOut);
  1285. queryGlobalQueryStatsAggregator()->noteQuery(startTime, failed, elapsed, memused, slavesReplyLen, bytesOut);
  1286. }
  1287. virtual void addDependency(unsigned sourceIdx, unsigned sourceId, unsigned targetId, int controlId, const char *edgeId, ActivityArray * activities)
  1288. {
  1289. // addDependency is expected to fail occasionally on slave, but never on Roxie server
  1290. if (!doAddDependency(sourceIdx, sourceId, targetId, controlId, edgeId, activities))
  1291. throw MakeStringException(ROXIE_ADDDEPENDENCY_ERROR, "Failed to create dependency from %u on %u", sourceId, targetId);
  1292. }
  1293. virtual ActivityArray *loadGraph(IPropertyTree &graph, const char *graphName)
  1294. {
  1295. bool isLibraryGraph = graph.getPropBool("@library");
  1296. bool isSequential = graph.getPropBool("@sequential");
  1297. ActivityArray *activities = new ActivityArray(isLibraryGraph, false, isLibraryGraph, isSequential);
  1298. if (isLibraryGraph)
  1299. activities->setLibraryGraphId(graph.getPropInt("node/@id"));
  1300. try
  1301. {
  1302. Owned<IPropertyTreeIterator> subgraphs = graph.getElements("node");
  1303. ForEach(*subgraphs)
  1304. {
  1305. IPropertyTree &node = subgraphs->query();
  1306. loadSubgraph(node, activities);
  1307. loadNode(node, 0, activities);
  1308. }
  1309. addDependencies(graph, activities);
  1310. }
  1311. catch (...)
  1312. {
  1313. ::Release(activities);
  1314. allActivities.kill();
  1315. throw;
  1316. }
  1317. return activities;
  1318. }
  1319. virtual IRoxieServerContext *createContext(IPropertyTree *context, SafeSocket &client, TextMarkupFormat mlFmt, bool isRaw, bool isBlocked, HttpHelper &httpHelper, bool trim, const ContextLogger &_logctx, PTreeReaderOptions _xmlReadFlags, const char *_querySetName) const
  1320. {
  1321. checkSuspended();
  1322. return createRoxieServerContext(context, this, client, mlFmt==MarkupFmt_XML, isRaw, isBlocked, httpHelper, trim, priority, _logctx, _xmlReadFlags, _querySetName);
  1323. }
  1324. virtual IRoxieServerContext *createContext(IConstWorkUnit *wu, const ContextLogger &_logctx) const
  1325. {
  1326. checkSuspended();
  1327. return createWorkUnitServerContext(wu, this, _logctx);
  1328. }
  1329. virtual CRoxieWorkflowMachine *createWorkflowMachine(IConstWorkUnit *wu, bool isOnce, const ContextLogger &logctx) const
  1330. {
  1331. IPropertyTree *workflow = queryWorkflowTree();
  1332. if (workflow)
  1333. {
  1334. return ::createRoxieWorkflowMachine(workflow, wu, isOnce, logctx);
  1335. }
  1336. else
  1337. return NULL;
  1338. }
  1339. virtual IPropertyTree *getQueryStats(time_t from, time_t to)
  1340. {
  1341. return queryStats->getStats(from, to);
  1342. }
  1343. };
  1344. static void checkWorkunitVersionConsistency(const IQueryDll *dll)
  1345. {
  1346. assertex(dll->queryWorkUnit());
  1347. unsigned wuVersion = dll->queryWorkUnit()->getCodeVersion();
  1348. if (wuVersion > ACTIVITY_INTERFACE_VERSION || wuVersion < MIN_ACTIVITY_INTERFACE_VERSION)
  1349. throw MakeStringException(ROXIE_MISMATCH, "Workunit was compiled for eclhelper interface version %d, this roxie requires version %d..%d", wuVersion, MIN_ACTIVITY_INTERFACE_VERSION, ACTIVITY_INTERFACE_VERSION);
  1350. EclProcessFactory processFactory = (EclProcessFactory) dll->queryDll()->getEntry("createProcess");
  1351. if (processFactory)
  1352. {
  1353. Owned<IEclProcess> process = processFactory();
  1354. assertex(process);
  1355. if (process->getActivityVersion() != wuVersion)
  1356. throw MakeStringException(ROXIE_MISMATCH, "Inconsistent interface versions. Workunit was created using eclcc for version %u, but the c++ compiler used version %u", wuVersion, process->getActivityVersion());
  1357. }
  1358. else
  1359. throw MakeStringException(ROXIE_MISMATCH, "Workunit did not export createProcess function");
  1360. }
  1361. extern IQueryFactory *createServerQueryFactory(const char *id, const IQueryDll *dll, const IRoxiePackage &package, const IPropertyTree *stateInfo, bool isDynamic, bool forceRetry)
  1362. {
  1363. CriticalBlock b(CQueryFactory::queryCreateLock);
  1364. IArrayOf<IResolvedFile> queryFiles; // Note - these should stay in scope long enough to ensure still cached when (if) query is loaded for real
  1365. hash64_t hashValue = CQueryFactory::getQueryHash(id, dll, package, stateInfo, queryFiles, isDynamic);
  1366. IQueryFactory *cached = getQueryFactory(hashValue, 0);
  1367. if (cached && !(cached->loadFailed() && (reloadRetriesFailed || forceRetry)))
  1368. {
  1369. ::Release(dll);
  1370. return cached;
  1371. }
  1372. if (dll)
  1373. {
  1374. checkWorkunitVersionConsistency(dll);
  1375. Owned<ISharedOnceContext> sharedOnceContext;
  1376. IPropertyTree *workflow = dll->queryWorkUnit()->queryWorkflowTree();
  1377. if (workflow && workflow->hasProp("Item[@mode='once']"))
  1378. sharedOnceContext.setown(new CSharedOnceContext);
  1379. Owned<CRoxieServerQueryFactory> newFactory = new CRoxieServerQueryFactory(id, dll, dynamic_cast<const IRoxiePackage&>(package), hashValue, sharedOnceContext, isDynamic);
  1380. newFactory->load(stateInfo);
  1381. if (sharedOnceContext && preloadOnceData)
  1382. {
  1383. Owned<StringContextLogger> logctx = new StringContextLogger(id); // NB may get linked by the onceContext
  1384. sharedOnceContext->checkOnceDone(newFactory, *logctx);
  1385. }
  1386. return newFactory.getClear();
  1387. }
  1388. else
  1389. return new CRoxieServerQueryFactory(id, NULL, dynamic_cast<const IRoxiePackage&>(package), hashValue, NULL, isDynamic);
  1390. }
  1391. extern IQueryFactory *createServerQueryFactoryFromWu(IConstWorkUnit *wu)
  1392. {
  1393. Owned<const IQueryDll> dll = createWuQueryDll(wu);
  1394. if (!dll)
  1395. return NULL;
  1396. SCMStringBuffer wuid;
  1397. return createServerQueryFactory(wu->getWuid(wuid).str(), dll.getClear(), queryRootRoxiePackage(), NULL, true, false); // MORE - if use a constant for id might cache better?
  1398. }
  1399. //==============================================================================================================================================
  1400. class CSlaveQueryFactory : public CQueryFactory
  1401. {
  1402. void addActivity(ISlaveActivityFactory *activity, ActivityArray *activities)
  1403. {
  1404. activities->append(*activity);
  1405. unsigned activityId = activity->queryId();
  1406. allActivities.setValue(activityId, activity);
  1407. }
  1408. void loadSlaveNode(IPropertyTree &node, unsigned subgraphId, ActivityArray *activities)
  1409. {
  1410. ThorActivityKind kind = getActivityKind(node);
  1411. switch (kind)
  1412. {
  1413. case TAKcsvread:
  1414. case TAKxmlread:
  1415. case TAKdiskread:
  1416. if (node.getPropBool("att[@name='_isSpill']/@value", false) || node.getPropBool("att[@name='_isSpillGlobal']/@value", false))
  1417. return;
  1418. break;
  1419. case TAKkeyedjoin:
  1420. case TAKkeyeddenormalize:
  1421. case TAKkeyeddenormalizegroup:
  1422. case TAKdisknormalize:
  1423. case TAKdiskcount:
  1424. case TAKdiskaggregate:
  1425. case TAKdiskgroupaggregate:
  1426. case TAKindexread:
  1427. case TAKindexnormalize:
  1428. case TAKindexcount:
  1429. case TAKindexaggregate:
  1430. case TAKindexgroupaggregate:
  1431. case TAKindexgroupexists:
  1432. case TAKindexgroupcount:
  1433. case TAKfetch:
  1434. case TAKcsvfetch:
  1435. case TAKxmlfetch:
  1436. case TAKremotegraph:
  1437. break;
  1438. case TAKsubgraph:
  1439. break;
  1440. default:
  1441. return;
  1442. }
  1443. ISlaveActivityFactory *newAct = NULL;
  1444. if (kind != TAKsubgraph)
  1445. {
  1446. if (isSuspended)
  1447. newAct = createRoxieDummyActivityFactory(node, subgraphId, *this, false); // MORE - is there any point?
  1448. else
  1449. {
  1450. StringBuffer helperName;
  1451. node.getProp("att[@name=\"helper\"]/@value", helperName);
  1452. if (!helperName.length())
  1453. helperName.append("fAc").append(node.getPropInt("@id", 0));
  1454. HelperFactory *helperFactory = dll->getFactory(helperName.str());
  1455. if (!helperFactory)
  1456. throw MakeStringException(ROXIE_INTERNAL_ERROR, "Internal error: helper function %s not exported", helperName.str());
  1457. switch (kind)
  1458. {
  1459. case TAKdiskread:
  1460. newAct = createRoxieDiskReadActivityFactory(node, subgraphId, *this, helperFactory);
  1461. break;
  1462. case TAKcsvread:
  1463. newAct = createRoxieCsvReadActivityFactory(node, subgraphId, *this, helperFactory);
  1464. break;
  1465. case TAKxmlread:
  1466. newAct = createRoxieXmlReadActivityFactory(node, subgraphId, *this, helperFactory);
  1467. break;
  1468. case TAKdisknormalize:
  1469. newAct = createRoxieDiskNormalizeActivityFactory(node, subgraphId, *this, helperFactory);
  1470. break;
  1471. case TAKdiskcount:
  1472. newAct = createRoxieDiskCountActivityFactory(node, subgraphId, *this, helperFactory);
  1473. break;
  1474. case TAKdiskaggregate:
  1475. newAct = createRoxieDiskAggregateActivityFactory(node, subgraphId, *this, helperFactory);
  1476. break;
  1477. case TAKdiskgroupaggregate:
  1478. newAct = createRoxieDiskGroupAggregateActivityFactory(node, subgraphId, *this, helperFactory);
  1479. break;
  1480. case TAKindexread:
  1481. newAct = createRoxieIndexReadActivityFactory(node, subgraphId, *this, helperFactory);
  1482. break;
  1483. case TAKindexnormalize:
  1484. newAct = createRoxieIndexNormalizeActivityFactory(node, subgraphId, *this, helperFactory);
  1485. break;
  1486. case TAKindexcount:
  1487. newAct = createRoxieIndexCountActivityFactory(node, subgraphId, *this, helperFactory);
  1488. break;
  1489. case TAKindexaggregate:
  1490. newAct = createRoxieIndexAggregateActivityFactory(node, subgraphId, *this, helperFactory);
  1491. break;
  1492. case TAKindexgroupaggregate:
  1493. case TAKindexgroupexists:
  1494. case TAKindexgroupcount:
  1495. newAct = createRoxieIndexGroupAggregateActivityFactory(node, subgraphId, *this, helperFactory, kind);
  1496. break;
  1497. case TAKfetch:
  1498. newAct = createRoxieFetchActivityFactory(node, subgraphId, *this, helperFactory);
  1499. break;
  1500. case TAKcsvfetch:
  1501. newAct = createRoxieCSVFetchActivityFactory(node, subgraphId, *this, helperFactory);
  1502. break;
  1503. case TAKxmlfetch:
  1504. newAct = createRoxieXMLFetchActivityFactory(node, subgraphId, *this, helperFactory);
  1505. break;
  1506. case TAKkeyedjoin:
  1507. case TAKkeyeddenormalize:
  1508. case TAKkeyeddenormalizegroup:
  1509. newAct = createRoxieKeyedJoinIndexActivityFactory(node, subgraphId, *this, helperFactory);
  1510. if (node.getPropBool("att[@name=\"_diskAccessRequired\"]/@value"))
  1511. {
  1512. ISlaveActivityFactory *newAct2 = createRoxieKeyedJoinFetchActivityFactory(node, subgraphId, *this, helperFactory);
  1513. unsigned activityId2 = newAct2->queryId() | ROXIE_ACTIVITY_FETCH;
  1514. activities->append(*newAct2);
  1515. allActivities.setValue(activityId2, newAct2);
  1516. }
  1517. break;
  1518. case TAKremotegraph:
  1519. {
  1520. unsigned graphId = node.getPropInt("att[@name=\"_graphid\"]/@value", 0);
  1521. newAct = createRoxieRemoteActivityFactory(node, subgraphId, *this, helperFactory, graphId);
  1522. break;
  1523. }
  1524. default:
  1525. throwUnexpected();
  1526. }
  1527. }
  1528. if (newAct)
  1529. {
  1530. addActivity(newAct, activities);
  1531. }
  1532. }
  1533. else if (kind == TAKsubgraph)
  1534. {
  1535. // If the subgraph belongs to a remote activity, we need to be able to execute it on the slave...
  1536. IPropertyTree * childGraphNode = node.queryPropTree("att/graph");
  1537. if (!childGraphNode->getPropBool("@child"))
  1538. {
  1539. unsigned parentId = findParentId(node);
  1540. assertex(parentId);
  1541. unsigned parentIndex = activities->findActivityIndex(parentId);
  1542. if (parentIndex != NotFound)
  1543. {
  1544. ActivityArray *childQuery = loadChildGraph(*childGraphNode);
  1545. activities->item(parentIndex).addChildQuery(node.getPropInt("@id"), childQuery);
  1546. }
  1547. }
  1548. // Regardless, we need to make sure we create remote activities as required throughout the graph
  1549. Owned<IPropertyTreeIterator> nodes = node.getElements("att/graph/node");
  1550. unsigned subgraphId = node.getPropInt("@id");
  1551. ForEach(*nodes)
  1552. {
  1553. IPropertyTree &node = nodes->query();
  1554. loadSlaveNode(node, subgraphId, activities);
  1555. }
  1556. }
  1557. }
  1558. void loadOuterSubgraph(IPropertyTree &graph, ActivityArray *activities)
  1559. {
  1560. Owned<IPropertyTreeIterator> nodes = graph.getElements("att/graph/node");
  1561. unsigned subgraphId = graph.getPropInt("@id");
  1562. ForEach(*nodes)
  1563. {
  1564. IPropertyTree &node = nodes->query();
  1565. loadSlaveNode(node, subgraphId, activities);
  1566. }
  1567. loadSlaveNode(graph, subgraphId, activities); // MORE - not really sure why this line is here!
  1568. }
  1569. public:
  1570. CSlaveQueryFactory(const char *_id, const IQueryDll *_dll, const IRoxiePackage &_package, hash64_t _hashValue, unsigned _channelNo, ISharedOnceContext *_sharedOnceContext, bool _dynamic)
  1571. : CQueryFactory(_id, _dll, _package, _hashValue, _channelNo, _sharedOnceContext, _dynamic)
  1572. {
  1573. }
  1574. virtual IRoxieSlaveContext *createSlaveContext(const SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
  1575. {
  1576. return ::createSlaveContext(this, logctx, timeLimit, memoryLimit, packet);
  1577. }
  1578. virtual ActivityArray *loadGraph(IPropertyTree &graph, const char *graphName)
  1579. {
  1580. // MORE: common up with loadGraph for the Roxie server..
  1581. bool isLibraryGraph = graph.getPropBool("@library");
  1582. bool isSequential = graph.getPropBool("@sequential");
  1583. ActivityArray *activities = new ActivityArray(isLibraryGraph, false, isLibraryGraph, isSequential);
  1584. if (isLibraryGraph)
  1585. activities->setLibraryGraphId(graph.getPropInt("node/@id"));
  1586. try
  1587. {
  1588. if (false && isLibraryGraph)
  1589. {
  1590. //Really only need to do this if the library is called from a remote activity
  1591. //but it's a bit tricky to work out since the library graph will come before the use.
  1592. //Not a major issue since libraries won't be embedded for production queries.
  1593. // this comment makes little sense...
  1594. Owned<IPropertyTreeIterator> subgraphs = graph.getElements("node");
  1595. ForEach(*subgraphs)
  1596. {
  1597. IPropertyTree &node = subgraphs->query();
  1598. loadSubgraph(node, activities);
  1599. loadNode(node, 0, activities);
  1600. }
  1601. }
  1602. Owned<IPropertyTreeIterator> subgraphs = graph.getElements("node");
  1603. ForEach(*subgraphs)
  1604. {
  1605. IPropertyTree &subgraph = subgraphs->query();
  1606. loadOuterSubgraph(subgraph, activities);
  1607. }
  1608. addDependencies(graph, activities);
  1609. }
  1610. catch (...)
  1611. {
  1612. ::Release(activities);
  1613. allActivities.kill();
  1614. throw;
  1615. }
  1616. return activities;
  1617. }
  1618. };
  1619. IQueryFactory *createSlaveQueryFactory(const char *id, const IQueryDll *dll, const IRoxiePackage &package, unsigned channel, const IPropertyTree *stateInfo, bool isDynamic, bool forceRetry)
  1620. {
  1621. CriticalBlock b(CQueryFactory::queryCreateLock);
  1622. IArrayOf<IResolvedFile> queryFiles; // Note - these should stay in scope long enough to ensure still cached when (if) query is loaded for real
  1623. hash64_t hashValue = CQueryFactory::getQueryHash(id, dll, package, stateInfo, queryFiles, isDynamic);
  1624. IQueryFactory *cached = getQueryFactory(hashValue, channel);
  1625. if (cached)
  1626. {
  1627. ::Release(dll);
  1628. return cached;
  1629. }
  1630. if (dll)
  1631. {
  1632. checkWorkunitVersionConsistency(dll);
  1633. Owned<IQueryFactory> serverFactory = createServerQueryFactory(id, LINK(dll), package, stateInfo, false, forceRetry); // Should always find a cached one
  1634. Owned<CSlaveQueryFactory> newFactory = new CSlaveQueryFactory(id, dll, dynamic_cast<const IRoxiePackage&>(package), hashValue, channel, serverFactory->querySharedOnceContext(), isDynamic);
  1635. newFactory->load(stateInfo);
  1636. return newFactory.getClear();
  1637. }
  1638. else
  1639. return new CSlaveQueryFactory(id, NULL, dynamic_cast<const IRoxiePackage&>(package), hashValue, channel, NULL, isDynamic);
  1640. }
  1641. extern IQueryFactory *createSlaveQueryFactoryFromWu(IConstWorkUnit *wu, unsigned channelNo)
  1642. {
  1643. Owned<const IQueryDll> dll = createWuQueryDll(wu);
  1644. if (!dll)
  1645. return NULL;
  1646. SCMStringBuffer wuid;
  1647. return createSlaveQueryFactory(wu->getWuid(wuid).str(), dll.getClear(), queryRootRoxiePackage(), channelNo, NULL, true, false); // MORE - if use a constant for id might cache better?
  1648. }
  1649. IRecordLayoutTranslator * createRecordLayoutTranslator(const char *logicalName, IDefRecordMeta const * diskMeta, IDefRecordMeta const * activityMeta)
  1650. {
  1651. try
  1652. {
  1653. return ::createRecordLayoutTranslator(diskMeta, activityMeta);
  1654. }
  1655. catch (IException *E)
  1656. {
  1657. StringBuffer q, d;
  1658. getRecordMetaAsString(q, activityMeta);
  1659. getRecordMetaAsString(d, diskMeta);
  1660. DBGLOG("Activity: %s", q.str());
  1661. DBGLOG("Disk: %s", d.str());
  1662. StringBuffer m;
  1663. m.appendf("In index %s:", logicalName);
  1664. E->errorMessage(m);
  1665. E->Release();
  1666. DBGLOG("%s", m.str());
  1667. throw MakeStringException(ROXIE_RCD_LAYOUT_TRANSLATOR, "%s", m.str());
  1668. }
  1669. }