1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254225522562257225822592260226122622263226422652266226722682269227022712272227322742275227622772278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277727782779278027812782278327842785278627872788278927902791279227932794279527962797279827992800280128022803280428052806280728082809281028112812281328142815281628172818281928202821282228232824282528262827282828292830283128322833283428352836283728382839284028412842284328442845284628472848284928502851285228532854285528562857285828592860286128622863286428652866286728682869287028712872287328742875287628772878287928802881288228832884288528862887288828892890289128922893289428952896289728982899290029012902290329042905290629072908290929102911291229132914291529162917291829192920292129222923292429252926292729282929293029312932293329342935293629372938293929402941294229432944294529462947294829492950295129522953295429552956295729582959296029612962296329642965296629672968296929702971297229732974297529762977297829792980298129822983298429852986298729882989299029912992299329942995299629972998299930003001300230033004300530063007300830093010301130123013301430153016301730183019302030213022302330243025302630273028302930303031303230333034303530363037303830393040304130423043304430453046304730483049305030513052305330543055305630573058305930603061306230633064306530663067306830693070307130723073307430753076307730783079308030813082308330843085308630873088308930903091309230933094309530963097309830993100310131023103310431053106310731083109311031113112311331143115311631173118311931203121312231233124312531263127312831293130 |
- /*##############################################################################
- HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ############################################################################## */
- #include "thgraph.hpp"
- #include "jptree.hpp"
- #include "commonext.hpp"
- #include "dasess.hpp"
- #include "jhtree.hpp"
- #include "thcodectx.hpp"
- #include "thbuf.hpp"
- #include "thormisc.hpp"
- #include "thbufdef.hpp"
- #include "thmem.hpp"
- PointerArray createFuncs;
- void registerCreateFunc(CreateFunc func)
- {
- createFuncs.append((void *)func);
- }
- ///////////////////////////////////
-
- //////
- /////
- class CThorGraphResult : public CInterface, implements IThorResult, implements IRowWriter
- {
- CActivityBase &activity;
- rowcount_t rowStreamCount;
- IOutputMetaData *meta;
- Owned<IRowWriterMultiReader> rowBuffer;
- IRowInterfaces *rowIf;
- IEngineRowAllocator *allocator;
- bool stopped, readers, distributed;
- void init()
- {
- stopped = readers = false;
- allocator = rowIf->queryRowAllocator();
- meta = allocator->queryOutputMeta();
- rowStreamCount = 0;
- }
- class CStreamWriter : public CSimpleInterface, implements IRowWriterMultiReader
- {
- CThorGraphResult &owner;
- CThorExpandingRowArray rows;
- public:
- IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
- CStreamWriter(CThorGraphResult &_owner) : owner(_owner), rows(owner.activity, owner.rowIf, true)
- {
- }
- //IRowWriterMultiReader
- virtual void putRow(const void *row)
- {
- rows.append(row);
- }
- virtual void flush() { }
- virtual IRowStream *getReader()
- {
- return rows.createRowStream(0, (rowidx_t)-1, false);
- }
- };
- public:
- IMPLEMENT_IINTERFACE;
- CThorGraphResult(CActivityBase &_activity, IRowInterfaces *_rowIf, bool _distributed, unsigned spillPriority) : activity(_activity), rowIf(_rowIf), distributed(_distributed)
- {
- init();
- if (SPILL_PRIORITY_DISABLE == spillPriority)
- rowBuffer.setown(new CStreamWriter(*this));
- else
- rowBuffer.setown(createOverflowableBuffer(activity, rowIf, true, true));
- }
- // IRowWriter
- virtual void putRow(const void *row)
- {
- assertex(!readers);
- ++rowStreamCount;
- rowBuffer->putRow(row);
- }
- virtual void flush() { }
- virtual offset_t getPosition() { UNIMPLEMENTED; return 0; }
- // IThorResult
- virtual IRowWriter *getWriter()
- {
- return LINK(this);
- }
- virtual void setResultStream(IRowWriterMultiReader *stream, rowcount_t count)
- {
- assertex(!readers);
- rowBuffer.setown(stream);
- rowStreamCount = count;
- }
- virtual IRowStream *getRowStream()
- {
- readers = true;
- return rowBuffer->getReader();
- }
- virtual IRowInterfaces *queryRowInterfaces() { return rowIf; }
- virtual CActivityBase *queryActivity() { return &activity; }
- virtual bool isDistributed() const { return distributed; }
- virtual void serialize(MemoryBuffer &mb)
- {
- Owned<IRowStream> stream = getRowStream();
- bool grouped = meta->isGrouped();
- if (grouped)
- {
- OwnedConstThorRow prev = stream->nextRow();
- if (prev)
- {
- bool eog;
- loop
- {
- eog = false;
- OwnedConstThorRow next = stream->nextRow();
- if (!next)
- eog = true;
- size32_t sz = meta->getRecordSize(prev);
- mb.append(sz, prev.get());
- mb.append(eog);
- if (!next)
- {
- next.setown(stream->nextRow());
- if (!next)
- break;
- }
- prev.set(next);
- }
- }
- }
- else
- {
- loop
- {
- OwnedConstThorRow row = stream->nextRow();
- if (!row)
- break;
- size32_t sz = meta->getRecordSize(row);
- mb.append(sz, row.get());
- }
- }
- }
- virtual void getResult(size32_t &len, void * & data)
- {
- MemoryBuffer mb;
- serialize(mb);
- len = mb.length();
- data = mb.detach();
- }
- virtual void getLinkedResult(unsigned &countResult, byte * * & result)
- {
- assertex(rowStreamCount==((unsigned)rowStreamCount)); // catch, just in case
- Owned<IRowStream> stream = getRowStream();
- countResult = 0;
- OwnedConstThorRow _rowset = allocator->createRowset((unsigned)rowStreamCount);
- const void **rowset = (const void **)_rowset.get();
- while (countResult < rowStreamCount)
- {
- OwnedConstThorRow row = stream->nextRow();
- rowset[countResult++] = row.getClear();
- }
- result = (byte **)_rowset.getClear();
- }
- virtual const void * getLinkedRowResult()
- {
- assertex(rowStreamCount==1); // catch, just in case
- Owned<IRowStream> stream = getRowStream();
- return stream->nextRow();
- }
- };
- /////
- IThorResult *CThorGraphResults::createResult(CActivityBase &activity, unsigned id, IRowInterfaces *rowIf, bool distributed, unsigned spillPriority)
- {
- Owned<IThorResult> result = ::createResult(activity, rowIf, distributed, spillPriority);
- setResult(id, result);
- return result;
- }
- /////
- IThorResult *createResult(CActivityBase &activity, IRowInterfaces *rowIf, bool distributed, unsigned spillPriority)
- {
- return new CThorGraphResult(activity, rowIf, distributed, spillPriority);
- }
- /////
- class CThorBoundLoopGraph : public CInterface, implements IThorBoundLoopGraph
- {
- CGraphBase *graph;
- activity_id activityId;
- Linked<IOutputMetaData> resultMeta;
- Owned<IOutputMetaData> counterMeta, loopAgainMeta;
- Owned<IRowInterfaces> resultRowIf, countRowIf, loopAgainRowIf;
- public:
- IMPLEMENT_IINTERFACE;
- CThorBoundLoopGraph(CGraphBase *_graph, IOutputMetaData * _resultMeta, unsigned _activityId) : graph(_graph), resultMeta(_resultMeta), activityId(_activityId)
- {
- counterMeta.setown(createFixedSizeMetaData(sizeof(thor_loop_counter_t)));
- loopAgainMeta.setown(createFixedSizeMetaData(sizeof(bool)));
- }
- virtual void prepareCounterResult(CActivityBase &activity, IThorGraphResults *results, unsigned loopCounter, unsigned pos)
- {
- if (!countRowIf)
- countRowIf.setown(createRowInterfaces(counterMeta, activityId, activity.queryCodeContext()));
- RtlDynamicRowBuilder counterRow(countRowIf->queryRowAllocator());
- thor_loop_counter_t * res = (thor_loop_counter_t *)counterRow.ensureCapacity(sizeof(thor_loop_counter_t),NULL);
- *res = loopCounter;
- OwnedConstThorRow counterRowFinal = counterRow.finalizeRowClear(sizeof(thor_loop_counter_t));
- IThorResult *counterResult = results->createResult(activity, pos, countRowIf, false, SPILL_PRIORITY_DISABLE);
- Owned<IRowWriter> counterResultWriter = counterResult->getWriter();
- counterResultWriter->putRow(counterRowFinal.getClear());
- }
- virtual void prepareLoopAgainResult(CActivityBase &activity, IThorGraphResults *results, unsigned pos)
- {
- if (!loopAgainRowIf)
- loopAgainRowIf.setown(createRowInterfaces(loopAgainMeta, activityId, activity.queryCodeContext()));
- activity.queryGraph().createResult(activity, pos, results, loopAgainRowIf, !activity.queryGraph().isLocalChild(), SPILL_PRIORITY_DISABLE);
- }
- virtual void prepareLoopResults(CActivityBase &activity, IThorGraphResults *results)
- {
- if (!resultRowIf)
- resultRowIf.setown(createRowInterfaces(resultMeta, activityId, activity.queryCodeContext()));
- IThorResult *loopResult = results->createResult(activity, 0, resultRowIf, !activity.queryGraph().isLocalChild()); // loop output
- IThorResult *inputResult = results->createResult(activity, 1, resultRowIf, !activity.queryGraph().isLocalChild()); // loop input
- }
- virtual void execute(CActivityBase &activity, unsigned counter, IThorGraphResults *results, IRowWriterMultiReader *inputStream, rowcount_t rowStreamCount, size32_t parentExtractSz, const byte *parentExtract)
- {
- if (counter)
- graph->setLoopCounter(counter);
- Owned<IThorResult> inputResult = results->getResult(1);
- if (inputStream)
- inputResult->setResultStream(inputStream, rowStreamCount);
- graph->executeChild(parentExtractSz, parentExtract, results, NULL);
- }
- virtual void execute(CActivityBase &activity, unsigned counter, IThorGraphResults *graphLoopResults, size32_t parentExtractSz, const byte *parentExtract)
- {
- Owned<IThorGraphResults> results = graph->createThorGraphResults(1);
- if (counter)
- {
- prepareCounterResult(activity, results, counter, 0);
- graph->setLoopCounter(counter);
- }
- try
- {
- graph->executeChild(parentExtractSz, parentExtract, results, graphLoopResults);
- }
- catch (IException *e)
- {
- IThorException *te = QUERYINTERFACE(e, IThorException);
- if (!te)
- {
- Owned<IThorException> e2 = MakeActivityException(&activity, e, "Exception running child graphs");
- e->Release();
- te = e2.getClear();
- }
- else if (!te->queryActivityId())
- setExceptionActivityInfo(activity.queryContainer(), te);
- try { graph->abort(te); }
- catch (IException *abortE)
- {
- Owned<IThorException> e2 = MakeActivityException(&activity, abortE, "Exception whilst aborting graph");
- abortE->Release();
- EXCLOG(e2, NULL);
- }
- graph->queryJobChannel().fireException(te);
- throw te;
- }
- }
- virtual CGraphBase *queryGraph() { return graph; }
- };
- IThorBoundLoopGraph *createBoundLoopGraph(CGraphBase *graph, IOutputMetaData *resultMeta, unsigned activityId)
- {
- return new CThorBoundLoopGraph(graph, resultMeta, activityId);
- }
- ///////////////////////////////////
- bool isDiskInput(ThorActivityKind kind)
- {
- switch (kind)
- {
- case TAKcsvread:
- case TAKxmlread:
- case TAKjsonread:
- case TAKdiskread:
- case TAKdisknormalize:
- case TAKdiskaggregate:
- case TAKdiskcount:
- case TAKdiskgroupaggregate:
- case TAKindexread:
- case TAKindexcount:
- case TAKindexnormalize:
- case TAKindexaggregate:
- case TAKindexgroupaggregate:
- case TAKindexgroupexists:
- case TAKindexgroupcount:
- return true;
- default:
- return false;
- }
- }
- void CIOConnection::connect(unsigned which, CActivityBase *destActivity)
- {
- destActivity->setInput(which, activity->queryActivity(true), index);
- }
- ///////////////////////////////////
- CGraphElementBase *createGraphElement(IPropertyTree &node, CGraphBase &owner, CGraphBase *resultsGraph)
- {
- CGraphElementBase *container = NULL;
- ForEachItemIn(m, createFuncs)
- {
- CreateFunc func = (CreateFunc)createFuncs.item(m);
- container = func(node, owner, resultsGraph);
- if (container) break;
- }
- if (NULL == container)
- {
- ThorActivityKind tak = (ThorActivityKind)node.getPropInt("att[@name=\"_kind\"]/@value", TAKnone);
- throw MakeStringException(TE_UnsupportedActivityKind, "Unsupported activity kind: %s", activityKindStr(tak));
- }
- container->setResultsGraph(resultsGraph);
- return container;
- }
- CGraphElementBase::CGraphElementBase(CGraphBase &_owner, IPropertyTree &_xgmml) : owner(&_owner)
- {
- xgmml.setown(createPTreeFromIPT(&_xgmml));
- eclText.set(xgmml->queryProp("att[@name=\"ecl\"]/@value"));
- id = xgmml->getPropInt("@id", 0);
- kind = (ThorActivityKind)xgmml->getPropInt("att[@name=\"_kind\"]/@value", TAKnone);
- sink = isActivitySink(kind);
- bool coLocal = xgmml->getPropBool("att[@name=\"coLocal\"]/@value", false);
- isLocalData = xgmml->getPropBool("att[@name=\"local\"]/@value", false); // local execute + local data access only
- isLocal = isLocalData || coLocal; // local execute
- isGrouped = xgmml->getPropBool("att[@name=\"grouped\"]/@value", false);
- resultsGraph = NULL;
- ownerId = xgmml->getPropInt("att[@name=\"_parentActivity\"]/@value", 0);
- onCreateCalled = onStartCalled = prepared = haveCreateCtx = haveStartCtx = nullAct = false;
- onlyUpdateIfChanged = xgmml->getPropBool("att[@name=\"_updateIfChanged\"]/@value", false);
- StringBuffer helperName("fAc");
- xgmml->getProp("@id", helperName);
- helperFactory = (EclHelperFactory) queryJob().queryDllEntry().getEntry(helperName.str());
- if (!helperFactory)
- throw makeOsExceptionV(GetLastError(), "Failed to load helper factory method: %s (dll handle = %p)", helperName.str(), queryJob().queryDllEntry().getInstance());
- alreadyUpdated = false;
- whichBranch = (unsigned)-1;
- whichBranchBitSet.setown(createThreadSafeBitSet());
- newWhichBranch = false;
- hasNullInput = false;
- log = true;
- sentActInitData.setown(createThreadSafeBitSet());
- }
- CGraphElementBase::~CGraphElementBase()
- {
- activity.clear();
- baseHelper.clear(); // clear before dll is unloaded
- }
- CJobBase &CGraphElementBase::queryJob() const
- {
- return owner->queryJob();
- }
- CJobChannel &CGraphElementBase::queryJobChannel() const
- {
- return owner->queryJobChannel();
- }
- IGraphTempHandler *CGraphElementBase::queryTempHandler() const
- {
- if (resultsGraph)
- return resultsGraph->queryTempHandler();
- else
- return queryJob().queryTempHandler();
- }
- void CGraphElementBase::releaseIOs()
- {
- loopGraph.clear();
- associatedChildGraphs.kill();
- if (activity)
- activity->releaseIOs();
- connectedInputs.kill();
- inputs.kill();
- outputs.kill();
- activity.clear();
- }
- void CGraphElementBase::addDependsOn(CGraphBase *graph, int controlId)
- {
- ForEachItemIn(i, dependsOn)
- {
- if (dependsOn.item(i).graph == graph)
- return;
- }
- dependsOn.append(*new CGraphDependency(graph, controlId));
- }
- IThorGraphIterator *CGraphElementBase::getAssociatedChildGraphs() const
- {
- return new CGraphArrayIterator(associatedChildGraphs);
- }
- StringBuffer &CGraphElementBase::getOpt(const char *prop, StringBuffer &out) const
- {
- VStringBuffer path("hint[@name=\"%s\"]/@value", prop);
- if (!queryXGMML().getProp(path.toLowerCase().str(), out))
- queryJob().getOpt(prop, out);
- return out;
- }
- bool CGraphElementBase::getOptBool(const char *prop, bool defVal) const
- {
- bool def = queryJob().getOptBool(prop, defVal);
- VStringBuffer path("hint[@name=\"%s\"]/@value", prop);
- return queryXGMML().getPropBool(path.toLowerCase().str(), def);
- }
- int CGraphElementBase::getOptInt(const char *prop, int defVal) const
- {
- int def = queryJob().getOptInt(prop, defVal);
- VStringBuffer path("hint[@name=\"%s\"]/@value", prop);
- return queryXGMML().getPropInt(path.toLowerCase().str(), def);
- }
- __int64 CGraphElementBase::getOptInt64(const char *prop, __int64 defVal) const
- {
- __int64 def = queryJob().getOptInt64(prop, defVal);
- VStringBuffer path("hint[@name=\"%s\"]/@value", prop);
- return queryXGMML().getPropInt64(path.toLowerCase().str(), def);
- }
- IThorGraphDependencyIterator *CGraphElementBase::getDependsIterator() const
- {
- return new ArrayIIteratorOf<const CGraphDependencyArray, CGraphDependency, IThorGraphDependencyIterator>(dependsOn);
- }
- void CGraphElementBase::reset()
- {
- onStartCalled = false;
- // prepared = false;
- if (activity)
- activity->reset();
- }
- void CGraphElementBase::ActPrintLog(const char *format, ...)
- {
- va_list args;
- va_start(args, format);
- ::ActPrintLogArgs(this, thorlog_null, MCdebugProgress, format, args);
- va_end(args);
- }
- void CGraphElementBase::ActPrintLog(IException *e, const char *format, ...)
- {
- va_list args;
- va_start(args, format);
- ::ActPrintLogArgs(this, e, thorlog_all, MCexception(e), format, args);
- va_end(args);
- }
- void CGraphElementBase::ActPrintLog(IException *e)
- {
- ActPrintLog(e, "%s", "");
- }
- void CGraphElementBase::abort(IException *e)
- {
- CActivityBase *activity = queryActivity();
- if (activity)
- activity->abort();
- Owned<IThorGraphIterator> graphIter = getAssociatedChildGraphs();
- ForEach (*graphIter)
- {
- graphIter->query().abort(e);
- }
- }
- void CGraphElementBase::doconnect()
- {
- ForEachItemIn(i, connectedInputs)
- {
- CIOConnection *io = connectedInputs.item(i);
- if (io)
- io->connect(i, queryActivity());
- }
- }
- void CGraphElementBase::clearConnections()
- {
- connectedInputs.kill();
- connectedOutputs.kill();
- if (activity)
- activity->clearConnections();
- }
- void CGraphElementBase::addInput(unsigned input, CGraphElementBase *inputAct, unsigned inputOutIdx)
- {
- while (inputs.ordinality()<=input) inputs.append(NULL);
- inputs.replace(new COwningSimpleIOConnection(LINK(inputAct), inputOutIdx), input);
- while (inputAct->outputs.ordinality()<=inputOutIdx) inputAct->outputs.append(NULL);
- inputAct->outputs.replace(new CIOConnection(this, input), inputOutIdx);
- }
- void CGraphElementBase::connectInput(unsigned input, CGraphElementBase *inputAct, unsigned inputOutIdx)
- {
- ActPrintLog("CONNECTING (id=%" ACTPF "d, idx=%d) to (id=%" ACTPF "d, idx=%d)", inputAct->queryId(), inputOutIdx, queryId(), input);
- while (connectedInputs.ordinality()<=input) connectedInputs.append(NULL);
- connectedInputs.replace(new COwningSimpleIOConnection(LINK(inputAct), inputOutIdx), input);
- while (inputAct->connectedOutputs.ordinality()<=inputOutIdx) inputAct->connectedOutputs.append(NULL);
- inputAct->connectedOutputs.replace(new CIOConnection(this, input), inputOutIdx);
- }
- void CGraphElementBase::addAssociatedChildGraph(CGraphBase *childGraph)
- {
- if (!associatedChildGraphs.contains(*childGraph))
- associatedChildGraphs.append(*LINK(childGraph));
- }
- void CGraphElementBase::serializeCreateContext(MemoryBuffer &mb)
- {
- if (!onCreateCalled) return;
- DelayedSizeMarker sizeMark(mb);
- queryHelper()->serializeCreateContext(mb);
- sizeMark.write();
- }
- void CGraphElementBase::serializeStartContext(MemoryBuffer &mb)
- {
- if (!onStartCalled) return;
- DelayedSizeMarker sizeMark(mb);
- queryHelper()->serializeStartContext(mb);
- sizeMark.write();
- }
- void CGraphElementBase::deserializeCreateContext(MemoryBuffer &mb)
- {
- size32_t createCtxLen;
- mb.read(createCtxLen);
- createCtxMb.clear().append(createCtxLen, mb.readDirect(createCtxLen));
- haveCreateCtx = true;
- }
- void CGraphElementBase::deserializeStartContext(MemoryBuffer &mb)
- {
- size32_t startCtxLen;
- mb.read(startCtxLen);
- startCtxMb.append(startCtxLen, mb.readDirect(startCtxLen));
- haveStartCtx = true;
- onStartCalled = false; // allow to be called again
- }
- void CGraphElementBase::onCreate()
- {
- CriticalBlock b(crit);
- if (onCreateCalled)
- return;
- onCreateCalled = true;
- baseHelper.setown(helperFactory());
- if (!nullAct)
- {
- CGraphElementBase *ownerActivity = owner->queryOwner() ? owner->queryOwner()->queryElement(ownerId) : NULL;
- if (ownerActivity)
- {
- ownerActivity->onCreate(); // ensure owner created, might not be if this is child query inside another child query.
- baseHelper->onCreate(queryCodeContext(), ownerActivity->queryHelper(), haveCreateCtx?&createCtxMb:NULL);
- }
- else
- baseHelper->onCreate(queryCodeContext(), NULL, haveCreateCtx?&createCtxMb:NULL);
- }
- }
- void CGraphElementBase::onStart(size32_t parentExtractSz, const byte *parentExtract)
- {
- if (onStartCalled)
- return;
- onStartCalled = true;
- if (!nullAct)
- {
- if (haveStartCtx)
- {
- baseHelper->onStart(parentExtract, &startCtxMb);
- startCtxMb.reset();
- haveStartCtx = false;
- }
- else
- baseHelper->onStart(parentExtract, NULL);
- }
- }
- bool CGraphElementBase::executeDependencies(size32_t parentExtractSz, const byte *parentExtract, int controlId, bool async)
- {
- Owned<IThorGraphDependencyIterator> deps = getDependsIterator();
- ForEach(*deps)
- {
- CGraphDependency &dep = deps->query();
- if (dep.controlId == controlId)
- dep.graph->execute(parentExtractSz, parentExtract, true, async);
- if (owner->queryJob().queryAborted() || owner->queryAborted()) return false;
- }
- return true;
- }
- bool CGraphElementBase::prepareContext(size32_t parentExtractSz, const byte *parentExtract, bool checkDependencies, bool shortCircuit, bool async)
- {
- try
- {
- bool _shortCircuit = shortCircuit;
- Owned<IThorGraphDependencyIterator> deps = getDependsIterator();
- bool depsDone = true;
- ForEach(*deps)
- {
- CGraphDependency &dep = deps->query();
- if (0 == dep.controlId && NotFound == owner->dependentSubGraphs.find(*dep.graph))
- {
- owner->dependentSubGraphs.append(*dep.graph);
- if (!dep.graph->isComplete())
- depsDone = false;
- }
- }
- if (depsDone) _shortCircuit = false;
- if (!depsDone && checkDependencies)
- {
- if (!executeDependencies(parentExtractSz, parentExtract, 0, async))
- return false;
- }
- whichBranch = (unsigned)-1;
- hasNullInput = false;
- alreadyUpdated = false;
- switch (getKind())
- {
- case TAKindexwrite:
- case TAKdiskwrite:
- case TAKcsvwrite:
- case TAKxmlwrite:
- case TAKjsonwrite:
- if (_shortCircuit) return true;
- onCreate();
- alreadyUpdated = checkUpdate();
- if (alreadyUpdated)
- return false;
- break;
- case TAKchildif:
- owner->ifs.append(*this);
- // fall through
- case TAKif:
- case TAKifaction:
- {
- if (_shortCircuit) return true;
- onCreate();
- onStart(parentExtractSz, parentExtract);
- IHThorIfArg *helper = (IHThorIfArg *)baseHelper.get();
- whichBranch = helper->getCondition() ? 0 : 1; // True argument precedes false...
- /* NB - The executeDependencies code below is only needed if actionLinkInNewGraph=true, which is no longer the default
- * It should be removed, once we are positive there are no issues with in-line conditional actions
- */
- if (TAKifaction == getKind())
- {
- if (!executeDependencies(parentExtractSz, parentExtract, whichBranch+1, async)) //NB whenId 1 based
- return false;
- }
- if (inputs.queryItem(whichBranch))
- {
- if (!whichBranchBitSet->testSet(whichBranch)) // if not set, new
- newWhichBranch = true;
- return inputs.item(whichBranch)->activity->prepareContext(parentExtractSz, parentExtract, checkDependencies, false, async);
- }
- return true;
- }
- case TAKchildcase:
- owner->ifs.append(*this);
- // fall through
- case TAKcase:
- {
- if (_shortCircuit) return true;
- onCreate();
- onStart(parentExtractSz, parentExtract);
- IHThorCaseArg *helper = (IHThorCaseArg *)baseHelper.get();
- whichBranch = helper->getBranch();
- if (whichBranch >= inputs.ordinality())
- whichBranch = inputs.ordinality()-1;
- if (inputs.queryItem(whichBranch))
- return inputs.item(whichBranch)->activity->prepareContext(parentExtractSz, parentExtract, checkDependencies, false, async);
- return true;
- }
- case TAKfilter:
- case TAKfiltergroup:
- case TAKfilterproject:
- {
- if (_shortCircuit) return true;
- onCreate();
- onStart(parentExtractSz, parentExtract);
- switch (getKind())
- {
- case TAKfilter:
- hasNullInput = !((IHThorFilterArg *)baseHelper.get())->canMatchAny();
- break;
- case TAKfiltergroup:
- hasNullInput = !((IHThorFilterGroupArg *)baseHelper.get())->canMatchAny();
- break;
- case TAKfilterproject:
- hasNullInput = !((IHThorFilterProjectArg *)baseHelper.get())->canMatchAny();
- break;
- }
- if (hasNullInput)
- return true;
- break;
- }
- case TAKsequential:
- case TAKparallel:
- {
- /* NB - The executeDependencies code below is only needed if actionLinkInNewGraph=true, which is no longer the default
- * It should be removed, once we are positive there are no issues with in-line sequential/parallel activities
- */
- for (unsigned s=1; s<=dependsOn.ordinality(); s++)
- {
- if (!executeDependencies(parentExtractSz, parentExtract, s, async))
- return false;
- }
- break;
- }
- case TAKwhen_dataset:
- case TAKwhen_action:
- {
- if (!executeDependencies(parentExtractSz, parentExtract, WhenBeforeId, async))
- return false;
- if (!executeDependencies(parentExtractSz, parentExtract, WhenParallelId, async))
- return false;
- break;
- }
- }
- ForEachItemIn(i, inputs)
- {
- CGraphElementBase *input = inputs.item(i)->activity;
- if (!input->prepareContext(parentExtractSz, parentExtract, checkDependencies, shortCircuit, async))
- return false;
- }
- return true;
- }
- catch (IException *_e)
- {
- IThorException *e = QUERYINTERFACE(_e, IThorException);
- if (e)
- {
- if (!e->queryActivityId())
- setExceptionActivityInfo(*this, e);
- }
- else
- {
- e = MakeActivityException(this, _e);
- _e->Release();
- }
- throw e;
- }
- }
- void CGraphElementBase::preStart(size32_t parentExtractSz, const byte *parentExtract)
- {
- activity->preStart(parentExtractSz, parentExtract);
- }
- void CGraphElementBase::initActivity()
- {
- CriticalBlock b(crit);
- if (isSink())
- owner->addActiveSink(*this);
- if (activity) // no need to recreate
- return;
- activity.setown(factory());
- if (isLoopActivity(*this))
- {
- unsigned loopId = queryXGMML().getPropInt("att[@name=\"_loopid\"]/@value");
- Owned<CGraphBase> childGraph = owner->getChildGraph(loopId);
- Owned<IThorBoundLoopGraph> boundLoopGraph = createBoundLoopGraph(childGraph, baseHelper->queryOutputMeta(), queryId());
- setBoundGraph(boundLoopGraph);
- }
- }
- void CGraphElementBase::createActivity(size32_t parentExtractSz, const byte *parentExtract)
- {
- if (connectedInputs.ordinality()) // ensure not traversed twice (e.g. via splitter)
- return;
- try
- {
- switch (getKind())
- {
- case TAKchildif:
- case TAKchildcase:
- {
- if (inputs.queryItem(whichBranch))
- {
- CGraphElementBase *input = inputs.item(whichBranch)->activity;
- input->createActivity(parentExtractSz, parentExtract);
- }
- onCreate();
- initActivity();
- if (inputs.queryItem(whichBranch))
- {
- CIOConnection *inputIO = inputs.item(whichBranch);
- connectInput(whichBranch, inputIO->activity, inputIO->index);
- }
- break;
- }
- case TAKif:
- case TAKcase:
- if (inputs.queryItem(whichBranch))
- {
- CGraphElementBase *input = inputs.item(whichBranch)->activity;
- input->createActivity(parentExtractSz, parentExtract);
- }
- else
- {
- onCreate();
- if (!activity)
- factorySet(TAKnull);
- }
- break;
- case TAKifaction:
- if (inputs.queryItem(whichBranch))
- {
- CGraphElementBase *input = inputs.item(whichBranch)->activity;
- input->createActivity(parentExtractSz, parentExtract);
- }
- break;
- case TAKsequential:
- case TAKparallel:
- {
- ForEachItemIn(i, inputs)
- {
- if (inputs.queryItem(i))
- {
- CGraphElementBase *input = inputs.item(i)->activity;
- input->createActivity(parentExtractSz, parentExtract);
- }
- }
- break;
- }
- default:
- if (!hasNullInput)
- {
- ForEachItemIn(i, inputs)
- {
- CGraphElementBase *input = inputs.item(i)->activity;
- input->createActivity(parentExtractSz, parentExtract);
- }
- onCreate();
- if (isDiskInput(getKind()))
- onStart(parentExtractSz, parentExtract);
- ForEachItemIn(i2, inputs)
- {
- CIOConnection *inputIO = inputs.item(i2);
- loop
- {
- CGraphElementBase *input = inputIO->activity;
- switch (input->getKind())
- {
- case TAKif:
- case TAKcase:
- {
- if (input->whichBranch >= input->getInputs()) // if, will have TAKnull activity, made at create time.
- {
- input = NULL;
- break;
- }
- inputIO = input->inputs.item(input->whichBranch);
- assertex(inputIO);
- break;
- }
- default:
- input = NULL;
- break;
- }
- if (!input)
- break;
- }
- connectInput(i2, inputIO->activity, inputIO->index);
- }
- }
- initActivity();
- break;
- }
- }
- catch (IException *e) { ActPrintLog(e); activity.clear(); throw; }
- }
- ICodeContext *CGraphElementBase::queryCodeContext()
- {
- return queryOwner().queryCodeContext();
- }
- /////
- // JCSMORE loop - probably need better way to check if any act in graph is global(meaning needs some synchronization between slaves in activity execution)
- bool isGlobalActivity(CGraphElementBase &container)
- {
- switch (container.getKind())
- {
- // always global, but only co-ordinate init/done
- case TAKcsvwrite:
- case TAKxmlwrite:
- case TAKjsonwrite:
- case TAKindexwrite:
- case TAKkeydiff:
- case TAKkeypatch:
- case TAKdictionaryworkunitwrite:
- return true;
- case TAKdiskwrite:
- {
- Owned<IHThorDiskWriteArg> helper = (IHThorDiskWriteArg *)container.helperFactory();
- unsigned flags = helper->getFlags();
- return (0 == (TDXtemporary & flags)); // global if not temporary
- }
- case TAKspill:
- return false;
- case TAKcsvread:
- {
- Owned<IHThorCsvReadArg> helper = (IHThorCsvReadArg *)container.helperFactory();
- // if header lines, then [may] need to co-ordinate across slaves
- if (container.queryOwner().queryOwner() && (!container.queryOwner().isGlobal())) // I am in a child query
- return false;
- return helper->queryCsvParameters()->queryHeaderLen() > 0;
- }
- // dependent on child acts?
- case TAKlooprow:
- case TAKloopcount:
- case TAKgraphloop:
- case TAKparallelgraphloop:
- case TAKloopdataset:
- return false;
- // dependent on local/grouped
- case TAKkeyeddistribute:
- case TAKhashdistribute:
- case TAKhashdistributemerge:
- case TAKworkunitwrite:
- case TAKdistribution:
- case TAKpartition:
- case TAKdiskaggregate:
- case TAKdiskcount:
- case TAKdiskgroupaggregate:
- case TAKindexaggregate:
- case TAKindexcount:
- case TAKindexgroupaggregate:
- case TAKindexgroupexists:
- case TAKindexgroupcount:
- case TAKremoteresult:
- case TAKcountproject:
- case TAKcreaterowlimit:
- case TAKskiplimit:
- case TAKlimit:
- case TAKsort:
- case TAKdedup:
- case TAKjoin:
- case TAKselfjoin:
- case TAKhashjoin:
- case TAKsmartjoin:
- case TAKkeyeddenormalize:
- case TAKhashdenormalize:
- case TAKdenormalize:
- case TAKlookupdenormalize: //GH->JCS why are these here, and join not?
- case TAKalldenormalize:
- case TAKsmartdenormalize:
- case TAKdenormalizegroup:
- case TAKhashdenormalizegroup:
- case TAKlookupdenormalizegroup:
- case TAKkeyeddenormalizegroup:
- case TAKalldenormalizegroup:
- case TAKsmartdenormalizegroup:
- case TAKaggregate:
- case TAKexistsaggregate:
- case TAKcountaggregate:
- case TAKhashaggregate:
- case TAKhashdedup:
- case TAKrollup:
- case TAKiterate:
- case TAKselectn:
- case TAKfirstn:
- case TAKenth:
- case TAKsample:
- case TAKgroup:
- case TAKchoosesets:
- case TAKchoosesetsenth:
- case TAKchoosesetslast:
- case TAKtopn:
- case TAKprocess:
- case TAKchildcount:
- case TAKwhen_dataset:
- case TAKwhen_action:
- case TAKnonempty:
- if (!container.queryLocalOrGrouped())
- return true;
- break;
- case TAKkeyedjoin:
- case TAKalljoin:
- case TAKlookupjoin:
- if (!container.queryLocal())
- return true;
- // always local
- case TAKfilter:
- case TAKfilterproject:
- case TAKfiltergroup:
- case TAKsplit:
- case TAKpipewrite:
- case TAKdegroup:
- case TAKproject:
- case TAKprefetchproject:
- case TAKprefetchcountproject:
- case TAKnormalize:
- case TAKnormalizechild:
- case TAKnormalizelinkedchild:
- case TAKpipethrough:
- case TAKif:
- case TAKchildif:
- case TAKchildcase:
- case TAKcase:
- case TAKparse:
- case TAKpiperead:
- case TAKxmlparse:
- case TAKjoinlight:
- case TAKselfjoinlight:
- case TAKdiskread:
- case TAKdisknormalize:
- case TAKchildaggregate:
- case TAKchildgroupaggregate:
- case TAKchildthroughnormalize:
- case TAKchildnormalize:
- case TAKindexread:
- case TAKindexnormalize:
- case TAKxmlread:
- case TAKjsonread:
- case TAKdiskexists:
- case TAKindexexists:
- case TAKchildexists:
- case TAKthroughaggregate:
- case TAKmerge:
- case TAKfunnel:
- case TAKregroup:
- case TAKcombine:
- case TAKrollupgroup:
- case TAKcombinegroup:
- case TAKsoap_rowdataset:
- case TAKhttp_rowdataset:
- case TAKsoap_rowaction:
- case TAKsoap_datasetdataset:
- case TAKsoap_datasetaction:
- case TAKlinkedrawiterator:
- case TAKchilditerator:
- case TAKstreamediterator:
- case TAKworkunitread:
- case TAKchilddataset:
- case TAKinlinetable:
- case TAKnull:
- case TAKemptyaction:
- case TAKlocalresultread:
- case TAKlocalresultwrite:
- case TAKdictionaryresultwrite:
- case TAKgraphloopresultread:
- case TAKgraphloopresultwrite:
- case TAKnwaygraphloopresultread:
- case TAKapply:
- case TAKsideeffect:
- case TAKsimpleaction:
- case TAKsorted:
- case TAKdistributed:
- case TAKtrace:
- break;
- case TAKnwayjoin:
- case TAKnwaymerge:
- case TAKnwaymergejoin:
- case TAKnwayinput:
- case TAKnwayselect:
- return false; // JCSMORE - I think and/or have to be for now
- // undefined
- case TAKdatasetresult:
- case TAKrowresult:
- case TAKremotegraph:
- case TAKlibrarycall:
- default:
- return true; // if in doubt
- }
- return false;
- }
- bool isLoopActivity(CGraphElementBase &container)
- {
- switch (container.getKind())
- {
- case TAKlooprow:
- case TAKloopcount:
- case TAKloopdataset:
- case TAKgraphloop:
- case TAKparallelgraphloop:
- return true;
- }
- return false;
- }
- /////
- CGraphBase::CGraphBase(CJobChannel &_jobChannel) : jobChannel(_jobChannel), job(_jobChannel.queryJob())
- {
- xgmml = NULL;
- parent = owner = NULL;
- graphId = 0;
- complete = false;
- reinit = false; // should graph reinitialize each time it is called (e.g. in loop graphs)
- // This is currently for 'init' (Create time) info and onStart into
- sentInitData = false;
- // sentStartCtx = false;
- sentStartCtx = true; // JCSMORE - disable for now
- parentActivityId = 0;
- created = connected = started = graphDone = aborted = prepared = false;
- startBarrier = waitBarrier = doneBarrier = NULL;
- mpTag = waitBarrierTag = startBarrierTag = doneBarrierTag = TAG_NULL;
- executeReplyTag = TAG_NULL;
- parentExtractSz = 0;
- counter = 0; // loop/graph counter, will be set by loop/graph activity if needed
- }
- CGraphBase::~CGraphBase()
- {
- clean();
- }
- void CGraphBase::clean()
- {
- ::Release(startBarrier);
- ::Release(waitBarrier);
- ::Release(doneBarrier);
- localResults.clear();
- graphLoopResults.clear();
- childGraphsTable.kill();
- childGraphs.kill();
- disconnectActivities();
- containers.kill();
- sinks.kill();
- activeSinks.kill();
- }
- void CGraphBase::serializeCreateContexts(MemoryBuffer &mb)
- {
- DelayedSizeMarker sizeMark(mb);
- Owned<IThorActivityIterator> iter = getIterator();
- ForEach (*iter)
- {
- CGraphElementBase &element = iter->query();
- if (element.isOnCreated())
- {
- mb.append(element.queryId());
- element.serializeCreateContext(mb);
- }
- }
- mb.append((activity_id)0);
- sizeMark.write();
- }
- void CGraphBase::serializeStartContexts(MemoryBuffer &mb)
- {
- DelayedSizeMarker sizeMark(mb);
- Owned<IThorActivityIterator> iter = getIterator();
- ForEach (*iter)
- {
- CGraphElementBase &element = iter->query();
- if (element.isOnStarted())
- {
- mb.append(element.queryId());
- element.serializeStartContext(mb);
- }
- }
- mb.append((activity_id)0);
- sizeMark.write();
- }
- void CGraphBase::deserializeCreateContexts(MemoryBuffer &mb)
- {
- activity_id id;
- loop
- {
- mb.read(id);
- if (0 == id) break;
- CGraphElementBase *element = queryElement(id);
- assertex(element);
- element->deserializeCreateContext(mb);
- }
- }
- void CGraphBase::deserializeStartContexts(MemoryBuffer &mb)
- {
- activity_id id;
- loop
- {
- mb.read(id);
- if (0 == id) break;
- CGraphElementBase *element = queryElement(id);
- assertex(element);
- element->deserializeStartContext(mb);
- }
- }
- void CGraphBase::reset()
- {
- setCompleteEx(false);
- graphCancelHandler.reset();
- if (0 == containers.count())
- {
- SuperHashIteratorOf<CGraphBase> iter(childGraphsTable);
- ForEach(iter)
- iter.query().reset();
- }
- else
- {
- Owned<IThorActivityIterator> iter = getIterator();
- ForEach(*iter)
- {
- CGraphElementBase &element = iter->query();
- element.reset();
- }
- dependentSubGraphs.kill();
- }
- if (!queryOwner() || isGlobal())
- jobChannel.queryTimeReporter().reset();
- if (!queryOwner())
- clearNodeStats();
- }
- void CGraphBase::addChildGraph(CGraphBase &graph)
- {
- CriticalBlock b(crit);
- childGraphs.append(graph);
- childGraphsTable.replace(graph);
- jobChannel.associateGraph(graph);
- }
- IThorGraphIterator *CGraphBase::getChildGraphs() const
- {
- CriticalBlock b(crit);
- return new CGraphArrayCopyIterator(childGraphs);
- }
- bool CGraphBase::fireException(IException *e)
- {
- return queryJobChannel().fireException(e);
- }
- bool CGraphBase::preStart(size32_t parentExtractSz, const byte *parentExtract)
- {
- Owned<IThorActivityIterator> iter = getConnectedIterator();
- ForEach(*iter)
- {
- CGraphElementBase &element = iter->query();
- element.preStart(parentExtractSz, parentExtract);
- }
- return true;
- }
- void CGraphBase::executeSubGraph(size32_t parentExtractSz, const byte *parentExtract)
- {
- CriticalBlock b(executeCrit);
- if (job.queryPausing())
- return;
- Owned<IException> exception;
- try
- {
- if (!prepare(parentExtractSz, parentExtract, false, false, false))
- {
- setCompleteEx();
- return;
- }
- try
- {
- if (!queryOwner())
- {
- StringBuffer s;
- toXML(&queryXGMML(), s, 2);
- GraphPrintLog("Running graph [%s] : %s", isGlobal()?"global":"local", s.str());
- }
- create(parentExtractSz, parentExtract);
- }
- catch (IException *e)
- {
- Owned<IThorException> e2 = MakeGraphException(this, e);
- e2->setAction(tea_abort);
- queryJobChannel().fireException(e2);
- throw;
- }
- if (localResults)
- localResults->clear();
- doExecute(parentExtractSz, parentExtract, false);
- }
- catch (IException *e)
- {
- GraphPrintLog(e);
- exception.setown(e);
- }
- if (!queryOwner())
- {
- GraphPrintLog("Graph Done");
- StringBuffer memStr;
- getSystemTraceInfo(memStr, PerfMonStandard | PerfMonExtended);
- GraphPrintLog("%s", memStr.str());
- }
- if (exception)
- throw exception.getClear();
- }
- void CGraphBase::execute(size32_t _parentExtractSz, const byte *parentExtract, bool checkDependencies, bool async)
- {
- parentExtractSz = _parentExtractSz;
- if (isComplete())
- return;
- if (async)
- queryJobChannel().startGraph(*this, queryJobChannel(), checkDependencies, parentExtractSz, parentExtract); // may block if enough running
- else
- {
- if (!prepare(parentExtractSz, parentExtract, checkDependencies, async, async))
- {
- setComplete();
- return;
- }
- executeSubGraph(parentExtractSz, parentExtract);
- }
- }
- void CGraphBase::doExecute(size32_t parentExtractSz, const byte *parentExtract, bool checkDependencies)
- {
- if (isComplete()) return;
- if (queryAborted())
- {
- if (abortException)
- throw abortException.getLink();
- throw MakeGraphException(this, 0, "subgraph aborted(1)");
- }
- if (!prepare(parentExtractSz, parentExtract, checkDependencies, false, false))
- {
- setComplete();
- return;
- }
- if (queryAborted())
- {
- if (abortException)
- throw abortException.getLink();
- throw MakeGraphException(this, 0, "subgraph aborted(2)");
- }
- Owned<IException> exception;
- try
- {
- if (started)
- reset();
- Owned<IThorActivityIterator> iter = getConnectedIterator();
- ForEach(*iter)
- {
- CGraphElementBase &element = iter->query();
- element.onStart(parentExtractSz, parentExtract);
- }
- if (!preStart(parentExtractSz, parentExtract)) return;
- start();
- if (!wait(aborted?MEDIUMTIMEOUT:INFINITE)) // can't wait indefinitely, query may have aborted and stall, but prudent to wait a short time for underlying graphs to unwind.
- GraphPrintLogEx(this, thorlog_null, MCuserWarning, "Graph wait cancelled, aborted=%s", aborted?"true":"false");
- else
- graphDone = true;
- }
- catch (IException *e)
- {
- GraphPrintLog(e);
- exception.setown(e);
- }
- try
- {
- if (!exception && abortException)
- exception.setown(abortException.getClear());
- if (exception)
- {
- if (NULL == owner || isGlobal())
- waitBarrier->cancel(exception);
- if (!queryOwner())
- {
- StringBuffer str;
- Owned<IThorException> e = MakeGraphException(this, exception->errorCode(), "%s", exception->errorMessage(str).str());
- e->setAction(tea_abort);
- fireException(e);
- }
- }
- }
- catch (IException *e)
- {
- GraphPrintLog(e, "during abort()");
- e->Release();
- }
- try
- {
- done();
- if (doneBarrier)
- doneBarrier->wait(false);
- }
- catch (IException *e)
- {
- GraphPrintLog(e);
- if (!exception.get())
- exception.setown(e);
- else
- e->Release();
- }
- end();
- if (exception)
- throw exception.getClear();
- if (!queryAborted())
- setComplete();
- }
- bool CGraphBase::prepare(size32_t parentExtractSz, const byte *parentExtract, bool checkDependencies, bool shortCircuit, bool async)
- {
- if (isComplete()) return false;
- bool needToExecute = false;
- ifs.kill();
- ForEachItemIn(s, sinks)
- {
- CGraphElementBase &sink = sinks.item(s);
- if (sink.prepareContext(parentExtractSz, parentExtract, checkDependencies, shortCircuit, async))
- needToExecute = true;
- }
- // prepared = true;
- return needToExecute;
- }
- void CGraphBase::create(size32_t parentExtractSz, const byte *parentExtract)
- {
- Owned<IThorActivityIterator> iter = getIterator();
- ForEach(*iter)
- {
- CGraphElementBase &element = iter->query();
- element.clearConnections();
- }
- activeSinks.kill(); // NB: activeSinks are added to during activity creation
- ForEachItemIn(s, sinks)
- {
- CGraphElementBase &sink = sinks.item(s);
- sink.createActivity(parentExtractSz, parentExtract);
- }
- created = true;
- }
- void CGraphBase::done()
- {
- if (aborted) return; // activity done methods only called on success
- Owned<IThorActivityIterator> iter = getConnectedIterator();
- ForEach (*iter)
- {
- CGraphElementBase &element = iter->query();
- element.queryActivity()->done();
- }
- }
- IMPServer &CGraphBase::queryMPServer() const
- {
- return jobChannel.queryMPServer();
- }
- bool CGraphBase::syncInitData()
- {
- CGraphElementBase *parentElement = queryOwner() ? queryOwner()->queryElement(queryParentActivityId()) : NULL;
- if (parentElement && isLoopActivity(*parentElement))
- return parentElement->queryLoopGraph()->queryGraph()->isGlobal();
- else
- return !isLocalChild();
- }
- void CGraphBase::end()
- {
- // always called, any final action clear up
- Owned<IThorActivityIterator> iter = getIterator();
- ForEach(*iter)
- {
- CGraphElementBase &element = iter->query();
- try
- {
- if (element.queryActivity())
- element.queryActivity()->kill();
- }
- catch (IException *e)
- {
- Owned<IException> e2 = MakeActivityException(element.queryActivity(), e, "Error calling kill()");
- GraphPrintLog(e2);
- e->Release();
- }
- }
- }
- class CGraphTraverseIteratorBase : public CInterface, implements IThorActivityIterator
- {
- protected:
- CGraphBase &graph;
- Linked<CGraphElementBase> cur;
- CIArrayOf<CGraphElementBase> others;
- CGraphElementArrayCopy covered;
- CGraphElementBase *popNext()
- {
- if (!others.ordinality())
- {
- cur.clear();
- return NULL;
- }
- cur.setown(&others.popGet());
- return cur;
- }
- CGraphElementBase *setNext(CIOConnectionArray &inputs, unsigned whichInput=((unsigned)-1))
- {
- cur.clear();
- unsigned n = inputs.ordinality();
- if (((unsigned)-1) != whichInput)
- {
- CIOConnection *io = inputs.queryItem(whichInput);
- if (io)
- cur.set(io->activity);
- }
- else
- {
- bool first = true;
- unsigned i=0;
- for (; i<n; i++)
- {
- CIOConnection *io = inputs.queryItem(i);
- if (io)
- {
- if (first)
- {
- first = false;
- cur.set(io->activity);
- }
- else
- others.append(*LINK(io->activity));
- }
- }
- }
- if (!cur)
- {
- if (!popNext())
- return NULL;
- }
- // check haven't been here before
- loop
- {
- if (cur->getOutputs() < 2)
- break;
- else if (NotFound == covered.find(*cur))
- {
- if (!cur->alreadyUpdated)
- {
- covered.append(*cur);
- break;
- }
- }
- if (!popNext())
- return NULL;
- }
- return cur.get();
- }
- public:
- IMPLEMENT_IINTERFACE;
- CGraphTraverseIteratorBase(CGraphBase &_graph) : graph(_graph)
- {
- }
- virtual bool first()
- {
- covered.kill();
- others.kill();
- cur.clear();
- Owned<IThorActivityIterator> sinkIter = graph.getSinkIterator();
- if (!sinkIter->first())
- return false;
- loop
- {
- cur.set(& sinkIter->query());
- if (!cur->alreadyUpdated)
- break;
- if (!sinkIter->next())
- return false;
- }
- while (sinkIter->next())
- others.append(sinkIter->get());
- return true;
- }
- virtual bool isValid() { return NULL != cur.get(); }
- virtual CGraphElementBase & query() { return *cur; }
- CGraphElementBase & get() { return *LINK(cur); }
- };
- class CGraphTraverseConnectedIterator : public CGraphTraverseIteratorBase
- {
- public:
- CGraphTraverseConnectedIterator(CGraphBase &graph) : CGraphTraverseIteratorBase(graph) { }
- virtual bool next()
- {
- if (cur->hasNullInput)
- {
- do
- {
- if (!popNext())
- return false;
- }
- while (cur->hasNullInput);
- }
- else
- setNext(cur->connectedInputs);
- return NULL!=cur.get();
- }
- };
- IThorActivityIterator *CGraphBase::getConnectedIterator()
- {
- return new CGraphTraverseConnectedIterator(*this);
- }
- bool CGraphBase::wait(unsigned timeout)
- {
- CTimeMon tm(timeout);
- unsigned remaining = timeout;
- class CWaitException
- {
- CGraphBase *graph;
- Owned<IException> exception;
- public:
- CWaitException(CGraphBase *_graph) : graph(_graph) { }
- IException *get() { return exception; }
- void set(IException *e)
- {
- if (!exception)
- exception.setown(e);
- else
- e->Release();
- }
- void throwException()
- {
- if (exception)
- throw exception.getClear();
- throw MakeGraphException(graph, 0, "Timed out waiting for graph to end");
- }
- } waitException(this);
- Owned<IThorActivityIterator> iter = getConnectedIterator();
- ForEach (*iter)
- {
- CGraphElementBase &element = iter->query();
- CActivityBase *activity = element.queryActivity();
- if (INFINITE != timeout && tm.timedout(&remaining))
- waitException.throwException();
- try
- {
- if (!activity->wait(remaining))
- waitException.throwException();
- }
- catch (IException *e)
- {
- waitException.set(e); // will discard if already set
- if (timeout == INFINITE)
- {
- unsigned e = tm.elapsed();
- if (e >= MEDIUMTIMEOUT)
- waitException.throwException();
- timeout = MEDIUMTIMEOUT-e;
- tm.reset(timeout);
- }
- }
- }
- if (waitException.get())
- waitException.throwException();
- // synchronize all slaves to end of graphs
- if (NULL == owner || isGlobal())
- {
- if (INFINITE != timeout && tm.timedout(&remaining))
- waitException.throwException();
- if (!waitBarrier->wait(true, remaining))
- return false;
- }
- return true;
- }
- void CGraphBase::abort(IException *e)
- {
- if (aborted)
- return;
- {
- CriticalBlock cb(crit);
- abortException.set(e);
- aborted = true;
- graphCancelHandler.cancel(0);
- if (0 == containers.count())
- {
- Owned<IThorGraphIterator> iter = getChildGraphs();
- ForEach(*iter)
- {
- CGraphBase &graph = iter->query();
- graph.abort(e);
- }
- }
- }
- if (started && !graphDone)
- {
- Owned<IThorActivityIterator> iter = getConnectedIterator();
- ForEach (*iter)
- {
- iter->query().abort(e); // JCSMORE - could do in parallel, they can take some time to timeout
- }
- if (startBarrier)
- startBarrier->cancel(e);
- if (waitBarrier)
- waitBarrier->cancel(e);
- if (doneBarrier)
- doneBarrier->cancel(e);
- }
- }
- void CGraphBase::GraphPrintLog(const char *format, ...)
- {
- va_list args;
- va_start(args, format);
- ::GraphPrintLogArgs(this, thorlog_null, MCdebugProgress, format, args);
- va_end(args);
- }
- void CGraphBase::GraphPrintLog(IException *e, const char *format, ...)
- {
- va_list args;
- va_start(args, format);
- ::GraphPrintLogArgs(this, e, thorlog_null, MCdebugProgress, format, args);
- va_end(args);
- }
- void CGraphBase::GraphPrintLog(IException *e)
- {
- GraphPrintLog(e, "%s", "");
- }
- void CGraphBase::setLogging(bool tf)
- {
- Owned<IThorActivityIterator> iter = getIterator();
- ForEach(*iter)
- iter->query().setLogging(tf);
- }
- void CGraphBase::createFromXGMML(IPropertyTree *_node, CGraphBase *_owner, CGraphBase *_parent, CGraphBase *resultsGraph)
- {
- owner = _owner;
- parent = _parent?_parent:owner;
- node.setown(createPTreeFromIPT(_node));
- xgmml = node->queryPropTree("att/graph");
- sink = xgmml->getPropBool("att[@name=\"rootGraph\"]/@value", false);
- sequential = xgmml->getPropBool("@sequential");
- graphId = node->getPropInt("@id");
- global = false;
- localOnly = -1; // unset
- parentActivityId = node->getPropInt("att[@name=\"_parentActivity\"]/@value", 0);
- CGraphBase *graphContainer = this;
- if (resultsGraph)
- graphContainer = resultsGraph; // JCSMORE is this right?
- graphCodeContext.setContext(graphContainer, (ICodeContextExt *)&jobChannel.queryCodeContext());
- unsigned numResults = xgmml->getPropInt("att[@name=\"_numResults\"]/@value", 0);
- if (numResults)
- {
- localResults.setown(createThorGraphResults(numResults));
- resultsGraph = this;
- // JCSMORE - it might more sense if this temp handler was owned by parent act., which may finish(get stopped) earlier than the owning graph
- tmpHandler.setown(queryJob().createTempHandler(false));
- }
- localChild = false;
- if (owner && parentActivityId)
- {
- CGraphElementBase *parentElement = owner->queryElement(parentActivityId);
- parentElement->addAssociatedChildGraph(this);
- if (isLoopActivity(*parentElement))
- localChild = parentElement->queryOwner().isLocalChild();
- else
- localChild = true;
- }
- Owned<IPropertyTreeIterator> nodes = xgmml->getElements("node");
- ForEach(*nodes)
- {
- IPropertyTree &e = nodes->query();
- ThorActivityKind kind = (ThorActivityKind) e.getPropInt("att[@name=\"_kind\"]/@value");
- if (TAKsubgraph == kind)
- {
- Owned<CGraphBase> subGraph = queryJobChannel().createGraph();
- subGraph->createFromXGMML(&e, this, parent, resultsGraph);
- addChildGraph(*LINK(subGraph));
- if (!global)
- global = subGraph->isGlobal();
- }
- else
- {
- if (localChild && !e.getPropBool("att[@name=\"coLocal\"]/@value", false))
- {
- IPropertyTree *att = createPTree("att");
- att->setProp("@name", "coLocal");
- att->setPropBool("@value", true);
- e.addPropTree("att", att);
- }
- CGraphElementBase *act = createGraphElement(e, *this, resultsGraph);
- addActivity(act);
- if (!global)
- global = isGlobalActivity(*act);
- }
- }
- Owned<IPropertyTreeIterator> edges = xgmml->getElements("edge");
- ForEach(*edges)
- {
- IPropertyTree &edge = edges->query();
- unsigned sourceOutput = edge.getPropInt("att[@name=\"_sourceIndex\"]/@value", 0);
- unsigned targetInput = edge.getPropInt("att[@name=\"_targetIndex\"]/@value", 0);
- CGraphElementBase *source = queryElement(edge.getPropInt("@source"));
- CGraphElementBase *target = queryElement(edge.getPropInt("@target"));
- target->addInput(targetInput, source, sourceOutput);
- }
- Owned<IThorActivityIterator> iter = getIterator();
- ForEach(*iter)
- {
- CGraphElementBase &element = iter->query();
- if (0 == element.getOutputs())
- {
- /* JCSMORE - Making some outputs conditional, will require:
- * a) Pass through information as to which dependent graph causes this graph (and this sink) to execute)
- * b) Allow the subgraph to re-executed by other dependent subgraphs and avoid re-executing completed sinks
- * c) Keep common points (splitters) around (preferably in memory), re-execution of graph will need them
- */
- sinks.append(*LINK(&element));
- }
- }
- init();
- }
- void CGraphBase::executeChildGraphs(size32_t parentExtractSz, const byte *parentExtract)
- {
- // JCSMORE - would need to respect codegen 'sequential' flag, if these child graphs
- // could be executed in parallel.
- Owned<IThorGraphIterator> iter = getChildGraphs();
- ForEach(*iter)
- {
- CGraphBase &graph = iter->query();
- if (graph.isSink())
- graph.execute(parentExtractSz, parentExtract, true, false);
- }
- }
- void CGraphBase::doExecuteChild(size32_t parentExtractSz, const byte *parentExtract)
- {
- reset();
- if (0 == containers.count())
- executeChildGraphs(parentExtractSz, parentExtract);
- else
- execute(parentExtractSz, parentExtract, false, false);
- queryTempHandler()->clearTemps();
- }
- void CGraphBase::executeChild(size32_t & retSize, void * &ret, size32_t parentExtractSz, const byte *parentExtract)
- {
- reset();
- doExecute(parentExtractSz, parentExtract, false);
- UNIMPLEMENTED;
- /*
- ForEachItemIn(idx1, elements)
- {
- EclGraphElement & cur = elements.item(idx1);
- if (cur.isResult)
- {
- cur.extractResult(retSize, ret);
- return;
- }
- }
- */
- throwUnexpected();
- }
- void CGraphBase::setResults(IThorGraphResults *results) // used by master only
- {
- localResults.set(results);
- }
- void CGraphBase::executeChild(size32_t parentExtractSz, const byte *parentExtract, IThorGraphResults *results, IThorGraphResults *_graphLoopResults)
- {
- localResults.set(results);
- graphLoopResults.set(_graphLoopResults);
- doExecuteChild(parentExtractSz, parentExtract);
- graphLoopResults.clear();
- localResults.clear();
- }
- StringBuffer &getGlobals(CGraphBase &graph, StringBuffer &str)
- {
- bool first = true;
- Owned<IThorActivityIterator> iter = graph.getIterator();
- ForEach(*iter)
- {
- CGraphElementBase &e = iter->query();
- if (isGlobalActivity(e))
- {
- if (first)
- str.append("Graph(").append(graph.queryGraphId()).append("): [");
- else
- str.append(", ");
- first = false;
- ThorActivityKind kind = e.getKind();
- str.append(activityKindStr(kind));
- str.append("(").append(e.queryId()).append(")");
- }
- }
- if (!first)
- str.append("]");
- Owned<IThorGraphIterator> childIter = graph.getChildGraphs();
- ForEach(*childIter)
- {
- CGraphBase &childGraph = childIter->query();
- getGlobals(childGraph, str);
- }
- return str;
- }
- void CGraphBase::executeChild(size32_t parentExtractSz, const byte *parentExtract)
- {
- assertex(localResults);
- localResults->clear();
- if (isGlobal()) // any slave
- {
- StringBuffer str("Global acts = ");
- getGlobals(*this, str);
- throw MakeGraphException(this, 0, "Global child graph? : %s", str.str());
- }
- doExecuteChild(parentExtractSz, parentExtract);
- }
- IThorResult *CGraphBase::getResult(unsigned id, bool distributed)
- {
- return localResults->getResult(id, distributed);
- }
- IThorResult *CGraphBase::getGraphLoopResult(unsigned id, bool distributed)
- {
- return graphLoopResults->getResult(id, distributed);
- }
- IThorResult *CGraphBase::createResult(CActivityBase &activity, unsigned id, IThorGraphResults *results, IRowInterfaces *rowIf, bool distributed, unsigned spillPriority)
- {
- return results->createResult(activity, id, rowIf, distributed, spillPriority);
- }
- IThorResult *CGraphBase::createResult(CActivityBase &activity, unsigned id, IRowInterfaces *rowIf, bool distributed, unsigned spillPriority)
- {
- return localResults->createResult(activity, id, rowIf, distributed, spillPriority);
- }
- IThorResult *CGraphBase::createGraphLoopResult(CActivityBase &activity, IRowInterfaces *rowIf, bool distributed, unsigned spillPriority)
- {
- return graphLoopResults->createResult(activity, rowIf, distributed, spillPriority);
- }
- // IEclGraphResults
- void CGraphBase::getDictionaryResult(unsigned & count, byte * * & ret, unsigned id)
- {
- Owned<IThorResult> result = getResult(id, true); // will get collated distributed result
- result->getLinkedResult(count, ret);
- }
- void CGraphBase::getLinkedResult(unsigned & count, byte * * & ret, unsigned id)
- {
- Owned<IThorResult> result = getResult(id, true); // will get collated distributed result
- result->getLinkedResult(count, ret);
- }
- const void * CGraphBase::getLinkedRowResult(unsigned id)
- {
- Owned<IThorResult> result = getResult(id, true); // will get collated distributed result
- return result->getLinkedRowResult();
- }
- // IThorChildGraph impl.
- IEclGraphResults *CGraphBase::evaluate(unsigned _parentExtractSz, const byte *parentExtract)
- {
- CriticalBlock block(evaluateCrit);
- localResults.setown(createThorGraphResults(xgmml->getPropInt("att[@name=\"_numResults\"]/@value", 0)));
- parentExtractSz = _parentExtractSz;
- executeChild(parentExtractSz, parentExtract);
- return localResults.getClear();
- }
- static bool isLocalOnly(const CGraphElementBase &activity);
- static bool isLocalOnly(const CGraphBase &graph) // checks all dependencies, if something needs to be global, whole body is forced to be execution sync.
- {
- if (0 == graph.activityCount())
- {
- Owned<IThorGraphIterator> iter = graph.getChildGraphs();
- ForEach(*iter)
- {
- CGraphBase &childGraph = iter->query();
- if (childGraph.isSink())
- {
- if (!isLocalOnly(childGraph))
- return false;
- }
- }
- }
- else
- {
- if (graph.isGlobal())
- return false;
- Owned<IThorActivityIterator> sinkIter = graph.getAllSinkIterator();
- ForEach(*sinkIter)
- {
- CGraphElementBase &sink = sinkIter->query();
- if (!isLocalOnly(sink))
- return false;
- }
- }
- return true;
- }
- static bool isLocalOnly(const CGraphElementBase &activity)
- {
- Owned<IThorGraphDependencyIterator> deps = activity.getDependsIterator();
- ForEach(*deps)
- {
- if (!isLocalOnly(*(deps->query().graph)))
- return false;
- }
- StringBuffer match("edge[@target=\"");
- match.append(activity.queryId()).append("\"]");
- Owned<IPropertyTreeIterator> inputs = activity.queryOwner().queryXGMML().getElements(match.str());
- ForEach(*inputs)
- {
- CGraphElementBase *sourceAct = activity.queryOwner().queryElement(inputs->query().getPropInt("@source"));
- if (!isLocalOnly(*sourceAct))
- return false;
- }
- return true;
- }
- bool CGraphBase::isLocalOnly() const // checks all dependencies, if something needs to be global, whole body is forced to be execution sync.
- {
- if (-1 == localOnly)
- localOnly = (int)::isLocalOnly(*this);
- return 1==localOnly;
- }
- IThorGraphResults *CGraphBase::createThorGraphResults(unsigned num)
- {
- return new CThorGraphResults(num);
- }
- ////
- void CGraphTempHandler::registerFile(const char *name, graph_id graphId, unsigned usageCount, bool temp, WUFileKind fileKind, StringArray *clusters)
- {
- assertex(temp);
- LOG(MCdebugProgress, thorJob, "registerTmpFile name=%s, usageCount=%d", name, usageCount);
- CriticalBlock b(crit);
- if (tmpFiles.find(name))
- throw MakeThorException(TE_FileAlreadyUsedAsTempFile, "File already used as temp file (%s)", name);
- tmpFiles.replace(* new CFileUsageEntry(name, graphId, fileKind, usageCount));
- }
- void CGraphTempHandler::deregisterFile(const char *name, bool kept)
- {
- LOG(MCdebugProgress, thorJob, "deregisterTmpFile name=%s", name);
- CriticalBlock b(crit);
- CFileUsageEntry *fileUsage = tmpFiles.find(name);
- if (!fileUsage)
- {
- if (errorOnMissing)
- throw MakeThorException(TE_FileNotFound, "File not found (%s) deregistering tmp file", name);
- return;
- }
- if (0 == fileUsage->queryUsage()) // marked 'not to be deleted' until workunit complete.
- return;
- else if (1 == fileUsage->queryUsage())
- {
- tmpFiles.remove(name);
- try
- {
- if (!removeTemp(name))
- LOG(MCwarning, unknownJob, "Failed to delete tmp file : %s (not found)", name);
- }
- catch (IException *e) { StringBuffer s("Failed to delete tmp file : "); FLLOG(MCwarning, thorJob, e, s.append(name).str()); }
- }
- else
- fileUsage->decUsage();
- }
- void CGraphTempHandler::clearTemps()
- {
- CriticalBlock b(crit);
- Owned<IFileUsageIterator> iter = getIterator();
- ForEach(*iter)
- {
- CFileUsageEntry &entry = iter->query();
- const char *tmpname = entry.queryName();
- try
- {
- if (!removeTemp(tmpname))
- LOG(MCwarning, thorJob, "Failed to delete tmp file : %s (not found)", tmpname);
- }
- catch (IException *e) { StringBuffer s("Failed to delete tmp file : "); FLLOG(MCwarning, thorJob, e, s.append(tmpname).str()); }
- }
- iter.clear();
- tmpFiles.kill();
- }
- /////
- class CGraphExecutor;
- class CGraphExecutorGraphInfo : public CInterface
- {
- public:
- CGraphExecutorGraphInfo(CGraphExecutor &_executor, CGraphBase *_subGraph, IGraphCallback &_callback, const byte *parentExtract, size32_t parentExtractSz) : executor(_executor), subGraph(_subGraph), callback(_callback)
- {
- parentExtractMb.append(parentExtractSz, parentExtract);
- }
- CGraphExecutor &executor;
- IGraphCallback &callback;
- Linked<CGraphBase> subGraph;
- MemoryBuffer parentExtractMb;
- };
- class CGraphExecutor : public CInterface, implements IGraphExecutor
- {
- CJobChannel &jobChannel;
- CJobBase &job;
- CIArrayOf<CGraphExecutorGraphInfo> stack, running, toRun;
- UnsignedArray seen;
- bool stopped;
- unsigned limit;
- unsigned waitOnRunning;
- CriticalSection crit;
- Semaphore runningSem;
- Owned<IThreadPool> graphPool;
- class CGraphExecutorFactory : public CInterface, implements IThreadFactory
- {
- CGraphExecutor &executor;
- public:
- IMPLEMENT_IINTERFACE;
- CGraphExecutorFactory(CGraphExecutor &_executor) : executor(_executor) { }
- // IThreadFactory
- virtual IPooledThread *createNew()
- {
- class CGraphExecutorThread : public CInterface, implements IPooledThread
- {
- Owned<CGraphExecutorGraphInfo> graphInfo;
- public:
- IMPLEMENT_IINTERFACE;
- CGraphExecutorThread()
- {
- }
- void init(void *startInfo)
- {
- graphInfo.setown((CGraphExecutorGraphInfo *)startInfo);
- }
- void main()
- {
- loop
- {
- Linked<CGraphBase> graph = graphInfo->subGraph;
- Owned<IException> e;
- try
- {
- PROGLOG("CGraphExecutor: Running graph, graphId=%" GIDPF "d", graph->queryGraphId());
- graphInfo->callback.runSubgraph(*graph, graphInfo->parentExtractMb.length(), (const byte *)graphInfo->parentExtractMb.toByteArray());
- }
- catch (IException *_e)
- {
- e.setown(_e);
- }
- Owned<CGraphExecutorGraphInfo> nextGraphInfo;
- try
- {
- nextGraphInfo.setown(graphInfo->executor.graphDone(*graphInfo, e));
- }
- catch (IException *e)
- {
- GraphPrintLog(graph, e, "graphDone");
- e->Release();
- }
- if (e)
- throw e.getClear();
- if (!nextGraphInfo)
- return;
- graphInfo.setown(nextGraphInfo.getClear());
- }
- }
- bool canReuse() { return true; }
- bool stop() { return true; }
- };
- return new CGraphExecutorThread();
- }
- } *factory;
- CGraphExecutorGraphInfo *findRunning(graph_id gid)
- {
- ForEachItemIn(r, running)
- {
- CGraphExecutorGraphInfo *graphInfo = &running.item(r);
- if (gid == graphInfo->subGraph->queryGraphId())
- return graphInfo;
- }
- return NULL;
- }
- public:
- IMPLEMENT_IINTERFACE;
- CGraphExecutor(CJobChannel &_jobChannel) : jobChannel(_jobChannel), job(_jobChannel.queryJob())
- {
- limit = (unsigned)job.getWorkUnitValueInt("concurrentSubGraphs", globals->getPropInt("@concurrentSubGraphs", 1));
- PROGLOG("CGraphExecutor: limit = %d", limit);
- waitOnRunning = 0;
- stopped = false;
- factory = new CGraphExecutorFactory(*this);
- graphPool.setown(createThreadPool("CGraphExecutor pool", factory, &jobChannel, limit));
- }
- ~CGraphExecutor()
- {
- stopped = true;
- graphPool->joinAll();
- factory->Release();
- }
- CGraphExecutorGraphInfo *graphDone(CGraphExecutorGraphInfo &doneGraphInfo, IException *e)
- {
- CriticalBlock b(crit);
- running.zap(doneGraphInfo);
- if (waitOnRunning)
- {
- runningSem.signal(waitOnRunning);
- waitOnRunning = 0;
- }
- if (e || job.queryAborted())
- {
- stopped = true;
- stack.kill();
- return NULL;
- }
- if (job.queryPausing())
- stack.kill();
- else if (stack.ordinality())
- {
- CICopyArrayOf<CGraphExecutorGraphInfo> toMove;
- ForEachItemIn(s, stack)
- {
- bool dependenciesDone = true;
- CGraphExecutorGraphInfo &graphInfo = stack.item(s);
- ForEachItemIn (d, graphInfo.subGraph->dependentSubGraphs)
- {
- CGraphBase &subGraph = graphInfo.subGraph->dependentSubGraphs.item(d);
- if (!subGraph.isComplete())
- {
- dependenciesDone = false;
- break;
- }
- }
- if (dependenciesDone)
- {
- graphInfo.subGraph->dependentSubGraphs.kill();
- graphInfo.subGraph->prepare(graphInfo.parentExtractMb.length(), (const byte *)graphInfo.parentExtractMb.toByteArray(), true, true, true); // now existing deps done, maybe more to prepare
- ForEachItemIn (d, graphInfo.subGraph->dependentSubGraphs)
- {
- CGraphBase &subGraph = graphInfo.subGraph->dependentSubGraphs.item(d);
- if (!subGraph.isComplete())
- {
- dependenciesDone = false;
- break;
- }
- }
- if (dependenciesDone)
- {
- graphInfo.subGraph->dependentSubGraphs.kill(); // none to track anymore
- toMove.append(graphInfo);
- }
- }
- }
- ForEachItemIn(m, toMove)
- {
- Linked<CGraphExecutorGraphInfo> graphInfo = &toMove.item(m);
- stack.zap(*graphInfo);
- toRun.add(*graphInfo.getClear(), 0);
- }
- }
- job.markWuDirty();
- PROGLOG("CGraphExecutor running=%d, waitingToRun=%d, dependentsWaiting=%d", running.ordinality(), toRun.ordinality(), stack.ordinality());
- while (toRun.ordinality())
- {
- if (job.queryPausing())
- return NULL;
- Linked<CGraphExecutorGraphInfo> nextGraphInfo = &toRun.item(0);
- toRun.remove(0);
- if (!nextGraphInfo->subGraph->isComplete() && (NULL == findRunning(nextGraphInfo->subGraph->queryGraphId())))
- {
- running.append(*nextGraphInfo.getLink());
- return nextGraphInfo.getClear();
- }
- }
- return NULL;
- }
- // IGraphExecutor
- virtual void add(CGraphBase *subGraph, IGraphCallback &callback, bool checkDependencies, size32_t parentExtractSz, const byte *parentExtract)
- {
- bool alreadyRunning;
- {
- CriticalBlock b(crit);
- if (job.queryPausing())
- return;
- if (subGraph->isComplete())
- return;
- alreadyRunning = NULL != findRunning(subGraph->queryGraphId());
- if (alreadyRunning)
- ++waitOnRunning;
- }
- if (alreadyRunning)
- {
- loop
- {
- PROGLOG("Waiting on subgraph %" GIDPF "d", subGraph->queryGraphId());
- if (runningSem.wait(MEDIUMTIMEOUT) || job.queryAborted() || job.queryPausing())
- break;
- }
- return;
- }
- else
- {
- CriticalBlock b(crit);
- if (seen.contains(subGraph->queryGraphId()))
- return; // already queued;
- seen.append(subGraph->queryGraphId());
- }
- if (!subGraph->prepare(parentExtractSz, parentExtract, checkDependencies, true, true))
- {
- subGraph->setComplete();
- return;
- }
- if (subGraph->dependentSubGraphs.ordinality())
- {
- bool dependenciesDone = true;
- ForEachItemIn (d, subGraph->dependentSubGraphs)
- {
- CGraphBase &graph = subGraph->dependentSubGraphs.item(d);
- if (!graph.isComplete())
- {
- dependenciesDone = false;
- break;
- }
- }
- if (dependenciesDone)
- subGraph->dependentSubGraphs.kill(); // none to track anymore
- }
- Owned<CGraphExecutorGraphInfo> graphInfo = new CGraphExecutorGraphInfo(*this, subGraph, callback, parentExtract, parentExtractSz);
- CriticalBlock b(crit);
- if (0 == subGraph->dependentSubGraphs.ordinality())
- {
- if (running.ordinality()<limit)
- {
- running.append(*LINK(graphInfo));
- PROGLOG("Add: Launching graph thread for graphId=%" GIDPF "d", subGraph->queryGraphId());
- graphPool->start(graphInfo.getClear());
- }
- else
- stack.add(*graphInfo.getClear(), 0); // push to front, no dependency, free to run next.
- }
- else
- stack.append(*graphInfo.getClear()); // as dependencies finish, may move up the list
- }
- virtual IThreadPool &queryGraphPool() { return *graphPool; }
- virtual void wait()
- {
- PROGLOG("CGraphExecutor exiting, waiting on graph pool");
- graphPool->joinAll();
- PROGLOG("CGraphExecutor graphPool finished");
- }
- };
- ////
- // IContextLogger
- class CThorContextLogger : CSimpleInterface, implements IContextLogger
- {
- CJobBase &job;
- unsigned traceLevel;
- public:
- IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
- CThorContextLogger(CJobBase &_job) : job(_job)
- {
- traceLevel = 1;
- }
- virtual void CTXLOGva(const char *format, va_list args) const __attribute__((format(printf,2,0)))
- {
- StringBuffer ss;
- ss.valist_appendf(format, args);
- LOG(MCdebugProgress, thorJob, "%s", ss.str());
- }
- virtual void logOperatorExceptionVA(IException *E, const char *file, unsigned line, const char *format, va_list args) const __attribute__((format(printf,5,0)))
- {
- StringBuffer ss;
- ss.append("ERROR");
- if (E)
- ss.append(": ").append(E->errorCode());
- if (file)
- ss.appendf(": %s(%d) ", file, line);
- if (E)
- E->errorMessage(ss.append(": "));
- if (format)
- ss.append(": ").valist_appendf(format, args);
- LOG(MCoperatorProgress, thorJob, "%s", ss.str());
- }
- virtual void noteStatistic(StatisticKind kind, unsigned __int64 value) const
- {
- }
- virtual void mergeStats(const CRuntimeStatisticCollection &from) const
- {
- }
- virtual unsigned queryTraceLevel() const
- {
- return traceLevel;
- }
- };
- ////
- CJobBase::CJobBase(const char *_graphName) : graphName(_graphName)
- {
- maxDiskUsage = diskUsage = 0;
- dirty = true;
- aborted = false;
- mpJobTag = TAG_NULL;
- globalMemorySize = globals->getPropInt("@globalMemorySize"); // in MB
- oldNodeCacheMem = 0;
- pluginMap = new SafePluginMap(&pluginCtx, true);
- // JCSMORE - Will pass down at job creation time...
- jobGroup.set(&::queryClusterGroup());
- slaveGroup.setown(jobGroup->remove(0));
- nodeGroup.set(&queryNodeGroup());
- }
- void CJobBase::init()
- {
- StringBuffer tmp;
- tmp.append(wuid);
- tmp.append(graphName);
- key.set(tmp.str());
- SCMStringBuffer tokenUser, password;
- extractToken(token.str(), wuid.str(), tokenUser, password);
- userDesc = createUserDescriptor();
- userDesc->set(user.str(), password.str());
- forceLogGraphIdMin = (graph_id)getWorkUnitValueInt("forceLogGraphIdMin", 0);
- forceLogGraphIdMax = (graph_id)getWorkUnitValueInt("forceLogGraphIdMax", 0);
- logctx.setown(new CThorContextLogger(*this));
- // global setting default on, can be overridden by #option
- timeActivities = 0 != getWorkUnitValueInt("timeActivities", globals->getPropBool("@timeActivities", true));
- maxActivityCores = (unsigned)getWorkUnitValueInt("maxActivityCores", globals->getPropInt("@maxActivityCores", 0)); // NB: 0 means system decides
- pausing = false;
- resumed = false;
- crcChecking = 0 != getWorkUnitValueInt("THOR_ROWCRC", globals->getPropBool("@THOR_ROWCRC", false));
- usePackedAllocator = 0 != getWorkUnitValueInt("THOR_PACKEDALLOCATOR", globals->getPropBool("@THOR_PACKEDALLOCATOR", true));
- memorySpillAt = (unsigned)getWorkUnitValueInt("memorySpillAt", globals->getPropInt("@memorySpillAt", 80));
- PROGLOG("Global memory size = %d MB, memory spill at = %d%%", globalMemorySize, memorySpillAt);
- StringBuffer tracing("maxActivityCores = ");
- if (maxActivityCores)
- tracing.append(maxActivityCores);
- else
- tracing.append("[unbound]");
- PROGLOG("%s", tracing.str());
- }
- void CJobBase::beforeDispose()
- {
- endJob();
- ForEachItemIn(c, jobChannels)
- jobChannels.item(c).clean();
- }
- CJobBase::~CJobBase()
- {
- ::Release(userDesc);
- ::Release(pluginMap);
- StringBuffer memStatsStr;
- roxiemem::memstats(memStatsStr);
- PROGLOG("Roxiemem stats: %s", memStatsStr.str());
- memsize_t heapUsage = getMapInfo("heap");
- if (heapUsage) // if 0, assumed to be unavailable
- PROGLOG("Heap usage : %" I64F "d bytes", (unsigned __int64)heapUsage);
- }
- CJobChannel &CJobBase::queryJobChannel(unsigned c) const
- {
- return jobChannels.item(c);
- }
- CActivityBase &CJobBase::queryChannelActivity(unsigned c, graph_id gid, activity_id id) const
- {
- CJobChannel &channel = queryJobChannel(c);
- Owned<CGraphBase> graph = channel.getGraph(gid);
- dbgassertex(graph);
- CGraphElementBase *container = graph->queryElement(id);
- dbgassertex(container);
- return *container->queryActivity();
- }
- void CJobBase::startJob()
- {
- LOG(MCdebugProgress, thorJob, "New Graph started : %s", graphName.get());
- ClearTempDirs();
- unsigned pinterval = globals->getPropInt("@system_monitor_interval",1000*60);
- if (pinterval)
- {
- perfmonhook.setown(createThorMemStatsPerfMonHook(*this, getOptInt(THOROPT_MAX_KERNLOG, 3)));
- startPerformanceMonitor(pinterval,PerfMonStandard,perfmonhook);
- }
- PrintMemoryStatusLog();
- logDiskSpace();
- unsigned keyNodeCacheMB = (unsigned)getWorkUnitValueInt("keyNodeCacheMB", 0);
- if (keyNodeCacheMB)
- {
- oldNodeCacheMem = setNodeCacheMem(keyNodeCacheMB * 0x100000);
- PROGLOG("Key node cache size set to: %d MB", keyNodeCacheMB);
- }
- unsigned keyFileCacheLimit = (unsigned)getWorkUnitValueInt("keyFileCacheLimit", 0);
- if (!keyFileCacheLimit)
- keyFileCacheLimit = (querySlaves()+1)*2;
- setKeyIndexCacheSize(keyFileCacheLimit);
- PROGLOG("Key file cache size set to: %d", keyFileCacheLimit);
- }
- void CJobBase::endJob()
- {
- stopPerformanceMonitor();
- LOG(MCdebugProgress, thorJob, "Job ended : %s", graphName.get());
- clearKeyStoreCache(true);
- if (oldNodeCacheMem)
- setNodeCacheMem(oldNodeCacheMem);
- PrintMemoryStatusLog();
- }
- bool CJobBase::queryForceLogging(graph_id graphId, bool def) const
- {
- // JCSMORE, could add comma separated range, e.g. 1-5,10-12
- if ((graphId >= forceLogGraphIdMin) && (graphId <= forceLogGraphIdMax))
- return true;
- return def;
- }
- static void getGlobalDeps(CGraphBase &graph, CICopyArrayOf<CGraphDependency> &deps)
- {
- Owned<IThorActivityIterator> iter = graph.getIterator();
- ForEach(*iter)
- {
- CGraphElementBase &elem = iter->query();
- Owned<IThorGraphDependencyIterator> dependIterator = elem.getDependsIterator();
- ForEach(*dependIterator)
- {
- CGraphDependency &dependency = dependIterator->query();
- if (dependency.graph->isGlobal() && NULL==dependency.graph->queryOwner())
- deps.append(dependency);
- getGlobalDeps(*dependency.graph, deps);
- }
- }
- }
- void CJobBase::addSubGraph(IPropertyTree &xgmml)
- {
- CriticalBlock b(crit);
- for (unsigned c=0; c<queryJobChannels(); c++)
- {
- CJobChannel &jobChannel = queryJobChannel(c);
- Owned<CGraphBase> subGraph = jobChannel.createGraph();
- // JCSMORE - clone, instead of recrete
- subGraph->createFromXGMML(&xgmml, NULL, NULL, NULL);
- jobChannel.addSubGraph(*subGraph.getClear());
- }
- }
- void CJobBase::addDependencies(IPropertyTree *xgmml, bool failIfMissing)
- {
- for (unsigned c=0; c<queryJobChannels(); c++)
- {
- CJobChannel &jobChannel = queryJobChannel(c);
- jobChannel.addDependencies(xgmml, failIfMissing);
- }
- }
- bool CJobBase::queryUseCheckpoints() const
- {
- return globals->getPropBool("@checkPointRecovery") || 0 != getWorkUnitValueInt("checkPointRecovery", 0);
- }
- void CJobBase::abort(IException *e)
- {
- aborted = true;
- for (unsigned c=0; c<queryJobChannels(); c++)
- {
- CJobChannel &jobChannel = queryJobChannel(c);
- jobChannel.abort(e);
- }
- }
- void CJobBase::increase(offset_t usage, const char *key)
- {
- diskUsage += usage;
- if (diskUsage > maxDiskUsage) maxDiskUsage = diskUsage;
- }
- void CJobBase::decrease(offset_t usage, const char *key)
- {
- diskUsage -= usage;
- }
- // these getX methods for property in workunit settings, then global setting, defaulting to provided 'dft' if not present
- StringBuffer &CJobBase::getOpt(const char *opt, StringBuffer &out)
- {
- if (!opt || !*opt)
- return out; // probably error
- VStringBuffer gOpt("Debug/@%s", opt);
- getWorkUnitValue(opt, out);
- if (0 == out.length())
- globals->getProp(gOpt, out);
- return out;
- }
- bool CJobBase::getOptBool(const char *opt, bool dft)
- {
- if (!opt || !*opt)
- return dft; // probably error
- VStringBuffer gOpt("Debug/@%s", opt);
- return getWorkUnitValueBool(opt, globals->getPropBool(gOpt, dft));
- }
- int CJobBase::getOptInt(const char *opt, int dft)
- {
- if (!opt || !*opt)
- return dft; // probably error
- VStringBuffer gOpt("Debug/@%s", opt);
- return (int)getWorkUnitValueInt(opt, globals->getPropInt(gOpt, dft));
- }
- __int64 CJobBase::getOptInt64(const char *opt, __int64 dft)
- {
- if (!opt || !*opt)
- return dft; // probably error
- VStringBuffer gOpt("Debug/@%s", opt);
- return getWorkUnitValueInt(opt, globals->getPropInt64(gOpt, dft));
- }
- IThorAllocator *CJobBase::createThorAllocator()
- {
- return ::createThorAllocator(((memsize_t)globalMemorySize)*0x100000, memorySpillAt, *logctx, crcChecking, usePackedAllocator);
- }
- /// CJobChannel
- CJobChannel::CJobChannel(CJobBase &_job, IMPServer *_mpServer, unsigned _channel)
- : job(_job), mpServer(_mpServer), channel(_channel)
- {
- aborted = false;
- codeCtx = NULL;
- thorAllocator.setown(job.createThorAllocator());
- timeReporter = createStdTimeReporter();
- jobComm.setown(mpServer->createCommunicator(&job.queryJobGroup()));
- myrank = job.queryJobGroup().rank(queryMyNode());
- graphExecutor.setown(new CGraphExecutor(*this));
- }
- CJobChannel::~CJobChannel()
- {
- queryRowManager().reportMemoryUsage(false);
- PROGLOG("CJobBase resetting memory manager");
- thorAllocator.clear();
- wait();
- clean();
- ::Release(codeCtx);
- timeReporter->Release();
- }
- INode *CJobChannel::queryMyNode()
- {
- return mpServer->queryMyNode();
- }
- void CJobChannel::wait()
- {
- if (graphExecutor)
- graphExecutor->wait();
- }
- ICodeContext &CJobChannel::queryCodeContext() const
- {
- return *codeCtx;
- }
- mptag_t CJobChannel::deserializeMPTag(MemoryBuffer &mb)
- {
- mptag_t tag;
- deserializeMPtag(mb, tag);
- if (TAG_NULL != tag)
- {
- PROGLOG("deserializeMPTag: tag = %d", (int)tag);
- jobComm->flush(tag);
- }
- return tag;
- }
- IEngineRowAllocator *CJobChannel::getRowAllocator(IOutputMetaData * meta, activity_id activityId, roxiemem::RoxieHeapFlags flags) const
- {
- return thorAllocator->getRowAllocator(meta, activityId, flags);
- }
- roxiemem::IRowManager &CJobChannel::queryRowManager() const
- {
- return thorAllocator->queryRowManager();
- }
- static void noteDependency(CGraphElementBase *targetActivity, CGraphElementBase *sourceActivity, CGraphBase *targetGraph, CGraphBase *sourceGraph, unsigned controlId)
- {
- targetActivity->addDependsOn(sourceGraph, controlId);
- // NB: record dependency in source graph, serialized to slaves, used to decided if should run dependency sinks or not
- Owned<IPropertyTree> dependencyFor = createPTree();
- dependencyFor->setPropInt("@id", sourceActivity->queryId());
- dependencyFor->setPropInt("@graphId", targetGraph->queryGraphId());
- if (controlId)
- dependencyFor->setPropInt("@conditionalId", controlId);
- sourceGraph->queryXGMML().addPropTree("Dependency", dependencyFor.getClear());
- }
- void CJobChannel::addDependencies(IPropertyTree *xgmml, bool failIfMissing)
- {
- CGraphArrayCopy childGraphs;
- CGraphElementArrayCopy targetActivities, sourceActivities;
- Owned<IPropertyTreeIterator> iter = xgmml->getElements("edge");
- ForEach(*iter)
- {
- IPropertyTree &edge = iter->query();
- Owned<CGraphBase> source = getGraph(edge.getPropInt("@source"));
- Owned<CGraphBase> target = getGraph(edge.getPropInt("@target"));
- if (!source || !target)
- {
- if (failIfMissing)
- throwUnexpected();
- else
- continue; // expected if assigning dependencies in slaves
- }
- CGraphElementBase *targetActivity = (CGraphElementBase *)target->queryElement(edge.getPropInt("att[@name=\"_targetActivity\"]/@value"));
- CGraphElementBase *sourceActivity = (CGraphElementBase *)source->queryElement(edge.getPropInt("att[@name=\"_sourceActivity\"]/@value"));
- int controlId = 0;
- if (edge.getPropBool("att[@name=\"_dependsOn\"]/@value", false))
- {
- if (!edge.getPropBool("att[@name=\"_childGraph\"]/@value", false)) // JCSMORE - not sure if necess. roxie seem to do.
- controlId = edge.getPropInt("att[@name=\"_when\"]/@value", 0);
- CGraphBase &sourceGraph = sourceActivity->queryOwner();
- unsigned sourceGraphContext = sourceGraph.queryParentActivityId();
- CGraphBase *targetGraph = NULL;
- unsigned targetGraphContext = -1;
- loop
- {
- targetGraph = &targetActivity->queryOwner();
- targetGraphContext = targetGraph->queryParentActivityId();
- if (sourceGraphContext == targetGraphContext)
- break;
- targetActivity = targetGraph->queryElement(targetGraphContext);
- }
- assertex(targetActivity && sourceActivity);
- noteDependency(targetActivity, sourceActivity, target, source, controlId);
- }
- else if (edge.getPropBool("att[@name=\"_conditionSource\"]/@value", false))
- { /* Ignore it */ }
- else if (edge.getPropBool("att[@name=\"_childGraph\"]/@value", false))
- {
- // NB: any dependencies of the child acts. are dependencies of this act.
- childGraphs.append(*source);
- targetActivities.append(*targetActivity);
- sourceActivities.append(*sourceActivity);
- }
- else
- {
- if (!edge.getPropBool("att[@name=\"_childGraph\"]/@value", false)) // JCSMORE - not sure if necess. roxie seem to do.
- controlId = edge.getPropInt("att[@name=\"_when\"]/@value", 0);
- noteDependency(targetActivity, sourceActivity, target, source, controlId);
- }
- }
- ForEachItemIn(c, childGraphs)
- {
- CGraphBase &childGraph = childGraphs.item(c);
- CGraphElementBase &targetActivity = targetActivities.item(c);
- CGraphElementBase &sourceActivity = sourceActivities.item(c);
- if (!childGraph.isGlobal())
- {
- CICopyArrayOf<CGraphDependency> globalChildGraphDeps;
- getGlobalDeps(childGraph, globalChildGraphDeps);
- ForEachItemIn(gcd, globalChildGraphDeps)
- {
- CGraphDependency &globalDep = globalChildGraphDeps.item(gcd);
- noteDependency(&targetActivity, &sourceActivity, globalDep.graph, &childGraph, globalDep.controlId);
- }
- }
- }
- SuperHashIteratorOf<CGraphBase> allIter(allGraphs);
- ForEach(allIter)
- {
- CGraphBase &subGraph = allIter.query();
- if (subGraph.queryOwner() && subGraph.queryParentActivityId())
- {
- CGraphElementBase *parentElement = subGraph.queryOwner()->queryElement(subGraph.queryParentActivityId());
- if (isLoopActivity(*parentElement))
- {
- if (!parentElement->queryOwner().isLocalChild() && !subGraph.isLocalOnly())
- subGraph.setGlobal(true);
- }
- }
- bool log = queryJob().queryForceLogging(subGraph.queryGraphId(), (NULL == subGraph.queryOwner()) || subGraph.isGlobal());
- subGraph.setLogging(log);
- }
- }
- IThorGraphIterator *CJobChannel::getSubGraphs()
- {
- CriticalBlock b(crit);
- return new CGraphTableIterator(subGraphs);
- }
- void CJobChannel::clean()
- {
- if (graphExecutor)
- {
- graphExecutor->queryGraphPool().stopAll();
- graphExecutor.clear();
- }
- subGraphs.kill();
- }
- void CJobChannel::startGraph(CGraphBase &graph, IGraphCallback &callback, bool checkDependencies, size32_t parentExtractSize, const byte *parentExtract)
- {
- graphExecutor->add(&graph, callback, checkDependencies, parentExtractSize, parentExtract);
- }
- IThorResult *CJobChannel::getOwnedResult(graph_id gid, activity_id ownerId, unsigned resultId)
- {
- Owned<CGraphBase> graph = getGraph(gid);
- if (!graph)
- {
- Owned<IThorException> e = MakeThorException(0, "getOwnedResult: graph not found");
- e->setGraphInfo(queryJob().queryGraphName(), gid);
- throw e.getClear();
- }
- Owned<IThorResult> result;
- if (ownerId)
- {
- CGraphElementBase *container = graph->queryElement(ownerId);
- assertex(container);
- CActivityBase *activity = container->queryActivity();
- IThorGraphResults *results = activity->queryResults();
- if (!results)
- throw MakeGraphException(graph, 0, "GraphGetResult: no results created (requesting: %d)", resultId);
- result.setown(activity->queryResults()->getResult(resultId));
- }
- else
- result.setown(graph->getResult(resultId));
- if (!result)
- throw MakeGraphException(graph, 0, "GraphGetResult: result not found: %d", resultId);
- return result.getClear();
- }
- void CJobChannel::abort(IException *e)
- {
- aborted = true;
- Owned<IThorGraphIterator> iter = getSubGraphs();
- ForEach (*iter)
- {
- CGraphBase &graph = iter->query();
- graph.abort(e);
- }
- }
- // IGraphCallback
- void CJobChannel::runSubgraph(CGraphBase &graph, size32_t parentExtractSz, const byte *parentExtract)
- {
- graph.executeSubGraph(parentExtractSz, parentExtract);
- }
- static IThorResource *iThorResource = NULL;
- void setIThorResource(IThorResource &r)
- {
- iThorResource = &r;
- }
- IThorResource &queryThor()
- {
- return *iThorResource;
- }
- //
- //
- //
- //
- CActivityBase::CActivityBase(CGraphElementBase *_container) : container(*_container), timeActivities(_container->queryJob().queryTimeActivities())
- {
- mpTag = TAG_NULL;
- abortSoon = receiving = cancelledReceive = reInit = false;
- baseHelper.set(container.queryHelper());
- parentExtractSz = 0;
- parentExtract = NULL;
- // NB: maxCores, currently only used to control # cores used by sorts
- maxCores = container.queryXGMML().getPropInt("hint[@name=\"max_cores\"]/@value", container.queryJob().queryMaxDefaultActivityCores());
- }
- CActivityBase::~CActivityBase()
- {
- }
- void CActivityBase::abort()
- {
- if (!abortSoon) ActPrintLog("Abort condition set");
- abortSoon = true;
- }
- void CActivityBase::kill()
- {
- ownedResults.clear();
- }
- bool CActivityBase::appendRowXml(StringBuffer & target, IOutputMetaData & meta, const void * row) const
- {
- if (!meta.hasXML())
- {
- target.append("<xml-unavailable/>");
- return false;
- }
- try
- {
- CommonXmlWriter xmlWrite(XWFnoindent);
- meta.toXML((byte *) row, xmlWrite);
- target.append(xmlWrite.str());
- return true;
- }
- catch (IException * e)
- {
- e->Release();
- target.append("<invalid-row/>");
- return false;
- }
- }
- void CActivityBase::logRow(const char * prefix, IOutputMetaData & meta, const void * row)
- {
- bool blindLogging = false; // MORE: should check a workunit/global option
- if (meta.hasXML() && !blindLogging)
- {
- StringBuffer xml;
- appendRowXml(xml, meta, row);
- ActPrintLog("%s: %s", prefix, xml.str());
- }
- }
- void CActivityBase::ActPrintLog(const char *format, ...)
- {
- va_list args;
- va_start(args, format);
- ::ActPrintLogArgs(&queryContainer(), thorlog_null, MCdebugProgress, format, args);
- va_end(args);
- }
- void CActivityBase::ActPrintLog(IException *e, const char *format, ...)
- {
- va_list args;
- va_start(args, format);
- ::ActPrintLogArgs(&queryContainer(), e, thorlog_all, MCexception(e), format, args);
- va_end(args);
- }
- void CActivityBase::ActPrintLog(IException *e)
- {
- ActPrintLog(e, "%s", "");
- }
- bool CActivityBase::fireException(IException *e)
- {
- Owned<IThorException> _te;
- IThorException *te = QUERYINTERFACE(e, IThorException);
- if (te)
- {
- if (!te->queryActivityId())
- setExceptionActivityInfo(container, te);
- }
- else
- {
- te = MakeActivityException(this, e);
- te->setAudience(e->errorAudience());
- _te.setown(te);
- }
- return container.queryOwner().fireException(te);
- }
- void CActivityBase::processAndThrowOwnedException(IException * _e)
- {
- IThorException *e = QUERYINTERFACE(_e, IThorException);
- if (e)
- {
- if (!e->queryActivityId())
- setExceptionActivityInfo(container, e);
- }
- else
- {
- e = MakeActivityException(this, _e);
- _e->Release();
- }
- throw e;
- }
- IEngineRowAllocator * CActivityBase::queryRowAllocator()
- {
- if (CABallocatorlock.lock()) {
- if (!rowAllocator)
- rowAllocator.setown(getRowAllocator(queryRowMetaData()));
- CABallocatorlock.unlock();
- }
- return rowAllocator;
- }
-
- IOutputRowSerializer * CActivityBase::queryRowSerializer()
- {
- if (CABserializerlock.lock()) {
- if (!rowSerializer)
- rowSerializer.setown(queryRowMetaData()->createDiskSerializer(queryCodeContext(),queryId()));
- CABserializerlock.unlock();
- }
- return rowSerializer;
- }
- IOutputRowDeserializer * CActivityBase::queryRowDeserializer()
- {
- if (CABdeserializerlock.lock()) {
- if (!rowDeserializer)
- rowDeserializer.setown(queryRowMetaData()->createDiskDeserializer(queryCodeContext(),queryId()));
- CABdeserializerlock.unlock();
- }
- return rowDeserializer;
- }
- IRowInterfaces *CActivityBase::getRowInterfaces()
- {
- // create an independent instance, to avoid circular link dependency problems
- return createRowInterfaces(queryRowMetaData(), container.queryId(), queryCodeContext());
- }
- IEngineRowAllocator *CActivityBase::getRowAllocator(IOutputMetaData * meta, roxiemem::RoxieHeapFlags flags) const
- {
- return queryJobChannel().getRowAllocator(meta, queryId(), flags);
- }
- bool CActivityBase::receiveMsg(CMessageBuffer &mb, const rank_t rank, const mptag_t mpTag, rank_t *sender, unsigned timeout)
- {
- BooleanOnOff onOff(receiving);
- CTimeMon t(timeout);
- unsigned remaining = timeout;
- // check 'cancelledReceive' every 10 secs
- while (!cancelledReceive && ((MP_WAIT_FOREVER==timeout) || !t.timedout(&remaining)))
- {
- if (queryJobChannel().queryJobComm().recv(mb, rank, mpTag, sender, remaining>10000?10000:remaining))
- return true;
- }
- return false;
- }
- void CActivityBase::cancelReceiveMsg(const rank_t rank, const mptag_t mpTag)
- {
- cancelledReceive = true;
- if (receiving)
- queryJobChannel().queryJobComm().cancel(rank, mpTag);
- }
|