thgraphslave.cpp 68 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jlib.hpp"
  14. #include "jlzw.hpp"
  15. #include "jhtree.hpp"
  16. #include "daclient.hpp"
  17. #include "commonext.hpp"
  18. #include "thorplugin.hpp"
  19. #include "thcodectx.hpp"
  20. #include "thmem.hpp"
  21. #include "thorport.hpp"
  22. #include "slwatchdog.hpp"
  23. #include "thgraphslave.hpp"
  24. #include "thcompressutil.hpp"
  25. #include "enginecontext.hpp"
  26. //////////////////////////////////
  27. class CBarrierSlave : public CInterface, implements IBarrier
  28. {
  29. mptag_t tag;
  30. Linked<ICommunicator> comm;
  31. bool receiving;
  32. CJobChannel &jobChannel;
  33. public:
  34. IMPLEMENT_IINTERFACE;
  35. CBarrierSlave(CJobChannel &_jobChannel, ICommunicator &_comm, mptag_t _tag) : jobChannel(_jobChannel), comm(&_comm), tag(_tag)
  36. {
  37. receiving = false;
  38. }
  39. virtual bool wait(bool exception, unsigned timeout)
  40. {
  41. Owned<IException> e;
  42. CTimeMon tm(timeout);
  43. unsigned remaining = timeout;
  44. CMessageBuffer msg;
  45. msg.append(false);
  46. msg.append(false); // no exception
  47. if (INFINITE != timeout && tm.timedout(&remaining))
  48. {
  49. if (exception)
  50. throw createBarrierAbortException();
  51. else
  52. return false;
  53. }
  54. if (!comm->send(msg, 0, tag, INFINITE != timeout ? remaining : LONGTIMEOUT))
  55. throw MakeStringException(0, "CBarrierSlave::wait - Timeout sending to master");
  56. msg.clear();
  57. if (INFINITE != timeout && tm.timedout(&remaining))
  58. {
  59. if (exception)
  60. throw createBarrierAbortException();
  61. else
  62. return false;
  63. }
  64. {
  65. BooleanOnOff onOff(receiving);
  66. if (!comm->recv(msg, 0, tag, NULL, remaining))
  67. return false;
  68. }
  69. bool aborted;
  70. msg.read(aborted);
  71. bool hasExcept;
  72. msg.read(hasExcept);
  73. if (hasExcept)
  74. e.setown(deserializeException(msg));
  75. if (aborted)
  76. {
  77. if (!exception)
  78. return false;
  79. if (e)
  80. throw e.getClear();
  81. else
  82. throw createBarrierAbortException();
  83. }
  84. return true;
  85. }
  86. virtual void cancel(IException *e)
  87. {
  88. if (receiving)
  89. comm->cancel(jobChannel.queryMyRank(), tag);
  90. CMessageBuffer msg;
  91. msg.append(true);
  92. if (e)
  93. {
  94. msg.append(true);
  95. serializeException(e, msg);
  96. }
  97. else
  98. msg.append(false);
  99. if (!comm->send(msg, 0, tag, LONGTIMEOUT))
  100. throw MakeStringException(0, "CBarrierSlave::cancel - Timeout sending to master");
  101. }
  102. virtual const mptag_t queryTag() const { return tag; }
  103. };
  104. //
  105. CSlaveActivity::CSlaveActivity(CGraphElementBase *_container) : CActivityBase(_container), CEdgeProgress(this)
  106. {
  107. data = NULL;
  108. }
  109. CSlaveActivity::~CSlaveActivity()
  110. {
  111. inputs.kill();
  112. outputs.kill();
  113. if (data) delete [] data;
  114. ActPrintLog("DESTROYED");
  115. }
  116. void CSlaveActivity::setOutputStream(unsigned index, IEngineRowStream *stream)
  117. {
  118. while (outputStreams.ordinality()<=index)
  119. outputStreams.append(nullptr);
  120. outputStreams.replace(stream, index);
  121. }
  122. void CSlaveActivity::setInput(unsigned index, CActivityBase *inputActivity, unsigned inputOutIdx)
  123. {
  124. CActivityBase::setInput(index, inputActivity, inputOutIdx);
  125. Linked<IThorDataLink> outLink;
  126. if (!inputActivity)
  127. {
  128. Owned<CActivityBase> nullAct = container.factory(TAKnull);
  129. outLink.set(((CSlaveActivity *)(nullAct.get()))->queryOutput(0)); // NB inputOutIdx irrelevant, null has single 'fake' output
  130. nullAct->releaseIOs(); // normally done as graph winds up, clear now to avoid circular dependencies with outputs
  131. }
  132. else
  133. outLink.set(((CSlaveActivity *)inputActivity)->queryOutput(inputOutIdx));
  134. assertex(outLink);
  135. while (inputs.ordinality()<=index)
  136. inputs.append(* new CThorInput());
  137. CThorInput &newInput = inputs.item(index);
  138. newInput.set(outLink, inputOutIdx);
  139. if (0 == index && !input)
  140. {
  141. input = outLink;
  142. inputSourceIdx = inputOutIdx;
  143. }
  144. }
  145. void CSlaveActivity::connectInputStreams(bool consumerOrdered)
  146. {
  147. ForEachItemIn(index, inputs)
  148. {
  149. CThorInput &_input = inputs.item(index);
  150. if (_input.itdl)
  151. setInputStream(index, _input, consumerOrdered);
  152. }
  153. }
  154. void CSlaveActivity::setInputStream(unsigned index, CThorInput &_input, bool consumerOrdered)
  155. {
  156. if (_input.itdl)
  157. {
  158. Owned<IStrandJunction> junction;
  159. IEngineRowStream *_inputStream = connectSingleStream(*this, _input.itdl, _input.sourceIdx, junction, _input.itdl->isInputOrdered(consumerOrdered));
  160. if (queryJob().getOptBool("TRACEROWS"))
  161. {
  162. const unsigned numTraceRows = queryJob().getOptInt("numTraceRows", 10);
  163. CTracingStream *tracingStream = new CTracingStream(_input.itdl, _inputStream, _input.itdl->queryFromActivity()->queryHelper(), numTraceRows);
  164. _input.tracingStream.setown(tracingStream);
  165. _inputStream = tracingStream;
  166. }
  167. _input.stream.set(_inputStream);
  168. _input.junction.setown(junction.getClear());
  169. if (0 == index)
  170. inputStream = _inputStream;
  171. _input.itdl->setOutputStream(_input.sourceIdx, LINK(_inputStream)); // used by debug request only at moment. // JCSMORE - this should probably be the junction outputstream if there is one
  172. }
  173. }
  174. IEngineRowStream *CSlaveActivity::replaceInputStream(unsigned index, IEngineRowStream *_inputStream)
  175. {
  176. CThorInput &_input = inputs.item(index);
  177. IEngineRowStream *prevInputStream = _input.stream.getClear();
  178. _input.stream.setown(_inputStream);
  179. if (0 == index)
  180. inputStream = _inputStream;
  181. return prevInputStream;
  182. }
  183. void CSlaveActivity::setLookAhead(unsigned index, IStartableEngineRowStream *lookAhead)
  184. {
  185. CThorInput &_input = inputs.item(index);
  186. _input.lookAhead.setown(lookAhead);
  187. _input.stream.set(lookAhead);
  188. if (0 == index)
  189. inputStream = lookAhead;
  190. }
  191. IStrandJunction *CSlaveActivity::getOutputStreams(CActivityBase &ctx, unsigned idx, PointerArrayOf<IEngineRowStream> &streams, const CThorStrandOptions * consumerOptions, bool consumerOrdered, IOrderedCallbackCollection * orderedCallbacks)
  192. {
  193. // Default non-stranded implementation, expects activity to have 1 output.
  194. // By default, activities are assumed NOT to support streams
  195. bool inputOrdered = isInputOrdered(consumerOrdered);
  196. connectInputStreams(inputOrdered);
  197. // Return a single stream
  198. // Default activity impl. adds single output as stream
  199. streams.append(this);
  200. return nullptr;
  201. }
  202. void CSlaveActivity::appendOutput(IThorDataLink *itdl)
  203. {
  204. outputs.append(itdl);
  205. }
  206. void CSlaveActivity::appendOutputLinked(IThorDataLink *itdl)
  207. {
  208. if (itdl)
  209. itdl->Link();
  210. appendOutput(itdl);
  211. }
  212. IThorDataLink *CSlaveActivity::queryOutput(unsigned index) const
  213. {
  214. if (index>=outputs.ordinality()) return nullptr;
  215. return outputs.item(index);
  216. }
  217. IThorDataLink *CSlaveActivity::queryInput(unsigned index) const
  218. {
  219. if (index>=inputs.ordinality()) return nullptr;
  220. return inputs.item(index).itdl;
  221. }
  222. IEngineRowStream *CSlaveActivity::queryInputStream(unsigned index) const
  223. {
  224. if (index>=inputs.ordinality()) return nullptr;
  225. return inputs.item(index).stream;
  226. }
  227. IStrandJunction *CSlaveActivity::queryInputJunction(unsigned index) const
  228. {
  229. if (index>=inputs.ordinality()) return nullptr;
  230. return inputs.item(index).junction;
  231. }
  232. IEngineRowStream *CSlaveActivity::queryOutputStream(unsigned index) const
  233. {
  234. if (index>=outputStreams.ordinality()) return nullptr;
  235. return outputStreams.item(index);
  236. }
  237. void CSlaveActivity::start()
  238. {
  239. if (inputs.ordinality()>1)
  240. throwUnexpected();
  241. if (input)
  242. startInput(0);
  243. dataLinkStart();
  244. }
  245. void CSlaveActivity::startAllInputs()
  246. {
  247. ForEachItemIn(i, inputs)
  248. {
  249. startInput(i);
  250. }
  251. }
  252. void CSlaveActivity::startInput(unsigned index, const char *extra)
  253. {
  254. VStringBuffer s("Starting input %u", index);
  255. if (extra)
  256. s.append(" ").append(extra);
  257. ActPrintLog("%s", s.str());
  258. CThorInput &_input = inputs.item(index);
  259. #ifdef TRACE_STARTSTOP_EXCEPTIONS
  260. try
  261. {
  262. #endif
  263. _input.itdl->start();
  264. startJunction(_input.junction);
  265. if (_input.lookAhead)
  266. _input.lookAhead->start();
  267. _input.stopped = false;
  268. _input.started = true;
  269. if (0 == index)
  270. inputStopped = false;
  271. #ifdef TRACE_STARTSTOP_EXCEPTIONS
  272. }
  273. catch(IException *e)
  274. {
  275. ActPrintLog(e, "%s", s.str());
  276. throw;
  277. }
  278. #endif
  279. }
  280. void CSlaveActivity::stop()
  281. {
  282. if (input)
  283. stopInput(0);
  284. dataLinkStop();
  285. }
  286. void CSlaveActivity::stopInput(unsigned index, const char *extra)
  287. {
  288. CThorInput &_input = inputs.item(index);
  289. if (_input.stopped)
  290. return;
  291. VStringBuffer s("Stopping input %u for", index);
  292. if (extra)
  293. s.append(" ").append(extra);
  294. ActPrintLog("%s", s.str());
  295. #ifdef TRACE_STARTSTOP_EXCEPTIONS
  296. try
  297. {
  298. #endif
  299. if (_input.stream)
  300. _input.stream->stop();
  301. _input.stopped = true;
  302. if (0 == index)
  303. inputStopped = true;
  304. #ifdef TRACE_STARTSTOP_EXCEPTIONS
  305. }
  306. catch(IException * e)
  307. {
  308. ActPrintLog(e, "%s", s.str());
  309. throw;
  310. }
  311. #endif
  312. }
  313. void CSlaveActivity::stopAllInputs()
  314. {
  315. ForEachItemIn(i, inputs)
  316. {
  317. stopInput(i);
  318. }
  319. }
  320. void CSlaveActivity::reset()
  321. {
  322. CActivityBase::reset();
  323. ForEachItemIn(i, inputs)
  324. inputs.item(i).reset();
  325. inputStopped = false;
  326. }
  327. void CSlaveActivity::releaseIOs()
  328. {
  329. // inputs.kill(); // don't want inputs to die before this dies (release in deconstructor) // JCSMORE not sure why care particularly.
  330. outputs.kill(); // outputs tend to be self-references, this clears them explicitly, otherwise end up leaking with circular references.
  331. outputStreams.kill();
  332. }
  333. void CSlaveActivity::clearConnections()
  334. {
  335. outputStreams.kill();
  336. inputs.kill();
  337. }
  338. MemoryBuffer &CSlaveActivity::queryInitializationData(unsigned slave) const
  339. {
  340. CriticalBlock b(crit);
  341. if (!data)
  342. data = new MemoryBuffer[container.queryJob().querySlaves()];
  343. CMessageBuffer msg;
  344. graph_id gid = queryContainer().queryOwner().queryGraphId();
  345. msg.append(smt_dataReq);
  346. msg.append(slave);
  347. msg.append(gid);
  348. msg.append(container.queryId());
  349. if (!queryJobChannel().queryJobComm().sendRecv(msg, 0, queryContainer().queryJob().querySlaveMpTag(), LONGTIMEOUT))
  350. throwUnexpected();
  351. data[slave].swapWith(msg);
  352. return data[slave];
  353. }
  354. MemoryBuffer &CSlaveActivity::getInitializationData(unsigned slave, MemoryBuffer &mb) const
  355. {
  356. return mb.append(queryInitializationData(slave));
  357. }
  358. unsigned __int64 CSlaveActivity::queryLocalCycles() const
  359. {
  360. unsigned __int64 inputCycles = 0;
  361. if (1 == inputs.ordinality())
  362. {
  363. IThorDataLink *input = queryInput(0);
  364. inputCycles += input->queryTotalCycles();
  365. }
  366. else
  367. {
  368. switch (container.getKind())
  369. {
  370. case TAKif:
  371. case TAKchildif:
  372. case TAKifaction:
  373. case TAKcase:
  374. case TAKchildcase:
  375. if (inputs.ordinality() && (((unsigned)-1) != container.whichBranch))
  376. {
  377. IThorDataLink *input = queryInput(container.whichBranch);
  378. if (input)
  379. inputCycles += input->queryTotalCycles();
  380. }
  381. break;
  382. default:
  383. ForEachItemIn(i, inputs)
  384. {
  385. IThorDataLink *input = queryInput(i);
  386. inputCycles += input->queryTotalCycles();
  387. }
  388. break;
  389. }
  390. }
  391. unsigned __int64 _totalCycles = queryTotalCycles();
  392. if (_totalCycles < inputCycles) // not sure how/if possible, but guard against
  393. return 0;
  394. return _totalCycles-inputCycles;
  395. }
  396. void CSlaveActivity::serializeStats(MemoryBuffer &mb)
  397. {
  398. CriticalBlock b(crit);
  399. mb.append((unsigned __int64)cycle_to_nanosec(queryLocalCycles()));
  400. ForEachItemIn(i, outputs)
  401. {
  402. IThorDataLink *output = queryOutput(i);
  403. if (output)
  404. outputs.item(i)->dataLinkSerialize(mb);
  405. else
  406. serializeNullItdl(mb);
  407. }
  408. }
  409. void CSlaveActivity::debugRequest(unsigned edgeIdx, MemoryBuffer &msg)
  410. {
  411. IEngineRowStream *outputStream = queryOutputStream(edgeIdx);
  412. IThorDebug *debug = QUERYINTERFACE(outputStream, IThorDebug); // should probably use an extended IEngineRowStream, or store in separate array instead
  413. if (debug) debug->debugRequest(msg);
  414. }
  415. bool CSlaveActivity::isGrouped() const
  416. {
  417. if (!input) return false; // should possible be an error if query and not set
  418. return input->isGrouped();
  419. }
  420. IOutputMetaData *CSlaveActivity::queryOutputMeta() const
  421. {
  422. return queryHelper()->queryOutputMeta();
  423. }
  424. void CSlaveActivity::dataLinkSerialize(MemoryBuffer &mb) const
  425. {
  426. CEdgeProgress::dataLinkSerialize(mb);
  427. }
  428. rowcount_t CSlaveActivity::getProgressCount() const
  429. {
  430. return CEdgeProgress::getCount();
  431. }
  432. void CSlaveActivity::debugRequest(MemoryBuffer &msg)
  433. {
  434. }
  435. /// CThorStrandProcessor
  436. CThorStrandProcessor::CThorStrandProcessor(CThorStrandedActivity &_parent, IEngineRowStream *_inputStream, unsigned _outputId)
  437. : parent(_parent), inputStream(_inputStream), outputId(_outputId), timeActivities(_parent.queryTimeActivities())
  438. {
  439. rowsProcessed = 0;
  440. baseHelper.set(parent.queryHelper());
  441. }
  442. void CThorStrandProcessor::processAndThrowOwnedException(IException *_e)
  443. {
  444. IThorException *e = QUERYINTERFACE(_e, IThorException);
  445. if (e)
  446. {
  447. if (!e->queryActivityId())
  448. setExceptionActivityInfo(parent.queryContainer(), e);
  449. }
  450. else
  451. {
  452. e = MakeActivityException(&parent, _e);
  453. _e->Release();
  454. }
  455. throw e;
  456. }
  457. void CThorStrandProcessor::stop()
  458. {
  459. if (!stopped)
  460. {
  461. if (inputStream)
  462. inputStream->stop();
  463. parent.strandedStop();
  464. }
  465. stopped = true;
  466. }
  467. /// CThorStrandedActivity
  468. void CThorStrandedActivity::onStartStrands()
  469. {
  470. active = strands.ordinality();
  471. ForEachItemIn(idx, strands)
  472. strands.item(idx).start();
  473. }
  474. void CThorStrandedActivity::strandedStop()
  475. {
  476. // Called from the strands... which should ensure that stop is not called more than once per strand
  477. //The first strand to call
  478. if (active)
  479. --active;
  480. if (!active)
  481. stop();
  482. }
  483. //This function is pure (But also implemented out of line) to force the derived classes to implement it.
  484. //After calling the base class start method, and initialising any values from the helper they must call onStartStrands(),
  485. //this must also happen before any rows are read from the strands (e.g., by a source junction)
  486. // virtual void start(unsigned parentExtractSize, const byte *parentExtract, bool paused) = 0;
  487. //For some reason gcc doesn't let you specify a function as pure virtual and define it at the same time.
  488. void CThorStrandedActivity::start()
  489. {
  490. CSlaveActivity::start();
  491. startJunction(splitter);
  492. onStartStrands();
  493. }
  494. void CThorStrandedActivity::reset()
  495. {
  496. assertex(active==0);
  497. ForEachItemIn(idx, strands)
  498. strands.item(idx).reset();
  499. resetJunction(splitter);
  500. CSlaveActivity::reset();
  501. resetJunction(sourceJunction);
  502. }
  503. IStrandJunction *CThorStrandedActivity::getOutputStreams(CActivityBase &ctx, unsigned idx, PointerArrayOf<IEngineRowStream> &streams, const CThorStrandOptions * consumerOptions, bool consumerOrdered, IOrderedCallbackCollection * orderedCallbacks)
  504. {
  505. assertex(idx == 0);
  506. assertex(strands.empty());
  507. // JCSMORE these may be wrong if this is a source activity
  508. bool inputOrdered = input ? input->isInputOrdered(consumerOrdered) : isInputOrdered(consumerOrdered);
  509. //Note, numStrands == 1 is an explicit request to disable threading
  510. if (consumerOptions && (consumerOptions->numStrands != 1) && (strandOptions.numStrands != 1))
  511. {
  512. //Check to see if the consumer's settings should override
  513. if (strandOptions.numStrands == 0)
  514. {
  515. strandOptions.numStrands = consumerOptions->numStrands;
  516. strandOptions.blockSize = consumerOptions->blockSize;
  517. }
  518. else if (consumerOptions->numStrands > strandOptions.numStrands)
  519. {
  520. strandOptions.numStrands = consumerOptions->numStrands;
  521. }
  522. }
  523. Owned <IStrandJunction> recombiner;
  524. if (input)
  525. {
  526. if (strandOptions.numStrands == 1)
  527. {
  528. // 1 means explicitly requested single-strand.
  529. Owned<IStrandJunction> junction;
  530. IEngineRowStream *instream = connectSingleStream(ctx, input, inputSourceIdx, junction, inputOrdered);
  531. inputs.item(idx).junction.setown(junction.getClear());
  532. strands.append(*createStrandProcessor(instream));
  533. }
  534. else
  535. {
  536. PointerArrayOf<IEngineRowStream> instreams;
  537. recombiner.setown(input->getOutputStreams(ctx, inputSourceIdx, instreams, &strandOptions, inputOrdered, orderedCallbacks));
  538. if ((instreams.length() == 1) && (strandOptions.numStrands != 0)) // 0 means did not specify - we should use the strands that our upstream provides
  539. {
  540. assertex(recombiner == nullptr);
  541. // Create a splitter to split the input into n... and a recombiner if need to preserve sorting
  542. if (inputOrdered)
  543. {
  544. branch.setown(createStrandBranch(*ctx.queryRowManager(), strandOptions.numStrands, strandOptions.blockSize, true, input->queryOutputMeta()->isGrouped(), false, orderedCallbacks));
  545. splitter.set(branch->queryInputJunction());
  546. recombiner.set(branch->queryOutputJunction());
  547. }
  548. else
  549. {
  550. splitter.setown(createStrandJunction(*ctx.queryRowManager(), 1, strandOptions.numStrands, strandOptions.blockSize, false));
  551. }
  552. splitter->setInput(0, instreams.item(0));
  553. for (unsigned strandNo = 0; strandNo < strandOptions.numStrands; strandNo++)
  554. strands.append(*createStrandProcessor(splitter->queryOutput(strandNo)));
  555. }
  556. else
  557. {
  558. // Ignore my hint and just use the width already split into...
  559. ForEachItemIn(strandNo, instreams)
  560. strands.append(*createStrandProcessor(instreams.item(strandNo)));
  561. }
  562. }
  563. }
  564. else
  565. {
  566. unsigned numStrands = strandOptions.numStrands ? strandOptions.numStrands : 1;
  567. for (unsigned i=0; i < numStrands; i++)
  568. strands.append(*createStrandSourceProcessor(inputOrdered));
  569. if (inputOrdered && (numStrands > 1))
  570. {
  571. if (consumerOptions)
  572. {
  573. //If the output activities are also stranded then need to create a version of the branch
  574. bool isGrouped = queryOutputMeta()->isGrouped();
  575. branch.setown(createStrandBranch(*ctx.queryRowManager(), strandOptions.numStrands, strandOptions.blockSize, true, isGrouped, true, orderedCallbacks));
  576. sourceJunction.set(branch->queryInputJunction());
  577. recombiner.set(branch->queryOutputJunction());
  578. assertex((orderedCallbacks && !recombiner) || (!orderedCallbacks && recombiner));
  579. //This is different from the branch above. The first "junction" has the source activity as the input, and the outputs as the result of the activity
  580. for (unsigned strandNo = 0; strandNo < strandOptions.numStrands; strandNo++)
  581. {
  582. sourceJunction->setInput(strandNo, &strands.item(strandNo));
  583. streams.append(sourceJunction->queryOutput(strandNo));
  584. }
  585. #ifdef TRACE_STRANDS
  586. if (traceLevel > 2)
  587. DBGLOG("Executing activity %u with %u strands", activityId, strands.ordinality());
  588. #endif
  589. return recombiner.getClear();
  590. }
  591. else
  592. recombiner.setown(createStrandJunction(*ctx.queryRowManager(), numStrands, 1, strandOptions.blockSize, inputOrdered));
  593. }
  594. }
  595. ForEachItemIn(i, strands)
  596. streams.append(&strands.item(i));
  597. #ifdef TRACE_STRANDS
  598. if (traceLevel > 2)
  599. DBGLOG("Executing activity %u with %u strands", activityId, strands.ordinality());
  600. #endif
  601. return recombiner.getClear();
  602. }
  603. unsigned __int64 CThorStrandedActivity::queryTotalCycles() const
  604. {
  605. unsigned __int64 total = 0;;
  606. ForEachItemIn(i, strands)
  607. {
  608. CThorStrandProcessor &strand = strands.item(i);
  609. total += strand.queryTotalCycles();
  610. }
  611. return total;
  612. }
  613. void CThorStrandedActivity::dataLinkSerialize(MemoryBuffer &mb) const
  614. {
  615. mb.append(getProgressCount());
  616. }
  617. rowcount_t CThorStrandedActivity::getProgressCount() const
  618. {
  619. rowcount_t totalCount = getCount();
  620. ForEachItemIn(i, strands)
  621. {
  622. CThorStrandProcessor &strand = strands.item(i);
  623. totalCount += strand.getCount();
  624. }
  625. return totalCount;
  626. }
  627. // CSlaveLateStartActivity
  628. void CSlaveLateStartActivity::lateStart(bool any)
  629. {
  630. prefiltered = !any;
  631. if (!prefiltered)
  632. startInput(0);
  633. else
  634. stopInput(0);
  635. }
  636. void CSlaveLateStartActivity::start()
  637. {
  638. Linked<CThorInput> savedInput = &inputs.item(0);
  639. if (!nullInput)
  640. {
  641. nullInput.setown(new CThorInput);
  642. nullInput->sourceIdx = savedInput->sourceIdx; // probably not needed
  643. }
  644. inputs.replace(* nullInput.getLink(), 0);
  645. input = NULL;
  646. CSlaveActivity::start();
  647. inputs.replace(* savedInput.getClear(), 0);
  648. input = inputs.item(0).itdl;
  649. }
  650. void CSlaveLateStartActivity::stop()
  651. {
  652. if (!prefiltered)
  653. {
  654. stopInput(0);
  655. dataLinkStop();
  656. }
  657. }
  658. void CSlaveLateStartActivity::reset()
  659. {
  660. CSlaveActivity::reset();
  661. prefiltered = false;
  662. }
  663. // CSlaveGraph
  664. CSlaveGraph::CSlaveGraph(CJobChannel &jobChannel) : CGraphBase(jobChannel)
  665. {
  666. jobS = (CJobSlave *)&jobChannel.queryJob();
  667. }
  668. void CSlaveGraph::init(MemoryBuffer &mb)
  669. {
  670. mpTag = queryJobChannel().deserializeMPTag(mb);
  671. startBarrierTag = queryJobChannel().deserializeMPTag(mb);
  672. waitBarrierTag = queryJobChannel().deserializeMPTag(mb);
  673. doneBarrierTag = queryJobChannel().deserializeMPTag(mb);
  674. startBarrier = queryJobChannel().createBarrier(startBarrierTag);
  675. waitBarrier = queryJobChannel().createBarrier(waitBarrierTag);
  676. if (doneBarrierTag != TAG_NULL)
  677. doneBarrier = queryJobChannel().createBarrier(doneBarrierTag);
  678. initialized = false;
  679. progressActive = progressToCollect = false;
  680. unsigned subCount;
  681. mb.read(subCount);
  682. while (subCount--)
  683. {
  684. graph_id gid;
  685. mb.read(gid);
  686. Owned<CSlaveGraph> subGraph = (CSlaveGraph *)queryJobChannel().getGraph(gid);
  687. subGraph->init(mb);
  688. }
  689. }
  690. void CSlaveGraph::initWithActData(MemoryBuffer &in, MemoryBuffer &out)
  691. {
  692. activity_id id;
  693. loop
  694. {
  695. in.read(id);
  696. if (0 == id) break;
  697. CSlaveGraphElement *element = (CSlaveGraphElement *)queryElement(id);
  698. assertex(element);
  699. out.append(id);
  700. out.append((size32_t)0);
  701. unsigned l = out.length();
  702. size32_t sz;
  703. in.read(sz);
  704. unsigned aread = in.getPos();
  705. CSlaveActivity *activity = (CSlaveActivity *)element->queryActivity();
  706. assertex(activity);
  707. element->sentActInitData->set(0);
  708. activity->init(in, out);
  709. aread = in.getPos()-aread;
  710. if (aread<sz)
  711. {
  712. Owned<IException> e = MakeActivityException(element, TE_SeriailzationError, "Serialization error - activity did not read all serialized data (%d byte(s) remaining)", sz-aread);
  713. in.readDirect(sz-aread);
  714. throw e.getClear();
  715. }
  716. else if (aread>sz)
  717. throw MakeActivityException(element, TE_SeriailzationError, "Serialization error - activity read beyond serialized data (%d byte(s))", aread-sz);
  718. activity->setInitialized(true);
  719. size32_t dl = out.length() - l;
  720. if (dl)
  721. out.writeDirect(l-sizeof(size32_t), sizeof(size32_t), &dl);
  722. else
  723. out.setLength(l-(sizeof(activity_id)+sizeof(size32_t)));
  724. }
  725. out.append((activity_id)0);
  726. }
  727. bool CSlaveGraph::recvActivityInitData(size32_t parentExtractSz, const byte *parentExtract)
  728. {
  729. bool ret = true;
  730. unsigned needActInit = 0;
  731. unsigned uninitialized = 0;
  732. Owned<IThorActivityIterator> iter = getConnectedIterator();
  733. ForEach(*iter)
  734. {
  735. CGraphElementBase &element = (CGraphElementBase &)iter->query();
  736. CActivityBase *activity = element.queryActivity();
  737. if (activity)
  738. {
  739. if (activity->needReInit())
  740. {
  741. element.sentActInitData->set(0, false); // force act init to be resent
  742. activity->setInitialized(false);
  743. }
  744. if (!element.sentActInitData->test(0))
  745. ++needActInit;
  746. if (!activity->queryInitialized())
  747. ++uninitialized;
  748. }
  749. }
  750. if (0 == uninitialized)
  751. return true;
  752. mptag_t replyTag = TAG_NULL;
  753. size32_t len = 0;
  754. CMessageBuffer actInitRtnData;
  755. actInitRtnData.append(false);
  756. CMessageBuffer msg;
  757. if (syncInitData())
  758. {
  759. if (!graphCancelHandler.recv(queryJobChannel().queryJobComm(), msg, 0, mpTag, NULL, LONGTIMEOUT))
  760. throw MakeStringException(0, "Error receiving actinit data for graph: %" GIDPF "d", graphId);
  761. replyTag = msg.getReplyTag();
  762. msg.read(len);
  763. }
  764. else
  765. {
  766. if (needActInit)
  767. {
  768. // initialize any for which no data was sent
  769. msg.append(smt_initActDataReq); // may cause graph to be created at master
  770. msg.append(queryGraphId());
  771. msg.append(queryJobChannel().queryMyRank()-1);
  772. assertex(!parentExtractSz || NULL!=parentExtract);
  773. msg.append(parentExtractSz);
  774. msg.append(parentExtractSz, parentExtract);
  775. // NB: will only request activities that need initializaton data (those that override CSlaveActivity::init())
  776. ForEach(*iter)
  777. {
  778. CSlaveGraphElement &element = (CSlaveGraphElement &)iter->query();
  779. CActivityBase *activity = element.queryActivity();
  780. if (activity)
  781. {
  782. if (!element.sentActInitData->test(0))
  783. {
  784. msg.append(element.queryId());
  785. // JCSMORE -> GH - do you always generate a start context serializer?
  786. element.serializeStartContext(msg);
  787. }
  788. }
  789. }
  790. msg.append((activity_id)0);
  791. if (!queryJobChannel().queryJobComm().sendRecv(msg, 0, queryJob().querySlaveMpTag(), LONGTIMEOUT))
  792. throwUnexpected();
  793. replyTag = queryJobChannel().deserializeMPTag(msg);
  794. bool error;
  795. msg.read(error);
  796. if (error)
  797. {
  798. Owned<IException> e = deserializeException(msg);
  799. EXCLOG(e, "Master hit exception");
  800. msg.clear();
  801. if (!queryJobChannel().queryJobComm().send(msg, 0, replyTag, LONGTIMEOUT))
  802. throw MakeStringException(0, "Timeout sending init data back to master");
  803. throw e.getClear();
  804. }
  805. msg.read(len);
  806. }
  807. }
  808. if (len)
  809. {
  810. try
  811. {
  812. MemoryBuffer actInitData;
  813. actInitData.append(len, msg.readDirect(len));
  814. CriticalBlock b(progressCrit);
  815. initialized = true;
  816. initWithActData(actInitData, actInitRtnData);
  817. }
  818. catch (IException *e)
  819. {
  820. actInitRtnData.clear();
  821. actInitRtnData.append(true);
  822. serializeThorException(e, actInitRtnData);
  823. e->Release();
  824. ret = false;
  825. }
  826. }
  827. if (syncInitData() || needActInit)
  828. {
  829. if (!queryJobChannel().queryJobComm().send(actInitRtnData, 0, replyTag, LONGTIMEOUT))
  830. throw MakeStringException(0, "Timeout sending init data back to master");
  831. }
  832. // initialize any for which no data was sent
  833. ForEach(*iter)
  834. {
  835. CSlaveGraphElement &element = (CSlaveGraphElement &)iter->query();
  836. CSlaveActivity *activity = (CSlaveActivity *)element.queryActivity();
  837. if (activity && !activity->queryInitialized())
  838. {
  839. activity->setInitialized(true);
  840. element.sentActInitData->set(0);
  841. MemoryBuffer in, out;
  842. activity->init(in, out);
  843. assertex(0 == out.length());
  844. }
  845. }
  846. return ret;
  847. }
  848. bool CSlaveGraph::preStart(size32_t parentExtractSz, const byte *parentExtract)
  849. {
  850. if (!recvActivityInitData(parentExtractSz, parentExtract))
  851. throw MakeThorException(0, "preStart failure");
  852. CGraphBase::preStart(parentExtractSz, parentExtract);
  853. if (isGlobal())
  854. {
  855. if (!startBarrier->wait(false))
  856. return false;
  857. }
  858. return true;
  859. }
  860. void CSlaveGraph::start()
  861. {
  862. {
  863. SpinBlock b(progressActiveLock);
  864. progressActive = true;
  865. progressToCollect = true;
  866. }
  867. bool forceAsync = !queryOwner() || isGlobal();
  868. Owned<IThorActivityIterator> iter = getSinkIterator();
  869. unsigned sinks = 0;
  870. ForEach(*iter)
  871. ++sinks;
  872. ForEach(*iter)
  873. {
  874. CGraphElementBase &container = iter->query();
  875. CActivityBase *sinkAct = (CActivityBase *)container.queryActivity();
  876. --sinks;
  877. sinkAct->startProcess(forceAsync || 0 != sinks); // async, unless last
  878. }
  879. if (!queryOwner())
  880. {
  881. if (globals->getPropBool("@watchdogProgressEnabled"))
  882. jobS->queryProgressHandler()->startGraph(*this);
  883. }
  884. }
  885. void CSlaveGraph::connect()
  886. {
  887. CriticalBlock b(progressCrit);
  888. Owned<IThorActivityIterator> iter = getConnectedIterator(false);
  889. ForEach(*iter)
  890. iter->query().doconnect();
  891. iter.setown(getSinkIterator());
  892. ForEach(*iter)
  893. {
  894. CGraphElementBase &container = iter->query();
  895. CSlaveActivity *sinkAct = (CSlaveActivity *)container.queryActivity();
  896. sinkAct->connectInputStreams(true);
  897. }
  898. }
  899. void CSlaveGraph::executeSubGraph(size32_t parentExtractSz, const byte *parentExtract)
  900. {
  901. if (isComplete())
  902. return;
  903. Owned<IException> exception;
  904. try
  905. {
  906. if (!doneInit)
  907. {
  908. doneInit = true;
  909. if (queryOwner())
  910. {
  911. if (isGlobal())
  912. {
  913. CMessageBuffer msg;
  914. if (!graphCancelHandler.recv(queryJobChannel().queryJobComm(), msg, 0, mpTag, NULL, LONGTIMEOUT))
  915. throw MakeStringException(0, "Error receiving createctx data for graph: %" GIDPF "d", graphId);
  916. try
  917. {
  918. size32_t len;
  919. msg.read(len);
  920. if (len)
  921. {
  922. MemoryBuffer initData;
  923. initData.append(len, msg.readDirect(len));
  924. deserializeCreateContexts(initData);
  925. }
  926. msg.clear();
  927. msg.append(false);
  928. }
  929. catch (IException *e)
  930. {
  931. msg.clear();
  932. msg.append(true);
  933. serializeThorException(e, msg);
  934. }
  935. if (!queryJobChannel().queryJobComm().send(msg, 0, msg.getReplyTag(), LONGTIMEOUT))
  936. throw MakeStringException(0, "Timeout sending init data back to master");
  937. }
  938. else
  939. {
  940. CMessageBuffer msg;
  941. msg.append(smt_initGraphReq);
  942. msg.append(graphId);
  943. if (!queryJobChannel().queryJobComm().sendRecv(msg, 0, queryJob().querySlaveMpTag(), LONGTIMEOUT))
  944. throwUnexpected();
  945. size32_t len;
  946. msg.read(len);
  947. if (len)
  948. deserializeCreateContexts(msg);
  949. // could still request 1 off, onCreate serialization from master 1st.
  950. }
  951. }
  952. connect(); // only now do slave acts. have all their outputs prepared.
  953. }
  954. CGraphBase::executeSubGraph(parentExtractSz, parentExtract);
  955. }
  956. catch (IException *e)
  957. {
  958. GraphPrintLog(e, "In executeSubGraph");
  959. exception.setown(e);
  960. }
  961. if (TAG_NULL != executeReplyTag)
  962. {
  963. CMessageBuffer msg;
  964. if (exception.get())
  965. {
  966. msg.append(true);
  967. serializeThorException(exception, msg);
  968. }
  969. else
  970. msg.append(false);
  971. queryJobChannel().queryJobComm().send(msg, 0, executeReplyTag, LONGTIMEOUT);
  972. }
  973. else if (exception)
  974. throw exception.getClear();
  975. }
  976. void CSlaveGraph::abort(IException *e)
  977. {
  978. if (!graphDone) // set pre done(), no need to abort if got that far.
  979. CGraphBase::abort(e);
  980. getDoneSem.signal();
  981. }
  982. void CSlaveGraph::done()
  983. {
  984. GraphPrintLog("End of sub-graph");
  985. {
  986. SpinBlock b(progressActiveLock);
  987. progressActive = false;
  988. progressToCollect = true; // NB: ensure collected after end of graph
  989. }
  990. if (!aborted && graphDone && (!queryOwner() || isGlobal()))
  991. getDoneSem.wait(); // must wait on master
  992. if (!queryOwner())
  993. {
  994. if (globals->getPropBool("@watchdogProgressEnabled"))
  995. jobS->queryProgressHandler()->stopGraph(*this, NULL);
  996. }
  997. Owned<IException> exception;
  998. try
  999. {
  1000. CGraphBase::done();
  1001. }
  1002. catch (IException *e)
  1003. {
  1004. GraphPrintLog(e, "In CSlaveGraph::done");
  1005. exception.setown(e);
  1006. }
  1007. if (exception.get())
  1008. throw LINK(exception.get());
  1009. }
  1010. bool CSlaveGraph::serializeStats(MemoryBuffer &mb)
  1011. {
  1012. unsigned beginPos = mb.length();
  1013. mb.append(queryGraphId());
  1014. unsigned cPos = mb.length();
  1015. unsigned count = 0;
  1016. mb.append(count);
  1017. CriticalBlock b(progressCrit);
  1018. // until started and activities initialized, activities are not ready to serlialize stats.
  1019. if ((started&&initialized) || 0 == activityCount())
  1020. {
  1021. bool collect=false;
  1022. {
  1023. SpinBlock b(progressActiveLock);
  1024. if (progressActive || progressToCollect)
  1025. {
  1026. progressToCollect = false;
  1027. collect = true;
  1028. }
  1029. }
  1030. if (collect)
  1031. {
  1032. unsigned sPos = mb.length();
  1033. Owned<IThorActivityIterator> iter = getConnectedIterator();
  1034. ForEach (*iter)
  1035. {
  1036. CGraphElementBase &element = iter->query();
  1037. CSlaveActivity &activity = (CSlaveActivity &)*element.queryActivity();
  1038. unsigned pos = mb.length();
  1039. mb.append(activity.queryContainer().queryId());
  1040. activity.serializeStats(mb);
  1041. if (pos == mb.length()-sizeof(activity_id))
  1042. mb.rewrite(pos);
  1043. else
  1044. ++count;
  1045. }
  1046. mb.writeDirect(cPos, sizeof(count), &count);
  1047. }
  1048. unsigned cqCountPos = mb.length();
  1049. unsigned cq=0;
  1050. mb.append(cq);
  1051. Owned<IThorGraphIterator> childIter = getChildGraphs();
  1052. ForEach(*childIter)
  1053. {
  1054. CSlaveGraph &graph = (CSlaveGraph &)childIter->query();
  1055. if (graph.serializeStats(mb))
  1056. ++cq;
  1057. }
  1058. if (count || cq)
  1059. {
  1060. mb.writeDirect(cqCountPos, sizeof(cq), &cq);
  1061. return true;
  1062. }
  1063. }
  1064. mb.rewrite(beginPos);
  1065. return false;
  1066. }
  1067. void CSlaveGraph::serializeDone(MemoryBuffer &mb)
  1068. {
  1069. mb.append(queryGraphId());
  1070. unsigned cPos = mb.length();
  1071. unsigned count=0;
  1072. mb.append(count);
  1073. Owned<IThorActivityIterator> iter = getConnectedIterator();
  1074. ForEach (*iter)
  1075. {
  1076. CGraphElementBase &element = iter->query();
  1077. if (element.queryActivity())
  1078. {
  1079. CSlaveActivity &activity = (CSlaveActivity &)*element.queryActivity();
  1080. unsigned rPos = mb.length();
  1081. mb.append(element.queryId());
  1082. unsigned nl=0;
  1083. mb.append(nl); // place holder for size of mb
  1084. unsigned l = mb.length();
  1085. activity.processDone(mb);
  1086. nl = mb.length()-l;
  1087. if (0 == nl)
  1088. mb.rewrite(rPos);
  1089. else
  1090. {
  1091. mb.writeDirect(l-sizeof(nl), sizeof(nl), &nl);
  1092. ++count;
  1093. }
  1094. }
  1095. }
  1096. mb.writeDirect(cPos, sizeof(count), &count);
  1097. }
  1098. void CSlaveGraph::getDone(MemoryBuffer &doneInfoMb)
  1099. {
  1100. if (!started) return;
  1101. GraphPrintLog("Entering getDone");
  1102. if (!queryOwner() || isGlobal())
  1103. {
  1104. try
  1105. {
  1106. serializeDone(doneInfoMb);
  1107. if (!queryOwner())
  1108. {
  1109. if (globals->getPropBool("@watchdogProgressEnabled"))
  1110. jobS->queryProgressHandler()->stopGraph(*this, &doneInfoMb);
  1111. }
  1112. doneInfoMb.append(job.queryMaxDiskUsage());
  1113. queryJobChannel().queryTimeReporter().serialize(doneInfoMb);
  1114. }
  1115. catch (IException *)
  1116. {
  1117. GraphPrintLog("Leaving getDone");
  1118. getDoneSem.signal();
  1119. throw;
  1120. }
  1121. }
  1122. GraphPrintLog("Leaving getDone");
  1123. getDoneSem.signal();
  1124. }
  1125. class CThorSlaveGraphResults : public CThorGraphResults
  1126. {
  1127. CSlaveGraph &graph;
  1128. IArrayOf<IThorResult> globalResults;
  1129. PointerArrayOf<CriticalSection> globalResultCrits;
  1130. void ensureAtLeastGlobals(unsigned id)
  1131. {
  1132. while (globalResults.ordinality() < id)
  1133. {
  1134. globalResults.append(*new CThorUninitializedGraphResults(globalResults.ordinality()));
  1135. globalResultCrits.append(new CriticalSection);
  1136. }
  1137. }
  1138. public:
  1139. CThorSlaveGraphResults(CSlaveGraph &_graph,unsigned numResults) : CThorGraphResults(numResults), graph(_graph)
  1140. {
  1141. }
  1142. ~CThorSlaveGraphResults()
  1143. {
  1144. clear();
  1145. }
  1146. virtual void clear()
  1147. {
  1148. CriticalBlock procedure(cs);
  1149. results.kill();
  1150. globalResults.kill();
  1151. ForEachItemIn(i, globalResultCrits)
  1152. delete globalResultCrits.item(i);
  1153. globalResultCrits.kill();
  1154. }
  1155. IThorResult *getResult(unsigned id, bool distributed)
  1156. {
  1157. Linked<IThorResult> result;
  1158. {
  1159. CriticalBlock procedure(cs);
  1160. ensureAtLeast(id+1);
  1161. result.set(&results.item(id));
  1162. if (!distributed || !result->isDistributed())
  1163. return result.getClear();
  1164. ensureAtLeastGlobals(id+1);
  1165. }
  1166. CriticalBlock b(*globalResultCrits.item(id)); // block other global requests for this result
  1167. IThorResult *globalResult = &globalResults.item(id);
  1168. if (!QUERYINTERFACE(globalResult, CThorUninitializedGraphResults))
  1169. return LINK(globalResult);
  1170. Owned<IThorResult> gr = graph.getGlobalResult(*result->queryActivity(), result->queryRowInterfaces(), ownerId, id);
  1171. globalResults.replace(*gr.getLink(), id);
  1172. return gr.getClear();
  1173. }
  1174. };
  1175. IThorGraphResults *CSlaveGraph::createThorGraphResults(unsigned num)
  1176. {
  1177. return new CThorSlaveGraphResults(*this, num);
  1178. }
  1179. IThorResult *CSlaveGraph::getGlobalResult(CActivityBase &activity, IThorRowInterfaces *rowIf, activity_id ownerId, unsigned id)
  1180. {
  1181. mptag_t replyTag = queryMPServer().createReplyTag();
  1182. CMessageBuffer msg;
  1183. msg.setReplyTag(replyTag);
  1184. msg.append(smt_getresult);
  1185. msg.append(queryJobChannel().queryMyRank()-1);
  1186. msg.append(graphId);
  1187. msg.append(ownerId);
  1188. msg.append(id);
  1189. msg.append(replyTag);
  1190. if (!queryJobChannel().queryJobComm().send(msg, 0, queryJob().querySlaveMpTag(), LONGTIMEOUT))
  1191. throwUnexpected();
  1192. Owned<IThorResult> result = ::createResult(activity, rowIf, false);
  1193. Owned<IRowWriter> resultWriter = result->getWriter();
  1194. MemoryBuffer mb;
  1195. Owned<ISerialStream> stream = createMemoryBufferSerialStream(mb);
  1196. CThorStreamDeserializerSource rowSource(stream);
  1197. loop
  1198. {
  1199. loop
  1200. {
  1201. if (activity.queryAbortSoon())
  1202. return NULL;
  1203. msg.clear();
  1204. if (activity.receiveMsg(msg, 0, replyTag, NULL, 60*1000))
  1205. break;
  1206. ActPrintLog(&activity, "WARNING: tag %d timedout, retrying", (unsigned)replyTag);
  1207. }
  1208. if (!msg.length())
  1209. break; // done
  1210. else
  1211. {
  1212. bool error;
  1213. msg.read(error);
  1214. if (error)
  1215. throw deserializeThorException(msg);
  1216. ThorExpand(msg, mb.clear());
  1217. while (!rowSource.eos())
  1218. {
  1219. RtlDynamicRowBuilder rowBuilder(rowIf->queryRowAllocator());
  1220. size32_t sz = rowIf->queryRowDeserializer()->deserialize(rowBuilder, rowSource);
  1221. resultWriter->putRow(rowBuilder.finalizeRowClear(sz));
  1222. }
  1223. }
  1224. }
  1225. return result.getClear();
  1226. }
  1227. ///////////////////////////
// Code context used on a slave. Workunit result setters throw an
// "Attempt to output result ... from a child query" exception, workunit result
// getters throw via throwUnexpected(), and file name/offset queries are
// forwarded to rank 0 over the job communicator using 'mptag'.
class CThorCodeContextSlave : public CThorCodeContextBase, implements IEngineContext
{
    mptag_t mptag; // tag used for sendRecv requests in getFilePart()/getFileOffset()
    Owned<IDistributedFileTransaction> superfiletransaction; // created lazily by querySuperFileTransaction()
    // Common failure path for every setResult* override below.
    void invalidSetResult(const char * name, unsigned seq)
    {
        throw MakeStringException(0, "Attempt to output result ('%s',%d) from a child query", name ? name : "", (int)seq);
    }
public:
    CThorCodeContextSlave(CJobChannel &jobChannel, ILoadedDllEntry &querySo, IUserDescriptor &userDesc, mptag_t _mptag) : CThorCodeContextBase(jobChannel, querySo, userDesc), mptag(_mptag)
    {
    }
    // Result setters: all rejected via invalidSetResult().
    virtual void setResultBool(const char *name, unsigned sequence, bool value) { invalidSetResult(name, sequence); }
    virtual void setResultData(const char *name, unsigned sequence, int len, const void * data) { invalidSetResult(name, sequence); }
    virtual void setResultDecimal(const char * stepname, unsigned sequence, int len, int precision, bool isSigned, const void *val) { invalidSetResult(stepname, sequence); }
    virtual void setResultInt(const char *name, unsigned sequence, __int64 value, unsigned size) { invalidSetResult(name, sequence); }
    virtual void setResultRaw(const char *name, unsigned sequence, int len, const void * data) { invalidSetResult(name, sequence); }
    virtual void setResultReal(const char * stepname, unsigned sequence, double value) { invalidSetResult(stepname, sequence); }
    virtual void setResultSet(const char *name, unsigned sequence, bool isAll, size32_t len, const void * data, ISetToXmlTransformer * transformer) { invalidSetResult(name, sequence); }
    virtual void setResultString(const char *name, unsigned sequence, int len, const char * str) { invalidSetResult(name, sequence); }
    virtual void setResultUInt(const char *name, unsigned sequence, unsigned __int64 value, unsigned size) { invalidSetResult(name, sequence); }
    virtual void setResultUnicode(const char *name, unsigned sequence, int len, UChar const * str) { invalidSetResult(name, sequence); }
    virtual void setResultVarString(const char * name, unsigned sequence, const char * value) { invalidSetResult(name, sequence); }
    virtual void setResultVarUnicode(const char * name, unsigned sequence, UChar const * value) { invalidSetResult(name, sequence); }
    // Result getters: not supported here, all call throwUnexpected().
    virtual bool getResultBool(const char * name, unsigned sequence) { throwUnexpected(); }
    virtual void getResultData(unsigned & tlen, void * & tgt, const char * name, unsigned sequence) { throwUnexpected(); }
    virtual void getResultDecimal(unsigned tlen, int precision, bool isSigned, void * tgt, const char * stepname, unsigned sequence) { throwUnexpected(); }
    virtual void getResultRaw(unsigned & tlen, void * & tgt, const char * name, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer) { throwUnexpected(); }
    virtual void getResultSet(bool & isAll, size32_t & tlen, void * & tgt, const char * name, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer) { throwUnexpected(); }
    virtual __int64 getResultInt(const char * name, unsigned sequence) { throwUnexpected(); }
    virtual double getResultReal(const char * name, unsigned sequence) { throwUnexpected(); }
    virtual void getResultString(unsigned & tlen, char * & tgt, const char * name, unsigned sequence) { throwUnexpected(); }
    virtual void getResultUnicode(unsigned & tlen, UChar * & tgt, const char * name, unsigned sequence) { throwUnexpected(); }
    virtual char *getResultVarString(const char * name, unsigned sequence) { throwUnexpected(); }
    virtual UChar *getResultVarUnicode(const char * name, unsigned sequence) { throwUnexpected(); }
    virtual unsigned getResultHash(const char * name, unsigned sequence) { throwUnexpected(); }
    virtual void getExternalResultRaw(unsigned & tlen, void * & tgt, const char * wuid, const char * stepname, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer) { throwUnexpected(); }
    virtual unsigned getExternalResultHash(const char * wuid, const char * name, unsigned sequence) { throwUnexpected(); }
    // Logs 'text' and fires it on the job channel as a warning-action exception
    // with the given code, origin and severity.
    virtual void addWuException(const char * text, unsigned code, unsigned severity, const char * source)
    {
        DBGLOG("%s", text);
        Owned<IThorException> e = MakeThorException(code, "%s", text);
        e->setOrigin(source);
        e->setAction(tea_warning);
        e->setSeverity((ErrorSeverity)severity);
        jobChannel.fireException(e);
    }
    virtual unsigned getNodes() { return jobChannel.queryJob().querySlaves(); }
    // Zero-based node number, derived from this channel's rank.
    virtual unsigned getNodeNum() { return jobChannel.queryMyRank()-1; }
    // Sends an smt_getPhysicalName request (logical name, node number, create
    // flag) to rank 0 and returns the detached reply buffer as a char *.
    virtual char *getFilePart(const char *logicalName, bool create=false)
    {
        CMessageBuffer msg;
        msg.append(smt_getPhysicalName);
        msg.append(logicalName);
        msg.append(getNodeNum());
        msg.append(create);
        if (!jobChannel.queryJobComm().sendRecv(msg, 0, mptag, LONGTIMEOUT))
            throwUnexpected();
        return (char *)msg.detach();
    }
    // Sends an smt_getFileOffset request to rank 0 and returns the offset read from the reply.
    // NOTE(review): unlike getFilePart(), 'logicalName' (and the node number) are
    // not appended to the request - confirm against the handler of smt_getFileOffset.
    virtual unsigned __int64 getFileOffset(const char *logicalName)
    {
        CMessageBuffer msg;
        msg.append(smt_getFileOffset);
        if (!jobChannel.queryJobComm().sendRecv(msg, 0, mptag, LONGTIMEOUT))
            throwUnexpected();
        unsigned __int64 offset;
        msg.read(offset);
        return offset;
    }
    // Returns the superfile transaction, creating it on first use.
    virtual IDistributedFileTransaction *querySuperFileTransaction()
    {
        // NB: shouldn't really have fileservice being called on slaves
        if (!superfiletransaction.get())
            superfiletransaction.setown(createDistributedFileTransaction(userDesc, this));
        return superfiletransaction.get();
    }
    virtual void getResultStringF(unsigned tlen, char * tgt, const char * name, unsigned sequence) { throwUnexpected(); }
    virtual void getResultRowset(size32_t & tcount, byte * * & tgt, const char * name, unsigned sequence, IEngineRowAllocator * _rowAllocator, bool isGrouped, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer) { throwUnexpected(); }
    virtual void getResultDictionary(size32_t & tcount, byte * * & tgt, IEngineRowAllocator * _rowAllocator, const char * name, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer, IHThorHashLookupInfo * hasher) { throwUnexpected(); }
    // Logs 'text' and fires it on the job channel as an error-severity "user"
    // exception carrying the assert location; downgraded to a warning action
    // unless 'isAbort' is set.
    virtual void addWuAssertFailure(unsigned code, const char * text, const char * filename, unsigned lineno, unsigned column, bool isAbort)
    {
        DBGLOG("%s", text);
        Owned<IThorException> e = MakeThorException(code, "%s", text);
        e->setAssert(filename, lineno, column);
        e->setOrigin("user");
        e->setSeverity(SeverityError);
        if (!isAbort)
            e->setAction(tea_warning);
        jobChannel.fireException(e);
    }
    virtual unsigned __int64 getDatasetHash(const char * name, unsigned __int64 hash) { throwUnexpected(); } // Should only call from master
    virtual IEngineContext *queryEngineContext() { return this; }
// IEngineContext impl.
    // Returns 0 for num==0; otherwise obtains 'num' unique ids from the given
    // foreign node, or from the "@DALISERVERS" setting when none is supplied.
    virtual DALI_UID getGlobalUniqueIds(unsigned num, SocketEndpoint *_foreignNode)
    {
        if (num==0)
            return 0;
        SocketEndpoint foreignNode;
        if (_foreignNode && !_foreignNode->isNull())
            foreignNode.set(*_foreignNode);
        else
            foreignNode.set(globals->queryProp("@DALISERVERS"));
        return ::getGlobalUniqueIds(num, &foreignNode);
    }
    // Controlled by the job option "slaveDaliClient".
    virtual bool allowDaliAccess() const
    {
        // NB. includes access to foreign Dalis.
        return jobChannel.queryJob().getOptBool("slaveDaliClient");
    }
};
  1339. class CThorCodeContextSlaveSharedMem : public CThorCodeContextSlave
  1340. {
  1341. IThorAllocator *sharedAllocator;
  1342. public:
  1343. CThorCodeContextSlaveSharedMem(CJobChannel &jobChannel, IThorAllocator *_sharedAllocator, ILoadedDllEntry &querySo, IUserDescriptor &userDesc, mptag_t mpTag)
  1344. : CThorCodeContextSlave(jobChannel, querySo, userDesc, mpTag)
  1345. {
  1346. sharedAllocator = _sharedAllocator;
  1347. }
  1348. virtual IEngineRowAllocator *getRowAllocator(IOutputMetaData * meta, unsigned activityId) const
  1349. {
  1350. return sharedAllocator->getRowAllocator(meta, activityId);
  1351. }
  1352. };
  1353. class CSlaveGraphTempHandler : public CGraphTempHandler
  1354. {
  1355. public:
  1356. CSlaveGraphTempHandler(CJobBase &job, bool errorOnMissing) : CGraphTempHandler(job, errorOnMissing)
  1357. {
  1358. }
  1359. virtual bool removeTemp(const char *name)
  1360. {
  1361. OwnedIFile ifile = createIFile(name);
  1362. return ifile->remove();
  1363. }
  1364. };
// NOTE(review): SLAVEGRAPHPOOLLIMIT is not referenced in the visible part of this file - confirm it is still used.
#define SLAVEGRAPHPOOLLIMIT 10
  1366. CJobSlave::CJobSlave(ISlaveWatchdog *_watchdog, IPropertyTree *_workUnitInfo, const char *graphName, ILoadedDllEntry *_querySo, mptag_t _mpJobTag, mptag_t _slavemptag) : CJobBase(_querySo, graphName), watchdog(_watchdog)
  1367. {
  1368. workUnitInfo.set(_workUnitInfo);
  1369. workUnitInfo->getProp("token", token);
  1370. workUnitInfo->getProp("user", user);
  1371. workUnitInfo->getProp("wuid", wuid);
  1372. workUnitInfo->getProp("scope", scope);
  1373. init();
  1374. oldNodeCacheMem = 0;
  1375. mpJobTag = _mpJobTag;
  1376. slavemptag = _slavemptag;
  1377. IPropertyTree *plugins = workUnitInfo->queryPropTree("plugins");
  1378. if (plugins)
  1379. {
  1380. StringBuffer pluginsDir, installDir, pluginsList;
  1381. globals->getProp("@INSTALL_DIR", installDir); // could use for socachedir also?
  1382. if (installDir.length())
  1383. addPathSepChar(installDir);
  1384. globals->getProp("@pluginsPath", pluginsDir);
  1385. if (pluginsDir.length())
  1386. {
  1387. if (!isAbsolutePath(pluginsDir.str())) // if !absolute, then make relative to installDir if is one (e.g. master mount)
  1388. {
  1389. if (installDir.length())
  1390. pluginsDir.insert(0, installDir.str());
  1391. }
  1392. addPathSepChar(pluginsDir);
  1393. }
  1394. Owned<IPropertyTreeIterator> pluginIter = plugins->getElements("plugin");
  1395. ForEach(*pluginIter)
  1396. {
  1397. StringBuffer pluginPath;
  1398. IPropertyTree &plugin = pluginIter->query();
  1399. pluginPath.append(pluginsDir).append(plugin.queryProp("@name"));
  1400. if (pluginsList.length())
  1401. pluginsList.append(ENVSEPCHAR);
  1402. pluginsList.append(pluginPath);
  1403. }
  1404. pluginMap->loadFromList(pluginsList.str());
  1405. }
  1406. tmpHandler.setown(createTempHandler(true));
  1407. sharedAllocator.setown(::createThorAllocator(globalMemoryMB, sharedMemoryMB, numChannels, memorySpillAtPercentage, *logctx, crcChecking, usePackedAllocator));
  1408. }
  1409. void CJobSlave::addChannel(IMPServer *mpServer)
  1410. {
  1411. unsigned nextChannelNum = jobChannels.ordinality();
  1412. CJobSlaveChannel *channel = new CJobSlaveChannel(*this, mpServer, nextChannelNum);
  1413. jobChannels.append(*channel);
  1414. unsigned slaveNum = channel->queryMyRank();
  1415. jobChannelSlaveNumbers[nextChannelNum] = slaveNum;
  1416. jobSlaveChannelNum[slaveNum-1] = nextChannelNum;
  1417. }
  1418. void CJobSlave::startJob()
  1419. {
  1420. CJobBase::startJob();
  1421. unsigned minFreeSpace = (unsigned)getWorkUnitValueInt("MINIMUM_DISK_SPACE", 0);
  1422. if (minFreeSpace)
  1423. {
  1424. unsigned __int64 freeSpace = getFreeSpace(queryBaseDirectory(grp_unknown, 0));
  1425. if (freeSpace < ((unsigned __int64)minFreeSpace)*0x100000)
  1426. {
  1427. SocketEndpoint ep;
  1428. ep.setLocalHost(0);
  1429. StringBuffer s;
  1430. throw MakeThorException(TE_NotEnoughFreeSpace, "Node %s has %u MB(s) of available disk space, specified minimum for this job: %u MB(s)", ep.getUrlStr(s).str(), (unsigned) freeSpace / 0x100000, minFreeSpace);
  1431. }
  1432. }
  1433. }
  1434. void CJobSlave::reportGraphEnd(graph_id gid)
  1435. {
  1436. if (nodesLoaded) // wouldn't mean much if parallel jobs running
  1437. PROGLOG("Graph[%" GIDPF "u] - JHTree node stats:\ncacheAdds=%d\ncacheHits=%d\nnodesLoaded=%d\nblobCacheHits=%d\nblobCacheAdds=%d\nleafCacheHits=%d\nleafCacheAdds=%d\nnodeCacheHits=%d\nnodeCacheAdds=%d\n", gid, cacheAdds.load(), cacheHits.load(), nodesLoaded.load(), blobCacheHits.load(), blobCacheAdds.load(), leafCacheHits.load(), leafCacheAdds.load(), nodeCacheHits.load(), nodeCacheAdds.load());
  1438. JSocketStatistics stats;
  1439. getSocketStatistics(stats);
  1440. StringBuffer s;
  1441. getSocketStatisticsString(stats,s);
  1442. PROGLOG("Graph[%" GIDPF "u] - Socket statistics : %s\n", gid, s.str());
  1443. resetSocketStatistics();
  1444. }
  1445. __int64 CJobSlave::getWorkUnitValueInt(const char *prop, __int64 defVal) const
  1446. {
  1447. StringBuffer propName(prop);
  1448. return workUnitInfo->queryPropTree("Debug")->getPropInt64(propName.toLowerCase().str(), defVal);
  1449. }
  1450. StringBuffer &CJobSlave::getWorkUnitValue(const char *prop, StringBuffer &str) const
  1451. {
  1452. StringBuffer propName(prop);
  1453. workUnitInfo->queryPropTree("Debug")->getProp(propName.toLowerCase().str(), str);
  1454. return str;
  1455. }
  1456. bool CJobSlave::getWorkUnitValueBool(const char *prop, bool defVal) const
  1457. {
  1458. StringBuffer propName(prop);
  1459. return workUnitInfo->queryPropTree("Debug")->getPropBool(propName.toLowerCase().str(), defVal);
  1460. }
  1461. void CJobSlave::debugRequest(MemoryBuffer &msg, const char *request) const
  1462. {
  1463. if (watchdog) watchdog->debugRequest(msg, request);
  1464. }
  1465. IGraphTempHandler *CJobSlave::createTempHandler(bool errorOnMissing)
  1466. {
  1467. return new CSlaveGraphTempHandler(*this, errorOnMissing);
  1468. }
  1469. mptag_t CJobSlave::deserializeMPTag(MemoryBuffer &mb)
  1470. {
  1471. mptag_t tag;
  1472. deserializeMPtag(mb, tag);
  1473. if (TAG_NULL != tag)
  1474. {
  1475. PROGLOG("CJobSlave::deserializeMPTag: tag = %d", (int)tag);
  1476. for (unsigned c=0; c<queryJobChannels(); c++)
  1477. queryJobChannel(c).queryJobComm().flush(tag);
  1478. }
  1479. return tag;
  1480. }
  1481. IThorAllocator *CJobSlave::getThorAllocator(unsigned channel)
  1482. {
  1483. if (1 == numChannels)
  1484. return CJobBase::getThorAllocator(channel);
  1485. else
  1486. return sharedAllocator->getSlaveAllocator(channel);
  1487. }
  1488. // IGraphCallback
  1489. CJobSlaveChannel::CJobSlaveChannel(CJobBase &_job, IMPServer *mpServer, unsigned channel) : CJobChannel(_job, mpServer, channel)
  1490. {
  1491. codeCtx.setown(new CThorCodeContextSlave(*this, job.queryDllEntry(), *job.queryUserDescriptor(), job.querySlaveMpTag()));
  1492. sharedMemCodeCtx.setown(new CThorCodeContextSlaveSharedMem(*this, job.querySharedAllocator(), job.queryDllEntry(), *job.queryUserDescriptor(), job.querySlaveMpTag()));
  1493. }
  1494. IBarrier *CJobSlaveChannel::createBarrier(mptag_t tag)
  1495. {
  1496. return new CBarrierSlave(*this, *jobComm, tag);
  1497. }
  1498. void CJobSlaveChannel::runSubgraph(CGraphBase &graph, size32_t parentExtractSz, const byte *parentExtract)
  1499. {
  1500. if (!graph.queryOwner())
  1501. CJobChannel::runSubgraph(graph, parentExtractSz, parentExtract);
  1502. else
  1503. graph.doExecuteChild(parentExtractSz, parentExtract);
  1504. CriticalBlock b(graphRunCrit);
  1505. if (!graph.queryOwner())
  1506. removeSubGraph(graph);
  1507. }
  1508. ///////////////
// Locates a usable physical copy of the file part described by 'partDesc',
// preferring the primary (copy 0) and restoring it from a backup copy when the
// primary is missing and local.
//
// On success returns true with:
//   ifile    - the file to use
//   location - index of the copy used (0 = primary)
//   path     - the path of that copy appended
// Returns false if neither the primary nor any other copy exists.
// An exception raised while probing the primary is logged and ignored; one
// raised while handling a backup copy is rethrown wrapped as an activity exception.
bool ensurePrimary(CActivityBase *activity, IPartDescriptor &partDesc, OwnedIFile & ifile, unsigned &location, StringBuffer &path)
{
    StringBuffer locationName, primaryName;
    RemoteFilename primaryRfn;
    partDesc.getFilename(0, primaryRfn);
    primaryRfn.getPath(primaryName);
    OwnedIFile primaryIFile = createIFile(primaryName.str());
    try
    {
        if (primaryIFile->exists())
        {
            location = 0;
            ifile.set(primaryIFile);
            path.append(primaryName);
            return true;
        }
    }
    catch (IException *e)
    {
        // Failure to probe the primary is not fatal, fall through to the other copies.
        ActPrintLog(&activity->queryContainer(), e, "In ensurePrimary");
        e->Release();
    }
    unsigned l;
    for (l=1; l<partDesc.numCopies(); l++)
    {
        RemoteFilename altRfn;
        partDesc.getFilename(l, altRfn);
        locationName.clear();
        altRfn.getPath(locationName);
        assertex(locationName.length());
        OwnedIFile backupIFile = createIFile(locationName.str());
        try
        {
            if (backupIFile->exists())
            {
                if (primaryRfn.isLocal())
                {
                    // Primary location is local: copy the backup to "<primary>.tmp"
                    // and rename it into place.
                    ensureDirectoryForFile(primaryIFile->queryFilename());
                    Owned<IException> e = MakeActivityWarning(activity, 0, "Primary file missing: %s, copying backup %s to primary location", primaryIFile->queryFilename(), locationName.str());
                    activity->fireException(e);
                    StringBuffer tmpName(primaryIFile->queryFilename());
                    tmpName.append(".tmp");
                    OwnedIFile tmpFile = createIFile(tmpName.str());
                    CFIPScope fipScope(tmpName.str());
                    copyFile(tmpFile, backupIFile);
                    try
                    {
                        tmpFile->rename(pathTail(primaryIFile->queryFilename()));
                        location = 0;
                        ifile.set(primaryIFile);
                        path.append(primaryName);
                    }
                    catch (IException *e)
                    {
                        // Rename failed: best-effort removal of the temporary, warn, and use the backup copy directly.
                        try { tmpFile->remove(); } catch (IException *e) { ActPrintLog(&activity->queryContainer(), "Failed to delete temporary file"); e->Release(); }
                        Owned<IException> e2 = MakeActivityWarning(activity, e, "Failed to restore primary, failed to rename %s to %s", tmpName.str(), primaryIFile->queryFilename());
                        e->Release();
                        activity->fireException(e2);
                        ifile.set(backupIFile);
                        location = l;
                        path.append(locationName);
                    }
                }
                else // JCSMORE - should use daliservix perhaps to ensure primary
                {
                    // Primary location is not local: warn and use the backup copy as is.
                    Owned<IException> e = MakeActivityWarning(activity, 0, "Primary file missing: %s, using remote copy: %s", primaryIFile->queryFilename(), locationName.str());
                    activity->fireException(e);
                    ifile.set(backupIFile);
                    location = l;
                    path.append(locationName);
                }
                return true;
            }
        }
        catch (IException *e)
        {
            // Wrap with activity context before propagating.
            Owned<IThorException> e2 = MakeActivityException(activity, e);
            e->Release();
            throw e2.getClear();
        }
    }
    return false;
}
  1592. class CEnsurePrimaryPartFile : public CInterface, implements IReplicatedFile
  1593. {
  1594. CActivityBase &activity;
  1595. Linked<IPartDescriptor> partDesc;
  1596. StringAttr logicalFilename;
  1597. Owned<IReplicatedFile> part;
  1598. public:
  1599. IMPLEMENT_IINTERFACE;
  1600. CEnsurePrimaryPartFile(CActivityBase &_activity, const char *_logicalFilename, IPartDescriptor *_partDesc)
  1601. : activity(_activity), logicalFilename(_logicalFilename), partDesc(_partDesc)
  1602. {
  1603. }
  1604. virtual IFile *open()
  1605. {
  1606. unsigned location;
  1607. OwnedIFile iFile;
  1608. StringBuffer filePath;
  1609. if (globals->getPropBool("@autoCopyBackup", true)?ensurePrimary(&activity, *partDesc, iFile, location, filePath):getBestFilePart(&activity, *partDesc, iFile, location, filePath, &activity))
  1610. return iFile.getClear();
  1611. else
  1612. {
  1613. StringBuffer locations;
  1614. IException *e = MakeActivityException(&activity, TE_FileNotFound, "No physical file part for logical file %s, found at given locations: %s (Error = %d)", logicalFilename.get(), getFilePartLocations(*partDesc, locations).str(), GetLastError());
  1615. EXCLOG(e, NULL);
  1616. throw e;
  1617. }
  1618. }
  1619. RemoteFilenameArray &queryCopies()
  1620. {
  1621. if(!part.get())
  1622. part.setown(partDesc->getReplicatedFile());
  1623. return part->queryCopies();
  1624. }
  1625. };
  1626. IReplicatedFile *createEnsurePrimaryPartFile(CActivityBase &activity, const char *logicalFilename, IPartDescriptor *partDesc)
  1627. {
  1628. return new CEnsurePrimaryPartFile(activity, logicalFilename, partDesc);
  1629. }
  1630. ///////////////
  1631. class CFileCache;
  1632. class CLazyFileIO : public CInterface, implements IFileIO, implements IDelayedFile
  1633. {
  1634. CFileCache &cache;
  1635. Owned<IReplicatedFile> repFile;
  1636. Linked<IExpander> expander;
  1637. bool compressed;
  1638. StringAttr filename;
  1639. CRuntimeStatisticCollection fileStats;
  1640. CriticalSection crit;
  1641. Owned<IFileIO> iFileIO; // real IFileIO
  1642. void checkOpen(); // references CFileCache method
  1643. public:
  1644. IMPLEMENT_IINTERFACE;
  1645. CLazyFileIO(CFileCache &_cache, const char *_filename, IReplicatedFile *_repFile, bool _compressed, IExpander *_expander)
  1646. : cache(_cache), filename(_filename), repFile(_repFile), compressed(_compressed), expander(_expander), fileStats(diskLocalStatistics)
  1647. {
  1648. }
  1649. ~CLazyFileIO()
  1650. {
  1651. iFileIO.clear();
  1652. }
  1653. const char *queryFindString() const { return filename.get(); } // for string HT
  1654. // IFileIO impl.
  1655. virtual size32_t read(offset_t pos, size32_t len, void * data)
  1656. {
  1657. CriticalBlock b(crit);
  1658. checkOpen();
  1659. return iFileIO->read(pos, len, data);
  1660. }
  1661. virtual offset_t size()
  1662. {
  1663. CriticalBlock b(crit);
  1664. checkOpen();
  1665. return iFileIO->size();
  1666. }
  1667. virtual size32_t write(offset_t pos, size32_t len, const void * data)
  1668. {
  1669. CriticalBlock b(crit);
  1670. checkOpen();
  1671. return iFileIO->write(pos, len, data);
  1672. }
  1673. virtual void flush()
  1674. {
  1675. CriticalBlock b(crit);
  1676. if (iFileIO)
  1677. iFileIO->flush();
  1678. }
  1679. virtual void close()
  1680. {
  1681. CriticalBlock b(crit);
  1682. if (iFileIO)
  1683. {
  1684. mergeStats(fileStats, iFileIO);
  1685. iFileIO->close();
  1686. }
  1687. iFileIO.clear();
  1688. }
  1689. virtual offset_t appendFile(IFile *file,offset_t pos=0,offset_t len=(offset_t)-1)
  1690. {
  1691. CriticalBlock b(crit);
  1692. checkOpen();
  1693. return iFileIO->appendFile(file, pos, len);
  1694. }
  1695. virtual void setSize(offset_t size)
  1696. {
  1697. CriticalBlock b(crit);
  1698. checkOpen();
  1699. iFileIO->setSize(size);
  1700. }
  1701. virtual unsigned __int64 getStatistic(StatisticKind kind)
  1702. {
  1703. switch (kind)
  1704. {
  1705. case StTimeDiskReadIO:
  1706. return cycle_to_nanosec(getStatistic(StCycleDiskReadIOCycles));
  1707. case StTimeDiskWriteIO:
  1708. return cycle_to_nanosec(getStatistic(StCycleDiskWriteIOCycles));
  1709. }
  1710. CriticalBlock b(crit);
  1711. unsigned __int64 openValue = iFileIO ? iFileIO->getStatistic(kind) : 0;
  1712. return openValue + fileStats.getStatisticValue(kind);
  1713. }
  1714. // IDelayedFile impl.
  1715. virtual IMemoryMappedFile *queryMappedFile() { return NULL; }
  1716. virtual IFileIO *queryFileIO() { return this; }
  1717. };
  1718. class CFileCache : public CInterface, implements IThorFileCache
  1719. {
  1720. OwningStringSuperHashTableOf<CLazyFileIO> files;
  1721. CICopyArrayOf<CLazyFileIO> openFiles;
  1722. unsigned limit, purgeN;
  1723. CriticalSection crit;
  1724. class CDelayedFileWapper : public CInterface, implements IDelayedFile
  1725. {
  1726. CFileCache &cache;
  1727. Linked<CLazyFileIO> lFile;
  1728. public:
  1729. IMPLEMENT_IINTERFACE;
  1730. CDelayedFileWapper(CFileCache &_cache, CLazyFileIO &_lFile) : cache(_cache), lFile(&_lFile) { }
  1731. ~CDelayedFileWapper()
  1732. {
  1733. cache.remove(*lFile);
  1734. }
  1735. // IDelayedFile impl.
  1736. virtual IMemoryMappedFile *queryMappedFile() { return lFile->queryMappedFile(); }
  1737. virtual IFileIO *queryFileIO() { return lFile->queryFileIO(); }
  1738. };
  1739. void purgeOldest()
  1740. {
  1741. // will be ordered oldest first.
  1742. unsigned count = 0;
  1743. CICopyArrayOf<CLazyFileIO> toClose;
  1744. ForEachItemIn(o, openFiles)
  1745. {
  1746. CLazyFileIO &lFile = openFiles.item(o);
  1747. toClose.append(lFile);
  1748. if (++count>=purgeN) // crude for now, just remove oldest N
  1749. break;
  1750. }
  1751. ForEachItemIn(r, toClose)
  1752. {
  1753. CLazyFileIO &lFile = toClose.item(r);
  1754. lFile.close();
  1755. openFiles.zap(lFile);
  1756. }
  1757. }
  1758. bool _remove(CLazyFileIO &lFile)
  1759. {
  1760. bool ret = files.removeExact(&lFile);
  1761. if (!ret) return false;
  1762. openFiles.zap(lFile);
  1763. return true;
  1764. }
  1765. public:
  1766. IMPLEMENT_IINTERFACE;
  1767. CFileCache(unsigned _limit) : limit(_limit)
  1768. {
  1769. assertex(limit);
  1770. purgeN = globals->getPropInt("@fileCachePurgeN", 10);
  1771. if (purgeN > limit) purgeN=limit; // why would it be, but JIC.
  1772. PROGLOG("FileCache: limit = %d, purgeN = %d", limit, purgeN);
  1773. }
  1774. void opening(CLazyFileIO &lFile)
  1775. {
  1776. CriticalBlock b(crit);
  1777. if (openFiles.ordinality() >= limit)
  1778. {
  1779. purgeOldest(); // will close purgeN
  1780. assertex(openFiles.ordinality() < limit);
  1781. }
  1782. openFiles.zap(lFile);
  1783. openFiles.append(lFile);
  1784. }
  1785. // IThorFileCache impl.
  1786. virtual bool remove(IDelayedFile &dFile)
  1787. {
  1788. CLazyFileIO *lFile = QUERYINTERFACE(&dFile, CLazyFileIO);
  1789. assertex(lFile);
  1790. CriticalBlock b(crit);
  1791. return _remove(*lFile);
  1792. }
  1793. virtual IDelayedFile *lookup(CActivityBase &activity, IPartDescriptor &partDesc, IExpander *expander)
  1794. {
  1795. StringBuffer filename;
  1796. RemoteFilename rfn;
  1797. partDesc.getFilename(0, rfn);
  1798. rfn.getPath(filename);
  1799. CriticalBlock b(crit);
  1800. Linked<CLazyFileIO> file = files.find(filename.str());
  1801. if (!file)
  1802. {
  1803. Owned<IReplicatedFile> repFile = createEnsurePrimaryPartFile(activity, filename.str(), &partDesc);
  1804. bool compressed = partDesc.queryOwner().isCompressed();
  1805. file.setown(new CLazyFileIO(*this, filename.str(), repFile.getClear(), compressed, expander));
  1806. }
  1807. files.replace(*LINK(file));
  1808. return new CDelayedFileWapper(*this, *file); // to avoid circular dependency and allow destruction to remove from cache
  1809. }
  1810. };
  1811. ////
  1812. void CLazyFileIO::checkOpen()
  1813. {
  1814. CriticalBlock b(crit);
  1815. if (iFileIO)
  1816. return;
  1817. cache.opening(*this);
  1818. Owned<IFile> iFile = repFile->open();
  1819. if (NULL != expander.get())
  1820. iFileIO.setown(createCompressedFileReader(iFile, expander));
  1821. else if (compressed)
  1822. iFileIO.setown(createCompressedFileReader(iFile));
  1823. else
  1824. iFileIO.setown(iFile->open(IFOread));
  1825. if (!iFileIO.get())
  1826. throw MakeThorException(0, "CLazyFileIO: failed to open: %s", filename.get());
  1827. }
  1828. IThorFileCache *createFileCache(unsigned limit)
  1829. {
  1830. return new CFileCache(limit);
  1831. }
  1832. /*
  1833. * strand stuff
  1834. */
  1835. IEngineRowStream *connectSingleStream(CActivityBase &activity, IThorDataLink *input, unsigned idx, Owned<IStrandJunction> &junction, bool consumerOrdered)
  1836. {
  1837. if (input)
  1838. {
  1839. PointerArrayOf<IEngineRowStream> instreams;
  1840. junction.setown(input->getOutputStreams(activity, idx, instreams, nullptr, consumerOrdered, nullptr));
  1841. if (instreams.length() != 1)
  1842. {
  1843. assertex(instreams.length());
  1844. if (!junction)
  1845. junction.setown(createStrandJunction(*activity.queryRowManager(), instreams.length(), 1, activity.getOptInt("strandBlockSize"), false));
  1846. ForEachItemIn(stream, instreams)
  1847. {
  1848. junction->setInput(stream, instreams.item(stream));
  1849. }
  1850. return junction->queryOutput(0);
  1851. }
  1852. else
  1853. return instreams.item(0);
  1854. }
  1855. else
  1856. return nullptr;
  1857. }
  1858. IEngineRowStream *connectSingleStream(CActivityBase &activity, IThorDataLink *input, unsigned idx, bool consumerOrdered)
  1859. {
  1860. Owned<IStrandJunction> junction;
  1861. IEngineRowStream * result = connectSingleStream(activity, input, idx, junction, consumerOrdered);
  1862. assertex(!junction);
  1863. return result;
  1864. }