thgraphmaster.cpp 96 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include <limits.h>
  14. #include <stdlib.h>
  15. #include "jprop.hpp"
  16. #include "jexcept.hpp"
  17. #include "jiter.ipp"
  18. #include "jlzw.hpp"
  19. #include "jsocket.hpp"
  20. #include "jset.hpp"
  21. #include "jsort.hpp"
  22. #include "portlist.h"
  23. #include "jhtree.hpp"
  24. #include "mputil.hpp"
  25. #include "dllserver.hpp"
  26. #include "dautils.hpp"
  27. #include "danqs.hpp"
  28. #include "daclient.hpp"
  29. #include "daaudit.hpp"
  30. #include "wujobq.hpp"
  31. #include "thorport.hpp"
  32. #include "commonext.hpp"
  33. #include "thorxmlread.hpp"
  34. #include "thorplugin.hpp"
  35. #include "thormisc.hpp"
  36. #include "thgraphmaster.ipp"
  37. #include "thdemonserver.hpp"
  38. #include "rtlds_imp.hpp"
  39. #include "eclhelper.hpp"
  40. #include "thexception.hpp"
  41. #include "thactivitymaster.ipp"
  42. #include "thmem.hpp"
  43. #include "thcompressutil.hpp"
  44. static CriticalSection *jobManagerCrit;
  45. MODULE_INIT(INIT_PRIORITY_STANDARD)
  46. {
  47. jobManagerCrit = new CriticalSection;
  48. return true;
  49. }
  50. MODULE_EXIT()
  51. {
  52. delete jobManagerCrit;
  53. }
  54. unsigned uniqGraphId = 1;
  55. #define FATAL_TIMEOUT 60
  56. class CFatalHandler : public CTimeoutTrigger, implements IFatalHandler
  57. {
  58. public:
  59. IMPLEMENT_IINTERFACE;
  60. CFatalHandler(unsigned timeout) : CTimeoutTrigger(timeout, "EXCEPTION")
  61. {
  62. }
  63. virtual bool action()
  64. {
  65. StringBuffer s("FAILED TO RECOVER FROM EXCEPTION, STOPPING THOR");
  66. FLLOG(MCoperatorWarning, thorJob, exception, s.str());
  67. Owned<IJobManager> jobManager = getJobManager();
  68. if (jobManager)
  69. {
  70. jobManager->fatal(exception);
  71. jobManager.clear();
  72. }
  73. return true;
  74. }
  75. // IFatalHandler
  76. virtual void inform(IException *e)
  77. {
  78. CTimeoutTrigger::inform(e);
  79. }
  80. virtual void clear()
  81. {
  82. CTimeoutTrigger::clear();
  83. }
  84. };
  85. /////
  86. CSlaveMessageHandler::CSlaveMessageHandler(CJobMaster &_job, mptag_t _mptag) : threaded("CSlaveMessageHandler"), job(_job), mptag(_mptag)
  87. {
  88. stopped = false;
  89. threaded.init(this);
  90. }
  91. CSlaveMessageHandler::~CSlaveMessageHandler()
  92. {
  93. stop();
  94. }
  95. void CSlaveMessageHandler::stop()
  96. {
  97. if (!stopped)
  98. {
  99. stopped = true;
  100. job.queryJobComm().cancel(0, mptag);
  101. threaded.join();
  102. }
  103. }
  104. void CSlaveMessageHandler::main()
  105. {
  106. try
  107. {
  108. loop
  109. {
  110. rank_t sender;
  111. CMessageBuffer msg;
  112. if (stopped || !job.queryJobComm().recv(msg, RANK_ALL, mptag, &sender))
  113. break;
  114. unsigned slave = ((unsigned)sender)-1;
  115. SlaveMsgTypes msgType;
  116. msg.read((int &)msgType);
  117. switch (msgType)
  118. {
  119. case smt_errorMsg:
  120. {
  121. Owned<IThorException> e = deserializeThorException(msg);
  122. e->setSlave(sender);
  123. Owned<CGraphBase> graph = job.getGraph(e->queryGraphId());
  124. if (graph)
  125. {
  126. activity_id id = e->queryActivityId();
  127. if (id)
  128. {
  129. CGraphElementBase *elem = graph->queryElement(id);
  130. CActivityBase *act = elem->queryActivity();
  131. if (act)
  132. act->fireException(e);
  133. else
  134. graph->fireException(e);
  135. }
  136. else
  137. graph->fireException(e);
  138. }
  139. else
  140. job.fireException(e);
  141. if (msg.getReplyTag() <= TAG_REPLY_BASE)
  142. {
  143. msg.clear();
  144. job.queryJobComm().reply(msg);
  145. }
  146. break;
  147. }
  148. case smt_dataReq:
  149. {
  150. graph_id gid;
  151. activity_id id;
  152. unsigned slave;
  153. msg.read(slave);
  154. msg.read(gid);
  155. msg.read(id);
  156. msg.clear();
  157. Owned<CGraphBase> graph = job.getGraph(gid);
  158. if (graph)
  159. {
  160. CMasterGraphElement *e = (CMasterGraphElement *)graph->queryElement(id);
  161. e->queryActivity()->getInitializationData(slave, msg);
  162. }
  163. job.queryJobComm().reply(msg);
  164. break;
  165. }
  166. case smt_initGraphReq:
  167. {
  168. graph_id gid;
  169. msg.read(gid);
  170. Owned<CMasterGraph> graph = (CMasterGraph *)job.getGraph(gid);
  171. assertex(graph);
  172. {
  173. CriticalBlock b(graph->queryCreateLock());
  174. Owned<IThorActivityIterator> iter = graph->getIterator();
  175. // onCreate all
  176. ForEach (*iter)
  177. {
  178. CMasterGraphElement &element = (CMasterGraphElement &)iter->query();
  179. element.onCreate();
  180. }
  181. }
  182. msg.clear();
  183. graph->serializeCreateContexts(msg);
  184. job.queryJobComm().reply(msg);
  185. break;
  186. }
  187. case smt_initActDataReq:
  188. {
  189. graph_id gid;
  190. msg.read(gid);
  191. Owned<CMasterGraph> graph = (CMasterGraph *)job.getGraph(gid);
  192. assertex(graph);
  193. CGraphElementArray toSerialize;
  194. CriticalBlock b(graph->queryCreateLock());
  195. size32_t parentExtractSz;
  196. msg.read(parentExtractSz);
  197. const byte *parentExtract = NULL;
  198. if (parentExtractSz)
  199. {
  200. parentExtract = msg.readDirect(parentExtractSz);
  201. StringBuffer msg("Graph(");
  202. msg.append(graph->queryGraphId()).append(") - initializing master graph with parentExtract ").append(parentExtractSz).append(" bytes");
  203. DBGLOG("%s", msg.str());
  204. parentExtract = graph->setParentCtx(parentExtractSz, parentExtract);
  205. }
  206. loop
  207. {
  208. activity_id id;
  209. msg.read(id);
  210. if (!id)
  211. break;
  212. CMasterGraphElement *element = (CMasterGraphElement *)graph->queryElement(id);
  213. assertex(element);
  214. element->deserializeStartContext(msg);
  215. element->doCreateActivity(parentExtractSz, parentExtract);
  216. CActivityBase *activity = element->queryActivity();
  217. if (activity && activity->needReInit())
  218. element->sentActInitData->set(slave, 0); // clear to permit serializeActivityInitData to resend
  219. toSerialize.append(*LINK(element));
  220. }
  221. msg.clear();
  222. CMessageBuffer replyMsg;
  223. mptag_t replyTag = createReplyTag();
  224. msg.append(replyTag); // second reply
  225. replyMsg.setReplyTag(replyTag);
  226. CGraphElementArrayIterator iter(toSerialize);
  227. graph->serializeActivityInitData(((unsigned)sender)-1, msg, iter);
  228. job.queryJobComm().reply(msg);
  229. if (!job.queryJobComm().recv(msg, sender, replyTag, NULL, MEDIUMTIMEOUT))
  230. throwUnexpected();
  231. bool error;
  232. msg.read(error);
  233. if (error)
  234. {
  235. Owned<IThorException> e = deserializeThorException(msg);
  236. e->setSlave(sender);
  237. StringBuffer tmpStr("Slave ");
  238. job.queryJobGroup().queryNode(sender).endpoint().getUrlStr(tmpStr);
  239. GraphPrintLog(graph, e, "%s", tmpStr.append(": slave initialization error").str());
  240. throw e.getClear();
  241. }
  242. break;
  243. }
  244. case smt_getPhysicalName:
  245. {
  246. LOG(MCdebugProgress, unknownJob, "getPhysicalName called from slave %d", sender-1);
  247. StringAttr logicalName;
  248. unsigned partNo;
  249. bool create;
  250. msg.read(logicalName);
  251. msg.read(partNo);
  252. msg.read(create);
  253. msg.clear();
  254. StringBuffer phys;
  255. if (create && !job.queryCreatedFile(logicalName)) // not sure who would do this ever??
  256. queryThorFileManager().getPublishPhysicalName(job, logicalName, partNo, phys);
  257. else
  258. queryThorFileManager().getPhysicalName(job, logicalName, partNo, phys);
  259. msg.append(phys);
  260. break;
  261. }
  262. case smt_getFileOffset:
  263. {
  264. LOG(MCdebugProgress, unknownJob, "getFileOffset called from slave %d", sender-1);
  265. StringAttr logicalName;
  266. unsigned partNo;
  267. msg.read(logicalName);
  268. msg.read(partNo);
  269. msg.clear();
  270. offset_t offset = queryThorFileManager().getFileOffset(job, logicalName, partNo);
  271. msg.append(offset);
  272. job.queryJobComm().reply(msg);
  273. break;
  274. }
  275. case smt_actMsg:
  276. {
  277. LOG(MCdebugProgress, unknownJob, "smt_actMsg called from slave %d", sender-1);
  278. graph_id gid;
  279. msg.read(gid);
  280. activity_id id;
  281. msg.read(id);
  282. Owned<CMasterGraph> graph = (CMasterGraph *)job.getGraph(gid);
  283. assertex(graph);
  284. CMasterGraphElement *container = (CMasterGraphElement *)graph->queryElement(id);
  285. assertex(container);
  286. CMasterActivity *activity = (CMasterActivity *)container->queryActivity();
  287. assertex(activity);
  288. activity->handleSlaveMessage(msg); // don't block
  289. break;
  290. }
  291. case smt_getresult:
  292. {
  293. LOG(MCdebugProgress, unknownJob, "smt_getresult called from slave %d", sender-1);
  294. graph_id gid;
  295. msg.read(gid);
  296. activity_id ownerId;
  297. msg.read(ownerId);
  298. unsigned resultId;
  299. msg.read(resultId);
  300. mptag_t replyTag = job.deserializeMPTag(msg);
  301. Owned<IThorResult> result = job.getOwnedResult(gid, ownerId, resultId);
  302. Owned<IRowStream> resultStream = result->getRowStream();
  303. sendInChunks(job.queryJobComm(), sender, replyTag, resultStream, result->queryRowInterfaces());
  304. break;
  305. }
  306. }
  307. }
  308. }
  309. catch (IException *e)
  310. {
  311. job.fireException(e);
  312. e->Release();
  313. }
  314. }
  315. //////////////////////
  316. CMasterActivity::CMasterActivity(CGraphElementBase *_container) : CActivityBase(_container), threaded("CMasterActivity", this)
  317. {
  318. notedWarnings = createBitSet();
  319. mpTag = TAG_NULL;
  320. data = new MemoryBuffer[container.queryJob().querySlaves()];
  321. asyncStart = false;
  322. if (container.isSink())
  323. progressInfo.append(*new ProgressInfo);
  324. else
  325. {
  326. unsigned o=0;
  327. for (; o<container.getOutputs(); o++)
  328. progressInfo.append(*new ProgressInfo);
  329. }
  330. }
  331. CMasterActivity::~CMasterActivity()
  332. {
  333. if (asyncStart)
  334. threaded.join();
  335. notedWarnings->Release();
  336. container.queryJob().freeMPTag(mpTag);
  337. delete [] data;
  338. }
  339. MemoryBuffer &CMasterActivity::queryInitializationData(unsigned slave) const
  340. { // NB: not intended to be called by multiple threads.
  341. return data[slave].reset();
  342. }
  343. MemoryBuffer &CMasterActivity::getInitializationData(unsigned slave, MemoryBuffer &dst) const
  344. {
  345. return dst.append(data[slave]);
  346. }
  347. void CMasterActivity::main()
  348. {
  349. try
  350. {
  351. process();
  352. }
  353. catch (IException *e)
  354. {
  355. Owned<IException> e2;
  356. if (QUERYINTERFACE(e, ISEH_Exception))
  357. e2.setown(MakeThorFatal(e, TE_SEH, "(SEH)"));
  358. else
  359. e2.setown(MakeActivityException(this, e, "Master exception"));
  360. e->Release();
  361. ActPrintLog(e2, NULL);
  362. fireException(e2);
  363. }
  364. catch (CATCHALL)
  365. {
  366. Owned<IException> e = MakeThorFatal(NULL, TE_MasterProcessError, "FATAL: Unknown master process exception kind=%s, id=%"ACTPF"d", activityKindStr(container.getKind()), container.queryId());
  367. ActPrintLog(e, NULL);
  368. fireException(e);
  369. }
  370. }
  371. void CMasterActivity::startProcess(bool async)
  372. {
  373. if (async)
  374. {
  375. asyncStart = true;
  376. threaded.start();
  377. }
  378. else
  379. main();
  380. }
  381. bool CMasterActivity::wait(unsigned timeout)
  382. {
  383. if (!asyncStart)
  384. return true;
  385. return threaded.join(timeout);
  386. }
  387. bool CMasterActivity::fireException(IException *_e)
  388. {
  389. IThorException *e = QUERYINTERFACE(_e, IThorException);
  390. if (!e) return false;
  391. switch (e->errorCode())
  392. {
  393. case TE_LargeBufferWarning:
  394. case TE_MoxieIndarOverflow:
  395. case TE_BuildIndexFewExcess:
  396. case TE_FetchMisaligned:
  397. case TE_FetchOutOfRange:
  398. case TE_CouldNotCreateLookAhead:
  399. case TE_SpillAdded:
  400. case TE_ReadPartialFromPipe:
  401. case TE_LargeAggregateTable:
  402. {
  403. if (!notedWarnings->testSet(e->errorCode()))
  404. reportExceptionToWorkunit(container.queryJob().queryWorkUnit(), e);
  405. return true;
  406. }
  407. }
  408. return container.queryOwner().fireException(e);
  409. }
  410. void CMasterActivity::reset()
  411. {
  412. asyncStart = false;
  413. CActivityBase::reset();
  414. }
  415. void CMasterActivity::deserializeStats(unsigned node, MemoryBuffer &mb)
  416. {
  417. CriticalBlock b(progressCrit); // don't think needed
  418. unsigned __int64 localTimeNs;
  419. mb.read(localTimeNs);
  420. timingInfo.set(node, localTimeNs/1000000); // to milliseconds
  421. rowcount_t count;
  422. ForEachItemIn(p, progressInfo)
  423. {
  424. mb.read(count);
  425. progressInfo.item(p).set(node, count);
  426. }
  427. }
  428. void CMasterActivity::getXGMML(IWUGraphProgress *progress, IPropertyTree *node)
  429. {
  430. timingInfo.getXGMML(node);
  431. }
  432. void CMasterActivity::getXGMML(unsigned idx, IPropertyTree *edge)
  433. {
  434. CriticalBlock b(progressCrit);
  435. if (progressInfo.isItem(idx))
  436. progressInfo.item(idx).getXGMML(edge);
  437. }
  438. //////////////////////
  439. // CMasterGraphElement impl.
  440. //
  441. CMasterGraphElement::CMasterGraphElement(CGraphBase &_owner, IPropertyTree &_xgmml) : CGraphElementBase(_owner, _xgmml)
  442. {
  443. sentCreateCtx = false;
  444. }
  445. bool CMasterGraphElement::checkUpdate()
  446. {
  447. if (!onlyUpdateIfChanged)
  448. return false;
  449. if (!globals->getPropBool("@updateEnabled", true) || 0 != queryJob().getWorkUnitValueInt("disableUpdate", 0))
  450. return false;
  451. bool doCheckUpdate = false;
  452. StringAttr filename;
  453. unsigned eclCRC;
  454. unsigned __int64 totalCRC;
  455. bool temporary = false;
  456. switch (getKind())
  457. {
  458. case TAKindexwrite:
  459. {
  460. IHThorIndexWriteArg *helper = (IHThorIndexWriteArg *)queryHelper();
  461. doCheckUpdate = 0 != (helper->getFlags() & TIWupdate);
  462. filename.set(helper->getFileName());
  463. helper->getUpdateCRCs(eclCRC, totalCRC);
  464. break;
  465. }
  466. case TAKdiskwrite:
  467. case TAKcsvwrite:
  468. case TAKxmlwrite:
  469. {
  470. IHThorDiskWriteArg *helper = (IHThorDiskWriteArg *)queryHelper();
  471. doCheckUpdate = 0 != (helper->getFlags() & TDWupdate);
  472. filename.set(helper->getFileName());
  473. helper->getUpdateCRCs(eclCRC, totalCRC);
  474. if (TAKdiskwrite == getKind())
  475. temporary = 0 != (helper->getFlags() & (TDXtemporary|TDXjobtemp));
  476. break;
  477. }
  478. }
  479. if (doCheckUpdate)
  480. {
  481. StringAttr lfn;
  482. Owned<IDistributedFile> file = queryThorFileManager().lookup(queryJob(), filename, temporary, true);
  483. if (file)
  484. {
  485. IPropertyTree &props = file->queryAttributes();
  486. if ((eclCRC == props.getPropInt("@eclCRC")) && (totalCRC == props.getPropInt64("@totalCRC")))
  487. {
  488. // so this needs pruning
  489. Owned<IThorException> e = MakeActivityWarning(this, TE_UpToDate, "output file = '%s' - is up to date - it will not be rebuilt", file->queryLogicalName());
  490. queryOwner().fireException(e);
  491. return true;
  492. }
  493. }
  494. }
  495. return false;
  496. }
  497. void CMasterGraphElement::initActivity()
  498. {
  499. CriticalBlock b(crit);
  500. bool first = (NULL == activity);
  501. CGraphElementBase::initActivity();
  502. if (first || activity->needReInit())
  503. ((CMasterActivity *)activity.get())->init();
  504. }
  505. void CMasterGraphElement::doCreateActivity(size32_t parentExtractSz, const byte *parentExtract)
  506. {
  507. bool ok=false;
  508. switch (getKind())
  509. {
  510. case TAKspill:
  511. case TAKdiskwrite:
  512. case TAKfetch:
  513. case TAKkeyedjoin:
  514. case TAKworkunitwrite:
  515. case TAKworkunitread:
  516. ok = true;
  517. break;
  518. default:
  519. {
  520. if (isDiskInput(getKind()))
  521. ok = true;
  522. else if (!queryLocalOrGrouped())
  523. ok = true;
  524. break;
  525. }
  526. }
  527. if (!ok)
  528. return;
  529. onCreate();
  530. if (isDiskInput(getKind()))
  531. onStart(parentExtractSz, parentExtract);
  532. initActivity();
  533. }
  534. void CMasterGraphElement::slaveDone(size32_t slaveIdx, MemoryBuffer &mb)
  535. {
  536. ((CMasterActivity *)activity.get())->slaveDone(slaveIdx, mb);
  537. }
  538. //////
  539. ///////
  540. class CBarrierMaster : public CInterface, implements IBarrier
  541. {
  542. mptag_t tag;
  543. Linked<ICommunicator> comm;
  544. bool receiving;
  545. public:
  546. IMPLEMENT_IINTERFACE;
  547. CBarrierMaster(ICommunicator &_comm, mptag_t _tag) : comm(&_comm), tag(_tag)
  548. {
  549. receiving = false;
  550. }
  551. virtual const mptag_t queryTag() const { return tag; }
  552. virtual bool wait(bool exception, unsigned timeout)
  553. {
  554. CTimeMon tm(timeout);
  555. unsigned s=comm->queryGroup().ordinality()-1;
  556. bool aborted = false;
  557. CMessageBuffer msg;
  558. Owned<IBitSet> raisedSet = createBitSet();
  559. unsigned remaining = timeout;
  560. while (s--)
  561. {
  562. rank_t sender;
  563. msg.clear();
  564. if (INFINITE != timeout && tm.timedout(&remaining))
  565. {
  566. if (exception)
  567. throw createBarrierAbortException();
  568. else
  569. return false;
  570. }
  571. {
  572. BooleanOnOff onOff(receiving);
  573. if (!comm->recv(msg, RANK_ALL, tag, &sender, remaining))
  574. break;
  575. }
  576. msg.read(aborted);
  577. sender = sender - 1; // 0 = master
  578. if (raisedSet->testSet(sender, true) && !aborted)
  579. WARNLOG("CBarrierMaster, raise barrier message on tag %d, already received from slave %d", tag, sender);
  580. if (aborted) break;
  581. }
  582. msg.clear();
  583. msg.append(aborted);
  584. if (INFINITE != timeout && tm.timedout(&remaining))
  585. {
  586. if (exception)
  587. throw createBarrierAbortException();
  588. else
  589. return false;
  590. }
  591. if (!comm->send(msg, RANK_ALL_OTHER, tag, INFINITE != timeout ? remaining : LONGTIMEOUT))
  592. throw MakeStringException(0, "CBarrierMaster::wait - Timeout sending to slaves");
  593. if (aborted)
  594. {
  595. if (exception)
  596. throw createBarrierAbortException();
  597. else
  598. return false;
  599. }
  600. return true;
  601. }
  602. virtual void cancel()
  603. {
  604. if (receiving)
  605. comm->cancel(RANK_ALL, tag);
  606. CMessageBuffer msg;
  607. msg.append(true);
  608. if (!comm->send(msg, RANK_ALL_OTHER, tag, LONGTIMEOUT))
  609. throw MakeStringException(0, "CBarrierMaster::cancel - Timeout sending to slaves");
  610. }
  611. };
  612. /////////////
  613. class CMasterGraphTempHandler : public CGraphTempHandler
  614. {
  615. public:
  616. CMasterGraphTempHandler(CJobBase &job, bool errorOnMissing) : CGraphTempHandler(job, errorOnMissing) { }
  617. virtual bool removeTemp(const char *name)
  618. {
  619. queryThorFileManager().clearCacheEntry(name);
  620. return true;
  621. }
  622. virtual void registerFile(const char *name, graph_id graphId, unsigned usageCount, bool temp, WUFileKind fileKind, StringArray *clusters)
  623. {
  624. if (!temp || job.queryUseCheckpoints())
  625. {
  626. StringBuffer scopedName;
  627. queryThorFileManager().addScope(job, name, scopedName, temp || fileKind==WUFileJobOwned);
  628. Owned<IWorkUnit> wu = &job.queryWorkUnit().lock();
  629. wu->addFile(scopedName.str(), clusters, usageCount, fileKind, job.queryGraphName());
  630. }
  631. else
  632. CGraphTempHandler::registerFile(name, graphId, usageCount, temp, fileKind, clusters);
  633. }
  634. virtual void deregisterFile(const char *name, bool kept) // NB: only called for temp files
  635. {
  636. if (kept || job.queryUseCheckpoints())
  637. {
  638. StringBuffer scopedName;
  639. queryThorFileManager().addScope(job, name, scopedName, kept, kept);
  640. Owned<IWorkUnit> wu = &job.queryWorkUnit().lock();
  641. wu->releaseFile(scopedName.str());
  642. }
  643. else
  644. CGraphTempHandler::deregisterFile(name);
  645. }
  646. virtual void clearTemps()
  647. {
  648. try
  649. {
  650. if (!job.queryPausing()) // temps of completed workunit will have been preserved and want to keep
  651. {
  652. Owned<IWorkUnit> lwu = &job.queryWorkUnit().lock();
  653. lwu->deleteTempFiles(job.queryGraphName(), false, false);
  654. }
  655. }
  656. catch (IException *e)
  657. {
  658. EXCLOG(e, "Problem deleting temp files");
  659. e->Release();
  660. }
  661. CGraphTempHandler::clearTemps();
  662. }
  663. };
  664. static const char * getResultText(StringBuffer & s, const char * stepname, unsigned sequence)
  665. {
  666. switch ((int)sequence)
  667. {
  668. case -1: return s.append("STORED('").append(stepname).append("')");
  669. case -2: return s.append("PERSIST('").append(stepname).append("')");
  670. case -3: return s.append("global('").append(stepname).append("')");
  671. default:
  672. if (stepname)
  673. return s.append(stepname);
  674. return s.append('#').append(sequence);
  675. }
  676. }
  677. class CThorCodeContextMaster : public CThorCodeContextBase
  678. {
  679. Linked<IConstWorkUnit> workunit;
  680. Owned<IDistributedFileTransaction> superfiletransaction;
  681. IWorkUnit *updateWorkUnit()
  682. {
  683. StringAttr wuid;
  684. workunit->getWuid(StringAttrAdaptor(wuid));
  685. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  686. return factory->updateWorkUnit(wuid);
  687. }
  688. IWUResult *updateResult(const char *name, unsigned sequence)
  689. {
  690. Owned<IWorkUnit> w = updateWorkUnit();
  691. return updateWorkUnitResult(w, name, sequence);
  692. }
  693. IConstWUResult * getResult(const char * name, unsigned sequence)
  694. {
  695. return getWorkUnitResult(workunit, name, sequence);
  696. }
  // Common body for the getResultXXX methods.
  // Logs the request, fetches the result into a local named 'r' via
  // getResultForGet(), then runs ACTION. Anything ACTION throws is replaced by a
  // TE_FailedToRetrieveWorkunitValue exception that names the result and KINDTEXT
  // (and includes the original message when an IException was thrown).
  #define PROTECTED_GETRESULT(STEPNAME, SEQUENCE, KIND, KINDTEXT, ACTION) \
      LOG(MCdebugProgress, unknownJob, "getResult%s(%s,%d)", KIND, STEPNAME?STEPNAME:"", SEQUENCE); \
      Owned<IConstWUResult> r = getResultForGet(STEPNAME, SEQUENCE); \
      try \
      { \
          ACTION \
      } \
      catch (IException * e) \
      { \
          StringBuffer s; \
          StringBuffer text; \
          e->errorMessage(text); \
          e->Release(); \
          throw MakeStringException(TE_FailedToRetrieveWorkunitValue, "result %s in workunit contains an invalid " KINDTEXT " value [%s]", getResultText(s, STEPNAME, SEQUENCE), text.str()); \
      } \
      catch (CATCHALL) \
      { \
          StringBuffer s; \
          throw MakeStringException(TE_FailedToRetrieveWorkunitValue, "value %s in workunit contains an invalid " KINDTEXT " value", getResultText(s, STEPNAME, SEQUENCE)); \
      }
  709. public:
  710. CThorCodeContextMaster(CJobBase &job, IConstWorkUnit &_workunit, ILoadedDllEntry &querySo, IUserDescriptor &userDesc) : CThorCodeContextBase(job, querySo, userDesc), workunit(&_workunit)
  711. {
  712. }
  713. // ICodeContext
  714. virtual void setResultBool(const char *name, unsigned sequence, bool result)
  715. {
  716. Owned<IWUResult> r = updateResult(name, sequence);
  717. if (r)
  718. {
  719. r->setResultBool(result);
  720. r->setResultStatus(ResultStatusCalculated);
  721. }
  722. else
  723. throw MakeStringException(TE_UnexpectedParameters, "Unexpected parameters to setResultBool");
  724. }
  725. virtual void setResultData(const char *name, unsigned sequence, int len, const void *result)
  726. {
  727. Owned<IWUResult> r = updateResult(name, sequence);
  728. if (r)
  729. {
  730. r->setResultData(result, len);
  731. r->setResultStatus(ResultStatusCalculated);
  732. }
  733. else
  734. throw MakeStringException(TE_UnexpectedParameters, "Unexpected parameters to setResultData");
  735. }
  736. virtual void setResultDecimal(const char * name, unsigned sequence, int len, int precision, bool isSigned, const void *val)
  737. {
  738. Owned<IWUResult> r = updateResult(name, sequence);
  739. if (r)
  740. {
  741. r->setResultDecimal(val, len);
  742. r->setResultStatus(ResultStatusCalculated);
  743. }
  744. else
  745. throw MakeStringException(TE_UnexpectedParameters, "Unexpected parameters to setResultDecimal");
  746. }
  747. virtual void setResultInt(const char *name, unsigned sequence, __int64 result)
  748. {
  749. Owned<IWUResult> r = updateResult(name, sequence);
  750. if (r)
  751. {
  752. r->setResultInt(result);
  753. r->setResultStatus(ResultStatusCalculated);
  754. }
  755. else
  756. throw MakeStringException(TE_UnexpectedParameters, "Unexpected parameters to setResultInt");
  757. }
  758. virtual void setResultRaw(const char *name, unsigned sequence, int len, const void *result)
  759. {
  760. Owned<IWUResult> r = updateResult(name, sequence);
  761. if (r)
  762. {
  763. r->setResultRaw(len, result, ResultFormatRaw);
  764. r->setResultStatus(ResultStatusCalculated);
  765. }
  766. else
  767. throw MakeStringException(TE_UnexpectedParameters, "Unexpected parameters to setResultData");
  768. }
  769. virtual void setResultReal(const char *name, unsigned sequence, double result)
  770. {
  771. Owned<IWUResult> r = updateResult(name, sequence);
  772. if (r)
  773. {
  774. r->setResultReal(result);
  775. r->setResultStatus(ResultStatusCalculated);
  776. }
  777. else
  778. throw MakeStringException(TE_UnexpectedParameters, "Unexpected parameters to setResultReal");
  779. }
  780. virtual void setResultSet(const char *name, unsigned sequence, bool isAll, size32_t len, const void *result, ISetToXmlTransformer *)
  781. {
  782. Owned<IWUResult> r = updateResult(name, sequence);
  783. if (r)
  784. {
  785. r->setResultIsAll(isAll);
  786. r->setResultRaw(len, result, ResultFormatRaw);
  787. r->setResultStatus(ResultStatusCalculated);
  788. }
  789. else
  790. throw MakeStringException(TE_UnexpectedParameters, "Unexpected parameters to setResultData");
  791. }
  792. virtual void setResultString(const char *name, unsigned sequence, int len, const char *result)
  793. {
  794. Owned<IWUResult> r = updateResult(name, sequence);
  795. if (r)
  796. {
  797. r->setResultString(result, len);
  798. r->setResultStatus(ResultStatusCalculated);
  799. }
  800. else
  801. throw MakeStringException(TE_UnexpectedParameters, "Unexpected parameters to setResultString");
  802. }
  803. virtual void setResultUnicode(const char * name, unsigned sequence, int len, UChar const * result)
  804. {
  805. Owned<IWUResult> r = updateResult(name, sequence);
  806. if (r)
  807. {
  808. r->setResultUnicode(result, len);
  809. r->setResultStatus(ResultStatusCalculated);
  810. }
  811. else
  812. throw MakeStringException(TE_UnexpectedParameters, "Unexpected parameters to setResultUnicode");
  813. }
  814. virtual void setResultVarString(const char * name, unsigned sequence, const char *result)
  815. {
  816. Owned<IWUResult> r = updateResult(name, sequence);
  817. if (r)
  818. {
  819. r->setResultString(result, strlen(result));
  820. r->setResultStatus(ResultStatusCalculated);
  821. }
  822. else
  823. throw MakeStringException(TE_UnexpectedParameters, "Unexpected parameters to setResultVarString");
  824. }
  825. virtual void setResultUInt(const char *name, unsigned sequence, unsigned __int64 result)
  826. {
  827. Owned<IWUResult> r = updateResult(name, sequence);
  828. if (r)
  829. {
  830. r->setResultUInt(result);
  831. r->setResultStatus(ResultStatusCalculated);
  832. }
  833. else
  834. throw MakeStringException(TE_UnexpectedParameters, "Unexpected parameters to setResultUInt");
  835. }
  836. virtual void setResultVarUnicode(const char * stepname, unsigned sequence, UChar const *val)
  837. {
  838. setResultUnicode(stepname, sequence, rtlUnicodeStrlen(val), val);
  839. }
  840. virtual bool getResultBool(const char * stepname, unsigned sequence)
  841. {
  842. PROTECTED_GETRESULT(stepname, sequence, "Bool", "bool",
  843. return r->getResultBool();
  844. );
  845. }
  846. virtual void getResultData(unsigned & tlen, void * & tgt, const char * stepname, unsigned sequence)
  847. {
  848. PROTECTED_GETRESULT(stepname, sequence, "Data", "data",
  849. SCMStringBuffer result;
  850. r->getResultString(result);
  851. tlen = result.length();
  852. tgt = (char *)result.s.detach();
  853. );
  854. }
  855. virtual void getResultDecimal(unsigned tlen, int precision, bool isSigned, void * tgt, const char * stepname, unsigned sequence)
  856. {
  857. PROTECTED_GETRESULT(stepname, sequence, "Decimal", "decimal",
  858. r->getResultDecimal(tgt, tlen, precision, isSigned);
  859. );
  860. }
  861. virtual void getResultRaw(unsigned & tlen, void * & tgt, const char * stepname, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer)
  862. {
  863. tgt = NULL;
  864. PROTECTED_GETRESULT(stepname, sequence, "Raw", "raw",
  865. Variable2IDataVal result(&tlen, &tgt);
  866. Owned<IXmlToRawTransformer> rawXmlTransformer = createXmlRawTransformer(xmlTransformer);
  867. Owned<ICsvToRawTransformer> rawCsvTransformer = createCsvRawTransformer(csvTransformer);
  868. r->getResultRaw(result, rawXmlTransformer, rawCsvTransformer);
  869. );
  870. }
  871. virtual void getResultSet(bool & isAll, unsigned & tlen, void * & tgt, const char * stepname, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer)
  872. {
  873. tgt = NULL;
  874. PROTECTED_GETRESULT(stepname, sequence, "Raw", "raw",
  875. Variable2IDataVal result(&tlen, &tgt);
  876. Owned<IXmlToRawTransformer> rawXmlTransformer = createXmlRawTransformer(xmlTransformer);
  877. Owned<ICsvToRawTransformer> rawCsvTransformer = createCsvRawTransformer(csvTransformer);
  878. isAll = r->getResultIsAll();
  879. r->getResultRaw(result, rawXmlTransformer, rawCsvTransformer);
  880. );
  881. }
  882. virtual __int64 getResultInt(const char * name, unsigned sequence)
  883. {
  884. PROTECTED_GETRESULT(name, sequence, "Int", "integer",
  885. return r->getResultInt();
  886. );
  887. }
  888. virtual double getResultReal(const char * name, unsigned sequence)
  889. {
  890. PROTECTED_GETRESULT(name, sequence, "Real", "real",
  891. return r->getResultReal();
  892. );
  893. }
  894. virtual void getResultString(unsigned & tlen, char * & tgt, const char * stepname, unsigned sequence)
  895. {
  896. PROTECTED_GETRESULT(stepname, sequence, "String", "string",
  897. SCMStringBuffer result;
  898. r->getResultString(result);
  899. tlen = result.length();
  900. tgt = (char *)result.s.detach();
  901. );
  902. }
  903. virtual void getResultStringF(unsigned tlen, char * tgt, const char * stepname, unsigned sequence)
  904. {
  905. PROTECTED_GETRESULT(stepname, sequence, "String", "string",
  906. SCMStringBuffer result;
  907. r->getResultString(result);
  908. rtlStrToStr(tlen, tgt, result.length(), result.s.str());
  909. );
  910. }
  911. virtual void getResultUnicode(unsigned & tlen, UChar * & tgt, const char * stepname, unsigned sequence)
  912. {
  913. PROTECTED_GETRESULT(stepname, sequence, "Unicode", "unicode",
  914. MemoryBuffer result;
  915. r->getResultUnicode(MemoryBuffer2IDataVal(result));
  916. tlen = result.length()/2;
  917. tgt = (UChar *)malloc(tlen*2);
  918. memcpy(tgt, result.toByteArray(), tlen*2);
  919. );
  920. }
  921. virtual char * getResultVarString(const char * stepname, unsigned sequence)
  922. {
  923. PROTECTED_GETRESULT(stepname, sequence, "VarString", "string",
  924. SCMStringBuffer result;
  925. r->getResultString(result);
  926. return result.s.detach();
  927. );
  928. }
  929. virtual UChar * getResultVarUnicode(const char * stepname, unsigned sequence)
  930. {
  931. PROTECTED_GETRESULT(stepname, sequence, "VarUnicode", "unicode",
  932. MemoryBuffer result;
  933. r->getResultUnicode(MemoryBuffer2IDataVal(result));
  934. unsigned tlen = result.length()/2;
  935. result.append((UChar)0);
  936. return (UChar *)result.detach();
  937. );
  938. }
  // Returns the hash of the result identified by name/sequence.
  // The `r` handle is not declared in this method; it is presumably introduced by
  // PROTECTED_GETRESULT (defined outside this excerpt), which wraps the statement below.
  virtual unsigned getResultHash(const char * name, unsigned sequence)
  {
      PROTECTED_GETRESULT(name, sequence, "Hash", "hash",
          return r->getResultHash();
      );
  }
  945. virtual void getResultRowset(size32_t & tcount, byte * * & tgt, const char * stepname, unsigned sequence, IEngineRowAllocator * _rowAllocator, IOutputRowDeserializer * deserializer, bool isGrouped, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer)
  946. {
  947. tgt = NULL;
  948. PROTECTED_GETRESULT(stepname, sequence, "Rowset", "rowset",
  949. MemoryBuffer datasetBuffer;
  950. MemoryBuffer2IDataVal result(datasetBuffer);
  951. Owned<IXmlToRawTransformer> rawXmlTransformer = createXmlRawTransformer(xmlTransformer);
  952. Owned<ICsvToRawTransformer> rawCsvTransformer = createCsvRawTransformer(csvTransformer);
  953. r->getResultRaw(result, rawXmlTransformer, rawCsvTransformer);
  954. rtlDataset2RowsetX(tcount, tgt, _rowAllocator, deserializer, datasetBuffer.length(), datasetBuffer.toByteArray(), isGrouped);
  955. );
  956. }
  957. virtual void getExternalResultRaw(unsigned & tlen, void * & tgt, const char * wuid, const char * stepname, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer)
  958. {
  959. tgt = NULL;
  960. try
  961. {
  962. LOG(MCdebugProgress, unknownJob, "getExternalResultRaw %s", stepname);
  963. Variable2IDataVal result(&tlen, &tgt);
  964. Owned<IConstWUResult> r = getExternalResult(wuid, stepname, sequence);
  965. Owned<IXmlToRawTransformer> rawXmlTransformer = createXmlRawTransformer(xmlTransformer);
  966. Owned<ICsvToRawTransformer> rawCsvTransformer = createCsvRawTransformer(csvTransformer);
  967. r->getResultRaw(result, rawXmlTransformer, rawCsvTransformer);
  968. }
  969. catch (CATCHALL)
  970. {
  971. throw MakeStringException(TE_FailedToRetrieveWorkunitValue, "Failed to retrieve external data value %s from workunit %s", stepname, wuid);
  972. }
  973. }
  974. virtual __int64 countDiskFile(const char * name, unsigned recordSize)
  975. {
  976. unsigned __int64 size = 0;
  977. Owned<IDistributedFile> f = queryThorFileManager().lookup(job, name);
  978. if (f)
  979. {
  980. size = f->getFileSize(true,false);
  981. if (size % recordSize)
  982. throw MakeStringException(9001, "File %s has size %"I64F"d which is not a multiple of record size %d", name, size, recordSize);
  983. return size / recordSize;
  984. }
  985. DBGLOG("Error could not resolve file %s", name);
  986. throw MakeStringException(9003, "Error could not resolve %s", name);
  987. }
  988. virtual __int64 countIndex(__int64 activityId, IHThorCountIndexArg & arg) { UNIMPLEMENTED; }
  989. virtual __int64 countDiskFile(__int64 id, IHThorCountFileArg & arg)
  990. {
  991. // would have called the above function in a try block but corrupted registers whenever I tried.
  992. Owned<IHThorCountFileArg> a = &arg; // make sure it gets destroyed....
  993. arg.onCreate(this, NULL, NULL);
  994. arg.onStart(NULL, NULL);
  995. const char *name = arg.getFileName();
  996. Owned<IDistributedFile> f = queryThorFileManager().lookup(job, name, 0 != ((TDXtemporary|TDXjobtemp) & arg.getFlags()));
  997. if (f)
  998. {
  999. IOutputMetaData * rs = arg.queryRecordSize();
  1000. assertex(rs->isFixedSize());
  1001. unsigned recordSize = rs->getMinRecordSize();
  1002. unsigned __int64 size = f->getFileSize(true,false);
  1003. if (size % recordSize)
  1004. {
  1005. throw MakeStringException(0, "Physical file %s has size %"I64F"d which is not a multiple of record size %d", name, size, recordSize);
  1006. }
  1007. return size / recordSize;
  1008. }
  1009. else if (arg.getFlags() & TDRoptional)
  1010. return 0;
  1011. else
  1012. {
  1013. PrintLog("Error could not resolve file %s", name);
  1014. throw MakeStringException(0, "Error could not resolve %s", name);
  1015. }
  1016. }
  1017. virtual void addWuException(const char * text, unsigned code, unsigned severity)
  1018. {
  1019. DBGLOG("%s", text);
  1020. try
  1021. {
  1022. Owned<IWorkUnit> w = updateWorkUnit();
  1023. Owned<IWUException> we = w->createException();
  1024. we->setSeverity((WUExceptionSeverity)severity);
  1025. we->setExceptionMessage(text);
  1026. we->setExceptionSource("user");
  1027. if (code)
  1028. we->setExceptionCode(code);
  1029. }
  1030. catch (IException *E)
  1031. {
  1032. StringBuffer m;
  1033. E->errorMessage(m);
  1034. DBGLOG("Unable to record exception in workunit: %s", m.str());
  1035. E->Release();
  1036. }
  1037. catch (...)
  1038. {
  1039. DBGLOG("Unable to record exception in workunit: unknown exception");
  1040. }
  1041. }
  1042. virtual void addWuAssertFailure(unsigned code, const char * text, const char * filename, unsigned lineno, unsigned column, bool isAbort)
  1043. {
  1044. DBGLOG("%s", text);
  1045. try
  1046. {
  1047. Owned<IWorkUnit> w = updateWorkUnit();
  1048. addExceptionToWorkunit(w, ExceptionSeverityError, "user", code, text, filename, lineno, column);
  1049. }
  1050. catch (IException *E)
  1051. {
  1052. StringBuffer m;
  1053. E->errorMessage(m);
  1054. DBGLOG("Unable to record exception in workunit: %s", m.str());
  1055. E->Release();
  1056. }
  1057. catch (...)
  1058. {
  1059. DBGLOG("Unable to record exception in workunit: unknown exception");
  1060. }
  1061. if (isAbort)
  1062. rtlFailOnAssert(); // minimal implementation
  1063. }
  1064. virtual unsigned __int64 getFileOffset(const char *logicalName) { assertex(false); return 0; }
  1065. virtual unsigned getRecoveringCount() { UNIMPLEMENTED; } // don't know how to implement here!
  1066. virtual unsigned getNodes() { return job.queryJobGroup().ordinality()-1; }
  1067. virtual unsigned getNodeNum() { throw MakeThorException(0, "Unsupported. getNodeNum() called in master"); return (unsigned)-1; }
  1068. virtual char *getFilePart(const char *logicalName, bool create=false) { assertex(false); return NULL; }
  1069. virtual unsigned __int64 getDatasetHash(const char * name, unsigned __int64 hash)
  1070. {
  1071. unsigned checkSum = 0;
  1072. Owned<IDistributedFile> iDfsFile = queryThorFileManager().lookup(job, name, false, true);
  1073. if (iDfsFile.get())
  1074. {
  1075. if (iDfsFile->getFileCheckSum(checkSum))
  1076. hash ^= checkSum;
  1077. else
  1078. {
  1079. StringBuffer modifiedStr;
  1080. if (iDfsFile->queryAttributes().getProp("@modified", modifiedStr))
  1081. hash = rtlHash64Data(modifiedStr.length(), modifiedStr.str(), hash);
  1082. // JCS->GH - what's the best thing to do here, if [for some reason] neither are available..
  1083. }
  1084. }
  1085. return hash;
  1086. }
  1087. virtual IDistributedFileTransaction *querySuperFileTransaction()
  1088. {
  1089. if (!superfiletransaction.get())
  1090. superfiletransaction.setown(createDistributedFileTransaction(userDesc));
  1091. return superfiletransaction.get();
  1092. }
  1093. virtual char *getJobName()
  1094. {
  1095. SCMStringBuffer out;
  1096. workunit->getJobName(out);
  1097. return out.s.detach();
  1098. }
  1099. virtual char *getClusterName()
  1100. {
  1101. SCMStringBuffer out;
  1102. workunit->getClusterName(out);
  1103. return out.s.detach();
  1104. }
  1105. virtual char *getGroupName()
  1106. {
  1107. StringBuffer out;
  1108. if (globals)
  1109. globals->getProp("@nodeGroup",out);
  1110. return out.detach();
  1111. }
  1112. // ICodeContextExt impl.
  1113. virtual IConstWUResult *getExternalResult(const char * wuid, const char *name, unsigned sequence)
  1114. {
  1115. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  1116. Owned<IConstWorkUnit> externalWU = factory->openWorkUnit(wuid, false);
  1117. externalWU->remoteCheckAccess(userDesc, false);
  1118. return getWorkUnitResult(externalWU, name, sequence);
  1119. }
  1120. virtual IConstWUResult *getResultForGet(const char *name, unsigned sequence)
  1121. {
  1122. Owned<IConstWUResult> r = getResult(name, sequence);
  1123. if (!r || (r->getResultStatus() == ResultStatusUndefined))
  1124. {
  1125. StringBuffer s;
  1126. throw MakeStringException(TE_FailedToRetrieveWorkunitValue, "value %s in workunit is undefined", getResultText(s,name,sequence));
  1127. }
  1128. return r.getClear();
  1129. }
  1130. };
  1131. /////////////
  1132. //
  1133. // CJobMaster
  1134. //
  1135. void loadPlugin(SafePluginMap *pluginMap, const char *_path, const char *name, const char *version)
  1136. {
  1137. StringBuffer path(_path);
  1138. path.append(name);
  1139. OwnedIFile iFile = createIFile(path.str());
  1140. if (!iFile->exists())
  1141. throw MakeThorException(0, "Plugin %s not found at %s", name, path.str());
  1142. pluginMap->addPlugin(path.str(), name); // throws if unavailable/fails to load
  1143. Owned<ILoadedDllEntry> so = pluginMap->getPluginDll(name, version, true);
  1144. if (NULL == so.get()) // JCSMORE - could perhaps do with a more direct way of asking.
  1145. throw MakeThorException(0, "Incompatible plugin (%s). Version %s unavailable", name, version);
  1146. }
  1147. CJobMaster::CJobMaster(IConstWorkUnit &_workunit, const char *graphName, const char *_querySo, bool _sendSo, const SocketEndpoint &_agentEp)
  1148. : CJobBase(graphName), workunit(&_workunit), sendSo(_sendSo), agentEp(_agentEp)
  1149. {
  1150. SCMStringBuffer _token, _wuid, _user, _scope;
  1151. workunit->getWuid(_wuid);
  1152. workunit->getUser(_user);
  1153. workunit->getScope(_scope);
  1154. workunit->getSecurityToken(_token);
  1155. wuid.append(_wuid.str());
  1156. user.append(_user.str());
  1157. token.append(_token.str());
  1158. scope.append(_scope.str());
  1159. globalMemorySize = globals->getPropInt("@masterMemorySize", globals->getPropInt("@globalMemorySize")); // in MB
  1160. init();
  1161. resumed = WUActionResume == workunit->getAction();
  1162. fatalHandler.setown(new CFatalHandler(globals->getPropInt("@fatal_timeout", FATAL_TIMEOUT)));
  1163. querySent = false;
  1164. nodeDiskUsageCached = false;
  1165. StringBuffer pluginsDir;
  1166. globals->getProp("@pluginsPath", pluginsDir);
  1167. if (pluginsDir.length())
  1168. addPathSepChar(pluginsDir);
  1169. Owned<IConstWUPluginIterator> pluginIter = &workunit->getPlugins();
  1170. ForEach(*pluginIter)
  1171. {
  1172. IConstWUPlugin &plugin = pluginIter->query();
  1173. if (plugin.getPluginHole() || plugin.getPluginThor()) // JCSMORE ..Hole..
  1174. {
  1175. SCMStringBuffer name, version;
  1176. plugin.getPluginName(name);
  1177. plugin.getPluginVersion(version);
  1178. loadPlugin(pluginMap, pluginsDir.str(), name.str(), version.str());
  1179. }
  1180. }
  1181. querySo.setown(createDllEntry(_querySo, false, NULL));
  1182. codeCtx = new CThorCodeContextMaster(*this, *workunit, *querySo, *userDesc);
  1183. mpJobTag = allocateMPTag();
  1184. slavemptag = allocateMPTag();
  1185. slaveMsgHandler = new CSlaveMessageHandler(*this, slavemptag);
  1186. tmpHandler.setown(createTempHandler(true));
  1187. }
  1188. CJobMaster::~CJobMaster()
  1189. {
  1190. clean();
  1191. if (slaveMsgHandler)
  1192. delete slaveMsgHandler;
  1193. freeMPTag(mpJobTag);
  1194. freeMPTag(slavemptag);
  1195. tmpHandler.clear();
  1196. }
  1197. static IException *createBCastException(unsigned slave, const char *errorMsg)
  1198. {
  1199. // think this should always be fatal, could check link down here, or in general and flag as _shutdown.
  1200. StringBuffer msg("General failure communicating to slave");
  1201. if (slave)
  1202. msg.append("(").append(slave).append(") ");
  1203. else
  1204. msg.append("s ");
  1205. Owned<IThorException> e = MakeThorException(0, "%s", msg.append(" [").append(errorMsg).append("]").str());
  1206. e->setAction(tea_shutdown);
  1207. return e.getClear();
  1208. }
  // Sends `msg` to the slaves on `mptag` and, unless sendOnly is set, waits for a reply
  // from each of them on a freshly created reply tag.
  //   msg       - message to send; its reply tag is overwritten with the new reply tag
  //   timeout   - timeout used for the sends (the receives use LONGTIMEOUT)
  //   errorMsg  - context text included in failure exceptions (may be NULL)
  //   _replyTag - if non-NULL and sendOnly is false, receives the reply tag
  //   sendOnly  - return straight after sending, without collecting replies
  // Throws if a send fails (after abort()ing the job), if a receive times out, or if
  // a slave replies with a serialized exception.
  void CJobMaster::broadcastToSlaves(CMessageBuffer &msg, mptag_t mptag, unsigned timeout, const char *errorMsg, mptag_t *_replyTag, bool sendOnly)
  {
      mptag_t replyTag = createReplyTag();
      msg.setReplyTag(replyTag);
      if (globals->getPropBool("@broadcastSendAsync", true)) // only here in case of problems/debugging.
      {
          // sends to each slave individually via CAsyncFor; Do(i) targets rank i+1
          class CSendAsyncfor : public CAsyncFor
          {
              CJobMaster &job;
              CMessageBuffer &msg;
              mptag_t mptag;
              unsigned timeout;
              StringAttr errorMsg;
          public:
              CSendAsyncfor(CJobMaster &_job, CMessageBuffer &_msg, mptag_t _mptag, unsigned _timeout, const char *_errorMsg)
                  : job(_job), msg(_msg), mptag(_mptag), timeout(_timeout), errorMsg(_errorMsg)
              {
              }
              void Do(unsigned i)
              {
                  if (!job.queryJobComm().send(msg, i+1, mptag, timeout))
                      throw createBCastException(i+1, errorMsg);
              }
          } afor(*this, msg, mptag, timeout, errorMsg);
          try
          {
              afor.For(querySlaves(), querySlaves());
          }
          catch (IException *e)
          {
              EXCLOG(e, "broadcastSendAsync");
              abort(e);
              throw;
          }
      }
      else if (!queryJobComm().send(msg, RANK_ALL_OTHER, mptag, timeout))
      {
          Owned<IException> e = createBCastException(0, errorMsg);
          EXCLOG(e, NULL);
          abort(e);
          throw e.getClear();
      }
      // NOTE(review): when sendOnly is set, *_replyTag is never written
      if (sendOnly) return;
      if (_replyTag)
          *_replyTag = replyTag;
      unsigned respondents = 0;
      // one bit per slave (0-based), set once that slave has replied
      Owned<IBitSet> bitSet = createBitSet();
      loop
      {
          rank_t sender;
          CMessageBuffer msg;
          if (!queryJobComm().recv(msg, RANK_ALL, replyTag, &sender, LONGTIMEOUT))
          {
              // NOTE(review): this only clears the local pointer; the caller's tag is untouched
              if (_replyTag) _replyTag = NULL;
              StringBuffer tmpStr;
              if (errorMsg)
                  tmpStr.append(": ").append(errorMsg).append(" - ");
              tmpStr.append("Timeout receiving from slaves - no reply from: [");
              // list every slave whose bit is still clear (reported 1-based)
              unsigned s = bitSet->scan(0, false);
              assertex(s<querySlaves()); // must be at least one
              tmpStr.append(s+1);
              loop
              {
                  s = bitSet->scan(s+1, false);
                  if (s>=querySlaves())
                      break;
                  tmpStr.append(",").append(s+1);
              }
              tmpStr.append("]");
              Owned<IException> e = MakeThorFatal(NULL, 0, " %s", tmpStr.str());
              EXCLOG(e, NULL);
              throw e.getClear();
          }
          // NOTE(review): as above, only the local pointer is cleared
          if (_replyTag) _replyTag = NULL;
          // each reply starts with an error flag; on error a serialized exception follows
          bool error;
          msg.read(error);
          if (error)
          {
              Owned<IThorException> e = deserializeThorException(msg);
              e->setSlave(sender);
              throw e.getClear();
          }
          ++respondents;
          bitSet->set((unsigned)sender-1);
          if (respondents == querySlaveGroup().ordinality())
              break;
      }
  }
  1297. CGraphBase *CJobMaster::createGraph()
  1298. {
  1299. return new CMasterGraph(*this);
  1300. }
  1301. void CJobMaster::initNodeDUCache()
  1302. {
  1303. if (!nodeDiskUsageCached)
  1304. {
  1305. nodeDiskUsageCached = true;
  1306. Owned<IPropertyTreeIterator> fileIter = &workunit->getFileIterator();
  1307. ForEach (*fileIter)
  1308. {
  1309. Owned<IDistributedFile> f = queryDistributedFileDirectory().lookup(fileIter->query().queryProp("@name"), userDesc);
  1310. if (f)
  1311. {
  1312. unsigned n = f->numParts();
  1313. for (unsigned i=0;i<n;i++)
  1314. {
  1315. Owned<IDistributedFilePart> part = f->getPart(i);
  1316. offset_t sz = part->getFileSize(false, false);
  1317. if (i>=nodeDiskUsage.ordinality())
  1318. nodeDiskUsage.append(sz);
  1319. else
  1320. {
  1321. sz += nodeDiskUsage.item(i);
  1322. nodeDiskUsage.add(sz, i);
  1323. }
  1324. }
  1325. }
  1326. }
  1327. }
  1328. }
  1329. IPropertyTree *CJobMaster::prepareWorkUnitInfo()
  1330. {
  1331. Owned<IPropertyTree> workUnitInfo = createPTree("workUnitInfo");
  1332. workUnitInfo->setProp("wuid", wuid);
  1333. workUnitInfo->setProp("user", user);
  1334. workUnitInfo->setProp("token", token);
  1335. workUnitInfo->setProp("scope", scope);
  1336. Owned<IConstWUPluginIterator> pluginIter = &queryWorkUnit().getPlugins();
  1337. IPropertyTree *plugins = NULL;
  1338. ForEach(*pluginIter)
  1339. {
  1340. IConstWUPlugin &thisplugin = pluginIter->query();
  1341. if (thisplugin.getPluginThor() || thisplugin.getPluginHole()) // JCSMORE ..Hole..
  1342. {
  1343. if (!plugins)
  1344. plugins = workUnitInfo->addPropTree("plugins", createPTree());
  1345. SCMStringBuffer name;
  1346. thisplugin.getPluginName(name);
  1347. IPropertyTree *plugin = plugins->addPropTree("plugin", createPTree());
  1348. plugin->setProp("@name", name.str());
  1349. }
  1350. }
  1351. IPropertyTree *debug = workUnitInfo->addPropTree("Debug", createPTree(ipt_caseInsensitive));
  1352. SCMStringBuffer debugStr, valueStr;
  1353. Owned<IStringIterator> debugIter = &queryWorkUnit().getDebugValues();
  1354. ForEach (*debugIter)
  1355. {
  1356. debugIter->str(debugStr);
  1357. queryWorkUnit().getDebugValue(debugStr.str(), valueStr);
  1358. debug->setProp(debugStr.str(), valueStr.str());
  1359. }
  1360. return workUnitInfo.getClear();
  1361. }
  1362. void CJobMaster::sendQuery()
  1363. {
  1364. CriticalBlock b(sendQueryCrit);
  1365. if (querySent) return;
  1366. CMessageBuffer tmp;
  1367. tmp.append(mpJobTag);
  1368. tmp.append(slavemptag);
  1369. tmp.append(queryWuid());
  1370. tmp.append(graphName);
  1371. const char *soName = queryDllEntry().queryName();
  1372. PROGLOG("Query dll: %s", soName);
  1373. tmp.append(soName);
  1374. tmp.append(sendSo);
  1375. if (sendSo)
  1376. {
  1377. CTimeMon atimer;
  1378. OwnedIFile iFile = createIFile(soName);
  1379. OwnedIFileIO iFileIO = iFile->open(IFOread);
  1380. size32_t sz = (size32_t)iFileIO->size();
  1381. tmp.append(sz);
  1382. read(iFileIO, 0, sz, tmp);
  1383. PROGLOG("Loading query for serialization to slaves took %d ms", atimer.elapsed());
  1384. }
  1385. Owned<IPropertyTree> deps = createPTree(queryXGMML()->queryName());
  1386. Owned<IPropertyTreeIterator> edgeIter = queryXGMML()->getElements("edge"); // JCSMORE trim to those actually needed
  1387. ForEach (*edgeIter)
  1388. {
  1389. IPropertyTree &edge = edgeIter->query();
  1390. deps->addPropTree("edge", LINK(&edge));
  1391. }
  1392. Owned<IPropertyTree> workUnitInfo = prepareWorkUnitInfo();
  1393. workUnitInfo->serialize(tmp);
  1394. deps->serialize(tmp);
  1395. CMessageBuffer msg;
  1396. msg.append(QueryInit);
  1397. compressToBuffer(msg, tmp.length(), tmp.toByteArray());
  1398. CTimeMon queryToSlavesTimer;
  1399. broadcastToSlaves(msg, masterSlaveMpTag, LONGTIMEOUT, "sendQuery");
  1400. PROGLOG("Serialization of query init info (%d bytes) to slaves took %d ms", msg.length(), queryToSlavesTimer.elapsed());
  1401. queryJobManager().addCachedSo(soName);
  1402. querySent = true;
  1403. }
  1404. void CJobMaster::jobDone()
  1405. {
  1406. if (!querySent) return;
  1407. CMessageBuffer msg;
  1408. msg.append(QueryDone);
  1409. msg.append(queryKey());
  1410. broadcastToSlaves(msg, masterSlaveMpTag, LONGTIMEOUT, "jobDone");
  1411. }
  // Runs the job's graph to completion (or until abort/pause).
  //
  // Outline:
  //   1. start helpers: an abort watcher thread, a pause subscription and (optionally) a
  //      run-time limit trigger driven by the "maxRunTime" debug value
  //   2. mark the graph as running, work out which subgraphs still need to run and
  //      execute the sink subgraphs
  //   3. record the final graph state
  //   4. if pausing, publish temp files of completed subgraphs so they survive for resume
  //   5. notify slaves (jobDone), clear handlers/temps, and rethrow any jobDone failure
  //
  // Returns true if every subgraph that was scheduled to run completed.
  // Throws if the workunit is already in a pause state at entry, or if jobDone() failed.
  bool CJobMaster::go()
  {
      // Background thread: wakes every 5000 units of sem.wait (presumably ms) and fires
      // TE_WorkUnitAborting on the job once the workunit reports aborting().
      class CWorkunitAbortHandler : public CInterface, implements IThreaded
      {
          CJobMaster &job;
          IConstWorkUnit &wu;
          CThreaded threaded;
          bool running;
          Semaphore sem;
      public:
          CWorkunitAbortHandler(CJobMaster &_job, IConstWorkUnit &_wu)
              : job(_job), wu(_wu), threaded("WorkunitAbortHandler")
          {
              running = true;
              wu.subscribe(SubscribeOptionAbort);
              threaded.init(this);
          }
          ~CWorkunitAbortHandler()
          {
              stop();
              threaded.join();
          }
          virtual void main()
          {
              while (running)
              {
                  if (sem.wait(5000))
                      break; // signalled aborted
                  if (wu.aborting())
                  {
                      LOG(MCwarning, thorJob, "ABORT detected from user");
                      Owned <IException> e = MakeThorException(TE_WorkUnitAborting, "User signalled abort");
                      job.fireException(e);
                      break;
                  }
              }
          }
          void stop() { running = false; sem.signal(); }
      } wuAbortHandler(*this, *workunit);
      // SDS subscriber on /WorkUnits/<wuid>/Action: "pause" stops the job after the
      // current subgraph, "pausenow" additionally aborts it; other values are ignored.
      class CWorkunitPauseHandler : public CInterface, implements ISDSSubscription
      {
          CJobMaster &job;
          IConstWorkUnit &wu;
          SubscriptionId subId;
          bool subscribed;
          CriticalSection crit;
      public:
          IMPLEMENT_IINTERFACE;
          CWorkunitPauseHandler(CJobMaster &_job, IConstWorkUnit &_wu) : job(_job), wu(_wu)
          {
              StringBuffer xpath("/WorkUnits/");
              SCMStringBuffer istr;
              wu.getWuid(istr);
              xpath.append(istr.str()).append("/Action");
              subId = querySDS().subscribe(xpath.str(), *this, false, true);
              subscribed = true;
          }
          ~CWorkunitPauseHandler() { stop(); }
          void stop()
          {
              CriticalBlock b(crit);
              if (subscribed)
              {
                  subscribed = false;
                  querySDS().unsubscribe(subId);
              }
          }
          void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
          {
              CriticalBlock b(crit);
              if (!subscribed) return;
              job.markWuDirty();
              bool abort = false;
              bool pause = false;
              // valueData is compared by length, it is not treated as NUL-terminated
              if (valueLen && valueLen==strlen("pause") && (0 == strncmp("pause", (const char *)valueData, valueLen)))
              {
                  // pause after current subgraph
                  pause = true;
              }
              else if (valueLen && valueLen==strlen("pausenow") && (0 == strncmp("pausenow", (const char *)valueData, valueLen)))
              {
                  // abort current subgraph
                  abort = true;
                  pause = true;
              }
              else
              {
                  abort = pause = false;
              }
              if (pause)
              {
                  PROGLOG("Pausing job%s", abort?" [now]":"");
                  job.stop(abort);
              }
          }
      } workunitPauseHandler(*this, *workunit);
      // Timeout trigger that fires TE_QueryTimeoutError on the job.
      class CQueryTimeoutHandler : public CTimeoutTrigger
      {
          CJobMaster &job;
      public:
          CQueryTimeoutHandler(CJobMaster &_job, unsigned timeout) : CTimeoutTrigger(timeout, "QUERY"), job(_job)
          {
              inform(MakeThorException(TE_QueryTimeoutError, "Query took greater than %d seconds", timeout));
          }
          virtual bool action()
          {
              job.fireException(LINK(exception));
              return true;
          }
      private:
          // NOTE(review): not referenced anywhere in the visible code
          graph_id graphId;
      };
      Owned<CTimeoutTrigger> qtHandler;
      int guillotineTimeout = workunit->getDebugValueInt("maxRunTime", 0);
      if (guillotineTimeout > 0)
          qtHandler.setown(new CQueryTimeoutHandler(*this, guillotineTimeout));
      else if (guillotineTimeout < 0)
      {
          // negative limits are reported to the workunit and otherwise ignored
          Owned<IException> e = MakeStringException(0, "Ignoring negative maxRunTime: %d", guillotineTimeout);
          reportExceptionToWorkunit(*workunit, e);
      }
      if (WUActionPause == workunit->getAction() || WUActionPauseNow == workunit->getAction())
          throw MakeStringException(0, "Job paused at start, exiting");
      Owned<IConstWUGraphProgress> graphProgress = getGraphProgress();
      bool allDone = true;
      unsigned concurrentSubGraphs = (unsigned)getWorkUnitValueInt("concurrentSubGraphs", globals->getPropInt("@concurrentSubGraphs", 1));
      try
      {
          ClearTempDirs();
          Owned<IWUGraphProgress> progress = graphProgress->update();
          progress->setGraphState(WUGraphRunning);
          progress.clear();
          Owned<IThorGraphIterator> iter = getSubGraphs();
          CopyCIArrayOf<CMasterGraph> toRun;
          ForEach(*iter)
          {
              CMasterGraph &graph = (CMasterGraph &)iter->query();
              // when resuming or using checkpoints, subgraphs already recorded as complete are skipped
              if ((queryResumed() || queryUseCheckpoints()) && WUGraphComplete == graphProgress->queryNodeState(graph.queryGraphId()))
                  graph.setCompleteEx();
              else
                  toRun.append(graph);
          }
          graphProgress.clear();
          // only sink subgraphs are executed directly here
          ForEachItemInRev(g, toRun)
          {
              if (aborted) break;
              CMasterGraph &graph = toRun.item(g);
              if (graph.isSink())
                  graph.execute(0, NULL, true, concurrentSubGraphs>1);
              if (queryPausing()) break;
          }
          graphExecutor->wait();
          workunitPauseHandler.stop();
          ForEachItemIn(tr, toRun)
          {
              CMasterGraph &graph = toRun.item(tr);
              if (!graph.isComplete())
              {
                  allDone = false;
                  break;
              }
          }
      }
      catch (IException *e) { fireException(e); e->Release(); }
      catch (CATCHALL) { Owned<IException> e = MakeThorException(0, "Unknown exception running sub graphs"); fireException(e); }
      graphProgress.setown(getGraphProgress());
      Owned<IWUGraphProgress> progress = graphProgress->update();
      // NOTE(review): when not aborted, not all done and not pausing, the state is still set to WUGraphComplete
      progress->setGraphState(aborted?WUGraphFailed:(allDone?WUGraphComplete:(pausing?WUGraphPaused:WUGraphComplete)));
      progress.clear();
      graphProgress.clear();
      if (queryPausing())
      {
          assertex(!queryUseCheckpoints()); // JCSMORE - checkpoints probably need revisiting
          // stash away spills ready for resume, make them owned by workunit in event of abort/delete
          Owned<IFileUsageIterator> iter = queryTempHandler()->getIterator();
          ForEach(*iter)
          {
              CFileUsageEntry &entry = iter->query();
              StringAttr tmpName = entry.queryName();
              Owned<IConstWUGraphProgress> graphProgress = getGraphProgress();
              // only temp files belonging to completed subgraphs are kept
              if (WUGraphComplete == graphProgress->queryNodeState(entry.queryGraphId()))
              {
                  IArrayOf<IGroup> groups;
                  StringArray clusters;
                  fillClusterArray(*this, tmpName, clusters, groups);
                  Owned<IFileDescriptor> fileDesc = queryThorFileManager().create(*this, tmpName, clusters, groups, true, TDXtemporary|TDWnoreplicate);
                  fileDesc->queryProperties().setPropBool("@pausefile", true); // JCSMORE - mark to keep, may be able to distinguish via other means
                  IPropertyTree &props = fileDesc->queryProperties();
                  props.setPropBool("@owned", true);
                  bool blockCompressed=true; // JCSMORE, should come from helper really
                  if (blockCompressed)
                      props.setPropBool("@blockCompressed", true);
                  Owned<IDistributedFile> file = queryDistributedFileDirectory().createNew(fileDesc);
                  // NB: This is renaming/moving from temp path
                  StringBuffer newName;
                  queryThorFileManager().addScope(*this, tmpName, newName, true, true);
                  verifyex(file->renamePhysicalPartFiles(newName.str(), NULL, NULL, queryBaseDirectory()));
                  file->attach(newName,userDesc);
                  Owned<IWorkUnit> wu = &queryWorkUnit().lock();
                  wu->addFile(newName, &clusters, entry.queryUsage(), entry.queryKind(), queryGraphName());
              }
          }
      }
      // a jobDone failure is held back so that the cleanup below always runs
      Owned<IException> jobDoneException;
      try { jobDone(); }
      catch (IException *e)
      {
          EXCLOG(e, NULL);
          jobDoneException.setown(e);
      }
      fatalHandler->clear();
      queryTempHandler()->clearTemps();
      slaveMsgHandler->stop();
      if (jobDoneException.get())
          throw LINK(jobDoneException);
      return allDone;
  }
  1629. void CJobMaster::stop(bool doAbort)
  1630. {
  1631. pausing = true;
  1632. if (doAbort)
  1633. {
  1634. queryJobManager().replyException(*this, NULL);
  1635. Owned<IException> e = MakeThorException(0, "Unable to recover from pausenow");
  1636. fatalHandler->inform(e.getClear());
  1637. abort(e);
  1638. }
  1639. }
  1640. __int64 CJobMaster::queryNodeDiskUsage(unsigned node)
  1641. {
  1642. initNodeDUCache();
  1643. if (!nodeDiskUsage.isItem(node)) return 0;
  1644. return nodeDiskUsage.item(node);
  1645. }
  1646. void CJobMaster::setNodeDiskUsage(unsigned node, __int64 sz)
  1647. {
  1648. initNodeDUCache();
  1649. while (nodeDiskUsage.ordinality() <= node)
  1650. nodeDiskUsage.append(0);
  1651. nodeDiskUsage.replace(sz, node);
  1652. }
  1653. __int64 CJobMaster::addNodeDiskUsage(unsigned node, __int64 sz)
  1654. {
  1655. sz += queryNodeDiskUsage(node);
  1656. setNodeDiskUsage(node, sz);
  1657. return sz;
  1658. }
  1659. bool CJobMaster::queryCreatedFile(const char *file)
  1660. {
  1661. StringBuffer scopedName;
  1662. queryThorFileManager().addScope(*this, file, scopedName, false);
  1663. return (NotFound != createdFiles.find(scopedName.str()));
  1664. }
  1665. void CJobMaster::addCreatedFile(const char *file)
  1666. {
  1667. StringBuffer scopedName;
  1668. queryThorFileManager().addScope(*this, file, scopedName, false);
  1669. createdFiles.append(scopedName.str());
  1670. }
  1671. __int64 CJobMaster::getWorkUnitValueInt(const char *prop, __int64 defVal) const
  1672. {
  1673. return queryWorkUnit().getDebugValueInt64(prop, defVal);
  1674. }
  1675. bool CJobMaster::getWorkUnitValueBool(const char *prop, bool defVal) const
  1676. {
  1677. return queryWorkUnit().getDebugValueBool(prop, defVal);
  1678. }
  1679. StringBuffer &CJobMaster::getWorkUnitValue(const char *prop, StringBuffer &str) const
  1680. {
  1681. SCMStringBuffer scmStr;
  1682. queryWorkUnit().getDebugValue(prop, scmStr);
  1683. return str.append(scmStr.str());
  1684. }
  1685. IBarrier *CJobMaster::createBarrier(mptag_t tag)
  1686. {
  1687. return new CBarrierMaster(*jobComm, tag);
  1688. }
  1689. IGraphTempHandler *CJobMaster::createTempHandler(bool errorOnMissing)
  1690. {
  1691. return new CMasterGraphTempHandler(*this, errorOnMissing);
  1692. }
  1693. bool CJobMaster::fireException(IException *e)
  1694. {
  1695. IThorException *te = QUERYINTERFACE(e, IThorException);
  1696. ThorExceptionAction action;
  1697. if (!te) action = tea_null;
  1698. else action = te->queryAction();
  1699. if (QUERYINTERFACE(e, IMP_Exception) && MPERR_link_closed==e->errorCode())
  1700. action = tea_shutdown;
  1701. else if (QUERYINTERFACE(e, ISEH_Exception))
  1702. action = tea_shutdown;
  1703. CriticalBlock b(exceptCrit);
  1704. switch (action)
  1705. {
  1706. case tea_warning:
  1707. LOG(MCwarning, thorJob, e);
  1708. reportExceptionToWorkunit(*workunit, e);
  1709. break;
  1710. default:
  1711. {
  1712. LOG(MCerror, thorJob, e);
  1713. queryJobManager().replyException(*this, e);
  1714. fatalHandler->inform(LINK(e));
  1715. try { abort(e); }
  1716. catch (IException *e)
  1717. {
  1718. Owned<IThorException> te = ThorWrapException(e, "Error aborting job, will cause thor restart");
  1719. e->Release();
  1720. reportExceptionToWorkunit(*workunit, te);
  1721. action = tea_shutdown;
  1722. }
  1723. if (tea_shutdown == action)
  1724. queryJobManager().stop();
  1725. }
  1726. }
  1727. return true;
  1728. }
  1729. ///////////////////
  1730. class CCollatedResult : public CSimpleInterface, implements IThorResult
  1731. {
  1732. CMasterGraph &graph;
  1733. CActivityBase &activity;
  1734. IRowInterfaces *rowIf;
  1735. unsigned id;
  1736. CriticalSection crit;
  1737. PointerArrayOf<CThorExpandingRowArray> results;
  1738. Owned<IThorResult> result;
  1739. unsigned spillPriority;
  1740. activity_id ownerId;
  1741. void ensure()
  1742. {
  1743. CriticalBlock b(crit);
  1744. if (result)
  1745. return;
  1746. mptag_t replyTag = createReplyTag();
  1747. CMessageBuffer msg;
  1748. msg.append(GraphGetResult);
  1749. msg.append(activity.queryJob().queryKey());
  1750. msg.append(graph.queryGraphId());
  1751. msg.append(ownerId);
  1752. msg.append(id);
  1753. msg.append(replyTag);
  1754. ((CJobMaster &)graph.queryJob()).broadcastToSlaves(msg, masterSlaveMpTag, LONGTIMEOUT, NULL, NULL, true);
  1755. unsigned numSlaves = graph.queryJob().querySlaves();
  1756. for (unsigned n=0; n<numSlaves; n++)
  1757. results.item(n)->kill();
  1758. rank_t sender;
  1759. MemoryBuffer mb;
  1760. Owned<ISerialStream> stream = createMemoryBufferSerialStream(mb);
  1761. CThorStreamDeserializerSource rowSource(stream);
  1762. unsigned todo = numSlaves;
  1763. loop
  1764. {
  1765. loop
  1766. {
  1767. if (activity.queryAbortSoon())
  1768. return;
  1769. msg.clear();
  1770. if (activity.receiveMsg(msg, RANK_ALL, replyTag, &sender, 60*1000))
  1771. break;
  1772. ActPrintLog(&activity, "WARNING: tag %d timedout, retrying", (unsigned)replyTag);
  1773. }
  1774. sender = sender - 1; // 0 = master
  1775. if (!msg.length())
  1776. {
  1777. --todo;
  1778. if (0 == todo)
  1779. break; // done
  1780. }
  1781. else
  1782. {
  1783. bool error;
  1784. msg.read(error);
  1785. if (error)
  1786. {
  1787. Owned<IThorException> e = deserializeThorException(msg);
  1788. e->setSlave(sender);
  1789. throw e.getClear();
  1790. }
  1791. ThorExpand(msg, mb.clear());
  1792. CThorExpandingRowArray *slaveResults = results.item(sender);
  1793. while (!rowSource.eos())
  1794. {
  1795. RtlDynamicRowBuilder rowBuilder(rowIf->queryRowAllocator());
  1796. size32_t sz = rowIf->queryRowDeserializer()->deserialize(rowBuilder, rowSource);
  1797. slaveResults->append(rowBuilder.finalizeRowClear(sz));
  1798. }
  1799. }
  1800. }
  1801. Owned<IThorResult> _result = ::createResult(activity, rowIf, false, spillPriority);
  1802. Owned<IRowWriter> resultWriter = _result->getWriter();
  1803. for (unsigned s=0; s<numSlaves; s++)
  1804. {
  1805. CThorExpandingRowArray *slaveResult = results.item(s);
  1806. ForEachItemIn(r, *slaveResult)
  1807. {
  1808. const void *row = slaveResult->query(r);
  1809. LinkThorRow(row);
  1810. resultWriter->putRow(row);
  1811. }
  1812. }
  1813. result.setown(_result.getClear());
  1814. }
  1815. public:
  1816. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  1817. CCollatedResult(CMasterGraph &_graph, CActivityBase &_activity, IRowInterfaces *_rowIf, unsigned _id, activity_id _ownerId, unsigned _spillPriority)
  1818. : graph(_graph), activity(_activity), rowIf(_rowIf), id(_id), ownerId(_ownerId), spillPriority(_spillPriority)
  1819. {
  1820. for (unsigned n=0; n<graph.queryJob().querySlaves(); n++)
  1821. results.append(new CThorExpandingRowArray(activity, rowIf));
  1822. }
  1823. ~CCollatedResult()
  1824. {
  1825. ForEachItemIn(l, results)
  1826. {
  1827. CThorExpandingRowArray *result = results.item(l);
  1828. delete result;
  1829. }
  1830. }
  1831. void setId(unsigned _id)
  1832. {
  1833. id = _id;
  1834. }
  1835. // IThorResult
  1836. virtual IRowWriter *getWriter() { throwUnexpected(); }
  1837. virtual void setResultStream(IRowWriterMultiReader *stream, rowcount_t count)
  1838. {
  1839. throwUnexpected();
  1840. }
  1841. virtual IRowStream *getRowStream()
  1842. {
  1843. ensure();
  1844. return result->getRowStream();
  1845. }
  1846. virtual IRowInterfaces *queryRowInterfaces()
  1847. {
  1848. return rowIf;
  1849. }
  1850. virtual CActivityBase *queryActivity()
  1851. {
  1852. return &activity;
  1853. }
  1854. virtual bool isDistributed() const { return false; }
  1855. virtual void serialize(MemoryBuffer &mb)
  1856. {
  1857. ensure();
  1858. result->serialize(mb);
  1859. }
  1860. virtual void getResult(size32_t & retSize, void * & ret)
  1861. {
  1862. ensure();
  1863. return result->getResult(retSize, ret);
  1864. }
  1865. virtual void getLinkedResult(unsigned & count, byte * * & ret)
  1866. {
  1867. ensure();
  1868. return result->getLinkedResult(count, ret);
  1869. }
  1870. };
  1871. ///////////////////
  1872. //
  1873. // CMasterGraph impl.
  1874. //
  1875. CMasterGraph::CMasterGraph(CJobMaster &_job) : CGraphBase(_job), jobM(_job)
  1876. {
  1877. mpTag = job.allocateMPTag();
  1878. startBarrierTag = job.allocateMPTag();
  1879. waitBarrierTag = job.allocateMPTag();
  1880. startBarrier = job.createBarrier(startBarrierTag);
  1881. waitBarrier = job.createBarrier(waitBarrierTag);
  1882. bcastTag = TAG_NULL;
  1883. }
  1884. CMasterGraph::~CMasterGraph()
  1885. {
  1886. job.freeMPTag(mpTag);
  1887. job.freeMPTag(startBarrierTag);
  1888. job.freeMPTag(waitBarrierTag);
  1889. if (TAG_NULL != doneBarrierTag)
  1890. job.freeMPTag(doneBarrierTag);
  1891. if (TAG_NULL != executeReplyTag)
  1892. job.freeMPTag(executeReplyTag);
  1893. }
  1894. void CMasterGraph::init()
  1895. {
  1896. CGraphBase::init();
  1897. if (queryOwner() && isGlobal())
  1898. {
  1899. doneBarrierTag = job.allocateMPTag();
  1900. doneBarrier = job.createBarrier(doneBarrierTag);
  1901. }
  1902. }
  1903. bool CMasterGraph::fireException(IException *e)
  1904. {
  1905. IThorException *te = QUERYINTERFACE(e, IThorException);
  1906. ThorExceptionAction action;
  1907. if (!te) action = tea_null;
  1908. else action = te->queryAction();
  1909. if (QUERYINTERFACE(e, IMP_Exception) && MPERR_link_closed==e->errorCode())
  1910. action = tea_shutdown;
  1911. else if (QUERYINTERFACE(e, ISEH_Exception))
  1912. action = tea_shutdown;
  1913. CriticalBlock b(exceptCrit);
  1914. switch (action)
  1915. {
  1916. case tea_warning:
  1917. LOG(MCwarning, thorJob, e);
  1918. reportExceptionToWorkunit(job.queryWorkUnit(), e);
  1919. break;
  1920. case tea_abort:
  1921. abort(e);
  1922. // fall through
  1923. default:
  1924. {
  1925. LOG(MCerror, thorJob, e);
  1926. if (NULL != fatalHandler)
  1927. fatalHandler->inform(LINK(e));
  1928. if (owner)
  1929. owner->fireException(e);
  1930. else
  1931. job.fireException(e);
  1932. }
  1933. }
  1934. return true;
  1935. }
  1936. void CMasterGraph::abort(IException *e)
  1937. {
  1938. if (aborted) return;
  1939. try{ CGraphBase::abort(e); }
  1940. catch (IException *e)
  1941. {
  1942. GraphPrintLog(e, "Aborting master graph");
  1943. e->Release();
  1944. }
  1945. if (TAG_NULL != bcastTag)
  1946. job.queryJobComm().cancel(0, bcastTag);
  1947. if (started)
  1948. {
  1949. try
  1950. {
  1951. CMessageBuffer msg;
  1952. msg.append(GraphAbort);
  1953. msg.append(job.queryKey());
  1954. msg.append(queryGraphId());
  1955. jobM.broadcastToSlaves(msg, masterSlaveMpTag, LONGTIMEOUT, "abort", &bcastTag);
  1956. }
  1957. catch (IException *e)
  1958. {
  1959. GraphPrintLog(e, "Aborting slave graph");
  1960. if (abortException)
  1961. throw LINK(abortException);
  1962. throw;
  1963. }
  1964. }
  1965. if (!queryOwner())
  1966. {
  1967. if (globals->getPropBool("@watchdogProgressEnabled"))
  1968. queryJobManager().queryDeMonServer()->endGraph(this, true);
  1969. }
  1970. }
  1971. void CMasterGraph::serializeCreateContexts(MemoryBuffer &mb)
  1972. {
  1973. CGraphBase::serializeCreateContexts(mb);
  1974. Owned<IThorActivityIterator> iter = (queryOwner() && !isGlobal()) ? getIterator() : getTraverseIterator();
  1975. ForEach (*iter)
  1976. {
  1977. CMasterGraphElement &element = (CMasterGraphElement &)iter->query();
  1978. if (reinit || !element.sentCreateCtx)
  1979. element.sentCreateCtx = true;
  1980. }
  1981. }
  1982. bool CMasterGraph::serializeActivityInitData(unsigned slave, MemoryBuffer &mb, IThorActivityIterator &iter)
  1983. {
  1984. CriticalBlock b(createdCrit);
  1985. DelayedSizeMarker sizeMark1(mb);
  1986. ForEach (iter)
  1987. {
  1988. CMasterGraphElement &element = (CMasterGraphElement &)iter.query();
  1989. if (!element.sentActInitData->testSet(slave))
  1990. {
  1991. CMasterActivity *activity = (CMasterActivity *)element.queryActivity();
  1992. if (activity)
  1993. {
  1994. mb.append(element.queryId());
  1995. DelayedSizeMarker sizeMark2(mb);
  1996. activity->serializeSlaveData(mb, slave);
  1997. sizeMark2.write();
  1998. }
  1999. }
  2000. }
  2001. if (0 == sizeMark1.size())
  2002. return false;
  2003. mb.append((activity_id)0); // terminator
  2004. sizeMark1.write();
  2005. return true;
  2006. }
  2007. void CMasterGraph::readActivityInitData(MemoryBuffer &mb, unsigned slave)
  2008. {
  2009. loop
  2010. {
  2011. activity_id id;
  2012. mb.read(id);
  2013. if (0 == id)
  2014. break;
  2015. size32_t dataLen;
  2016. mb.read(dataLen);
  2017. CGraphElementBase *element = queryElement(id);
  2018. MemoryBuffer &mbDst = element->queryActivity()->queryInitializationData(slave);
  2019. mbDst.append(dataLen, mb.readDirect(dataLen));
  2020. }
  2021. }
  2022. bool CMasterGraph::prepare(size32_t parentExtractSz, const byte *parentExtract, bool checkDependencies, bool shortCircuit, bool async)
  2023. {
  2024. if (!CGraphBase::prepare(parentExtractSz, parentExtract, checkDependencies, shortCircuit, async)) return false;
  2025. if (aborted) return false;
  2026. return true;
  2027. }
  2028. void CMasterGraph::create(size32_t parentExtractSz, const byte *parentExtract)
  2029. {
  2030. {
  2031. CriticalBlock b(createdCrit);
  2032. CGraphBase::create(parentExtractSz, parentExtract);
  2033. }
  2034. if (!aborted)
  2035. {
  2036. if (!queryOwner()) // owning graph sends query+child graphs
  2037. {
  2038. jobM.sendQuery(); // if not previously sent
  2039. if (globals->getPropBool("@watchdogProgressEnabled"))
  2040. queryJobManager().queryDeMonServer()->startGraph(this);
  2041. sendGraph(); // sends child graphs at same time
  2042. }
  2043. else
  2044. {
  2045. if (isGlobal())
  2046. {
  2047. ForEachItemIn(i, ifs)
  2048. {
  2049. CGraphElementBase &ifElem = ifs.item(i);
  2050. if (ifElem.newWhichBranch)
  2051. {
  2052. ifElem.newWhichBranch = false;
  2053. sentInitData = false; // force re-request of create data.
  2054. break;
  2055. }
  2056. }
  2057. CMessageBuffer msg;
  2058. if (reinit || !sentInitData)
  2059. {
  2060. sentInitData = true;
  2061. serializeCreateContexts(msg);
  2062. }
  2063. else
  2064. msg.append((unsigned)0);
  2065. try
  2066. {
  2067. jobM.broadcastToSlaves(msg, mpTag, LONGTIMEOUT, "serializeCreateContexts", &bcastTag);
  2068. }
  2069. catch (IException *e)
  2070. {
  2071. GraphPrintLog(e, "Aborting graph create(2)");
  2072. if (abortException)
  2073. throw LINK(abortException);
  2074. throw;
  2075. }
  2076. }
  2077. }
  2078. }
  2079. }
  2080. void CMasterGraph::start()
  2081. {
  2082. Owned<IThorActivityIterator> iter = getTraverseIterator();
  2083. ForEach (*iter)
  2084. iter->query().queryActivity()->startProcess();
  2085. }
  2086. void CMasterGraph::sendActivityInitData()
  2087. {
  2088. CMessageBuffer msg;
  2089. mptag_t replyTag = createReplyTag();
  2090. msg.setReplyTag(replyTag);
  2091. unsigned pos = msg.length();
  2092. unsigned w=0;
  2093. unsigned sentTo = 0;
  2094. for (; w<queryJob().querySlaves(); w++)
  2095. {
  2096. unsigned needActInit = 0;
  2097. Owned<IThorActivityIterator> iter = getTraverseIterator();
  2098. ForEach(*iter)
  2099. {
  2100. CGraphElementBase &element = iter->query();
  2101. CActivityBase *activity = element.queryActivity();
  2102. if (activity && activity->needReInit())
  2103. element.sentActInitData->set(w, false); // force act init to be resent
  2104. if (!element.sentActInitData->test(w)) // has it been sent
  2105. ++needActInit;
  2106. }
  2107. if (needActInit)
  2108. {
  2109. try
  2110. {
  2111. msg.rewrite(pos);
  2112. Owned<IThorActivityIterator> iter = getTraverseIterator();
  2113. serializeActivityInitData(w, msg, *iter);
  2114. }
  2115. catch (IException *e)
  2116. {
  2117. GraphPrintLog(e, NULL);
  2118. throw;
  2119. }
  2120. if (!job.queryJobComm().send(msg, w+1, mpTag, LONGTIMEOUT))
  2121. {
  2122. StringBuffer epStr;
  2123. throw MakeStringException(0, "Timeout sending to slave %s", job.querySlaveGroup().queryNode(w).endpoint().getUrlStr(epStr).str());
  2124. }
  2125. ++sentTo;
  2126. }
  2127. }
  2128. if (sentTo)
  2129. {
  2130. assertex(sentTo == queryJob().querySlaves());
  2131. w=0;
  2132. Owned<IException> e;
  2133. // now get back initialization data from graph tag
  2134. for (; w<queryJob().querySlaves(); w++)
  2135. {
  2136. rank_t sender;
  2137. msg.clear();
  2138. if (!job.queryJobComm().recv(msg, w+1, replyTag, &sender, LONGTIMEOUT))
  2139. throw MakeGraphException(this, 0, "Timeout receiving from slaves after graph sent");
  2140. bool error;
  2141. msg.read(error);
  2142. if (error)
  2143. {
  2144. Owned<IThorException> se = deserializeThorException(msg);
  2145. se->setSlave(sender);
  2146. if (!e.get())
  2147. {
  2148. StringBuffer tmpStr("Slave ");
  2149. queryJob().queryJobGroup().queryNode(sender).endpoint().getUrlStr(tmpStr);
  2150. GraphPrintLog(se, "%s", tmpStr.append(": slave initialization error").str());
  2151. e.setown(se.getClear());
  2152. }
  2153. continue; // to read other slave responses.
  2154. }
  2155. readActivityInitData(msg, w);
  2156. }
  2157. if (e.get())
  2158. throw LINK(e);
  2159. }
  2160. }
  2161. void CMasterGraph::serializeGraphInit(MemoryBuffer &mb)
  2162. {
  2163. mb.append(graphId);
  2164. mb.append(reinit);
  2165. serializeMPtag(mb, mpTag);
  2166. mb.append((int)startBarrierTag);
  2167. mb.append((int)waitBarrierTag);
  2168. mb.append((int)doneBarrierTag);
  2169. mb.append(queryChildGraphCount());
  2170. Owned<IThorGraphIterator> childIter = getChildGraphs();
  2171. ForEach (*childIter)
  2172. {
  2173. CMasterGraph &childGraph = (CMasterGraph &)childIter->query();
  2174. childGraph.serializeGraphInit(mb);
  2175. }
  2176. }
  2177. // IThorChildGraph impl.
  2178. IEclGraphResults *CMasterGraph::evaluate(unsigned _parentExtractSz, const byte *parentExtract)
  2179. {
  2180. throw MakeGraphException(this, 0, "Thor master does not support the execution of child queries");
  2181. }
  2182. void CMasterGraph::executeSubGraph(size32_t parentExtractSz, const byte *parentExtract)
  2183. {
  2184. if (job.queryResumed()) // skip complete subgraph if resuming. NB: temp spill have been tucked away for this purpose when paused.
  2185. {
  2186. if (!queryOwner())
  2187. {
  2188. Owned<IConstWUGraphProgress> graphProgress = ((CJobMaster &)job).getGraphProgress();
  2189. if (WUGraphComplete == graphProgress->queryNodeState(graphId))
  2190. setCompleteEx();
  2191. }
  2192. }
  2193. if (isComplete())
  2194. return;
  2195. fatalHandler.clear();
  2196. fatalHandler.setown(new CFatalHandler(globals->getPropInt("@fatal_timeout", FATAL_TIMEOUT)));
  2197. CGraphBase::executeSubGraph(parentExtractSz, parentExtract);
  2198. if (TAG_NULL != executeReplyTag)
  2199. {
  2200. rank_t sender;
  2201. unsigned s=0;
  2202. for (; s<queryJob().querySlaves(); s++)
  2203. {
  2204. CMessageBuffer msg;
  2205. if (!queryJob().queryJobComm().recv(msg, RANK_ALL, executeReplyTag, &sender))
  2206. break;
  2207. bool error;
  2208. msg.read(error);
  2209. if (error)
  2210. {
  2211. Owned<IThorException> exception = deserializeThorException(msg);
  2212. exception->setSlave(sender);
  2213. GraphPrintLog(exception, "slave execute reply exception");
  2214. throw exception.getClear();
  2215. }
  2216. }
  2217. if (fatalHandler)
  2218. fatalHandler->clear();
  2219. }
  2220. fatalHandler.clear();
  2221. }
  2222. void CMasterGraph::sendGraph()
  2223. {
  2224. CTimeMon atimer;
  2225. CMessageBuffer msg;
  2226. msg.append(GraphInit);
  2227. msg.append(job.queryKey());
  2228. node->serialize(msg); // everything
  2229. if (TAG_NULL == executeReplyTag)
  2230. executeReplyTag = queryJob().allocateMPTag();
  2231. serializeMPtag(msg, executeReplyTag);
  2232. serializeCreateContexts(msg);
  2233. serializeGraphInit(msg);
  2234. // slave graph data
  2235. try
  2236. {
  2237. jobM.broadcastToSlaves(msg, masterSlaveMpTag, LONGTIMEOUT, "sendGraph", &bcastTag);
  2238. }
  2239. catch (IException *e)
  2240. {
  2241. GraphPrintLog(e, "Aborting sendGraph");
  2242. if (abortException)
  2243. throw LINK(abortException);
  2244. throw;
  2245. }
  2246. GraphPrintLog("sendGraph took %d ms", atimer.elapsed());
  2247. }
  2248. bool CMasterGraph::preStart(size32_t parentExtractSz, const byte *parentExtract)
  2249. {
  2250. started = true;
  2251. GraphPrintLog("Processing graph");
  2252. if (!sentStartCtx || reinit)
  2253. {
  2254. sentStartCtx = true;
  2255. CMessageBuffer msg;
  2256. serializeStartContexts(msg);
  2257. try
  2258. {
  2259. jobM.broadcastToSlaves(msg, mpTag, LONGTIMEOUT, "startCtx", &bcastTag, true);
  2260. }
  2261. catch (IException *e)
  2262. {
  2263. GraphPrintLog(e, "Aborting preStart");
  2264. if (abortException)
  2265. throw LINK(abortException);
  2266. throw;
  2267. }
  2268. }
  2269. if (syncInitData())
  2270. sendActivityInitData(); // has to be done at least once
  2271. CGraphBase::preStart(parentExtractSz, parentExtract);
  2272. if (isGlobal())
  2273. {
  2274. if (!startBarrier->wait(false)) // ensure all graphs are at start point at same time, as some may request data from each other.
  2275. return false;
  2276. }
  2277. if (!queryOwner())
  2278. {
  2279. Owned<IConstWUGraphProgress> graphProgress = ((CJobMaster &)job).getGraphProgress();
  2280. Owned<IWUGraphProgress> progress = graphProgress->update();
  2281. progress->setNodeState(graphId, WUGraphRunning);
  2282. progress.clear();
  2283. }
  2284. return true;
  2285. }
  2286. void CMasterGraph::handleSlaveDone(unsigned node, MemoryBuffer &mb)
  2287. {
  2288. graph_id gid;
  2289. mb.read(gid);
  2290. assertex(gid == queryGraphId());
  2291. unsigned count;
  2292. mb.read(count);
  2293. while (count--)
  2294. {
  2295. activity_id activityId;
  2296. mb.read(activityId);
  2297. CMasterGraphElement *act = (CMasterGraphElement *)queryElement(activityId);
  2298. unsigned len;
  2299. mb.read(len);
  2300. const void *d = mb.readDirect(len);
  2301. MemoryBuffer sdMb;
  2302. sdMb.setBuffer(len, (void *)d);
  2303. act->slaveDone(node, sdMb);
  2304. }
  2305. }
  2306. void CMasterGraph::getFinalProgress()
  2307. {
  2308. offset_t totalDiskUsage = 0;
  2309. offset_t minNodeDiskUsage = 0, maxNodeDiskUsage = 0;
  2310. unsigned maxNode = (unsigned)-1, minNode = (unsigned)-1;
  2311. CMessageBuffer msg;
  2312. mptag_t replyTag = createReplyTag();
  2313. msg.setReplyTag(replyTag);
  2314. msg.append((unsigned)GraphEnd);
  2315. msg.append(job.queryKey());
  2316. msg.append(queryGraphId());
  2317. if (!job.queryJobComm().send(msg, RANK_ALL_OTHER, masterSlaveMpTag, LONGTIMEOUT))
  2318. throw MakeGraphException(this, 0, "Timeout sending to slaves");
  2319. unsigned n=queryJob().querySlaves();
  2320. while (n--)
  2321. {
  2322. rank_t sender;
  2323. if (!job.queryJobComm().recv(msg, RANK_ALL, replyTag, &sender, LONGTIMEOUT))
  2324. {
  2325. GraphPrintLog("Timeout receiving final progress from slaves, %d slaves did not respond", n+1);
  2326. return;
  2327. }
  2328. bool error;
  2329. msg.read(error);
  2330. if (error)
  2331. {
  2332. Owned<IThorException> e = deserializeThorException(msg);
  2333. e->setSlave(sender);
  2334. throw e.getClear();
  2335. }
  2336. if (0 == msg.remaining())
  2337. continue;
  2338. handleSlaveDone(sender-1, msg);
  2339. if (!queryOwner())
  2340. {
  2341. if (globals->getPropBool("@watchdogProgressEnabled"))
  2342. {
  2343. try
  2344. {
  2345. size32_t progressLen;
  2346. msg.read(progressLen);
  2347. MemoryBuffer progressData;
  2348. progressData.setBuffer(progressLen, (void *)msg.readDirect(progressLen));
  2349. const SocketEndpoint &ep = queryClusterGroup().queryNode(sender).endpoint();
  2350. queryJobManager().queryDeMonServer()->takeHeartBeat(ep, progressData);
  2351. }
  2352. catch (IException *e)
  2353. {
  2354. GraphPrintLog(e, "Failure whilst deserializing stats/progress");
  2355. e->Release();
  2356. }
  2357. }
  2358. }
  2359. offset_t nodeDiskUsage;
  2360. msg.read(nodeDiskUsage);
  2361. jobM.setNodeDiskUsage(n, nodeDiskUsage);
  2362. if (nodeDiskUsage > maxNodeDiskUsage)
  2363. {
  2364. maxNodeDiskUsage = nodeDiskUsage;
  2365. maxNode = n;
  2366. }
  2367. if ((unsigned)-1 == minNode || nodeDiskUsage < minNodeDiskUsage)
  2368. {
  2369. minNodeDiskUsage = nodeDiskUsage;
  2370. minNode = n;
  2371. }
  2372. totalDiskUsage += nodeDiskUsage;
  2373. Owned<ITimeReporter> slaveReport = createStdTimeReporter(msg);
  2374. queryJob().queryTimeReporter().merge(*slaveReport);
  2375. }
  2376. if (totalDiskUsage)
  2377. {
  2378. Owned<IWorkUnit> wu = &job.queryWorkUnit().lock();
  2379. wu->addDiskUsageStats(totalDiskUsage/queryJob().querySlaves(), minNode, minNodeDiskUsage, maxNode, maxNodeDiskUsage, queryGraphId());
  2380. }
  2381. }
  2382. void CMasterGraph::done()
  2383. {
  2384. if (started)
  2385. {
  2386. if (!aborted && (!queryOwner() || isGlobal()))
  2387. getFinalProgress(); // waiting for slave graph to finish and send final progress update + extra act end info.
  2388. }
  2389. CGraphBase::done();
  2390. if (started && !queryOwner())
  2391. {
  2392. if (globals->getPropBool("@watchdogProgressEnabled"))
  2393. queryJobManager().queryDeMonServer()->endGraph(this, true);
  2394. }
  2395. if (!queryOwner())
  2396. {
  2397. if (queryJob().queryTimeReporter().numSections())
  2398. {
  2399. if (globals->getPropBool("@reportTimingsToWorkunit", true))
  2400. {
  2401. struct CReport : implements ITimeReportInfo
  2402. {
  2403. Owned<IWorkUnit> wu;
  2404. CGraphBase &graph;
  2405. CReport(CGraphBase &_graph) : graph(_graph)
  2406. {
  2407. wu.setown(&graph.queryJob().queryWorkUnit().lock());
  2408. }
  2409. virtual void report(const char *name, const __int64 totaltime, const __int64 maxtime, const unsigned count)
  2410. {
  2411. StringBuffer timerStr(graph.queryJob().queryGraphName());
  2412. timerStr.append("(").append(graph.queryGraphId()).append("): ");
  2413. timerStr.append(name);
  2414. wu->setTimerInfo(timerStr.str(), NULL, (unsigned)totaltime, count, (unsigned)maxtime);
  2415. }
  2416. } wureport(*this);
  2417. queryJob().queryTimeReporter().report(wureport);
  2418. }
  2419. else
  2420. queryJob().queryTimeReporter().printTimings();
  2421. }
  2422. }
  2423. }
  2424. void CMasterGraph::setComplete(bool tf)
  2425. {
  2426. CGraphBase::setComplete(tf);
  2427. if (tf && !queryOwner())
  2428. {
  2429. Owned<IConstWUGraphProgress> graphProgress = ((CJobMaster &)job).getGraphProgress();
  2430. Owned<IWUGraphProgress> progress = graphProgress->update();
  2431. progress->setNodeState(graphId, graphDone?WUGraphComplete:WUGraphFailed);
  2432. progress.clear();
  2433. }
  2434. }
  2435. bool CMasterGraph::deserializeStats(unsigned node, MemoryBuffer &mb)
  2436. {
  2437. CriticalBlock b(createdCrit);
  2438. unsigned count, _count;
  2439. mb.read(count);
  2440. _count = count;
  2441. while (count--)
  2442. {
  2443. activity_id activityId;
  2444. mb.read(activityId);
  2445. CMasterActivity *activity = NULL;
  2446. CMasterGraphElement *element = (CMasterGraphElement *)queryElement(activityId);
  2447. if (element)
  2448. {
  2449. activity = (CMasterActivity *)element->queryActivity();
  2450. if (!activity)
  2451. {
  2452. CGraphBase *parentGraph = element->queryOwner().queryOwner(); // i.e. am I in a childgraph
  2453. if (!parentGraph)
  2454. {
  2455. GraphPrintLog("Activity id=%"ACTPF"d not created in master and not a child query activity", activityId);
  2456. return false; // don't know if or how this could happen, but all bets off with packet if did.
  2457. }
  2458. Owned<IException> e;
  2459. try
  2460. {
  2461. element->onCreate();
  2462. element->initActivity();
  2463. activity = (CMasterActivity *)element->queryActivity();
  2464. created = true; // means some activities created within this graph
  2465. }
  2466. catch (IException *_e) { e.setown(_e); GraphPrintLog(_e, NULL); }
  2467. if (!activity || e.get())
  2468. {
  2469. GraphPrintLog("Activity id=%"ACTPF"d failed to created child query activity ready for progress", activityId);
  2470. return false;
  2471. }
  2472. }
  2473. if (activity)
  2474. activity->deserializeStats(node, mb);
  2475. }
  2476. else
  2477. {
  2478. GraphPrintLog("Failed to find activity, during progress deserialization, id=%"ACTPF"d", activityId);
  2479. return false; // don't know if or how this could happen, but all bets off with packet if did.
  2480. }
  2481. }
  2482. unsigned subs, _subs;
  2483. mb.read(subs);
  2484. _subs = subs;
  2485. while (subs--)
  2486. {
  2487. graph_id subId;
  2488. mb.read(subId);
  2489. Owned<CMasterGraph> graph = (CMasterGraph *)job.getGraph(subId);
  2490. if (NULL == graph.get())
  2491. return false;
  2492. if (!graph->deserializeStats(node, mb))
  2493. return false;
  2494. }
  2495. return true;
  2496. }
  2497. IThorResult *CMasterGraph::createResult(CActivityBase &activity, unsigned id, IThorGraphResults *results, IRowInterfaces *rowIf, bool distributed, unsigned spillPriority)
  2498. {
  2499. Owned<CCollatedResult> result = new CCollatedResult(*this, activity, rowIf, id, results->queryOwnerId(), spillPriority);
  2500. results->setResult(id, result);
  2501. return result;
  2502. }
  2503. IThorResult *CMasterGraph::createResult(CActivityBase &activity, unsigned id, IRowInterfaces *rowIf, bool distributed, unsigned spillPriority)
  2504. {
  2505. Owned<CCollatedResult> result = new CCollatedResult(*this, activity, rowIf, id, localResults->queryOwnerId(), spillPriority);
  2506. localResults->setResult(id, result);
  2507. return result;
  2508. }
  2509. IThorResult *CMasterGraph::createGraphLoopResult(CActivityBase &activity, IRowInterfaces *rowIf, bool distributed, unsigned spillPriority)
  2510. {
  2511. Owned<CCollatedResult> result = new CCollatedResult(*this, activity, rowIf, 0, localResults->queryOwnerId(), spillPriority);
  2512. unsigned id = graphLoopResults->addResult(result);
  2513. result->setId(id);
  2514. return result;
  2515. }
  2516. ///////////////////////////////////////////////////
  2517. CThorStats::CThorStats(const char *_prefix)
  2518. {
  2519. unsigned c = queryClusterWidth();
  2520. while (c--) counts.append(0);
  2521. if (_prefix)
  2522. {
  2523. prefix.set(_prefix);
  2524. StringBuffer tmp;
  2525. labelMin.set(tmp.append(_prefix).append("Min"));
  2526. labelMax.set(tmp.clear().append(_prefix).append("Max"));
  2527. labelMinSkew.set(tmp.clear().append(_prefix).append("MinSkew"));
  2528. labelMaxSkew.set(tmp.clear().append(_prefix).append("MaxSkew"));
  2529. labelMinEndpoint.set(tmp.clear().append(_prefix).append("MinEndpoint"));
  2530. labelMaxEndpoint.set(tmp.clear().append(_prefix).append("MaxEndpoint"));
  2531. }
  2532. else
  2533. {
  2534. labelMin.set("min");
  2535. labelMax.set("max");
  2536. labelMinSkew.set("minskew");
  2537. labelMaxSkew.set("maxskew");
  2538. labelMinEndpoint.set("minEndpoint");
  2539. labelMaxEndpoint.set("maxEndpoint");
  2540. }
  2541. }
  2542. void CThorStats::set(unsigned node, unsigned __int64 count)
  2543. {
  2544. counts.replace(count, node);
  2545. }
  2546. void CThorStats::removeAttribute(IPropertyTree *node, const char *name)
  2547. {
  2548. StringBuffer aName("@");
  2549. node->removeProp(aName.append(name).str());
  2550. }
  2551. void CThorStats::addAttribute(IPropertyTree *node, const char *name, unsigned __int64 val)
  2552. {
  2553. StringBuffer aName("@");
  2554. node->setPropInt64(aName.append(name).str(), val);
  2555. }
  2556. void CThorStats::addAttribute(IPropertyTree *node, const char *name, const char *val)
  2557. {
  2558. StringBuffer aName("@");
  2559. node->setProp(aName.append(name).str(), val);
  2560. }
  2561. void CThorStats::reset()
  2562. {
  2563. tot = max = avg = 0;
  2564. min = (unsigned __int64) -1;
  2565. minNode = maxNode = hi = lo = maxNode = minNode = 0;
  2566. }
  2567. void CThorStats::processInfo()
  2568. {
  2569. reset();
  2570. ForEachItemIn(n, counts)
  2571. {
  2572. unsigned __int64 thiscount = counts.item(n);
  2573. tot += thiscount;
  2574. if (thiscount > max)
  2575. {
  2576. max = thiscount;
  2577. maxNode = n;
  2578. }
  2579. if (thiscount < min)
  2580. {
  2581. min = thiscount;
  2582. minNode = n;
  2583. }
  2584. }
  2585. if (max)
  2586. {
  2587. unsigned count = counts.ordinality();
  2588. avg = tot/count;
  2589. if (avg)
  2590. {
  2591. hi = (unsigned)((100 * (max-avg))/avg);
  2592. lo = (unsigned)((100 * (avg-min))/avg);
  2593. }
  2594. }
  2595. }
  2596. void CThorStats::getXGMML(IPropertyTree *node, bool suppressMinMaxWhenEqual)
  2597. {
  2598. processInfo();
  2599. if (suppressMinMaxWhenEqual && (hi == lo))
  2600. {
  2601. removeAttribute(node, labelMin);
  2602. removeAttribute(node, labelMax);
  2603. }
  2604. else
  2605. {
  2606. addAttribute(node, labelMin, min);
  2607. addAttribute(node, labelMax, max);
  2608. }
  2609. if (hi == lo)
  2610. {
  2611. removeAttribute(node, labelMinEndpoint);
  2612. removeAttribute(node, labelMaxEndpoint);
  2613. removeAttribute(node, labelMinSkew);
  2614. removeAttribute(node, labelMaxSkew);
  2615. }
  2616. else
  2617. {
  2618. addAttribute(node, labelMinSkew, lo);
  2619. addAttribute(node, labelMaxSkew, hi);
  2620. StringBuffer epStr;
  2621. addAttribute(node, labelMinEndpoint, querySlaveGroup().queryNode(minNode).endpoint().getUrlStr(epStr).str());
  2622. addAttribute(node, labelMaxEndpoint, querySlaveGroup().queryNode(maxNode).endpoint().getUrlStr(epStr.clear()).str());
  2623. }
  2624. }
  2625. ///////////////////////////////////////////////////
  2626. CTimingInfo::CTimingInfo() : CThorStats("time")
  2627. {
  2628. StringBuffer tmp;
  2629. labelMin.set(tmp.append(labelMin).append("Ms"));
  2630. labelMax.set(tmp.clear().append(labelMax).append("Ms"));
  2631. }
  2632. ///////////////////////////////////////////////////
  2633. void ProgressInfo::processInfo() // reimplement as counts have special flags (i.e. stop/start)
  2634. {
  2635. reset();
  2636. startcount = stopcount = 0;
  2637. ForEachItemIn(n, counts)
  2638. {
  2639. unsigned __int64 thiscount = counts.item(n);
  2640. if (thiscount & THORDATALINK_STARTED)
  2641. startcount++;
  2642. if (thiscount & THORDATALINK_STOPPED)
  2643. stopcount++;
  2644. thiscount = thiscount & THORDATALINK_COUNT_MASK;
  2645. tot += thiscount;
  2646. if (thiscount > max)
  2647. {
  2648. max = thiscount;
  2649. maxNode = n;
  2650. }
  2651. if (thiscount < min)
  2652. {
  2653. min = thiscount;
  2654. minNode = n;
  2655. }
  2656. }
  2657. if (max)
  2658. {
  2659. unsigned count = counts.ordinality();
  2660. avg = tot/count;
  2661. if (avg)
  2662. {
  2663. hi = (unsigned)((100 * (max-avg))/avg);
  2664. lo = (unsigned)((100 * (avg-min))/avg);
  2665. }
  2666. }
  2667. }
  2668. void ProgressInfo::getXGMML(IPropertyTree *node)
  2669. {
  2670. CThorStats::getXGMML(node, true);
  2671. addAttribute(node, "slaves", counts.ordinality());
  2672. addAttribute(node, "count", tot);
  2673. addAttribute(node, "started", startcount);
  2674. addAttribute(node, "stopped", stopcount);
  2675. }
  2676. ///////////////////////////////////////////////////
  2677. CJobMaster *createThorGraph(const char *graphName, IPropertyTree *xgmml, IConstWorkUnit &workunit, const char *querySo, bool sendSo, const SocketEndpoint &agentEp)
  2678. {
  2679. Owned<CJobMaster> masterJob = new CJobMaster(workunit, graphName, querySo, sendSo, agentEp);
  2680. masterJob->setXGMML(xgmml);
  2681. Owned<IPropertyTreeIterator> iter = xgmml->getElements("node");
  2682. ForEach(*iter)
  2683. {
  2684. IPropertyTree &node = iter->query();
  2685. Owned<CGraphBase> subGraph = masterJob->createGraph();
  2686. subGraph->createFromXGMML(&node, NULL, NULL, NULL);
  2687. masterJob->addSubGraph(*LINK(subGraph));
  2688. }
  2689. masterJob->addDependencies(xgmml);
  2690. return LINK(masterJob);
  2691. }
  // File-local pointer to the current job manager (NULL until set). The set/get/query accessors
  // that follow all take *jobManagerCrit before touching it.
  2692. static IJobManager *jobManager = NULL;
  2693. void setJobManager(IJobManager *_jobManager)
  2694. {
  2695. CriticalBlock b(*jobManagerCrit);
  2696. jobManager = _jobManager;
  2697. }
  2698. IJobManager *getJobManager()
  2699. {
  2700. CriticalBlock b(*jobManagerCrit);
  2701. return LINK(jobManager);
  2702. }
  2703. IJobManager &queryJobManager()
  2704. {
  2705. CriticalBlock b(*jobManagerCrit);
  2706. assertex(jobManager);
  2707. return *jobManager;
  2708. }