slave.cpp 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include "jlib.hpp"
  15. #include "jptree.hpp"
  16. #include "commonext.hpp"
  17. #include "thorport.hpp"
  18. #include "thormisc.hpp"
  19. #include "thactivityutil.ipp"
  20. #include "thexception.hpp"
  21. #ifndef _WIN32
  22. #include <stdexcept>
  23. #endif
  24. #include "thorfile.hpp"
  25. #include "thgraphslave.hpp"
  26. #include "slave.ipp"
  27. #include <new>
  28. #define FATAL_ACTJOIN_TIMEOUT (5*60*1000)
  29. activityslaves_decl CGraphElementBase *createSlaveContainer(IPropertyTree &xgmml, CGraphBase &owner, CGraphBase *resultsGraph);
  30. MODULE_INIT(INIT_PRIORITY_STANDARD)
  31. {
  32. registerCreateFunc(&createSlaveContainer);
  33. return true;
  34. }
  35. //---------------------------------------------------------------------------
  36. // ProcessSlaveActivity
  37. ProcessSlaveActivity::ProcessSlaveActivity(CGraphElementBase *container, const StatisticsMapping &statsMapping)
  38. : CSlaveActivity(container, statsMapping), threaded("ProcessSlaveActivity", this)
  39. {
  40. }
  41. void ProcessSlaveActivity::beforeDispose()
  42. {
  43. // Note - we can't throw from the destructor, so do this in beforeDispose instead
  44. // If the exception is thrown then we are liable to leak the object, but we are dying anyway...
  45. ::ActPrintLog(this, thorDetailedLogLevel, "destroying ProcessSlaveActivity");
  46. ::ActPrintLog(this, thorDetailedLogLevel, "ProcessSlaveActivity : joining process thread");
  47. // NB: The activity thread should have already stopped,
  48. // if it is still alive at job shutdown and cannot be joined then the thread is in an unknown state.
  49. if (!threaded.join(FATAL_ACTJOIN_TIMEOUT))
  50. throw MakeThorFatal(NULL, TE_FailedToAbortSlaves, "Activity %" ACTPF "d failed to stop", container.queryId());
  51. ::ActPrintLog(this, thorDetailedLogLevel, "AFTER ProcessSlaveActivity : joining process thread");
  52. }
  53. void ProcessSlaveActivity::startProcess(bool async)
  54. {
  55. if (async)
  56. threaded.start();
  57. else
  58. threadmain();
  59. }
  60. void ProcessSlaveActivity::threadmain()
  61. {
  62. try
  63. {
  64. #ifdef TIME_ACTIVITIES
  65. if (timeActivities)
  66. {
  67. lastCycles = get_cycles_now(); // serializeStats will reset
  68. process();
  69. // set lastCycles to 0 to signal not processing
  70. unsigned __int64 finalCycles = lastCycles.exchange(0);
  71. slaveTimerStats.totalCycles += get_cycles_now()-finalCycles;
  72. }
  73. else
  74. process();
  75. #else
  76. process();
  77. #endif
  78. }
  79. catch (IException *_e)
  80. {
  81. IThorException *e = QUERYINTERFACE(_e, IThorException);
  82. if (e)
  83. {
  84. if (!e->queryActivityId())
  85. {
  86. e->setGraphInfo(container.queryJob().queryGraphName(), container.queryOwner().queryGraphId());
  87. e->setActivityKind(container.getKind());
  88. e->setActivityId(container.queryId());
  89. }
  90. exception.set(e);
  91. }
  92. else
  93. {
  94. e = MakeActivityException(this, _e);
  95. if (QUERYINTERFACE(_e, ISEH_Exception))
  96. {
  97. IThorException *e2 = MakeThorFatal(e, TE_SEH, "FATAL: (SEH)");
  98. e->Release();
  99. e = e2;
  100. }
  101. _e->Release();
  102. exception.setown(e);
  103. }
  104. ActPrintLog(e);
  105. }
  106. catch (std::exception & es)
  107. {
  108. StringBuffer m("FATAL std::exception ");
  109. if(dynamic_cast<std::bad_alloc *>(&es))
  110. m.append("out of memory (std::bad_alloc)");
  111. else
  112. m.append("standard library exception (std::exception ").append(es.what()).append(")");
  113. m.appendf(" in %" ACTPF "d",container.queryId());
  114. ActPrintLogEx(&queryContainer(), thorlog_null, MCerror, "%s", m.str());
  115. exception.setown(MakeThorFatal(NULL, TE_UnknownException, "%s", m.str()));
  116. }
  117. catch (CATCHALL)
  118. {
  119. ActPrintLogEx(&queryContainer(), thorlog_null, MCerror, "Unknown exception thrown in process()");
  120. exception.setown(MakeThorFatal(NULL, TE_UnknownException, "FATAL: Unknown exception thrown by ProcessThread"));
  121. }
  122. if (exception)
  123. fireException(exception);
  124. try { endProcess(); }
  125. catch (IException *_e)
  126. {
  127. ActPrintLog(_e, "Exception calling activity endProcess");
  128. IThorException *e = QUERYINTERFACE(_e, IThorException);
  129. if (e)
  130. exception.set(e);
  131. else
  132. exception.setown(MakeActivityException(this, _e));
  133. _e->Release();
  134. fireException(exception);
  135. }
  136. }
  137. bool ProcessSlaveActivity::wait(unsigned timeout)
  138. {
  139. if (!threaded.join(timeout))
  140. return false;
  141. if (exception.get())
  142. throw exception.getClear();
  143. return true;
  144. }
  145. void ProcessSlaveActivity::serializeStats(MemoryBuffer &mb)
  146. {
  147. #ifdef TIME_ACTIVITIES
  148. if (timeActivities)
  149. {
  150. unsigned __int64 curCycles = lastCycles;
  151. if (curCycles)
  152. {
  153. unsigned __int64 nowCycles = get_cycles_now();
  154. //Update lastCycles to the current number of cycles - unless it has been set to 0 in the meantime
  155. //Use std::memory_order_relaxed because there is no requirement for other variables to be synchronized.
  156. if (lastCycles.compare_exchange_strong(curCycles, nowCycles, std::memory_order_relaxed))
  157. slaveTimerStats.totalCycles += nowCycles-curCycles;
  158. }
  159. }
  160. #endif
  161. CSlaveActivity::serializeStats(mb);
  162. mb.append(processed);
  163. }
  164. void ProcessSlaveActivity::done()
  165. {
  166. CSlaveActivity::done();
  167. if (exception.get())
  168. throw exception.getClear();
  169. }
  170. #include "aggregate/thaggregateslave.ipp"
  171. #include "aggregate/thgroupaggregateslave.ipp"
  172. #include "apply/thapplyslave.ipp"
  173. #include "choosesets/thchoosesetsslave.ipp"
  174. #include "countproject/thcountprojectslave.ipp"
  175. #include "degroup/thdegroupslave.ipp"
  176. #include "diskread/thdiskreadslave.ipp"
  177. #include "diskwrite/thdwslave.ipp"
  178. #include "distribution/thdistributionslave.ipp"
  179. #include "enth/thenthslave.ipp"
  180. #include "fetch/thfetchslave.ipp"
  181. #include "filter/thfilterslave.ipp"
  182. #include "firstn/thfirstnslave.ipp"
  183. #include "funnel/thfunnelslave.ipp"
  184. #include "group/thgroupslave.ipp"
  185. #include "hashdistrib/thhashdistribslave.ipp"
  186. #include "indexread/thindexreadslave.ipp"
  187. #include "iterate/thgroupiterateslave.ipp"
  188. #include "iterate/thiterateslave.ipp"
  189. #include "join/thjoinslave.ipp"
  190. #include "keyedjoin/thkeyedjoinslave.ipp"
  191. #include "limit/thlimitslave.ipp"
  192. #include "merge/thmergeslave.ipp"
  193. #include "msort/thgroupsortslave.ipp"
  194. #include "msort/thmsortslave.ipp"
  195. #include "normalize/thnormalizeslave.ipp"
  196. #include "nsplitter/thnsplitterslave.ipp"
  197. #include "null/thnullslave.ipp"
  198. #include "nullaction/thnullactionslave.ipp"
  199. #include "parse/thparseslave.ipp"
  200. #include "piperead/thprslave.ipp"
  201. #include "pipewrite/thpwslave.ipp"
  202. #include "project/thprojectslave.ipp"
  203. #include "pull/thpullslave.ipp"
  204. #include "result/thresultslave.ipp"
  205. #include "rollup/throllupslave.ipp"
  206. #include "sample/thsampleslave.ipp"
  207. #include "selectnth/thselectnthslave.ipp"
  208. #include "selfjoin/thselfjoinslave.ipp"
  209. #include "spill/thspillslave.ipp"
  210. #include "soapcall/thsoapcallslave.ipp"
  211. #include "temptable/thtmptableslave.ipp"
  212. #include "topn/thtopnslave.ipp"
  213. #include "wuidread/thwuidreadslave.ipp"
  214. #include "wuidwrite/thwuidwriteslave.ipp"
  215. #include "xmlwrite/thxmlwriteslave.ipp"
  216. CActivityBase *createLookupJoinSlave(CGraphElementBase *container);
  217. CActivityBase *createAllJoinSlave(CGraphElementBase *container);
  218. CActivityBase *createXmlParseSlave(CGraphElementBase *container);
  219. CActivityBase *createKeyDiffSlave(CGraphElementBase *container);
  220. CActivityBase *createKeyPatchSlave(CGraphElementBase *container);
  221. CActivityBase *createCsvReadSlave(CGraphElementBase *container);
  222. CActivityBase *createXmlReadSlave(CGraphElementBase *container);
  223. CActivityBase *createIndexWriteSlave(CGraphElementBase *container);
  224. CActivityBase *createLoopSlave(CGraphElementBase *container);
  225. CActivityBase *createLocalResultReadSlave(CGraphElementBase *container);
  226. CActivityBase *createLocalResultWriteSlave(CGraphElementBase *container);
  227. CActivityBase *createLocalResultSpillSlave(CGraphElementBase *container);
  228. CActivityBase *createGraphLoopSlave(CGraphElementBase *container);
  229. CActivityBase *createGraphLoopResultReadSlave(CGraphElementBase *container);
  230. CActivityBase *createGraphLoopResultWriteSlave(CGraphElementBase *container);
  231. CActivityBase *createIfSlave(CGraphElementBase *container);
  232. CActivityBase *createCaseSlave(CGraphElementBase *container);
  233. CActivityBase *createCatchSlave(CGraphElementBase *container);
  234. CActivityBase *createChildNormalizeSlave(CGraphElementBase *container);
  235. CActivityBase *createChildAggregateSlave(CGraphElementBase *container);
  236. CActivityBase *createChildGroupAggregateSlave(CGraphElementBase *container);
  237. CActivityBase *createChildThroughNormalizeSlave(CGraphElementBase *container);
  238. CActivityBase *createWhenSlave(CGraphElementBase *container);
  239. CActivityBase *createDictionaryWorkunitWriteSlave(CGraphElementBase *container);
  240. CActivityBase *createDictionaryResultWriteSlave(CGraphElementBase *container);
  241. CActivityBase *createTraceSlave(CGraphElementBase *container);
  242. CActivityBase *createIfActionSlave(CGraphElementBase *container);
  243. CActivityBase *createExternalSlave(CGraphElementBase *container);
  244. CActivityBase *createExternalSinkSlave(CGraphElementBase *container);
  245. class CGenericSlaveGraphElement : public CSlaveGraphElement
  246. {
  247. bool wuidread2diskread; // master decides after interrogating result and sneaks in info before slave creates
  248. StringAttr wuidreadFilename;
  249. Owned<CActivityBase> nullActivity;
  250. CriticalSection nullActivityCs;
  251. public:
  252. CGenericSlaveGraphElement(CGraphBase &_owner, IPropertyTree &xgmml, CGraphBase *resultsGraph) : CSlaveGraphElement(_owner, xgmml, resultsGraph)
  253. {
  254. wuidread2diskread = false;
  255. switch (getKind())
  256. {
  257. case TAKnull:
  258. case TAKsimpleaction:
  259. case TAKsideeffect:
  260. nullAct = true;
  261. break;
  262. default:
  263. break;
  264. }
  265. }
  266. virtual void deserializeCreateContext(MemoryBuffer &mb)
  267. {
  268. CSlaveGraphElement::deserializeCreateContext(mb);
  269. if (TAKworkunitread == kind)
  270. {
  271. mb.read(wuidread2diskread); // have I been converted
  272. if (wuidread2diskread)
  273. mb.read(wuidreadFilename);
  274. }
  275. haveCreateCtx = true;
  276. }
  277. virtual CActivityBase *factory(ThorActivityKind kind)
  278. {
  279. CActivityBase *ret = NULL;
  280. switch (kind)
  281. {
  282. case TAKdiskread:
  283. case TAKspillread:
  284. ret = createDiskReadSlave(this);
  285. break;
  286. case TAKdisknormalize:
  287. ret = createDiskNormalizeSlave(this);
  288. break;
  289. case TAKdiskaggregate:
  290. ret = createDiskAggregateSlave(this);
  291. break;
  292. case TAKdiskcount:
  293. ret = createDiskCountSlave(this);
  294. break;
  295. case TAKdiskgroupaggregate:
  296. ret = createDiskGroupAggregateSlave(this);
  297. break;
  298. case TAKindexread:
  299. ret = createIndexReadSlave(this);
  300. break;
  301. case TAKindexcount:
  302. ret = createIndexCountSlave(this);
  303. break;
  304. case TAKindexnormalize:
  305. ret = createIndexNormalizeSlave(this);
  306. break;
  307. case TAKindexaggregate:
  308. ret = createIndexAggregateSlave(this);
  309. break;
  310. case TAKindexgroupaggregate:
  311. case TAKindexgroupexists:
  312. case TAKindexgroupcount:
  313. ret = createIndexGroupAggregateSlave(this);
  314. break;
  315. case TAKchildaggregate:
  316. ret = createChildAggregateSlave(this);
  317. break;
  318. case TAKchildgroupaggregate:
  319. ret = createChildGroupAggregateSlave(this);
  320. break;
  321. case TAKchildthroughnormalize:
  322. ret = createChildThroughNormalizeSlave(this);
  323. break;
  324. case TAKchildnormalize:
  325. ret = createChildNormalizeSlave(this);
  326. break;
  327. case TAKspill:
  328. ret = createSpillSlave(this);
  329. break;
  330. case TAKdiskwrite:
  331. case TAKspillwrite:
  332. ret = createDiskWriteSlave(this);
  333. break;
  334. case TAKsort:
  335. if (queryGrouped() || queryLocal())
  336. ret = createLocalSortSlave(this);
  337. else
  338. ret = createMSortSlave(this);
  339. break;
  340. case TAKsorted:
  341. ret = createSortedSlave(this);
  342. break;
  343. case TAKtrace:
  344. ret = createTraceSlave(this);
  345. break;
  346. case TAKdedup:
  347. if (queryGrouped())
  348. ret = createGroupDedupSlave(this);
  349. else if (queryLocal())
  350. ret = createLocalDedupSlave(this);
  351. else
  352. ret = createDedupSlave(this);
  353. break;
  354. case TAKrollupgroup:
  355. ret = createRollupGroupSlave(this);
  356. break;
  357. case TAKrollup:
  358. if (queryGrouped())
  359. ret = createGroupRollupSlave(this);
  360. else if (queryLocal())
  361. ret = createLocalRollupSlave(this);
  362. else
  363. ret = createRollupSlave(this);
  364. break;
  365. case TAKprocess:
  366. if (queryGrouped())
  367. ret = createGroupProcessSlave(this);
  368. else if (queryLocal())
  369. ret = createLocalProcessSlave(this);
  370. else
  371. ret = createProcessSlave(this);
  372. break;
  373. case TAKfilter:
  374. ret = createFilterSlave(this);
  375. break;
  376. case TAKfilterproject:
  377. ret = createFilterProjectSlave(this);
  378. break;
  379. case TAKfiltergroup:
  380. ret = createFilterGroupSlave(this);
  381. break;
  382. case TAKsplit:
  383. ret = createNSplitterSlave(this);
  384. break;
  385. case TAKproject:
  386. ret = createProjectSlave(this);
  387. break;
  388. case TAKprefetchproject:
  389. ret = createPrefetchProjectSlave(this);
  390. break;
  391. case TAKprefetchcountproject:
  392. break;
  393. case TAKiterate:
  394. if (queryGrouped())
  395. ret = createGroupIterateSlave(this);
  396. else if (queryLocal())
  397. ret = createLocalIterateSlave(this);
  398. else
  399. ret = createIterateSlave(this);
  400. break;
  401. case TAKaggregate:
  402. case TAKexistsaggregate:
  403. case TAKcountaggregate:
  404. if (queryLocalOrGrouped())
  405. ret = createGroupAggregateSlave(this);
  406. else
  407. ret = createAggregateSlave(this);
  408. break;
  409. case TAKhashaggregate:
  410. ret = createHashAggregateSlave(this);
  411. break;
  412. case TAKfirstn:
  413. ret = createFirstNSlave(this);
  414. break;
  415. case TAKsample:
  416. ret = createSampleSlave(this);
  417. break;
  418. case TAKdegroup:
  419. ret = createDegroupSlave(this);
  420. break;
  421. case TAKjoin:
  422. if (queryLocalOrGrouped())
  423. ret = createLocalJoinSlave(this);
  424. else
  425. ret = createJoinSlave(this);
  426. break;
  427. case TAKhashjoin:
  428. case TAKhashdenormalize:
  429. case TAKhashdenormalizegroup:
  430. ret = createHashJoinSlave(this);
  431. break;
  432. case TAKlookupjoin:
  433. case TAKlookupdenormalize:
  434. case TAKlookupdenormalizegroup:
  435. case TAKsmartjoin:
  436. case TAKsmartdenormalize:
  437. case TAKsmartdenormalizegroup:
  438. ret = createLookupJoinSlave(this);
  439. break;
  440. case TAKalljoin:
  441. case TAKalldenormalize:
  442. case TAKalldenormalizegroup:
  443. ret = createAllJoinSlave(this);
  444. break;
  445. case TAKselfjoin:
  446. if (queryLocalOrGrouped())
  447. ret = createLocalSelfJoinSlave(this);
  448. else
  449. ret = createSelfJoinSlave(this);
  450. break;
  451. case TAKselfjoinlight:
  452. ret = createLightweightSelfJoinSlave(this);
  453. break;
  454. case TAKkeyedjoin:
  455. case TAKkeyeddenormalize:
  456. case TAKkeyeddenormalizegroup:
  457. ret = createKeyedJoinSlave(this);
  458. break;
  459. case TAKgroup:
  460. ret = createGroupSlave(this);
  461. break;
  462. case TAKworkunitwrite:
  463. ret = createWorkUnitWriteSlave(this);
  464. break;
  465. case TAKdictionaryworkunitwrite:
  466. ret = createDictionaryWorkunitWriteSlave(this);
  467. break;
  468. case TAKdictionaryresultwrite:
  469. ret = createDictionaryResultWriteSlave(this);
  470. break;
  471. case TAKfunnel:
  472. ret = createFunnelSlave(this);
  473. break;
  474. case TAKcombine:
  475. ret = createCombineSlave(this);
  476. break;
  477. case TAKcombinegroup:
  478. ret = createCombineGroupSlave(this);
  479. break;
  480. case TAKregroup:
  481. ret = createRegroupSlave(this);
  482. break;
  483. case TAKapply:
  484. ret = createApplySlave(this);
  485. break;
  486. case TAKifaction:
  487. ret = createIfActionSlave(this);
  488. break;
  489. case TAKinlinetable:
  490. ret = createInlineTableSlave(this);
  491. break;
  492. case TAKkeyeddistribute:
  493. ret = createIndexDistributeSlave(this);
  494. break;
  495. case TAKhashdistribute:
  496. ret = createHashDistributeSlave(this);
  497. break;
  498. case TAKnwaydistribute:
  499. ret = createNWayDistributeSlave(this);
  500. break;
  501. case TAKdistributed:
  502. ret = createHashDistributedSlave(this);
  503. break;
  504. case TAKhashdistributemerge:
  505. ret = createHashDistributeMergeSlave(this);
  506. break;
  507. case TAKhashdedup:
  508. if (queryLocalOrGrouped())
  509. ret = createHashLocalDedupSlave(this);
  510. else
  511. ret = createHashDedupSlave(this);
  512. break;
  513. case TAKnormalize:
  514. ret = createNormalizeSlave(this);
  515. break;
  516. case TAKnormalizechild:
  517. ret = createNormalizeChildSlave(this);
  518. break;
  519. case TAKnormalizelinkedchild:
  520. ret = createNormalizeLinkedChildSlave(this);
  521. break;
  522. case TAKremoteresult:
  523. ret = createResultSlave(this);
  524. break;
  525. case TAKpull:
  526. ret = createPullSlave(this);
  527. break;
  528. case TAKdenormalize:
  529. case TAKdenormalizegroup:
  530. if (queryLocalOrGrouped())
  531. ret = createLocalDenormalizeSlave(this);
  532. else
  533. ret = createDenormalizeSlave(this);
  534. break;
  535. case TAKnwayinput:
  536. ret = createNWayInputSlave(this);
  537. break;
  538. case TAKnwayselect:
  539. ret = createNWaySelectSlave(this);
  540. break;
  541. case TAKnwaymerge:
  542. ret = createNWayMergeActivity(this);
  543. break;
  544. case TAKnwaymergejoin:
  545. case TAKnwayjoin:
  546. ret = createNWayMergeJoinActivity(this);
  547. break;
  548. case TAKchilddataset:
  549. UNIMPLEMENTED;
  550. case TAKchilditerator:
  551. ret = createChildIteratorSlave(this);
  552. break;
  553. case TAKlinkedrawiterator:
  554. ret = createLinkedRawIteratorSlave(this);
  555. break;
  556. case TAKselectn:
  557. if (queryLocalOrGrouped())
  558. ret = createLocalSelectNthSlave(this);
  559. else
  560. ret = createSelectNthSlave(this);
  561. break;
  562. case TAKenth:
  563. if (queryLocalOrGrouped())
  564. ret = createLocalEnthSlave(this);
  565. else
  566. ret = createEnthSlave(this);
  567. break;
  568. case TAKnull:
  569. ret = createNullSlave(this);
  570. break;
  571. case TAKdistribution:
  572. ret = createDistributionSlave(this);
  573. break;
  574. case TAKcountproject:
  575. if (queryLocalOrGrouped())
  576. ret = createLocalCountProjectSlave(this);
  577. else
  578. ret = createCountProjectSlave(this);
  579. break;
  580. case TAKchoosesets:
  581. if (queryLocalOrGrouped())
  582. ret = createLocalChooseSetsSlave(this);
  583. else
  584. ret = createChooseSetsSlave(this);
  585. break;
  586. case TAKpiperead:
  587. ret = createPipeReadSlave(this);
  588. break;
  589. case TAKpipewrite:
  590. ret = createPipeWriteSlave(this);
  591. break;
  592. case TAKcsvread:
  593. ret = createCsvReadSlave(this);
  594. break;
  595. case TAKcsvwrite:
  596. ret = createCsvWriteSlave(this);
  597. break;
  598. case TAKpipethrough:
  599. ret = createPipeThroughSlave(this);
  600. break;
  601. case TAKindexwrite:
  602. ret = createIndexWriteSlave(this);
  603. break;
  604. case TAKchoosesetsenth:
  605. ret = createChooseSetsEnthSlave(this);
  606. break;
  607. case TAKchoosesetslast:
  608. ret = createChooseSetsLastSlave(this);
  609. break;
  610. case TAKfetch:
  611. ret = createFetchSlave(this);
  612. break;
  613. case TAKcsvfetch:
  614. ret = createCsvFetchSlave(this);
  615. break;
  616. case TAKxmlfetch:
  617. case TAKjsonfetch:
  618. ret = createXmlFetchSlave(this);
  619. break;
  620. case TAKthroughaggregate:
  621. ret = createThroughAggregateSlave(this);
  622. break;
  623. case TAKwhen_dataset:
  624. ret = createWhenSlave(this);
  625. break;
  626. case TAKworkunitread:
  627. {
  628. if (wuidread2diskread)
  629. {
  630. Owned<IHThorDiskReadArg> diskReadHelper = createWorkUnitReadArg(wuidreadFilename, (IHThorWorkunitReadArg *)LINK(baseHelper));
  631. Owned<CActivityBase> retAct = createDiskReadSlave(this, diskReadHelper);
  632. return retAct.getClear();
  633. }
  634. else
  635. ret = createWuidReadSlave(this);
  636. break;
  637. }
  638. case TAKparse:
  639. ret = createParseSlave(this);
  640. break;
  641. case TAKsideeffect:
  642. ret = createNullActionSlave(this);
  643. break;
  644. case TAKsimpleaction:
  645. ret = createNullSlave(this);
  646. break;
  647. case TAKemptyaction:
  648. ret = createNullSinkSlave(this);
  649. break;
  650. case TAKtopn:
  651. if (queryGrouped())
  652. ret = createGroupedTopNSlave(this);
  653. else if (queryLocal())
  654. ret = createLocalTopNSlave(this);
  655. else
  656. ret = createGlobalTopNSlave(this);
  657. break;
  658. case TAKxmlparse:
  659. ret = createXmlParseSlave(this);
  660. break;
  661. case TAKxmlread:
  662. case TAKjsonread:
  663. ret = createXmlReadSlave(this);
  664. break;
  665. case TAKxmlwrite:
  666. case TAKjsonwrite:
  667. ret = createXmlWriteSlave(this, kind);
  668. break;
  669. case TAKmerge:
  670. if (queryLocalOrGrouped())
  671. ret = createLocalMergeSlave(this);
  672. else
  673. ret = createGlobalMergeSlave(this);
  674. break;
  675. case TAKsoap_rowdataset:
  676. ret = createSoapRowCallSlave(this);
  677. break;
  678. case TAKhttp_rowdataset:
  679. ret = createHttpRowCallSlave(this);
  680. break;
  681. case TAKsoap_rowaction:
  682. ret = createSoapRowActionSlave(this);
  683. break;
  684. case TAKsoap_datasetdataset:
  685. ret = createSoapDatasetCallSlave(this);
  686. break;
  687. case TAKsoap_datasetaction:
  688. ret = createSoapDatasetActionSlave(this);
  689. break;
  690. case TAKkeydiff:
  691. ret = createKeyDiffSlave(this);
  692. break;
  693. case TAKkeypatch:
  694. ret = createKeyPatchSlave(this);
  695. break;
  696. case TAKlimit:
  697. ret = createLimitSlave(this);
  698. break;
  699. case TAKskiplimit:
  700. ret = createSkipLimitSlave(this);
  701. break;
  702. case TAKcreaterowlimit:
  703. ret = createRowLimitSlave(this);
  704. break;
  705. case TAKnonempty:
  706. ret = createNonEmptySlave(this);
  707. break;
  708. case TAKlocalresultread:
  709. ret = createLocalResultReadSlave(this);
  710. break;
  711. case TAKlocalresultwrite:
  712. ret = createLocalResultWriteSlave(this);
  713. break;
  714. case TAKlocalresultspill:
  715. ret = createLocalResultSpillSlave(this);
  716. break;
  717. case TAKif:
  718. case TAKchildif:
  719. ret = createIfSlave(this);
  720. break;
  721. case TAKcase:
  722. case TAKchildcase:
  723. ret = createCaseSlave(this);
  724. break;
  725. case TAKcatch:
  726. case TAKskipcatch:
  727. case TAKcreaterowcatch:
  728. ret = createCatchSlave(this);
  729. break;
  730. case TAKlooprow:
  731. case TAKloopcount:
  732. case TAKloopdataset:
  733. ret = createLoopSlave(this);
  734. break;
  735. case TAKgraphloop:
  736. case TAKparallelgraphloop:
  737. ret = createGraphLoopSlave(this);
  738. break;
  739. case TAKgraphloopresultread:
  740. ret = createGraphLoopResultReadSlave(this);
  741. break;
  742. case TAKgraphloopresultwrite:
  743. ret = createGraphLoopResultWriteSlave(this);
  744. break;
  745. case TAKstreamediterator:
  746. ret = createStreamedIteratorSlave(this);
  747. break;
  748. case TAKexternalprocess:
  749. case TAKexternalsource:
  750. ret = createExternalSlave(this);
  751. break;
  752. case TAKexternalsink:
  753. ret = createExternalSinkSlave(this);
  754. break;
  755. default:
  756. throw MakeStringException(TE_UnsupportedActivityKind, "Unsupported activity kind: %s", activityKindStr(kind));
  757. }
  758. return ret;
  759. }
  760. };
  761. activityslaves_decl CGraphElementBase *createSlaveContainer(IPropertyTree &xgmml, CGraphBase &owner, CGraphBase *resultsGraph)
  762. {
  763. return new CGenericSlaveGraphElement(owner, xgmml, resultsGraph);
  764. }
  765. activityslaves_decl IThorRowInterfaces *queryRowInterfaces(IThorDataLink *link) { return link?link->queryFromActivity():NULL; }
  766. activityslaves_decl IEngineRowAllocator * queryRowAllocator(IThorDataLink *link) { CActivityBase *base = link?link->queryFromActivity():NULL; return base?base->queryRowAllocator():NULL; }
  767. activityslaves_decl IOutputRowSerializer * queryRowSerializer(IThorDataLink *link) { CActivityBase *base = link?link->queryFromActivity():NULL; return base?base->queryRowSerializer():NULL; }
  768. activityslaves_decl IOutputRowDeserializer * queryRowDeserializer(IThorDataLink *link) { CActivityBase *base = link?link->queryFromActivity():NULL; return base?base->queryRowDeserializer():NULL; }
  769. activityslaves_decl IOutputMetaData *queryRowMetaData(IThorDataLink *link) { CActivityBase *base = link?link->queryFromActivity():NULL; return base?base->queryRowMetaData():NULL; }
  770. activityslaves_decl unsigned queryActivityId(IThorDataLink *link) { CActivityBase *base = link?link->queryFromActivity():NULL; return base?base->queryId():0; }
  771. activityslaves_decl ICodeContext *queryCodeContext(IThorDataLink *link) { CActivityBase *base = link?link->queryFromActivity():NULL; return base?base->queryCodeContext():NULL; }
  772. void dummyProc() // to force static linking
  773. {
  774. }