12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809 |
- /*##############################################################################
- HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ############################################################################## */
- #include "platform.h"
- #include "jlib.hpp"
- #include "jptree.hpp"
- #include "commonext.hpp"
- #include "thorport.hpp"
- #include "thormisc.hpp"
- #include "thactivityutil.ipp"
- #include "thexception.hpp"
- #ifndef _WIN32
- #include <stdexcept>
- #endif
- #include "thorfile.hpp"
- #include "thgraphslave.hpp"
- #include "slave.ipp"
- #include <new>
- #define FATAL_ACTJOIN_TIMEOUT (5*60*1000)
- activityslaves_decl CGraphElementBase *createSlaveContainer(IPropertyTree &xgmml, CGraphBase &owner, CGraphBase *resultsGraph);
- MODULE_INIT(INIT_PRIORITY_STANDARD)
- {
- registerCreateFunc(&createSlaveContainer);
- return true;
- }
- //---------------------------------------------------------------------------
- // ProcessSlaveActivity
- ProcessSlaveActivity::ProcessSlaveActivity(CGraphElementBase *container, const StatisticsMapping &statsMapping)
- : CSlaveActivity(container, statsMapping), threaded("ProcessSlaveActivity", this)
- {
- }
- void ProcessSlaveActivity::beforeDispose()
- {
- // Note - we can't throw from the destructor, so do this in beforeDispose instead
- // If the exception is thrown then we are liable to leak the object, but we are dying anyway...
- ::ActPrintLog(this, thorDetailedLogLevel, "destroying ProcessSlaveActivity");
- ::ActPrintLog(this, thorDetailedLogLevel, "ProcessSlaveActivity : joining process thread");
- // NB: The activity thread should have already stopped,
- // if it is still alive at job shutdown and cannot be joined then the thread is in an unknown state.
- if (!threaded.join(FATAL_ACTJOIN_TIMEOUT))
- throw MakeThorFatal(NULL, TE_FailedToAbortSlaves, "Activity %" ACTPF "d failed to stop", container.queryId());
- ::ActPrintLog(this, thorDetailedLogLevel, "AFTER ProcessSlaveActivity : joining process thread");
- }
- void ProcessSlaveActivity::startProcess(bool async)
- {
- if (async)
- threaded.start();
- else
- threadmain();
- }
- void ProcessSlaveActivity::threadmain()
- {
- try
- {
- #ifdef TIME_ACTIVITIES
- if (timeActivities)
- {
- lastCycles = get_cycles_now(); // serializeStats will reset
- process();
- // set lastCycles to 0 to signal not processing
- unsigned __int64 finalCycles = lastCycles.exchange(0);
- slaveTimerStats.totalCycles += get_cycles_now()-finalCycles;
- }
- else
- process();
- #else
- process();
- #endif
- }
- catch (IException *_e)
- {
- IThorException *e = QUERYINTERFACE(_e, IThorException);
- if (e)
- {
- if (!e->queryActivityId())
- {
- e->setGraphInfo(container.queryJob().queryGraphName(), container.queryOwner().queryGraphId());
- e->setActivityKind(container.getKind());
- e->setActivityId(container.queryId());
- }
- exception.set(e);
- }
- else
- {
- e = MakeActivityException(this, _e);
- if (QUERYINTERFACE(_e, ISEH_Exception))
- {
- IThorException *e2 = MakeThorFatal(e, TE_SEH, "FATAL: (SEH)");
- e->Release();
- e = e2;
- }
- _e->Release();
- exception.setown(e);
- }
- ActPrintLog(e);
- }
- catch (std::exception & es)
- {
- StringBuffer m("FATAL std::exception ");
- if(dynamic_cast<std::bad_alloc *>(&es))
- m.append("out of memory (std::bad_alloc)");
- else
- m.append("standard library exception (std::exception ").append(es.what()).append(")");
- m.appendf(" in %" ACTPF "d",container.queryId());
- ActPrintLogEx(&queryContainer(), thorlog_null, MCerror, "%s", m.str());
- exception.setown(MakeThorFatal(NULL, TE_UnknownException, "%s", m.str()));
- }
- catch (CATCHALL)
- {
- ActPrintLogEx(&queryContainer(), thorlog_null, MCerror, "Unknown exception thrown in process()");
- exception.setown(MakeThorFatal(NULL, TE_UnknownException, "FATAL: Unknown exception thrown by ProcessThread"));
- }
- if (exception)
- fireException(exception);
- try { endProcess(); }
- catch (IException *_e)
- {
- ActPrintLog(_e, "Exception calling activity endProcess");
- IThorException *e = QUERYINTERFACE(_e, IThorException);
- if (e)
- exception.set(e);
- else
- exception.setown(MakeActivityException(this, _e));
- _e->Release();
- fireException(exception);
- }
- }
- bool ProcessSlaveActivity::wait(unsigned timeout)
- {
- if (!threaded.join(timeout))
- return false;
- if (exception.get())
- throw exception.getClear();
- return true;
- }
- void ProcessSlaveActivity::serializeStats(MemoryBuffer &mb)
- {
- #ifdef TIME_ACTIVITIES
- if (timeActivities)
- {
- unsigned __int64 curCycles = lastCycles;
- if (curCycles)
- {
- unsigned __int64 nowCycles = get_cycles_now();
- //Update lastCycles to the current number of cycles - unless it has been set to 0 in the meantime
- //Use std::memory_order_relaxed because there is no requirement for other variables to be synchronized.
- if (lastCycles.compare_exchange_strong(curCycles, nowCycles, std::memory_order_relaxed))
- slaveTimerStats.totalCycles += nowCycles-curCycles;
- }
- }
- #endif
- CSlaveActivity::serializeStats(mb);
- mb.append(processed);
- }
- void ProcessSlaveActivity::done()
- {
- CSlaveActivity::done();
- if (exception.get())
- throw exception.getClear();
- }
- #include "aggregate/thaggregateslave.ipp"
- #include "aggregate/thgroupaggregateslave.ipp"
- #include "apply/thapplyslave.ipp"
- #include "choosesets/thchoosesetsslave.ipp"
- #include "countproject/thcountprojectslave.ipp"
- #include "degroup/thdegroupslave.ipp"
- #include "diskread/thdiskreadslave.ipp"
- #include "diskwrite/thdwslave.ipp"
- #include "distribution/thdistributionslave.ipp"
- #include "enth/thenthslave.ipp"
- #include "fetch/thfetchslave.ipp"
- #include "filter/thfilterslave.ipp"
- #include "firstn/thfirstnslave.ipp"
- #include "funnel/thfunnelslave.ipp"
- #include "group/thgroupslave.ipp"
- #include "hashdistrib/thhashdistribslave.ipp"
- #include "indexread/thindexreadslave.ipp"
- #include "iterate/thgroupiterateslave.ipp"
- #include "iterate/thiterateslave.ipp"
- #include "join/thjoinslave.ipp"
- #include "keyedjoin/thkeyedjoinslave.ipp"
- #include "limit/thlimitslave.ipp"
- #include "merge/thmergeslave.ipp"
- #include "msort/thgroupsortslave.ipp"
- #include "msort/thmsortslave.ipp"
- #include "normalize/thnormalizeslave.ipp"
- #include "nsplitter/thnsplitterslave.ipp"
- #include "null/thnullslave.ipp"
- #include "nullaction/thnullactionslave.ipp"
- #include "parse/thparseslave.ipp"
- #include "piperead/thprslave.ipp"
- #include "pipewrite/thpwslave.ipp"
- #include "project/thprojectslave.ipp"
- #include "pull/thpullslave.ipp"
- #include "result/thresultslave.ipp"
- #include "rollup/throllupslave.ipp"
- #include "sample/thsampleslave.ipp"
- #include "selectnth/thselectnthslave.ipp"
- #include "selfjoin/thselfjoinslave.ipp"
- #include "spill/thspillslave.ipp"
- #include "soapcall/thsoapcallslave.ipp"
- #include "temptable/thtmptableslave.ipp"
- #include "topn/thtopnslave.ipp"
- #include "wuidread/thwuidreadslave.ipp"
- #include "wuidwrite/thwuidwriteslave.ipp"
- #include "xmlwrite/thxmlwriteslave.ipp"
- CActivityBase *createLookupJoinSlave(CGraphElementBase *container);
- CActivityBase *createAllJoinSlave(CGraphElementBase *container);
- CActivityBase *createXmlParseSlave(CGraphElementBase *container);
- CActivityBase *createKeyDiffSlave(CGraphElementBase *container);
- CActivityBase *createKeyPatchSlave(CGraphElementBase *container);
- CActivityBase *createCsvReadSlave(CGraphElementBase *container);
- CActivityBase *createXmlReadSlave(CGraphElementBase *container);
- CActivityBase *createIndexWriteSlave(CGraphElementBase *container);
- CActivityBase *createLoopSlave(CGraphElementBase *container);
- CActivityBase *createLocalResultReadSlave(CGraphElementBase *container);
- CActivityBase *createLocalResultWriteSlave(CGraphElementBase *container);
- CActivityBase *createLocalResultSpillSlave(CGraphElementBase *container);
- CActivityBase *createGraphLoopSlave(CGraphElementBase *container);
- CActivityBase *createGraphLoopResultReadSlave(CGraphElementBase *container);
- CActivityBase *createGraphLoopResultWriteSlave(CGraphElementBase *container);
- CActivityBase *createIfSlave(CGraphElementBase *container);
- CActivityBase *createCaseSlave(CGraphElementBase *container);
- CActivityBase *createCatchSlave(CGraphElementBase *container);
- CActivityBase *createChildNormalizeSlave(CGraphElementBase *container);
- CActivityBase *createChildAggregateSlave(CGraphElementBase *container);
- CActivityBase *createChildGroupAggregateSlave(CGraphElementBase *container);
- CActivityBase *createChildThroughNormalizeSlave(CGraphElementBase *container);
- CActivityBase *createWhenSlave(CGraphElementBase *container);
- CActivityBase *createDictionaryWorkunitWriteSlave(CGraphElementBase *container);
- CActivityBase *createDictionaryResultWriteSlave(CGraphElementBase *container);
- CActivityBase *createTraceSlave(CGraphElementBase *container);
- CActivityBase *createIfActionSlave(CGraphElementBase *container);
- CActivityBase *createExternalSlave(CGraphElementBase *container);
- CActivityBase *createExternalSinkSlave(CGraphElementBase *container);
- class CGenericSlaveGraphElement : public CSlaveGraphElement
- {
- bool wuidread2diskread; // master decides after interrogating result and sneaks in info before slave creates
- StringAttr wuidreadFilename;
- Owned<CActivityBase> nullActivity;
- CriticalSection nullActivityCs;
- public:
- CGenericSlaveGraphElement(CGraphBase &_owner, IPropertyTree &xgmml, CGraphBase *resultsGraph) : CSlaveGraphElement(_owner, xgmml, resultsGraph)
- {
- wuidread2diskread = false;
- switch (getKind())
- {
- case TAKnull:
- case TAKsimpleaction:
- case TAKsideeffect:
- nullAct = true;
- break;
- default:
- break;
- }
- }
- virtual void deserializeCreateContext(MemoryBuffer &mb)
- {
- CSlaveGraphElement::deserializeCreateContext(mb);
- if (TAKworkunitread == kind)
- {
- mb.read(wuidread2diskread); // have I been converted
- if (wuidread2diskread)
- mb.read(wuidreadFilename);
- }
- haveCreateCtx = true;
- }
- virtual CActivityBase *factory(ThorActivityKind kind)
- {
- CActivityBase *ret = NULL;
- switch (kind)
- {
- case TAKdiskread:
- case TAKspillread:
- ret = createDiskReadSlave(this);
- break;
- case TAKdisknormalize:
- ret = createDiskNormalizeSlave(this);
- break;
- case TAKdiskaggregate:
- ret = createDiskAggregateSlave(this);
- break;
- case TAKdiskcount:
- ret = createDiskCountSlave(this);
- break;
- case TAKdiskgroupaggregate:
- ret = createDiskGroupAggregateSlave(this);
- break;
- case TAKindexread:
- ret = createIndexReadSlave(this);
- break;
- case TAKindexcount:
- ret = createIndexCountSlave(this);
- break;
- case TAKindexnormalize:
- ret = createIndexNormalizeSlave(this);
- break;
- case TAKindexaggregate:
- ret = createIndexAggregateSlave(this);
- break;
- case TAKindexgroupaggregate:
- case TAKindexgroupexists:
- case TAKindexgroupcount:
- ret = createIndexGroupAggregateSlave(this);
- break;
- case TAKchildaggregate:
- ret = createChildAggregateSlave(this);
- break;
- case TAKchildgroupaggregate:
- ret = createChildGroupAggregateSlave(this);
- break;
- case TAKchildthroughnormalize:
- ret = createChildThroughNormalizeSlave(this);
- break;
- case TAKchildnormalize:
- ret = createChildNormalizeSlave(this);
- break;
- case TAKspill:
- ret = createSpillSlave(this);
- break;
- case TAKdiskwrite:
- case TAKspillwrite:
- ret = createDiskWriteSlave(this);
- break;
- case TAKsort:
- if (queryGrouped() || queryLocal())
- ret = createLocalSortSlave(this);
- else
- ret = createMSortSlave(this);
- break;
- case TAKsorted:
- ret = createSortedSlave(this);
- break;
- case TAKtrace:
- ret = createTraceSlave(this);
- break;
- case TAKdedup:
- if (queryGrouped())
- ret = createGroupDedupSlave(this);
- else if (queryLocal())
- ret = createLocalDedupSlave(this);
- else
- ret = createDedupSlave(this);
- break;
- case TAKrollupgroup:
- ret = createRollupGroupSlave(this);
- break;
- case TAKrollup:
- if (queryGrouped())
- ret = createGroupRollupSlave(this);
- else if (queryLocal())
- ret = createLocalRollupSlave(this);
- else
- ret = createRollupSlave(this);
- break;
- case TAKprocess:
- if (queryGrouped())
- ret = createGroupProcessSlave(this);
- else if (queryLocal())
- ret = createLocalProcessSlave(this);
- else
- ret = createProcessSlave(this);
- break;
- case TAKfilter:
- ret = createFilterSlave(this);
- break;
- case TAKfilterproject:
- ret = createFilterProjectSlave(this);
- break;
- case TAKfiltergroup:
- ret = createFilterGroupSlave(this);
- break;
- case TAKsplit:
- ret = createNSplitterSlave(this);
- break;
- case TAKproject:
- ret = createProjectSlave(this);
- break;
- case TAKprefetchproject:
- ret = createPrefetchProjectSlave(this);
- break;
- case TAKprefetchcountproject:
- break;
- case TAKiterate:
- if (queryGrouped())
- ret = createGroupIterateSlave(this);
- else if (queryLocal())
- ret = createLocalIterateSlave(this);
- else
- ret = createIterateSlave(this);
- break;
- case TAKaggregate:
- case TAKexistsaggregate:
- case TAKcountaggregate:
- if (queryLocalOrGrouped())
- ret = createGroupAggregateSlave(this);
- else
- ret = createAggregateSlave(this);
- break;
- case TAKhashaggregate:
- ret = createHashAggregateSlave(this);
- break;
- case TAKfirstn:
- ret = createFirstNSlave(this);
- break;
- case TAKsample:
- ret = createSampleSlave(this);
- break;
- case TAKdegroup:
- ret = createDegroupSlave(this);
- break;
- case TAKjoin:
- if (queryLocalOrGrouped())
- ret = createLocalJoinSlave(this);
- else
- ret = createJoinSlave(this);
- break;
- case TAKhashjoin:
- case TAKhashdenormalize:
- case TAKhashdenormalizegroup:
- ret = createHashJoinSlave(this);
- break;
- case TAKlookupjoin:
- case TAKlookupdenormalize:
- case TAKlookupdenormalizegroup:
- case TAKsmartjoin:
- case TAKsmartdenormalize:
- case TAKsmartdenormalizegroup:
- ret = createLookupJoinSlave(this);
- break;
- case TAKalljoin:
- case TAKalldenormalize:
- case TAKalldenormalizegroup:
- ret = createAllJoinSlave(this);
- break;
- case TAKselfjoin:
- if (queryLocalOrGrouped())
- ret = createLocalSelfJoinSlave(this);
- else
- ret = createSelfJoinSlave(this);
- break;
- case TAKselfjoinlight:
- ret = createLightweightSelfJoinSlave(this);
- break;
- case TAKkeyedjoin:
- case TAKkeyeddenormalize:
- case TAKkeyeddenormalizegroup:
- ret = createKeyedJoinSlave(this);
- break;
- case TAKgroup:
- ret = createGroupSlave(this);
- break;
- case TAKworkunitwrite:
- ret = createWorkUnitWriteSlave(this);
- break;
- case TAKdictionaryworkunitwrite:
- ret = createDictionaryWorkunitWriteSlave(this);
- break;
- case TAKdictionaryresultwrite:
- ret = createDictionaryResultWriteSlave(this);
- break;
- case TAKfunnel:
- ret = createFunnelSlave(this);
- break;
- case TAKcombine:
- ret = createCombineSlave(this);
- break;
- case TAKcombinegroup:
- ret = createCombineGroupSlave(this);
- break;
- case TAKregroup:
- ret = createRegroupSlave(this);
- break;
- case TAKapply:
- ret = createApplySlave(this);
- break;
- case TAKifaction:
- ret = createIfActionSlave(this);
- break;
- case TAKinlinetable:
- ret = createInlineTableSlave(this);
- break;
- case TAKkeyeddistribute:
- ret = createIndexDistributeSlave(this);
- break;
- case TAKhashdistribute:
- ret = createHashDistributeSlave(this);
- break;
- case TAKnwaydistribute:
- ret = createNWayDistributeSlave(this);
- break;
- case TAKdistributed:
- ret = createHashDistributedSlave(this);
- break;
- case TAKhashdistributemerge:
- ret = createHashDistributeMergeSlave(this);
- break;
- case TAKhashdedup:
- if (queryLocalOrGrouped())
- ret = createHashLocalDedupSlave(this);
- else
- ret = createHashDedupSlave(this);
- break;
- case TAKnormalize:
- ret = createNormalizeSlave(this);
- break;
- case TAKnormalizechild:
- ret = createNormalizeChildSlave(this);
- break;
- case TAKnormalizelinkedchild:
- ret = createNormalizeLinkedChildSlave(this);
- break;
- case TAKremoteresult:
- ret = createResultSlave(this);
- break;
- case TAKpull:
- ret = createPullSlave(this);
- break;
- case TAKdenormalize:
- case TAKdenormalizegroup:
- if (queryLocalOrGrouped())
- ret = createLocalDenormalizeSlave(this);
- else
- ret = createDenormalizeSlave(this);
- break;
- case TAKnwayinput:
- ret = createNWayInputSlave(this);
- break;
- case TAKnwayselect:
- ret = createNWaySelectSlave(this);
- break;
- case TAKnwaymerge:
- ret = createNWayMergeActivity(this);
- break;
- case TAKnwaymergejoin:
- case TAKnwayjoin:
- ret = createNWayMergeJoinActivity(this);
- break;
- case TAKchilddataset:
- UNIMPLEMENTED;
- case TAKchilditerator:
- ret = createChildIteratorSlave(this);
- break;
- case TAKlinkedrawiterator:
- ret = createLinkedRawIteratorSlave(this);
- break;
- case TAKselectn:
- if (queryLocalOrGrouped())
- ret = createLocalSelectNthSlave(this);
- else
- ret = createSelectNthSlave(this);
- break;
- case TAKenth:
- if (queryLocalOrGrouped())
- ret = createLocalEnthSlave(this);
- else
- ret = createEnthSlave(this);
- break;
- case TAKnull:
- ret = createNullSlave(this);
- break;
- case TAKdistribution:
- ret = createDistributionSlave(this);
- break;
- case TAKcountproject:
- if (queryLocalOrGrouped())
- ret = createLocalCountProjectSlave(this);
- else
- ret = createCountProjectSlave(this);
- break;
- case TAKchoosesets:
- if (queryLocalOrGrouped())
- ret = createLocalChooseSetsSlave(this);
- else
- ret = createChooseSetsSlave(this);
- break;
- case TAKpiperead:
- ret = createPipeReadSlave(this);
- break;
- case TAKpipewrite:
- ret = createPipeWriteSlave(this);
- break;
- case TAKcsvread:
- ret = createCsvReadSlave(this);
- break;
- case TAKcsvwrite:
- ret = createCsvWriteSlave(this);
- break;
- case TAKpipethrough:
- ret = createPipeThroughSlave(this);
- break;
- case TAKindexwrite:
- ret = createIndexWriteSlave(this);
- break;
- case TAKchoosesetsenth:
- ret = createChooseSetsEnthSlave(this);
- break;
- case TAKchoosesetslast:
- ret = createChooseSetsLastSlave(this);
- break;
- case TAKfetch:
- ret = createFetchSlave(this);
- break;
- case TAKcsvfetch:
- ret = createCsvFetchSlave(this);
- break;
- case TAKxmlfetch:
- case TAKjsonfetch:
- ret = createXmlFetchSlave(this);
- break;
- case TAKthroughaggregate:
- ret = createThroughAggregateSlave(this);
- break;
- case TAKwhen_dataset:
- ret = createWhenSlave(this);
- break;
- case TAKworkunitread:
- {
- if (wuidread2diskread)
- {
- Owned<IHThorDiskReadArg> diskReadHelper = createWorkUnitReadArg(wuidreadFilename, (IHThorWorkunitReadArg *)LINK(baseHelper));
- Owned<CActivityBase> retAct = createDiskReadSlave(this, diskReadHelper);
- return retAct.getClear();
- }
- else
- ret = createWuidReadSlave(this);
- break;
- }
- case TAKparse:
- ret = createParseSlave(this);
- break;
- case TAKsideeffect:
- ret = createNullActionSlave(this);
- break;
- case TAKsimpleaction:
- ret = createNullSlave(this);
- break;
- case TAKemptyaction:
- ret = createNullSinkSlave(this);
- break;
- case TAKtopn:
- if (queryGrouped())
- ret = createGroupedTopNSlave(this);
- else if (queryLocal())
- ret = createLocalTopNSlave(this);
- else
- ret = createGlobalTopNSlave(this);
- break;
- case TAKxmlparse:
- ret = createXmlParseSlave(this);
- break;
- case TAKxmlread:
- case TAKjsonread:
- ret = createXmlReadSlave(this);
- break;
- case TAKxmlwrite:
- case TAKjsonwrite:
- ret = createXmlWriteSlave(this, kind);
- break;
- case TAKmerge:
- if (queryLocalOrGrouped())
- ret = createLocalMergeSlave(this);
- else
- ret = createGlobalMergeSlave(this);
- break;
- case TAKsoap_rowdataset:
- ret = createSoapRowCallSlave(this);
- break;
- case TAKhttp_rowdataset:
- ret = createHttpRowCallSlave(this);
- break;
- case TAKsoap_rowaction:
- ret = createSoapRowActionSlave(this);
- break;
- case TAKsoap_datasetdataset:
- ret = createSoapDatasetCallSlave(this);
- break;
- case TAKsoap_datasetaction:
- ret = createSoapDatasetActionSlave(this);
- break;
- case TAKkeydiff:
- ret = createKeyDiffSlave(this);
- break;
- case TAKkeypatch:
- ret = createKeyPatchSlave(this);
- break;
- case TAKlimit:
- ret = createLimitSlave(this);
- break;
- case TAKskiplimit:
- ret = createSkipLimitSlave(this);
- break;
- case TAKcreaterowlimit:
- ret = createRowLimitSlave(this);
- break;
- case TAKnonempty:
- ret = createNonEmptySlave(this);
- break;
- case TAKlocalresultread:
- ret = createLocalResultReadSlave(this);
- break;
- case TAKlocalresultwrite:
- ret = createLocalResultWriteSlave(this);
- break;
- case TAKlocalresultspill:
- ret = createLocalResultSpillSlave(this);
- break;
- case TAKif:
- case TAKchildif:
- ret = createIfSlave(this);
- break;
- case TAKcase:
- case TAKchildcase:
- ret = createCaseSlave(this);
- break;
- case TAKcatch:
- case TAKskipcatch:
- case TAKcreaterowcatch:
- ret = createCatchSlave(this);
- break;
- case TAKlooprow:
- case TAKloopcount:
- case TAKloopdataset:
- ret = createLoopSlave(this);
- break;
- case TAKgraphloop:
- case TAKparallelgraphloop:
- ret = createGraphLoopSlave(this);
- break;
- case TAKgraphloopresultread:
- ret = createGraphLoopResultReadSlave(this);
- break;
- case TAKgraphloopresultwrite:
- ret = createGraphLoopResultWriteSlave(this);
- break;
- case TAKstreamediterator:
- ret = createStreamedIteratorSlave(this);
- break;
- case TAKexternalprocess:
- case TAKexternalsource:
- ret = createExternalSlave(this);
- break;
- case TAKexternalsink:
- ret = createExternalSinkSlave(this);
- break;
- default:
- throw MakeStringException(TE_UnsupportedActivityKind, "Unsupported activity kind: %s", activityKindStr(kind));
- }
- return ret;
- }
- };
- activityslaves_decl CGraphElementBase *createSlaveContainer(IPropertyTree &xgmml, CGraphBase &owner, CGraphBase *resultsGraph)
- {
- return new CGenericSlaveGraphElement(owner, xgmml, resultsGraph);
- }
- activityslaves_decl IThorRowInterfaces *queryRowInterfaces(IThorDataLink *link) { return link?link->queryFromActivity():NULL; }
- activityslaves_decl IEngineRowAllocator * queryRowAllocator(IThorDataLink *link) { CActivityBase *base = link?link->queryFromActivity():NULL; return base?base->queryRowAllocator():NULL; }
- activityslaves_decl IOutputRowSerializer * queryRowSerializer(IThorDataLink *link) { CActivityBase *base = link?link->queryFromActivity():NULL; return base?base->queryRowSerializer():NULL; }
- activityslaves_decl IOutputRowDeserializer * queryRowDeserializer(IThorDataLink *link) { CActivityBase *base = link?link->queryFromActivity():NULL; return base?base->queryRowDeserializer():NULL; }
- activityslaves_decl IOutputMetaData *queryRowMetaData(IThorDataLink *link) { CActivityBase *base = link?link->queryFromActivity():NULL; return base?base->queryRowMetaData():NULL; }
- activityslaves_decl unsigned queryActivityId(IThorDataLink *link) { CActivityBase *base = link?link->queryFromActivity():NULL; return base?base->queryId():0; }
- activityslaves_decl ICodeContext *queryCodeContext(IThorDataLink *link) { CActivityBase *base = link?link->queryFromActivity():NULL; return base?base->queryCodeContext():NULL; }
// Intentionally empty: exists only to force static linking of this module.
void dummyProc()
{
    // nothing to do
}
|