thgraphslave.cpp 51 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jlib.hpp"
  14. #include "jlzw.hpp"
  15. #include "jhtree.hpp"
  16. #include "daclient.hpp"
  17. #include "commonext.hpp"
  18. #include "thorplugin.hpp"
  19. #include "thcodectx.hpp"
  20. #include "thmem.hpp"
  21. #include "thorport.hpp"
  22. #include "slwatchdog.hpp"
  23. #include "thgraphslave.hpp"
  24. #include "thcompressutil.hpp"
  25. #include "enginecontext.hpp"
  26. //////////////////////////////////
  27. class CBarrierSlave : public CInterface, implements IBarrier
  28. {
  29. mptag_t tag;
  30. Linked<ICommunicator> comm;
  31. bool receiving;
  32. public:
  33. IMPLEMENT_IINTERFACE;
  34. CBarrierSlave(ICommunicator &_comm, mptag_t _tag) : comm(&_comm), tag(_tag)
  35. {
  36. receiving = false;
  37. }
  38. virtual bool wait(bool exception, unsigned timeout)
  39. {
  40. CTimeMon tm(timeout);
  41. unsigned remaining = timeout;
  42. CMessageBuffer msg;
  43. msg.append(false);
  44. if (INFINITE != timeout && tm.timedout(&remaining))
  45. {
  46. if (exception)
  47. throw createBarrierAbortException();
  48. else
  49. return false;
  50. }
  51. if (!comm->send(msg, 0, tag, INFINITE != timeout ? remaining : LONGTIMEOUT))
  52. throw MakeStringException(0, "CBarrierSlave::wait - Timeout sending to master");
  53. msg.clear();
  54. if (INFINITE != timeout && tm.timedout(&remaining))
  55. {
  56. if (exception)
  57. throw createBarrierAbortException();
  58. else
  59. return false;
  60. }
  61. {
  62. BooleanOnOff onOff(receiving);
  63. if (!comm->recv(msg, 0, tag, NULL, remaining))
  64. return false;
  65. }
  66. bool aborted;
  67. msg.read(aborted);
  68. if (aborted)
  69. {
  70. if (exception)
  71. throw createBarrierAbortException();
  72. else
  73. return false;
  74. }
  75. return true;
  76. }
  77. virtual void cancel()
  78. {
  79. if (receiving)
  80. comm->cancel(comm->queryGroup().rank(), tag);
  81. CMessageBuffer msg;
  82. msg.append(true);
  83. if (!comm->send(msg, 0, tag, LONGTIMEOUT))
  84. throw MakeStringException(0, "CBarrierSlave::cancel - Timeout sending to master");
  85. }
  86. virtual const mptag_t queryTag() const { return tag; }
  87. };
  88. //
  89. CSlaveActivity::CSlaveActivity(CGraphElementBase *_container) : CActivityBase(_container)
  90. {
  91. data = NULL;
  92. totalCycles = 0;
  93. }
  94. CSlaveActivity::~CSlaveActivity()
  95. {
  96. inputs.kill();
  97. outputs.kill();
  98. if (data) delete [] data;
  99. ActPrintLog("DESTROYED");
  100. }
  101. void CSlaveActivity::setInput(unsigned index, CActivityBase *inputActivity, unsigned inputOutIdx)
  102. {
  103. CActivityBase::setInput(index, inputActivity, inputOutIdx);
  104. Linked<IThorDataLink> outLink;
  105. if (!inputActivity)
  106. {
  107. Owned<CActivityBase> nullAct = container.factory(TAKnull);
  108. outLink.set(((CSlaveActivity *)(nullAct.get()))->queryOutput(0)); // NB inputOutIdx irrelevant, null has single 'fake' output
  109. nullAct->releaseIOs(); // normally done as graph winds up, clear now to avoid circular dependencies with outputs
  110. }
  111. else
  112. outLink.set(((CSlaveActivity *)inputActivity)->queryOutput(inputOutIdx));
  113. assertex(outLink);
  114. while (inputs.ordinality()<=index) inputs.append(NULL);
  115. inputs.replace(outLink.getClear(), index);
  116. }
  117. IThorDataLink *CSlaveActivity::queryOutput(unsigned index)
  118. {
  119. if (index>=outputs.ordinality()) return NULL;
  120. return outputs.item(index);
  121. }
  122. IThorDataLink *CSlaveActivity::queryInput(unsigned index)
  123. {
  124. if (index>=inputs.ordinality()) return NULL;
  125. return inputs.item(index);
  126. }
  127. void CSlaveActivity::startInput(IThorDataLink *itdl, const char *extra)
  128. {
  129. StringBuffer s("Starting input");
  130. if (extra)
  131. s.append(" ").append(extra);
  132. ActPrintLog("%s", s.str());
  133. #ifdef TRACE_STARTSTOP_EXCEPTIONS
  134. try
  135. {
  136. itdl->start();
  137. }
  138. catch(IException *e)
  139. {
  140. ActPrintLog(e, "%s", s.str());
  141. throw;
  142. }
  143. #else
  144. itdl->start();
  145. #endif
  146. }
  147. void CSlaveActivity::stopInput(IRowStream *itdl, const char *extra)
  148. {
  149. StringBuffer s("Stopping input for");
  150. if (extra)
  151. s.append(" ").append(extra);
  152. ActPrintLog("%s", s.str());
  153. #ifdef TRACE_STARTSTOP_EXCEPTIONS
  154. try
  155. {
  156. itdl->stop();
  157. }
  158. catch(IException * e)
  159. {
  160. ActPrintLog(e, "%s", s.str());
  161. throw;
  162. }
  163. #else
  164. itdl->stop();
  165. #endif
  166. }
  167. void CSlaveActivity::abort()
  168. {
  169. CActivityBase::abort();
  170. CriticalBlock b(crit);
  171. ForEachItemIn(o, outputs)
  172. {
  173. StringBuffer msg("--------> ");
  174. msg.append("GraphId = ").append(container.queryOwner().queryGraphId());
  175. msg.append(" ActivityId = ").append(container.queryId());
  176. msg.append(" OutputId = ").append(o);
  177. MemoryBuffer mb;
  178. outputs.item(o)->dataLinkSerialize(mb); // JCSMORE should add direct method
  179. rowcount_t count;
  180. mb.read(count);
  181. msg.append(": Count = ").append(count);
  182. }
  183. }
  184. void CSlaveActivity::releaseIOs()
  185. {
  186. // inputs.kill(); // don't want inputs to die before this dies (release in deconstructor) // JCSMORE not sure why care particularly.
  187. outputs.kill(); // outputs tend to be self-references, this clears them explicitly, otherwise end up leaking with circular references.
  188. }
  189. void CSlaveActivity::clearConnections()
  190. {
  191. inputs.kill();
  192. }
  193. MemoryBuffer &CSlaveActivity::queryInitializationData(unsigned slave) const
  194. {
  195. CriticalBlock b(crit);
  196. if (!data)
  197. data = new MemoryBuffer[container.queryJob().querySlaves()];
  198. CMessageBuffer msg;
  199. graph_id gid = queryContainer().queryOwner().queryGraphId();
  200. msg.append(smt_dataReq);
  201. msg.append(slave);
  202. msg.append(gid);
  203. msg.append(container.queryId());
  204. if (!container.queryJob().queryJobComm().sendRecv(msg, 0, queryContainer().queryJob().querySlaveMpTag(), LONGTIMEOUT))
  205. throwUnexpected();
  206. data[slave].swapWith(msg);
  207. return data[slave];
  208. }
  209. MemoryBuffer &CSlaveActivity::getInitializationData(unsigned slave, MemoryBuffer &mb) const
  210. {
  211. return mb.append(queryInitializationData(slave));
  212. }
  213. unsigned __int64 CSlaveActivity::queryLocalCycles() const
  214. {
  215. unsigned __int64 inputCycles = 0;
  216. if (1 == inputs.ordinality())
  217. {
  218. IThorDataLink *input = inputs.item(0);
  219. inputCycles += input->queryTotalCycles();
  220. }
  221. else
  222. {
  223. switch (container.getKind())
  224. {
  225. case TAKchildif:
  226. case TAKchildcase:
  227. if (inputs.ordinality() && (((unsigned)-1) != container.whichBranch))
  228. {
  229. IThorDataLink *input = inputs.item(container.whichBranch);
  230. if (input)
  231. inputCycles += input->queryTotalCycles();
  232. }
  233. break;
  234. default:
  235. ForEachItemIn(i, inputs)
  236. {
  237. IThorDataLink *input = inputs.item(i);
  238. inputCycles += input->queryTotalCycles();
  239. }
  240. break;
  241. }
  242. }
  243. unsigned __int64 _totalCycles = queryTotalCycles();
  244. if (_totalCycles < inputCycles) // not sure how/if possible, but guard against
  245. return 0;
  246. return _totalCycles-inputCycles;
  247. }
  248. unsigned __int64 CSlaveActivity::queryTotalCycles() const
  249. {
  250. return totalCycles;
  251. }
  252. void CSlaveActivity::serializeStats(MemoryBuffer &mb)
  253. {
  254. CriticalBlock b(crit);
  255. mb.append((unsigned __int64)cycle_to_nanosec(queryLocalCycles()));
  256. ForEachItemIn(i, outputs)
  257. outputs.item(i)->dataLinkSerialize(mb);
  258. }
  259. ///
  260. // CSlaveGraph
  261. CSlaveGraph::CSlaveGraph(CJobSlave &job) : CGraphBase(job), jobS(job)
  262. {
  263. }
  264. void CSlaveGraph::init(MemoryBuffer &mb)
  265. {
  266. mb.read(reinit);
  267. mpTag = job.deserializeMPTag(mb);
  268. startBarrierTag = job.deserializeMPTag(mb);
  269. waitBarrierTag = job.deserializeMPTag(mb);
  270. doneBarrierTag = job.deserializeMPTag(mb);
  271. startBarrier = job.createBarrier(startBarrierTag);
  272. waitBarrier = job.createBarrier(waitBarrierTag);
  273. if (doneBarrierTag != TAG_NULL)
  274. doneBarrier = job.createBarrier(doneBarrierTag);
  275. initialized = false;
  276. progressActive = progressToCollect = false;
  277. unsigned subCount;
  278. mb.read(subCount);
  279. while (subCount--)
  280. {
  281. graph_id gid;
  282. mb.read(gid);
  283. Owned<CSlaveGraph> subGraph = (CSlaveGraph *)job.getGraph(gid);
  284. subGraph->init(mb);
  285. }
  286. }
  287. void CSlaveGraph::initWithActData(MemoryBuffer &in, MemoryBuffer &out)
  288. {
  289. CriticalBlock b(progressCrit);
  290. initialized = true;
  291. if (0 == in.length())
  292. return;
  293. activity_id id;
  294. loop
  295. {
  296. in.read(id);
  297. if (0 == id) break;
  298. CSlaveGraphElement *element = (CSlaveGraphElement *)queryElement(id);
  299. assertex(element);
  300. out.append(id);
  301. out.append((size32_t)0);
  302. unsigned l = out.length();
  303. size32_t sz;
  304. in.read(sz);
  305. unsigned aread = in.getPos();
  306. CSlaveActivity *activity = (CSlaveActivity *)element->queryActivity();
  307. if (activity)
  308. {
  309. element->sentActInitData->set(0);
  310. activity->init(in, out);
  311. }
  312. aread = in.getPos()-aread;
  313. if (aread<sz)
  314. {
  315. Owned<IException> e = MakeActivityException(element, TE_SeriailzationError, "Serialization error - activity did not read all serialized data (%d byte(s) remaining)", sz-aread);
  316. in.readDirect(sz-aread);
  317. throw e.getClear();
  318. }
  319. else if (aread>sz)
  320. throw MakeActivityException(element, TE_SeriailzationError, "Serialization error - activity read beyond serialized data (%d byte(s))", aread-sz);
  321. size32_t dl = out.length() - l;
  322. if (dl)
  323. out.writeDirect(l-sizeof(size32_t), sizeof(size32_t), &dl);
  324. else
  325. out.setLength(l-(sizeof(activity_id)+sizeof(size32_t)));
  326. }
  327. out.append((activity_id)0);
  328. }
  329. void CSlaveGraph::recvStartCtx()
  330. {
  331. if (!sentStartCtx)
  332. {
  333. sentStartCtx = true;
  334. CMessageBuffer msg;
  335. if (!graphCancelHandler.recv(queryJob().queryJobComm(), msg, 0, mpTag, NULL, LONGTIMEOUT))
  336. throw MakeStringException(0, "Error receiving startCtx data for graph: %"GIDPF"d", graphId);
  337. deserializeStartContexts(msg);
  338. }
  339. }
  340. bool CSlaveGraph::recvActivityInitData(size32_t parentExtractSz, const byte *parentExtract)
  341. {
  342. bool ret = true;
  343. unsigned needActInit = 0;
  344. Owned<IThorActivityIterator> iter = getTraverseIterator();
  345. ForEach(*iter)
  346. {
  347. CGraphElementBase &element = (CGraphElementBase &)iter->query();
  348. CActivityBase *activity = element.queryActivity();
  349. if (activity && activity->needReInit())
  350. element.sentActInitData->set(0, false); // force act init to be resent
  351. if (!element.sentActInitData->test(0))
  352. ++needActInit;
  353. }
  354. if (needActInit)
  355. {
  356. mptag_t replyTag = TAG_NULL;
  357. size32_t len;
  358. CMessageBuffer actInitRtnData;
  359. actInitRtnData.append(false);
  360. CMessageBuffer msg;
  361. if (syncInitData())
  362. {
  363. if (!graphCancelHandler.recv(queryJob().queryJobComm(), msg, 0, mpTag, NULL, LONGTIMEOUT))
  364. throw MakeStringException(0, "Error receiving actinit data for graph: %"GIDPF"d", graphId);
  365. replyTag = msg.getReplyTag();
  366. msg.read(len);
  367. }
  368. else
  369. {
  370. // initialize any for which no data was sent
  371. msg.append(smt_initActDataReq); // may cause graph to be created at master
  372. msg.append(queryGraphId());
  373. assertex(!parentExtractSz || NULL!=parentExtract);
  374. msg.append(parentExtractSz);
  375. msg.append(parentExtractSz, parentExtract);
  376. Owned<IThorActivityIterator> iter = getTraverseIterator();
  377. ForEach(*iter)
  378. {
  379. CSlaveGraphElement &element = (CSlaveGraphElement &)iter->query();
  380. if (!element.sentActInitData->test(0))
  381. {
  382. msg.append(element.queryId());
  383. element.serializeStartContext(msg);
  384. }
  385. }
  386. msg.append((activity_id)0);
  387. if (!queryJob().queryJobComm().sendRecv(msg, 0, queryJob().querySlaveMpTag(), LONGTIMEOUT))
  388. throwUnexpected();
  389. replyTag = job.deserializeMPTag(msg);
  390. msg.read(len);
  391. }
  392. try
  393. {
  394. MemoryBuffer actInitData;
  395. if (len)
  396. actInitData.append(len, msg.readDirect(len));
  397. initWithActData(actInitData, actInitRtnData);
  398. if (queryOwner() && !isGlobal())
  399. {
  400. // initialize any for which no data was sent
  401. Owned<IThorActivityIterator> iter = getTraverseIterator();
  402. ForEach(*iter)
  403. {
  404. CSlaveGraphElement &element = (CSlaveGraphElement &)iter->query();
  405. if (!element.sentActInitData->test(0))
  406. {
  407. element.sentActInitData->set(0);
  408. CSlaveActivity *activity = (CSlaveActivity *)element.queryActivity();
  409. if (activity)
  410. {
  411. MemoryBuffer in, out;
  412. activity->init(in, out);
  413. assertex(0 == out.length());
  414. }
  415. }
  416. }
  417. }
  418. }
  419. catch (IException *e)
  420. {
  421. actInitRtnData.clear();
  422. actInitRtnData.append(true);
  423. serializeThorException(e, actInitRtnData);
  424. e->Release();
  425. ret = false;
  426. }
  427. if (!job.queryJobComm().send(actInitRtnData, 0, replyTag, LONGTIMEOUT))
  428. throw MakeStringException(0, "Timeout sending init data back to master");
  429. }
  430. return ret;
  431. }
  432. bool CSlaveGraph::preStart(size32_t parentExtractSz, const byte *parentExtract)
  433. {
  434. started = true;
  435. recvStartCtx();
  436. CGraphBase::preStart(parentExtractSz, parentExtract);
  437. if (!recvActivityInitData(parentExtractSz, parentExtract))
  438. return false;
  439. connect(); // only now do slave acts. have all their outputs prepared.
  440. if (isGlobal())
  441. {
  442. if (!startBarrier->wait(false))
  443. return false;
  444. }
  445. return true;
  446. }
  447. void CSlaveGraph::start()
  448. {
  449. {
  450. SpinBlock b(progressActiveLock);
  451. progressActive = true;
  452. progressToCollect = true;
  453. }
  454. bool forceAsync = !queryOwner() || isGlobal();
  455. Owned<IThorActivityIterator> iter = getSinkIterator();
  456. unsigned sinks = 0;
  457. ForEach(*iter)
  458. ++sinks;
  459. ForEach(*iter)
  460. {
  461. CGraphElementBase &container = iter->query();
  462. CActivityBase *sinkAct = (CActivityBase *)container.queryActivity();
  463. --sinks;
  464. sinkAct->startProcess(forceAsync || 0 != sinks); // async, unless last
  465. }
  466. if (!queryOwner())
  467. {
  468. if (globals->getPropBool("@watchdogProgressEnabled"))
  469. jobS.queryProgressHandler()->startGraph(*this);
  470. }
  471. }
  472. void CSlaveGraph::connect()
  473. {
  474. CriticalBlock b(progressCrit);
  475. Owned<IThorActivityIterator> iter = getTraverseIterator();
  476. ForEach(*iter)
  477. iter->query().doconnect();
  478. }
  479. void CSlaveGraph::executeSubGraph(size32_t parentExtractSz, const byte *parentExtract)
  480. {
  481. if (isComplete())
  482. return;
  483. Owned<IException> exception;
  484. try
  485. {
  486. CGraphBase::executeSubGraph(parentExtractSz, parentExtract);
  487. }
  488. catch (IException *e)
  489. {
  490. GraphPrintLog(e, "In executeSubGraph");
  491. exception.setown(e);
  492. }
  493. if (TAG_NULL != executeReplyTag)
  494. {
  495. CMessageBuffer msg;
  496. if (exception.get())
  497. {
  498. msg.append(true);
  499. serializeThorException(exception, msg);
  500. }
  501. else
  502. msg.append(false);
  503. queryJob().queryJobComm().send(msg, 0, executeReplyTag, LONGTIMEOUT);
  504. }
  505. else if (exception)
  506. throw exception.getClear();
  507. }
  508. void CSlaveGraph::create(size32_t parentExtractSz, const byte *parentExtract)
  509. {
  510. CriticalBlock b(progressCrit);
  511. if (queryOwner())
  512. {
  513. if (isGlobal())
  514. {
  515. CMessageBuffer msg;
  516. // nothing changed if rerunning, unless conditional branches different
  517. if (!graphCancelHandler.recv(queryJob().queryJobComm(), msg, 0, mpTag, NULL, LONGTIMEOUT))
  518. throw MakeStringException(0, "Error receiving createctx data for graph: %"GIDPF"d", graphId);
  519. try
  520. {
  521. size32_t len;
  522. msg.read(len);
  523. if (len)
  524. {
  525. MemoryBuffer initData;
  526. initData.append(len, msg.readDirect(len));
  527. deserializeCreateContexts(initData);
  528. }
  529. msg.clear();
  530. msg.append(false);
  531. }
  532. catch (IException *e)
  533. {
  534. msg.clear();
  535. msg.append(true);
  536. serializeThorException(e, msg);
  537. }
  538. if (!job.queryJobComm().send(msg, 0, msg.getReplyTag(), LONGTIMEOUT))
  539. throw MakeStringException(0, "Timeout sending init data back to master");
  540. }
  541. else
  542. {
  543. ForEachItemIn(i, ifs)
  544. {
  545. CGraphElementBase &ifElem = ifs.item(i);
  546. if (ifElem.newWhichBranch)
  547. {
  548. ifElem.newWhichBranch = false;
  549. sentInitData = false; // force re-request of create data.
  550. break;
  551. }
  552. }
  553. if ((reinit || !sentInitData))
  554. {
  555. sentInitData = true;
  556. CMessageBuffer msg;
  557. msg.append(smt_initGraphReq);
  558. msg.append(graphId);
  559. if (!queryJob().queryJobComm().sendRecv(msg, 0, queryJob().querySlaveMpTag(), LONGTIMEOUT))
  560. throwUnexpected();
  561. size32_t len;
  562. msg.read(len);
  563. if (len)
  564. deserializeCreateContexts(msg);
  565. // could still request 1 off, onCreate serialization from master 1st.
  566. CGraphBase::create(parentExtractSz, parentExtract);
  567. return;
  568. }
  569. }
  570. }
  571. CGraphBase::create(parentExtractSz, parentExtract);
  572. }
  573. void CSlaveGraph::abort(IException *e)
  574. {
  575. if (!graphDone) // set pre done(), no need to abort if got that far.
  576. CGraphBase::abort(e);
  577. getDoneSem.signal();
  578. }
  579. void CSlaveGraph::done()
  580. {
  581. GraphPrintLog("End of sub-graph");
  582. {
  583. SpinBlock b(progressActiveLock);
  584. progressActive = false;
  585. progressToCollect = true; // NB: ensure collected after end of graph
  586. }
  587. if (!aborted && graphDone && (!queryOwner() || isGlobal()))
  588. getDoneSem.wait(); // must wait on master
  589. if (!queryOwner())
  590. {
  591. if (globals->getPropBool("@watchdogProgressEnabled"))
  592. jobS.queryProgressHandler()->stopGraph(*this, NULL);
  593. }
  594. Owned<IException> exception;
  595. try
  596. {
  597. CGraphBase::done();
  598. }
  599. catch (IException *e)
  600. {
  601. GraphPrintLog(e, "In CSlaveGraph::done");
  602. exception.setown(e);
  603. }
  604. if (exception.get())
  605. throw LINK(exception.get());
  606. }
  607. void CSlaveGraph::end()
  608. {
  609. CGraphBase::end();
  610. if (!queryOwner())
  611. {
  612. if (atomic_read(&nodesLoaded)) // wouldn't mean much if parallel jobs running
  613. GraphPrintLog("JHTree node stats:\ncacheAdds=%d\ncacheHits=%d\nnodesLoaded=%d\nblobCacheHits=%d\nblobCacheAdds=%d\nleafCacheHits=%d\nleafCacheAdds=%d\nnodeCacheHits=%d\nnodeCacheAdds=%d\n", atomic_read(&cacheAdds), atomic_read(&cacheHits), atomic_read(&nodesLoaded), atomic_read(&blobCacheHits), atomic_read(&blobCacheAdds), atomic_read(&leafCacheHits), atomic_read(&leafCacheAdds), atomic_read(&nodeCacheHits), atomic_read(&nodeCacheAdds));
  614. JSocketStatistics stats;
  615. getSocketStatistics(stats);
  616. StringBuffer s;
  617. getSocketStatisticsString(stats,s);
  618. GraphPrintLog("Socket statistics : %s\n",s.str());
  619. resetSocketStatistics();
  620. }
  621. }
  622. bool CSlaveGraph::serializeStats(MemoryBuffer &mb)
  623. {
  624. unsigned beginPos = mb.length();
  625. mb.append(queryGraphId());
  626. unsigned cPos = mb.length();
  627. unsigned count = 0;
  628. mb.append(count);
  629. CriticalBlock b(progressCrit);
  630. // until started and activities initialized, activities are not ready to serlialize stats.
  631. if ((started&&initialized) || 0 == activityCount())
  632. {
  633. bool collect=false;
  634. {
  635. SpinBlock b(progressActiveLock);
  636. if (progressActive || progressToCollect)
  637. {
  638. progressToCollect = false;
  639. collect = true;
  640. }
  641. }
  642. if (collect)
  643. {
  644. unsigned sPos = mb.length();
  645. Owned<IThorActivityIterator> iter = getTraverseIterator();
  646. ForEach (*iter)
  647. {
  648. CGraphElementBase &element = iter->query();
  649. CSlaveActivity &activity = (CSlaveActivity &)*element.queryActivity();
  650. unsigned pos = mb.length();
  651. mb.append(activity.queryContainer().queryId());
  652. activity.serializeStats(mb);
  653. if (pos == mb.length()-sizeof(activity_id))
  654. mb.rewrite(pos);
  655. else
  656. ++count;
  657. }
  658. mb.writeDirect(cPos, sizeof(count), &count);
  659. }
  660. unsigned cqCountPos = mb.length();
  661. unsigned cq=0;
  662. mb.append(cq);
  663. Owned<IThorGraphIterator> childIter = getChildGraphs();
  664. ForEach(*childIter)
  665. {
  666. CSlaveGraph &graph = (CSlaveGraph &)childIter->query();
  667. if (graph.serializeStats(mb))
  668. ++cq;
  669. }
  670. if (count || cq)
  671. {
  672. mb.writeDirect(cqCountPos, sizeof(cq), &cq);
  673. return true;
  674. }
  675. }
  676. mb.rewrite(beginPos);
  677. return false;
  678. }
  679. void CSlaveGraph::serializeDone(MemoryBuffer &mb)
  680. {
  681. mb.append(queryGraphId());
  682. unsigned cPos = mb.length();
  683. unsigned count=0;
  684. mb.append(count);
  685. Owned<IThorActivityIterator> iter = getTraverseIterator();
  686. ForEach (*iter)
  687. {
  688. CGraphElementBase &element = iter->query();
  689. if (element.queryActivity())
  690. {
  691. CSlaveActivity &activity = (CSlaveActivity &)*element.queryActivity();
  692. unsigned rPos = mb.length();
  693. mb.append(element.queryId());
  694. unsigned nl=0;
  695. mb.append(nl); // place holder for size of mb
  696. unsigned l = mb.length();
  697. activity.processDone(mb);
  698. nl = mb.length()-l;
  699. if (0 == nl)
  700. mb.rewrite(rPos);
  701. else
  702. {
  703. mb.writeDirect(l-sizeof(nl), sizeof(nl), &nl);
  704. ++count;
  705. }
  706. }
  707. }
  708. mb.writeDirect(cPos, sizeof(count), &count);
  709. }
  710. void CSlaveGraph::getDone(MemoryBuffer &doneInfoMb)
  711. {
  712. if (!started) return;
  713. GraphPrintLog("Entering getDone");
  714. if (!queryOwner() || isGlobal())
  715. {
  716. try
  717. {
  718. serializeDone(doneInfoMb);
  719. if (!queryOwner())
  720. {
  721. if (globals->getPropBool("@watchdogProgressEnabled"))
  722. jobS.queryProgressHandler()->stopGraph(*this, &doneInfoMb);
  723. }
  724. doneInfoMb.append(job.queryMaxDiskUsage());
  725. queryJob().queryTimeReporter().serialize(doneInfoMb);
  726. }
  727. catch (IException *)
  728. {
  729. GraphPrintLog("Leaving getDone");
  730. getDoneSem.signal();
  731. throw;
  732. }
  733. }
  734. GraphPrintLog("Leaving getDone");
  735. getDoneSem.signal();
  736. }
  737. class CThorSlaveGraphResults : public CThorGraphResults
  738. {
  739. CSlaveGraph &graph;
  740. IArrayOf<IThorResult> globalResults;
  741. PointerArrayOf<CriticalSection> globalResultCrits;
  742. void ensureAtLeastGlobals(unsigned id)
  743. {
  744. while (globalResults.ordinality() < id)
  745. {
  746. globalResults.append(*new CThorUninitializedGraphResults(globalResults.ordinality()));
  747. globalResultCrits.append(new CriticalSection);
  748. }
  749. }
  750. public:
  751. CThorSlaveGraphResults(CSlaveGraph &_graph,unsigned numResults) : CThorGraphResults(numResults), graph(_graph)
  752. {
  753. }
  754. ~CThorSlaveGraphResults()
  755. {
  756. clear();
  757. }
  758. virtual void clear()
  759. {
  760. CriticalBlock procedure(cs);
  761. results.kill();
  762. globalResults.kill();
  763. ForEachItemIn(i, globalResultCrits)
  764. delete globalResultCrits.item(i);
  765. globalResultCrits.kill();
  766. }
  767. IThorResult *getResult(unsigned id, bool distributed)
  768. {
  769. Linked<IThorResult> result;
  770. {
  771. CriticalBlock procedure(cs);
  772. ensureAtLeast(id+1);
  773. result.set(&results.item(id));
  774. if (!distributed || !result->isDistributed())
  775. return result.getClear();
  776. ensureAtLeastGlobals(id+1);
  777. }
  778. CriticalBlock b(*globalResultCrits.item(id)); // block other global requests for this result
  779. IThorResult *globalResult = &globalResults.item(id);
  780. if (!QUERYINTERFACE(globalResult, CThorUninitializedGraphResults))
  781. return LINK(globalResult);
  782. Owned<IThorResult> gr = graph.getGlobalResult(*result->queryActivity(), result->queryRowInterfaces(), ownerId, id);
  783. globalResults.replace(*gr.getLink(), id);
  784. return gr.getClear();
  785. }
  786. };
  787. IThorGraphResults *CSlaveGraph::createThorGraphResults(unsigned num)
  788. {
  789. return new CThorSlaveGraphResults(*this, num);
  790. }
  791. IThorResult *CSlaveGraph::getGlobalResult(CActivityBase &activity, IRowInterfaces *rowIf, activity_id ownerId, unsigned id)
  792. {
  793. mptag_t replyTag = createReplyTag();
  794. CMessageBuffer msg;
  795. msg.setReplyTag(replyTag);
  796. msg.append(smt_getresult);
  797. msg.append(graphId);
  798. msg.append(ownerId);
  799. msg.append(id);
  800. msg.append(replyTag);
  801. if (!queryJob().queryJobComm().send(msg, 0, queryJob().querySlaveMpTag(), LONGTIMEOUT))
  802. throwUnexpected();
  803. Owned<IThorResult> result = ::createResult(activity, rowIf, false);
  804. Owned<IRowWriter> resultWriter = result->getWriter();
  805. MemoryBuffer mb;
  806. Owned<ISerialStream> stream = createMemoryBufferSerialStream(mb);
  807. CThorStreamDeserializerSource rowSource(stream);
  808. loop
  809. {
  810. loop
  811. {
  812. if (activity.queryAbortSoon())
  813. return NULL;
  814. msg.clear();
  815. if (activity.receiveMsg(msg, 0, replyTag, NULL, 60*1000))
  816. break;
  817. ActPrintLog(&activity, "WARNING: tag %d timedout, retrying", (unsigned)replyTag);
  818. }
  819. if (!msg.length())
  820. break; // done
  821. else
  822. {
  823. bool error;
  824. msg.read(error);
  825. if (error)
  826. throw deserializeThorException(msg);
  827. ThorExpand(msg, mb.clear());
  828. while (!rowSource.eos())
  829. {
  830. RtlDynamicRowBuilder rowBuilder(rowIf->queryRowAllocator());
  831. size32_t sz = rowIf->queryRowDeserializer()->deserialize(rowBuilder, rowSource);
  832. resultWriter->putRow(rowBuilder.finalizeRowClear(sz));
  833. }
  834. }
  835. }
  836. return result.getClear();
  837. }
  838. ///////////////////////////
  839. class CThorCodeContextSlave : public CThorCodeContextBase, implements IEngineContext
  840. {
  841. mptag_t mptag;
  842. Owned<IDistributedFileTransaction> superfiletransaction;
  843. public:
  844. CThorCodeContextSlave(CJobBase &job, ILoadedDllEntry &querySo, IUserDescriptor &userDesc, mptag_t _mptag) : CThorCodeContextBase(job, querySo, userDesc), mptag(_mptag)
  845. {
  846. }
  847. virtual void setResultBool(const char *name, unsigned sequence, bool value) { throwUnexpected(); }
  848. virtual void setResultData(const char *name, unsigned sequence, int len, const void * data) { throwUnexpected(); }
  849. virtual void setResultDecimal(const char * stepname, unsigned sequence, int len, int precision, bool isSigned, const void *val) { throwUnexpected(); }
  850. virtual void setResultInt(const char *name, unsigned sequence, __int64 value) { throwUnexpected(); }
  851. virtual void setResultRaw(const char *name, unsigned sequence, int len, const void * data) { throwUnexpected(); }
  852. virtual void setResultReal(const char * stepname, unsigned sequence, double value) { throwUnexpected(); }
  853. virtual void setResultSet(const char *name, unsigned sequence, bool isAll, size32_t len, const void * data, ISetToXmlTransformer * transformer) { throwUnexpected(); }
  854. virtual void setResultString(const char *name, unsigned sequence, int len, const char * str) { throwUnexpected(); }
  855. virtual void setResultUInt(const char *name, unsigned sequence, unsigned __int64 value) { throwUnexpected(); }
  856. virtual void setResultUnicode(const char *name, unsigned sequence, int len, UChar const * str) { throwUnexpected(); }
  857. virtual void setResultVarString(const char * name, unsigned sequence, const char * value) { throwUnexpected(); }
  858. virtual void setResultVarUnicode(const char * name, unsigned sequence, UChar const * value) { throwUnexpected(); }
  859. virtual bool getResultBool(const char * name, unsigned sequence) { throwUnexpected(); }
  860. virtual void getResultData(unsigned & tlen, void * & tgt, const char * name, unsigned sequence) { throwUnexpected(); }
  861. virtual void getResultDecimal(unsigned tlen, int precision, bool isSigned, void * tgt, const char * stepname, unsigned sequence) { throwUnexpected(); }
  862. virtual void getResultRaw(unsigned & tlen, void * & tgt, const char * name, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer) { throwUnexpected(); }
  863. virtual void getResultSet(bool & isAll, size32_t & tlen, void * & tgt, const char * name, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer) { throwUnexpected(); }
  864. virtual __int64 getResultInt(const char * name, unsigned sequence) { throwUnexpected(); }
  865. virtual double getResultReal(const char * name, unsigned sequence) { throwUnexpected(); }
  866. virtual void getResultString(unsigned & tlen, char * & tgt, const char * name, unsigned sequence) { throwUnexpected(); }
  867. virtual void getResultUnicode(unsigned & tlen, UChar * & tgt, const char * name, unsigned sequence) { throwUnexpected(); }
  868. virtual char *getResultVarString(const char * name, unsigned sequence) { throwUnexpected(); }
  869. virtual UChar *getResultVarUnicode(const char * name, unsigned sequence) { throwUnexpected(); }
  870. virtual unsigned getResultHash(const char * name, unsigned sequence) { throwUnexpected(); }
  871. virtual void getExternalResultRaw(unsigned & tlen, void * & tgt, const char * wuid, const char * stepname, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer) { throwUnexpected(); }
  872. virtual void addWuException(const char * text, unsigned code, unsigned severity)
  873. {
  874. DBGLOG("%s", text);
  875. Owned<IThorException> e = MakeThorException(code, "%s", text);
  876. e->setAction(tea_warning);
  877. e->setOrigin("user");
  878. e->setAction(tea_warning);
  879. e->setSeverity((WUExceptionSeverity)severity);
  880. job.fireException(e);
  881. }
  882. virtual unsigned getNodes() { return job.queryJobGroup().ordinality()-1; }
  883. virtual unsigned getNodeNum() { return job.queryMyRank()-1; }
  884. virtual char *getFilePart(const char *logicalName, bool create=false)
  885. {
  886. CMessageBuffer msg;
  887. msg.append(smt_getPhysicalName);
  888. msg.append(logicalName);
  889. msg.append(getNodeNum());
  890. msg.append(create);
  891. if (!job.queryJobComm().sendRecv(msg, 0, mptag, LONGTIMEOUT))
  892. throwUnexpected();
  893. return (char *)msg.detach();
  894. }
  895. virtual unsigned __int64 getFileOffset(const char *logicalName)
  896. {
  897. CMessageBuffer msg;
  898. msg.append(smt_getFileOffset);
  899. if (!job.queryJobComm().sendRecv(msg, 0, mptag, LONGTIMEOUT))
  900. throwUnexpected();
  901. unsigned __int64 offset;
  902. msg.read(offset);
  903. return offset;
  904. }
  905. virtual IDistributedFileTransaction *querySuperFileTransaction()
  906. {
  907. // NB: shouldn't really have fileservice being called on slaves
  908. if (!superfiletransaction.get())
  909. superfiletransaction.setown(createDistributedFileTransaction(userDesc));
  910. return superfiletransaction.get();
  911. }
  912. virtual void getResultStringF(unsigned tlen, char * tgt, const char * name, unsigned sequence) { throwUnexpected(); }
  913. virtual void getResultRowset(size32_t & tcount, byte * * & tgt, const char * name, unsigned sequence, IEngineRowAllocator * _rowAllocator, bool isGrouped, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer) { throwUnexpected(); }
  914. virtual void getResultDictionary(size32_t & tcount, byte * * & tgt, IEngineRowAllocator * _rowAllocator, const char * name, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer, IHThorHashLookupInfo * hasher) { throwUnexpected(); }
  915. virtual void addWuAssertFailure(unsigned code, const char * text, const char * filename, unsigned lineno, unsigned column, bool isAbort)
  916. {
  917. DBGLOG("%s", text);
  918. Owned<IThorException> e = MakeThorException(code, "%s", text);
  919. e->setAssert(filename, lineno, column);
  920. e->setOrigin("user");
  921. e->setSeverity(ExceptionSeverityError);
  922. if (!isAbort)
  923. e->setAction(tea_warning);
  924. job.fireException(e);
  925. }
  926. virtual unsigned __int64 getDatasetHash(const char * name, unsigned __int64 hash) { throwUnexpected(); } // Should only call from master
  927. virtual IEngineContext *queryEngineContext() { return this; }
  928. // IEngineContext impl.
  929. virtual DALI_UID getGlobalUniqueIds(unsigned num, SocketEndpoint *_foreignNode)
  930. {
  931. if (num==0)
  932. return 0;
  933. SocketEndpoint foreignNode;
  934. if (_foreignNode && !_foreignNode->isNull())
  935. foreignNode.set(*_foreignNode);
  936. else
  937. foreignNode.set(globals->queryProp("@DALISERVERS"));
  938. return ::getGlobalUniqueIds(num, &foreignNode);
  939. }
  940. };
  941. class CSlaveGraphTempHandler : public CGraphTempHandler
  942. {
  943. public:
  944. CSlaveGraphTempHandler(CJobBase &job, bool errorOnMissing) : CGraphTempHandler(job, errorOnMissing)
  945. {
  946. }
  947. virtual bool removeTemp(const char *name)
  948. {
  949. OwnedIFile ifile = createIFile(name);
  950. return ifile->remove();
  951. }
  952. };
  // NOTE(review): not referenced in this part of the file; by its name presumably a limit on
  // pooled slave graphs - confirm against its users before changing.
  #define SLAVEGRAPHPOOLLIMIT 10
  954. CJobSlave::CJobSlave(ISlaveWatchdog *_watchdog, IPropertyTree *_workUnitInfo, const char *graphName, const char *_querySo, mptag_t _mpJobTag, mptag_t _slavemptag) : CJobBase(graphName), watchdog(_watchdog)
  955. {
  956. workUnitInfo.set(_workUnitInfo);
  957. workUnitInfo->getProp("token", token);
  958. workUnitInfo->getProp("user", user);
  959. workUnitInfo->getProp("wuid", wuid);
  960. workUnitInfo->getProp("scope", scope);
  961. init();
  962. oldNodeCacheMem = 0;
  963. mpJobTag = _mpJobTag;
  964. slavemptag = _slavemptag;
  965. IPropertyTree *plugins = workUnitInfo->queryPropTree("plugins");
  966. if (plugins)
  967. {
  968. StringBuffer pluginsDir, installDir, pluginsList;
  969. globals->getProp("@INSTALL_DIR", installDir); // could use for socachedir also?
  970. if (installDir.length())
  971. addPathSepChar(installDir);
  972. globals->getProp("@pluginsPath", pluginsDir);
  973. if (pluginsDir.length())
  974. {
  975. if (!isAbsolutePath(pluginsDir.str())) // if !absolute, then make relative to installDir if is one (e.g. master mount)
  976. {
  977. if (installDir.length())
  978. pluginsDir.insert(0, installDir.str());
  979. }
  980. addPathSepChar(pluginsDir);
  981. }
  982. Owned<IPropertyTreeIterator> pluginIter = plugins->getElements("plugin");
  983. ForEach(*pluginIter)
  984. {
  985. StringBuffer pluginPath;
  986. IPropertyTree &plugin = pluginIter->query();
  987. pluginPath.append(pluginsDir).append(plugin.queryProp("@name"));
  988. if (pluginsList.length())
  989. pluginsList.append(ENVSEPCHAR);
  990. pluginsList.append(pluginPath);
  991. }
  992. pluginMap->loadFromList(pluginsList.str());
  993. }
  994. #ifdef __linux__
  995. // only relevant if dllsToSlaves=false and query_so_dir was fully qualified remote path (e.g. //<ip>/path/file
  996. RemoteFilename rfn;
  997. rfn.setRemotePath(_querySo);
  998. StringBuffer tempSo;
  999. if (!rfn.isLocal())
  1000. {
  1001. WARNLOG("Cannot load shared object directly from remote path, creating temporary local copy: %s", _querySo);
  1002. GetTempName(tempSo,"so",true);
  1003. copyFile(tempSo.str(), _querySo);
  1004. _querySo = tempSo.str();
  1005. }
  1006. #endif
  1007. querySo.setown(createDllEntry(_querySo, false, NULL));
  1008. codeCtx = new CThorCodeContextSlave(*this, *querySo, *userDesc, slavemptag);
  1009. tmpHandler.setown(createTempHandler(true));
  1010. startJob();
  1011. }
  1012. CJobSlave::~CJobSlave()
  1013. {
  1014. graphExecutor->wait();
  1015. endJob();
  1016. }
  1017. void CJobSlave::startJob()
  1018. {
  1019. LOG(MCdebugProgress, thorJob, "New Graph started : %s", graphName.get());
  1020. ClearTempDirs();
  1021. unsigned pinterval = globals->getPropInt("@system_monitor_interval",1000*60);
  1022. if (pinterval)
  1023. startPerformanceMonitor(pinterval);
  1024. if (pinterval)
  1025. {
  1026. perfmonhook.setown(createThorMemStatsPerfMonHook());
  1027. startPerformanceMonitor(pinterval,PerfMonStandard,perfmonhook);
  1028. }
  1029. PrintMemoryStatusLog();
  1030. logDiskSpace();
  1031. unsigned minFreeSpace = (unsigned)getWorkUnitValueInt("MINIMUM_DISK_SPACE", 0);
  1032. if (minFreeSpace)
  1033. {
  1034. unsigned __int64 freeSpace = getFreeSpace(queryBaseDirectory(grp_unknown, 0));
  1035. if (freeSpace < ((unsigned __int64)minFreeSpace)*0x100000)
  1036. {
  1037. SocketEndpoint ep;
  1038. ep.setLocalHost(0);
  1039. StringBuffer s;
  1040. throw MakeThorException(TE_NotEnoughFreeSpace, "Node %s has %u MB(s) of available disk space, specified minimum for this job: %u MB(s)", ep.getUrlStr(s).str(), (unsigned) freeSpace / 0x100000, minFreeSpace);
  1041. }
  1042. }
  1043. unsigned keyNodeCacheMB = (unsigned)getWorkUnitValueInt("keyNodeCacheMB", 0);
  1044. if (keyNodeCacheMB)
  1045. {
  1046. oldNodeCacheMem = setNodeCacheMem(keyNodeCacheMB * 0x100000);
  1047. PROGLOG("Key node cache size set to: %d MB", keyNodeCacheMB);
  1048. }
  1049. unsigned keyFileCacheLimit = (unsigned)getWorkUnitValueInt("keyFileCacheLimit", 0);
  1050. if (!keyFileCacheLimit)
  1051. keyFileCacheLimit = (querySlaves()+1)*2;
  1052. setKeyIndexCacheSize(keyFileCacheLimit);
  1053. PROGLOG("Key file cache size set to: %d", keyFileCacheLimit);
  1054. }
  1055. void CJobSlave::endJob()
  1056. {
  1057. stopPerformanceMonitor();
  1058. LOG(MCdebugProgress, thorJob, "Job ended : %s", graphName.get());
  1059. clearKeyStoreCache(true);
  1060. if (oldNodeCacheMem)
  1061. setNodeCacheMem(oldNodeCacheMem);
  1062. PrintMemoryStatusLog();
  1063. }
  1064. __int64 CJobSlave::getWorkUnitValueInt(const char *prop, __int64 defVal) const
  1065. {
  1066. StringBuffer propName(prop);
  1067. return workUnitInfo->queryPropTree("Debug")->getPropInt64(propName.toLowerCase().str(), defVal);
  1068. }
  1069. StringBuffer &CJobSlave::getWorkUnitValue(const char *prop, StringBuffer &str) const
  1070. {
  1071. StringBuffer propName(prop);
  1072. workUnitInfo->queryPropTree("Debug")->getProp(propName.toLowerCase().str(), str);
  1073. return str;
  1074. }
  1075. bool CJobSlave::getWorkUnitValueBool(const char *prop, bool defVal) const
  1076. {
  1077. StringBuffer propName(prop);
  1078. return workUnitInfo->queryPropTree("Debug")->getPropBool(propName.toLowerCase().str(), defVal);
  1079. }
  1080. IBarrier *CJobSlave::createBarrier(mptag_t tag)
  1081. {
  1082. return new CBarrierSlave(*jobComm, tag);
  1083. }
  1084. IGraphTempHandler *CJobSlave::createTempHandler(bool errorOnMissing)
  1085. {
  1086. return new CSlaveGraphTempHandler(*this, errorOnMissing);
  1087. }
  1088. // IGraphCallback
  1089. void CJobSlave::runSubgraph(CGraphBase &graph, size32_t parentExtractSz, const byte *parentExtract)
  1090. {
  1091. if (!graph.queryOwner())
  1092. CJobBase::runSubgraph(graph, parentExtractSz, parentExtract);
  1093. else
  1094. graph.doExecuteChild(parentExtractSz, parentExtract);
  1095. CriticalBlock b(graphRunCrit);
  1096. if (!graph.queryOwner())
  1097. removeSubGraph(graph);
  1098. }
  1099. ///////////////
  // Finds a usable physical copy of a file part, preferring (and, if possible, restoring) the primary.
  //
  // Search order:
  // 1. Copy 0 (the primary). If it exists it is used. An exception while checking it is only logged.
  // 2. Copies 1..numCopies()-1, in order. For the first backup that exists:
  //    - If the primary location is local: fire a warning, copy the backup to "<primary>.tmp",
  //      then rename that over the primary name and use the primary. If the rename fails: try
  //      to delete the tmp file, fire a warning, and use the backup instead.
  //    - Otherwise: fire a warning and use the remote backup directly.
  //
  // activity - used for logging and for firing warnings/exceptions
  // partDesc - part whose copies are examined
  // ifile    - [out] the chosen file
  // location - [out] copy index of the chosen file (0 = primary); untouched when false is returned
  // path     - [out] the chosen file's path is appended
  // Returns true if a copy was found, false if no copy exists.
  // Throws an activity exception wrapping any IException raised while checking or restoring a backup.
  // NOTE(review): such a failure (e.g. copyFile failing) aborts the search rather than falling back
  // to the backup or trying the next copy - confirm this is intended.
  bool ensurePrimary(CActivityBase *activity, IPartDescriptor &partDesc, OwnedIFile & ifile, unsigned &location, StringBuffer &path)
  {
      StringBuffer locationName, primaryName;
      RemoteFilename primaryRfn;
      partDesc.getFilename(0, primaryRfn);
      primaryRfn.getPath(primaryName);
      OwnedIFile primaryIFile = createIFile(primaryName.str());
      try
      {
          if (primaryIFile->exists())
          {
              location = 0;
              ifile.set(primaryIFile);
              path.append(primaryName);
              return true;
          }
      }
      catch (IException *e)
      {
          // Best effort: a failure to check the primary is logged and the backups are tried.
          ActPrintLog(&activity->queryContainer(), e, "In ensurePrimary");
          e->Release();
      }
      unsigned l;
      for (l=1; l<partDesc.numCopies(); l++)
      {
          RemoteFilename altRfn;
          partDesc.getFilename(l, altRfn);
          locationName.clear();
          altRfn.getPath(locationName);
          assertex(locationName.length());
          OwnedIFile backupIFile = createIFile(locationName.str());
          try
          {
              if (backupIFile->exists())
              {
                  if (primaryRfn.isLocal())
                  {
                      // Primary is on this node: restore it from the backup via a temporary file.
                      ensureDirectoryForFile(primaryIFile->queryFilename());
                      Owned<IException> e = MakeActivityWarning(activity, 0, "Primary file missing: %s, copying backup %s to primary location", primaryIFile->queryFilename(), locationName.str());
                      activity->fireException(e);
                      StringBuffer tmpName(primaryIFile->queryFilename());
                      tmpName.append(".tmp");
                      OwnedIFile tmpFile = createIFile(tmpName.str());
                      // NOTE(review): presumably marks tmpName as a file-in-progress for the rest of this scope - confirm.
                      CFIPScope fipScope(tmpName.str());
                      copyFile(tmpFile, backupIFile);
                      try
                      {
                          // Rename within the same directory so the copy takes the primary's name.
                          tmpFile->rename(pathTail(primaryIFile->queryFilename()));
                          location = 0;
                          ifile.set(primaryIFile);
                          path.append(primaryName);
                      }
                      catch (IException *e)
                      {
                          // Rename failed: clean up the tmp copy (best effort) and fall back to the backup.
                          try { tmpFile->remove(); } catch (IException *e) { ActPrintLog(&activity->queryContainer(), "Failed to delete temporary file"); e->Release(); }
                          Owned<IException> e2 = MakeActivityWarning(activity, e, "Failed to restore primary, failed to rename %s to %s", tmpName.str(), primaryIFile->queryFilename());
                          e->Release();
                          activity->fireException(e2);
                          ifile.set(backupIFile);
                          location = l;
                          path.append(locationName);
                      }
                  }
                  else // JCSMORE - should use daliservix perhaps to ensure primary
                  {
                      // Primary is remote: cannot restore it from here, so use the backup directly.
                      Owned<IException> e = MakeActivityWarning(activity, 0, "Primary file missing: %s, using remote copy: %s", primaryIFile->queryFilename(), locationName.str());
                      activity->fireException(e);
                      ifile.set(backupIFile);
                      location = l;
                      path.append(locationName);
                  }
                  return true;
              }
          }
          catch (IException *e)
          {
              // Any failure while handling this backup is wrapped as an activity exception and rethrown.
              Owned<IThorException> e2 = MakeActivityException(activity, e);
              e->Release();
              throw e2.getClear();
          }
      }
      return false;
  }
  1183. class CEnsurePrimaryPartFile : public CInterface, implements IReplicatedFile
  1184. {
  1185. CActivityBase &activity;
  1186. Linked<IPartDescriptor> partDesc;
  1187. StringAttr logicalFilename;
  1188. Owned<IReplicatedFile> part;
  1189. public:
  1190. IMPLEMENT_IINTERFACE;
  1191. CEnsurePrimaryPartFile(CActivityBase &_activity, const char *_logicalFilename, IPartDescriptor *_partDesc)
  1192. : activity(_activity), logicalFilename(_logicalFilename), partDesc(_partDesc)
  1193. {
  1194. }
  1195. virtual IFile *open()
  1196. {
  1197. unsigned location;
  1198. OwnedIFile iFile;
  1199. StringBuffer filePath;
  1200. if (globals->getPropBool("@autoCopyBackup", true)?ensurePrimary(&activity, *partDesc, iFile, location, filePath):getBestFilePart(&activity, *partDesc, iFile, location, filePath, &activity))
  1201. return iFile.getClear();
  1202. else
  1203. {
  1204. StringBuffer locations;
  1205. IException *e = MakeActivityException(&activity, TE_FileNotFound, "No physical file part for logical file %s, found at given locations: %s (Error = %d)", logicalFilename.get(), getFilePartLocations(*partDesc, locations).str(), GetLastError());
  1206. EXCLOG(e, NULL);
  1207. throw e;
  1208. }
  1209. }
  1210. RemoteFilenameArray &queryCopies()
  1211. {
  1212. if(!part.get())
  1213. part.setown(partDesc->getReplicatedFile());
  1214. return part->queryCopies();
  1215. }
  1216. };
  1217. IReplicatedFile *createEnsurePrimaryPartFile(CActivityBase &activity, const char *logicalFilename, IPartDescriptor *partDesc)
  1218. {
  1219. return new CEnsurePrimaryPartFile(activity, logicalFilename, partDesc);
  1220. }
  1221. ///////////////
  1222. class CFileCache;
  1223. class CLazyFileIO : public CInterface, implements IFileIO, implements IDelayedFile
  1224. {
  1225. CFileCache &cache;
  1226. Owned<IReplicatedFile> repFile;
  1227. Linked<IExpander> expander;
  1228. bool compressed;
  1229. StringAttr filename;
  1230. CriticalSection crit;
  1231. Owned<IFileIO> iFileIO; // real IFileIO
  1232. void checkOpen(); // references CFileCache method
  1233. public:
  1234. IMPLEMENT_IINTERFACE;
  1235. CLazyFileIO(CFileCache &_cache, const char *_filename, IReplicatedFile *_repFile, bool _compressed, IExpander *_expander) : cache(_cache), filename(_filename), repFile(_repFile), compressed(_compressed), expander(_expander)
  1236. {
  1237. }
  1238. ~CLazyFileIO()
  1239. {
  1240. iFileIO.clear();
  1241. }
  1242. const char *queryFindString() const { return filename.get(); } // for string HT
  1243. // IFileIO impl.
  1244. virtual size32_t read(offset_t pos, size32_t len, void * data)
  1245. {
  1246. CriticalBlock b(crit);
  1247. checkOpen();
  1248. return iFileIO->read(pos, len, data);
  1249. }
  1250. virtual offset_t size()
  1251. {
  1252. CriticalBlock b(crit);
  1253. checkOpen();
  1254. return iFileIO->size();
  1255. }
  1256. virtual size32_t write(offset_t pos, size32_t len, const void * data)
  1257. {
  1258. CriticalBlock b(crit);
  1259. checkOpen();
  1260. return iFileIO->write(pos, len, data);
  1261. }
  1262. virtual void flush()
  1263. {
  1264. CriticalBlock b(crit);
  1265. if (iFileIO)
  1266. iFileIO->flush();
  1267. }
  1268. virtual void close()
  1269. {
  1270. CriticalBlock b(crit);
  1271. if (iFileIO)
  1272. iFileIO->close();
  1273. iFileIO.clear();
  1274. }
  1275. virtual offset_t appendFile(IFile *file,offset_t pos=0,offset_t len=(offset_t)-1)
  1276. {
  1277. CriticalBlock b(crit);
  1278. checkOpen();
  1279. return iFileIO->appendFile(file, pos, len);
  1280. }
  1281. virtual void setSize(offset_t size)
  1282. {
  1283. CriticalBlock b(crit);
  1284. checkOpen();
  1285. iFileIO->setSize(size);
  1286. }
  1287. // IDelayedFile impl.
  1288. virtual IMemoryMappedFile *queryMappedFile() { return NULL; }
  1289. virtual IFileIO *queryFileIO() { return this; }
  1290. };
  1291. class CFileCache : public CInterface, implements IThorFileCache
  1292. {
  1293. OwningStringSuperHashTableOf<CLazyFileIO> files;
  1294. CopyCIArrayOf<CLazyFileIO> openFiles;
  1295. unsigned limit, purgeN;
  1296. CriticalSection crit;
  1297. class CDelayedFileWapper : public CInterface, implements IDelayedFile
  1298. {
  1299. CFileCache &cache;
  1300. Linked<CLazyFileIO> lFile;
  1301. public:
  1302. IMPLEMENT_IINTERFACE;
  1303. CDelayedFileWapper(CFileCache &_cache, CLazyFileIO &_lFile) : cache(_cache), lFile(&_lFile) { }
  1304. ~CDelayedFileWapper()
  1305. {
  1306. cache.remove(*lFile);
  1307. }
  1308. // IDelayedFile impl.
  1309. virtual IMemoryMappedFile *queryMappedFile() { return lFile->queryMappedFile(); }
  1310. virtual IFileIO *queryFileIO() { return lFile->queryFileIO(); }
  1311. };
  1312. void purgeOldest()
  1313. {
  1314. // will be ordered oldest first.
  1315. unsigned count = 0;
  1316. CopyCIArrayOf<CLazyFileIO> toClose;
  1317. ForEachItemIn(o, openFiles)
  1318. {
  1319. CLazyFileIO &lFile = openFiles.item(o);
  1320. toClose.append(lFile);
  1321. if (++count>=purgeN) // crude for now, just remove oldest N
  1322. break;
  1323. }
  1324. ForEachItemIn(r, toClose)
  1325. {
  1326. CLazyFileIO &lFile = toClose.item(r);
  1327. lFile.close();
  1328. openFiles.zap(lFile);
  1329. }
  1330. }
  1331. bool _remove(CLazyFileIO &lFile)
  1332. {
  1333. bool ret = files.removeExact(&lFile);
  1334. if (!ret) return false;
  1335. openFiles.zap(lFile);
  1336. return true;
  1337. }
  1338. public:
  1339. IMPLEMENT_IINTERFACE;
  1340. CFileCache(unsigned _limit) : limit(_limit)
  1341. {
  1342. assertex(limit);
  1343. purgeN = globals->getPropInt("@fileCachePurgeN", 10);
  1344. if (purgeN > limit) purgeN=limit; // why would it be, but JIC.
  1345. PROGLOG("FileCache: limit = %d, purgeN = %d", limit, purgeN);
  1346. }
  1347. void opening(CLazyFileIO &lFile)
  1348. {
  1349. CriticalBlock b(crit);
  1350. if (openFiles.ordinality() >= limit)
  1351. {
  1352. purgeOldest(); // will close purgeN
  1353. assertex(openFiles.ordinality() < limit);
  1354. }
  1355. openFiles.zap(lFile);
  1356. openFiles.append(lFile);
  1357. }
  1358. // IThorFileCache impl.
  1359. virtual bool remove(IDelayedFile &dFile)
  1360. {
  1361. CLazyFileIO *lFile = QUERYINTERFACE(&dFile, CLazyFileIO);
  1362. assertex(lFile);
  1363. CriticalBlock b(crit);
  1364. return _remove(*lFile);
  1365. }
  1366. virtual IDelayedFile *lookup(CActivityBase &activity, IPartDescriptor &partDesc, IExpander *expander)
  1367. {
  1368. StringBuffer filename;
  1369. RemoteFilename rfn;
  1370. partDesc.getFilename(0, rfn);
  1371. rfn.getPath(filename);
  1372. CriticalBlock b(crit);
  1373. Linked<CLazyFileIO> file = files.find(filename.str());
  1374. if (!file)
  1375. {
  1376. Owned<IReplicatedFile> repFile = createEnsurePrimaryPartFile(activity, filename.str(), &partDesc);
  1377. bool compressed = partDesc.queryOwner().isCompressed();
  1378. file.setown(new CLazyFileIO(*this, filename.str(), repFile.getClear(), compressed, expander));
  1379. }
  1380. files.replace(*LINK(file));
  1381. return new CDelayedFileWapper(*this, *file); // to avoid circular dependency and allow destruction to remove from cache
  1382. }
  1383. };
  1384. ////
  1385. void CLazyFileIO::checkOpen()
  1386. {
  1387. CriticalBlock b(crit);
  1388. if (iFileIO)
  1389. return;
  1390. cache.opening(*this);
  1391. Owned<IFile> iFile = repFile->open();
  1392. if (NULL != expander.get())
  1393. iFileIO.setown(createCompressedFileReader(iFile, expander));
  1394. else if (compressed)
  1395. iFileIO.setown(createCompressedFileReader(iFile));
  1396. else
  1397. iFileIO.setown(iFile->open(IFOread));
  1398. if (!iFileIO.get())
  1399. throw MakeThorException(0, "CLazyFileIO: failed to open: %s", filename.get());
  1400. }
  1401. IThorFileCache *createFileCache(unsigned limit)
  1402. {
  1403. return new CFileCache(limit);
  1404. }