12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
727782779278027812782278327842785278627872788278927902791279227932794279527962797279827992800280128022803280428052806280728082809281028112812281328142815281628172818281928202821282228232824282528262827282828292830283128322833283428352836283728382839284028412842284328442845284628472848284928502851285228532854285528562857285828592860286128622863286428652866286728682869287028712872287328742875287628772878287928802881288228832884288528862887288828892890289128922893289428952896289728982899290029012902290329042905290629072908290929102911291229132914291529162917291829192920292129222923292429252926292729282929293029312932293329342935 |
- /*##############################################################################
- HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ############################################################################## */
- #include <limits.h>
- #include <stdlib.h>
- #include "jprop.hpp"
- #include "jexcept.hpp"
- #include "jiter.ipp"
- #include "jlzw.hpp"
- #include "jsocket.hpp"
- #include "jset.hpp"
- #include "jsort.hpp"
- #include "portlist.h"
- #include "jhtree.hpp"
- #include "mputil.hpp"
- #include "dllserver.hpp"
- #include "dautils.hpp"
- #include "danqs.hpp"
- #include "daclient.hpp"
- #include "daaudit.hpp"
- #include "wujobq.hpp"
- #include "thorport.hpp"
- #include "commonext.hpp"
- #include "thorxmlread.hpp"
- #include "thorplugin.hpp"
- #include "thormisc.hpp"
- #include "thgraphmaster.ipp"
- #include "thdemonserver.hpp"
- #include "rtlds_imp.hpp"
- #include "eclhelper.hpp"
- #include "thexception.hpp"
- #include "thactivitymaster.ipp"
- #include "thmem.hpp"
- #include "thcompressutil.hpp"
- using roxiemem::OwnedRoxieString;
- static CriticalSection *jobManagerCrit; // allocated in MODULE_INIT and deleted in MODULE_EXIT below; not otherwise used in the visible part of this file
- MODULE_INIT(INIT_PRIORITY_STANDARD) // module start-up hook: creates jobManagerCrit and reports success
- {
- jobManagerCrit = new CriticalSection;
- return true;
- }
- MODULE_EXIT() // module shutdown hook: frees jobManagerCrit (the pointer itself is not reset afterwards)
- {
- delete jobManagerCrit;
- }
- unsigned uniqGraphId = 1; // NOTE(review): not referenced in the visible part of this file; presumably a running graph id counter - confirm
- #define FATAL_TIMEOUT 60 // NOTE(review): units are not shown here (presumably seconds) - confirm at the point of use
- // CTimeoutTrigger specialisation (trigger name "EXCEPTION") that also exposes IFatalHandler.
- // Its action() logs an operator warning for the pending exception and passes that
- // exception to the job manager's fatal(), if a job manager is available.
- class CFatalHandler : public CTimeoutTrigger, implements IFatalHandler
- {
- public:
-     IMPLEMENT_IINTERFACE;
-     CFatalHandler(unsigned timeout) : CTimeoutTrigger(timeout, "EXCEPTION") { }
-     // Logs the failure and escalates the stored exception; always returns true.
-     virtual bool action()
-     {
-         StringBuffer failureText;
-         failureText.append("FAILED TO RECOVER FROM EXCEPTION, STOPPING THOR");
-         FLLOG(MCoperatorWarning, thorJob, exception, failureText.str());
-         Owned<IJobManager> manager = getJobManager();
-         if (manager)
-             manager->fatal(exception);
-         // 'manager' is released when it goes out of scope
-         return true;
-     }
-     // IFatalHandler - both calls are forwarded unchanged to CTimeoutTrigger
-     virtual void inform(IException *e)
-     {
-         CTimeoutTrigger::inform(e);
-     }
-     virtual void clear()
-     {
-         CTimeoutTrigger::clear();
-     }
- };
- /////
- // Creates the slave message handler for job '_job' listening on tag '_mptag'.
- // Every member that threadmain() reads (stopped, childGraphInitTimeout) is assigned
- // BEFORE threaded.init(this) is called: nothing else in this file starts the thread, so
- // init() is presumably what launches threadmain(), and assigning the timeout after it
- // (as was done previously) left an unsynchronised write racing with the new thread.
- CSlaveMessageHandler::CSlaveMessageHandler(CJobMaster &_job, mptag_t _mptag) : threaded("CSlaveMessageHandler"), job(_job), mptag(_mptag)
- {
-     stopped = false;
-     // option value is multiplied by 1000 before being used as the recv timeout
-     childGraphInitTimeout = job.getOptUInt(THOROPT_CHILD_GRAPH_INIT_TIMEOUT, 5*60) * 1000; // default 5 minutes
-     threaded.init(this); // must stay last - see above
- }
- // Destructor: makes sure the receive thread has been stopped and joined (stop() is a no-op if already stopped).
- CSlaveMessageHandler::~CSlaveMessageHandler()
- {
-     this->stop();
- }
- // Stops the handler once: sets the stop flag, cancels the pending receive on mptag
- // and waits for the handler thread to finish. Subsequent calls do nothing.
- void CSlaveMessageHandler::stop()
- {
-     if (stopped)
-         return;
-     stopped = true;
-     auto &jobComm = job.queryJobChannel(0).queryJobComm();
-     jobComm.cancel(0, mptag);
-     threaded.join();
- }
- void CSlaveMessageHandler::threadmain() // Receive loop: reads messages on mptag from job channel 0's communicator and dispatches on the leading SlaveMsgTypes value until stopped or recv() fails
- {
- try
- {
- for (;;)
- {
- rank_t sender;
- CMessageBuffer msg;
- if (stopped || !job.queryJobChannel(0).queryJobComm().recv(msg, RANK_ALL, mptag, &sender)) // leave the loop once stop() has been requested or recv() returns false
- break;
- SlaveMsgTypes msgType;
- msg.read((int &)msgType); // every message starts with its type
- switch (msgType)
- {
- case smt_errorMsg: // a slave reports an exception: fire it on the activity, else the graph, else the job
- {
- unsigned slave;
- msg.read(slave);
- Owned<IThorException> e = deserializeThorException(msg);
- e->setSlave(slave+1);
- Owned<CGraphBase> graph = job.queryJobChannel(0).getGraph(e->queryGraphId());
- if (graph)
- {
- activity_id id = e->queryActivityId();
- if (id)
- {
- CGraphElementBase *elem = graph->queryElement(id); // NOTE(review): elem is dereferenced on the next line without a null check - confirm queryElement cannot return NULL here
- CActivityBase *act = elem->queryActivity();
- if (act)
- act->fireException(e);
- else
- graph->fireException(e);
- }
- else
- graph->fireException(e);
- }
- else
- job.fireException(e);
- if (msg.getReplyTag() <= TAG_REPLY_BASE) // an empty reply is sent only when this reply-tag test holds (presumably meaning the sender is waiting for one - confirm)
- {
- msg.clear();
- job.queryJobChannel(0).queryJobComm().reply(msg);
- }
- break;
- }
- case smt_dataReq: // reply with the activity's initialization data for the requesting slave (empty reply if the graph is not found)
- {
- graph_id gid;
- activity_id id;
- unsigned slave;
- msg.read(slave);
- msg.read(gid);
- msg.read(id);
- msg.clear();
- Owned<CGraphBase> graph = job.queryJobChannel(0).getGraph(gid);
- if (graph)
- {
- CMasterGraphElement *e = (CMasterGraphElement *)graph->queryElement(id);
- e->queryActivity()->getInitializationData(slave, msg); // NOTE(review): neither the element nor its activity is null-checked here
- }
- job.queryJobChannel(0).queryJobComm().reply(msg);
- break;
- }
- case smt_initGraphReq: // call onCreate() on every element of the graph, then reply with the serialized create contexts
- {
- graph_id gid;
- msg.read(gid);
- Owned<CMasterGraph> graph = (CMasterGraph *)job.queryJobChannel(0).getGraph(gid);
- assertex(graph);
- {
- CriticalBlock b(graph->queryCreateLock()); // create lock is held only for the onCreate pass
- Owned<IThorActivityIterator> iter = graph->getIterator();
- // onCreate all
- ForEach (*iter)
- {
- CMasterGraphElement &element = (CMasterGraphElement &)iter->query();
- element.onCreate();
- }
- }
- msg.clear();
- graph->serializeCreateContexts(msg);
- job.queryJobChannel(0).queryJobComm().reply(msg);
- break;
- }
- case smt_initActDataReq: // create/pre-start the listed activities, reply with their init data (or an exception), then wait for the slave's second reply
- {
- graph_id gid;
- msg.read(gid);
- unsigned slave;
- msg.read(slave);
- Owned<CMasterGraph> graph = (CMasterGraph *)job.queryJobChannel(0).getGraph(gid);
- assertex(graph);
- CGraphElementArray toSerialize;
- CriticalBlock b(graph->queryCreateLock()); // held until the end of this case block
- size32_t parentExtractSz;
- msg.read(parentExtractSz);
- const byte *parentExtract = NULL;
- if (parentExtractSz)
- {
- parentExtract = msg.readDirect(parentExtractSz);
- StringBuffer msg("Graph("); // NB: this StringBuffer shadows the outer CMessageBuffer 'msg' until the end of this if-block
- msg.append(graph->queryGraphId()).append(") - initializing master graph with parentExtract ").append(parentExtractSz).append(" bytes");
- DBGLOG("%s", msg.str());
- parentExtract = graph->setParentCtx(parentExtractSz, parentExtract); // continue with the pointer returned by the graph rather than the one into the message
- }
- Owned<IException> exception;
- for (;;) // the message carries a list of activity ids terminated by a zero id
- {
- activity_id id;
- msg.read(id);
- if (!id)
- break;
- CMasterGraphElement *element = (CMasterGraphElement *)graph->queryElement(id);
- assertex(element);
- try
- {
- element->reset();
- size32_t startCtxLen;
- msg.read(startCtxLen);
- element->doCreateActivity(parentExtractSz, parentExtract, startCtxLen ? &msg : nullptr); // the message is passed as start context only when a non-zero length was sent
- if (element->queryActivity())
- element->preStart(parentExtractSz, parentExtract);
- }
- catch (IException *e) // first failure is logged, remembered and stops processing of further ids
- {
- EXCLOG(e, NULL);
- exception.setown(e);
- break;
- }
- CActivityBase *activity = element->queryActivity();
- if (activity && activity->needReInit())
- element->sentActInitData->set(slave, 0); // clear to permit serializeActivityInitData to resend
- toSerialize.append(*LINK(element));
- }
- msg.clear();
- mptag_t replyTag = job.queryJobChannel(0).queryMPServer().createReplyTag();
- msg.append(replyTag); // second reply
- if (exception)
- {
- msg.append(true); // error flag followed by the serialized exception
- serializeException(exception, msg);
- }
- else
- {
- msg.append(false); // no error: followed by the init data of the collected elements
- CGraphElementArrayIterator iter(toSerialize);
- graph->serializeActivityInitData(slave, msg, iter);
- }
- job.queryJobChannel(0).queryJobComm().reply(msg);
- if (!job.queryJobChannel(0).queryJobComm().recv(msg, slave+1, replyTag, NULL, childGraphInitTimeout)) // wait for the slave's answer on replyTag, bounded by childGraphInitTimeout
- throwUnexpected();
- if (exception)
- throw exception.getClear(); // the locally caught exception is rethrown only after the slave has answered
- bool error;
- msg.read(error);
- if (error)
- {
- Owned<IThorException> e = deserializeThorException(msg);
- e->setSlave(slave); // NOTE(review): smt_errorMsg above uses slave+1 - confirm which numbering is intended here
- StringBuffer tmpStr("Slave ");
- job.queryJobGroup().queryNode(slave).endpoint().getUrlStr(tmpStr);
- GraphPrintLog(graph, e, "%s", tmpStr.append(": slave initialization error").str());
- throw e.getClear();
- }
- break;
- }
- case smt_getPhysicalName: // reply with the physical name of a logical file part
- {
- LOG(MCdebugProgress, unknownJob, "getPhysicalName called from node %d", sender-1);
- StringAttr logicalName;
- unsigned partNo;
- bool create;
- msg.read(logicalName);
- msg.read(partNo);
- msg.read(create);
- msg.clear();
- StringBuffer phys;
- if (create && !job.queryCreatedFile(logicalName)) // not sure who would do this ever??
- queryThorFileManager().getPublishPhysicalName(job, logicalName, partNo, phys);
- else
- queryThorFileManager().getPhysicalName(job, logicalName, partNo, phys);
- msg.append(phys);
- job.queryJobChannel(0).queryJobComm().reply(msg);
- break;
- }
- case smt_getFileOffset: // reply with the file offset the file manager reports for a logical file part
- {
- LOG(MCdebugProgress, unknownJob, "getFileOffset called from node %d", sender-1);
- StringAttr logicalName;
- unsigned partNo;
- msg.read(logicalName);
- msg.read(partNo);
- msg.clear();
- offset_t offset = queryThorFileManager().getFileOffset(job, logicalName, partNo);
- msg.append(offset);
- job.queryJobChannel(0).queryJobComm().reply(msg);
- break;
- }
- case smt_actMsg: // hand the remainder of the message to the addressed master activity; no reply is sent from here
- {
- LOG(MCdebugProgress, unknownJob, "smt_actMsg called from node %d", sender-1);
- graph_id gid;
- msg.read(gid);
- activity_id id;
- msg.read(id);
- Owned<CMasterGraph> graph = (CMasterGraph *)job.queryJobChannel(0).getGraph(gid);
- assertex(graph);
- CMasterGraphElement *container = (CMasterGraphElement *)graph->queryElement(id);
- assertex(container);
- CMasterActivity *activity = (CMasterActivity *)container->queryActivity();
- assertex(activity);
- activity->handleSlaveMessage(msg); // don't block
- break;
- }
- case smt_getresult: // stream a stored result back to the requesting slave on the tag it supplied
- {
- unsigned slave;
- msg.read(slave);
- LOG(MCdebugProgress, unknownJob, "smt_getresult called from slave %d", slave);
- graph_id gid;
- msg.read(gid);
- activity_id ownerId;
- msg.read(ownerId);
- unsigned resultId;
- msg.read(resultId);
- CJobChannel &jobChannel = job.queryJobChannel(0);
- mptag_t replyTag = jobChannel.deserializeMPTag(msg);
- Owned<IThorResult> result = jobChannel.getOwnedResult(gid, ownerId, resultId); // NOTE(review): result is used below without a null check
- Owned<IRowStream> resultStream = result->getRowStream();
- sendInChunks(job.queryJobChannel(0).queryJobComm(), slave+1, replyTag, resultStream, result->queryRowInterfaces());
- break;
- }
- }
- }
- }
- catch (IException *e) // any exception ends the receive loop; it is reported to the job and then released
- {
- job.fireException(e);
- e->Release();
- }
- }
- //////////////////////
- // Sets up per-activity master state: the warning bit set, one MemoryBuffer per slave,
- // and the ProgressInfo entries (one for a sink, otherwise one per output).
- CMasterActivity::CMasterActivity(CGraphElementBase *_container) : CActivityBase(_container), threaded("CMasterActivity", this), timingInfo(_container->queryJob())
- {
-     notedWarnings = createThreadSafeBitSet();
-     mpTag = TAG_NULL;
-     data = new MemoryBuffer[container.queryJob().querySlaves()];
-     asyncStart = false;
-     if (container.isSink())
-     {
-         progressInfo.append(*new ProgressInfo(queryJob()));
-         return;
-     }
-     unsigned outputIdx = 0;
-     while (outputIdx < container.getOutputs())
-     {
-         progressInfo.append(*new ProgressInfo(queryJob()));
-         ++outputIdx;
-     }
- }
- // Tears the activity down: joins the worker thread if it was started asynchronously,
- // then releases the warning set, the MP tag and the per-slave buffers.
- CMasterActivity::~CMasterActivity()
- {
-     if (asyncStart)
-     {
-         threaded.join();
-     }
-     notedWarnings->Release();
-     queryJob().freeMPTag(mpTag);
-     delete [] data;
- }
- // Records 'file' (linked) in readFiles; non-temporary files are additionally
- // reported to the file manager as read by this job.
- void CMasterActivity::addReadFile(IDistributedFile *file, bool temp)
- {
-     readFiles.append(*LINK(file));
-     if (temp)
-         return; // NB: Temps not listed in workunit
-     queryThorFileManager().noteFileRead(container.queryJob(), file);
- }
- // Returns the f'th recorded read file, or NULL when f is out of range.
- IDistributedFile *CMasterActivity::queryReadFile(unsigned f)
- {
-     if (f < readFiles.ordinality())
-         return &readFiles.item(f);
-     return NULL;
- }
- // Runs the base preStart, then rebuilds readFiles: entries that are super files are
- // replaced via getSuperFileSubs(), all other entries are re-added unchanged.
- void CMasterActivity::preStart(size32_t parentExtractSz, const byte *parentExtract)
- {
-     CActivityBase::preStart(parentExtractSz, parentExtract);
-     IArrayOf<IDistributedFile> previous;
-     previous.swapWith(readFiles); // readFiles is now empty and gets repopulated below
-     ForEachItemIn(idx, previous)
-     {
-         IDistributedFile &current = previous.item(idx);
-         IDistributedSuperFile *asSuper = current.querySuperFile();
-         if (!asSuper)
-         {
-             readFiles.append(*LINK(&current));
-             continue;
-         }
-         getSuperFileSubs(asSuper, readFiles, true);
-     }
- }
- // Returns the buffer held for 'slave' after calling reset() on it.
- // NB: not intended to be called by multiple threads.
- MemoryBuffer &CMasterActivity::queryInitializationData(unsigned slave) const
- {
-     MemoryBuffer &slaveBuffer = data[slave];
-     return slaveBuffer.reset();
- }
- // Appends the buffer held for 'slave' to 'dst' and returns what append() returns.
- MemoryBuffer &CMasterActivity::getInitializationData(unsigned slave, MemoryBuffer &dst) const
- {
-     MemoryBuffer &slaveBuffer = data[slave];
-     return dst.append(slaveBuffer);
- }
- // Thread body (also called directly for synchronous starts): runs process() and
- // converts anything it throws into an exception fired through fireException().
- void CMasterActivity::threadmain()
- {
-     try
-     {
-         process();
-     }
-     catch (IException *caught)
-     {
-         Owned<IException> wrapped;
-         if (!QUERYINTERFACE(caught, ISEH_Exception))
-             wrapped.setown(MakeActivityException(this, caught, "Master exception"));
-         else
-             wrapped.setown(MakeThorFatal(caught, TE_SEH, "(SEH)"));
-         caught->Release();
-         ActPrintLog(wrapped, "In CMasterActivity::threadmain");
-         fireException(wrapped);
-     }
-     catch (CATCHALL)
-     {
-         Owned<IException> unknown = MakeThorFatal(NULL, TE_MasterProcessError, "FATAL: Unknown master process exception kind=%s, id=%" ACTPF "d", activityKindStr(container.getKind()), container.queryId());
-         ActPrintLog(unknown, "In CMasterActivity::threadmain");
-         fireException(unknown);
-     }
- }
- // Base initialisation: empties the list of recorded read files.
- void CMasterActivity::init()
- {
-     this->readFiles.kill();
- }
- // Runs the activity's threadmain(): inline on the calling thread when 'async' is
- // false, otherwise on the 'threaded' worker (asyncStart records that choice).
- void CMasterActivity::startProcess(bool async)
- {
-     if (!async)
-     {
-         threadmain();
-         return;
-     }
-     asyncStart = true;
-     threaded.start();
- }
- // Waits for an asynchronously started process; returns true immediately when the
- // activity was not started asynchronously, otherwise the result of join(timeout).
- bool CMasterActivity::wait(unsigned timeout)
- {
-     if (asyncStart)
-         return threaded.join(timeout);
-     return true;
- }
- // Calls the base kill() and then drops all recorded read files.
- void CMasterActivity::kill()
- {
-     CActivityBase::kill();
-     this->readFiles.kill();
- }
- // Fires a Thor exception. For the listed warning codes the exception is fired through
- // CActivityBase only the first time that code is seen (tracked in notedWarnings) and
- // true is returned; every other code is forwarded to the owning graph.
- // Returns false without firing when _e does not expose IThorException.
- bool CMasterActivity::fireException(IException *_e)
- {
-     IThorException *thorExc = QUERYINTERFACE(_e, IThorException);
-     if (!thorExc)
-         return false;
-     bool reportOncePerCode = false;
-     switch (thorExc->errorCode())
-     {
-         case TE_LargeBufferWarning:
-         case TE_MoxieIndarOverflow:
-         case TE_BuildIndexFewExcess:
-         case TE_FetchMisaligned:
-         case TE_FetchOutOfRange:
-         case TE_CouldNotCreateLookAhead:
-         case TE_SpillAdded:
-         case TE_ReadPartialFromPipe:
-         case TE_LargeAggregateTable:
-             reportOncePerCode = true;
-             break;
-     }
-     if (!reportOncePerCode)
-         return container.queryOwner().fireException(thorExc);
-     // testSet() result is false only the first time this code is recorded
-     if (!notedWarnings->testSet(thorExc->errorCode()))
-         CActivityBase::fireException(thorExc);
-     return true;
- }
- // Clears the async-start marker, then performs the base class reset.
- void CMasterActivity::reset()
- {
-     this->asyncStart = false;
-     CActivityBase::reset();
- }
- // Reads one node's statistics from 'mb': first the local time (ns), then one row
- // count per ProgressInfo entry, in order, storing each against 'node'.
- void CMasterActivity::deserializeStats(unsigned node, MemoryBuffer &mb)
- {
-     CriticalBlock block(progressCrit); // don't think needed
-     unsigned __int64 nodeTimeNs;
-     mb.read(nodeTimeNs);
-     timingInfo.set(node, nodeTimeNs);
-     ForEachItemIn(idx, progressInfo)
-     {
-         rowcount_t rows;
-         mb.read(rows);
-         progressInfo.item(idx).set(node, rows);
-     }
- }
- // Adds this activity's timing statistics to 'stats'.
- void CMasterActivity::getActivityStats(IStatisticGatherer & stats)
- {
-     this->timingInfo.getStats(stats);
- }
- // Adds the statistics of ProgressInfo entry 'idx' to 'stats'; does nothing when
- // 'idx' is not a valid entry. progressCrit is held for the duration.
- void CMasterActivity::getEdgeStats(IStatisticGatherer & stats, unsigned idx)
- {
-     CriticalBlock block(progressCrit);
-     if (!progressInfo.isItem(idx))
-         return;
-     progressInfo.item(idx).getStats(stats);
- }
- // Calls the base done() and then setAccessed() on every recorded read file.
- void CMasterActivity::done()
- {
-     CActivityBase::done();
-     ForEachItemIn(idx, readFiles)
-         readFiles.item(idx).setAccessed();
- }
- //////////////////////
- // CMasterGraphElement impl.
- //
- // Forwards the owning graph and the element's XGMML tree to CGraphElementBase; no extra state is set here.
- CMasterGraphElement::CMasterGraphElement(CGraphBase &_owner, IPropertyTree &_xgmml)
-     : CGraphElementBase(_owner, _xgmml)
- { }
- // Decides whether this element's output file is already up to date.
- // Returns true - after firing a TE_UpToDate warning on the owning graph - only when:
- //  * onlyUpdateIfChanged is set, "@updateEnabled" is not false and the workunit value
- //    "disableUpdate" is 0,
- //  * the element is an index/disk/csv/xml/json/spill write whose helper has the update flag, and
- //  * the existing file's "@eclCRC" and "@totalCRC" attributes equal the helper's CRCs.
- // Returns false in every other case (including when the file cannot be looked up).
- bool CMasterGraphElement::checkUpdate()
- {
-     if (!onlyUpdateIfChanged)
-         return false;
-     if (!globals->getPropBool("@updateEnabled", true) || 0 != queryJob().getWorkUnitValueInt("disableUpdate", 0))
-         return false;
-     bool doCheckUpdate = false;
-     OwnedRoxieString filename;
-     // initialised so they never hold indeterminate values when the kind matches no case below
-     unsigned eclCRC = 0;
-     unsigned __int64 totalCRC = 0;
-     bool temporary = false;
-     switch (getKind())
-     {
-         case TAKindexwrite:
-         {
-             IHThorIndexWriteArg *helper = (IHThorIndexWriteArg *)queryHelper();
-             doCheckUpdate = 0 != (helper->getFlags() & TIWupdate);
-             filename.set(helper->getFileName());
-             helper->getUpdateCRCs(eclCRC, totalCRC);
-             break;
-         }
-         case TAKdiskwrite:
-         case TAKcsvwrite:
-         case TAKxmlwrite:
-         case TAKjsonwrite:
-         case TAKspillwrite:
-         {
-             IHThorDiskWriteArg *helper = (IHThorDiskWriteArg *)queryHelper();
-             doCheckUpdate = 0 != (helper->getFlags() & TDWupdate);
-             filename.set(helper->getFileName());
-             helper->getUpdateCRCs(eclCRC, totalCRC);
-             // only plain disk writes consult the temporary flags; spill writes are always temporary
-             if (TAKdiskwrite == getKind())
-                 temporary = 0 != (helper->getFlags() & (TDXtemporary|TDXjobtemp));
-             else if (TAKspillwrite == getKind())
-                 temporary = true;
-             break;
-         }
-         default:
-             break;
-     }
-     if (!doCheckUpdate)
-         return false;
-     Owned<IDistributedFile> file = queryThorFileManager().lookup(queryJob(), filename, temporary, true);
-     if (!file)
-         return false;
-     IPropertyTree &props = file->queryAttributes();
-     // casts make the existing signed->unsigned comparison explicit; the values compared are unchanged
-     if ((eclCRC != (unsigned)props.getPropInt("@eclCRC")) || (totalCRC != (unsigned __int64)props.getPropInt64("@totalCRC")))
-         return false;
-     // so this needs pruning
-     Owned<IThorException> e = MakeActivityWarning(this, TE_UpToDate, "output file = '%s' - is up to date - it will not be rebuilt", file->queryLogicalName());
-     queryOwner().fireException(e);
-     return true;
- }
- // Calls init() on the element's master activity once; 'initialized' (guarded by
- // 'crit') prevents repeat calls until it is cleared again.
- void CMasterGraphElement::initActivity()
- {
-     CriticalBlock block(crit);
-     if (initialized)
-         return;
-     initialized = true;
-     CMasterActivity *masterActivity = (CMasterActivity *)queryActivity();
-     masterActivity->init();
- }
- // Base reset, then clears 'initialized' when an activity exists and asks to be re-initialised.
- void CMasterGraphElement::reset()
- {
-     CGraphElementBase::reset();
-     if (!activity)
-         return;
-     if (activity->needReInit())
-         initialized = false;
- }
- // Brings the element's activity up in three steps, in this order:
- // createActivity(), onStart() with the parent extract / start context, initActivity().
- void CMasterGraphElement::doCreateActivity(size32_t parentExtractSz, const byte *parentExtract, MemoryBuffer *startCtx)
- {
-     this->createActivity();
-     this->onStart(parentExtractSz, parentExtract, startCtx);
-     this->initActivity();
- }
- // Forwards a slave's completion data to the element's master activity.
- void CMasterGraphElement::slaveDone(size32_t slaveIdx, MemoryBuffer &mb)
- {
-     CMasterActivity *masterActivity = (CMasterActivity *)queryActivity();
-     masterActivity->slaveDone(slaveIdx, mb);
- }
- //////
- ///////
- class CBarrierMaster : public CInterface, implements IBarrier // Master side of a barrier: wait() collects one message per other group member on 'tag', then sends the combined outcome to all other members
- {
- mptag_t tag;
- Linked<ICommunicator> comm;
- bool receiving; // set only while wait() is inside comm->recv(); consulted by cancel()
- public:
- IMPLEMENT_IINTERFACE;
- CBarrierMaster(ICommunicator &_comm, mptag_t _tag) : comm(&_comm), tag(_tag) // NB: initializer list order differs from member declaration order (tag is declared before comm)
- {
- receiving = false;
- }
- virtual const mptag_t queryTag() const { return tag; }
- virtual bool wait(bool exception, unsigned timeout) // returns true when the barrier completed; on abort or timeout it throws if 'exception' is set, otherwise returns false
- {
- Owned<IException> e;
- CTimeMon tm(timeout);
- unsigned s=comm->queryGroup().ordinality()-1; // number of messages to collect: group size minus one (presumably excluding the master itself)
- bool aborted = false;
- CMessageBuffer msg;
- Owned<IBitSet> raisedSet = createThreadSafeBitSet(); // records which senders have already been seen
- unsigned remaining = timeout;
- while (s--)
- {
- rank_t sender;
- msg.clear();
- if (INFINITE != timeout && tm.timedout(&remaining)) // overall timeout expired before all messages arrived
- {
- if (exception)
- throw createBarrierAbortException();
- else
- return false;
- }
- {
- BooleanOnOff onOff(receiving); // marks 'receiving' for the duration of this scope (presumably reset by onOff's destructor - confirm)
- if (!comm->recv(msg, RANK_ALL, tag, &sender, remaining))
- break; // recv returned false: stop collecting and fall through to the send phase
- }
- msg.read(aborted); // message layout: aborted flag, has-exception flag, optional serialized exception
- bool hasExcept;
- msg.read(hasExcept);
- if (hasExcept && !e.get()) // only the first exception received is kept
- e.setown(deserializeException(msg));
- sender = sender - 1; // 0 = master
- if (raisedSet->testSet(sender, true) && !aborted) // warn when this sender had already been recorded
- WARNLOG("CBarrierMaster, raise barrier message on tag %d, already received from slave %d", tag, sender);
- if (aborted) break;
- }
- msg.clear(); // build the outgoing message using the same layout as the incoming ones
- msg.append(aborted);
- if (e)
- {
- msg.append(true);
- serializeException(e, msg);
- }
- else
- msg.append(false);
- if (INFINITE != timeout && tm.timedout(&remaining)) // re-check the overall timeout before sending
- {
- if (exception)
- throw createBarrierAbortException();
- else
- return false;
- }
- if (!comm->send(msg, RANK_ALL_OTHER, tag, INFINITE != timeout ? remaining : LONGTIMEOUT)) // send is bounded by the remaining time, or LONGTIMEOUT when no timeout was given
- throw MakeStringException(0, "CBarrierMaster::wait - Timeout sending to slaves");
- if (aborted)
- {
- if (!exception)
- return false;
- if (e)
- throw e.getClear(); // prefer the received exception over a generic abort
- else
- throw createBarrierAbortException();
- }
- return true;
- }
- virtual void cancel(IException *e) // cancels a recv pending in wait() (if any) and sends an 'aborted' message, with the optional exception e, to all other nodes
- {
- if (receiving)
- comm->cancel(RANK_ALL, tag);
- CMessageBuffer msg;
- msg.append(true); // aborted = true
- if (e)
- {
- msg.append(true);
- serializeException(e, msg);
- }
- else
- msg.append(false);
- if (!comm->send(msg, RANK_ALL_OTHER, tag, LONGTIMEOUT))
- throw MakeStringException(0, "CBarrierMaster::cancel - Timeout sending to slaves");
- }
- };
- /////////////
- // Master-side temp file handler. Files that are not temporary (or any file when the
- // job uses checkpoints) are tracked in the workunit; everything else is delegated to
- // CGraphTempHandler.
- class CMasterGraphTempHandler : public CGraphTempHandler
- {
- public:
-     CMasterGraphTempHandler(CJobBase &job, bool errorOnMissing) : CGraphTempHandler(job, errorOnMissing) { }
-     // Drops the file manager's cache entry for 'name'; always reports success.
-     virtual bool removeTemp(const char *name)
-     {
-         queryThorFileManager().clearCacheEntry(name);
-         return true;
-     }
-     virtual void registerFile(const char *name, graph_id graphId, unsigned usageCount, bool temp, WUFileKind fileKind, StringArray *clusters)
-     {
-         if (temp && !job.queryUseCheckpoints())
-         {
-             CGraphTempHandler::registerFile(name, graphId, usageCount, temp, fileKind, clusters);
-             return;
-         }
-         Owned<IWorkUnit> lockedWu = &job.queryWorkUnit().lock();
-         lockedWu->addFile(name, clusters, usageCount, fileKind, job.queryGraphName());
-     }
-     virtual void deregisterFile(const char *name, bool kept) // NB: only called for temp files
-     {
-         if (!kept && !job.queryUseCheckpoints())
-         {
-             CGraphTempHandler::deregisterFile(name);
-             return;
-         }
-         Owned<IWorkUnit> lockedWu = &job.queryWorkUnit().lock();
-         lockedWu->releaseFile(name);
-     }
-     // Deletes the workunit's temp files for this graph unless the job is pausing, then
-     // clears the base handler's temps. A failure while deleting is logged and ignored.
-     virtual void clearTemps()
-     {
-         try
-         {
-             const bool pausing = job.queryPausing(); // temps of completed workunit will have been preserved and want to keep
-             if (!pausing)
-             {
-                 Owned<IWorkUnit> lockedWu = &job.queryWorkUnit().lock();
-                 lockedWu->deleteTempFiles(job.queryGraphName(), false, false);
-             }
-         }
-         catch (IException *deleteError)
-         {
-             EXCLOG(deleteError, "Problem deleting temp files");
-             deleteError->Release();
-         }
-         CGraphTempHandler::clearTemps();
-     }
- };
- // Appends a human readable description of a result to 's' and returns it.
- // Sequence values -1, -2 and -3 (after conversion to int) produce STORED('name'),
- // PERSIST('name') and global('name'); any other sequence gives the step name if
- // there is one, else '#' followed by the sequence number.
- static const char * getResultText(StringBuffer & s, const char * stepname, unsigned sequence)
- {
-     const char *wrapperOpen = NULL;
-     switch ((int)sequence)
-     {
-         case -1: wrapperOpen = "STORED('"; break;
-         case -2: wrapperOpen = "PERSIST('"; break;
-         case -3: wrapperOpen = "global('"; break;
-         default: break;
-     }
-     if (wrapperOpen)
-         return s.append(wrapperOpen).append(stepname).append("')");
-     if (!stepname)
-         return s.append('#').append(sequence);
-     return s.append(stepname);
- }
- class CThorCodeContextMaster : public CThorCodeContextBase
- {
- Linked<IConstWorkUnit> workunit;
- Owned<IDistributedFileTransaction> superfiletransaction;
- // Looks up a result of this context's workunit by name / sequence.
- IConstWUResult * getResult(const char * name, unsigned sequence)
- {
-     IConstWUResult *found = getWorkUnitResult(workunit, name, sequence);
-     return found;
- }
- #define PROTECTED_GETRESULT(STEPNAME, SEQUENCE, KIND, KINDTEXT, ACTION) /* Shared body for result getters: logs the request, fetches the result into local 'r' via getResultForGet() (not defined in the visible part of this file), runs ACTION inside a try block, and turns anything thrown into a TE_FailedToRetrieveWorkunitValue string exception naming the result and KINDTEXT */ \
- LOG(MCdebugProgress, unknownJob, "getResult%s(%s,%d)", KIND, STEPNAME?STEPNAME:"", SEQUENCE); \
- Owned<IConstWUResult> r = getResultForGet(STEPNAME, SEQUENCE); \
- try \
- { \
- ACTION \
- } \
- catch (IException * e) { /* keep the original message text, release the original exception */ \
- StringBuffer s, text; e->errorMessage(text); e->Release(); \
- throw MakeStringException(TE_FailedToRetrieveWorkunitValue, "result %s in workunit contains an invalid " KINDTEXT " value [%s]", getResultText(s, STEPNAME, SEQUENCE), text.str()); \
- } \
- catch (CATCHALL) { StringBuffer s; throw MakeStringException(TE_FailedToRetrieveWorkunitValue, "value %s in workunit contains an invalid " KINDTEXT " value", getResultText(s, STEPNAME, SEQUENCE)); }
- public:
- CThorCodeContextMaster(CJobChannel &jobChannel, IConstWorkUnit &_workunit, ILoadedDllEntry &querySo, IUserDescriptor &userDesc) : CThorCodeContextBase(jobChannel, querySo, userDesc), workunit(&_workunit)
- {
- }
- // ICodeContext
- virtual unsigned getGraphLoopCounter() const override { return 0; }
- virtual IDebuggableContext *queryDebugContext() const override { return nullptr; }
- virtual void setResultBool(const char *name, unsigned sequence, bool result) override
- {
- WorkunitUpdate w(&workunit->lock());
- Owned<IWUResult> r = updateWorkUnitResult(w, name, sequence);
- if (r)
- {
- r->setResultBool(result);
- r->setResultStatus(ResultStatusCalculated);
- }
- else
- throw MakeStringException(TE_UnexpectedParameters, "Unexpected parameters to setResultBool");
- }
- virtual void setResultData(const char *name, unsigned sequence, int len, const void *result) override
- {
- WorkunitUpdate w(&workunit->lock());
- Owned<IWUResult> r = updateWorkUnitResult(w, name, sequence);
- if (r)
- {
- r->setResultData(result, len);
- r->setResultStatus(ResultStatusCalculated);
- }
- else
- throw MakeStringException(TE_UnexpectedParameters, "Unexpected parameters to setResultData");
- }
- virtual void setResultDecimal(const char * name, unsigned sequence, int len, int precision, bool isSigned, const void *val) override
- {
- WorkunitUpdate w(&workunit->lock());
- Owned<IWUResult> r = updateWorkUnitResult(w, name, sequence);
- if (r)
- {
- r->setResultDecimal(val, len);
- r->setResultStatus(ResultStatusCalculated);
- }
- else
- throw MakeStringException(TE_UnexpectedParameters, "Unexpected parameters to setResultDecimal");
- }
- virtual void setResultInt(const char *name, unsigned sequence, __int64 result, unsigned size) override
- {
- WorkunitUpdate w(&workunit->lock());
- Owned<IWUResult> r = updateWorkUnitResult(w, name, sequence);
- if (r)
- {
- r->setResultInt(result);
- r->setResultStatus(ResultStatusCalculated);
- }
- else
- throw MakeStringException(TE_UnexpectedParameters, "Unexpected parameters to setResultInt");
- }
- virtual void setResultRaw(const char *name, unsigned sequence, int len, const void *result) override
- {
- WorkunitUpdate w(&workunit->lock());
- Owned<IWUResult> r = updateWorkUnitResult(w, name, sequence);
- if (r)
- {
- r->setResultRaw(len, result, ResultFormatRaw);
- r->setResultStatus(ResultStatusCalculated);
- }
- else
- throw MakeStringException(TE_UnexpectedParameters, "Unexpected parameters to setResultData");
- }
- virtual void setResultReal(const char *name, unsigned sequence, double result) override
- {
- WorkunitUpdate w(&workunit->lock());
- Owned<IWUResult> r = updateWorkUnitResult(w, name, sequence);
- if (r)
- {
- r->setResultReal(result);
- r->setResultStatus(ResultStatusCalculated);
- }
- else
- throw MakeStringException(TE_UnexpectedParameters, "Unexpected parameters to setResultReal");
- }
- virtual void setResultSet(const char *name, unsigned sequence, bool isAll, size32_t len, const void *result, ISetToXmlTransformer *) override
- {
- WorkunitUpdate w(&workunit->lock());
- Owned<IWUResult> r = updateWorkUnitResult(w, name, sequence);
- if (r)
- {
- r->setResultIsAll(isAll);
- r->setResultRaw(len, result, ResultFormatRaw);
- r->setResultStatus(ResultStatusCalculated);
- }
- else
- throw MakeStringException(TE_UnexpectedParameters, "Unexpected parameters to setResultData");
- }
- virtual void setResultString(const char *name, unsigned sequence, int len, const char *result) override
- {
- WorkunitUpdate w(&workunit->lock());
- Owned<IWUResult> r = updateWorkUnitResult(w, name, sequence);
- if (r)
- {
- r->setResultString(result, len);
- r->setResultStatus(ResultStatusCalculated);
- }
- else
- throw MakeStringException(TE_UnexpectedParameters, "Unexpected parameters to setResultString");
- }
- virtual void setResultUnicode(const char * name, unsigned sequence, int len, UChar const * result) override
- {
- WorkunitUpdate w(&workunit->lock());
- Owned<IWUResult> r = updateWorkUnitResult(w, name, sequence);
- if (r)
- {
- r->setResultUnicode(result, len);
- r->setResultStatus(ResultStatusCalculated);
- }
- else
- throw MakeStringException(TE_UnexpectedParameters, "Unexpected parameters to setResultUnicode");
- }
- virtual void setResultVarString(const char * name, unsigned sequence, const char *result) override
- {
- WorkunitUpdate w(&workunit->lock());
- Owned<IWUResult> r = updateWorkUnitResult(w, name, sequence);
- if (r)
- {
- r->setResultString(result, strlen(result));
- r->setResultStatus(ResultStatusCalculated);
- }
- else
- throw MakeStringException(TE_UnexpectedParameters, "Unexpected parameters to setResultVarString");
- }
- virtual void setResultUInt(const char *name, unsigned sequence, unsigned __int64 result, unsigned size) override
- {
- WorkunitUpdate w(&workunit->lock());
- Owned<IWUResult> r = updateWorkUnitResult(w, name, sequence);
- if (r)
- {
- r->setResultUInt(result);
- r->setResultStatus(ResultStatusCalculated);
- }
- else
- throw MakeStringException(TE_UnexpectedParameters, "Unexpected parameters to setResultUInt");
- }
- virtual void setResultVarUnicode(const char * stepname, unsigned sequence, UChar const *val) override
- {
- setResultUnicode(stepname, sequence, rtlUnicodeStrlen(val), val);
- }
- virtual bool getResultBool(const char * stepname, unsigned sequence) override
- {
- PROTECTED_GETRESULT(stepname, sequence, "Bool", "bool",
- return r->getResultBool();
- );
- }
- virtual void getResultData(unsigned & tlen, void * & tgt, const char * stepname, unsigned sequence) override
- {
- PROTECTED_GETRESULT(stepname, sequence, "Data", "data",
- SCMStringBuffer result;
- r->getResultString(result, false);
- tlen = result.length();
- tgt = (char *)result.s.detach();
- );
- }
- virtual void getResultDecimal(unsigned tlen, int precision, bool isSigned, void * tgt, const char * stepname, unsigned sequence) override
- {
- PROTECTED_GETRESULT(stepname, sequence, "Decimal", "decimal",
- r->getResultDecimal(tgt, tlen, precision, isSigned);
- );
- }
- virtual void getResultRaw(unsigned & tlen, void * & tgt, const char * stepname, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer) override
- {
- tgt = NULL;
- PROTECTED_GETRESULT(stepname, sequence, "Raw", "raw",
- Variable2IDataVal result(&tlen, &tgt);
- Owned<IXmlToRawTransformer> rawXmlTransformer = createXmlRawTransformer(xmlTransformer);
- Owned<ICsvToRawTransformer> rawCsvTransformer = createCsvRawTransformer(csvTransformer);
- r->getResultRaw(result, rawXmlTransformer, rawCsvTransformer);
- );
- }
- virtual void getResultSet(bool & isAll, unsigned & tlen, void * & tgt, const char * stepname, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer) override
- {
- tgt = NULL;
- PROTECTED_GETRESULT(stepname, sequence, "Raw", "raw",
- Variable2IDataVal result(&tlen, &tgt);
- Owned<IXmlToRawTransformer> rawXmlTransformer = createXmlRawTransformer(xmlTransformer);
- Owned<ICsvToRawTransformer> rawCsvTransformer = createCsvRawTransformer(csvTransformer);
- isAll = r->getResultIsAll();
- r->getResultRaw(result, rawXmlTransformer, rawCsvTransformer);
- );
- }
- virtual __int64 getResultInt(const char * name, unsigned sequence) override
- {
- PROTECTED_GETRESULT(name, sequence, "Int", "integer",
- return r->getResultInt();
- );
- }
- virtual double getResultReal(const char * name, unsigned sequence) override
- {
- PROTECTED_GETRESULT(name, sequence, "Real", "real",
- return r->getResultReal();
- );
- }
- virtual void getResultString(unsigned & tlen, char * & tgt, const char * stepname, unsigned sequence) override
- {
- PROTECTED_GETRESULT(stepname, sequence, "String", "string",
- SCMStringBuffer result;
- r->getResultString(result, false);
- tlen = result.length();
- tgt = (char *)result.s.detach();
- );
- }
- virtual void getResultStringF(unsigned tlen, char * tgt, const char * stepname, unsigned sequence) override
- {
- PROTECTED_GETRESULT(stepname, sequence, "String", "string",
- SCMStringBuffer result;
- r->getResultString(result, false);
- rtlStrToStr(tlen, tgt, result.length(), result.s.str());
- );
- }
- virtual void getResultUnicode(unsigned & tlen, UChar * & tgt, const char * stepname, unsigned sequence) override
- {
- PROTECTED_GETRESULT(stepname, sequence, "Unicode", "unicode",
- MemoryBuffer result;
- r->getResultUnicode(MemoryBuffer2IDataVal(result));
- tlen = result.length()/2;
- tgt = (UChar *)malloc(tlen*2);
- memcpy(tgt, result.toByteArray(), tlen*2);
- );
- }
- virtual char * getResultVarString(const char * stepname, unsigned sequence) override
- {
- PROTECTED_GETRESULT(stepname, sequence, "VarString", "string",
- SCMStringBuffer result;
- r->getResultString(result, false);
- return result.s.detach();
- );
- }
- virtual UChar * getResultVarUnicode(const char * stepname, unsigned sequence) override
- {
- PROTECTED_GETRESULT(stepname, sequence, "VarUnicode", "unicode",
- MemoryBuffer result;
- r->getResultUnicode(MemoryBuffer2IDataVal(result));
- unsigned tlen = result.length()/2;
- result.append((UChar)0);
- return (UChar *)result.detach();
- );
- }
- virtual unsigned getResultHash(const char * name, unsigned sequence) override
- {
- PROTECTED_GETRESULT(name, sequence, "Hash", "hash",
- return r->getResultHash();
- );
- }
- virtual unsigned getExternalResultHash(const char * wuid, const char * stepname, unsigned sequence) override
- {
- try
- {
- LOG(MCdebugProgress, unknownJob, "getExternalResultRaw %s", stepname);
- Owned<IConstWUResult> r = getExternalResult(wuid, stepname, sequence);
- return r->getResultHash();
- }
- catch (CATCHALL)
- {
- throw MakeStringException(TE_FailedToRetrieveWorkunitValue, "Failed to retrieve external data hash %s from workunit %s", stepname, wuid);
- }
- }
- virtual void getResultRowset(size32_t & tcount, const byte * * & tgt, const char * stepname, unsigned sequence, IEngineRowAllocator * _rowAllocator, bool isGrouped, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer) override
- {
- tgt = NULL;
- PROTECTED_GETRESULT(stepname, sequence, "Rowset", "rowset",
- MemoryBuffer datasetBuffer;
- MemoryBuffer2IDataVal result(datasetBuffer);
- Owned<IXmlToRawTransformer> rawXmlTransformer = createXmlRawTransformer(xmlTransformer);
- Owned<ICsvToRawTransformer> rawCsvTransformer = createCsvRawTransformer(csvTransformer);
- r->getResultRaw(result, rawXmlTransformer, rawCsvTransformer);
- Owned<IOutputRowDeserializer> deserializer = _rowAllocator->createDiskDeserializer(this);
- rtlDataset2RowsetX(tcount, tgt, _rowAllocator, deserializer, datasetBuffer.length(), datasetBuffer.toByteArray(), isGrouped);
- );
- }
- virtual void getResultDictionary(size32_t & tcount, const byte * * & tgt, IEngineRowAllocator * _rowAllocator, const char * stepname, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer, IHThorHashLookupInfo * hasher) override
- {
- tcount = 0;
- tgt = NULL;
- PROTECTED_GETRESULT(stepname, sequence, "Dictionary", "dictionary",
- MemoryBuffer datasetBuffer;
- MemoryBuffer2IDataVal result(datasetBuffer);
- Owned<IXmlToRawTransformer> rawXmlTransformer = createXmlRawTransformer(xmlTransformer);
- Owned<ICsvToRawTransformer> rawCsvTransformer = createCsvRawTransformer(csvTransformer);
- r->getResultRaw(result, rawXmlTransformer, rawCsvTransformer);
- Owned<IOutputRowDeserializer> deserializer = _rowAllocator->createDiskDeserializer(this);
- rtlDeserializeDictionary(tcount, tgt, _rowAllocator, deserializer, datasetBuffer.length(), datasetBuffer.toByteArray());
- );
- }
- virtual void getExternalResultRaw(unsigned & tlen, void * & tgt, const char * wuid, const char * stepname, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer) override
- {
- tgt = NULL;
- try
- {
- LOG(MCdebugProgress, unknownJob, "getExternalResultRaw %s", stepname);
- Variable2IDataVal result(&tlen, &tgt);
- Owned<IConstWUResult> r = getExternalResult(wuid, stepname, sequence);
- Owned<IXmlToRawTransformer> rawXmlTransformer = createXmlRawTransformer(xmlTransformer);
- Owned<ICsvToRawTransformer> rawCsvTransformer = createCsvRawTransformer(csvTransformer);
- r->getResultRaw(result, rawXmlTransformer, rawCsvTransformer);
- }
- catch (CATCHALL)
- {
- throw MakeStringException(TE_FailedToRetrieveWorkunitValue, "Failed to retrieve external data value %s from workunit %s", stepname, wuid);
- }
- }
- virtual void addWuException(const char * text, unsigned code, unsigned severity, const char * source) override
- {
- DBGLOG("%s", text);
- try
- {
- Owned<IWorkUnit> w = updateWorkUnit();
- Owned<IWUException> we = w->createException();
- we->setSeverity((ErrorSeverity)severity);
- we->setExceptionMessage(text);
- we->setExceptionSource(source);
- if (code)
- we->setExceptionCode(code);
- }
- catch (IException *E)
- {
- StringBuffer m;
- E->errorMessage(m);
- DBGLOG("Unable to record exception in workunit: %s", m.str());
- E->Release();
- }
- catch (...)
- {
- DBGLOG("Unable to record exception in workunit: unknown exception");
- }
- }
- virtual void addWuAssertFailure(unsigned code, const char * text, const char * filename, unsigned lineno, unsigned column, bool isAbort) override
- {
- DBGLOG("%s", text);
- try
- {
- Owned<IWorkUnit> w = updateWorkUnit();
- unsigned activity = 0;
- addExceptionToWorkunit(w, SeverityError, "user", code, text, filename, lineno, column, activity);
- }
- catch (IException *E)
- {
- StringBuffer m;
- E->errorMessage(m);
- DBGLOG("Unable to record exception in workunit: %s", m.str());
- E->Release();
- }
- catch (...)
- {
- DBGLOG("Unable to record exception in workunit: unknown exception");
- }
- if (isAbort)
- rtlFailOnAssert(); // minimal implementation
- }
- virtual unsigned __int64 getFileOffset(const char *logicalName) override { assertex(false); return 0; }
- virtual unsigned getNodes() override { return jobChannel.queryJob().querySlaves(); }
- virtual unsigned getNodeNum() override { throw MakeThorException(0, "Unsupported. getNodeNum() called in master"); return (unsigned)-1; }
- virtual char *getFilePart(const char *logicalName, bool create=false) override { assertex(false); return NULL; }
- virtual unsigned __int64 getDatasetHash(const char * name, unsigned __int64 hash) override
- {
- unsigned checkSum = 0;
- Owned<IDistributedFile> iDfsFile = queryThorFileManager().lookup(jobChannel.queryJob(), name, false, true, false); // NB: do not update accessed
- if (iDfsFile.get())
- {
- if (iDfsFile->getFileCheckSum(checkSum))
- hash ^= checkSum;
- else
- {
- StringBuffer modifiedStr;
- if (iDfsFile->queryAttributes().getProp("@modified", modifiedStr))
- hash = rtlHash64Data(modifiedStr.length(), modifiedStr.str(), hash);
- // JCS->GH - what's the best thing to do here, if [for some reason] neither are available..
- }
- }
- return hash;
- }
- virtual IDistributedFileTransaction *querySuperFileTransaction() override
- {
- if (!superfiletransaction.get())
- superfiletransaction.setown(createDistributedFileTransaction(userDesc, this));
- return superfiletransaction.get();
- }
- virtual char *getJobName() override
- {
- return strdup(workunit->queryJobName());
- }
- virtual char *getClusterName() override
- {
- return strdup(workunit->queryClusterName());
- }
- virtual char *getGroupName() override
- {
- StringBuffer out;
- if (globals)
- globals->getProp("@nodeGroup",out);
- return out.detach();
- }
- // ICodeContextExt impl.
- virtual IConstWUResult *getExternalResult(const char * wuid, const char *name, unsigned sequence) override
- {
- Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
- Owned<IConstWorkUnit> externalWU = factory->openWorkUnit(wuid);
- if (!externalWU)
- throw MakeStringException(TE_FailedToRetrieveWorkunitValue, "workunit %s not found, retrieving value %s", wuid, name);
- externalWU->remoteCheckAccess(userDesc, false);
- return getWorkUnitResult(externalWU, name, sequence);
- }
- virtual IConstWUResult *getResultForGet(const char *name, unsigned sequence) override
- {
- Owned<IConstWUResult> r = getResult(name, sequence);
- if (!r || (r->getResultStatus() == ResultStatusUndefined))
- {
- StringBuffer s;
- throw MakeStringException(TE_FailedToRetrieveWorkunitValue, "value %s in workunit is undefined", getResultText(s,name,sequence));
- }
- return r.getClear();
- }
- virtual IWorkUnit *updateWorkUnit() const override
- {
- return &workunit->lock();
- }
- };
- class CThorCodeContextMasterSharedMem : public CThorCodeContextMaster
- {
- IThorAllocator *sharedAllocator;
- public:
- CThorCodeContextMasterSharedMem(CJobChannel &jobChannel, IThorAllocator *_sharedAllocator, IConstWorkUnit &_workunit, ILoadedDllEntry &querySo, IUserDescriptor &userDesc)
- : CThorCodeContextMaster(jobChannel, _workunit, querySo, userDesc)
- {
- sharedAllocator = _sharedAllocator;
- }
- virtual IEngineRowAllocator *getRowAllocator(IOutputMetaData * meta, unsigned activityId) const
- {
- return sharedAllocator->getRowAllocator(meta, activityId);
- }
- };
- /////////////
- //
- // CJobMaster
- //
- void loadPlugin(SafePluginMap *pluginMap, const char *_path, const char *name)
- {
- StringBuffer path(_path);
- path.append(name);
- pluginMap->addPlugin(path.str(), name);
- }
- CJobMaster::CJobMaster(IConstWorkUnit &_workunit, const char *graphName, ILoadedDllEntry *querySo, bool _sendSo, const SocketEndpoint &_agentEp)
- : CJobBase(querySo, graphName), workunit(&_workunit), sendSo(_sendSo), agentEp(_agentEp)
- {
- SCMStringBuffer _token, _scope;
- workunit->getScope(_scope);
- workunit->getSecurityToken(_token);
- wuid.set(workunit->queryWuid());
- user.set(workunit->queryUser());
- token.append(_token.str());
- scope.append(_scope.str());
- globalMemoryMB = globals->getPropInt("@masterMemorySize", globals->getPropInt("@globalMemorySize")); // in MB
- numChannels = 1;
- init();
- if (workunit->hasDebugValue("GlobalId"))
- {
- SCMStringBuffer txId;
- workunit->getDebugValue("GlobalId", txId);
- if (txId.length())
- {
- SocketEndpoint thorEp;
- thorEp.setLocalHost(getMachinePortBase());
- logctx->setGlobalId(txId.str(), thorEp, 0);
- VStringBuffer msg("GlobalId: %s", txId.str());
- workunit->getDebugValue("CallerId", txId);
- if (txId.length())
- msg.append(", CallerId: ").append(txId.str());
- txId.set(logctx->queryLocalId());
- if (txId.length())
- msg.append(", LocalId: ").append(txId.str());
- logctx->CTXLOG("%s", msg.str());
- }
- }
- resumed = WUActionResume == workunit->getAction();
- fatalHandler.setown(new CFatalHandler(globals->getPropInt("@fatal_timeout", FATAL_TIMEOUT)));
- querySent = spillsSaved = false;
- nodeDiskUsageCached = false;
- StringBuffer pluginsDir;
- globals->getProp("@pluginsPath", pluginsDir);
- if (pluginsDir.length())
- addPathSepChar(pluginsDir);
- Owned<IConstWUPluginIterator> pluginIter = &workunit->getPlugins();
- ForEach(*pluginIter)
- {
- IConstWUPlugin &plugin = pluginIter->query();
- SCMStringBuffer name;
- plugin.getPluginName(name);
- loadPlugin(pluginMap, pluginsDir.str(), name.str());
- }
- sharedAllocator.setown(::createThorAllocator(globalMemoryMB, 0, 1, memorySpillAtPercentage, *logctx, crcChecking, usePackedAllocator));
- Owned<IMPServer> mpServer = getMPServer();
- addChannel(mpServer);
- slavemptag = allocateMPTag();
- slaveMsgHandler = new CSlaveMessageHandler(*this, slavemptag);
- tmpHandler.setown(createTempHandler(true));
- xgmml.set(graphXGMML);
- }
- CJobMaster::~CJobMaster()
- {
- if (slaveMsgHandler)
- delete slaveMsgHandler;
- freeMPTag(slavemptag);
- tmpHandler.clear();
- }
- void CJobMaster::addChannel(IMPServer *mpServer)
- {
- jobChannels.append(*new CJobMasterChannel(*this, mpServer, jobChannels.ordinality()));
- }
- static IException *createBCastException(unsigned slave, const char *errorMsg)
- {
- // think this should always be fatal, could check link down here, or in general and flag as _shutdown.
- StringBuffer msg("General failure communicating to slave");
- if (slave)
- msg.append("(").append(slave).append(") ");
- else
- msg.append("s ");
- Owned<IThorException> e = MakeThorException(0, "%s", msg.append(" [").append(errorMsg).append("]").str());
- e->setAction(tea_shutdown);
- return e.getClear();
- }
- mptag_t CJobMaster::allocateMPTag()
- {
- mptag_t tag = allocateClusterMPTag();
- queryJobChannel(0).queryJobComm().flush(tag);
- PROGLOG("allocateMPTag: tag = %d", (int)tag);
- return tag;
- }
- void CJobMaster::freeMPTag(mptag_t tag)
- {
- if (TAG_NULL != tag)
- {
- freeClusterMPTag(tag);
- PROGLOG("freeMPTag: tag = %d", (int)tag);
- queryJobChannel(0).queryJobComm().flush(tag);
- }
- }
- void CJobMaster::broadcast(ICommunicator &comm, CMessageBuffer &msg, mptag_t mptag, unsigned timeout, const char *errorMsg, CReplyCancelHandler *msgHandler, bool sendOnly)
- {
- unsigned groupSizeExcludingMaster = comm.queryGroup().ordinality() - 1;
- mptag_t replyTag = TAG_NULL;
- if (!sendOnly)
- {
- replyTag = queryJobChannel(0).queryMPServer().createReplyTag();
- msg.setReplyTag(replyTag);
- }
- if (globals->getPropBool("@broadcastSendAsync", true)) // only here in case of problems/debugging.
- {
- class CSendAsyncfor : public CAsyncFor
- {
- CMessageBuffer &msg;
- mptag_t mptag;
- unsigned timeout;
- StringAttr errorMsg;
- ICommunicator &comm;
- public:
- CSendAsyncfor(ICommunicator &_comm, CMessageBuffer &_msg, mptag_t _mptag, unsigned _timeout, const char *_errorMsg)
- : comm(_comm), msg(_msg), mptag(_mptag), timeout(_timeout), errorMsg(_errorMsg)
- {
- }
- void Do(unsigned i)
- {
- if (!comm.send(msg, i+1, mptag, timeout))
- throw createBCastException(i+1, errorMsg);
- }
- } afor(comm, msg, mptag, timeout, errorMsg);
- try
- {
- afor.For(groupSizeExcludingMaster, groupSizeExcludingMaster);
- }
- catch (IException *e)
- {
- EXCLOG(e, "broadcastSendAsync");
- abort(e);
- throw;
- }
- }
- else if (!comm.send(msg, RANK_ALL_OTHER, mptag, timeout))
- {
- Owned<IException> e = createBCastException(0, errorMsg);
- EXCLOG(e, NULL);
- abort(e);
- throw e.getClear();
- }
- if (sendOnly) return;
- unsigned respondents = 0;
- Owned<IBitSet> bitSet = createThreadSafeBitSet();
- for (;;)
- {
- rank_t sender;
- CMessageBuffer msg;
- bool r = msgHandler ? msgHandler->recv(comm, msg, RANK_ALL, replyTag, &sender, LONGTIMEOUT)
- : comm.recv(msg, RANK_ALL, replyTag, &sender, LONGTIMEOUT);
- if (!r)
- {
- StringBuffer tmpStr;
- if (errorMsg)
- tmpStr.append(": ").append(errorMsg).append(" - ");
- tmpStr.append("Timeout receiving from slaves - no reply from: [");
- unsigned s = bitSet->scan(0, false);
- assertex(s<querySlaves()); // must be at least one
- tmpStr.append(s+1);
- for (;;)
- {
- s = bitSet->scan(s+1, false);
- if (s>=querySlaves())
- break;
- tmpStr.append(",").append(s+1);
- }
- tmpStr.append("]");
- Owned<IException> e = MakeThorFatal(NULL, 0, " %s", tmpStr.str());
- EXCLOG(e, NULL);
- throw e.getClear();
- }
- bool error;
- msg.read(error);
- if (error)
- {
- Owned<IThorException> e = deserializeThorException(msg);
- e->setSlave(sender);
- throw e.getClear();
- }
- ++respondents;
- bitSet->set((unsigned)sender-1);
- if (respondents == groupSizeExcludingMaster)
- break;
- }
- }
- void CJobMaster::initNodeDUCache()
- {
- if (!nodeDiskUsageCached)
- {
- nodeDiskUsageCached = true;
- Owned<IPropertyTreeIterator> fileIter = &workunit->getFileIterator();
- ForEach (*fileIter)
- {
- Owned<IDistributedFile> f = queryDistributedFileDirectory().lookup(fileIter->query().queryProp("@name"), userDesc);
- if (f)
- {
- unsigned n = f->numParts();
- for (unsigned i=0;i<n;i++)
- {
- Owned<IDistributedFilePart> part = f->getPart(i);
- offset_t sz = part->getFileSize(false, false);
- if (i>=nodeDiskUsage.ordinality())
- nodeDiskUsage.append(sz);
- else
- {
- sz += nodeDiskUsage.item(i);
- nodeDiskUsage.add(sz, i);
- }
- }
- }
- }
- }
- }
- IPropertyTree *CJobMaster::prepareWorkUnitInfo()
- {
- Owned<IPropertyTree> workUnitInfo = createPTree("workUnitInfo");
- workUnitInfo->setProp("wuid", wuid);
- workUnitInfo->setProp("user", user);
- workUnitInfo->setProp("token", token);
- workUnitInfo->setProp("scope", scope);
- Owned<IConstWUPluginIterator> pluginIter = &queryWorkUnit().getPlugins();
- IPropertyTree *plugins = NULL;
- ForEach(*pluginIter)
- {
- IConstWUPlugin &thisplugin = pluginIter->query();
- if (!plugins)
- plugins = workUnitInfo->addPropTree("plugins", createPTree());
- SCMStringBuffer name;
- thisplugin.getPluginName(name);
- IPropertyTree *plugin = plugins->addPropTree("plugin", createPTree());
- plugin->setProp("@name", name.str());
- }
- IPropertyTree *debug = workUnitInfo->addPropTree("Debug", createPTree(ipt_caseInsensitive));
- SCMStringBuffer debugStr, valueStr;
- Owned<IStringIterator> debugIter = &queryWorkUnit().getDebugValues();
- ForEach (*debugIter)
- {
- debugIter->str(debugStr);
- queryWorkUnit().getDebugValue(debugStr.str(), valueStr);
- debug->setProp(debugStr.str(), valueStr.str());
- }
- return workUnitInfo.getClear();
- }
- void CJobMaster::sendQuery()
- {
- CriticalBlock b(sendQueryCrit);
- if (querySent) return;
- CMessageBuffer tmp;
- tmp.append(slavemptag);
- tmp.append(queryWuid());
- tmp.append(graphName);
- const char *soName = queryDllEntry().queryName();
- PROGLOG("Query dll: %s", soName);
- tmp.append(soName);
- tmp.append(sendSo);
- if (sendSo)
- {
- CTimeMon atimer;
- OwnedIFile iFile = createIFile(soName);
- OwnedIFileIO iFileIO = iFile->open(IFOread);
- size32_t sz = (size32_t)iFileIO->size();
- tmp.append(sz);
- read(iFileIO, 0, sz, tmp);
- PROGLOG("Loading query for serialization to slaves took %d ms", atimer.elapsed());
- }
- Owned<IPropertyTree> deps = createPTree(queryXGMML()->queryName());
- Owned<IPropertyTreeIterator> edgeIter = queryXGMML()->getElements("edge"); // JCSMORE trim to those actually needed
- ForEach (*edgeIter)
- {
- IPropertyTree &edge = edgeIter->query();
- deps->addPropTree("edge", LINK(&edge));
- }
- Owned<IPropertyTree> workUnitInfo = prepareWorkUnitInfo();
- workUnitInfo->serialize(tmp);
- deps->serialize(tmp);
- CMessageBuffer msg;
- msg.append(QueryInit);
- compressToBuffer(msg, tmp.length(), tmp.toByteArray());
- CTimeMon queryToSlavesTimer;
- broadcast(queryNodeComm(), msg, masterSlaveMpTag, LONGTIMEOUT, "sendQuery");
- PROGLOG("Serialization of query init info (%d bytes) to slaves took %d ms", msg.length(), queryToSlavesTimer.elapsed());
- queryJobManager().addCachedSo(soName);
- querySent = true;
- }
- void CJobMaster::jobDone()
- {
- if (!querySent) return;
- CMessageBuffer msg;
- msg.append(QueryDone);
- msg.append(queryKey());
- broadcast(queryNodeComm(), msg, masterSlaveMpTag, LONGTIMEOUT, "jobDone");
- }
- void CJobMaster::saveSpills()
- {
- CriticalBlock b(spillCrit);
- if (spillsSaved)
- return;
- spillsSaved = true;
- PROGLOG("Paused, saving spills..");
- assertex(!queryUseCheckpoints()); // JCSMORE - checkpoints probably need revisiting
- unsigned numSavedSpills = 0;
- // stash away spills ready for resume, make them owned by workunit in event of abort/delete
- Owned<IFileUsageIterator> iter = queryTempHandler()->getIterator();
- ForEach(*iter)
- {
- CFileUsageEntry &entry = iter->query();
- StringAttr tmpName = entry.queryName();
- if (WUGraphComplete == workunit->queryNodeState(queryGraphName(), entry.queryGraphId()))
- {
- IArrayOf<IGroup> groups;
- StringArray clusters;
- fillClusterArray(*this, tmpName, clusters, groups);
- Owned<IFileDescriptor> fileDesc = queryThorFileManager().create(*this, tmpName, clusters, groups, true, TDXtemporary|TDWnoreplicate);
- fileDesc->queryProperties().setPropBool("@pausefile", true); // JCSMORE - mark to keep, may be able to distinguish via other means
- fileDesc->queryProperties().setProp("@kind", "flat");
- IPropertyTree &props = fileDesc->queryProperties();
- props.setPropBool("@owned", true);
- bool blockCompressed=true; // JCSMORE, should come from helper really
- if (blockCompressed)
- props.setPropBool("@blockCompressed", true);
- Owned<IDistributedFile> file = queryDistributedFileDirectory().createNew(fileDesc);
- // NB: This is renaming/moving from temp path
- StringBuffer newName;
- queryThorFileManager().addScope(*this, tmpName, newName, true, true);
- verifyex(file->renamePhysicalPartFiles(newName.str(), NULL, NULL, queryBaseDirectory(grp_unknown)));
- file->attach(newName,userDesc);
- Owned<IWorkUnit> wu = &queryWorkUnit().lock();
- wu->addFile(newName, &clusters, entry.queryUsage(), entry.queryKind(), queryGraphName());
- ++numSavedSpills;
- }
- }
- PROGLOG("Paused, %d spill(s) saved.", numSavedSpills);
- }
- bool CJobMaster::go()
- {
- class CWorkunitPauseHandler : public CInterface, implements IWorkUnitSubscriber
- {
- CJobMaster &job;
- IConstWorkUnit &wu;
- Owned<IWorkUnitWatcher> watcher;
- CriticalSection crit;
- public:
- IMPLEMENT_IINTERFACE;
- CWorkunitPauseHandler(CJobMaster &_job, IConstWorkUnit &_wu) : job(_job), wu(_wu)
- {
- Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
- watcher.setown(factory->getWatcher(this, (WUSubscribeOptions) (SubscribeOptionAction | SubscribeOptionAbort), wu.queryWuid()));
- }
- ~CWorkunitPauseHandler() { stop(); }
- void stop()
- {
- Owned<IWorkUnitWatcher> _watcher;
- {
- CriticalBlock b(crit);
- if (!watcher)
- return;
- _watcher.setown(watcher.getClear());
- }
- _watcher->unsubscribe();
- }
- void notify(WUSubscribeOptions flags)
- {
- CriticalBlock b(crit);
- if (!watcher)
- return;
- if (flags & SubscribeOptionAbort)
- {
- Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
- if (factory->isAborting(wu.queryWuid()))
- {
- LOG(MCwarning, thorJob, "ABORT detected from user");
- Owned <IException> e = MakeThorException(TE_WorkUnitAborting, "User signalled abort");
- job.fireException(e);
- }
- }
- if (flags & SubscribeOptionAction)
- {
- job.markWuDirty();
- bool abort = false;
- bool pause = false;
- wu.forceReload();
- WUAction action = wu.getAction();
- if (action==WUActionPause)
- {
- // pause after current subgraph
- pause = true;
- }
- else if (action==WUActionPauseNow)
- {
- // abort current subgraph
- abort = true;
- pause = true;
- }
- if (pause)
- {
- PROGLOG("Pausing job%s", abort?" [now]":"");
- job.pause(abort);
- }
- }
- }
- } workunitPauseHandler(*this, *workunit);
- class CQueryTimeoutHandler : public CTimeoutTrigger
- {
- CJobMaster &job;
- public:
- CQueryTimeoutHandler(CJobMaster &_job, unsigned timeout) : CTimeoutTrigger(timeout, "QUERY"), job(_job)
- {
- inform(MakeThorException(TE_QueryTimeoutError, "Query took greater than %d seconds", timeout));
- }
- virtual bool action()
- {
- job.fireException(exception);
- return true;
- }
- private:
- graph_id graphId;
- };
- Owned<CTimeoutTrigger> qtHandler;
- int guillotineTimeout = workunit->getDebugValueInt("maxRunTime", 0);
- if (guillotineTimeout > 0)
- qtHandler.setown(new CQueryTimeoutHandler(*this, guillotineTimeout));
- else if (guillotineTimeout < 0)
- {
- Owned<IException> e = MakeStringException(0, "Ignoring negative maxRunTime: %d", guillotineTimeout);
- reportExceptionToWorkunit(*workunit, e);
- }
- if (WUActionPause == workunit->getAction() || WUActionPauseNow == workunit->getAction())
- throw MakeStringException(0, "Job paused at start, exiting");
- bool allDone = true;
- unsigned concurrentSubGraphs = (unsigned)getWorkUnitValueInt("concurrentSubGraphs", globals->getPropInt("@concurrentSubGraphs", 1));
- try
- {
- startJob();
- workunit->setGraphState(queryGraphName(), WUGraphRunning);
- Owned<IThorGraphIterator> iter = queryJobChannel(0).getSubGraphs();
- CICopyArrayOf<CMasterGraph> toRun;
- ForEach(*iter)
- {
- CMasterGraph &graph = (CMasterGraph &)iter->query();
- if ((queryResumed() || queryUseCheckpoints()) && WUGraphComplete == workunit->queryNodeState(queryGraphName(), graph.queryGraphId()))
- graph.setCompleteEx();
- else
- toRun.append(graph);
- }
- ForEachItemInRev(g, toRun)
- {
- if (aborted) break;
- CMasterGraph &graph = toRun.item(g);
- if (graph.isSink())
- graph.execute(0, NULL, true, concurrentSubGraphs>1);
- if (queryPausing()) break;
- }
- queryJobChannel(0).wait();
- workunitPauseHandler.stop();
- ForEachItemIn(tr, toRun)
- {
- CMasterGraph &graph = toRun.item(tr);
- if (!graph.isComplete())
- {
- allDone = false;
- break;
- }
- }
- }
- catch (IException *e) { fireException(e); e->Release(); }
- catch (CATCHALL) { Owned<IException> e = MakeThorException(0, "Unknown exception running sub graphs"); fireException(e); }
- workunit->setGraphState(queryGraphName(), aborted?WUGraphFailed:(allDone?WUGraphComplete:(pausing?WUGraphPaused:WUGraphComplete)));
- if (queryPausing())
- saveSpills();
- Owned<IException> jobDoneException;
- try { jobDone(); }
- catch (IException *e)
- {
- EXCLOG(e, NULL);
- jobDoneException.setown(e);
- }
- fatalHandler->clear();
- queryTempHandler()->clearTemps();
- slaveMsgHandler->stop();
- if (jobDoneException.get())
- throw LINK(jobDoneException);
- return allDone;
- }
- void CJobMaster::pause(bool doAbort)
- {
- pausing = true;
- if (doAbort)
- {
- // reply will trigger DAMP_THOR_REPLY_PAUSED to agent, unless all graphs are already complete.
- queryJobManager().replyException(*this, NULL);
- // abort current graph asynchronously.
- // After spill files have been saved, trigger timeout handler in case abort doesn't succeed.
- Owned<IException> e = MakeThorException(0, "Unable to recover from pausenow");
- class CAbortThread : implements IThreaded
- {
- CJobMaster &owner;
- CThreaded threaded;
- Linked<IException> exception;
- public:
- CAbortThread(CJobMaster &_owner, IException *_exception) : owner(_owner), exception(_exception), threaded("SaveSpillThread", this)
- {
- threaded.start();
- }
- ~CAbortThread()
- {
- threaded.join();
- }
- // IThreaded
- virtual void threadmain() override
- {
- owner.abort(exception);
- }
- } abortThread(*this, e);
- saveSpills();
- fatalHandler->inform(e.getClear());
- }
- }
- __int64 CJobMaster::queryNodeDiskUsage(unsigned node)
- {
- initNodeDUCache();
- if (!nodeDiskUsage.isItem(node)) return 0;
- return nodeDiskUsage.item(node);
- }
- void CJobMaster::setNodeDiskUsage(unsigned node, __int64 sz)
- {
- initNodeDUCache();
- while (nodeDiskUsage.ordinality() <= node)
- nodeDiskUsage.append(0);
- nodeDiskUsage.replace(sz, node);
- }
- __int64 CJobMaster::addNodeDiskUsage(unsigned node, __int64 sz)
- {
- sz += queryNodeDiskUsage(node);
- setNodeDiskUsage(node, sz);
- return sz;
- }
- bool CJobMaster::queryCreatedFile(const char *file)
- {
- StringBuffer scopedName;
- queryThorFileManager().addScope(*this, file, scopedName, false);
- return (NotFound != createdFiles.find(scopedName.str()));
- }
- void CJobMaster::addCreatedFile(const char *file)
- {
- StringBuffer scopedName;
- queryThorFileManager().addScope(*this, file, scopedName, false);
- createdFiles.append(scopedName.str());
- }
- __int64 CJobMaster::getWorkUnitValueInt(const char *prop, __int64 defVal) const
- {
- return queryWorkUnit().getDebugValueInt64(prop, defVal);
- }
- bool CJobMaster::getWorkUnitValueBool(const char *prop, bool defVal) const
- {
- return queryWorkUnit().getDebugValueBool(prop, defVal);
- }
- StringBuffer &CJobMaster::getWorkUnitValue(const char *prop, StringBuffer &str) const
- {
- SCMStringBuffer scmStr;
- queryWorkUnit().getDebugValue(prop, scmStr);
- return str.append(scmStr.str());
- }
- IGraphTempHandler *CJobMaster::createTempHandler(bool errorOnMissing)
- {
- return new CMasterGraphTempHandler(*this, errorOnMissing);
- }
- bool CJobMaster::fireException(IException *e)
- {
- IThorException *te = QUERYINTERFACE(e, IThorException);
- ThorExceptionAction action;
- if (!te) action = tea_null;
- else action = te->queryAction();
- if (QUERYINTERFACE(e, IMP_Exception) && MPERR_link_closed==e->errorCode())
- action = tea_shutdown;
- else if (QUERYINTERFACE(e, ISEH_Exception))
- action = tea_shutdown;
- CriticalBlock b(exceptCrit);
- switch (action)
- {
- case tea_warning:
- {
- LOG(MCwarning, thorJob, e);
- ErrorSeverity mappedSeverity = workunit->getWarningSeverity(e->errorCode(), SeverityWarning);
- if (mappedSeverity != SeverityIgnore)
- reportExceptionToWorkunit(*workunit, e);
- break;
- }
- default:
- {
- LOG(MCerror, thorJob, e);
- queryJobManager().replyException(*this, e);
- fatalHandler->inform(LINK(e));
- try { abort(e); }
- catch (IException *e)
- {
- Owned<IThorException> te = ThorWrapException(e, "Error aborting job, will cause thor restart");
- e->Release();
- reportExceptionToWorkunit(*workunit, te);
- action = tea_shutdown;
- }
- if (tea_shutdown == action)
- queryJobManager().stop();
- }
- }
- return true;
- }
- // CJobMasterChannel
- CJobMasterChannel::CJobMasterChannel(CJobBase &job, IMPServer *mpServer, unsigned channel) : CJobChannel(job, mpServer, channel)
- {
- codeCtx.setown(new CThorCodeContextMaster(*this, job.queryWorkUnit(), job.queryDllEntry(), *job.queryUserDescriptor()));
- sharedMemCodeCtx.setown(new CThorCodeContextMasterSharedMem(*this, job.querySharedAllocator(), job.queryWorkUnit(), job.queryDllEntry(), *job.queryUserDescriptor()));
- }
- CGraphBase *CJobMasterChannel::createGraph()
- {
- return new CMasterGraph(*this);
- }
- IBarrier *CJobMasterChannel::createBarrier(mptag_t tag)
- {
- return new CBarrierMaster(*jobComm, tag);
- }
- ///////////////////
- class CCollatedResult : implements IThorResult, public CSimpleInterface
- {
- CMasterGraph &graph;
- CActivityBase &activity;
- IThorRowInterfaces *rowIf;
- unsigned id;
- CriticalSection crit;
- PointerArrayOf<CThorExpandingRowArray> results;
- Owned<IThorResult> result;
- unsigned spillPriority;
- activity_id ownerId;
- void ensure()
- {
- CriticalBlock b(crit);
- if (result)
- return;
- mptag_t replyTag = graph.queryMPServer().createReplyTag();
- CMessageBuffer msg;
- msg.append(GraphGetResult);
- msg.append(activity.queryJob().queryKey());
- msg.append(graph.queryGraphId());
- msg.append(ownerId);
- msg.append(id);
- msg.append(replyTag);
- ((CJobMaster &)graph.queryJob()).broadcast(queryNodeComm(), msg, masterSlaveMpTag, LONGTIMEOUT, "CCollectResult", NULL, true);
- unsigned numSlaves = graph.queryJob().querySlaves();
- for (unsigned n=0; n<numSlaves; n++)
- results.item(n)->kill();
- rank_t sender;
- MemoryBuffer mb;
- Owned<ISerialStream> stream = createMemoryBufferSerialStream(mb);
- CThorStreamDeserializerSource rowSource(stream);
- unsigned todo = numSlaves;
- for (;;)
- {
- for (;;)
- {
- if (activity.queryAbortSoon())
- return;
- msg.clear();
- if (activity.receiveMsg(msg, RANK_ALL, replyTag, &sender, 60*1000))
- break;
- ActPrintLog(&activity, "WARNING: tag %d timedout, retrying", (unsigned)replyTag);
- }
- sender = sender - 1; // 0 = master
- if (!msg.length())
- {
- --todo;
- if (0 == todo)
- break; // done
- }
- else
- {
- bool error;
- msg.read(error);
- if (error)
- {
- Owned<IThorException> e = deserializeThorException(msg);
- e->setSlave(sender);
- throw e.getClear();
- }
- ThorExpand(msg, mb.clear());
- CThorExpandingRowArray *slaveResults = results.item(sender);
- while (!rowSource.eos())
- {
- RtlDynamicRowBuilder rowBuilder(rowIf->queryRowAllocator());
- size32_t sz = rowIf->queryRowDeserializer()->deserialize(rowBuilder, rowSource);
- slaveResults->append(rowBuilder.finalizeRowClear(sz));
- }
- }
- }
- Owned<IThorResult> _result = ::createResult(activity, rowIf, thorgraphresult_nul, spillPriority);
- Owned<IRowWriter> resultWriter = _result->getWriter();
- for (unsigned s=0; s<numSlaves; s++)
- {
- CThorExpandingRowArray *slaveResult = results.item(s);
- ForEachItemIn(r, *slaveResult)
- {
- const void *row = slaveResult->query(r);
- LinkThorRow(row);
- resultWriter->putRow(row);
- }
- }
- result.setown(_result.getClear());
- }
- public:
- IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
- CCollatedResult(CMasterGraph &_graph, CActivityBase &_activity, IThorRowInterfaces *_rowIf, unsigned _id, activity_id _ownerId, unsigned _spillPriority)
- : graph(_graph), activity(_activity), rowIf(_rowIf), id(_id), ownerId(_ownerId), spillPriority(_spillPriority)
- {
- for (unsigned n=0; n<graph.queryJob().querySlaves(); n++)
- results.append(new CThorExpandingRowArray(activity, rowIf));
- }
- ~CCollatedResult()
- {
- ForEachItemIn(l, results)
- {
- CThorExpandingRowArray *result = results.item(l);
- delete result;
- }
- }
- void setId(unsigned _id)
- {
- id = _id;
- }
- // IThorResult
- virtual IRowWriter *getWriter() { throwUnexpected(); }
- virtual void setResultStream(IRowWriterMultiReader *stream, rowcount_t count)
- {
- throwUnexpected();
- }
- virtual IRowStream *getRowStream()
- {
- ensure();
- return result->getRowStream();
- }
- virtual IThorRowInterfaces *queryRowInterfaces()
- {
- return rowIf;
- }
- virtual CActivityBase *queryActivity()
- {
- return &activity;
- }
- virtual bool isDistributed() const { return false; }
- virtual void serialize(MemoryBuffer &mb)
- {
- ensure();
- result->serialize(mb);
- }
- virtual void getLinkedResult(unsigned & count, const byte * * & ret) override
- {
- ensure();
- result->getLinkedResult(count, ret);
- }
- virtual const void * getLinkedRowResult()
- {
- ensure();
- return result->getLinkedRowResult();
- }
- };
- ///////////////////
- //
- // CMasterGraph impl.
- //
- CMasterGraph::CMasterGraph(CJobChannel &jobChannel) : CGraphBase(jobChannel)
- {
- jobM = (CJobMaster *)&jobChannel.queryJob();
- mpTag = queryJob().allocateMPTag();
- startBarrierTag = queryJob().allocateMPTag();
- waitBarrierTag = queryJob().allocateMPTag();
- startBarrier = jobChannel.createBarrier(startBarrierTag);
- waitBarrier = jobChannel.createBarrier(waitBarrierTag);
- }
- CMasterGraph::~CMasterGraph()
- {
- queryJob().freeMPTag(mpTag);
- queryJob().freeMPTag(startBarrierTag);
- queryJob().freeMPTag(waitBarrierTag);
- if (TAG_NULL != doneBarrierTag)
- queryJob().freeMPTag(doneBarrierTag);
- if (TAG_NULL != executeReplyTag)
- queryJob().freeMPTag(executeReplyTag);
- }
- void CMasterGraph::init()
- {
- CGraphBase::init();
- if (queryOwner() && isGlobal())
- {
- doneBarrierTag = queryJob().allocateMPTag();
- doneBarrier = jobChannel.createBarrier(doneBarrierTag);
- }
- }
- bool CMasterGraph::fireException(IException *e)
- {
- IThorException *te = QUERYINTERFACE(e, IThorException);
- ThorExceptionAction action;
- if (!te) action = tea_null;
- else action = te->queryAction();
- if (QUERYINTERFACE(e, IMP_Exception) && MPERR_link_closed==e->errorCode())
- action = tea_shutdown;
- else if (QUERYINTERFACE(e, ISEH_Exception))
- action = tea_shutdown;
- CriticalBlock b(exceptCrit);
- switch (action)
- {
- case tea_warning:
- {
- LOG(MCwarning, thorJob, e);
- ErrorSeverity mappedSeverity = job.queryWorkUnit().getWarningSeverity(e->errorCode(), SeverityWarning);
- if (mappedSeverity != SeverityIgnore)
- reportExceptionToWorkunit(job.queryWorkUnit(), e);
- break;
- }
- default:
- {
- LOG(MCerror, thorJob, e);
- if (NULL != fatalHandler)
- fatalHandler->inform(LINK(e));
- if (owner)
- owner->fireException(e);
- else
- queryJobChannel().fireException(e);
- }
- }
- return true;
- }
- void CMasterGraph::reset()
- {
- CGraphBase::reset();
- bcastMsgHandler.reset();
- activityInitMsgHandler.reset();
- executeReplyMsgHandler.reset();
- }
- void CMasterGraph::abort(IException *e)
- {
- if (aborted) return;
- bool _graphDone = graphDone; // aborting master activities can trigger master graphDone, but want to fire GraphAbort to slaves if graphDone=false at start.
- try { CGraphBase::abort(e); }
- catch (IException *e)
- {
- GraphPrintLog(e, "Aborting master graph");
- e->Release();
- }
- bcastMsgHandler.cancel(0);
- activityInitMsgHandler.cancel(RANK_ALL);
- executeReplyMsgHandler.cancel(RANK_ALL);
- if (started && !_graphDone)
- {
- try
- {
- CMessageBuffer msg;
- msg.append(GraphAbort);
- msg.append(job.queryKey());
- msg.append(queryGraphId());
- jobM->broadcast(queryNodeComm(), msg, masterSlaveMpTag, LONGTIMEOUT, "abort");
- }
- catch (IException *e)
- {
- GraphPrintLog(e, "Aborting slave graph");
- if (abortException)
- {
- e->Release();
- throw LINK(abortException);
- }
- throw;
- }
- }
- if (!queryOwner())
- {
- if (globals->getPropBool("@watchdogProgressEnabled"))
- queryJobManager().queryDeMonServer()->endGraph(this, !queryAborted());
- }
- }
- bool CMasterGraph::serializeActivityInitData(unsigned slave, MemoryBuffer &mb, IThorActivityIterator &iter)
- {
- CriticalBlock b(createdCrit);
- DelayedSizeMarker sizeMark1(mb);
- ForEach (iter)
- {
- CMasterGraphElement &element = (CMasterGraphElement &)iter.query();
- if (!element.sentActInitData->testSet(slave))
- {
- CMasterActivity *activity = (CMasterActivity *)element.queryActivity();
- if (activity)
- {
- mb.append(element.queryId());
- DelayedSizeMarker sizeMark2(mb);
- activity->serializeSlaveData(mb, slave);
- sizeMark2.write();
- }
- }
- }
- if (0 == sizeMark1.size())
- return false;
- mb.append((activity_id)0); // terminator
- sizeMark1.write();
- return true;
- }
- void CMasterGraph::readActivityInitData(MemoryBuffer &mb, unsigned slave)
- {
- for (;;)
- {
- activity_id id;
- mb.read(id);
- if (0 == id)
- break;
- size32_t dataLen;
- mb.read(dataLen);
- CGraphElementBase *element = queryElement(id);
- MemoryBuffer &mbDst = element->queryActivity()->queryInitializationData(slave);
- mbDst.append(dataLen, mb.readDirect(dataLen));
- }
- }
- bool CMasterGraph::prepare(size32_t parentExtractSz, const byte *parentExtract, bool checkDependencies, bool shortCircuit, bool async)
- {
- if (!CGraphBase::prepare(parentExtractSz, parentExtract, checkDependencies, shortCircuit, async)) return false;
- if (aborted) return false;
- return true;
- }
- void CMasterGraph::execute(size32_t _parentExtractSz, const byte *parentExtract, bool checkDependencies, bool async)
- {
- if (isComplete())
- return;
- if (!queryOwner()) // owning graph sends query+child graphs
- jobM->sendQuery(); // if not previously sent
- CGraphBase::execute(parentExtractSz, parentExtract, checkDependencies, async);
- }
- void CMasterGraph::start()
- {
- if (!queryOwner())
- {
- if (globals->getPropBool("@watchdogProgressEnabled"))
- queryJobManager().queryDeMonServer()->startGraph(this);
- }
- Owned<IThorActivityIterator> iter = getConnectedIterator();
- ForEach (*iter)
- iter->query().queryActivity()->startProcess();
- }
- void CMasterGraph::sendActivityInitData()
- {
- CMessageBuffer msg;
- mptag_t replyTag = queryJobChannel().queryMPServer().createReplyTag();
- msg.setReplyTag(replyTag);
- unsigned pos = msg.length();
- unsigned w=0;
- unsigned sentTo = 0;
- Owned<IThorActivityIterator> iter = getConnectedIterator();
- for (; w<queryJob().querySlaves(); w++)
- {
- unsigned needActInit = 0;
- ForEach(*iter)
- {
- CGraphElementBase &element = iter->query();
- CActivityBase *activity = element.queryActivity();
- if (activity && activity->needReInit())
- element.sentActInitData->set(w, false); // force act init to be resent
- if (!element.sentActInitData->test(w)) // has it been sent
- ++needActInit;
- }
- if (needActInit)
- {
- try
- {
- msg.rewrite(pos);
- serializeActivityInitData(w, msg, *iter);
- }
- catch (IException *e)
- {
- GraphPrintLog(e);
- throw;
- }
- if (!queryJobChannel().queryJobComm().send(msg, w+1, mpTag, LONGTIMEOUT))
- {
- StringBuffer epStr;
- throw MakeStringException(0, "Timeout sending to slave %s", job.querySlaveGroup().queryNode(w).endpoint().getUrlStr(epStr).str());
- }
- ++sentTo;
- }
- }
- if (sentTo)
- {
- assertex(sentTo == queryJob().querySlaves());
- w=0;
- Owned<IException> e;
- // now get back initialization data from graph tag
- for (; w<queryJob().querySlaves(); w++)
- {
- rank_t sender;
- msg.clear();
- if (!activityInitMsgHandler.recv(queryJobChannel().queryJobComm(), msg, w+1, replyTag, &sender, LONGTIMEOUT))
- throw MakeGraphException(this, 0, "Timeout receiving from slaves after graph sent");
- bool error;
- msg.read(error);
- if (error)
- {
- Owned<IThorException> se = deserializeThorException(msg);
- se->setSlave(sender);
- if (!e.get())
- {
- StringBuffer tmpStr("Slave ");
- queryJob().queryJobGroup().queryNode(sender).endpoint().getUrlStr(tmpStr);
- GraphPrintLog(se, "%s", tmpStr.append(": slave initialization error").str());
- e.setown(se.getClear());
- }
- continue; // to read other slave responses.
- }
- readActivityInitData(msg, w);
- }
- if (e.get())
- throw LINK(e);
- }
- }
- void CMasterGraph::serializeGraphInit(MemoryBuffer &mb)
- {
- mb.append(graphId);
- serializeMPtag(mb, mpTag);
- mb.append((int)startBarrierTag);
- mb.append((int)waitBarrierTag);
- mb.append((int)doneBarrierTag);
- mb.append(queryChildGraphCount());
- Owned<IThorGraphIterator> childIter = getChildGraphIterator();
- ForEach (*childIter)
- {
- CMasterGraph &childGraph = (CMasterGraph &)childIter->query();
- childGraph.serializeGraphInit(mb);
- }
- }
- // IThorChildGraph impl.
- IEclGraphResults *CMasterGraph::evaluate(unsigned _parentExtractSz, const byte *parentExtract)
- {
- throw MakeGraphException(this, 0, "Thor master does not support the execution of child queries");
- }
- void CMasterGraph::executeSubGraph(size32_t parentExtractSz, const byte *parentExtract)
- {
- if (job.queryResumed()) // skip complete subgraph if resuming. NB: temp spill have been tucked away for this purpose when paused.
- {
- if (!queryOwner())
- {
- if (WUGraphComplete == job.queryWorkUnit().queryNodeState(job.queryGraphName(), graphId))
- setCompleteEx();
- }
- }
- if (isComplete())
- return;
- if (!sentGlobalInit)
- {
- sentGlobalInit = true;
- if (!queryOwner())
- sendGraph();
- else
- {
- if (isGlobal())
- {
- CMessageBuffer msg;
- serializeCreateContexts(msg);
- try
- {
- jobM->broadcast(queryJobChannel().queryJobComm(), msg, mpTag, LONGTIMEOUT, "serializeCreateContexts", &bcastMsgHandler);
- }
- catch (IException *e)
- {
- GraphPrintLog(e, "Aborting graph create(2)");
- if (abortException)
- {
- e->Release();
- throw LINK(abortException);
- }
- throw;
- }
- }
- }
- }
- fatalHandler.clear();
- fatalHandler.setown(new CFatalHandler(globals->getPropInt("@fatal_timeout", FATAL_TIMEOUT)));
- CGraphBase::executeSubGraph(parentExtractSz, parentExtract);
- if (TAG_NULL != executeReplyTag)
- {
- rank_t sender;
- unsigned s=0;
- for (; s<queryJob().querySlaves(); s++)
- {
- CMessageBuffer msg;
- if (!executeReplyMsgHandler.recv(queryJobChannel().queryJobComm(), msg, RANK_ALL, executeReplyTag, &sender))
- break;
- bool error;
- msg.read(error);
- if (error)
- {
- Owned<IThorException> exception = deserializeThorException(msg);
- exception->setSlave(sender);
- GraphPrintLog(exception, "slave execute reply exception");
- throw exception.getClear();
- }
- }
- if (fatalHandler)
- fatalHandler->clear();
- }
- fatalHandler.clear();
- Owned<IWorkUnit> wu = &job.queryWorkUnit().lock();
- queryJobManager().updateWorkUnitLog(*wu);
- }
- void CMasterGraph::sendGraph()
- {
- CTimeMon atimer;
- CMessageBuffer msg;
- msg.append(GraphInit);
- msg.append(job.queryKey());
- if (TAG_NULL == executeReplyTag)
- executeReplyTag = jobM->allocateMPTag();
- serializeMPtag(msg, executeReplyTag);
- serializeCreateContexts(msg);
- serializeGraphInit(msg);
- // slave graph data
- try
- {
- jobM->broadcast(queryNodeComm(), msg, masterSlaveMpTag, LONGTIMEOUT, "sendGraph", &bcastMsgHandler);
- }
- catch (IException *e)
- {
- GraphPrintLog(e, "Aborting sendGraph");
- if (abortException)
- {
- e->Release();
- throw LINK(abortException);
- }
- throw;
- }
- GraphPrintLog("sendGraph took %d ms", atimer.elapsed());
- }
- bool CMasterGraph::preStart(size32_t parentExtractSz, const byte *parentExtract)
- {
- GraphPrintLog("Processing graph");
- if (syncInitData())
- {
- sendActivityInitData(); // has to be done at least once
- // NB: At this point, on the slaves, the graphs will start
- }
- CGraphBase::preStart(parentExtractSz, parentExtract);
- if (isGlobal())
- {
- if (!startBarrier->wait(true)) // ensure all graphs are at start point at same time, as some may request data from each other.
- return false;
- }
- if (!queryOwner())
- job.queryWorkUnit().setNodeState(job.queryGraphName(), graphId, WUGraphRunning);
- return true;
- }
- void CMasterGraph::handleSlaveDone(unsigned node, MemoryBuffer &mb)
- {
- graph_id gid;
- mb.read(gid);
- assertex(gid == queryGraphId());
- unsigned count;
- mb.read(count);
- while (count--)
- {
- activity_id activityId;
- mb.read(activityId);
- CMasterGraphElement *act = (CMasterGraphElement *)queryElement(activityId);
- unsigned len;
- mb.read(len);
- const void *d = mb.readDirect(len);
- MemoryBuffer sdMb;
- sdMb.setBuffer(len, (void *)d);
- act->slaveDone(node, sdMb);
- }
- }
- void CMasterGraph::getFinalProgress()
- {
- offset_t totalDiskUsage = 0;
- offset_t minNodeDiskUsage = 0, maxNodeDiskUsage = 0;
- unsigned maxNode = (unsigned)-1, minNode = (unsigned)-1;
- CMessageBuffer msg;
- mptag_t replyTag = queryJobChannel().queryMPServer().createReplyTag();
- msg.setReplyTag(replyTag);
- msg.append((unsigned)GraphEnd);
- msg.append(job.queryKey());
- msg.append(queryGraphId());
- jobM->broadcast(queryNodeComm(), msg, masterSlaveMpTag, LONGTIMEOUT, "graphEnd", NULL, true);
- Owned<IBitSet> respondedBitSet = createBitSet();
- unsigned n=queryJob().queryNodes();
- while (n--)
- {
- rank_t sender;
- if (!queryNodeComm().recv(msg, RANK_ALL, replyTag, &sender, LONGTIMEOUT))
- {
- StringBuffer slaveList;
- n=queryJob().queryNodes();
- unsigned s = 0;
- for (;;)
- {
- unsigned ns = respondedBitSet->scan(s, true); // scan for next respondent
- // list all non-respondents up to this found respondent (or rest of slaves)
- while (s<ns && s<n) // NB: if scan find nothing else set, ns = (unsigned)-1
- {
- if (slaveList.length())
- slaveList.append(",");
- ++s; // inc. 1st as slaveList is list of slaves that are 1 based.
- slaveList.append(s);
- }
- ++s;
- if (s>=n)
- break;
- }
- throw MakeGraphException(this, 0, "Timeout receiving final progress from slaves - these slaves failed to respond: %s", slaveList.str());
- }
- bool error;
- msg.read(error);
- if (error)
- {
- Owned<IThorException> e = deserializeThorException(msg);
- e->setSlave(sender);
- throw e.getClear();
- }
- respondedBitSet->set(((unsigned)sender)-1);
- if (0 == msg.remaining())
- continue;
- unsigned channelsPerSlave = globals->getPropInt("@channelsPerSlave", 1); // JCSMORE - should move somewhere common
- for (unsigned sc=0; sc<channelsPerSlave; sc++)
- {
- unsigned slave;
- msg.read(slave);
- handleSlaveDone(slave, msg);
- if (!queryOwner())
- {
- if (globals->getPropBool("@watchdogProgressEnabled"))
- {
- try
- {
- size32_t progressLen;
- msg.read(progressLen);
- MemoryBuffer progressData;
- progressData.setBuffer(progressLen, (void *)msg.readDirect(progressLen));
- queryJobManager().queryDeMonServer()->takeHeartBeat(progressData);
- }
- catch (IException *e)
- {
- GraphPrintLog(e, "Failure whilst deserializing stats/progress");
- e->Release();
- }
- }
- }
- offset_t nodeDiskUsage;
- msg.read(nodeDiskUsage);
- jobM->setNodeDiskUsage(n, nodeDiskUsage);
- if (nodeDiskUsage > maxNodeDiskUsage)
- {
- maxNodeDiskUsage = nodeDiskUsage;
- maxNode = n;
- }
- if ((unsigned)-1 == minNode || nodeDiskUsage < minNodeDiskUsage)
- {
- minNodeDiskUsage = nodeDiskUsage;
- minNode = n;
- }
- totalDiskUsage += nodeDiskUsage;
- }
- }
- if (totalDiskUsage)
- {
- Owned<IWorkUnit> wu = &job.queryWorkUnit().lock();
- wu->addDiskUsageStats(totalDiskUsage/queryJob().querySlaves(), minNode, minNodeDiskUsage, maxNode, maxNodeDiskUsage, queryGraphId());
- }
- }
- void CMasterGraph::done()
- {
- if (started)
- {
- if (!aborted && (!queryOwner() || isGlobal()))
- getFinalProgress(); // waiting for slave graph to finish and send final progress update + extra act end info.
- }
- CGraphBase::done();
- if (started && !queryOwner())
- {
- if (globals->getPropBool("@watchdogProgressEnabled"))
- queryJobManager().queryDeMonServer()->endGraph(this, true);
- }
- }
- void CMasterGraph::setComplete(bool tf)
- {
- CGraphBase::setComplete(tf);
- if (tf && !queryOwner())
- {
- job.queryWorkUnit().setNodeState(job.queryGraphName(), graphId, graphDone?WUGraphComplete:WUGraphFailed);
- }
- }
- bool CMasterGraph::deserializeStats(unsigned node, MemoryBuffer &mb)
- {
- CriticalBlock b(createdCrit);
- unsigned count, _count;
- mb.read(count);
- if (count)
- setProgressUpdated();
- _count = count;
- while (count--)
- {
- activity_id activityId;
- mb.read(activityId);
- CMasterActivity *activity = NULL;
- CMasterGraphElement *element = (CMasterGraphElement *)queryElement(activityId);
- if (element)
- {
- activity = (CMasterActivity *)element->queryActivity();
- if (!activity)
- {
- CGraphBase *parentGraph = element->queryOwner().queryOwner(); // i.e. am I in a childgraph
- if (!parentGraph)
- {
- GraphPrintLog("Activity id=%" ACTPF "d not created in master and not a child query activity", activityId);
- return false; // don't know if or how this could happen, but all bets off with packet if did.
- }
- Owned<IException> e;
- try
- {
- element->createActivity();
- activity = (CMasterActivity *)element->queryActivity();
- }
- catch (IException *_e)
- {
- e.setown(_e);
- GraphPrintLog(_e, "In deserializeStats");
- }
- if (!activity || e.get())
- {
- GraphPrintLog("Activity id=%" ACTPF "d failed to created child query activity ready for progress", activityId);
- return false;
- }
- }
- if (activity)
- activity->deserializeStats(node, mb);
- }
- else
- {
- GraphPrintLog("Failed to find activity, during progress deserialization, id=%" ACTPF "d", activityId);
- return false; // don't know if or how this could happen, but all bets off with packet if did.
- }
- }
- unsigned subs, _subs;
- mb.read(subs);
- _subs = subs;
- while (subs--)
- {
- graph_id subId;
- mb.read(subId);
- Owned<CMasterGraph> graph = (CMasterGraph *)job.queryJobChannel(0).getGraph(subId);
- if (NULL == graph.get())
- return false;
- if (!graph->deserializeStats(node, mb))
- return false;
- }
- return true;
- }
- IThorResult *CMasterGraph::createResult(CActivityBase &activity, unsigned id, IThorGraphResults *results, IThorRowInterfaces *rowIf, ThorGraphResultType resultType, unsigned spillPriority)
- {
- Owned<CCollatedResult> result = new CCollatedResult(*this, activity, rowIf, id, results->queryOwnerId(), spillPriority);
- results->setResult(id, result);
- return result;
- }
- IThorResult *CMasterGraph::createResult(CActivityBase &activity, unsigned id, IThorRowInterfaces *rowIf, ThorGraphResultType resultType, unsigned spillPriority)
- {
- Owned<CCollatedResult> result = new CCollatedResult(*this, activity, rowIf, id, localResults->queryOwnerId(), spillPriority);
- localResults->setResult(id, result);
- return result;
- }
- IThorResult *CMasterGraph::createGraphLoopResult(CActivityBase &activity, IThorRowInterfaces *rowIf, ThorGraphResultType resultType, unsigned spillPriority)
- {
- Owned<CCollatedResult> result = new CCollatedResult(*this, activity, rowIf, 0, localResults->queryOwnerId(), spillPriority);
- unsigned id = graphLoopResults->addResult(result);
- result->setId(id);
- return result;
- }
- ///////////////////////////////////////////////////
- static bool suppressStatisticIfZero(StatisticKind kind)
- {
- switch (kind)
- {
- case StNumSpills:
- case StSizeSpillFile:
- case StTimeSpillElapsed:
- case StNumDiskRetries:
- return true;
- }
- return false;
- }
- ///////////////////////////////////////////////////
- CThorStats::CThorStats(CJobBase &_ctx, StatisticKind _kind) : ctx(_ctx), kind(_kind)
- {
- unsigned c = queryClusterWidth();
- while (c--) counts.append(0);
- reset();
- }
- void CThorStats::extract(unsigned node, const CRuntimeStatisticCollection & stats)
- {
- set(node, stats.getStatisticValue(kind));
- }
- void CThorStats::set(unsigned node, unsigned __int64 count)
- {
- counts.replace(count, node);
- }
- void CThorStats::reset()
- {
- tot = max = avg = 0;
- min = (unsigned __int64) -1;
- minNode = maxNode = maxSkew = minSkew = maxNode = minNode = 0;
- }
- void CThorStats::calculateSkew()
- {
- unsigned count = counts.ordinality();
- double _avg = (double)tot/count;
- if (_avg)
- {
- if (max > ctx.querySlaves()) // i.e. if small count, suppress skew stats.
- {
- //MORE: Range protection on maxSkew?
- maxSkew = (unsigned)(10000.0 * (((double)max-_avg)/_avg));
- minSkew = (unsigned)(10000.0 * ((_avg-(double)min)/_avg));
- }
- avg = (unsigned __int64)_avg;
- }
- }
- void CThorStats::tallyValue(unsigned __int64 thiscount, unsigned n)
- {
- tot += thiscount;
- if (thiscount > max)
- {
- max = thiscount;
- maxNode = n;
- }
- if (thiscount < min)
- {
- min = thiscount;
- minNode = n;
- }
- }
- void CThorStats::processInfo()
- {
- reset();
- ForEachItemIn(n, counts)
- {
- unsigned __int64 thiscount = counts.item(n);
- tallyValue(thiscount, n+1);
- }
- calculateSkew();
- }
- void CThorStats::getStats(IStatisticGatherer & stats, bool suppressMinMaxWhenEqual)
- {
- processInfo();
- if ((0 == tot) && suppressStatisticIfZero(kind))
- return;
- //MORE: For most measures (not time stamps etc.) it would be sensible to output the total here....
- if (!suppressMinMaxWhenEqual || (maxSkew != minSkew))
- {
- stats.addStatistic((StatisticKind)(kind|StMinX), min);
- stats.addStatistic((StatisticKind)(kind|StMaxX), max);
- stats.addStatistic((StatisticKind)(kind|StAvgX), avg);
- }
- if (maxSkew != minSkew)
- {
- stats.addStatistic((StatisticKind)(kind|StSkewMin), -(__int64)minSkew); // Save minimum as a negative value so consistent
- stats.addStatistic((StatisticKind)(kind|StSkewMax), maxSkew);
- stats.addStatistic((StatisticKind)(kind|StNodeMin), minNode);
- stats.addStatistic((StatisticKind)(kind|StNodeMax), maxNode);
- }
- }
- ///////////////////////////////////////////////////
- CTimingInfo::CTimingInfo(CJobBase &ctx) : CThorStats(ctx, StTimeLocalExecute)
- {
- }
- ///////////////////////////////////////////////////
- ProgressInfo::ProgressInfo(CJobBase &ctx) : CThorStats(ctx, StNumRowsProcessed)
- {
- startcount = stopcount = 0;
- }
- void ProgressInfo::processInfo() // reimplement as counts have special flags (i.e. stop/start)
- {
- reset();
- startcount = stopcount = 0;
- ForEachItemIn(n, counts)
- {
- unsigned __int64 thiscount = counts.item(n);
- if (thiscount & THORDATALINK_STARTED)
- startcount++;
- if (thiscount & THORDATALINK_STOPPED)
- stopcount++;
- thiscount = thiscount & THORDATALINK_COUNT_MASK;
- tallyValue(thiscount, n+1);
- }
- calculateSkew();
- }
- void ProgressInfo::getStats(IStatisticGatherer & stats)
- {
- CThorStats::getStats(stats, true);
- stats.addStatistic(kind, tot);
- stats.addStatistic(StNumSlaves, counts.ordinality());
- stats.addStatistic(StNumStarts, startcount);
- stats.addStatistic(StNumStops, stopcount);
- }
- ///////////////////////////////////////////////////
- CJobMaster *createThorGraph(const char *graphName, IConstWorkUnit &workunit, ILoadedDllEntry *querySo, bool sendSo, const SocketEndpoint &agentEp)
- {
- Owned<CJobMaster> jobMaster = new CJobMaster(workunit, graphName, querySo, sendSo, agentEp);
- IPropertyTree *graphXGMML = jobMaster->queryGraphXGMML();
- Owned<IPropertyTreeIterator> iter = graphXGMML->getElements("node");
- ForEach(*iter)
- jobMaster->addSubGraph(iter->query());
- jobMaster->addDependencies(graphXGMML);
- return LINK(jobMaster);
- }
- static IJobManager *jobManager = NULL;
- void setJobManager(IJobManager *_jobManager)
- {
- CriticalBlock b(*jobManagerCrit);
- jobManager = _jobManager;
- }
- IJobManager *getJobManager()
- {
- CriticalBlock b(*jobManagerCrit);
- return LINK(jobManager);
- }
- IJobManager &queryJobManager()
- {
- CriticalBlock b(*jobManagerCrit);
- assertex(jobManager);
- return *jobManager;
- }
|