12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618261926202621262226232624262526262627262826292630263126322633263426352636263726382639264026412642264326442645264626472648264926502651265226532654265526562657265826592660266126622663266426652666266726682669267026712672267326742675267626772678267926802681268226832684268526862687268826892690269126922693269426952696269726982699270027012702270327042705270627072708270927102711271227132714271527162717271827192720272127222723272427252726272727282729273027312732273327342735273627372738273927402741274227432744274527462747274827492750275127522753275427552756275727582759276027612762276327642765276627672768276927702771277227732774277527762777277827792780278127822783278427852786278727882789279027912792279327942795279627972798279928002801280228032804280528062807280828092810281128122813281428152816281728182819282028212822282328242825282628272828282928302831283228332834283528362837283828392840284128422843284428452846284728482849285028512852285328542855285628572858285928602861286228632864286528662867286828692870287128722873287428752876287728782879288028812882288328842885288628872888288928902891289228932894289528962897289828992900290129022903290429052906290729082909291029112912291329142915291629172918291929202921292229232924292529262927292829292930293129322933293429352936293729382939294029412942294329442945294629472948294929502951295229532954295529562957295829592960296129622963296429652966296729682969297029712972297329742975297629772978297929802981298229832984298529862987298829892990299129922993299429952996299729982999300030013002300330043005300630073008300930103011301230133014301530163017301830193020302130223023302430253026302730283029303030313032303330343035303630373038303930403041304230433044304530463047304830493050305130523053305430553056305730583059 |
- /*##############################################################################
- HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ############################################################################## */
- #include "thgraph.hpp"
- #include "jptree.hpp"
- #include "commonext.hpp"
- #include "dasess.hpp"
- #include "jhtree.hpp"
- #include "thcodectx.hpp"
- #include "thbuf.hpp"
- #include "thormisc.hpp"
- #include "thbufdef.hpp"
- #include "thmem.hpp"
- PointerArray createFuncs;
- void registerCreateFunc(CreateFunc func)
- {
- createFuncs.append((void *)func);
- }
- ///////////////////////////////////
-
- //////
- /////
- class CThorGraphResult : public CInterface, implements IThorResult, implements IRowWriter
- {
- CActivityBase &activity;
- rowcount_t rowStreamCount;
- IOutputMetaData *meta;
- Owned<IRowWriterMultiReader> rowBuffer;
- IRowInterfaces *rowIf;
- IEngineRowAllocator *allocator;
- bool stopped, readers, distributed;
- void init()
- {
- stopped = readers = false;
- allocator = rowIf->queryRowAllocator();
- meta = allocator->queryOutputMeta();
- rowStreamCount = 0;
- }
- class CStreamWriter : public CSimpleInterface, implements IRowWriterMultiReader
- {
- CThorGraphResult &owner;
- CThorExpandingRowArray rows;
- public:
- IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
- CStreamWriter(CThorGraphResult &_owner) : owner(_owner), rows(owner.activity, owner.rowIf, true)
- {
- }
- //IRowWriterMultiReader
- virtual void putRow(const void *row)
- {
- rows.append(row);
- }
- virtual void flush() { }
- virtual IRowStream *getReader()
- {
- return rows.createRowStream(0, (rowidx_t)-1, false);
- }
- };
- public:
- IMPLEMENT_IINTERFACE;
- CThorGraphResult(CActivityBase &_activity, IRowInterfaces *_rowIf, bool _distributed, unsigned spillPriority) : activity(_activity), rowIf(_rowIf), distributed(_distributed)
- {
- init();
- if (SPILL_PRIORITY_DISABLE == spillPriority)
- rowBuffer.setown(new CStreamWriter(*this));
- else
- rowBuffer.setown(createOverflowableBuffer(activity, rowIf, true, true));
- }
- // IRowWriter
- virtual void putRow(const void *row)
- {
- assertex(!readers);
- ++rowStreamCount;
- rowBuffer->putRow(row);
- }
- virtual void flush() { }
- virtual offset_t getPosition() { UNIMPLEMENTED; return 0; }
- // IThorResult
- virtual IRowWriter *getWriter()
- {
- return LINK(this);
- }
- virtual void setResultStream(IRowWriterMultiReader *stream, rowcount_t count)
- {
- assertex(!readers);
- rowBuffer.setown(stream);
- rowStreamCount = count;
- }
- virtual IRowStream *getRowStream()
- {
- readers = true;
- return rowBuffer->getReader();
- }
- virtual IRowInterfaces *queryRowInterfaces() { return rowIf; }
- virtual CActivityBase *queryActivity() { return &activity; }
- virtual bool isDistributed() const { return distributed; }
- virtual void serialize(MemoryBuffer &mb)
- {
- Owned<IRowStream> stream = getRowStream();
- bool grouped = meta->isGrouped();
- if (grouped)
- {
- OwnedConstThorRow prev = stream->nextRow();
- if (prev)
- {
- bool eog;
- loop
- {
- eog = false;
- OwnedConstThorRow next = stream->nextRow();
- if (!next)
- eog = true;
- size32_t sz = meta->getRecordSize(prev);
- mb.append(sz, prev.get());
- mb.append(eog);
- if (!next)
- {
- next.setown(stream->nextRow());
- if (!next)
- break;
- }
- prev.set(next);
- }
- }
- }
- else
- {
- loop
- {
- OwnedConstThorRow row = stream->nextRow();
- if (!row)
- break;
- size32_t sz = meta->getRecordSize(row);
- mb.append(sz, row.get());
- }
- }
- }
- virtual void getResult(size32_t &len, void * & data)
- {
- MemoryBuffer mb;
- serialize(mb);
- len = mb.length();
- data = mb.detach();
- }
- virtual void getLinkedResult(unsigned &countResult, byte * * & result)
- {
- assertex(rowStreamCount==((unsigned)rowStreamCount)); // catch, just in case
- Owned<IRowStream> stream = getRowStream();
- countResult = 0;
- OwnedConstThorRow _rowset = allocator->createRowset((unsigned)rowStreamCount);
- const void **rowset = (const void **)_rowset.get();
- while (countResult < rowStreamCount)
- {
- OwnedConstThorRow row = stream->nextRow();
- rowset[countResult++] = row.getClear();
- }
- result = (byte **)_rowset.getClear();
- }
- virtual const void * getLinkedRowResult()
- {
- assertex(rowStreamCount==1); // catch, just in case
- Owned<IRowStream> stream = getRowStream();
- return stream->nextRow();
- }
- };
- /////
- IThorResult *CThorGraphResults::createResult(CActivityBase &activity, unsigned id, IRowInterfaces *rowIf, bool distributed, unsigned spillPriority)
- {
- Owned<IThorResult> result = ::createResult(activity, rowIf, distributed, spillPriority);
- setResult(id, result);
- return result;
- }
- /////
- IThorResult *createResult(CActivityBase &activity, IRowInterfaces *rowIf, bool distributed, unsigned spillPriority)
- {
- return new CThorGraphResult(activity, rowIf, distributed, spillPriority);
- }
- /////
- class CThorBoundLoopGraph : public CInterface, implements IThorBoundLoopGraph
- {
- CGraphBase *graph;
- activity_id activityId;
- Linked<IOutputMetaData> resultMeta;
- Owned<IOutputMetaData> counterMeta, loopAgainMeta;
- Owned<IRowInterfaces> resultRowIf, countRowIf, loopAgainRowIf;
- public:
- IMPLEMENT_IINTERFACE;
- CThorBoundLoopGraph(CGraphBase *_graph, IOutputMetaData * _resultMeta, unsigned _activityId) : graph(_graph), resultMeta(_resultMeta), activityId(_activityId)
- {
- counterMeta.setown(createFixedSizeMetaData(sizeof(thor_loop_counter_t)));
- loopAgainMeta.setown(createFixedSizeMetaData(sizeof(bool)));
- }
- virtual void prepareCounterResult(CActivityBase &activity, IThorGraphResults *results, unsigned loopCounter, unsigned pos)
- {
- if (!countRowIf)
- countRowIf.setown(createRowInterfaces(counterMeta, activityId, activity.queryCodeContext()));
- RtlDynamicRowBuilder counterRow(countRowIf->queryRowAllocator());
- thor_loop_counter_t * res = (thor_loop_counter_t *)counterRow.ensureCapacity(sizeof(thor_loop_counter_t),NULL);
- *res = loopCounter;
- OwnedConstThorRow counterRowFinal = counterRow.finalizeRowClear(sizeof(thor_loop_counter_t));
- IThorResult *counterResult = results->createResult(activity, pos, countRowIf, false, SPILL_PRIORITY_DISABLE);
- Owned<IRowWriter> counterResultWriter = counterResult->getWriter();
- counterResultWriter->putRow(counterRowFinal.getClear());
- }
- virtual void prepareLoopAgainResult(CActivityBase &activity, IThorGraphResults *results, unsigned pos)
- {
- if (!loopAgainRowIf)
- loopAgainRowIf.setown(createRowInterfaces(loopAgainMeta, activityId, activity.queryCodeContext()));
- activity.queryGraph().createResult(activity, pos, results, loopAgainRowIf, !activity.queryGraph().isLocalChild(), SPILL_PRIORITY_DISABLE);
- }
- virtual void prepareLoopResults(CActivityBase &activity, IThorGraphResults *results)
- {
- if (!resultRowIf)
- resultRowIf.setown(createRowInterfaces(resultMeta, activityId, activity.queryCodeContext()));
- IThorResult *loopResult = results->createResult(activity, 0, resultRowIf, !activity.queryGraph().isLocalChild()); // loop output
- IThorResult *inputResult = results->createResult(activity, 1, resultRowIf, !activity.queryGraph().isLocalChild()); // loop input
- }
- virtual void execute(CActivityBase &activity, unsigned counter, IThorGraphResults *results, IRowWriterMultiReader *inputStream, rowcount_t rowStreamCount, size32_t parentExtractSz, const byte *parentExtract)
- {
- if (counter)
- graph->setLoopCounter(counter);
- Owned<IThorResult> inputResult = results->getResult(1);
- if (inputStream)
- inputResult->setResultStream(inputStream, rowStreamCount);
- graph->executeChild(parentExtractSz, parentExtract, results, NULL);
- }
- virtual void execute(CActivityBase &activity, unsigned counter, IThorGraphResults *graphLoopResults, size32_t parentExtractSz, const byte *parentExtract)
- {
- Owned<IThorGraphResults> results = graph->createThorGraphResults(1);
- if (counter)
- {
- prepareCounterResult(activity, results, counter, 0);
- graph->setLoopCounter(counter);
- }
- try
- {
- graph->executeChild(parentExtractSz, parentExtract, results, graphLoopResults);
- }
- catch (IException *e)
- {
- IThorException *te = QUERYINTERFACE(e, IThorException);
- if (!te)
- {
- Owned<IThorException> e2 = MakeActivityException(&activity, e, "Exception running child graphs");
- e->Release();
- te = e2.getClear();
- }
- else if (!te->queryActivityId())
- setExceptionActivityInfo(activity.queryContainer(), te);
- try { graph->abort(te); }
- catch (IException *abortE)
- {
- Owned<IThorException> e2 = MakeActivityException(&activity, abortE, "Exception whilst aborting graph");
- abortE->Release();
- EXCLOG(e2, NULL);
- }
- graph->queryJob().fireException(te);
- throw te;
- }
- }
- virtual CGraphBase *queryGraph() { return graph; }
- };
- IThorBoundLoopGraph *createBoundLoopGraph(CGraphBase *graph, IOutputMetaData *resultMeta, unsigned activityId)
- {
- return new CThorBoundLoopGraph(graph, resultMeta, activityId);
- }
- ///////////////////////////////////
- bool isDiskInput(ThorActivityKind kind)
- {
- switch (kind)
- {
- case TAKcsvread:
- case TAKxmlread:
- case TAKjsonread:
- case TAKdiskread:
- case TAKdisknormalize:
- case TAKdiskaggregate:
- case TAKdiskcount:
- case TAKdiskgroupaggregate:
- case TAKindexread:
- case TAKindexcount:
- case TAKindexnormalize:
- case TAKindexaggregate:
- case TAKindexgroupaggregate:
- case TAKindexgroupexists:
- case TAKindexgroupcount:
- return true;
- default:
- return false;
- }
- }
- void CIOConnection::connect(unsigned which, CActivityBase *destActivity)
- {
- destActivity->setInput(which, activity->queryActivity(), index);
- }
- ///////////////////////////////////
- CGraphElementBase *createGraphElement(IPropertyTree &node, CGraphBase &owner, CGraphBase *resultsGraph)
- {
- CGraphElementBase *container = NULL;
- ForEachItemIn(m, createFuncs)
- {
- CreateFunc func = (CreateFunc)createFuncs.item(m);
- container = func(node, owner, resultsGraph);
- if (container) break;
- }
- if (NULL == container)
- {
- ThorActivityKind tak = (ThorActivityKind)node.getPropInt("att[@name=\"_kind\"]/@value", TAKnone);
- throw MakeStringException(TE_UnsupportedActivityKind, "Unsupported activity kind: %s", activityKindStr(tak));
- }
- container->setResultsGraph(resultsGraph);
- return container;
- }
- CGraphElementBase::CGraphElementBase(CGraphBase &_owner, IPropertyTree &_xgmml) : owner(&_owner)
- {
- xgmml.setown(createPTreeFromIPT(&_xgmml));
- eclText.set(xgmml->queryProp("att[@name=\"ecl\"]/@value"));
- id = xgmml->getPropInt("@id", 0);
- kind = (ThorActivityKind)xgmml->getPropInt("att[@name=\"_kind\"]/@value", TAKnone);
- sink = isActivitySink(kind);
- bool coLocal = xgmml->getPropBool("att[@name=\"coLocal\"]/@value", false);
- isLocalData = xgmml->getPropBool("att[@name=\"local\"]/@value", false); // local execute + local data access only
- isLocal = isLocalData || coLocal; // local execute
- isGrouped = xgmml->getPropBool("att[@name=\"grouped\"]/@value", false);
- resultsGraph = NULL;
- ownerId = xgmml->getPropInt("att[@name=\"_parentActivity\"]/@value", 0);
- onCreateCalled = onStartCalled = prepared = haveCreateCtx = haveStartCtx = nullAct = false;
- onlyUpdateIfChanged = xgmml->getPropBool("att[@name=\"_updateIfChanged\"]/@value", false);
- StringBuffer helperName("fAc");
- xgmml->getProp("@id", helperName);
- helperFactory = (EclHelperFactory) queryJob().queryDllEntry().getEntry(helperName.str());
- if (!helperFactory)
- throw makeOsExceptionV(GetLastError(), "Failed to load helper factory method: %s (dll handle = %p)", helperName.str(), queryJob().queryDllEntry().getInstance());
- alreadyUpdated = false;
- whichBranch = (unsigned)-1;
- whichBranchBitSet.setown(createThreadSafeBitSet());
- newWhichBranch = false;
- hasNullInput = false;
- log = true;
- sentActInitData.setown(createThreadSafeBitSet());
- }
- CGraphElementBase::~CGraphElementBase()
- {
- activity.clear();
- baseHelper.clear(); // clear before dll is unloaded
- }
- CJobBase &CGraphElementBase::queryJob() const
- {
- return owner->queryJob();
- }
- IGraphTempHandler *CGraphElementBase::queryTempHandler() const
- {
- if (resultsGraph)
- return resultsGraph->queryTempHandler();
- else
- return queryJob().queryTempHandler();
- }
- void CGraphElementBase::releaseIOs()
- {
- loopGraph.clear();
- associatedChildGraphs.kill();
- if (activity)
- activity->releaseIOs();
- connectedInputs.kill();
- inputs.kill();
- outputs.kill();
- activity.clear();
- }
- void CGraphElementBase::addDependsOn(CGraphBase *graph, int controlId)
- {
- ForEachItemIn(i, dependsOn)
- {
- if (dependsOn.item(i).graph == graph)
- return;
- }
- dependsOn.append(*new CGraphDependency(graph, controlId));
- }
- IThorGraphIterator *CGraphElementBase::getAssociatedChildGraphs() const
- {
- return new CGraphArrayIterator(associatedChildGraphs);
- }
- IThorGraphDependencyIterator *CGraphElementBase::getDependsIterator() const
- {
- return new ArrayIIteratorOf<const CGraphDependencyArray, CGraphDependency, IThorGraphDependencyIterator>(dependsOn);
- }
- void CGraphElementBase::reset()
- {
- onStartCalled = false;
- // prepared = false;
- if (activity)
- activity->reset();
- }
- void CGraphElementBase::ActPrintLog(const char *format, ...)
- {
- va_list args;
- va_start(args, format);
- ::ActPrintLogArgs(this, thorlog_null, MCdebugProgress, format, args);
- va_end(args);
- }
- void CGraphElementBase::ActPrintLog(IException *e, const char *format, ...)
- {
- va_list args;
- va_start(args, format);
- ::ActPrintLogArgs(this, e, thorlog_all, MCexception(e), format, args);
- va_end(args);
- }
- void CGraphElementBase::ActPrintLog(IException *e)
- {
- ActPrintLog(e, "%s", "");
- }
- void CGraphElementBase::abort(IException *e)
- {
- CActivityBase *activity = queryActivity();
- if (activity)
- activity->abort();
- Owned<IThorGraphIterator> graphIter = getAssociatedChildGraphs();
- ForEach (*graphIter)
- {
- graphIter->query().abort(e);
- }
- }
- void CGraphElementBase::doconnect()
- {
- ForEachItemIn(i, connectedInputs)
- {
- CIOConnection *io = connectedInputs.item(i);
- if (io)
- io->connect(i, queryActivity());
- }
- }
- void CGraphElementBase::clearConnections()
- {
- connectedInputs.kill();
- connectedOutputs.kill();
- if (activity)
- activity->clearConnections();
- }
- void CGraphElementBase::addInput(unsigned input, CGraphElementBase *inputAct, unsigned inputOutIdx)
- {
- while (inputs.ordinality()<=input) inputs.append(NULL);
- inputs.replace(new COwningSimpleIOConnection(LINK(inputAct), inputOutIdx), input);
- while (inputAct->outputs.ordinality()<=inputOutIdx) inputAct->outputs.append(NULL);
- inputAct->outputs.replace(new CIOConnection(this, input), inputOutIdx);
- }
- void CGraphElementBase::connectInput(unsigned input, CGraphElementBase *inputAct, unsigned inputOutIdx)
- {
- ActPrintLog("CONNECTING (id=%" ACTPF "d, idx=%d) to (id=%" ACTPF "d, idx=%d)", inputAct->queryId(), inputOutIdx, queryId(), input);
- while (connectedInputs.ordinality()<=input) connectedInputs.append(NULL);
- connectedInputs.replace(new COwningSimpleIOConnection(LINK(inputAct), inputOutIdx), input);
- while (inputAct->connectedOutputs.ordinality()<=inputOutIdx) inputAct->connectedOutputs.append(NULL);
- inputAct->connectedOutputs.replace(new CIOConnection(this, input), inputOutIdx);
- }
- void CGraphElementBase::addAssociatedChildGraph(CGraphBase *childGraph)
- {
- if (!associatedChildGraphs.contains(*childGraph))
- associatedChildGraphs.append(*LINK(childGraph));
- }
- void CGraphElementBase::serializeCreateContext(MemoryBuffer &mb)
- {
- if (!onCreateCalled) return;
- DelayedSizeMarker sizeMark(mb);
- queryHelper()->serializeCreateContext(mb);
- sizeMark.write();
- }
- void CGraphElementBase::serializeStartContext(MemoryBuffer &mb)
- {
- if (!onStartCalled) return;
- DelayedSizeMarker sizeMark(mb);
- queryHelper()->serializeStartContext(mb);
- sizeMark.write();
- }
- void CGraphElementBase::deserializeCreateContext(MemoryBuffer &mb)
- {
- size32_t createCtxLen;
- mb.read(createCtxLen);
- createCtxMb.clear().append(createCtxLen, mb.readDirect(createCtxLen));
- haveCreateCtx = true;
- }
- void CGraphElementBase::deserializeStartContext(MemoryBuffer &mb)
- {
- size32_t startCtxLen;
- mb.read(startCtxLen);
- startCtxMb.append(startCtxLen, mb.readDirect(startCtxLen));
- haveStartCtx = true;
- onStartCalled = false; // allow to be called again
- }
- void CGraphElementBase::onCreate()
- {
- CriticalBlock b(crit);
- if (onCreateCalled)
- return;
- onCreateCalled = true;
- baseHelper.setown(helperFactory());
- if (!nullAct)
- {
- CGraphElementBase *ownerActivity = owner->queryOwner() ? owner->queryOwner()->queryElement(ownerId) : NULL;
- if (ownerActivity)
- {
- ownerActivity->onCreate(); // ensure owner created, might not be if this is child query inside another child query.
- baseHelper->onCreate(queryCodeContext(), ownerActivity->queryHelper(), haveCreateCtx?&createCtxMb:NULL);
- }
- else
- baseHelper->onCreate(queryCodeContext(), NULL, haveCreateCtx?&createCtxMb:NULL);
- }
- }
- void CGraphElementBase::onStart(size32_t parentExtractSz, const byte *parentExtract)
- {
- if (onStartCalled)
- return;
- onStartCalled = true;
- if (!nullAct)
- {
- if (haveStartCtx)
- {
- baseHelper->onStart(parentExtract, &startCtxMb);
- startCtxMb.reset();
- haveStartCtx = false;
- }
- else
- baseHelper->onStart(parentExtract, NULL);
- }
- }
- bool CGraphElementBase::executeDependencies(size32_t parentExtractSz, const byte *parentExtract, int controlId, bool async)
- {
- Owned<IThorGraphDependencyIterator> deps = getDependsIterator();
- ForEach(*deps)
- {
- CGraphDependency &dep = deps->query();
- if (dep.controlId == controlId)
- dep.graph->execute(parentExtractSz, parentExtract, true, async);
- if (owner->queryJob().queryAborted() || owner->queryAborted()) return false;
- }
- return true;
- }
- bool CGraphElementBase::prepareContext(size32_t parentExtractSz, const byte *parentExtract, bool checkDependencies, bool shortCircuit, bool async)
- {
- try
- {
- bool _shortCircuit = shortCircuit;
- Owned<IThorGraphDependencyIterator> deps = getDependsIterator();
- bool depsDone = true;
- ForEach(*deps)
- {
- CGraphDependency &dep = deps->query();
- if (0 == dep.controlId && NotFound == owner->dependentSubGraphs.find(*dep.graph))
- {
- owner->dependentSubGraphs.append(*dep.graph);
- if (!dep.graph->isComplete())
- depsDone = false;
- }
- }
- if (depsDone) _shortCircuit = false;
- if (!depsDone && checkDependencies)
- {
- if (!executeDependencies(parentExtractSz, parentExtract, 0, async))
- return false;
- }
- whichBranch = (unsigned)-1;
- hasNullInput = false;
- alreadyUpdated = false;
- switch (getKind())
- {
- case TAKindexwrite:
- case TAKdiskwrite:
- case TAKcsvwrite:
- case TAKxmlwrite:
- case TAKjsonwrite:
- if (_shortCircuit) return true;
- onCreate();
- alreadyUpdated = checkUpdate();
- if (alreadyUpdated)
- return false;
- break;
- case TAKchildif:
- owner->ifs.append(*this);
- // fall through
- case TAKif:
- case TAKifaction:
- {
- if (_shortCircuit) return true;
- onCreate();
- onStart(parentExtractSz, parentExtract);
- IHThorIfArg *helper = (IHThorIfArg *)baseHelper.get();
- whichBranch = helper->getCondition() ? 0 : 1; // True argument precedes false...
- /* NB - The executeDependencies code below is only needed if actionLinkInNewGraph=true, which is no longer the default
- * It should be removed, once we are positive there are no issues with in-line conditional actions
- */
- if (TAKifaction == getKind())
- {
- if (!executeDependencies(parentExtractSz, parentExtract, whichBranch+1, async)) //NB whenId 1 based
- return false;
- }
- if (inputs.queryItem(whichBranch))
- {
- if (!whichBranchBitSet->testSet(whichBranch)) // if not set, new
- newWhichBranch = true;
- return inputs.item(whichBranch)->activity->prepareContext(parentExtractSz, parentExtract, checkDependencies, false, async);
- }
- return true;
- }
- case TAKchildcase:
- owner->ifs.append(*this);
- // fall through
- case TAKcase:
- {
- if (_shortCircuit) return true;
- onCreate();
- onStart(parentExtractSz, parentExtract);
- IHThorCaseArg *helper = (IHThorCaseArg *)baseHelper.get();
- whichBranch = helper->getBranch();
- if (whichBranch >= inputs.ordinality())
- whichBranch = inputs.ordinality()-1;
- if (inputs.queryItem(whichBranch))
- return inputs.item(whichBranch)->activity->prepareContext(parentExtractSz, parentExtract, checkDependencies, false, async);
- return true;
- }
- case TAKfilter:
- case TAKfiltergroup:
- case TAKfilterproject:
- {
- if (_shortCircuit) return true;
- onCreate();
- onStart(parentExtractSz, parentExtract);
- switch (getKind())
- {
- case TAKfilter:
- hasNullInput = !((IHThorFilterArg *)baseHelper.get())->canMatchAny();
- break;
- case TAKfiltergroup:
- hasNullInput = !((IHThorFilterGroupArg *)baseHelper.get())->canMatchAny();
- break;
- case TAKfilterproject:
- hasNullInput = !((IHThorFilterProjectArg *)baseHelper.get())->canMatchAny();
- break;
- }
- if (hasNullInput)
- return true;
- break;
- }
- case TAKsequential:
- case TAKparallel:
- {
- /* NB - The executeDependencies code below is only needed if actionLinkInNewGraph=true, which is no longer the default
- * It should be removed, once we are positive there are no issues with in-line sequential/parallel activities
- */
- for (unsigned s=1; s<=dependsOn.ordinality(); s++)
- {
- if (!executeDependencies(parentExtractSz, parentExtract, s, async))
- return false;
- }
- break;
- }
- case TAKwhen_dataset:
- case TAKwhen_action:
- {
- if (!executeDependencies(parentExtractSz, parentExtract, WhenBeforeId, async))
- return false;
- if (!executeDependencies(parentExtractSz, parentExtract, WhenParallelId, async))
- return false;
- break;
- }
- }
- ForEachItemIn(i, inputs)
- {
- CGraphElementBase *input = inputs.item(i)->activity;
- if (!input->prepareContext(parentExtractSz, parentExtract, checkDependencies, shortCircuit, async))
- return false;
- }
- return true;
- }
- catch (IException *_e)
- {
- IThorException *e = QUERYINTERFACE(_e, IThorException);
- if (e)
- {
- if (!e->queryActivityId())
- setExceptionActivityInfo(*this, e);
- }
- else
- {
- e = MakeActivityException(this, _e);
- _e->Release();
- }
- throw e;
- }
- }
- void CGraphElementBase::preStart(size32_t parentExtractSz, const byte *parentExtract)
- {
- activity->preStart(parentExtractSz, parentExtract);
- }
- void CGraphElementBase::initActivity()
- {
- CriticalBlock b(crit);
- if (isSink())
- owner->addActiveSink(*this);
- if (activity) // no need to recreate
- return;
- activity.setown(factory());
- if (isLoopActivity(*this))
- {
- unsigned loopId = queryXGMML().getPropInt("att[@name=\"_loopid\"]/@value");
- Owned<CGraphBase> childGraph = owner->getChildGraph(loopId);
- Owned<IThorBoundLoopGraph> boundLoopGraph = createBoundLoopGraph(childGraph, baseHelper->queryOutputMeta(), queryId());
- setBoundGraph(boundLoopGraph);
- }
- }
- void CGraphElementBase::createActivity(size32_t parentExtractSz, const byte *parentExtract)
- {
- if (connectedInputs.ordinality()) // ensure not traversed twice (e.g. via splitter)
- return;
- try
- {
- switch (getKind())
- {
- case TAKchildif:
- case TAKchildcase:
- {
- if (inputs.queryItem(whichBranch))
- {
- CGraphElementBase *input = inputs.item(whichBranch)->activity;
- input->createActivity(parentExtractSz, parentExtract);
- }
- onCreate();
- initActivity();
- if (inputs.queryItem(whichBranch))
- {
- CIOConnection *inputIO = inputs.item(whichBranch);
- connectInput(whichBranch, inputIO->activity, inputIO->index);
- }
- break;
- }
- case TAKif:
- case TAKcase:
- if (inputs.queryItem(whichBranch))
- {
- CGraphElementBase *input = inputs.item(whichBranch)->activity;
- input->createActivity(parentExtractSz, parentExtract);
- }
- else
- {
- onCreate();
- if (!activity)
- factorySet(TAKnull);
- }
- break;
- case TAKifaction:
- if (inputs.queryItem(whichBranch))
- {
- CGraphElementBase *input = inputs.item(whichBranch)->activity;
- input->createActivity(parentExtractSz, parentExtract);
- }
- break;
- case TAKsequential:
- case TAKparallel:
- {
- ForEachItemIn(i, inputs)
- {
- if (inputs.queryItem(i))
- {
- CGraphElementBase *input = inputs.item(i)->activity;
- input->createActivity(parentExtractSz, parentExtract);
- }
- }
- break;
- }
- default:
- if (!hasNullInput)
- {
- ForEachItemIn(i, inputs)
- {
- CGraphElementBase *input = inputs.item(i)->activity;
- input->createActivity(parentExtractSz, parentExtract);
- }
- onCreate();
- if (isDiskInput(getKind()))
- onStart(parentExtractSz, parentExtract);
- ForEachItemIn(i2, inputs)
- {
- CIOConnection *inputIO = inputs.item(i2);
- loop
- {
- CGraphElementBase *input = inputIO->activity;
- switch (input->getKind())
- {
- case TAKif:
- case TAKcase:
- {
- if (input->whichBranch >= input->getInputs()) // if, will have TAKnull activity, made at create time.
- {
- input = NULL;
- break;
- }
- inputIO = input->inputs.item(input->whichBranch);
- assertex(inputIO);
- break;
- }
- default:
- input = NULL;
- break;
- }
- if (!input)
- break;
- }
- connectInput(i2, inputIO->activity, inputIO->index);
- }
- }
- initActivity();
- break;
- }
- }
- catch (IException *e) { ActPrintLog(e); activity.clear(); throw; }
- }
- ICodeContext *CGraphElementBase::queryCodeContext()
- {
- return queryOwner().queryCodeContext();
- }
- /////
- // JCSMORE loop - probably need better way to check if any act in graph is global(meaning needs some synchronization between slaves in activity execution)
- bool isGlobalActivity(CGraphElementBase &container)
- {
- switch (container.getKind())
- {
- // always global, but only co-ordinate init/done
- case TAKcsvwrite:
- case TAKxmlwrite:
- case TAKjsonwrite:
- case TAKindexwrite:
- case TAKkeydiff:
- case TAKkeypatch:
- case TAKdictionaryworkunitwrite:
- return true;
- case TAKdiskwrite:
- {
- Owned<IHThorDiskWriteArg> helper = (IHThorDiskWriteArg *)container.helperFactory();
- unsigned flags = helper->getFlags();
- return (0 == (TDXtemporary & flags)); // global if not temporary
- }
- case TAKspill:
- return false;
- case TAKcsvread:
- {
- Owned<IHThorCsvReadArg> helper = (IHThorCsvReadArg *)container.helperFactory();
- // if header lines, then [may] need to co-ordinate across slaves
- if (container.queryOwner().queryOwner() && (!container.queryOwner().isGlobal())) // I am in a child query
- return false;
- return helper->queryCsvParameters()->queryHeaderLen() > 0;
- }
- // dependent on child acts?
- case TAKlooprow:
- case TAKloopcount:
- case TAKgraphloop:
- case TAKparallelgraphloop:
- case TAKloopdataset:
- return false;
- // dependent on local/grouped
- case TAKkeyeddistribute:
- case TAKhashdistribute:
- case TAKhashdistributemerge:
- case TAKworkunitwrite:
- case TAKdistribution:
- case TAKpartition:
- case TAKdiskaggregate:
- case TAKdiskcount:
- case TAKdiskgroupaggregate:
- case TAKindexaggregate:
- case TAKindexcount:
- case TAKindexgroupaggregate:
- case TAKindexgroupexists:
- case TAKindexgroupcount:
- case TAKremoteresult:
- case TAKcountproject:
- case TAKcreaterowlimit:
- case TAKskiplimit:
- case TAKlimit:
- case TAKsort:
- case TAKdedup:
- case TAKjoin:
- case TAKselfjoin:
- case TAKhashjoin:
- case TAKsmartjoin:
- case TAKkeyeddenormalize:
- case TAKhashdenormalize:
- case TAKdenormalize:
- case TAKlookupdenormalize: //GH->JCS why are these here, and join not?
- case TAKalldenormalize:
- case TAKsmartdenormalize:
- case TAKdenormalizegroup:
- case TAKhashdenormalizegroup:
- case TAKlookupdenormalizegroup:
- case TAKkeyeddenormalizegroup:
- case TAKalldenormalizegroup:
- case TAKsmartdenormalizegroup:
- case TAKaggregate:
- case TAKexistsaggregate:
- case TAKcountaggregate:
- case TAKhashaggregate:
- case TAKhashdedup:
- case TAKrollup:
- case TAKiterate:
- case TAKselectn:
- case TAKfirstn:
- case TAKenth:
- case TAKsample:
- case TAKgroup:
- case TAKchoosesets:
- case TAKchoosesetsenth:
- case TAKchoosesetslast:
- case TAKtopn:
- case TAKprocess:
- case TAKchildcount:
- case TAKwhen_dataset:
- case TAKwhen_action:
- case TAKnonempty:
- if (!container.queryLocalOrGrouped())
- return true;
- break;
- case TAKkeyedjoin:
- case TAKalljoin:
- case TAKlookupjoin:
- if (!container.queryLocal())
- return true;
- // always local
- case TAKfilter:
- case TAKfilterproject:
- case TAKfiltergroup:
- case TAKsplit:
- case TAKpipewrite:
- case TAKdegroup:
- case TAKproject:
- case TAKprefetchproject:
- case TAKprefetchcountproject:
- case TAKnormalize:
- case TAKnormalizechild:
- case TAKnormalizelinkedchild:
- case TAKpipethrough:
- case TAKif:
- case TAKchildif:
- case TAKchildcase:
- case TAKcase:
- case TAKparse:
- case TAKpiperead:
- case TAKxmlparse:
- case TAKjoinlight:
- case TAKselfjoinlight:
- case TAKdiskread:
- case TAKdisknormalize:
- case TAKchildaggregate:
- case TAKchildgroupaggregate:
- case TAKchildthroughnormalize:
- case TAKchildnormalize:
- case TAKindexread:
- case TAKindexnormalize:
- case TAKxmlread:
- case TAKjsonread:
- case TAKdiskexists:
- case TAKindexexists:
- case TAKchildexists:
- case TAKthroughaggregate:
- case TAKmerge:
- case TAKfunnel:
- case TAKregroup:
- case TAKcombine:
- case TAKrollupgroup:
- case TAKcombinegroup:
- case TAKsoap_rowdataset:
- case TAKhttp_rowdataset:
- case TAKsoap_rowaction:
- case TAKsoap_datasetdataset:
- case TAKsoap_datasetaction:
- case TAKlinkedrawiterator:
- case TAKchilditerator:
- case TAKstreamediterator:
- case TAKworkunitread:
- case TAKchilddataset:
- case TAKinlinetable:
- case TAKnull:
- case TAKemptyaction:
- case TAKlocalresultread:
- case TAKlocalresultwrite:
- case TAKdictionaryresultwrite:
- case TAKgraphloopresultread:
- case TAKgraphloopresultwrite:
- case TAKnwaygraphloopresultread:
- case TAKapply:
- case TAKsideeffect:
- case TAKsimpleaction:
- case TAKsorted:
- case TAKdistributed:
- case TAKtrace:
- break;
- case TAKnwayjoin:
- case TAKnwaymerge:
- case TAKnwaymergejoin:
- case TAKnwayinput:
- case TAKnwayselect:
- return false; // JCSMORE - I think and/or have to be for now
- // undefined
- case TAKdatasetresult:
- case TAKrowresult:
- case TAKremotegraph:
- case TAKlibrarycall:
- default:
- return true; // if in doubt
- }
- return false;
- }
- bool isLoopActivity(CGraphElementBase &container)
- {
- switch (container.getKind())
- {
- case TAKlooprow:
- case TAKloopcount:
- case TAKloopdataset:
- case TAKgraphloop:
- case TAKparallelgraphloop:
- return true;
- }
- return false;
- }
- /////
- CGraphBase::CGraphBase(CJobBase &_job) : job(_job)
- {
- xgmml = NULL;
- parent = owner = NULL;
- graphId = 0;
- complete = false;
- reinit = false; // should graph reinitialize each time it is called (e.g. in loop graphs)
- // This is currently for 'init' (Create time) info and onStart into
- sentInitData = false;
- // sentStartCtx = false;
- sentStartCtx = true; // JCSMORE - disable for now
- parentActivityId = 0;
- created = connected = started = graphDone = aborted = prepared = false;
- startBarrier = waitBarrier = doneBarrier = NULL;
- mpTag = waitBarrierTag = startBarrierTag = doneBarrierTag = TAG_NULL;
- executeReplyTag = TAG_NULL;
- parentExtractSz = 0;
- counter = 0; // loop/graph counter, will be set by loop/graph activity if needed
- }
- CGraphBase::~CGraphBase()
- {
- clean();
- }
- void CGraphBase::clean()
- {
- ::Release(startBarrier);
- ::Release(waitBarrier);
- ::Release(doneBarrier);
- localResults.clear();
- graphLoopResults.clear();
- childGraphsTable.kill();
- childGraphs.kill();
- disconnectActivities();
- containers.kill();
- sinks.kill();
- activeSinks.kill();
- }
- void CGraphBase::serializeCreateContexts(MemoryBuffer &mb)
- {
- DelayedSizeMarker sizeMark(mb);
- Owned<IThorActivityIterator> iter = getIterator();
- ForEach (*iter)
- {
- CGraphElementBase &element = iter->query();
- if (element.isOnCreated())
- {
- mb.append(element.queryId());
- element.serializeCreateContext(mb);
- }
- }
- mb.append((activity_id)0);
- sizeMark.write();
- }
- void CGraphBase::serializeStartContexts(MemoryBuffer &mb)
- {
- DelayedSizeMarker sizeMark(mb);
- Owned<IThorActivityIterator> iter = getIterator();
- ForEach (*iter)
- {
- CGraphElementBase &element = iter->query();
- if (element.isOnStarted())
- {
- mb.append(element.queryId());
- element.serializeStartContext(mb);
- }
- }
- mb.append((activity_id)0);
- sizeMark.write();
- }
- void CGraphBase::deserializeCreateContexts(MemoryBuffer &mb)
- {
- activity_id id;
- loop
- {
- mb.read(id);
- if (0 == id) break;
- CGraphElementBase *element = queryElement(id);
- assertex(element);
- element->deserializeCreateContext(mb);
- }
- }
- void CGraphBase::deserializeStartContexts(MemoryBuffer &mb)
- {
- activity_id id;
- loop
- {
- mb.read(id);
- if (0 == id) break;
- CGraphElementBase *element = queryElement(id);
- assertex(element);
- element->deserializeStartContext(mb);
- }
- }
- void CGraphBase::reset()
- {
- setCompleteEx(false);
- graphCancelHandler.reset();
- if (0 == containers.count())
- {
- SuperHashIteratorOf<CGraphBase> iter(childGraphsTable);
- ForEach(iter)
- iter.query().reset();
- }
- else
- {
- Owned<IThorActivityIterator> iter = getIterator();
- ForEach(*iter)
- {
- CGraphElementBase &element = iter->query();
- element.reset();
- }
- dependentSubGraphs.kill();
- }
- if (!queryOwner() || isGlobal())
- job.queryTimeReporter().reset();
- if (!queryOwner())
- clearNodeStats();
- }
- void CGraphBase::addChildGraph(CGraphBase &graph)
- {
- CriticalBlock b(crit);
- childGraphs.append(graph);
- childGraphsTable.replace(graph);
- job.associateGraph(graph);
- }
- IThorGraphIterator *CGraphBase::getChildGraphs() const
- {
- CriticalBlock b(crit);
- return new CGraphArrayCopyIterator(childGraphs);
- }
- bool CGraphBase::fireException(IException *e)
- {
- return job.fireException(e);
- }
- bool CGraphBase::preStart(size32_t parentExtractSz, const byte *parentExtract)
- {
- Owned<IThorActivityIterator> iter = getConnectedIterator();
- ForEach(*iter)
- {
- CGraphElementBase &element = iter->query();
- element.preStart(parentExtractSz, parentExtract);
- }
- return true;
- }
- void CGraphBase::executeSubGraph(size32_t parentExtractSz, const byte *parentExtract)
- {
- CriticalBlock b(executeCrit);
- if (job.queryPausing())
- return;
- Owned<IException> exception;
- try
- {
- if (!prepare(parentExtractSz, parentExtract, false, false, false))
- {
- setCompleteEx();
- return;
- }
- try
- {
- if (!queryOwner())
- {
- StringBuffer s;
- toXML(&queryXGMML(), s, 2);
- GraphPrintLog("Running graph [%s] : %s", isGlobal()?"global":"local", s.str());
- }
- create(parentExtractSz, parentExtract);
- }
- catch (IException *e)
- {
- Owned<IThorException> e2 = MakeThorException(e);
- e2->setGraphId(graphId);
- e2->setAction(tea_abort);
- job.fireException(e2);
- throw;
- }
- if (localResults)
- localResults->clear();
- doExecute(parentExtractSz, parentExtract, false);
- }
- catch (IException *e)
- {
- GraphPrintLog(e);
- exception.setown(e);
- }
- if (!queryOwner())
- {
- GraphPrintLog("Graph Done");
- StringBuffer memStr;
- getSystemTraceInfo(memStr, PerfMonStandard | PerfMonExtended);
- GraphPrintLog("%s", memStr.str());
- }
- if (exception)
- throw exception.getClear();
- }
- void CGraphBase::execute(size32_t _parentExtractSz, const byte *parentExtract, bool checkDependencies, bool async)
- {
- parentExtractSz = _parentExtractSz;
- if (isComplete())
- return;
- if (async)
- queryJob().startGraph(*this, queryJob(), checkDependencies, parentExtractSz, parentExtract); // may block if enough running
- else
- {
- if (!prepare(parentExtractSz, parentExtract, checkDependencies, async, async))
- {
- setComplete();
- return;
- }
- executeSubGraph(parentExtractSz, parentExtract);
- }
- }
- void CGraphBase::doExecute(size32_t parentExtractSz, const byte *parentExtract, bool checkDependencies)
- {
- if (isComplete()) return;
- if (queryAborted())
- {
- if (abortException)
- throw abortException.getLink();
- throw MakeGraphException(this, 0, "subgraph aborted(1)");
- }
- if (!prepare(parentExtractSz, parentExtract, checkDependencies, false, false))
- {
- setComplete();
- return;
- }
- if (queryAborted())
- {
- if (abortException)
- throw abortException.getLink();
- throw MakeGraphException(this, 0, "subgraph aborted(2)");
- }
- Owned<IException> exception;
- try
- {
- if (started)
- reset();
- Owned<IThorActivityIterator> iter = getConnectedIterator();
- ForEach(*iter)
- {
- CGraphElementBase &element = iter->query();
- element.onStart(parentExtractSz, parentExtract);
- }
- if (!preStart(parentExtractSz, parentExtract)) return;
- start();
- if (!wait(aborted?MEDIUMTIMEOUT:INFINITE)) // can't wait indefinitely, query may have aborted and stall, but prudent to wait a short time for underlying graphs to unwind.
- GraphPrintLogEx(this, thorlog_null, MCuserWarning, "Graph wait cancelled, aborted=%s", aborted?"true":"false");
- else
- graphDone = true;
- }
- catch (IException *e)
- {
- GraphPrintLog(e);
- exception.setown(e);
- }
- try
- {
- if (!exception && abortException)
- exception.setown(abortException.getClear());
- if (exception)
- {
- if (NULL == owner || isGlobal())
- waitBarrier->cancel(exception);
- if (!queryOwner())
- {
- StringBuffer str;
- Owned<IThorException> e = MakeGraphException(this, exception->errorCode(), "%s", exception->errorMessage(str).str());
- e->setGraphId(graphId);
- e->setAction(tea_abort);
- fireException(e);
- }
- }
- }
- catch (IException *e)
- {
- GraphPrintLog(e, "during abort()");
- e->Release();
- }
- try
- {
- done();
- if (doneBarrier)
- doneBarrier->wait(false);
- }
- catch (IException *e)
- {
- GraphPrintLog(e);
- if (!exception.get())
- exception.setown(e);
- else
- e->Release();
- }
- end();
- if (exception)
- throw exception.getClear();
- if (!queryAborted())
- setComplete();
- }
- bool CGraphBase::prepare(size32_t parentExtractSz, const byte *parentExtract, bool checkDependencies, bool shortCircuit, bool async)
- {
- if (isComplete()) return false;
- bool needToExecute = false;
- ifs.kill();
- ForEachItemIn(s, sinks)
- {
- CGraphElementBase &sink = sinks.item(s);
- if (sink.prepareContext(parentExtractSz, parentExtract, checkDependencies, shortCircuit, async))
- needToExecute = true;
- }
- // prepared = true;
- return needToExecute;
- }
- void CGraphBase::create(size32_t parentExtractSz, const byte *parentExtract)
- {
- Owned<IThorActivityIterator> iter = getIterator();
- ForEach(*iter)
- {
- CGraphElementBase &element = iter->query();
- element.clearConnections();
- }
- activeSinks.kill(); // NB: activeSinks are added to during activity creation
- ForEachItemIn(s, sinks)
- {
- CGraphElementBase &sink = sinks.item(s);
- sink.createActivity(parentExtractSz, parentExtract);
- }
- created = true;
- }
- void CGraphBase::done()
- {
- if (aborted) return; // activity done methods only called on success
- Owned<IThorActivityIterator> iter = getConnectedIterator();
- ForEach (*iter)
- {
- CGraphElementBase &element = iter->query();
- element.queryActivity()->done();
- }
- }
- bool CGraphBase::syncInitData()
- {
- CGraphElementBase *parentElement = queryOwner() ? queryOwner()->queryElement(queryParentActivityId()) : NULL;
- if (parentElement && isLoopActivity(*parentElement))
- return parentElement->queryLoopGraph()->queryGraph()->isGlobal();
- else
- return !isLocalChild();
- }
- void CGraphBase::end()
- {
- // always called, any final action clear up
- Owned<IThorActivityIterator> iter = getIterator();
- ForEach(*iter)
- {
- CGraphElementBase &element = iter->query();
- try
- {
- if (element.queryActivity())
- element.queryActivity()->kill();
- }
- catch (IException *e)
- {
- Owned<IException> e2 = MakeActivityException(element.queryActivity(), e, "Error calling kill()");
- GraphPrintLog(e2);
- e->Release();
- }
- }
- }
- class CGraphTraverseIteratorBase : public CInterface, implements IThorActivityIterator
- {
- protected:
- CGraphBase &graph;
- Linked<CGraphElementBase> cur;
- CIArrayOf<CGraphElementBase> others;
- CGraphElementArrayCopy covered;
- CGraphElementBase *popNext()
- {
- if (!others.ordinality())
- {
- cur.clear();
- return NULL;
- }
- cur.setown(&others.popGet());
- return cur;
- }
- CGraphElementBase *setNext(CIOConnectionArray &inputs, unsigned whichInput=((unsigned)-1))
- {
- cur.clear();
- unsigned n = inputs.ordinality();
- if (((unsigned)-1) != whichInput)
- {
- CIOConnection *io = inputs.queryItem(whichInput);
- if (io)
- cur.set(io->activity);
- }
- else
- {
- bool first = true;
- unsigned i=0;
- for (; i<n; i++)
- {
- CIOConnection *io = inputs.queryItem(i);
- if (io)
- {
- if (first)
- {
- first = false;
- cur.set(io->activity);
- }
- else
- others.append(*LINK(io->activity));
- }
- }
- }
- if (!cur)
- {
- if (!popNext())
- return NULL;
- }
- // check haven't been here before
- loop
- {
- if (cur->getOutputs() < 2)
- break;
- else if (NotFound == covered.find(*cur))
- {
- if (!cur->alreadyUpdated)
- {
- covered.append(*cur);
- break;
- }
- }
- if (!popNext())
- return NULL;
- }
- return cur.get();
- }
- public:
- IMPLEMENT_IINTERFACE;
- CGraphTraverseIteratorBase(CGraphBase &_graph) : graph(_graph)
- {
- }
- virtual bool first()
- {
- covered.kill();
- others.kill();
- cur.clear();
- Owned<IThorActivityIterator> sinkIter = graph.getSinkIterator();
- if (!sinkIter->first())
- return false;
- loop
- {
- cur.set(& sinkIter->query());
- if (!cur->alreadyUpdated)
- break;
- if (!sinkIter->next())
- return false;
- }
- while (sinkIter->next())
- others.append(sinkIter->get());
- return true;
- }
- virtual bool isValid() { return NULL != cur.get(); }
- virtual CGraphElementBase & query() { return *cur; }
- CGraphElementBase & get() { return *LINK(cur); }
- };
- class CGraphTraverseConnectedIterator : public CGraphTraverseIteratorBase
- {
- public:
- CGraphTraverseConnectedIterator(CGraphBase &graph) : CGraphTraverseIteratorBase(graph) { }
- virtual bool next()
- {
- if (cur->hasNullInput)
- {
- do
- {
- if (!popNext())
- return false;
- }
- while (cur->hasNullInput);
- }
- else
- setNext(cur->connectedInputs);
- return NULL!=cur.get();
- }
- };
- IThorActivityIterator *CGraphBase::getConnectedIterator()
- {
- return new CGraphTraverseConnectedIterator(*this);
- }
- bool CGraphBase::wait(unsigned timeout)
- {
- CTimeMon tm(timeout);
- unsigned remaining = timeout;
- class CWaitException
- {
- CGraphBase *graph;
- Owned<IException> exception;
- public:
- CWaitException(CGraphBase *_graph) : graph(_graph) { }
- IException *get() { return exception; }
- void set(IException *e)
- {
- if (!exception)
- exception.setown(e);
- else
- e->Release();
- }
- void throwException()
- {
- if (exception)
- throw exception.getClear();
- throw MakeGraphException(graph, 0, "Timed out waiting for graph to end");
- }
- } waitException(this);
- Owned<IThorActivityIterator> iter = getConnectedIterator();
- ForEach (*iter)
- {
- CGraphElementBase &element = iter->query();
- CActivityBase *activity = element.queryActivity();
- if (INFINITE != timeout && tm.timedout(&remaining))
- waitException.throwException();
- try
- {
- if (!activity->wait(remaining))
- waitException.throwException();
- }
- catch (IException *e)
- {
- waitException.set(e); // will discard if already set
- if (timeout == INFINITE)
- {
- unsigned e = tm.elapsed();
- if (e >= MEDIUMTIMEOUT)
- waitException.throwException();
- timeout = MEDIUMTIMEOUT-e;
- tm.reset(timeout);
- }
- }
- }
- if (waitException.get())
- waitException.throwException();
- // synchronize all slaves to end of graphs
- if (NULL == owner || isGlobal())
- {
- if (INFINITE != timeout && tm.timedout(&remaining))
- waitException.throwException();
- if (!waitBarrier->wait(true, remaining))
- return false;
- }
- return true;
- }
- void CGraphBase::abort(IException *e)
- {
- if (aborted)
- return;
- {
- CriticalBlock cb(crit);
- abortException.set(e);
- aborted = true;
- graphCancelHandler.cancel(0);
- if (0 == containers.count())
- {
- Owned<IThorGraphIterator> iter = getChildGraphs();
- ForEach(*iter)
- {
- CGraphBase &graph = iter->query();
- graph.abort(e);
- }
- }
- }
- if (started && !graphDone)
- {
- Owned<IThorActivityIterator> iter = getConnectedIterator();
- ForEach (*iter)
- {
- iter->query().abort(e); // JCSMORE - could do in parallel, they can take some time to timeout
- }
- if (startBarrier)
- startBarrier->cancel(e);
- if (waitBarrier)
- waitBarrier->cancel(e);
- if (doneBarrier)
- doneBarrier->cancel(e);
- }
- }
- void CGraphBase::GraphPrintLog(const char *format, ...)
- {
- va_list args;
- va_start(args, format);
- ::GraphPrintLogArgs(this, thorlog_null, MCdebugProgress, format, args);
- va_end(args);
- }
- void CGraphBase::GraphPrintLog(IException *e, const char *format, ...)
- {
- va_list args;
- va_start(args, format);
- ::GraphPrintLogArgs(this, e, thorlog_null, MCdebugProgress, format, args);
- va_end(args);
- }
- void CGraphBase::GraphPrintLog(IException *e)
- {
- GraphPrintLog(e, "%s", "");
- }
- void CGraphBase::setLogging(bool tf)
- {
- Owned<IThorActivityIterator> iter = getIterator();
- ForEach(*iter)
- iter->query().setLogging(tf);
- }
- void CGraphBase::createFromXGMML(IPropertyTree *_node, CGraphBase *_owner, CGraphBase *_parent, CGraphBase *resultsGraph)
- {
- owner = _owner;
- parent = _parent?_parent:owner;
- node.setown(createPTreeFromIPT(_node));
- xgmml = node->queryPropTree("att/graph");
- sink = xgmml->getPropBool("att[@name=\"rootGraph\"]/@value", false);
- sequential = xgmml->getPropBool("@sequential");
- graphId = node->getPropInt("@id");
- global = false;
- localOnly = -1; // unset
- parentActivityId = node->getPropInt("att[@name=\"_parentActivity\"]/@value", 0);
- CGraphBase *graphContainer = this;
- if (resultsGraph)
- graphContainer = resultsGraph; // JCSMORE is this right?
- graphCodeContext.setContext(graphContainer, (ICodeContextExt *)&job.queryCodeContext());
- unsigned numResults = xgmml->getPropInt("att[@name=\"_numResults\"]/@value", 0);
- if (numResults)
- {
- localResults.setown(createThorGraphResults(numResults));
- resultsGraph = this;
- // JCSMORE - it might more sense if this temp handler was owned by parent act., which may finish(get stopped) earlier than the owning graph
- tmpHandler.setown(queryJob().createTempHandler(false));
- }
- localChild = false;
- if (owner && parentActivityId)
- {
- CGraphElementBase *parentElement = owner->queryElement(parentActivityId);
- parentElement->addAssociatedChildGraph(this);
- if (isLoopActivity(*parentElement))
- localChild = parentElement->queryOwner().isLocalChild();
- else
- localChild = true;
- }
- Owned<IPropertyTreeIterator> nodes = xgmml->getElements("node");
- ForEach(*nodes)
- {
- IPropertyTree &e = nodes->query();
- ThorActivityKind kind = (ThorActivityKind) e.getPropInt("att[@name=\"_kind\"]/@value");
- if (TAKsubgraph == kind)
- {
- Owned<CGraphBase> subGraph = job.createGraph();
- subGraph->createFromXGMML(&e, this, parent, resultsGraph);
- addChildGraph(*LINK(subGraph));
- if (!global)
- global = subGraph->isGlobal();
- }
- else
- {
- if (localChild && !e.getPropBool("att[@name=\"coLocal\"]/@value", false))
- {
- IPropertyTree *att = createPTree("att");
- att->setProp("@name", "coLocal");
- att->setPropBool("@value", true);
- e.addPropTree("att", att);
- }
- CGraphElementBase *act = createGraphElement(e, *this, resultsGraph);
- addActivity(act);
- if (!global)
- global = isGlobalActivity(*act);
- }
- }
- Owned<IPropertyTreeIterator> edges = xgmml->getElements("edge");
- ForEach(*edges)
- {
- IPropertyTree &edge = edges->query();
- unsigned sourceOutput = edge.getPropInt("att[@name=\"_sourceIndex\"]/@value", 0);
- unsigned targetInput = edge.getPropInt("att[@name=\"_targetIndex\"]/@value", 0);
- CGraphElementBase *source = queryElement(edge.getPropInt("@source"));
- CGraphElementBase *target = queryElement(edge.getPropInt("@target"));
- target->addInput(targetInput, source, sourceOutput);
- }
- Owned<IThorActivityIterator> iter = getIterator();
- ForEach(*iter)
- {
- CGraphElementBase &element = iter->query();
- if (0 == element.getOutputs())
- {
- /* JCSMORE - Making some outputs conditional, will require:
- * a) Pass through information as to which dependent graph causes this graph (and this sink) to execute)
- * b) Allow the subgraph to re-executed by other dependent subgraphs and avoid re-executing completed sinks
- * c) Keep common points (splitters) around (preferably in memory), re-execution of graph will need them
- */
- sinks.append(*LINK(&element));
- }
- }
- init();
- }
- void CGraphBase::executeChildGraphs(size32_t parentExtractSz, const byte *parentExtract)
- {
- // JCSMORE - would need to respect codegen 'sequential' flag, if these child graphs
- // could be executed in parallel.
- Owned<IThorGraphIterator> iter = getChildGraphs();
- ForEach(*iter)
- {
- CGraphBase &graph = iter->query();
- if (graph.isSink())
- graph.execute(parentExtractSz, parentExtract, true, false);
- }
- }
- void CGraphBase::doExecuteChild(size32_t parentExtractSz, const byte *parentExtract)
- {
- reset();
- if (0 == containers.count())
- executeChildGraphs(parentExtractSz, parentExtract);
- else
- execute(parentExtractSz, parentExtract, false, false);
- queryTempHandler()->clearTemps();
- }
- void CGraphBase::executeChild(size32_t & retSize, void * &ret, size32_t parentExtractSz, const byte *parentExtract)
- {
- reset();
- doExecute(parentExtractSz, parentExtract, false);
- UNIMPLEMENTED;
- /*
- ForEachItemIn(idx1, elements)
- {
- EclGraphElement & cur = elements.item(idx1);
- if (cur.isResult)
- {
- cur.extractResult(retSize, ret);
- return;
- }
- }
- */
- throwUnexpected();
- }
- void CGraphBase::setResults(IThorGraphResults *results) // used by master only
- {
- localResults.set(results);
- }
- void CGraphBase::executeChild(size32_t parentExtractSz, const byte *parentExtract, IThorGraphResults *results, IThorGraphResults *_graphLoopResults)
- {
- localResults.set(results);
- graphLoopResults.set(_graphLoopResults);
- doExecuteChild(parentExtractSz, parentExtract);
- graphLoopResults.clear();
- localResults.clear();
- }
- StringBuffer &getGlobals(CGraphBase &graph, StringBuffer &str)
- {
- bool first = true;
- Owned<IThorActivityIterator> iter = graph.getIterator();
- ForEach(*iter)
- {
- CGraphElementBase &e = iter->query();
- if (isGlobalActivity(e))
- {
- if (first)
- str.append("Graph(").append(graph.queryGraphId()).append("): [");
- else
- str.append(", ");
- first = false;
- ThorActivityKind kind = e.getKind();
- str.append(activityKindStr(kind));
- str.append("(").append(e.queryId()).append(")");
- }
- }
- if (!first)
- str.append("]");
- Owned<IThorGraphIterator> childIter = graph.getChildGraphs();
- ForEach(*childIter)
- {
- CGraphBase &childGraph = childIter->query();
- getGlobals(childGraph, str);
- }
- return str;
- }
- void CGraphBase::executeChild(size32_t parentExtractSz, const byte *parentExtract)
- {
- assertex(localResults);
- localResults->clear();
- if (isGlobal()) // any slave
- {
- StringBuffer str("Global acts = ");
- getGlobals(*this, str);
- throw MakeGraphException(this, 0, "Global child graph? : %s", str.str());
- }
- doExecuteChild(parentExtractSz, parentExtract);
- }
- IThorResult *CGraphBase::getResult(unsigned id, bool distributed)
- {
- return localResults->getResult(id, distributed);
- }
- IThorResult *CGraphBase::getGraphLoopResult(unsigned id, bool distributed)
- {
- return graphLoopResults->getResult(id, distributed);
- }
- IThorResult *CGraphBase::createResult(CActivityBase &activity, unsigned id, IThorGraphResults *results, IRowInterfaces *rowIf, bool distributed, unsigned spillPriority)
- {
- return results->createResult(activity, id, rowIf, distributed, spillPriority);
- }
- IThorResult *CGraphBase::createResult(CActivityBase &activity, unsigned id, IRowInterfaces *rowIf, bool distributed, unsigned spillPriority)
- {
- return localResults->createResult(activity, id, rowIf, distributed, spillPriority);
- }
- IThorResult *CGraphBase::createGraphLoopResult(CActivityBase &activity, IRowInterfaces *rowIf, bool distributed, unsigned spillPriority)
- {
- return graphLoopResults->createResult(activity, rowIf, distributed, spillPriority);
- }
- // IEclGraphResults
- void CGraphBase::getDictionaryResult(unsigned & count, byte * * & ret, unsigned id)
- {
- Owned<IThorResult> result = getResult(id, true); // will get collated distributed result
- result->getLinkedResult(count, ret);
- }
- void CGraphBase::getLinkedResult(unsigned & count, byte * * & ret, unsigned id)
- {
- Owned<IThorResult> result = getResult(id, true); // will get collated distributed result
- result->getLinkedResult(count, ret);
- }
- const void * CGraphBase::getLinkedRowResult(unsigned id)
- {
- Owned<IThorResult> result = getResult(id, true); // will get collated distributed result
- return result->getLinkedRowResult();
- }
- // IThorChildGraph impl.
- IEclGraphResults *CGraphBase::evaluate(unsigned _parentExtractSz, const byte *parentExtract)
- {
- CriticalBlock block(evaluateCrit);
- localResults.setown(createThorGraphResults(xgmml->getPropInt("att[@name=\"_numResults\"]/@value", 0)));
- parentExtractSz = _parentExtractSz;
- executeChild(parentExtractSz, parentExtract);
- return localResults.getClear();
- }
- static bool isLocalOnly(const CGraphElementBase &activity);
- static bool isLocalOnly(const CGraphBase &graph) // checks all dependencies, if something needs to be global, whole body is forced to be execution sync.
- {
- if (0 == graph.activityCount())
- {
- Owned<IThorGraphIterator> iter = graph.getChildGraphs();
- ForEach(*iter)
- {
- CGraphBase &childGraph = iter->query();
- if (childGraph.isSink())
- {
- if (!isLocalOnly(childGraph))
- return false;
- }
- }
- }
- else
- {
- if (graph.isGlobal())
- return false;
- Owned<IThorActivityIterator> sinkIter = graph.getAllSinkIterator();
- ForEach(*sinkIter)
- {
- CGraphElementBase &sink = sinkIter->query();
- if (!isLocalOnly(sink))
- return false;
- }
- }
- return true;
- }
- static bool isLocalOnly(const CGraphElementBase &activity)
- {
- Owned<IThorGraphDependencyIterator> deps = activity.getDependsIterator();
- ForEach(*deps)
- {
- if (!isLocalOnly(*(deps->query().graph)))
- return false;
- }
- StringBuffer match("edge[@target=\"");
- match.append(activity.queryId()).append("\"]");
- Owned<IPropertyTreeIterator> inputs = activity.queryOwner().queryXGMML().getElements(match.str());
- ForEach(*inputs)
- {
- CGraphElementBase *sourceAct = activity.queryOwner().queryElement(inputs->query().getPropInt("@source"));
- if (!isLocalOnly(*sourceAct))
- return false;
- }
- return true;
- }
- bool CGraphBase::isLocalOnly() const // checks all dependencies, if something needs to be global, whole body is forced to be execution sync.
- {
- if (-1 == localOnly)
- localOnly = (int)::isLocalOnly(*this);
- return 1==localOnly;
- }
- IThorGraphResults *CGraphBase::createThorGraphResults(unsigned num)
- {
- return new CThorGraphResults(num);
- }
- ////
- void CGraphTempHandler::registerFile(const char *name, graph_id graphId, unsigned usageCount, bool temp, WUFileKind fileKind, StringArray *clusters)
- {
- assertex(temp);
- LOG(MCdebugProgress, thorJob, "registerTmpFile name=%s, usageCount=%d", name, usageCount);
- CriticalBlock b(crit);
- if (tmpFiles.find(name))
- throw MakeThorException(TE_FileAlreadyUsedAsTempFile, "File already used as temp file (%s)", name);
- tmpFiles.replace(* new CFileUsageEntry(name, graphId, fileKind, usageCount));
- }
- void CGraphTempHandler::deregisterFile(const char *name, bool kept)
- {
- LOG(MCdebugProgress, thorJob, "deregisterTmpFile name=%s", name);
- CriticalBlock b(crit);
- CFileUsageEntry *fileUsage = tmpFiles.find(name);
- if (!fileUsage)
- {
- if (errorOnMissing)
- throw MakeThorException(TE_FileNotFound, "File not found (%s) deregistering tmp file", name);
- return;
- }
- if (0 == fileUsage->queryUsage()) // marked 'not to be deleted' until workunit complete.
- return;
- else if (1 == fileUsage->queryUsage())
- {
- tmpFiles.remove(name);
- try
- {
- if (!removeTemp(name))
- LOG(MCwarning, unknownJob, "Failed to delete tmp file : %s (not found)", name);
- }
- catch (IException *e) { StringBuffer s("Failed to delete tmp file : "); FLLOG(MCwarning, thorJob, e, s.append(name).str()); }
- }
- else
- fileUsage->decUsage();
- }
- void CGraphTempHandler::clearTemps()
- {
- CriticalBlock b(crit);
- Owned<IFileUsageIterator> iter = getIterator();
- ForEach(*iter)
- {
- CFileUsageEntry &entry = iter->query();
- const char *tmpname = entry.queryName();
- try
- {
- if (!removeTemp(tmpname))
- LOG(MCwarning, thorJob, "Failed to delete tmp file : %s (not found)", tmpname);
- }
- catch (IException *e) { StringBuffer s("Failed to delete tmp file : "); FLLOG(MCwarning, thorJob, e, s.append(tmpname).str()); }
- }
- iter.clear();
- tmpFiles.kill();
- }
- /////
- class CGraphExecutor;
- class CGraphExecutorGraphInfo : public CInterface
- {
- public:
- CGraphExecutorGraphInfo(CGraphExecutor &_executor, CGraphBase *_subGraph, IGraphCallback &_callback, const byte *parentExtract, size32_t parentExtractSz) : executor(_executor), subGraph(_subGraph), callback(_callback)
- {
- parentExtractMb.append(parentExtractSz, parentExtract);
- }
- CGraphExecutor &executor;
- IGraphCallback &callback;
- Linked<CGraphBase> subGraph;
- MemoryBuffer parentExtractMb;
- };
- class CGraphExecutor : public CInterface, implements IGraphExecutor
- {
- CJobBase &job;
- CIArrayOf<CGraphExecutorGraphInfo> stack, running, toRun;
- UnsignedArray seen;
- bool stopped;
- unsigned limit;
- unsigned waitOnRunning;
- CriticalSection crit;
- Semaphore runningSem;
- Owned<IThreadPool> graphPool;
- class CGraphExecutorFactory : public CInterface, implements IThreadFactory
- {
- CGraphExecutor &executor;
- public:
- IMPLEMENT_IINTERFACE;
- CGraphExecutorFactory(CGraphExecutor &_executor) : executor(_executor) { }
- // IThreadFactory
- virtual IPooledThread *createNew()
- {
- class CGraphExecutorThread : public CInterface, implements IPooledThread
- {
- Owned<CGraphExecutorGraphInfo> graphInfo;
- public:
- IMPLEMENT_IINTERFACE;
- CGraphExecutorThread()
- {
- }
- void init(void *startInfo)
- {
- graphInfo.setown((CGraphExecutorGraphInfo *)startInfo);
- }
- void main()
- {
- loop
- {
- Linked<CGraphBase> graph = graphInfo->subGraph;
- Owned<IException> e;
- try
- {
- PROGLOG("CGraphExecutor: Running graph, graphId=%" GIDPF "d", graph->queryGraphId());
- graphInfo->callback.runSubgraph(*graph, graphInfo->parentExtractMb.length(), (const byte *)graphInfo->parentExtractMb.toByteArray());
- }
- catch (IException *_e)
- {
- e.setown(_e);
- }
- Owned<CGraphExecutorGraphInfo> nextGraphInfo;
- try
- {
- nextGraphInfo.setown(graphInfo->executor.graphDone(*graphInfo, e));
- }
- catch (IException *e)
- {
- GraphPrintLog(graph, e, "graphDone");
- e->Release();
- }
- if (e)
- throw e.getClear();
- if (!nextGraphInfo)
- return;
- graphInfo.setown(nextGraphInfo.getClear());
- }
- }
- bool canReuse() { return true; }
- bool stop() { return true; }
- };
- return new CGraphExecutorThread();
- }
- } *factory;
- CGraphExecutorGraphInfo *findRunning(graph_id gid)
- {
- ForEachItemIn(r, running)
- {
- CGraphExecutorGraphInfo *graphInfo = &running.item(r);
- if (gid == graphInfo->subGraph->queryGraphId())
- return graphInfo;
- }
- return NULL;
- }
- public:
- IMPLEMENT_IINTERFACE;
- CGraphExecutor(CJobBase &_job) : job(_job)
- {
- limit = (unsigned)job.getWorkUnitValueInt("concurrentSubGraphs", globals->getPropInt("@concurrentSubGraphs", 1));
- PROGLOG("CGraphExecutor: limit = %d", limit);
- waitOnRunning = 0;
- stopped = false;
- factory = new CGraphExecutorFactory(*this);
- graphPool.setown(createThreadPool("CGraphExecutor pool", factory, &job, limit));
- }
- ~CGraphExecutor()
- {
- stopped = true;
- graphPool->joinAll();
- factory->Release();
- }
- CGraphExecutorGraphInfo *graphDone(CGraphExecutorGraphInfo &doneGraphInfo, IException *e)
- {
- CriticalBlock b(crit);
- running.zap(doneGraphInfo);
- if (waitOnRunning)
- {
- runningSem.signal(waitOnRunning);
- waitOnRunning = 0;
- }
- if (e || job.queryAborted())
- {
- stopped = true;
- stack.kill();
- return NULL;
- }
- if (job.queryPausing())
- stack.kill();
- else if (stack.ordinality())
- {
- CICopyArrayOf<CGraphExecutorGraphInfo> toMove;
- ForEachItemIn(s, stack)
- {
- bool dependenciesDone = true;
- CGraphExecutorGraphInfo &graphInfo = stack.item(s);
- ForEachItemIn (d, graphInfo.subGraph->dependentSubGraphs)
- {
- CGraphBase &subGraph = graphInfo.subGraph->dependentSubGraphs.item(d);
- if (!subGraph.isComplete())
- {
- dependenciesDone = false;
- break;
- }
- }
- if (dependenciesDone)
- {
- graphInfo.subGraph->dependentSubGraphs.kill();
- graphInfo.subGraph->prepare(graphInfo.parentExtractMb.length(), (const byte *)graphInfo.parentExtractMb.toByteArray(), true, true, true); // now existing deps done, maybe more to prepare
- ForEachItemIn (d, graphInfo.subGraph->dependentSubGraphs)
- {
- CGraphBase &subGraph = graphInfo.subGraph->dependentSubGraphs.item(d);
- if (!subGraph.isComplete())
- {
- dependenciesDone = false;
- break;
- }
- }
- if (dependenciesDone)
- {
- graphInfo.subGraph->dependentSubGraphs.kill(); // none to track anymore
- toMove.append(graphInfo);
- }
- }
- }
- ForEachItemIn(m, toMove)
- {
- Linked<CGraphExecutorGraphInfo> graphInfo = &toMove.item(m);
- stack.zap(*graphInfo);
- toRun.add(*graphInfo.getClear(), 0);
- }
- }
- job.markWuDirty();
- PROGLOG("CGraphExecutor running=%d, waitingToRun=%d, dependentsWaiting=%d", running.ordinality(), toRun.ordinality(), stack.ordinality());
- while (toRun.ordinality())
- {
- if (job.queryPausing())
- return NULL;
- Linked<CGraphExecutorGraphInfo> nextGraphInfo = &toRun.item(0);
- toRun.remove(0);
- if (!nextGraphInfo->subGraph->isComplete() && (NULL == findRunning(nextGraphInfo->subGraph->queryGraphId())))
- {
- running.append(*nextGraphInfo.getLink());
- return nextGraphInfo.getClear();
- }
- }
- return NULL;
- }
- // IGraphExecutor
- virtual void add(CGraphBase *subGraph, IGraphCallback &callback, bool checkDependencies, size32_t parentExtractSz, const byte *parentExtract)
- {
- bool alreadyRunning;
- {
- CriticalBlock b(crit);
- if (job.queryPausing())
- return;
- if (subGraph->isComplete())
- return;
- alreadyRunning = NULL != findRunning(subGraph->queryGraphId());
- if (alreadyRunning)
- ++waitOnRunning;
- }
- if (alreadyRunning)
- {
- loop
- {
- PROGLOG("Waiting on subgraph %" GIDPF "d", subGraph->queryGraphId());
- if (runningSem.wait(MEDIUMTIMEOUT) || job.queryAborted() || job.queryPausing())
- break;
- }
- return;
- }
- else
- {
- CriticalBlock b(crit);
- if (seen.contains(subGraph->queryGraphId()))
- return; // already queued;
- seen.append(subGraph->queryGraphId());
- }
- if (!subGraph->prepare(parentExtractSz, parentExtract, checkDependencies, true, true))
- {
- subGraph->setComplete();
- return;
- }
- if (subGraph->dependentSubGraphs.ordinality())
- {
- bool dependenciesDone = true;
- ForEachItemIn (d, subGraph->dependentSubGraphs)
- {
- CGraphBase &graph = subGraph->dependentSubGraphs.item(d);
- if (!graph.isComplete())
- {
- dependenciesDone = false;
- break;
- }
- }
- if (dependenciesDone)
- subGraph->dependentSubGraphs.kill(); // none to track anymore
- }
- Owned<CGraphExecutorGraphInfo> graphInfo = new CGraphExecutorGraphInfo(*this, subGraph, callback, parentExtract, parentExtractSz);
- CriticalBlock b(crit);
- if (0 == subGraph->dependentSubGraphs.ordinality())
- {
- if (running.ordinality()<limit)
- {
- running.append(*LINK(graphInfo));
- PROGLOG("Add: Launching graph thread for graphId=%" GIDPF "d", subGraph->queryGraphId());
- graphPool->start(graphInfo.getClear());
- }
- else
- stack.add(*graphInfo.getClear(), 0); // push to front, no dependency, free to run next.
- }
- else
- stack.append(*graphInfo.getClear()); // as dependencies finish, may move up the list
- }
- virtual IThreadPool &queryGraphPool() { return *graphPool; }
- virtual void wait()
- {
- PROGLOG("CGraphExecutor exiting, waiting on graph pool");
- graphPool->joinAll();
- PROGLOG("CGraphExecutor graphPool finished");
- }
- };
- ////
- // IContextLogger
- class CThorContextLogger : CSimpleInterface, implements IContextLogger
- {
- CJobBase &job;
- unsigned traceLevel;
- public:
- IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
- CThorContextLogger(CJobBase &_job) : job(_job)
- {
- traceLevel = 1;
- }
- virtual void CTXLOGva(const char *format, va_list args) const
- {
- StringBuffer ss;
- ss.valist_appendf(format, args);
- LOG(MCdebugProgress, thorJob, "%s", ss.str());
- }
- virtual void logOperatorExceptionVA(IException *E, const char *file, unsigned line, const char *format, va_list args) const
- {
- StringBuffer ss;
- ss.append("ERROR");
- if (E)
- ss.append(": ").append(E->errorCode());
- if (file)
- ss.appendf(": %s(%d) ", file, line);
- if (E)
- E->errorMessage(ss.append(": "));
- if (format)
- ss.append(": ").valist_appendf(format, args);
- LOG(MCoperatorProgress, thorJob, "%s", ss.str());
- }
- virtual void noteStatistic(StatisticKind kind, unsigned __int64 value) const
- {
- }
- virtual void mergeStats(const CRuntimeStatisticCollection &from) const
- {
- }
- virtual unsigned queryTraceLevel() const
- {
- return traceLevel;
- }
- };
- ////
- CJobBase::CJobBase(const char *_graphName) : graphName(_graphName)
- {
- maxDiskUsage = diskUsage = 0;
- dirty = true;
- aborted = false;
- codeCtx = NULL;
- mpJobTag = TAG_NULL;
- timeReporter = createStdTimeReporter();
- pluginMap = new SafePluginMap(&pluginCtx, true);
- // JCSMORE - Will pass down at job creation time...
- jobGroup.set(&::queryClusterGroup());
- jobComm.setown(createCommunicator(jobGroup));
- slaveGroup.setown(jobGroup->remove(0));
- myrank = jobGroup->rank(queryMyNode());
- globalMemorySize = globals->getPropInt("@globalMemorySize"); // in MB
- oldNodeCacheMem = 0;
- }
- void CJobBase::init()
- {
- StringBuffer tmp;
- tmp.append(wuid);
- tmp.append(graphName);
- key.set(tmp.str());
- SCMStringBuffer tokenUser, password;
- extractToken(token.str(), wuid.str(), tokenUser, password);
- userDesc = createUserDescriptor();
- userDesc->set(user.str(), password.str());
- forceLogGraphIdMin = (graph_id)getWorkUnitValueInt("forceLogGraphIdMin", 0);
- forceLogGraphIdMax = (graph_id)getWorkUnitValueInt("forceLogGraphIdMax", 0);
- logctx.setown(new CThorContextLogger(*this));
- // global setting default on, can be overridden by #option
- timeActivities = 0 != getWorkUnitValueInt("timeActivities", globals->getPropBool("@timeActivities", true));
- maxActivityCores = (unsigned)getWorkUnitValueInt("maxActivityCores", globals->getPropInt("@maxActivityCores", 0)); // NB: 0 means system decides
- pausing = false;
- resumed = false;
- bool crcChecking = 0 != getWorkUnitValueInt("THOR_ROWCRC", globals->getPropBool("@THOR_ROWCRC", false));
- bool usePackedAllocator = 0 != getWorkUnitValueInt("THOR_PACKEDALLOCATOR", globals->getPropBool("@THOR_PACKEDALLOCATOR", true));
- unsigned memorySpillAt = (unsigned)getWorkUnitValueInt("memorySpillAt", globals->getPropInt("@memorySpillAt", 80));
- thorAllocator.setown(createThorAllocator(((memsize_t)globalMemorySize)*0x100000, memorySpillAt, *logctx, crcChecking, usePackedAllocator));
- unsigned defaultMemMB = globalMemorySize*3/4;
- PROGLOG("Global memory size = %d MB, memory spill at = %d%%", globalMemorySize, memorySpillAt);
- StringBuffer tracing("maxActivityCores = ");
- if (maxActivityCores)
- tracing.append(maxActivityCores);
- else
- tracing.append("[unbound]");
- PROGLOG("%s", tracing.str());
- graphExecutor.setown(new CGraphExecutor(*this));
- }
- void CJobBase::beforeDispose()
- {
- endJob();
- }
- CJobBase::~CJobBase()
- {
- clean();
- thorAllocator->queryRowManager()->reportMemoryUsage(false);
- PROGLOG("CJobBase resetting memory manager");
- thorAllocator.clear();
- ::Release(codeCtx);
- ::Release(userDesc);
- timeReporter->Release();
- delete pluginMap;
- StringBuffer memStatsStr;
- roxiemem::memstats(memStatsStr);
- PROGLOG("Roxiemem stats: %s", memStatsStr.str());
- memsize_t heapUsage = getMapInfo("heap");
- if (heapUsage) // if 0, assumed to be unavailable
- PROGLOG("Heap usage : %" I64F "d bytes", (unsigned __int64)heapUsage);
- }
- void CJobBase::startJob()
- {
- LOG(MCdebugProgress, thorJob, "New Graph started : %s", graphName.get());
- ClearTempDirs();
- unsigned pinterval = globals->getPropInt("@system_monitor_interval",1000*60);
- if (pinterval)
- {
- perfmonhook.setown(createThorMemStatsPerfMonHook(*this, getOptInt(THOROPT_MAX_KERNLOG, 3)));
- startPerformanceMonitor(pinterval,PerfMonStandard,perfmonhook);
- }
- PrintMemoryStatusLog();
- logDiskSpace();
- unsigned keyNodeCacheMB = (unsigned)getWorkUnitValueInt("keyNodeCacheMB", 0);
- if (keyNodeCacheMB)
- {
- oldNodeCacheMem = setNodeCacheMem(keyNodeCacheMB * 0x100000);
- PROGLOG("Key node cache size set to: %d MB", keyNodeCacheMB);
- }
- unsigned keyFileCacheLimit = (unsigned)getWorkUnitValueInt("keyFileCacheLimit", 0);
- if (!keyFileCacheLimit)
- keyFileCacheLimit = (querySlaves()+1)*2;
- setKeyIndexCacheSize(keyFileCacheLimit);
- PROGLOG("Key file cache size set to: %d", keyFileCacheLimit);
- }
- void CJobBase::endJob()
- {
- stopPerformanceMonitor();
- LOG(MCdebugProgress, thorJob, "Job ended : %s", graphName.get());
- clearKeyStoreCache(true);
- if (oldNodeCacheMem)
- setNodeCacheMem(oldNodeCacheMem);
- PrintMemoryStatusLog();
- }
- bool CJobBase::queryForceLogging(graph_id graphId, bool def) const
- {
- // JCSMORE, could add comma separated range, e.g. 1-5,10-12
- if ((graphId >= forceLogGraphIdMin) && (graphId <= forceLogGraphIdMax))
- return true;
- return def;
- }
- void CJobBase::clean()
- {
- if (graphExecutor)
- {
- graphExecutor->queryGraphPool().stopAll();
- graphExecutor.clear();
- }
- subGraphs.kill();
- }
- IThorGraphIterator *CJobBase::getSubGraphs()
- {
- CriticalBlock b(crit);
- return new CGraphTableIterator(subGraphs);
- }
- static void getGlobalDeps(CGraphBase &graph, CICopyArrayOf<CGraphDependency> &deps)
- {
- Owned<IThorActivityIterator> iter = graph.getIterator();
- ForEach(*iter)
- {
- CGraphElementBase &elem = iter->query();
- Owned<IThorGraphDependencyIterator> dependIterator = elem.getDependsIterator();
- ForEach(*dependIterator)
- {
- CGraphDependency &dependency = dependIterator->query();
- if (dependency.graph->isGlobal() && NULL==dependency.graph->queryOwner())
- deps.append(dependency);
- getGlobalDeps(*dependency.graph, deps);
- }
- }
- }
- static void noteDependency(CGraphElementBase *targetActivity, CGraphElementBase *sourceActivity, CGraphBase *targetGraph, CGraphBase *sourceGraph, unsigned controlId)
- {
- targetActivity->addDependsOn(sourceGraph, controlId);
- // NB: record dependency in source graph, serialized to slaves, used to decided if should run dependency sinks or not
- Owned<IPropertyTree> dependencyFor = createPTree();
- dependencyFor->setPropInt("@id", sourceActivity->queryId());
- dependencyFor->setPropInt("@graphId", targetGraph->queryGraphId());
- if (controlId)
- dependencyFor->setPropInt("@conditionalId", controlId);
- sourceGraph->queryXGMML().addPropTree("Dependency", dependencyFor.getClear());
- }
- void CJobBase::addDependencies(IPropertyTree *xgmml, bool failIfMissing)
- {
- CGraphArrayCopy childGraphs;
- CGraphElementArrayCopy targetActivities, sourceActivities;
- Owned<IPropertyTreeIterator> iter = xgmml->getElements("edge");
- ForEach(*iter)
- {
- IPropertyTree &edge = iter->query();
- Owned<CGraphBase> source = getGraph(edge.getPropInt("@source"));
- Owned<CGraphBase> target = getGraph(edge.getPropInt("@target"));
- if (!source || !target)
- {
- if (failIfMissing)
- throwUnexpected();
- else
- continue; // expected if assigning dependencies in slaves
- }
- CGraphElementBase *targetActivity = (CGraphElementBase *)target->queryElement(edge.getPropInt("att[@name=\"_targetActivity\"]/@value"));
- CGraphElementBase *sourceActivity = (CGraphElementBase *)source->queryElement(edge.getPropInt("att[@name=\"_sourceActivity\"]/@value"));
- int controlId = 0;
- if (edge.getPropBool("att[@name=\"_dependsOn\"]/@value", false))
- {
- if (!edge.getPropBool("att[@name=\"_childGraph\"]/@value", false)) // JCSMORE - not sure if necess. roxie seem to do.
- controlId = edge.getPropInt("att[@name=\"_when\"]/@value", 0);
- CGraphBase &sourceGraph = sourceActivity->queryOwner();
- unsigned sourceGraphContext = sourceGraph.queryParentActivityId();
- CGraphBase *targetGraph = NULL;
- unsigned targetGraphContext = -1;
- loop
- {
- targetGraph = &targetActivity->queryOwner();
- targetGraphContext = targetGraph->queryParentActivityId();
- if (sourceGraphContext == targetGraphContext)
- break;
- targetActivity = targetGraph->queryElement(targetGraphContext);
- }
- assertex(targetActivity && sourceActivity);
- noteDependency(targetActivity, sourceActivity, target, source, controlId);
- }
- else if (edge.getPropBool("att[@name=\"_conditionSource\"]/@value", false))
- { /* Ignore it */ }
- else if (edge.getPropBool("att[@name=\"_childGraph\"]/@value", false))
- {
- // NB: any dependencies of the child acts. are dependencies of this act.
- childGraphs.append(*source);
- targetActivities.append(*targetActivity);
- sourceActivities.append(*sourceActivity);
- }
- else
- {
- if (!edge.getPropBool("att[@name=\"_childGraph\"]/@value", false)) // JCSMORE - not sure if necess. roxie seem to do.
- controlId = edge.getPropInt("att[@name=\"_when\"]/@value", 0);
- noteDependency(targetActivity, sourceActivity, target, source, controlId);
- }
- }
- ForEachItemIn(c, childGraphs)
- {
- CGraphBase &childGraph = childGraphs.item(c);
- CGraphElementBase &targetActivity = targetActivities.item(c);
- CGraphElementBase &sourceActivity = sourceActivities.item(c);
- if (!childGraph.isGlobal())
- {
- CICopyArrayOf<CGraphDependency> globalChildGraphDeps;
- getGlobalDeps(childGraph, globalChildGraphDeps);
- ForEachItemIn(gcd, globalChildGraphDeps)
- {
- CGraphDependency &globalDep = globalChildGraphDeps.item(gcd);
- noteDependency(&targetActivity, &sourceActivity, globalDep.graph, &childGraph, globalDep.controlId);
- }
- }
- }
- SuperHashIteratorOf<CGraphBase> allIter(allGraphs);
- ForEach(allIter)
- {
- CGraphBase &subGraph = allIter.query();
- if (subGraph.queryOwner() && subGraph.queryParentActivityId())
- {
- CGraphElementBase *parentElement = subGraph.queryOwner()->queryElement(subGraph.queryParentActivityId());
- if (isLoopActivity(*parentElement))
- {
- if (!parentElement->queryOwner().isLocalChild() && !subGraph.isLocalOnly())
- subGraph.setGlobal(true);
- }
- }
- bool log = queryForceLogging(subGraph.queryGraphId(), (NULL == subGraph.queryOwner()) || subGraph.isGlobal());
- subGraph.setLogging(log);
- }
- }
- void CJobBase::startGraph(CGraphBase &graph, IGraphCallback &callback, bool checkDependencies, size32_t parentExtractSize, const byte *parentExtract)
- {
- graphExecutor->add(&graph, callback, checkDependencies, parentExtractSize, parentExtract);
- }
- ICodeContext &CJobBase::queryCodeContext() const
- {
- return *codeCtx;
- }
- bool CJobBase::queryUseCheckpoints() const
- {
- return globals->getPropBool("@checkPointRecovery") || 0 != getWorkUnitValueInt("checkPointRecovery", 0);
- }
- void CJobBase::abort(IException *e)
- {
- aborted = true;
- Owned<IThorGraphIterator> iter = getSubGraphs();
- ForEach (*iter)
- {
- CGraphBase &graph = iter->query();
- graph.abort(e);
- }
- }
- void CJobBase::increase(offset_t usage, const char *key)
- {
- diskUsage += usage;
- if (diskUsage > maxDiskUsage) maxDiskUsage = diskUsage;
- }
- void CJobBase::decrease(offset_t usage, const char *key)
- {
- diskUsage -= usage;
- }
- mptag_t CJobBase::allocateMPTag()
- {
- mptag_t tag = allocateClusterMPTag();
- jobComm->flush(tag);
- PROGLOG("allocateMPTag: tag = %d", (int)tag);
- return tag;
- }
- void CJobBase::freeMPTag(mptag_t tag)
- {
- if (TAG_NULL != tag)
- {
- freeClusterMPTag(tag);
- PROGLOG("freeMPTag: tag = %d", (int)tag);
- jobComm->flush(tag);
- }
- }
- mptag_t CJobBase::deserializeMPTag(MemoryBuffer &mb)
- {
- mptag_t tag;
- deserializeMPtag(mb, tag);
- if (TAG_NULL != tag)
- {
- PROGLOG("deserializeMPTag: tag = %d", (int)tag);
- jobComm->flush(tag);
- }
- return tag;
- }
- // these getX methods for property in workunit settings, then global setting, defaulting to provided 'dft' if not present
- StringBuffer &CJobBase::getOpt(const char *opt, StringBuffer &out)
- {
- if (!opt || !*opt)
- return out; // probably error
- VStringBuffer gOpt("Debug/@%s", opt);
- getWorkUnitValue(opt, out);
- if (0 == out.length())
- globals->getProp(gOpt, out);
- return out;
- }
- bool CJobBase::getOptBool(const char *opt, bool dft)
- {
- if (!opt || !*opt)
- return dft; // probably error
- VStringBuffer gOpt("Debug/@%s", opt);
- return getWorkUnitValueBool(opt, globals->getPropBool(gOpt, dft));
- }
- int CJobBase::getOptInt(const char *opt, int dft)
- {
- if (!opt || !*opt)
- return dft; // probably error
- VStringBuffer gOpt("Debug/@%s", opt);
- return (int)getWorkUnitValueInt(opt, globals->getPropInt(gOpt, dft));
- }
- __int64 CJobBase::getOptInt64(const char *opt, __int64 dft)
- {
- if (!opt || !*opt)
- return dft; // probably error
- VStringBuffer gOpt("Debug/@%s", opt);
- return getWorkUnitValueInt(opt, globals->getPropInt64(gOpt, dft));
- }
- // IGraphCallback
- void CJobBase::runSubgraph(CGraphBase &graph, size32_t parentExtractSz, const byte *parentExtract)
- {
- graph.executeSubGraph(parentExtractSz, parentExtract);
- }
- IEngineRowAllocator *CJobBase::getRowAllocator(IOutputMetaData * meta, unsigned activityId, roxiemem::RoxieHeapFlags flags) const
- {
- return thorAllocator->getRowAllocator(meta, activityId, flags);
- }
- roxiemem::IRowManager *CJobBase::queryRowManager() const
- {
- return thorAllocator->queryRowManager();
- }
- IThorResult *CJobBase::getOwnedResult(graph_id gid, activity_id ownerId, unsigned resultId)
- {
- Owned<CGraphBase> graph = getGraph(gid);
- if (!graph)
- {
- Owned<IThorException> e = MakeThorException(0, "getOwnedResult: graph not found");
- e->setGraphId(gid);
- throw e.getClear();
- }
- Owned<IThorResult> result;
- if (ownerId)
- {
- CGraphElementBase *container = graph->queryElement(ownerId);
- assertex(container);
- CActivityBase *activity = container->queryActivity();
- IThorGraphResults *results = activity->queryResults();
- if (!results)
- throw MakeGraphException(graph, 0, "GraphGetResult: no results created (requesting: %d)", resultId);
- result.setown(activity->queryResults()->getResult(resultId));
- }
- else
- result.setown(graph->getResult(resultId));
- if (!result)
- throw MakeGraphException(graph, 0, "GraphGetResult: result not found: %d", resultId);
- return result.getClear();
- }
- static IThorResource *iThorResource = NULL;
- void setIThorResource(IThorResource &r)
- {
- iThorResource = &r;
- }
- IThorResource &queryThor()
- {
- return *iThorResource;
- }
- //
- //
- //
- //
- CActivityBase::CActivityBase(CGraphElementBase *_container) : container(*_container), timeActivities(_container->queryJob().queryTimeActivities())
- {
- mpTag = TAG_NULL;
- abortSoon = receiving = cancelledReceive = reInit = false;
- baseHelper.set(container.queryHelper());
- parentExtractSz = 0;
- parentExtract = NULL;
- // NB: maxCores, currently only used to control # cores used by sorts
- maxCores = container.queryXGMML().getPropInt("hint[@name=\"max_cores\"]/@value", container.queryJob().queryMaxDefaultActivityCores());
- }
- CActivityBase::~CActivityBase()
- {
- }
- void CActivityBase::abort()
- {
- if (!abortSoon) ActPrintLog("Abort condition set");
- abortSoon = true;
- }
- void CActivityBase::kill()
- {
- ownedResults.clear();
- }
- bool CActivityBase::appendRowXml(StringBuffer & target, IOutputMetaData & meta, const void * row) const
- {
- if (!meta.hasXML())
- {
- target.append("<xml-unavailable/>");
- return false;
- }
- try
- {
- CommonXmlWriter xmlWrite(XWFnoindent);
- meta.toXML((byte *) row, xmlWrite);
- target.append(xmlWrite.str());
- return true;
- }
- catch (IException * e)
- {
- e->Release();
- target.append("<invalid-row/>");
- return false;
- }
- }
- void CActivityBase::logRow(const char * prefix, IOutputMetaData & meta, const void * row)
- {
- bool blindLogging = false; // MORE: should check a workunit/global option
- if (meta.hasXML() && !blindLogging)
- {
- StringBuffer xml;
- appendRowXml(xml, meta, row);
- ActPrintLog("%s: %s", prefix, xml.str());
- }
- }
- void CActivityBase::ActPrintLog(const char *format, ...)
- {
- va_list args;
- va_start(args, format);
- ::ActPrintLogArgs(&queryContainer(), thorlog_null, MCdebugProgress, format, args);
- va_end(args);
- }
- void CActivityBase::ActPrintLog(IException *e, const char *format, ...)
- {
- va_list args;
- va_start(args, format);
- ::ActPrintLogArgs(&queryContainer(), e, thorlog_all, MCexception(e), format, args);
- va_end(args);
- }
- void CActivityBase::ActPrintLog(IException *e)
- {
- ActPrintLog(e, "%s", "");
- }
- bool CActivityBase::fireException(IException *e)
- {
- Owned<IThorException> _te;
- IThorException *te = QUERYINTERFACE(e, IThorException);
- if (te)
- {
- if (!te->queryActivityId())
- setExceptionActivityInfo(container, te);
- }
- else
- {
- te = MakeActivityException(this, e);
- te->setAudience(e->errorAudience());
- _te.setown(te);
- }
- return container.queryOwner().fireException(te);
- }
- void CActivityBase::processAndThrowOwnedException(IException * _e)
- {
- IThorException *e = QUERYINTERFACE(_e, IThorException);
- if (e)
- {
- if (!e->queryActivityId())
- setExceptionActivityInfo(container, e);
- }
- else
- {
- e = MakeActivityException(this, _e);
- _e->Release();
- }
- if (!e->queryNotified())
- {
- fireException(e);
- e->setNotified();
- }
- throw e;
- }
- IEngineRowAllocator * CActivityBase::queryRowAllocator()
- {
- if (CABallocatorlock.lock()) {
- if (!rowAllocator)
- rowAllocator.setown(queryJob().getRowAllocator(queryRowMetaData(),queryActivityId()));
- CABallocatorlock.unlock();
- }
- return rowAllocator;
- }
-
- IOutputRowSerializer * CActivityBase::queryRowSerializer()
- {
- if (CABserializerlock.lock()) {
- if (!rowSerializer)
- rowSerializer.setown(queryRowMetaData()->createDiskSerializer(queryCodeContext(),queryActivityId()));
- CABserializerlock.unlock();
- }
- return rowSerializer;
- }
- IOutputRowDeserializer * CActivityBase::queryRowDeserializer()
- {
- if (CABdeserializerlock.lock()) {
- if (!rowDeserializer)
- rowDeserializer.setown(queryRowMetaData()->createDiskDeserializer(queryCodeContext(),queryActivityId()));
- CABdeserializerlock.unlock();
- }
- return rowDeserializer;
- }
- IRowInterfaces *CActivityBase::getRowInterfaces()
- {
- // create an independent instance, to avoid circular link dependency problems
- return createRowInterfaces(queryRowMetaData(), container.queryId(), queryCodeContext());
- }
- bool CActivityBase::receiveMsg(CMessageBuffer &mb, const rank_t rank, const mptag_t mpTag, rank_t *sender, unsigned timeout)
- {
- BooleanOnOff onOff(receiving);
- CTimeMon t(timeout);
- unsigned remaining = timeout;
- // check 'cancelledReceive' every 10 secs
- while (!cancelledReceive && ((MP_WAIT_FOREVER==timeout) || !t.timedout(&remaining)))
- {
- if (container.queryJob().queryJobComm().recv(mb, rank, mpTag, sender, remaining>10000?10000:remaining))
- return true;
- }
- return false;
- }
- void CActivityBase::cancelReceiveMsg(const rank_t rank, const mptag_t mpTag)
- {
- cancelledReceive = true;
- if (receiving)
- container.queryJob().queryJobComm().cancel(rank, mpTag);
- }
- StringBuffer &CActivityBase::getOpt(const char *prop, StringBuffer &out) const
- {
- VStringBuffer path("hint[@name=\"%s\"]/@value", prop);
- if (!container.queryXGMML().getProp(path.toLowerCase().str(), out))
- queryJob().getOpt(prop, out);
- return out;
- }
- bool CActivityBase::getOptBool(const char *prop, bool defVal) const
- {
- bool def = queryJob().getOptBool(prop, defVal);
- VStringBuffer path("hint[@name=\"%s\"]/@value", prop);
- return container.queryXGMML().getPropBool(path.toLowerCase().str(), def);
- }
- int CActivityBase::getOptInt(const char *prop, int defVal) const
- {
- int def = queryJob().getOptInt(prop, defVal);
- VStringBuffer path("hint[@name=\"%s\"]/@value", prop);
- return container.queryXGMML().getPropInt(path.toLowerCase().str(), def);
- }
- __int64 CActivityBase::getOptInt64(const char *prop, __int64 defVal) const
- {
- __int64 def = queryJob().getOptInt64(prop, defVal);
- VStringBuffer path("hint[@name=\"%s\"]/@value", prop);
- return container.queryXGMML().getPropInt64(path.toLowerCase().str(), def);
- }
|