1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023 |
- /*##############################################################################
- HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ############################################################################## */
- #include "jlib.hpp"
- #include "jlzw.hpp"
- #include "jhtree.hpp"
- #include "daclient.hpp"
- #include "commonext.hpp"
- #include "thorplugin.hpp"
- #include "thcodectx.hpp"
- #include "thmem.hpp"
- #include "thorport.hpp"
- #include "slwatchdog.hpp"
- #include "thgraphslave.hpp"
- #include "thcompressutil.hpp"
- #include "enginecontext.hpp"
- //////////////////////////////////
- class CBarrierSlave : public CInterface, implements IBarrier
- {
- mptag_t tag;
- Linked<ICommunicator> comm;
- bool receiving;
- CJobChannel &jobChannel;
- public:
- IMPLEMENT_IINTERFACE;
- CBarrierSlave(CJobChannel &_jobChannel, ICommunicator &_comm, mptag_t _tag) : jobChannel(_jobChannel), comm(&_comm), tag(_tag)
- {
- receiving = false;
- }
- virtual bool wait(bool exception, unsigned timeout)
- {
- Owned<IException> e;
- CTimeMon tm(timeout);
- unsigned remaining = timeout;
- CMessageBuffer msg;
- msg.append(false);
- msg.append(false); // no exception
- if (INFINITE != timeout && tm.timedout(&remaining))
- {
- if (exception)
- throw createBarrierAbortException();
- else
- return false;
- }
- if (!comm->send(msg, 0, tag, INFINITE != timeout ? remaining : LONGTIMEOUT))
- throw MakeStringException(0, "CBarrierSlave::wait - Timeout sending to master");
- msg.clear();
- if (INFINITE != timeout && tm.timedout(&remaining))
- {
- if (exception)
- throw createBarrierAbortException();
- else
- return false;
- }
- {
- BooleanOnOff onOff(receiving);
- if (!comm->recv(msg, 0, tag, NULL, remaining))
- return false;
- }
- bool aborted;
- msg.read(aborted);
- bool hasExcept;
- msg.read(hasExcept);
- if (hasExcept)
- e.setown(deserializeException(msg));
- if (aborted)
- {
- if (!exception)
- return false;
- if (e)
- throw e.getClear();
- else
- throw createBarrierAbortException();
- }
- return true;
- }
- virtual void cancel(IException *e)
- {
- if (receiving)
- comm->cancel(jobChannel.queryMyRank(), tag);
- CMessageBuffer msg;
- msg.append(true);
- if (e)
- {
- msg.append(true);
- serializeException(e, msg);
- }
- else
- msg.append(false);
- if (!comm->send(msg, 0, tag, LONGTIMEOUT))
- throw MakeStringException(0, "CBarrierSlave::cancel - Timeout sending to master");
- }
- virtual const mptag_t queryTag() const { return tag; }
- };
- //
- CSlaveActivity::CSlaveActivity(CGraphElementBase *_container) : CActivityBase(_container), CEdgeProgress(this)
- {
- data = NULL;
- }
- CSlaveActivity::~CSlaveActivity()
- {
- inputs.kill();
- outputs.kill();
- if (data) delete [] data;
- ActPrintLog("DESTROYED");
- }
- void CSlaveActivity::setOutputStream(unsigned index, IEngineRowStream *stream)
- {
- while (outputStreams.ordinality()<=index)
- outputStreams.append(nullptr);
- outputStreams.replace(stream, index);
- }
- void CSlaveActivity::setInput(unsigned index, CActivityBase *inputActivity, unsigned inputOutIdx)
- {
- CActivityBase::setInput(index, inputActivity, inputOutIdx);
- Linked<IThorDataLink> outLink;
- if (!inputActivity)
- {
- Owned<CActivityBase> nullAct = container.factory(TAKnull);
- outLink.set(((CSlaveActivity *)(nullAct.get()))->queryOutput(0)); // NB inputOutIdx irrelevant, null has single 'fake' output
- nullAct->releaseIOs(); // normally done as graph winds up, clear now to avoid circular dependencies with outputs
- }
- else
- outLink.set(((CSlaveActivity *)inputActivity)->queryOutput(inputOutIdx));
- assertex(outLink);
- while (inputs.ordinality()<=index)
- inputs.append(* new CThorInput());
- CThorInput &newInput = inputs.item(index);
- newInput.set(outLink, inputOutIdx);
- if (0 == index && !input)
- {
- input = outLink;
- inputSourceIdx = inputOutIdx;
- }
- }
- void CSlaveActivity::connectInputStreams(bool consumerOrdered)
- {
- ForEachItemIn(index, inputs)
- {
- CThorInput &_input = inputs.item(index);
- if (_input.itdl)
- setInputStream(index, _input, consumerOrdered);
- }
- }
- void CSlaveActivity::setInputStream(unsigned index, CThorInput &_input, bool consumerOrdered)
- {
- if (_input.itdl)
- {
- Owned<IStrandJunction> junction;
- IEngineRowStream *_inputStream = connectSingleStream(*this, _input.itdl, _input.sourceIdx, junction, _input.itdl->isInputOrdered(consumerOrdered));
- if (queryJob().getOptBool("TRACEROWS"))
- {
- const unsigned numTraceRows = queryJob().getOptInt("numTraceRows", 10);
- CTracingStream *tracingStream = new CTracingStream(_input.itdl, _inputStream, _input.itdl->queryFromActivity()->queryHelper(), numTraceRows);
- _input.tracingStream.setown(tracingStream);
- _inputStream = tracingStream;
- }
- _input.stream.set(_inputStream);
- _input.junction.setown(junction.getClear());
- if (0 == index)
- inputStream = _inputStream;
- _input.itdl->setOutputStream(_input.sourceIdx, LINK(_inputStream)); // used by debug request only at moment. // JCSMORE - this should probably be the junction outputstream if there is one
- }
- }
- IEngineRowStream *CSlaveActivity::replaceInputStream(unsigned index, IEngineRowStream *_inputStream)
- {
- CThorInput &_input = inputs.item(index);
- IEngineRowStream *prevInputStream = _input.stream.getClear();
- _input.stream.setown(_inputStream);
- if (0 == index)
- inputStream = _inputStream;
- return prevInputStream;
- }
- void CSlaveActivity::setLookAhead(unsigned index, IStartableEngineRowStream *lookAhead)
- {
- CThorInput &_input = inputs.item(index);
- _input.lookAhead.setown(lookAhead);
- _input.stream.set(lookAhead);
- if (0 == index)
- inputStream = lookAhead;
- }
- IStrandJunction *CSlaveActivity::getOutputStreams(CActivityBase &ctx, unsigned idx, PointerArrayOf<IEngineRowStream> &streams, const CThorStrandOptions * consumerOptions, bool consumerOrdered, IOrderedCallbackCollection * orderedCallbacks)
- {
- // Default non-stranded implementation, expects activity to have 1 output.
- // By default, activities are assumed NOT to support streams
- bool inputOrdered = isInputOrdered(consumerOrdered);
- connectInputStreams(inputOrdered);
- // Return a single stream
- // Default activity impl. adds single output as stream
- streams.append(this);
- return nullptr;
- }
- void CSlaveActivity::appendOutput(IThorDataLink *itdl)
- {
- outputs.append(itdl);
- }
- void CSlaveActivity::appendOutputLinked(IThorDataLink *itdl)
- {
- if (itdl)
- itdl->Link();
- appendOutput(itdl);
- }
- IThorDataLink *CSlaveActivity::queryOutput(unsigned index) const
- {
- if (index>=outputs.ordinality()) return nullptr;
- return outputs.item(index);
- }
- IThorDataLink *CSlaveActivity::queryInput(unsigned index) const
- {
- if (index>=inputs.ordinality()) return nullptr;
- return inputs.item(index).itdl;
- }
- IEngineRowStream *CSlaveActivity::queryInputStream(unsigned index) const
- {
- if (index>=inputs.ordinality()) return nullptr;
- return inputs.item(index).stream;
- }
- IStrandJunction *CSlaveActivity::queryInputJunction(unsigned index) const
- {
- if (index>=inputs.ordinality()) return nullptr;
- return inputs.item(index).junction;
- }
- IEngineRowStream *CSlaveActivity::queryOutputStream(unsigned index) const
- {
- if (index>=outputStreams.ordinality()) return nullptr;
- return outputStreams.item(index);
- }
- void CSlaveActivity::start()
- {
- if (inputs.ordinality()>1)
- throwUnexpected();
- if (input)
- startInput(0);
- dataLinkStart();
- }
- void CSlaveActivity::startAllInputs()
- {
- ForEachItemIn(i, inputs)
- {
- startInput(i);
- }
- }
- void CSlaveActivity::startInput(unsigned index, const char *extra)
- {
- VStringBuffer s("Starting input %u", index);
- if (extra)
- s.append(" ").append(extra);
- ActPrintLog("%s", s.str());
- CThorInput &_input = inputs.item(index);
- #ifdef TRACE_STARTSTOP_EXCEPTIONS
- try
- {
- #endif
- _input.itdl->start();
- startJunction(_input.junction);
- if (_input.lookAhead)
- _input.lookAhead->start();
- _input.stopped = false;
- _input.started = true;
- if (0 == index)
- inputStopped = false;
- #ifdef TRACE_STARTSTOP_EXCEPTIONS
- }
- catch(IException *e)
- {
- ActPrintLog(e, "%s", s.str());
- throw;
- }
- #endif
- }
- void CSlaveActivity::stop()
- {
- if (input)
- stopInput(0);
- dataLinkStop();
- }
- void CSlaveActivity::stopInput(unsigned index, const char *extra)
- {
- CThorInput &_input = inputs.item(index);
- if (_input.stopped)
- return;
- VStringBuffer s("Stopping input %u for", index);
- if (extra)
- s.append(" ").append(extra);
- ActPrintLog("%s", s.str());
- #ifdef TRACE_STARTSTOP_EXCEPTIONS
- try
- {
- #endif
- if (_input.stream)
- _input.stream->stop();
- _input.stopped = true;
- if (0 == index)
- inputStopped = true;
- #ifdef TRACE_STARTSTOP_EXCEPTIONS
- }
- catch(IException * e)
- {
- ActPrintLog(e, "%s", s.str());
- throw;
- }
- #endif
- }
- void CSlaveActivity::stopAllInputs()
- {
- ForEachItemIn(i, inputs)
- {
- stopInput(i);
- }
- }
- void CSlaveActivity::reset()
- {
- CActivityBase::reset();
- ForEachItemIn(i, inputs)
- inputs.item(i).reset();
- inputStopped = false;
- }
- void CSlaveActivity::releaseIOs()
- {
- // inputs.kill(); // don't want inputs to die before this dies (release in deconstructor) // JCSMORE not sure why care particularly.
- outputs.kill(); // outputs tend to be self-references, this clears them explicitly, otherwise end up leaking with circular references.
- outputStreams.kill();
- }
- void CSlaveActivity::clearConnections()
- {
- outputStreams.kill();
- inputs.kill();
- }
- MemoryBuffer &CSlaveActivity::queryInitializationData(unsigned slave) const
- {
- CriticalBlock b(crit);
- if (!data)
- data = new MemoryBuffer[container.queryJob().querySlaves()];
- CMessageBuffer msg;
- graph_id gid = queryContainer().queryOwner().queryGraphId();
- msg.append(smt_dataReq);
- msg.append(slave);
- msg.append(gid);
- msg.append(container.queryId());
- if (!queryJobChannel().queryJobComm().sendRecv(msg, 0, queryContainer().queryJob().querySlaveMpTag(), LONGTIMEOUT))
- throwUnexpected();
- data[slave].swapWith(msg);
- return data[slave];
- }
- MemoryBuffer &CSlaveActivity::getInitializationData(unsigned slave, MemoryBuffer &mb) const
- {
- return mb.append(queryInitializationData(slave));
- }
- unsigned __int64 CSlaveActivity::queryLocalCycles() const
- {
- unsigned __int64 inputCycles = 0;
- if (1 == inputs.ordinality())
- {
- IThorDataLink *input = queryInput(0);
- inputCycles += input->queryTotalCycles();
- }
- else
- {
- switch (container.getKind())
- {
- case TAKif:
- case TAKchildif:
- case TAKifaction:
- case TAKcase:
- case TAKchildcase:
- if (inputs.ordinality() && (((unsigned)-1) != container.whichBranch))
- {
- IThorDataLink *input = queryInput(container.whichBranch);
- if (input)
- inputCycles += input->queryTotalCycles();
- }
- break;
- default:
- ForEachItemIn(i, inputs)
- {
- IThorDataLink *input = queryInput(i);
- inputCycles += input->queryTotalCycles();
- }
- break;
- }
- }
- unsigned __int64 _totalCycles = queryTotalCycles();
- if (_totalCycles < inputCycles) // not sure how/if possible, but guard against
- return 0;
- return _totalCycles-inputCycles;
- }
- void CSlaveActivity::serializeStats(MemoryBuffer &mb)
- {
- CriticalBlock b(crit);
- mb.append((unsigned __int64)cycle_to_nanosec(queryLocalCycles()));
- ForEachItemIn(i, outputs)
- {
- IThorDataLink *output = queryOutput(i);
- if (output)
- outputs.item(i)->dataLinkSerialize(mb);
- else
- serializeNullItdl(mb);
- }
- }
- void CSlaveActivity::debugRequest(unsigned edgeIdx, MemoryBuffer &msg)
- {
- IEngineRowStream *outputStream = queryOutputStream(edgeIdx);
- IThorDebug *debug = QUERYINTERFACE(outputStream, IThorDebug); // should probably use an extended IEngineRowStream, or store in separate array instead
- if (debug) debug->debugRequest(msg);
- }
- bool CSlaveActivity::isGrouped() const
- {
- if (!input) return false; // should possible be an error if query and not set
- return input->isGrouped();
- }
- IOutputMetaData *CSlaveActivity::queryOutputMeta() const
- {
- return queryHelper()->queryOutputMeta();
- }
- void CSlaveActivity::dataLinkSerialize(MemoryBuffer &mb) const
- {
- CEdgeProgress::dataLinkSerialize(mb);
- }
- rowcount_t CSlaveActivity::getProgressCount() const
- {
- return CEdgeProgress::getCount();
- }
- void CSlaveActivity::debugRequest(MemoryBuffer &msg)
- {
- }
- /// CThorStrandProcessor
- CThorStrandProcessor::CThorStrandProcessor(CThorStrandedActivity &_parent, IEngineRowStream *_inputStream, unsigned _outputId)
- : parent(_parent), inputStream(_inputStream), outputId(_outputId), timeActivities(_parent.queryTimeActivities())
- {
- rowsProcessed = 0;
- baseHelper.set(parent.queryHelper());
- }
- void CThorStrandProcessor::processAndThrowOwnedException(IException *_e)
- {
- IThorException *e = QUERYINTERFACE(_e, IThorException);
- if (e)
- {
- if (!e->queryActivityId())
- setExceptionActivityInfo(parent.queryContainer(), e);
- }
- else
- {
- e = MakeActivityException(&parent, _e);
- _e->Release();
- }
- throw e;
- }
- void CThorStrandProcessor::stop()
- {
- if (!stopped)
- {
- if (inputStream)
- inputStream->stop();
- parent.strandedStop();
- }
- stopped = true;
- }
- /// CThorStrandedActivity
- void CThorStrandedActivity::onStartStrands()
- {
- active = strands.ordinality();
- ForEachItemIn(idx, strands)
- strands.item(idx).start();
- }
- void CThorStrandedActivity::strandedStop()
- {
- // Called from the strands... which should ensure that stop is not called more than once per strand
- //The first strand to call
- if (active)
- --active;
- if (!active)
- stop();
- }
- //This function is pure (But also implemented out of line) to force the derived classes to implement it.
- //After calling the base class start method, and initialising any values from the helper they must call onStartStrands(),
- //this must also happen before any rows are read from the strands (e.g., by a source junction)
- // virtual void start(unsigned parentExtractSize, const byte *parentExtract, bool paused) = 0;
- //For some reason gcc doesn't let you specify a function as pure virtual and define it at the same time.
- void CThorStrandedActivity::start()
- {
- CSlaveActivity::start();
- startJunction(splitter);
- onStartStrands();
- }
- void CThorStrandedActivity::reset()
- {
- assertex(active==0);
- ForEachItemIn(idx, strands)
- strands.item(idx).reset();
- resetJunction(splitter);
- CSlaveActivity::reset();
- resetJunction(sourceJunction);
- }
- IStrandJunction *CThorStrandedActivity::getOutputStreams(CActivityBase &ctx, unsigned idx, PointerArrayOf<IEngineRowStream> &streams, const CThorStrandOptions * consumerOptions, bool consumerOrdered, IOrderedCallbackCollection * orderedCallbacks)
- {
- assertex(idx == 0);
- assertex(strands.empty());
- // JCSMORE these may be wrong if this is a source activity
- bool inputOrdered = input ? input->isInputOrdered(consumerOrdered) : isInputOrdered(consumerOrdered);
- //Note, numStrands == 1 is an explicit request to disable threading
- if (consumerOptions && (consumerOptions->numStrands != 1) && (strandOptions.numStrands != 1))
- {
- //Check to see if the consumer's settings should override
- if (strandOptions.numStrands == 0)
- {
- strandOptions.numStrands = consumerOptions->numStrands;
- strandOptions.blockSize = consumerOptions->blockSize;
- }
- else if (consumerOptions->numStrands > strandOptions.numStrands)
- {
- strandOptions.numStrands = consumerOptions->numStrands;
- }
- }
- Owned <IStrandJunction> recombiner;
- if (input)
- {
- if (strandOptions.numStrands == 1)
- {
- // 1 means explicitly requested single-strand.
- Owned<IStrandJunction> junction;
- IEngineRowStream *instream = connectSingleStream(ctx, input, inputSourceIdx, junction, inputOrdered);
- inputs.item(idx).junction.setown(junction.getClear());
- strands.append(*createStrandProcessor(instream));
- }
- else
- {
- PointerArrayOf<IEngineRowStream> instreams;
- recombiner.setown(input->getOutputStreams(ctx, inputSourceIdx, instreams, &strandOptions, inputOrdered, orderedCallbacks));
- if ((instreams.length() == 1) && (strandOptions.numStrands != 0)) // 0 means did not specify - we should use the strands that our upstream provides
- {
- assertex(recombiner == nullptr);
- // Create a splitter to split the input into n... and a recombiner if need to preserve sorting
- if (inputOrdered)
- {
- branch.setown(createStrandBranch(*ctx.queryRowManager(), strandOptions.numStrands, strandOptions.blockSize, true, input->queryOutputMeta()->isGrouped(), false, orderedCallbacks));
- splitter.set(branch->queryInputJunction());
- recombiner.set(branch->queryOutputJunction());
- }
- else
- {
- splitter.setown(createStrandJunction(*ctx.queryRowManager(), 1, strandOptions.numStrands, strandOptions.blockSize, false));
- }
- splitter->setInput(0, instreams.item(0));
- for (unsigned strandNo = 0; strandNo < strandOptions.numStrands; strandNo++)
- strands.append(*createStrandProcessor(splitter->queryOutput(strandNo)));
- }
- else
- {
- // Ignore my hint and just use the width already split into...
- ForEachItemIn(strandNo, instreams)
- strands.append(*createStrandProcessor(instreams.item(strandNo)));
- }
- }
- }
- else
- {
- unsigned numStrands = strandOptions.numStrands ? strandOptions.numStrands : 1;
- for (unsigned i=0; i < numStrands; i++)
- strands.append(*createStrandSourceProcessor(inputOrdered));
- if (inputOrdered && (numStrands > 1))
- {
- if (consumerOptions)
- {
- //If the output activities are also stranded then need to create a version of the branch
- bool isGrouped = queryOutputMeta()->isGrouped();
- branch.setown(createStrandBranch(*ctx.queryRowManager(), strandOptions.numStrands, strandOptions.blockSize, true, isGrouped, true, orderedCallbacks));
- sourceJunction.set(branch->queryInputJunction());
- recombiner.set(branch->queryOutputJunction());
- assertex((orderedCallbacks && !recombiner) || (!orderedCallbacks && recombiner));
- //This is different from the branch above. The first "junction" has the source activity as the input, and the outputs as the result of the activity
- for (unsigned strandNo = 0; strandNo < strandOptions.numStrands; strandNo++)
- {
- sourceJunction->setInput(strandNo, &strands.item(strandNo));
- streams.append(sourceJunction->queryOutput(strandNo));
- }
- #ifdef TRACE_STRANDS
- if (traceLevel > 2)
- DBGLOG("Executing activity %u with %u strands", activityId, strands.ordinality());
- #endif
- return recombiner.getClear();
- }
- else
- recombiner.setown(createStrandJunction(*ctx.queryRowManager(), numStrands, 1, strandOptions.blockSize, inputOrdered));
- }
- }
- ForEachItemIn(i, strands)
- streams.append(&strands.item(i));
- #ifdef TRACE_STRANDS
- if (traceLevel > 2)
- DBGLOG("Executing activity %u with %u strands", activityId, strands.ordinality());
- #endif
- return recombiner.getClear();
- }
- unsigned __int64 CThorStrandedActivity::queryTotalCycles() const
- {
- unsigned __int64 total = 0;;
- ForEachItemIn(i, strands)
- {
- CThorStrandProcessor &strand = strands.item(i);
- total += strand.queryTotalCycles();
- }
- return total;
- }
- void CThorStrandedActivity::dataLinkSerialize(MemoryBuffer &mb) const
- {
- mb.append(getProgressCount());
- }
- rowcount_t CThorStrandedActivity::getProgressCount() const
- {
- rowcount_t totalCount = getCount();
- ForEachItemIn(i, strands)
- {
- CThorStrandProcessor &strand = strands.item(i);
- totalCount += strand.getCount();
- }
- return totalCount;
- }
- // CSlaveLateStartActivity
- void CSlaveLateStartActivity::lateStart(bool any)
- {
- prefiltered = !any;
- if (!prefiltered)
- startInput(0);
- else
- stopInput(0);
- }
- void CSlaveLateStartActivity::start()
- {
- Linked<CThorInput> savedInput = &inputs.item(0);
- if (!nullInput)
- {
- nullInput.setown(new CThorInput);
- nullInput->sourceIdx = savedInput->sourceIdx; // probably not needed
- }
- inputs.replace(* nullInput.getLink(), 0);
- input = NULL;
- CSlaveActivity::start();
- inputs.replace(* savedInput.getClear(), 0);
- input = inputs.item(0).itdl;
- }
- void CSlaveLateStartActivity::stop()
- {
- if (!prefiltered)
- {
- stopInput(0);
- dataLinkStop();
- }
- }
- void CSlaveLateStartActivity::reset()
- {
- CSlaveActivity::reset();
- prefiltered = false;
- }
- // CSlaveGraph
- CSlaveGraph::CSlaveGraph(CJobChannel &jobChannel) : CGraphBase(jobChannel)
- {
- jobS = (CJobSlave *)&jobChannel.queryJob();
- }
- void CSlaveGraph::init(MemoryBuffer &mb)
- {
- mpTag = queryJobChannel().deserializeMPTag(mb);
- startBarrierTag = queryJobChannel().deserializeMPTag(mb);
- waitBarrierTag = queryJobChannel().deserializeMPTag(mb);
- doneBarrierTag = queryJobChannel().deserializeMPTag(mb);
- startBarrier = queryJobChannel().createBarrier(startBarrierTag);
- waitBarrier = queryJobChannel().createBarrier(waitBarrierTag);
- if (doneBarrierTag != TAG_NULL)
- doneBarrier = queryJobChannel().createBarrier(doneBarrierTag);
- initialized = false;
- progressActive = progressToCollect = false;
- unsigned subCount;
- mb.read(subCount);
- while (subCount--)
- {
- graph_id gid;
- mb.read(gid);
- Owned<CSlaveGraph> subGraph = (CSlaveGraph *)queryJobChannel().getGraph(gid);
- subGraph->init(mb);
- }
- }
- void CSlaveGraph::initWithActData(MemoryBuffer &in, MemoryBuffer &out)
- {
- activity_id id;
- loop
- {
- in.read(id);
- if (0 == id) break;
- CSlaveGraphElement *element = (CSlaveGraphElement *)queryElement(id);
- assertex(element);
- out.append(id);
- out.append((size32_t)0);
- unsigned l = out.length();
- size32_t sz;
- in.read(sz);
- unsigned aread = in.getPos();
- CSlaveActivity *activity = (CSlaveActivity *)element->queryActivity();
- assertex(activity);
- element->sentActInitData->set(0);
- activity->init(in, out);
- aread = in.getPos()-aread;
- if (aread<sz)
- {
- Owned<IException> e = MakeActivityException(element, TE_SeriailzationError, "Serialization error - activity did not read all serialized data (%d byte(s) remaining)", sz-aread);
- in.readDirect(sz-aread);
- throw e.getClear();
- }
- else if (aread>sz)
- throw MakeActivityException(element, TE_SeriailzationError, "Serialization error - activity read beyond serialized data (%d byte(s))", aread-sz);
- activity->setInitialized(true);
- size32_t dl = out.length() - l;
- if (dl)
- out.writeDirect(l-sizeof(size32_t), sizeof(size32_t), &dl);
- else
- out.setLength(l-(sizeof(activity_id)+sizeof(size32_t)));
- }
- out.append((activity_id)0);
- }
- bool CSlaveGraph::recvActivityInitData(size32_t parentExtractSz, const byte *parentExtract)
- {
- bool ret = true;
- unsigned needActInit = 0;
- unsigned uninitialized = 0;
- Owned<IThorActivityIterator> iter = getConnectedIterator();
- ForEach(*iter)
- {
- CGraphElementBase &element = (CGraphElementBase &)iter->query();
- CActivityBase *activity = element.queryActivity();
- if (activity)
- {
- if (activity->needReInit())
- {
- element.sentActInitData->set(0, false); // force act init to be resent
- activity->setInitialized(false);
- }
- if (!element.sentActInitData->test(0))
- ++needActInit;
- if (!activity->queryInitialized())
- ++uninitialized;
- }
- }
- if (0 == uninitialized)
- return true;
- mptag_t replyTag = TAG_NULL;
- size32_t len = 0;
- CMessageBuffer actInitRtnData;
- actInitRtnData.append(false);
- CMessageBuffer msg;
- if (syncInitData())
- {
- if (!graphCancelHandler.recv(queryJobChannel().queryJobComm(), msg, 0, mpTag, NULL, LONGTIMEOUT))
- throw MakeStringException(0, "Error receiving actinit data for graph: %" GIDPF "d", graphId);
- replyTag = msg.getReplyTag();
- msg.read(len);
- }
- else
- {
- if (needActInit)
- {
- // initialize any for which no data was sent
- msg.append(smt_initActDataReq); // may cause graph to be created at master
- msg.append(queryGraphId());
- msg.append(queryJobChannel().queryMyRank()-1);
- assertex(!parentExtractSz || NULL!=parentExtract);
- msg.append(parentExtractSz);
- msg.append(parentExtractSz, parentExtract);
- // NB: will only request activities that need initializaton data (those that override CSlaveActivity::init())
- ForEach(*iter)
- {
- CSlaveGraphElement &element = (CSlaveGraphElement &)iter->query();
- CActivityBase *activity = element.queryActivity();
- if (activity)
- {
- if (!element.sentActInitData->test(0))
- {
- msg.append(element.queryId());
- // JCSMORE -> GH - do you always generate a start context serializer?
- element.serializeStartContext(msg);
- }
- }
- }
- msg.append((activity_id)0);
- if (!queryJobChannel().queryJobComm().sendRecv(msg, 0, queryJob().querySlaveMpTag(), LONGTIMEOUT))
- throwUnexpected();
- replyTag = queryJobChannel().deserializeMPTag(msg);
- bool error;
- msg.read(error);
- if (error)
- {
- Owned<IException> e = deserializeException(msg);
- EXCLOG(e, "Master hit exception");
- msg.clear();
- if (!queryJobChannel().queryJobComm().send(msg, 0, replyTag, LONGTIMEOUT))
- throw MakeStringException(0, "Timeout sending init data back to master");
- throw e.getClear();
- }
- msg.read(len);
- }
- }
- if (len)
- {
- try
- {
- MemoryBuffer actInitData;
- actInitData.append(len, msg.readDirect(len));
- CriticalBlock b(progressCrit);
- initialized = true;
- initWithActData(actInitData, actInitRtnData);
- }
- catch (IException *e)
- {
- actInitRtnData.clear();
- actInitRtnData.append(true);
- serializeThorException(e, actInitRtnData);
- e->Release();
- ret = false;
- }
- }
- if (syncInitData() || needActInit)
- {
- if (!queryJobChannel().queryJobComm().send(actInitRtnData, 0, replyTag, LONGTIMEOUT))
- throw MakeStringException(0, "Timeout sending init data back to master");
- }
- // initialize any for which no data was sent
- ForEach(*iter)
- {
- CSlaveGraphElement &element = (CSlaveGraphElement &)iter->query();
- CSlaveActivity *activity = (CSlaveActivity *)element.queryActivity();
- if (activity && !activity->queryInitialized())
- {
- activity->setInitialized(true);
- element.sentActInitData->set(0);
- MemoryBuffer in, out;
- activity->init(in, out);
- assertex(0 == out.length());
- }
- }
- return ret;
- }
- bool CSlaveGraph::preStart(size32_t parentExtractSz, const byte *parentExtract)
- {
- if (!recvActivityInitData(parentExtractSz, parentExtract))
- throw MakeThorException(0, "preStart failure");
- CGraphBase::preStart(parentExtractSz, parentExtract);
- if (isGlobal())
- {
- if (!startBarrier->wait(false))
- return false;
- }
- return true;
- }
- void CSlaveGraph::start()
- {
- {
- SpinBlock b(progressActiveLock);
- progressActive = true;
- progressToCollect = true;
- }
- bool forceAsync = !queryOwner() || isGlobal();
- Owned<IThorActivityIterator> iter = getSinkIterator();
- unsigned sinks = 0;
- ForEach(*iter)
- ++sinks;
- ForEach(*iter)
- {
- CGraphElementBase &container = iter->query();
- CActivityBase *sinkAct = (CActivityBase *)container.queryActivity();
- --sinks;
- sinkAct->startProcess(forceAsync || 0 != sinks); // async, unless last
- }
- if (!queryOwner())
- {
- if (globals->getPropBool("@watchdogProgressEnabled"))
- jobS->queryProgressHandler()->startGraph(*this);
- }
- }
- void CSlaveGraph::connect()
- {
- CriticalBlock b(progressCrit);
- Owned<IThorActivityIterator> iter = getConnectedIterator(false);
- ForEach(*iter)
- iter->query().doconnect();
- iter.setown(getSinkIterator());
- ForEach(*iter)
- {
- CGraphElementBase &container = iter->query();
- CSlaveActivity *sinkAct = (CSlaveActivity *)container.queryActivity();
- sinkAct->connectInputStreams(true);
- }
- }
- void CSlaveGraph::executeSubGraph(size32_t parentExtractSz, const byte *parentExtract)
- {
- if (isComplete())
- return;
- Owned<IException> exception;
- try
- {
- if (!doneInit)
- {
- doneInit = true;
- if (queryOwner())
- {
- if (isGlobal())
- {
- CMessageBuffer msg;
- if (!graphCancelHandler.recv(queryJobChannel().queryJobComm(), msg, 0, mpTag, NULL, LONGTIMEOUT))
- throw MakeStringException(0, "Error receiving createctx data for graph: %" GIDPF "d", graphId);
- try
- {
- size32_t len;
- msg.read(len);
- if (len)
- {
- MemoryBuffer initData;
- initData.append(len, msg.readDirect(len));
- deserializeCreateContexts(initData);
- }
- msg.clear();
- msg.append(false);
- }
- catch (IException *e)
- {
- msg.clear();
- msg.append(true);
- serializeThorException(e, msg);
- }
- if (!queryJobChannel().queryJobComm().send(msg, 0, msg.getReplyTag(), LONGTIMEOUT))
- throw MakeStringException(0, "Timeout sending init data back to master");
- }
- else
- {
- CMessageBuffer msg;
- msg.append(smt_initGraphReq);
- msg.append(graphId);
- if (!queryJobChannel().queryJobComm().sendRecv(msg, 0, queryJob().querySlaveMpTag(), LONGTIMEOUT))
- throwUnexpected();
- size32_t len;
- msg.read(len);
- if (len)
- deserializeCreateContexts(msg);
- // could still request 1 off, onCreate serialization from master 1st.
- }
- }
- connect(); // only now do slave acts. have all their outputs prepared.
- }
- CGraphBase::executeSubGraph(parentExtractSz, parentExtract);
- }
- catch (IException *e)
- {
- GraphPrintLog(e, "In executeSubGraph");
- exception.setown(e);
- }
- if (TAG_NULL != executeReplyTag)
- {
- CMessageBuffer msg;
- if (exception.get())
- {
- msg.append(true);
- serializeThorException(exception, msg);
- }
- else
- msg.append(false);
- queryJobChannel().queryJobComm().send(msg, 0, executeReplyTag, LONGTIMEOUT);
- }
- else if (exception)
- throw exception.getClear();
- }
- void CSlaveGraph::abort(IException *e)
- {
- if (!graphDone) // set pre done(), no need to abort if got that far.
- CGraphBase::abort(e);
- getDoneSem.signal();
- }
- void CSlaveGraph::done()
- {
- GraphPrintLog("End of sub-graph");
- {
- SpinBlock b(progressActiveLock);
- progressActive = false;
- progressToCollect = true; // NB: ensure collected after end of graph
- }
- if (!aborted && graphDone && (!queryOwner() || isGlobal()))
- getDoneSem.wait(); // must wait on master
- if (!queryOwner())
- {
- if (globals->getPropBool("@watchdogProgressEnabled"))
- jobS->queryProgressHandler()->stopGraph(*this, NULL);
- }
- Owned<IException> exception;
- try
- {
- CGraphBase::done();
- }
- catch (IException *e)
- {
- GraphPrintLog(e, "In CSlaveGraph::done");
- exception.setown(e);
- }
- if (exception.get())
- throw LINK(exception.get());
- }
- bool CSlaveGraph::serializeStats(MemoryBuffer &mb)
- {
- unsigned beginPos = mb.length();
- mb.append(queryGraphId());
- unsigned cPos = mb.length();
- unsigned count = 0;
- mb.append(count);
- CriticalBlock b(progressCrit);
- // until started and activities initialized, activities are not ready to serlialize stats.
- if ((started&&initialized) || 0 == activityCount())
- {
- bool collect=false;
- {
- SpinBlock b(progressActiveLock);
- if (progressActive || progressToCollect)
- {
- progressToCollect = false;
- collect = true;
- }
- }
- if (collect)
- {
- unsigned sPos = mb.length();
- Owned<IThorActivityIterator> iter = getConnectedIterator();
- ForEach (*iter)
- {
- CGraphElementBase &element = iter->query();
- CSlaveActivity &activity = (CSlaveActivity &)*element.queryActivity();
- unsigned pos = mb.length();
- mb.append(activity.queryContainer().queryId());
- activity.serializeStats(mb);
- if (pos == mb.length()-sizeof(activity_id))
- mb.rewrite(pos);
- else
- ++count;
- }
- mb.writeDirect(cPos, sizeof(count), &count);
- }
- unsigned cqCountPos = mb.length();
- unsigned cq=0;
- mb.append(cq);
- Owned<IThorGraphIterator> childIter = getChildGraphs();
- ForEach(*childIter)
- {
- CSlaveGraph &graph = (CSlaveGraph &)childIter->query();
- if (graph.serializeStats(mb))
- ++cq;
- }
- if (count || cq)
- {
- mb.writeDirect(cqCountPos, sizeof(cq), &cq);
- return true;
- }
- }
- mb.rewrite(beginPos);
- return false;
- }
- void CSlaveGraph::serializeDone(MemoryBuffer &mb)
- {
- mb.append(queryGraphId());
- unsigned cPos = mb.length();
- unsigned count=0;
- mb.append(count);
- Owned<IThorActivityIterator> iter = getConnectedIterator();
- ForEach (*iter)
- {
- CGraphElementBase &element = iter->query();
- if (element.queryActivity())
- {
- CSlaveActivity &activity = (CSlaveActivity &)*element.queryActivity();
- unsigned rPos = mb.length();
- mb.append(element.queryId());
- unsigned nl=0;
- mb.append(nl); // place holder for size of mb
- unsigned l = mb.length();
- activity.processDone(mb);
- nl = mb.length()-l;
- if (0 == nl)
- mb.rewrite(rPos);
- else
- {
- mb.writeDirect(l-sizeof(nl), sizeof(nl), &nl);
- ++count;
- }
- }
- }
- mb.writeDirect(cPos, sizeof(count), &count);
- }
- void CSlaveGraph::getDone(MemoryBuffer &doneInfoMb)
- {
- if (!started) return;
- GraphPrintLog("Entering getDone");
- if (!queryOwner() || isGlobal())
- {
- try
- {
- serializeDone(doneInfoMb);
- if (!queryOwner())
- {
- if (globals->getPropBool("@watchdogProgressEnabled"))
- jobS->queryProgressHandler()->stopGraph(*this, &doneInfoMb);
- }
- doneInfoMb.append(job.queryMaxDiskUsage());
- queryJobChannel().queryTimeReporter().serialize(doneInfoMb);
- }
- catch (IException *)
- {
- GraphPrintLog("Leaving getDone");
- getDoneSem.signal();
- throw;
- }
- }
- GraphPrintLog("Leaving getDone");
- getDoneSem.signal();
- }
- class CThorSlaveGraphResults : public CThorGraphResults
- {
- CSlaveGraph &graph;
- IArrayOf<IThorResult> globalResults;
- PointerArrayOf<CriticalSection> globalResultCrits;
- void ensureAtLeastGlobals(unsigned id)
- {
- while (globalResults.ordinality() < id)
- {
- globalResults.append(*new CThorUninitializedGraphResults(globalResults.ordinality()));
- globalResultCrits.append(new CriticalSection);
- }
- }
- public:
- CThorSlaveGraphResults(CSlaveGraph &_graph,unsigned numResults) : CThorGraphResults(numResults), graph(_graph)
- {
- }
- ~CThorSlaveGraphResults()
- {
- clear();
- }
- virtual void clear()
- {
- CriticalBlock procedure(cs);
- results.kill();
- globalResults.kill();
- ForEachItemIn(i, globalResultCrits)
- delete globalResultCrits.item(i);
- globalResultCrits.kill();
- }
- IThorResult *getResult(unsigned id, bool distributed)
- {
- Linked<IThorResult> result;
- {
- CriticalBlock procedure(cs);
- ensureAtLeast(id+1);
- result.set(&results.item(id));
- if (!distributed || !result->isDistributed())
- return result.getClear();
- ensureAtLeastGlobals(id+1);
- }
- CriticalBlock b(*globalResultCrits.item(id)); // block other global requests for this result
- IThorResult *globalResult = &globalResults.item(id);
- if (!QUERYINTERFACE(globalResult, CThorUninitializedGraphResults))
- return LINK(globalResult);
- Owned<IThorResult> gr = graph.getGlobalResult(*result->queryActivity(), result->queryRowInterfaces(), ownerId, id);
- globalResults.replace(*gr.getLink(), id);
- return gr.getClear();
- }
- };
- IThorGraphResults *CSlaveGraph::createThorGraphResults(unsigned num)
- {
- return new CThorSlaveGraphResults(*this, num);
- }
- IThorResult *CSlaveGraph::getGlobalResult(CActivityBase &activity, IThorRowInterfaces *rowIf, activity_id ownerId, unsigned id)
- {
- mptag_t replyTag = queryMPServer().createReplyTag();
- CMessageBuffer msg;
- msg.setReplyTag(replyTag);
- msg.append(smt_getresult);
- msg.append(queryJobChannel().queryMyRank()-1);
- msg.append(graphId);
- msg.append(ownerId);
- msg.append(id);
- msg.append(replyTag);
- if (!queryJobChannel().queryJobComm().send(msg, 0, queryJob().querySlaveMpTag(), LONGTIMEOUT))
- throwUnexpected();
- Owned<IThorResult> result = ::createResult(activity, rowIf, false);
- Owned<IRowWriter> resultWriter = result->getWriter();
- MemoryBuffer mb;
- Owned<ISerialStream> stream = createMemoryBufferSerialStream(mb);
- CThorStreamDeserializerSource rowSource(stream);
- loop
- {
- loop
- {
- if (activity.queryAbortSoon())
- return NULL;
- msg.clear();
- if (activity.receiveMsg(msg, 0, replyTag, NULL, 60*1000))
- break;
- ActPrintLog(&activity, "WARNING: tag %d timedout, retrying", (unsigned)replyTag);
- }
- if (!msg.length())
- break; // done
- else
- {
- bool error;
- msg.read(error);
- if (error)
- throw deserializeThorException(msg);
- ThorExpand(msg, mb.clear());
- while (!rowSource.eos())
- {
- RtlDynamicRowBuilder rowBuilder(rowIf->queryRowAllocator());
- size32_t sz = rowIf->queryRowDeserializer()->deserialize(rowBuilder, rowSource);
- resultWriter->putRow(rowBuilder.finalizeRowClear(sz));
- }
- }
- }
- return result.getClear();
- }
- ///////////////////////////
- class CThorCodeContextSlave : public CThorCodeContextBase, implements IEngineContext
- {
- mptag_t mptag;
- Owned<IDistributedFileTransaction> superfiletransaction;
- void invalidSetResult(const char * name, unsigned seq)
- {
- throw MakeStringException(0, "Attempt to output result ('%s',%d) from a child query", name ? name : "", (int)seq);
- }
- public:
- CThorCodeContextSlave(CJobChannel &jobChannel, ILoadedDllEntry &querySo, IUserDescriptor &userDesc, mptag_t _mptag) : CThorCodeContextBase(jobChannel, querySo, userDesc), mptag(_mptag)
- {
- }
- virtual void setResultBool(const char *name, unsigned sequence, bool value) { invalidSetResult(name, sequence); }
- virtual void setResultData(const char *name, unsigned sequence, int len, const void * data) { invalidSetResult(name, sequence); }
- virtual void setResultDecimal(const char * stepname, unsigned sequence, int len, int precision, bool isSigned, const void *val) { invalidSetResult(stepname, sequence); }
- virtual void setResultInt(const char *name, unsigned sequence, __int64 value, unsigned size) { invalidSetResult(name, sequence); }
- virtual void setResultRaw(const char *name, unsigned sequence, int len, const void * data) { invalidSetResult(name, sequence); }
- virtual void setResultReal(const char * stepname, unsigned sequence, double value) { invalidSetResult(stepname, sequence); }
- virtual void setResultSet(const char *name, unsigned sequence, bool isAll, size32_t len, const void * data, ISetToXmlTransformer * transformer) { invalidSetResult(name, sequence); }
- virtual void setResultString(const char *name, unsigned sequence, int len, const char * str) { invalidSetResult(name, sequence); }
- virtual void setResultUInt(const char *name, unsigned sequence, unsigned __int64 value, unsigned size) { invalidSetResult(name, sequence); }
- virtual void setResultUnicode(const char *name, unsigned sequence, int len, UChar const * str) { invalidSetResult(name, sequence); }
- virtual void setResultVarString(const char * name, unsigned sequence, const char * value) { invalidSetResult(name, sequence); }
- virtual void setResultVarUnicode(const char * name, unsigned sequence, UChar const * value) { invalidSetResult(name, sequence); }
- virtual bool getResultBool(const char * name, unsigned sequence) { throwUnexpected(); }
- virtual void getResultData(unsigned & tlen, void * & tgt, const char * name, unsigned sequence) { throwUnexpected(); }
- virtual void getResultDecimal(unsigned tlen, int precision, bool isSigned, void * tgt, const char * stepname, unsigned sequence) { throwUnexpected(); }
- virtual void getResultRaw(unsigned & tlen, void * & tgt, const char * name, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer) { throwUnexpected(); }
- virtual void getResultSet(bool & isAll, size32_t & tlen, void * & tgt, const char * name, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer) { throwUnexpected(); }
- virtual __int64 getResultInt(const char * name, unsigned sequence) { throwUnexpected(); }
- virtual double getResultReal(const char * name, unsigned sequence) { throwUnexpected(); }
- virtual void getResultString(unsigned & tlen, char * & tgt, const char * name, unsigned sequence) { throwUnexpected(); }
- virtual void getResultUnicode(unsigned & tlen, UChar * & tgt, const char * name, unsigned sequence) { throwUnexpected(); }
- virtual char *getResultVarString(const char * name, unsigned sequence) { throwUnexpected(); }
- virtual UChar *getResultVarUnicode(const char * name, unsigned sequence) { throwUnexpected(); }
- virtual unsigned getResultHash(const char * name, unsigned sequence) { throwUnexpected(); }
- virtual void getExternalResultRaw(unsigned & tlen, void * & tgt, const char * wuid, const char * stepname, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer) { throwUnexpected(); }
- virtual unsigned getExternalResultHash(const char * wuid, const char * name, unsigned sequence) { throwUnexpected(); }
- virtual void addWuException(const char * text, unsigned code, unsigned severity, const char * source)
- {
- DBGLOG("%s", text);
- Owned<IThorException> e = MakeThorException(code, "%s", text);
- e->setOrigin(source);
- e->setAction(tea_warning);
- e->setSeverity((ErrorSeverity)severity);
- jobChannel.fireException(e);
- }
- virtual unsigned getNodes() { return jobChannel.queryJob().querySlaves(); }
- virtual unsigned getNodeNum() { return jobChannel.queryMyRank()-1; }
- virtual char *getFilePart(const char *logicalName, bool create=false)
- {
- CMessageBuffer msg;
- msg.append(smt_getPhysicalName);
- msg.append(logicalName);
- msg.append(getNodeNum());
- msg.append(create);
- if (!jobChannel.queryJobComm().sendRecv(msg, 0, mptag, LONGTIMEOUT))
- throwUnexpected();
- return (char *)msg.detach();
- }
- virtual unsigned __int64 getFileOffset(const char *logicalName)
- {
- CMessageBuffer msg;
- msg.append(smt_getFileOffset);
- if (!jobChannel.queryJobComm().sendRecv(msg, 0, mptag, LONGTIMEOUT))
- throwUnexpected();
- unsigned __int64 offset;
- msg.read(offset);
- return offset;
- }
- virtual IDistributedFileTransaction *querySuperFileTransaction()
- {
- // NB: shouldn't really have fileservice being called on slaves
- if (!superfiletransaction.get())
- superfiletransaction.setown(createDistributedFileTransaction(userDesc, this));
- return superfiletransaction.get();
- }
- virtual void getResultStringF(unsigned tlen, char * tgt, const char * name, unsigned sequence) { throwUnexpected(); }
- virtual void getResultRowset(size32_t & tcount, byte * * & tgt, const char * name, unsigned sequence, IEngineRowAllocator * _rowAllocator, bool isGrouped, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer) { throwUnexpected(); }
- virtual void getResultDictionary(size32_t & tcount, byte * * & tgt, IEngineRowAllocator * _rowAllocator, const char * name, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer, IHThorHashLookupInfo * hasher) { throwUnexpected(); }
- virtual void addWuAssertFailure(unsigned code, const char * text, const char * filename, unsigned lineno, unsigned column, bool isAbort)
- {
- DBGLOG("%s", text);
- Owned<IThorException> e = MakeThorException(code, "%s", text);
- e->setAssert(filename, lineno, column);
- e->setOrigin("user");
- e->setSeverity(SeverityError);
- if (!isAbort)
- e->setAction(tea_warning);
- jobChannel.fireException(e);
- }
- virtual unsigned __int64 getDatasetHash(const char * name, unsigned __int64 hash) { throwUnexpected(); } // Should only call from master
- virtual IEngineContext *queryEngineContext() { return this; }
- // IEngineContext impl.
- virtual DALI_UID getGlobalUniqueIds(unsigned num, SocketEndpoint *_foreignNode)
- {
- if (num==0)
- return 0;
- SocketEndpoint foreignNode;
- if (_foreignNode && !_foreignNode->isNull())
- foreignNode.set(*_foreignNode);
- else
- foreignNode.set(globals->queryProp("@DALISERVERS"));
- return ::getGlobalUniqueIds(num, &foreignNode);
- }
- virtual bool allowDaliAccess() const
- {
- // NB. includes access to foreign Dalis.
- return jobChannel.queryJob().getOptBool("slaveDaliClient");
- }
- };
- class CThorCodeContextSlaveSharedMem : public CThorCodeContextSlave
- {
- IThorAllocator *sharedAllocator;
- public:
- CThorCodeContextSlaveSharedMem(CJobChannel &jobChannel, IThorAllocator *_sharedAllocator, ILoadedDllEntry &querySo, IUserDescriptor &userDesc, mptag_t mpTag)
- : CThorCodeContextSlave(jobChannel, querySo, userDesc, mpTag)
- {
- sharedAllocator = _sharedAllocator;
- }
- virtual IEngineRowAllocator *getRowAllocator(IOutputMetaData * meta, unsigned activityId) const
- {
- return sharedAllocator->getRowAllocator(meta, activityId);
- }
- };
- class CSlaveGraphTempHandler : public CGraphTempHandler
- {
- public:
- CSlaveGraphTempHandler(CJobBase &job, bool errorOnMissing) : CGraphTempHandler(job, errorOnMissing)
- {
- }
- virtual bool removeTemp(const char *name)
- {
- OwnedIFile ifile = createIFile(name);
- return ifile->remove();
- }
- };
- #define SLAVEGRAPHPOOLLIMIT 10
- CJobSlave::CJobSlave(ISlaveWatchdog *_watchdog, IPropertyTree *_workUnitInfo, const char *graphName, ILoadedDllEntry *_querySo, mptag_t _mpJobTag, mptag_t _slavemptag) : CJobBase(_querySo, graphName), watchdog(_watchdog)
- {
- workUnitInfo.set(_workUnitInfo);
- workUnitInfo->getProp("token", token);
- workUnitInfo->getProp("user", user);
- workUnitInfo->getProp("wuid", wuid);
- workUnitInfo->getProp("scope", scope);
- init();
- oldNodeCacheMem = 0;
- mpJobTag = _mpJobTag;
- slavemptag = _slavemptag;
- IPropertyTree *plugins = workUnitInfo->queryPropTree("plugins");
- if (plugins)
- {
- StringBuffer pluginsDir, installDir, pluginsList;
- globals->getProp("@INSTALL_DIR", installDir); // could use for socachedir also?
- if (installDir.length())
- addPathSepChar(installDir);
- globals->getProp("@pluginsPath", pluginsDir);
- if (pluginsDir.length())
- {
- if (!isAbsolutePath(pluginsDir.str())) // if !absolute, then make relative to installDir if is one (e.g. master mount)
- {
- if (installDir.length())
- pluginsDir.insert(0, installDir.str());
- }
- addPathSepChar(pluginsDir);
- }
- Owned<IPropertyTreeIterator> pluginIter = plugins->getElements("plugin");
- ForEach(*pluginIter)
- {
- StringBuffer pluginPath;
- IPropertyTree &plugin = pluginIter->query();
- pluginPath.append(pluginsDir).append(plugin.queryProp("@name"));
- if (pluginsList.length())
- pluginsList.append(ENVSEPCHAR);
- pluginsList.append(pluginPath);
- }
- pluginMap->loadFromList(pluginsList.str());
- }
- tmpHandler.setown(createTempHandler(true));
- sharedAllocator.setown(::createThorAllocator(globalMemoryMB, sharedMemoryMB, numChannels, memorySpillAtPercentage, *logctx, crcChecking, usePackedAllocator));
- }
- void CJobSlave::addChannel(IMPServer *mpServer)
- {
- unsigned nextChannelNum = jobChannels.ordinality();
- CJobSlaveChannel *channel = new CJobSlaveChannel(*this, mpServer, nextChannelNum);
- jobChannels.append(*channel);
- unsigned slaveNum = channel->queryMyRank();
- jobChannelSlaveNumbers[nextChannelNum] = slaveNum;
- jobSlaveChannelNum[slaveNum-1] = nextChannelNum;
- }
- void CJobSlave::startJob()
- {
- CJobBase::startJob();
- unsigned minFreeSpace = (unsigned)getWorkUnitValueInt("MINIMUM_DISK_SPACE", 0);
- if (minFreeSpace)
- {
- unsigned __int64 freeSpace = getFreeSpace(queryBaseDirectory(grp_unknown, 0));
- if (freeSpace < ((unsigned __int64)minFreeSpace)*0x100000)
- {
- SocketEndpoint ep;
- ep.setLocalHost(0);
- StringBuffer s;
- throw MakeThorException(TE_NotEnoughFreeSpace, "Node %s has %u MB(s) of available disk space, specified minimum for this job: %u MB(s)", ep.getUrlStr(s).str(), (unsigned) freeSpace / 0x100000, minFreeSpace);
- }
- }
- }
- void CJobSlave::reportGraphEnd(graph_id gid)
- {
- if (nodesLoaded) // wouldn't mean much if parallel jobs running
- PROGLOG("Graph[%" GIDPF "u] - JHTree node stats:\ncacheAdds=%d\ncacheHits=%d\nnodesLoaded=%d\nblobCacheHits=%d\nblobCacheAdds=%d\nleafCacheHits=%d\nleafCacheAdds=%d\nnodeCacheHits=%d\nnodeCacheAdds=%d\n", gid, cacheAdds.load(), cacheHits.load(), nodesLoaded.load(), blobCacheHits.load(), blobCacheAdds.load(), leafCacheHits.load(), leafCacheAdds.load(), nodeCacheHits.load(), nodeCacheAdds.load());
- JSocketStatistics stats;
- getSocketStatistics(stats);
- StringBuffer s;
- getSocketStatisticsString(stats,s);
- PROGLOG("Graph[%" GIDPF "u] - Socket statistics : %s\n", gid, s.str());
- resetSocketStatistics();
- }
- __int64 CJobSlave::getWorkUnitValueInt(const char *prop, __int64 defVal) const
- {
- StringBuffer propName(prop);
- return workUnitInfo->queryPropTree("Debug")->getPropInt64(propName.toLowerCase().str(), defVal);
- }
- StringBuffer &CJobSlave::getWorkUnitValue(const char *prop, StringBuffer &str) const
- {
- StringBuffer propName(prop);
- workUnitInfo->queryPropTree("Debug")->getProp(propName.toLowerCase().str(), str);
- return str;
- }
- bool CJobSlave::getWorkUnitValueBool(const char *prop, bool defVal) const
- {
- StringBuffer propName(prop);
- return workUnitInfo->queryPropTree("Debug")->getPropBool(propName.toLowerCase().str(), defVal);
- }
- void CJobSlave::debugRequest(MemoryBuffer &msg, const char *request) const
- {
- if (watchdog) watchdog->debugRequest(msg, request);
- }
- IGraphTempHandler *CJobSlave::createTempHandler(bool errorOnMissing)
- {
- return new CSlaveGraphTempHandler(*this, errorOnMissing);
- }
- mptag_t CJobSlave::deserializeMPTag(MemoryBuffer &mb)
- {
- mptag_t tag;
- deserializeMPtag(mb, tag);
- if (TAG_NULL != tag)
- {
- PROGLOG("CJobSlave::deserializeMPTag: tag = %d", (int)tag);
- for (unsigned c=0; c<queryJobChannels(); c++)
- queryJobChannel(c).queryJobComm().flush(tag);
- }
- return tag;
- }
- IThorAllocator *CJobSlave::getThorAllocator(unsigned channel)
- {
- if (1 == numChannels)
- return CJobBase::getThorAllocator(channel);
- else
- return sharedAllocator->getSlaveAllocator(channel);
- }
- // IGraphCallback
- CJobSlaveChannel::CJobSlaveChannel(CJobBase &_job, IMPServer *mpServer, unsigned channel) : CJobChannel(_job, mpServer, channel)
- {
- codeCtx.setown(new CThorCodeContextSlave(*this, job.queryDllEntry(), *job.queryUserDescriptor(), job.querySlaveMpTag()));
- sharedMemCodeCtx.setown(new CThorCodeContextSlaveSharedMem(*this, job.querySharedAllocator(), job.queryDllEntry(), *job.queryUserDescriptor(), job.querySlaveMpTag()));
- }
- IBarrier *CJobSlaveChannel::createBarrier(mptag_t tag)
- {
- return new CBarrierSlave(*this, *jobComm, tag);
- }
- void CJobSlaveChannel::runSubgraph(CGraphBase &graph, size32_t parentExtractSz, const byte *parentExtract)
- {
- if (!graph.queryOwner())
- CJobChannel::runSubgraph(graph, parentExtractSz, parentExtract);
- else
- graph.doExecuteChild(parentExtractSz, parentExtract);
- CriticalBlock b(graphRunCrit);
- if (!graph.queryOwner())
- removeSubGraph(graph);
- }
- ///////////////
- bool ensurePrimary(CActivityBase *activity, IPartDescriptor &partDesc, OwnedIFile & ifile, unsigned &location, StringBuffer &path)
- {
- StringBuffer locationName, primaryName;
- RemoteFilename primaryRfn;
- partDesc.getFilename(0, primaryRfn);
- primaryRfn.getPath(primaryName);
- OwnedIFile primaryIFile = createIFile(primaryName.str());
- try
- {
- if (primaryIFile->exists())
- {
- location = 0;
- ifile.set(primaryIFile);
- path.append(primaryName);
- return true;
- }
- }
- catch (IException *e)
- {
- ActPrintLog(&activity->queryContainer(), e, "In ensurePrimary");
- e->Release();
- }
- unsigned l;
- for (l=1; l<partDesc.numCopies(); l++)
- {
- RemoteFilename altRfn;
- partDesc.getFilename(l, altRfn);
- locationName.clear();
- altRfn.getPath(locationName);
- assertex(locationName.length());
- OwnedIFile backupIFile = createIFile(locationName.str());
- try
- {
- if (backupIFile->exists())
- {
- if (primaryRfn.isLocal())
- {
- ensureDirectoryForFile(primaryIFile->queryFilename());
- Owned<IException> e = MakeActivityWarning(activity, 0, "Primary file missing: %s, copying backup %s to primary location", primaryIFile->queryFilename(), locationName.str());
- activity->fireException(e);
- StringBuffer tmpName(primaryIFile->queryFilename());
- tmpName.append(".tmp");
- OwnedIFile tmpFile = createIFile(tmpName.str());
- CFIPScope fipScope(tmpName.str());
- copyFile(tmpFile, backupIFile);
- try
- {
- tmpFile->rename(pathTail(primaryIFile->queryFilename()));
- location = 0;
- ifile.set(primaryIFile);
- path.append(primaryName);
- }
- catch (IException *e)
- {
- try { tmpFile->remove(); } catch (IException *e) { ActPrintLog(&activity->queryContainer(), "Failed to delete temporary file"); e->Release(); }
- Owned<IException> e2 = MakeActivityWarning(activity, e, "Failed to restore primary, failed to rename %s to %s", tmpName.str(), primaryIFile->queryFilename());
- e->Release();
- activity->fireException(e2);
- ifile.set(backupIFile);
- location = l;
- path.append(locationName);
- }
- }
- else // JCSMORE - should use daliservix perhaps to ensure primary
- {
- Owned<IException> e = MakeActivityWarning(activity, 0, "Primary file missing: %s, using remote copy: %s", primaryIFile->queryFilename(), locationName.str());
- activity->fireException(e);
- ifile.set(backupIFile);
- location = l;
- path.append(locationName);
- }
- return true;
- }
- }
- catch (IException *e)
- {
- Owned<IThorException> e2 = MakeActivityException(activity, e);
- e->Release();
- throw e2.getClear();
- }
- }
- return false;
- }
- class CEnsurePrimaryPartFile : public CInterface, implements IReplicatedFile
- {
- CActivityBase &activity;
- Linked<IPartDescriptor> partDesc;
- StringAttr logicalFilename;
- Owned<IReplicatedFile> part;
- public:
- IMPLEMENT_IINTERFACE;
- CEnsurePrimaryPartFile(CActivityBase &_activity, const char *_logicalFilename, IPartDescriptor *_partDesc)
- : activity(_activity), logicalFilename(_logicalFilename), partDesc(_partDesc)
- {
- }
- virtual IFile *open()
- {
- unsigned location;
- OwnedIFile iFile;
- StringBuffer filePath;
- if (globals->getPropBool("@autoCopyBackup", true)?ensurePrimary(&activity, *partDesc, iFile, location, filePath):getBestFilePart(&activity, *partDesc, iFile, location, filePath, &activity))
- return iFile.getClear();
- else
- {
- StringBuffer locations;
- IException *e = MakeActivityException(&activity, TE_FileNotFound, "No physical file part for logical file %s, found at given locations: %s (Error = %d)", logicalFilename.get(), getFilePartLocations(*partDesc, locations).str(), GetLastError());
- EXCLOG(e, NULL);
- throw e;
- }
- }
- RemoteFilenameArray &queryCopies()
- {
- if(!part.get())
- part.setown(partDesc->getReplicatedFile());
- return part->queryCopies();
- }
- };
- IReplicatedFile *createEnsurePrimaryPartFile(CActivityBase &activity, const char *logicalFilename, IPartDescriptor *partDesc)
- {
- return new CEnsurePrimaryPartFile(activity, logicalFilename, partDesc);
- }
- ///////////////
- class CFileCache;
- class CLazyFileIO : public CInterface, implements IFileIO, implements IDelayedFile
- {
- CFileCache &cache;
- Owned<IReplicatedFile> repFile;
- Linked<IExpander> expander;
- bool compressed;
- StringAttr filename;
- CRuntimeStatisticCollection fileStats;
- CriticalSection crit;
- Owned<IFileIO> iFileIO; // real IFileIO
- void checkOpen(); // references CFileCache method
- public:
- IMPLEMENT_IINTERFACE;
- CLazyFileIO(CFileCache &_cache, const char *_filename, IReplicatedFile *_repFile, bool _compressed, IExpander *_expander)
- : cache(_cache), filename(_filename), repFile(_repFile), compressed(_compressed), expander(_expander), fileStats(diskLocalStatistics)
- {
- }
- ~CLazyFileIO()
- {
- iFileIO.clear();
- }
- const char *queryFindString() const { return filename.get(); } // for string HT
- // IFileIO impl.
- virtual size32_t read(offset_t pos, size32_t len, void * data)
- {
- CriticalBlock b(crit);
- checkOpen();
- return iFileIO->read(pos, len, data);
- }
- virtual offset_t size()
- {
- CriticalBlock b(crit);
- checkOpen();
- return iFileIO->size();
- }
- virtual size32_t write(offset_t pos, size32_t len, const void * data)
- {
- CriticalBlock b(crit);
- checkOpen();
- return iFileIO->write(pos, len, data);
- }
- virtual void flush()
- {
- CriticalBlock b(crit);
- if (iFileIO)
- iFileIO->flush();
- }
- virtual void close()
- {
- CriticalBlock b(crit);
- if (iFileIO)
- {
- mergeStats(fileStats, iFileIO);
- iFileIO->close();
- }
- iFileIO.clear();
- }
- virtual offset_t appendFile(IFile *file,offset_t pos=0,offset_t len=(offset_t)-1)
- {
- CriticalBlock b(crit);
- checkOpen();
- return iFileIO->appendFile(file, pos, len);
- }
- virtual void setSize(offset_t size)
- {
- CriticalBlock b(crit);
- checkOpen();
- iFileIO->setSize(size);
- }
- virtual unsigned __int64 getStatistic(StatisticKind kind)
- {
- switch (kind)
- {
- case StTimeDiskReadIO:
- return cycle_to_nanosec(getStatistic(StCycleDiskReadIOCycles));
- case StTimeDiskWriteIO:
- return cycle_to_nanosec(getStatistic(StCycleDiskWriteIOCycles));
- }
- CriticalBlock b(crit);
- unsigned __int64 openValue = iFileIO ? iFileIO->getStatistic(kind) : 0;
- return openValue + fileStats.getStatisticValue(kind);
- }
- // IDelayedFile impl.
- virtual IMemoryMappedFile *queryMappedFile() { return NULL; }
- virtual IFileIO *queryFileIO() { return this; }
- };
- class CFileCache : public CInterface, implements IThorFileCache
- {
- OwningStringSuperHashTableOf<CLazyFileIO> files;
- CICopyArrayOf<CLazyFileIO> openFiles;
- unsigned limit, purgeN;
- CriticalSection crit;
- class CDelayedFileWapper : public CInterface, implements IDelayedFile
- {
- CFileCache &cache;
- Linked<CLazyFileIO> lFile;
- public:
- IMPLEMENT_IINTERFACE;
- CDelayedFileWapper(CFileCache &_cache, CLazyFileIO &_lFile) : cache(_cache), lFile(&_lFile) { }
- ~CDelayedFileWapper()
- {
- cache.remove(*lFile);
- }
- // IDelayedFile impl.
- virtual IMemoryMappedFile *queryMappedFile() { return lFile->queryMappedFile(); }
- virtual IFileIO *queryFileIO() { return lFile->queryFileIO(); }
- };
- void purgeOldest()
- {
- // will be ordered oldest first.
- unsigned count = 0;
- CICopyArrayOf<CLazyFileIO> toClose;
- ForEachItemIn(o, openFiles)
- {
- CLazyFileIO &lFile = openFiles.item(o);
- toClose.append(lFile);
- if (++count>=purgeN) // crude for now, just remove oldest N
- break;
- }
- ForEachItemIn(r, toClose)
- {
- CLazyFileIO &lFile = toClose.item(r);
- lFile.close();
- openFiles.zap(lFile);
- }
- }
- bool _remove(CLazyFileIO &lFile)
- {
- bool ret = files.removeExact(&lFile);
- if (!ret) return false;
- openFiles.zap(lFile);
- return true;
- }
- public:
- IMPLEMENT_IINTERFACE;
- CFileCache(unsigned _limit) : limit(_limit)
- {
- assertex(limit);
- purgeN = globals->getPropInt("@fileCachePurgeN", 10);
- if (purgeN > limit) purgeN=limit; // why would it be, but JIC.
- PROGLOG("FileCache: limit = %d, purgeN = %d", limit, purgeN);
- }
- void opening(CLazyFileIO &lFile)
- {
- CriticalBlock b(crit);
- if (openFiles.ordinality() >= limit)
- {
- purgeOldest(); // will close purgeN
- assertex(openFiles.ordinality() < limit);
- }
- openFiles.zap(lFile);
- openFiles.append(lFile);
- }
- // IThorFileCache impl.
- virtual bool remove(IDelayedFile &dFile)
- {
- CLazyFileIO *lFile = QUERYINTERFACE(&dFile, CLazyFileIO);
- assertex(lFile);
- CriticalBlock b(crit);
- return _remove(*lFile);
- }
- virtual IDelayedFile *lookup(CActivityBase &activity, IPartDescriptor &partDesc, IExpander *expander)
- {
- StringBuffer filename;
- RemoteFilename rfn;
- partDesc.getFilename(0, rfn);
- rfn.getPath(filename);
- CriticalBlock b(crit);
- Linked<CLazyFileIO> file = files.find(filename.str());
- if (!file)
- {
- Owned<IReplicatedFile> repFile = createEnsurePrimaryPartFile(activity, filename.str(), &partDesc);
- bool compressed = partDesc.queryOwner().isCompressed();
- file.setown(new CLazyFileIO(*this, filename.str(), repFile.getClear(), compressed, expander));
- }
- files.replace(*LINK(file));
- return new CDelayedFileWapper(*this, *file); // to avoid circular dependency and allow destruction to remove from cache
- }
- };
- ////
- void CLazyFileIO::checkOpen()
- {
- CriticalBlock b(crit);
- if (iFileIO)
- return;
- cache.opening(*this);
- Owned<IFile> iFile = repFile->open();
- if (NULL != expander.get())
- iFileIO.setown(createCompressedFileReader(iFile, expander));
- else if (compressed)
- iFileIO.setown(createCompressedFileReader(iFile));
- else
- iFileIO.setown(iFile->open(IFOread));
- if (!iFileIO.get())
- throw MakeThorException(0, "CLazyFileIO: failed to open: %s", filename.get());
- }
- IThorFileCache *createFileCache(unsigned limit)
- {
- return new CFileCache(limit);
- }
- /*
- * strand stuff
- */
- IEngineRowStream *connectSingleStream(CActivityBase &activity, IThorDataLink *input, unsigned idx, Owned<IStrandJunction> &junction, bool consumerOrdered)
- {
- if (input)
- {
- PointerArrayOf<IEngineRowStream> instreams;
- junction.setown(input->getOutputStreams(activity, idx, instreams, nullptr, consumerOrdered, nullptr));
- if (instreams.length() != 1)
- {
- assertex(instreams.length());
- if (!junction)
- junction.setown(createStrandJunction(*activity.queryRowManager(), instreams.length(), 1, activity.getOptInt("strandBlockSize"), false));
- ForEachItemIn(stream, instreams)
- {
- junction->setInput(stream, instreams.item(stream));
- }
- return junction->queryOutput(0);
- }
- else
- return instreams.item(0);
- }
- else
- return nullptr;
- }
- IEngineRowStream *connectSingleStream(CActivityBase &activity, IThorDataLink *input, unsigned idx, bool consumerOrdered)
- {
- Owned<IStrandJunction> junction;
- IEngineRowStream * result = connectSingleStream(activity, input, idx, junction, consumerOrdered);
- assertex(!junction);
- return result;
- }
|