thgraphmaster.cpp 87 KB


  1. /*##############################################################################
  2. Copyright (C) 2011 HPCC Systems.
  3. All rights reserved. This program is free software: you can redistribute it and/or modify
  4. it under the terms of the GNU Affero General Public License as
  5. published by the Free Software Foundation, either version 3 of the
  6. License, or (at your option) any later version.
  7. This program is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. GNU Affero General Public License for more details.
  11. You should have received a copy of the GNU Affero General Public License
  12. along with this program. If not, see <http://www.gnu.org/licenses/>.
  13. ############################################################################## */
  14. #include <limits.h>
  15. #include <stdlib.h>
  16. #include "jprop.hpp"
  17. #include "jexcept.hpp"
  18. #include "jiter.ipp"
  19. #include "jsocket.hpp"
  20. #include "jset.hpp"
  21. #include "jsort.hpp"
  22. #include "portlist.h"
  23. #include "jhtree.hpp"
  24. #include "mputil.hpp"
  25. #include "dllserver.hpp"
  26. #include "dautils.hpp"
  27. #include "danqs.hpp"
  28. #include "daclient.hpp"
  29. #include "daaudit.hpp"
  30. #include "wujobq.hpp"
  31. #include "thorport.hpp"
  32. #include "commonext.hpp"
  33. #include "thorxmlread.hpp"
  34. #include "thorplugin.hpp"
  35. #include "thormisc.hpp"
  36. #include "thgraphmaster.ipp"
  37. #include "thdemonserver.hpp"
  38. #include "rtlds_imp.hpp"
  39. #include "eclhelper.hpp"
  40. #include "thexception.hpp"
  41. #include "thactivitymaster.ipp"
  42. static CriticalSection *jobManagerCrit;
  43. MODULE_INIT(INIT_PRIORITY_STANDARD)
  44. {
  45. jobManagerCrit = new CriticalSection;
  46. return true;
  47. }
  48. MODULE_EXIT()
  49. {
  50. delete jobManagerCrit;
  51. }
  52. unsigned uniqGraphId = 1;
  53. #define FATAL_TIMEOUT 60
  54. class CFatalHandler : public CTimeoutTrigger, implements IFatalHandler
  55. {
  56. public:
  57. IMPLEMENT_IINTERFACE;
  58. CFatalHandler(unsigned timeout) : CTimeoutTrigger(timeout, "EXCEPTION")
  59. {
  60. }
  61. virtual bool action()
  62. {
  63. StringBuffer s("FAILED TO RECOVER FROM EXCEPTION, STOPPING THOR");
  64. FLLOG(MCoperatorWarning, thorJob, exception, s.str());
  65. Owned<IJobManager> jobManager = getJobManager();
  66. if (jobManager)
  67. {
  68. jobManager->fatal(exception);
  69. jobManager.clear();
  70. }
  71. return true;
  72. }
  73. // IFatalHandler
  74. virtual void inform(IException *e)
  75. {
  76. CTimeoutTrigger::inform(e);
  77. }
  78. virtual void clear()
  79. {
  80. CTimeoutTrigger::clear();
  81. }
  82. };
  83. /////
  84. CSlaveMessageHandler::CSlaveMessageHandler(CJobMaster &_job, mptag_t _mptag) : threaded("CSlaveMessageHandler"), job(_job), mptag(_mptag)
  85. {
  86. stopped = false;
  87. threaded.init(this);
  88. }
  89. CSlaveMessageHandler::~CSlaveMessageHandler()
  90. {
  91. stop();
  92. }
  93. void CSlaveMessageHandler::stop()
  94. {
  95. if (!stopped)
  96. {
  97. stopped = true;
  98. job.queryJobComm().cancel(0, mptag);
  99. threaded.join();
  100. }
  101. }
  102. void CSlaveMessageHandler::main()
  103. {
  104. try
  105. {
  106. loop
  107. {
  108. rank_t sender;
  109. CMessageBuffer msg;
  110. if (stopped || !job.queryJobComm().recv(msg, RANK_ALL, mptag, &sender))
  111. break;
  112. unsigned slave = ((unsigned)sender)-1;
  113. SlaveMsgTypes msgType;
  114. msg.read((int &)msgType);
  115. switch (msgType)
  116. {
  117. case smt_errorMsg:
  118. {
  119. Owned<IThorException> e = deserializeThorException(msg);
  120. e->setSlave(sender);
  121. Owned<CGraphBase> graph = job.getGraph(e->queryGraphId());
  122. if (graph)
  123. {
  124. activity_id id = e->queryActivityId();
  125. if (id)
  126. {
  127. CGraphElementBase *elem = graph->queryElement(id);
  128. CActivityBase *act = elem->queryActivity();
  129. if (act)
  130. act->fireException(e);
  131. else
  132. graph->fireException(e);
  133. }
  134. else
  135. graph->fireException(e);
  136. }
  137. else
  138. job.fireException(e);
  139. if (msg.getReplyTag() <= TAG_REPLY_BASE)
  140. {
  141. msg.clear();
  142. job.queryJobComm().reply(msg);
  143. }
  144. break;
  145. }
  146. case smt_dataReq:
  147. {
  148. graph_id gid;
  149. activity_id id;
  150. unsigned slave;
  151. msg.read(slave);
  152. msg.read(gid);
  153. msg.read(id);
  154. msg.clear();
  155. Owned<CGraphBase> graph = job.getGraph(gid);
  156. if (graph)
  157. {
  158. CMasterGraphElement *e = (CMasterGraphElement *)graph->queryElement(id);
  159. e->queryActivity()->getInitializationData(slave, msg);
  160. }
  161. job.queryJobComm().reply(msg);
  162. break;
  163. }
  164. case smt_initGraphReq:
  165. {
  166. graph_id gid;
  167. msg.read(gid);
  168. Owned<CMasterGraph> graph = (CMasterGraph *)job.getGraph(gid);
  169. assertex(graph);
  170. size32_t parentExtractSz;
  171. msg.read(parentExtractSz);
  172. const byte *parentExtract = NULL;
  173. if (parentExtractSz)
  174. {
  175. parentExtract = msg.readDirect(parentExtractSz);
  176. StringBuffer msg("Graph(");
  177. msg.append(graph->queryGraphId()).append(") - initializing master graph with parentExtract ").append(parentExtractSz).append(" bytes");
  178. DBGLOG("%s", msg.str());
  179. parentExtract = graph->setParentCtx(parentExtractSz, parentExtract);
  180. }
  181. Owned<IThorActivityIterator> iter = graph->getIterator();
  182. // onCreate all
  183. ForEach (*iter)
  184. {
  185. CMasterGraphElement &element = (CMasterGraphElement &)iter->query();
  186. element.onCreate();
  187. if (isDiskInput(element.getKind()))
  188. element.onStart(parentExtractSz, parentExtract);
  189. }
  190. msg.clear();
  191. graph->serializeCreateContexts(msg, true);
  192. job.queryJobComm().reply(msg);
  193. break;
  194. }
  195. case smt_initActDataReq:
  196. {
  197. graph_id gid;
  198. msg.read(gid);
  199. Owned<CMasterGraph> graph = (CMasterGraph *)job.getGraph(gid);
  200. assertex(graph);
  201. msg.clear();
  202. CMessageBuffer replyMsg;
  203. mptag_t replyTag = createReplyTag();
  204. msg.append(replyTag); // second reply
  205. replyMsg.setReplyTag(replyTag);
  206. graph->createForLocal();
  207. graph->serializeActivityInitData(((unsigned)sender)-1, msg);
  208. job.queryJobComm().reply(msg);
  209. if (!job.queryJobComm().recv(msg, sender, replyTag, NULL, MEDIUMTIMEOUT))
  210. throwUnexpected();
  211. bool error;
  212. msg.read(error);
  213. if (error)
  214. {
  215. Owned<IThorException> e = deserializeThorException(msg);
  216. e->setSlave(sender);
  217. StringBuffer tmpStr("Slave ");
  218. job.queryJobGroup().queryNode(sender).endpoint().getUrlStr(tmpStr);
  219. GraphPrintLog(graph, e, "%s", tmpStr.append(": slave initialization error").str());
  220. throw e.getClear();
  221. }
  222. break;
  223. }
  224. case smt_getPhysicalName:
  225. {
  226. LOG(MCdebugProgress, unknownJob, "getPhysicalName called from slave %d", sender-1);
  227. StringAttr logicalName;
  228. unsigned partNo;
  229. bool create;
  230. msg.read(logicalName);
  231. msg.read(partNo);
  232. msg.read(create);
  233. msg.clear();
  234. StringBuffer phys;
  235. if (create && !job.queryCreatedFile(logicalName)) // not sure who would do this ever??
  236. queryThorFileManager().getPublishPhysicalName(job, logicalName, partNo, phys);
  237. else
  238. queryThorFileManager().getPhysicalName(job, logicalName, partNo, phys);
  239. msg.append(phys);
  240. break;
  241. }
  242. case smt_getFileOffset:
  243. {
  244. LOG(MCdebugProgress, unknownJob, "getFileOffset called from slave %d", sender-1);
  245. StringAttr logicalName;
  246. unsigned partNo;
  247. msg.read(logicalName);
  248. msg.read(partNo);
  249. msg.clear();
  250. offset_t offset = queryThorFileManager().getFileOffset(job, logicalName, partNo);
  251. msg.append(offset);
  252. job.queryJobComm().reply(msg);
  253. break;
  254. }
  255. case smt_actMsg:
  256. {
  257. LOG(MCdebugProgress, unknownJob, "smt_podata called from slave %d", sender-1);
  258. graph_id gid;
  259. msg.read(gid);
  260. activity_id id;
  261. msg.read(id);
  262. Owned<CMasterGraph> graph = (CMasterGraph *)job.getGraph(gid);
  263. assertex(graph);
  264. CMasterGraphElement *container = (CMasterGraphElement *)graph->queryElement(id);
  265. assertex(container);
  266. CMasterActivity *activity = (CMasterActivity *)container->queryActivity();
  267. assertex(activity);
  268. activity->handleSlaveMessage(msg); // don't block
  269. break;
  270. }
  271. }
  272. }
  273. }
  274. catch (IException *e)
  275. {
  276. job.fireException(e);
  277. e->Release();
  278. }
  279. }
  280. //////////////////////
  281. CMasterActivity::CMasterActivity(CGraphElementBase *_container) : CActivityBase(_container), threaded("CMasterActivity")
  282. {
  283. notedWarnings = createBitSet();
  284. mpTag = TAG_NULL;
  285. data = new MemoryBuffer[container.queryJob().querySlaves()];
  286. if (container.isSink())
  287. progressInfo.append(*new ProgressInfo);
  288. else
  289. {
  290. unsigned o=0;
  291. for (; o<container.getOutputs(); o++)
  292. progressInfo.append(*new ProgressInfo);
  293. }
  294. }
  295. CMasterActivity::~CMasterActivity()
  296. {
  297. if (actStarted)
  298. threaded.join();
  299. notedWarnings->Release();
  300. container.queryJob().freeMPTag(mpTag);
  301. delete [] data;
  302. }
  303. MemoryBuffer &CMasterActivity::queryInitializationData(unsigned slave) const
  304. { // NB: not intended to be called by multiple threads.
  305. return data[slave].reset();
  306. }
  307. MemoryBuffer &CMasterActivity::getInitializationData(unsigned slave, MemoryBuffer &dst) const
  308. {
  309. return dst.append(data[slave]);
  310. }
  311. void CMasterActivity::main()
  312. {
  313. try
  314. {
  315. process();
  316. }
  317. catch (IException *e)
  318. {
  319. Owned<IException> e2;
  320. if (QUERYINTERFACE(e, ISEH_Exception))
  321. e2.setown(MakeThorFatal(e, TE_SEH, "(SEH)"));
  322. else
  323. e2.setown(MakeActivityException(this, e, "Master exception"));
  324. e->Release();
  325. ActPrintLog(e2, NULL);
  326. fireException(e2);
  327. }
  328. catch (CATCHALL)
  329. {
  330. Owned<IException> e = MakeThorFatal(NULL, TE_MasterProcessError, "FATAL: Unknown master process exception kind=%s, id=%"ACTPF"d", activityKindStr(container.getKind()), container.queryId());
  331. ActPrintLog(e, NULL);
  332. fireException(e);
  333. }
  334. }
  335. void CMasterActivity::startProcess()
  336. {
  337. CActivityBase::startProcess();
  338. threaded.init(this);
  339. }
  340. bool CMasterActivity::wait(unsigned timeout)
  341. {
  342. return threaded.join(timeout);
  343. }
  344. bool CMasterActivity::fireException(IException *_e)
  345. {
  346. IThorException *e = QUERYINTERFACE(_e, IThorException);
  347. if (!e) return false;
  348. switch (e->errorCode())
  349. {
  350. case TE_LargeBufferWarning:
  351. case TE_MoxieIndarOverflow:
  352. case TE_BuildIndexFewExcess:
  353. case TE_FetchMisaligned:
  354. case TE_FetchOutOfRange:
  355. case TE_CouldNotCreateLookAhead:
  356. case TE_SpillAdded:
  357. case TE_ReadPartialFromPipe:
  358. case TE_LargeAggregateTable:
  359. {
  360. if (!notedWarnings->testSet(e->errorCode()))
  361. reportExceptionToWorkunit(container.queryJob().queryWorkUnit(), e);
  362. return true;
  363. }
  364. }
  365. return container.queryOwner().fireException(e);
  366. }
  367. void CMasterActivity::reset()
  368. {
  369. CActivityBase::reset();
  370. }
  371. void CMasterActivity::deserializeStats(unsigned node, MemoryBuffer &mb)
  372. {
  373. CriticalBlock b(progressCrit); // don't think needed
  374. unsigned __int64 localTimeNs;
  375. mb.read(localTimeNs);
  376. timingInfo.set(node, localTimeNs/1000000); // to milliseconds
  377. rowcount_t count;
  378. ForEachItemIn(p, progressInfo)
  379. {
  380. mb.read(count);
  381. progressInfo.item(p).set(node, count);
  382. }
  383. }
  384. void CMasterActivity::getXGMML(IWUGraphProgress *progress, IPropertyTree *node)
  385. {
  386. timingInfo.getXGMML(node);
  387. }
  388. void CMasterActivity::getXGMML(unsigned idx, IPropertyTree *edge)
  389. {
  390. CriticalBlock b(progressCrit);
  391. if (progressInfo.isItem(idx))
  392. progressInfo.item(idx).getXGMML(edge);
  393. }
  394. //////////////////////
  395. // CMasterGraphElement impl.
  396. //
  397. CMasterGraphElement::CMasterGraphElement(CGraphBase &_owner, IPropertyTree &_xgmml) : CGraphElementBase(_owner, _xgmml)
  398. {
  399. sentCreateCtx = false;
  400. }
  401. bool CMasterGraphElement::checkUpdate()
  402. {
  403. if (!onlyUpdateIfChanged)
  404. return false;
  405. if (!globals->getPropBool("@updateEnabled", true) || 0 != queryJob().getWorkUnitValueInt("disableUpdate", 0))
  406. return false;
  407. bool doCheckUpdate = false;
  408. StringAttr filename;
  409. unsigned eclCRC;
  410. unsigned __int64 totalCRC;
  411. bool temporary = false;
  412. switch (getKind())
  413. {
  414. case TAKindexwrite:
  415. {
  416. IHThorIndexWriteArg *helper = (IHThorIndexWriteArg *)queryHelper();
  417. doCheckUpdate = 0 != (helper->getFlags() & TIWupdate);
  418. filename.set(helper->getFileName());
  419. helper->getUpdateCRCs(eclCRC, totalCRC);
  420. break;
  421. }
  422. case TAKdiskwrite:
  423. case TAKcsvwrite:
  424. case TAKxmlwrite:
  425. {
  426. IHThorDiskWriteArg *helper = (IHThorDiskWriteArg *)queryHelper();
  427. doCheckUpdate = 0 != (helper->getFlags() & TDWupdate);
  428. filename.set(helper->getFileName());
  429. helper->getUpdateCRCs(eclCRC, totalCRC);
  430. if (TAKdiskwrite == getKind())
  431. temporary = 0 != (helper->getFlags() & (TDXtemporary|TDXjobtemp));
  432. break;
  433. }
  434. }
  435. if (doCheckUpdate)
  436. {
  437. StringAttr lfn;
  438. Owned<IDistributedFile> file = queryThorFileManager().lookup(queryJob(), filename, temporary, true);
  439. if (file)
  440. {
  441. IPropertyTree &props = file->queryProperties();
  442. if ((eclCRC == props.getPropInt("@eclCRC")) && (totalCRC == props.getPropInt64("@totalCRC")))
  443. {
  444. // so this needs pruning
  445. Owned<IThorException> e = MakeActivityWarning(this, TE_UpToDate, "output file = '%s' - is up to date - it will not be rebuilt", file->queryLogicalName());
  446. queryOwner().fireException(e);
  447. return true;
  448. }
  449. }
  450. }
  451. return false;
  452. }
  453. void CMasterGraphElement::initActivity()
  454. {
  455. CGraphElementBase::initActivity();
  456. ((CMasterActivity *)activity.get())->init();
  457. }
  458. void CMasterGraphElement::doCreateActivity()
  459. {
  460. if (activity)
  461. return;
  462. bool ok=false;
  463. switch (getKind())
  464. {
  465. case TAKspill:
  466. case TAKdiskwrite:
  467. case TAKfetch:
  468. case TAKkeyedjoin:
  469. case TAKworkunitwrite:
  470. case TAKworkunitread:
  471. ok = true;
  472. break;
  473. default:
  474. {
  475. if (isDiskInput(getKind()))
  476. ok = true;
  477. else if (!queryLocalOrGrouped())
  478. ok = true;
  479. break;
  480. }
  481. }
  482. if (!ok)
  483. return;
  484. activity.setown(factory());
  485. initActivity();
  486. }
  487. void CMasterGraphElement::slaveDone(size32_t slaveIdx, MemoryBuffer &mb)
  488. {
  489. ((CMasterActivity *)activity.get())->slaveDone(slaveIdx, mb);
  490. }
  491. //////
  492. ///////
  493. class CBarrierMaster : public CInterface, implements IBarrier
  494. {
  495. mptag_t tag;
  496. Linked<ICommunicator> comm;
  497. bool receiving;
  498. public:
  499. IMPLEMENT_IINTERFACE;
  500. CBarrierMaster(ICommunicator &_comm, mptag_t _tag) : comm(&_comm), tag(_tag)
  501. {
  502. receiving = false;
  503. }
  504. virtual const mptag_t queryTag() const { return tag; }
  505. virtual bool wait(bool exception, unsigned timeout)
  506. {
  507. CTimeMon tm(timeout);
  508. unsigned s=comm->queryGroup().ordinality()-1;
  509. bool aborted = false;
  510. CMessageBuffer msg;
  511. Owned<IBitSet> raisedSet = createBitSet();
  512. unsigned remaining = timeout;
  513. while (s--)
  514. {
  515. rank_t sender;
  516. msg.clear();
  517. if (INFINITE != timeout && tm.timedout(&remaining))
  518. {
  519. if (exception)
  520. throw createBarrierAbortException();
  521. else
  522. return false;
  523. }
  524. {
  525. BooleanOnOff onOff(receiving);
  526. if (!comm->recv(msg, RANK_ALL, tag, &sender, remaining))
  527. break;
  528. }
  529. msg.read(aborted);
  530. sender = sender - 1; // 0 = master
  531. if (raisedSet->testSet(sender, true) && !aborted)
  532. WARNLOG("CBarrierMaster, raise barrier message on tag %d, already received from slave %d", tag, sender);
  533. if (aborted) break;
  534. }
  535. msg.clear();
  536. msg.append(aborted);
  537. if (INFINITE != timeout && tm.timedout(&remaining))
  538. {
  539. if (exception)
  540. throw createBarrierAbortException();
  541. else
  542. return false;
  543. }
  544. if (!comm->send(msg, RANK_ALL_OTHER, tag, INFINITE != timeout ? remaining : LONGTIMEOUT))
  545. throw MakeStringException(0, "CBarrierMaster::wait - Timeout sending to slaves");
  546. if (aborted)
  547. {
  548. if (exception)
  549. throw createBarrierAbortException();
  550. else
  551. return false;
  552. }
  553. return true;
  554. }
  555. virtual void cancel()
  556. {
  557. if (receiving)
  558. comm->cancel(RANK_ALL, tag);
  559. CMessageBuffer msg;
  560. msg.append(true);
  561. if (!comm->send(msg, RANK_ALL_OTHER, tag, LONGTIMEOUT))
  562. throw MakeStringException(0, "CBarrierMaster::cancel - Timeout sending to slaves");
  563. }
  564. };
  565. /////////////
  566. class CMasterGraphTempHandler : public CGraphTempHandler
  567. {
  568. public:
  569. CMasterGraphTempHandler(CJobBase &job) : CGraphTempHandler(job) { }
  570. virtual bool removeTemp(const char *name)
  571. {
  572. queryThorFileManager().clearCacheEntry(name);
  573. return true;
  574. }
  575. virtual void registerFile(const char *name, graph_id graphId, unsigned usageCount, bool temp, WUFileKind fileKind, StringArray *clusters)
  576. {
  577. if (!temp || job.queryUseCheckpoints())
  578. {
  579. StringBuffer scopedName;
  580. queryThorFileManager().addScope(job, name, scopedName, temp || fileKind==WUFileJobOwned);
  581. Owned<IWorkUnit> wu = &job.queryWorkUnit().lock();
  582. wu->addFile(scopedName.str(), clusters, usageCount, fileKind, job.queryGraphName());
  583. }
  584. else
  585. CGraphTempHandler::registerFile(name, graphId, usageCount, temp, fileKind, clusters);
  586. }
  587. virtual void deregisterFile(const char *name, bool kept) // NB: only called for temp files
  588. {
  589. if (kept || job.queryUseCheckpoints())
  590. {
  591. StringBuffer scopedName;
  592. queryThorFileManager().addScope(job, name, scopedName, kept, kept);
  593. Owned<IWorkUnit> wu = &job.queryWorkUnit().lock();
  594. wu->releaseFile(scopedName.str());
  595. }
  596. else
  597. CGraphTempHandler::deregisterFile(name);
  598. }
  599. virtual void clearTemps()
  600. {
  601. try
  602. {
  603. if (!job.queryPausing()) // temps of completed workunit will have been preserved and want to keep
  604. {
  605. Owned<IWorkUnit> lwu = &job.queryWorkUnit().lock();
  606. lwu->deleteTempFiles(job.queryGraphName(), false, false);
  607. }
  608. }
  609. catch (IException *e)
  610. {
  611. EXCLOG(e, "Problem deleting temp files");
  612. e->Release();
  613. }
  614. CGraphTempHandler::clearTemps();
  615. }
  616. };
  617. static const char * getResultText(StringBuffer & s, const char * stepname, unsigned sequence)
  618. {
  619. switch ((int)sequence)
  620. {
  621. case -1: return s.append("STORED('").append(stepname).append("')");
  622. case -2: return s.append("PERSIST('").append(stepname).append("')");
  623. case -3: return s.append("global('").append(stepname).append("')");
  624. default:
  625. if (stepname)
  626. return s.append(stepname);
  627. return s.append('#').append(sequence);
  628. }
  629. }
  630. class CThorCodeContextMaster : public CThorCodeContextBase
  631. {
  632. Linked<IConstWorkUnit> workunit;
  633. Owned<IDistributedFileTransaction> superfiletransaction;
  634. IWorkUnit *updateWorkUnit()
  635. {
  636. StringAttr wuid;
  637. workunit->getWuid(StringAttrAdaptor(wuid));
  638. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  639. return factory->updateWorkUnit(wuid);
  640. }
  641. IWUResult *updateResult(const char *name, unsigned sequence)
  642. {
  643. Owned<IWorkUnit> w = updateWorkUnit();
  644. return updateWorkUnitResult(w, name, sequence);
  645. }
  646. IConstWUResult * getResult(const char * name, unsigned sequence)
  647. {
  648. return getWorkUnitResult(workunit, name, sequence);
  649. }
  650. #define PROTECTED_GETRESULT(STEPNAME, SEQUENCE, KIND, KINDTEXT, ACTION) \
  651. LOG(MCdebugProgress, unknownJob, "getResult%s(%s,%d)", KIND, STEPNAME?STEPNAME:"", SEQUENCE); \
  652. Owned<IConstWUResult> r = getResultForGet(STEPNAME, SEQUENCE); \
  653. try \
  654. { \
  655. ACTION \
  656. } \
  657. catch (IException * e) { \
  658. StringBuffer s, text; e->errorMessage(text); e->Release(); \
  659. throw MakeStringException(TE_FailedToRetrieveWorkunitValue, "result %s in workunit contains an invalid " KINDTEXT " value [%s]", getResultText(s, STEPNAME, SEQUENCE), text.str()); \
  660. } \
  661. catch (CATCHALL) { StringBuffer s; throw MakeStringException(TE_FailedToRetrieveWorkunitValue, "value %s in workunit contains an invalid " KINDTEXT " value", getResultText(s, STEPNAME, SEQUENCE)); }
  662. public:
  663. CThorCodeContextMaster(CJobBase &job, IConstWorkUnit &_workunit, ILoadedDllEntry &querySo, IUserDescriptor &userDesc) : CThorCodeContextBase(job, querySo, userDesc), workunit(&_workunit)
  664. {
  665. }
  666. // ICodeContext
  667. virtual void setResultBool(const char *name, unsigned sequence, bool result)
  668. {
  669. Owned<IWUResult> r = updateResult(name, sequence);
  670. if (r)
  671. {
  672. r->setResultBool(result);
  673. r->setResultStatus(ResultStatusCalculated);
  674. }
  675. else
  676. throw MakeStringException(TE_UnexpectedParameters, "Unexpected parameters to setResultBool");
  677. }
  678. virtual void setResultData(const char *name, unsigned sequence, int len, const void *result)
  679. {
  680. Owned<IWUResult> r = updateResult(name, sequence);
  681. if (r)
  682. {
  683. r->setResultData(result, len);
  684. r->setResultStatus(ResultStatusCalculated);
  685. }
  686. else
  687. throw MakeStringException(TE_UnexpectedParameters, "Unexpected parameters to setResultData");
  688. }
  689. virtual void setResultDecimal(const char * name, unsigned sequence, int len, int precision, bool isSigned, const void *val)
  690. {
  691. Owned<IWUResult> r = updateResult(name, sequence);
  692. if (r)
  693. {
  694. r->setResultDecimal(val, len);
  695. r->setResultStatus(ResultStatusCalculated);
  696. }
  697. else
  698. throw MakeStringException(TE_UnexpectedParameters, "Unexpected parameters to setResultDecimal");
  699. }
  700. virtual void setResultInt(const char *name, unsigned sequence, __int64 result)
  701. {
  702. Owned<IWUResult> r = updateResult(name, sequence);
  703. if (r)
  704. {
  705. r->setResultInt(result);
  706. r->setResultStatus(ResultStatusCalculated);
  707. }
  708. else
  709. throw MakeStringException(TE_UnexpectedParameters, "Unexpected parameters to setResultInt");
  710. }
  711. virtual void setResultRaw(const char *name, unsigned sequence, int len, const void *result)
  712. {
  713. Owned<IWUResult> r = updateResult(name, sequence);
  714. if (r)
  715. {
  716. r->setResultRaw(len, result, ResultFormatRaw);
  717. r->setResultStatus(ResultStatusCalculated);
  718. }
  719. else
  720. throw MakeStringException(TE_UnexpectedParameters, "Unexpected parameters to setResultData");
  721. }
  722. virtual void setResultReal(const char *name, unsigned sequence, double result)
  723. {
  724. Owned<IWUResult> r = updateResult(name, sequence);
  725. if (r)
  726. {
  727. r->setResultReal(result);
  728. r->setResultStatus(ResultStatusCalculated);
  729. }
  730. else
  731. throw MakeStringException(TE_UnexpectedParameters, "Unexpected parameters to setResultReal");
  732. }
  733. virtual void setResultSet(const char *name, unsigned sequence, bool isAll, size32_t len, const void *result, ISetToXmlTransformer *)
  734. {
  735. Owned<IWUResult> r = updateResult(name, sequence);
  736. if (r)
  737. {
  738. r->setResultIsAll(isAll);
  739. r->setResultRaw(len, result, ResultFormatRaw);
  740. r->setResultStatus(ResultStatusCalculated);
  741. }
  742. else
  743. throw MakeStringException(TE_UnexpectedParameters, "Unexpected parameters to setResultData");
  744. }
  745. virtual void setResultString(const char *name, unsigned sequence, int len, const char *result)
  746. {
  747. Owned<IWUResult> r = updateResult(name, sequence);
  748. if (r)
  749. {
  750. r->setResultString(result, len);
  751. r->setResultStatus(ResultStatusCalculated);
  752. }
  753. else
  754. throw MakeStringException(TE_UnexpectedParameters, "Unexpected parameters to setResultString");
  755. }
  756. virtual void setResultUnicode(const char * name, unsigned sequence, int len, UChar const * result)
  757. {
  758. Owned<IWUResult> r = updateResult(name, sequence);
  759. if (r)
  760. {
  761. r->setResultUnicode(result, len);
  762. r->setResultStatus(ResultStatusCalculated);
  763. }
  764. else
  765. throw MakeStringException(TE_UnexpectedParameters, "Unexpected parameters to setResultUnicode");
  766. }
  767. virtual void setResultVarString(const char * name, unsigned sequence, const char *result)
  768. {
  769. Owned<IWUResult> r = updateResult(name, sequence);
  770. if (r)
  771. {
  772. r->setResultString(result, strlen(result));
  773. r->setResultStatus(ResultStatusCalculated);
  774. }
  775. else
  776. throw MakeStringException(TE_UnexpectedParameters, "Unexpected parameters to setResultVarString");
  777. }
  778. virtual void setResultUInt(const char *name, unsigned sequence, unsigned __int64 result)
  779. {
  780. Owned<IWUResult> r = updateResult(name, sequence);
  781. if (r)
  782. {
  783. r->setResultUInt(result);
  784. r->setResultStatus(ResultStatusCalculated);
  785. }
  786. else
  787. throw MakeStringException(TE_UnexpectedParameters, "Unexpected parameters to setResultUInt");
  788. }
  789. virtual void setResultVarUnicode(const char * stepname, unsigned sequence, UChar const *val)
  790. {
  791. setResultUnicode(stepname, sequence, rtlUnicodeStrlen(val), val);
  792. }
  793. virtual bool getResultBool(const char * stepname, unsigned sequence)
  794. {
  795. PROTECTED_GETRESULT(stepname, sequence, "Bool", "bool",
  796. return r->getResultBool();
  797. );
  798. }
  799. virtual void getResultData(unsigned & tlen, void * & tgt, const char * stepname, unsigned sequence)
  800. {
  801. PROTECTED_GETRESULT(stepname, sequence, "Data", "data",
  802. SCMStringBuffer result;
  803. r->getResultString(result);
  804. tlen = result.length();
  805. tgt = (char *)result.s.detach();
  806. );
  807. }
  808. virtual void getResultDecimal(unsigned tlen, int precision, bool isSigned, void * tgt, const char * stepname, unsigned sequence)
  809. {
  810. PROTECTED_GETRESULT(stepname, sequence, "Decimal", "decimal",
  811. r->getResultDecimal(tgt, tlen, precision, isSigned);
  812. );
  813. }
  814. virtual void getResultRaw(unsigned & tlen, void * & tgt, const char * stepname, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer)
  815. {
  816. tgt = NULL;
  817. PROTECTED_GETRESULT(stepname, sequence, "Raw", "raw",
  818. Variable2IDataVal result(&tlen, &tgt);
  819. Owned<IXmlToRawTransformer> rawXmlTransformer = createXmlRawTransformer(xmlTransformer);
  820. Owned<ICsvToRawTransformer> rawCsvTransformer = createCsvRawTransformer(csvTransformer);
  821. r->getResultRaw(result, rawXmlTransformer, rawCsvTransformer);
  822. );
  823. }
  824. virtual void getResultSet(bool & isAll, unsigned & tlen, void * & tgt, const char * stepname, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer)
  825. {
  826. tgt = NULL;
  827. PROTECTED_GETRESULT(stepname, sequence, "Raw", "raw",
  828. Variable2IDataVal result(&tlen, &tgt);
  829. Owned<IXmlToRawTransformer> rawXmlTransformer = createXmlRawTransformer(xmlTransformer);
  830. Owned<ICsvToRawTransformer> rawCsvTransformer = createCsvRawTransformer(csvTransformer);
  831. isAll = r->getResultIsAll();
  832. r->getResultRaw(result, rawXmlTransformer, rawCsvTransformer);
  833. );
  834. }
  835. virtual __int64 getResultInt(const char * name, unsigned sequence)
  836. {
  837. PROTECTED_GETRESULT(name, sequence, "Int", "integer",
  838. return r->getResultInt();
  839. );
  840. }
  841. virtual double getResultReal(const char * name, unsigned sequence)
  842. {
  843. PROTECTED_GETRESULT(name, sequence, "Real", "real",
  844. return r->getResultReal();
  845. );
  846. }
  847. virtual void getResultString(unsigned & tlen, char * & tgt, const char * stepname, unsigned sequence)
  848. {
  849. PROTECTED_GETRESULT(stepname, sequence, "String", "string",
  850. SCMStringBuffer result;
  851. r->getResultString(result);
  852. tlen = result.length();
  853. tgt = (char *)result.s.detach();
  854. );
  855. }
  856. virtual void getResultStringF(unsigned tlen, char * tgt, const char * stepname, unsigned sequence)
  857. {
  858. PROTECTED_GETRESULT(stepname, sequence, "String", "string",
  859. SCMStringBuffer result;
  860. r->getResultString(result);
  861. rtlStrToStr(tlen, tgt, result.length(), result.s.str());
  862. );
  863. }
  864. virtual void getResultUnicode(unsigned & tlen, UChar * & tgt, const char * stepname, unsigned sequence)
  865. {
  866. PROTECTED_GETRESULT(stepname, sequence, "Unicode", "unicode",
  867. MemoryBuffer result;
  868. r->getResultUnicode(MemoryBuffer2IDataVal(result));
  869. tlen = result.length()/2;
  870. tgt = (UChar *)malloc(tlen*2);
  871. memcpy(tgt, result.toByteArray(), tlen*2);
  872. );
  873. }
  874. virtual char * getResultVarString(const char * stepname, unsigned sequence)
  875. {
  876. PROTECTED_GETRESULT(stepname, sequence, "VarString", "string",
  877. SCMStringBuffer result;
  878. r->getResultString(result);
  879. return result.s.detach();
  880. );
  881. }
  882. virtual UChar * getResultVarUnicode(const char * stepname, unsigned sequence)
  883. {
  884. PROTECTED_GETRESULT(stepname, sequence, "VarUnicode", "unicode",
  885. MemoryBuffer result;
  886. r->getResultUnicode(MemoryBuffer2IDataVal(result));
  887. unsigned tlen = result.length()/2;
  888. result.append((UChar)0);
  889. return (UChar *)result.detach();
  890. );
  891. }
  892. virtual unsigned getResultHash(const char * name, unsigned sequence)
  893. {
  894. PROTECTED_GETRESULT(name, sequence, "Hash", "hash",
  895. return r->getResultHash();
  896. );
  897. }
  898. virtual void getResultRowset(size32_t & tcount, byte * * & tgt, const char * stepname, unsigned sequence, IEngineRowAllocator * _rowAllocator, IOutputRowDeserializer * deserializer, bool isGrouped, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer)
  899. {
  900. tgt = NULL;
  901. PROTECTED_GETRESULT(stepname, sequence, "Rowset", "rowset",
  902. MemoryBuffer datasetBuffer;
  903. MemoryBuffer2IDataVal result(datasetBuffer);
  904. Owned<IXmlToRawTransformer> rawXmlTransformer = createXmlRawTransformer(xmlTransformer);
  905. Owned<ICsvToRawTransformer> rawCsvTransformer = createCsvRawTransformer(csvTransformer);
  906. r->getResultRaw(result, rawXmlTransformer, rawCsvTransformer);
  907. rtlDataset2RowsetX(tcount, tgt, _rowAllocator, deserializer, datasetBuffer.length(), datasetBuffer.toByteArray(), isGrouped);
  908. );
  909. }
  910. virtual void getExternalResultRaw(unsigned & tlen, void * & tgt, const char * wuid, const char * stepname, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer)
  911. {
  912. tgt = NULL;
  913. try
  914. {
  915. LOG(MCdebugProgress, unknownJob, "getExternalResultRaw %s", stepname);
  916. Variable2IDataVal result(&tlen, &tgt);
  917. Owned<IConstWUResult> r = getExternalResult(wuid, stepname, sequence);
  918. Owned<IXmlToRawTransformer> rawXmlTransformer = createXmlRawTransformer(xmlTransformer);
  919. Owned<ICsvToRawTransformer> rawCsvTransformer = createCsvRawTransformer(csvTransformer);
  920. r->getResultRaw(result, rawXmlTransformer, rawCsvTransformer);
  921. }
  922. catch (CATCHALL)
  923. {
  924. throw MakeStringException(TE_FailedToRetrieveWorkunitValue, "Failed to retrieve external data value %s from workunit %s", stepname, wuid);
  925. }
  926. }
  927. virtual __int64 countDiskFile(const char * name, unsigned recordSize)
  928. {
  929. unsigned __int64 size = 0;
  930. Owned<IDistributedFile> f = queryThorFileManager().lookup(job, name);
  931. if (f)
  932. {
  933. size = f->getFileSize(true,false);
  934. if (size % recordSize)
  935. throw MakeStringException(9001, "File %s has size %"I64F"d which is not a multiple of record size %d", name, size, recordSize);
  936. return size / recordSize;
  937. }
  938. DBGLOG("Error could not resolve file %s", name);
  939. throw MakeStringException(9003, "Error could not resolve %s", name);
  940. }
  941. virtual __int64 countIndex(__int64 activityId, IHThorCountIndexArg & arg) { UNIMPLEMENTED; }
  942. virtual __int64 countDiskFile(__int64 id, IHThorCountFileArg & arg)
  943. {
  944. // would have called the above function in a try block but corrupted registers whenever I tried.
  945. Owned<IHThorCountFileArg> a = &arg; // make sure it gets destroyed....
  946. arg.onCreate(this, NULL, NULL);
  947. arg.onStart(NULL, NULL);
  948. const char *name = arg.getFileName();
  949. Owned<IDistributedFile> f = queryThorFileManager().lookup(job, name, 0 != ((TDXtemporary|TDXjobtemp) & arg.getFlags()));
  950. if (f)
  951. {
  952. IOutputMetaData * rs = arg.queryRecordSize();
  953. assertex(rs->isFixedSize());
  954. unsigned recordSize = rs->getMinRecordSize();
  955. unsigned __int64 size = f->getFileSize(true,false);
  956. if (size % recordSize)
  957. {
  958. throw MakeStringException(0, "Physical file %s has size %"I64F"d which is not a multiple of record size %d", name, size, recordSize);
  959. }
  960. return size / recordSize;
  961. }
  962. else if (arg.getFlags() & TDRoptional)
  963. return 0;
  964. else
  965. {
  966. PrintLog("Error could not resolve file %s", name);
  967. throw MakeStringException(0, "Error could not resolve %s", name);
  968. }
  969. }
  970. virtual void addWuException(const char * text, unsigned code, unsigned severity)
  971. {
  972. DBGLOG("%s", text);
  973. try
  974. {
  975. Owned<IWorkUnit> w = updateWorkUnit();
  976. Owned<IWUException> we = w->createException();
  977. we->setSeverity((WUExceptionSeverity)severity);
  978. we->setExceptionMessage(text);
  979. we->setExceptionSource("user");
  980. if (code)
  981. we->setExceptionCode(code);
  982. }
  983. catch (IException *E)
  984. {
  985. StringBuffer m;
  986. E->errorMessage(m);
  987. DBGLOG("Unable to record exception in workunit: %s", m.str());
  988. E->Release();
  989. }
  990. catch (...)
  991. {
  992. DBGLOG("Unable to record exception in workunit: unknown exception");
  993. }
  994. }
  995. virtual void addWuAssertFailure(unsigned code, const char * text, const char * filename, unsigned lineno, unsigned column, bool isAbort)
  996. {
  997. DBGLOG("%s", text);
  998. try
  999. {
  1000. Owned<IWorkUnit> w = updateWorkUnit();
  1001. addExceptionToWorkunit(w, ExceptionSeverityError, "user", code, text, filename, lineno, column);
  1002. }
  1003. catch (IException *E)
  1004. {
  1005. StringBuffer m;
  1006. E->errorMessage(m);
  1007. DBGLOG("Unable to record exception in workunit: %s", m.str());
  1008. E->Release();
  1009. }
  1010. catch (...)
  1011. {
  1012. DBGLOG("Unable to record exception in workunit: unknown exception");
  1013. }
  1014. if (isAbort)
  1015. rtlFailOnAssert(); // minimal implementation
  1016. }
  1017. virtual unsigned __int64 getFileOffset(const char *logicalName) { assertex(false); return 0; }
  1018. virtual unsigned getRecoveringCount() { UNIMPLEMENTED; } // don't know how to implement here!
  1019. virtual unsigned getNodes() { return job.queryJobGroup().ordinality()-1; }
  1020. virtual unsigned getNodeNum() { throw MakeThorException(0, "Unsupported. getNodeNum() called in master"); return (unsigned)-1; }
  1021. virtual char *getFilePart(const char *logicalName, bool create=false) { assertex(false); return NULL; }
  1022. virtual unsigned __int64 getDatasetHash(const char * name, unsigned __int64 hash)
  1023. {
  1024. unsigned checkSum = 0;
  1025. Owned<IDistributedFile> iDfsFile = queryThorFileManager().lookup(job, name, false, true);
  1026. if (iDfsFile.get())
  1027. {
  1028. if (iDfsFile->getFileCheckSum(checkSum))
  1029. hash ^= checkSum;
  1030. else
  1031. {
  1032. StringBuffer modifiedStr;
  1033. if (iDfsFile->queryProperties().getProp("@modified", modifiedStr))
  1034. hash = rtlHash64Data(modifiedStr.length(), modifiedStr.str(), hash);
  1035. // JCS->GH - what's the best thing to do here, if [for some reason] neither are available..
  1036. }
  1037. }
  1038. return hash;
  1039. }
  1040. virtual IDistributedFileTransaction *querySuperFileTransaction()
  1041. {
  1042. if (!superfiletransaction.get())
  1043. superfiletransaction.setown(createDistributedFileTransaction(userDesc));
  1044. return superfiletransaction.get();
  1045. }
  1046. virtual char *getJobName()
  1047. {
  1048. SCMStringBuffer out;
  1049. workunit->getJobName(out);
  1050. return out.s.detach();
  1051. }
  1052. virtual char *getClusterName()
  1053. {
  1054. SCMStringBuffer out;
  1055. workunit->getClusterName(out);
  1056. return out.s.detach();
  1057. }
  1058. virtual char *getGroupName()
  1059. {
  1060. StringBuffer out;
  1061. if (globals)
  1062. globals->getProp("@nodeGroup",out);
  1063. return out.detach();
  1064. }
  1065. // ICodeContextExt impl.
  1066. virtual IConstWUResult *getExternalResult(const char * wuid, const char *name, unsigned sequence)
  1067. {
  1068. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  1069. Owned<IConstWorkUnit> externalWU = factory->openWorkUnit(wuid, false);
  1070. externalWU->remoteCheckAccess(userDesc, false);
  1071. return getWorkUnitResult(externalWU, name, sequence);
  1072. }
  1073. virtual IConstWUResult *getResultForGet(const char *name, unsigned sequence)
  1074. {
  1075. Owned<IConstWUResult> r = getResult(name, sequence);
  1076. if (!r || (r->getResultStatus() == ResultStatusUndefined))
  1077. {
  1078. StringBuffer s;
  1079. throw MakeStringException(TE_FailedToRetrieveWorkunitValue, "value %s in workunit is undefined", getResultText(s,name,sequence));
  1080. }
  1081. return r.getClear();
  1082. }
  1083. };
  1084. /////////////
  1085. //
  1086. // CJobMaster
  1087. //
  1088. void loadPlugin(SafePluginMap *pluginMap, const char *_path, const char *name, const char *version)
  1089. {
  1090. StringBuffer path(_path);
  1091. path.append(name);
  1092. OwnedIFile iFile = createIFile(path.str());
  1093. if (!iFile->exists())
  1094. throw MakeThorException(0, "Plugin %s not found at %s", name, path.str());
  1095. pluginMap->addPlugin(path.str(), name); // throws if unavailable/fails to load
  1096. Owned<ILoadedDllEntry> so = pluginMap->getPluginDll(name, version, true);
  1097. if (NULL == so.get()) // JCSMORE - could perhaps do with a more direct way of asking.
  1098. throw MakeThorException(0, "Incompatible plugin (%s). Version %s unavailable", name, version);
  1099. }
  1100. CJobMaster::CJobMaster(IConstWorkUnit &_workunit, const char *graphName, const char *_querySo, bool _sendSo, const SocketEndpoint &_agentEp)
  1101. : CJobBase(graphName), workunit(&_workunit), sendSo(_sendSo), agentEp(_agentEp)
  1102. {
  1103. SCMStringBuffer _token, _wuid, _user, _scope;
  1104. workunit->getWuid(_wuid);
  1105. workunit->getUser(_user);
  1106. workunit->getScope(_scope);
  1107. workunit->getSecurityToken(_token);
  1108. wuid.append(_wuid.str());
  1109. user.append(_user.str());
  1110. token.append(_token.str());
  1111. scope.append(_scope.str());
  1112. init();
  1113. resumed = WUActionResume == workunit->getAction();
  1114. fatalHandler.setown(new CFatalHandler(globals->getPropInt("@fatal_timeout", FATAL_TIMEOUT)));
  1115. querySent = false;
  1116. nodeDiskUsageCached = false;
  1117. StringBuffer pluginsDir;
  1118. globals->getProp("@pluginsPath", pluginsDir);
  1119. if (pluginsDir.length())
  1120. addPathSepChar(pluginsDir);
  1121. Owned<IConstWUPluginIterator> pluginIter = &workunit->getPlugins();
  1122. ForEach(*pluginIter)
  1123. {
  1124. IConstWUPlugin &plugin = pluginIter->query();
  1125. if (plugin.getPluginHole() || plugin.getPluginThor()) // JCSMORE ..Hole..
  1126. {
  1127. SCMStringBuffer name, version;
  1128. plugin.getPluginName(name);
  1129. plugin.getPluginVersion(version);
  1130. loadPlugin(pluginMap, pluginsDir.str(), name.str(), version.str());
  1131. }
  1132. }
  1133. querySo.setown(createDllEntry(_querySo, false, NULL));
  1134. codeCtx = new CThorCodeContextMaster(*this, *workunit, *querySo, *userDesc);
  1135. mpJobTag = allocateMPTag();
  1136. slavemptag = allocateMPTag();
  1137. slaveMsgHandler = new CSlaveMessageHandler(*this, slavemptag);
  1138. tmpHandler.setown(new CMasterGraphTempHandler(*this));
  1139. }
  1140. CJobMaster::~CJobMaster()
  1141. {
  1142. clean();
  1143. if (slaveMsgHandler)
  1144. delete slaveMsgHandler;
  1145. freeMPTag(mpJobTag);
  1146. freeMPTag(slavemptag);
  1147. tmpHandler.clear();
  1148. }
  1149. void CJobMaster::broadcastToSlaves(CMessageBuffer &msg, mptag_t mptag, unsigned timeout, const char *errorMsg, mptag_t *_replyTag, bool sendOnly)
  1150. {
  1151. mptag_t replyTag = createReplyTag();
  1152. msg.setReplyTag(replyTag);
  1153. if (!queryJobComm().send(msg, RANK_ALL_OTHER, mptag, timeout))
  1154. {
  1155. // think this should always be fatal, could check link down here, or in general and flag as _shutdown.
  1156. StringBuffer msg("General failure communicating to slaves [");
  1157. Owned<IThorException> e = MakeThorException(0, "%s", msg.append(errorMsg).append("]").str());
  1158. e->setAction(tea_shutdown);
  1159. EXCLOG(e, NULL);
  1160. abort(e);
  1161. throw e.getClear();
  1162. }
  1163. if (sendOnly) return;
  1164. if (_replyTag)
  1165. *_replyTag = replyTag;
  1166. unsigned respondents = 0;
  1167. loop
  1168. {
  1169. rank_t sender;
  1170. CMessageBuffer msg;
  1171. if (!queryJobComm().recv(msg, RANK_ALL, replyTag, &sender, LONGTIMEOUT))
  1172. {
  1173. if (_replyTag) _replyTag = NULL;
  1174. StringBuffer tmpStr;
  1175. Owned<IException> e = MakeThorFatal(NULL, 0, "%s - Timeout receiving from slaves", errorMsg?tmpStr.append(": ").append(errorMsg).str():"");
  1176. EXCLOG(e, NULL);
  1177. throw e.getClear();
  1178. }
  1179. if (_replyTag) _replyTag = NULL;
  1180. bool error;
  1181. msg.read(error);
  1182. if (error)
  1183. {
  1184. Owned<IThorException> e = deserializeThorException(msg);
  1185. e->setSlave(sender);
  1186. throw e.getClear();
  1187. }
  1188. ++respondents;
  1189. if (respondents == querySlaveGroup().ordinality())
  1190. break;
  1191. }
  1192. }
  1193. CGraphBase *CJobMaster::createGraph()
  1194. {
  1195. return new CMasterGraph(*this);
  1196. }
  1197. void CJobMaster::initNodeDUCache()
  1198. {
  1199. if (!nodeDiskUsageCached)
  1200. {
  1201. nodeDiskUsageCached = true;
  1202. Owned<IPropertyTreeIterator> fileIter = &workunit->getFileIterator();
  1203. ForEach (*fileIter)
  1204. {
  1205. Owned<IDistributedFile> f = queryDistributedFileDirectory().lookup(fileIter->query().queryProp("@name"), userDesc);
  1206. if (f)
  1207. {
  1208. unsigned n = f->numParts();
  1209. for (unsigned i=0;i<n;i++)
  1210. {
  1211. Owned<IDistributedFilePart> part = f->getPart(i);
  1212. offset_t sz = part->getFileSize(false, false);
  1213. if (i>=nodeDiskUsage.ordinality())
  1214. nodeDiskUsage.append(sz);
  1215. else
  1216. {
  1217. sz += nodeDiskUsage.item(i);
  1218. nodeDiskUsage.add(sz, i);
  1219. }
  1220. }
  1221. }
  1222. }
  1223. }
  1224. }
  1225. IPropertyTree *CJobMaster::prepareWorkUnitInfo()
  1226. {
  1227. Owned<IPropertyTree> workUnitInfo = createPTree("workUnitInfo");
  1228. workUnitInfo->setProp("wuid", wuid);
  1229. workUnitInfo->setProp("user", user);
  1230. workUnitInfo->setProp("token", token);
  1231. workUnitInfo->setProp("scope", scope);
  1232. Owned<IConstWUPluginIterator> pluginIter = &queryWorkUnit().getPlugins();
  1233. IPropertyTree *plugins = NULL;
  1234. ForEach(*pluginIter)
  1235. {
  1236. IConstWUPlugin &thisplugin = pluginIter->query();
  1237. if (thisplugin.getPluginThor() || thisplugin.getPluginHole()) // JCSMORE ..Hole..
  1238. {
  1239. if (!plugins)
  1240. plugins = workUnitInfo->addPropTree("plugins", createPTree());
  1241. SCMStringBuffer name;
  1242. thisplugin.getPluginName(name);
  1243. IPropertyTree *plugin = plugins->addPropTree("plugin", createPTree());
  1244. plugin->setProp("@name", name.str());
  1245. }
  1246. }
  1247. IPropertyTree *debug = workUnitInfo->addPropTree("Debug", createPTree(ipt_caseInsensitive));
  1248. SCMStringBuffer debugStr, valueStr;
  1249. Owned<IStringIterator> debugIter = &queryWorkUnit().getDebugValues();
  1250. ForEach (*debugIter)
  1251. {
  1252. debugIter->str(debugStr);
  1253. queryWorkUnit().getDebugValue(debugStr.str(), valueStr);
  1254. debug->setProp(debugStr.str(), valueStr.str());
  1255. }
  1256. return workUnitInfo.getClear();
  1257. }
  1258. void CJobMaster::sendQuery()
  1259. {
  1260. CriticalBlock b(sendQueryCrit);
  1261. if (querySent) return;
  1262. CMessageBuffer msg;
  1263. msg.append(QueryInit);
  1264. msg.append(mpJobTag);
  1265. msg.append(slavemptag);
  1266. msg.append(queryWuid());
  1267. msg.append(graphName);
  1268. const char *soName = queryDllEntry().queryName();
  1269. PROGLOG("Query dll: %s", soName);
  1270. msg.append(soName);
  1271. msg.append(sendSo);
  1272. if (sendSo)
  1273. {
  1274. OwnedIFile iFile = createIFile(soName);
  1275. OwnedIFileIO iFileIO = iFile->open(IFOread);
  1276. size32_t sz = (size32_t)iFileIO->size();
  1277. msg.append(sz);
  1278. read(iFileIO, 0, sz, msg);
  1279. }
  1280. Owned<IPropertyTree> deps = createPTree(queryXGMML()->queryName());
  1281. Owned<IPropertyTreeIterator> edgeIter = queryXGMML()->getElements("edge"); // JCSMORE trim to those actually needed
  1282. ForEach (*edgeIter)
  1283. {
  1284. IPropertyTree &edge = edgeIter->query();
  1285. deps->addPropTree("edge", LINK(&edge));
  1286. }
  1287. Owned<IPropertyTree> workUnitInfo = prepareWorkUnitInfo();
  1288. workUnitInfo->serialize(msg);
  1289. deps->serialize(msg);
  1290. broadcastToSlaves(msg, masterSlaveMpTag, LONGTIMEOUT, "sendQuery");
  1291. queryJobManager().addCachedSo(soName);
  1292. querySent = true;
  1293. }
  1294. void CJobMaster::jobDone()
  1295. {
  1296. if (!querySent) return;
  1297. CMessageBuffer msg;
  1298. msg.append(QueryDone);
  1299. msg.append(queryKey());
  1300. broadcastToSlaves(msg, masterSlaveMpTag, LONGTIMEOUT, "jobDone");
  1301. }
  1302. bool CJobMaster::go()
  1303. {
  1304. class CWorkunitAbortHandler : public CInterface, implements IThreaded
  1305. {
  1306. CJobMaster &job;
  1307. IConstWorkUnit &wu;
  1308. CThreaded threaded;
  1309. bool running;
  1310. Semaphore sem;
  1311. public:
  1312. CWorkunitAbortHandler(CJobMaster &_job, IConstWorkUnit &_wu)
  1313. : job(_job), wu(_wu), threaded("WorkunitAbortHandler")
  1314. {
  1315. running = true;
  1316. wu.subscribe(SubscribeOptionAbort);
  1317. threaded.init(this);
  1318. }
  1319. ~CWorkunitAbortHandler()
  1320. {
  1321. stop();
  1322. threaded.join();
  1323. }
  1324. virtual void main()
  1325. {
  1326. while (running)
  1327. {
  1328. if (sem.wait(5000))
  1329. break; // signalled aborted
  1330. if (wu.aborting())
  1331. {
  1332. LOG(MCwarning, thorJob, "ABORT detected from user");
  1333. Owned <IException> e = MakeThorException(TE_WorkUnitAborting, "User signalled abort");
  1334. job.fireException(e);
  1335. break;
  1336. }
  1337. }
  1338. }
  1339. void stop() { running = false; sem.signal(); }
  1340. } wuAbortHandler(*this, *workunit);
  1341. class CWorkunitPauseHandler : public CInterface, implements ISDSSubscription
  1342. {
  1343. CJobMaster &job;
  1344. IConstWorkUnit &wu;
  1345. SubscriptionId subId;
  1346. bool subscribed;
  1347. CriticalSection crit;
  1348. public:
  1349. IMPLEMENT_IINTERFACE;
  1350. CWorkunitPauseHandler(CJobMaster &_job, IConstWorkUnit &_wu) : job(_job), wu(_wu)
  1351. {
  1352. StringBuffer xpath("/WorkUnits/");
  1353. SCMStringBuffer istr;
  1354. wu.getWuid(istr);
  1355. xpath.append(istr.str()).append("/Action");
  1356. subId = querySDS().subscribe(xpath.str(), *this, false, true);
  1357. subscribed = true;
  1358. }
  1359. ~CWorkunitPauseHandler() { stop(); }
  1360. void stop()
  1361. {
  1362. CriticalBlock b(crit);
  1363. if (subscribed)
  1364. {
  1365. subscribed = false;
  1366. querySDS().unsubscribe(subId);
  1367. }
  1368. }
  1369. void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
  1370. {
  1371. CriticalBlock b(crit);
  1372. if (!subscribed) return;
  1373. job.markWuDirty();
  1374. bool abort = false;
  1375. bool pause = false;
  1376. if (valueLen && valueLen==strlen("pause") && (0 == strncmp("pause", (const char *)valueData, valueLen)))
  1377. {
  1378. // pause after current subgraph
  1379. pause = true;
  1380. }
  1381. else if (valueLen && valueLen==strlen("pausenow") && (0 == strncmp("pausenow", (const char *)valueData, valueLen)))
  1382. {
  1383. // abort current subgraph
  1384. abort = true;
  1385. pause = true;
  1386. }
  1387. else
  1388. {
  1389. abort = pause = false;
  1390. }
  1391. if (pause)
  1392. {
  1393. PROGLOG("Pausing job%s", abort?" [now]":"");
  1394. job.stop(abort);
  1395. }
  1396. }
  1397. } workunitPauseHandler(*this, *workunit);
  1398. class CQueryTimeoutHandler : public CTimeoutTrigger
  1399. {
  1400. CJobMaster &job;
  1401. public:
  1402. CQueryTimeoutHandler(CJobMaster &_job, unsigned timeout) : CTimeoutTrigger(timeout, "QUERY"), job(_job)
  1403. {
  1404. inform(MakeThorException(TE_QueryTimeoutError, "Query took greater than %d seconds", timeout));
  1405. }
  1406. virtual bool action()
  1407. {
  1408. job.fireException(LINK(exception));
  1409. return true;
  1410. }
  1411. private:
  1412. graph_id graphId;
  1413. };
  1414. Owned<CTimeoutTrigger> qtHandler;
  1415. int guillotineTimeout = workunit->getDebugValueInt("maxRunTime", 0);
  1416. if (guillotineTimeout > 0)
  1417. qtHandler.setown(new CQueryTimeoutHandler(*this, guillotineTimeout));
  1418. else if (guillotineTimeout < 0)
  1419. {
  1420. Owned<IException> e = MakeStringException(0, "Ignoring negative maxRunTime: %d", guillotineTimeout);
  1421. reportExceptionToWorkunit(*workunit, e);
  1422. }
  1423. if (WUActionPause == workunit->getAction() || WUActionPauseNow == workunit->getAction())
  1424. throw MakeStringException(0, "Job paused at start, exiting");
  1425. Owned<IConstWUGraphProgress> graphProgress = getGraphProgress();
  1426. bool allDone = true;
  1427. try
  1428. {
  1429. ClearTempDirs();
  1430. Owned<IWUGraphProgress> progress = graphProgress->update();
  1431. progress->setGraphState(WUGraphRunning);
  1432. progress.clear();
  1433. Owned<IThorGraphIterator> iter = getSubGraphs();
  1434. CopyCIArrayOf<CMasterGraph> toRun;
  1435. ForEach(*iter)
  1436. {
  1437. CMasterGraph &graph = (CMasterGraph &)iter->query();
  1438. if ((queryResumed() || queryUseCheckpoints()) && WUGraphComplete == graphProgress->queryNodeState(graph.queryGraphId()))
  1439. graph.setCompleteEx();
  1440. else
  1441. toRun.append(graph);
  1442. }
  1443. graphProgress.clear();
  1444. ForEachItemInRev(g, toRun)
  1445. {
  1446. if (aborted) break;
  1447. CMasterGraph &graph = toRun.item(g);
  1448. if (graph.isSink())
  1449. graph.execute(0, NULL, true, true);
  1450. if (queryPausing()) break;
  1451. }
  1452. graphExecutor->wait();
  1453. workunitPauseHandler.stop();
  1454. ForEachItemIn(tr, toRun)
  1455. {
  1456. CMasterGraph &graph = toRun.item(tr);
  1457. if (!graph.isComplete())
  1458. {
  1459. allDone = false;
  1460. break;
  1461. }
  1462. }
  1463. }
  1464. catch (IException *e) { fireException(e); e->Release(); }
  1465. catch (CATCHALL) { Owned<IException> e = MakeThorException(0, "Unknown exception running sub graphs"); fireException(e); }
  1466. graphProgress.setown(getGraphProgress());
  1467. Owned<IWUGraphProgress> progress = graphProgress->update();
  1468. progress->setGraphState(aborted?WUGraphFailed:(allDone?WUGraphComplete:(pausing?WUGraphPaused:WUGraphComplete)));
  1469. progress.clear();
  1470. graphProgress.clear();
  1471. if (queryPausing())
  1472. {
  1473. assertex(!queryUseCheckpoints()); // JCSMORE - checkpoints probably need revisiting
  1474. // stash away spills ready for resume, make them owned by workunit in event of abort/delete
  1475. Owned<IFileUsageIterator> iter = queryTempHandler()->getIterator();
  1476. ForEach(*iter)
  1477. {
  1478. CFileUsageEntry &entry = iter->query();
  1479. StringAttr tmpName = entry.queryName();
  1480. Owned<IConstWUGraphProgress> graphProgress = getGraphProgress();
  1481. if (WUGraphComplete == graphProgress->queryNodeState(entry.queryGraphId()))
  1482. {
  1483. IArrayOf<IGroup> groups;
  1484. StringArray clusters;
  1485. fillClusterArray(*this, tmpName, clusters, groups);
  1486. Owned<IFileDescriptor> fileDesc = queryThorFileManager().create(*this, tmpName, clusters, groups, true, TDXtemporary|TDWnoreplicate);
  1487. fileDesc->queryProperties().setPropBool("@pausefile", true); // JCSMORE - mark to keep, may be able to distinguish via other means
  1488. IPropertyTree &props = fileDesc->queryProperties();
  1489. props.setPropBool("@owned", true);
  1490. bool blockCompressed=true; // JCSMORE, should come from helper really
  1491. if (blockCompressed)
  1492. props.setPropBool("@blockCompressed", true);
  1493. Owned<IDistributedFile> file = queryDistributedFileDirectory().createNew(fileDesc);
  1494. // NB: This is renaming/moving from temp path
  1495. StringBuffer newName;
  1496. queryThorFileManager().addScope(*this, tmpName, newName, true, true);
  1497. verifyex(file->renamePhysicalPartFiles(newName.str(), NULL, 0, NULL, queryBaseDirectory()));
  1498. file->attach(newName);
  1499. Owned<IWorkUnit> wu = &queryWorkUnit().lock();
  1500. wu->addFile(newName, &clusters, entry.queryUsage(), entry.queryKind(), queryGraphName());
  1501. }
  1502. }
  1503. }
  1504. Owned<IException> jobDoneException;
  1505. try { jobDone(); }
  1506. catch (IException *e)
  1507. {
  1508. EXCLOG(e, NULL);
  1509. jobDoneException.setown(e);
  1510. }
  1511. fatalHandler->clear();
  1512. queryTempHandler()->clearTemps();
  1513. slaveMsgHandler->stop();
  1514. if (jobDoneException.get())
  1515. throw LINK(jobDoneException);
  1516. return allDone;
  1517. }
  1518. void CJobMaster::stop(bool doAbort)
  1519. {
  1520. pausing = true;
  1521. if (doAbort)
  1522. {
  1523. queryJobManager().replyException(*this, NULL);
  1524. Owned<IException> e = MakeThorException(0, "Unable to recover from pausenow");
  1525. fatalHandler->inform(e.getClear());
  1526. abort(e);
  1527. }
  1528. }
  1529. __int64 CJobMaster::queryNodeDiskUsage(unsigned node)
  1530. {
  1531. initNodeDUCache();
  1532. if (!nodeDiskUsage.isItem(node)) return 0;
  1533. return nodeDiskUsage.item(node);
  1534. }
  1535. void CJobMaster::setNodeDiskUsage(unsigned node, __int64 sz)
  1536. {
  1537. initNodeDUCache();
  1538. while (nodeDiskUsage.ordinality() <= node)
  1539. nodeDiskUsage.append(0);
  1540. nodeDiskUsage.replace(sz, node);
  1541. }
  1542. __int64 CJobMaster::addNodeDiskUsage(unsigned node, __int64 sz)
  1543. {
  1544. sz += queryNodeDiskUsage(node);
  1545. setNodeDiskUsage(node, sz);
  1546. return sz;
  1547. }
  1548. bool CJobMaster::queryCreatedFile(const char *file)
  1549. {
  1550. StringBuffer scopedName;
  1551. queryThorFileManager().addScope(*this, file, scopedName, false);
  1552. return (NotFound != createdFiles.find(scopedName.str()));
  1553. }
  1554. void CJobMaster::addCreatedFile(const char *file)
  1555. {
  1556. StringBuffer scopedName;
  1557. queryThorFileManager().addScope(*this, file, scopedName, false);
  1558. createdFiles.append(scopedName.str());
  1559. }
  1560. __int64 CJobMaster::getWorkUnitValueInt(const char *prop, __int64 defVal) const
  1561. {
  1562. return queryWorkUnit().getDebugValueInt64(prop, defVal);
  1563. }
  1564. StringBuffer &CJobMaster::getWorkUnitValue(const char *prop, StringBuffer &str) const
  1565. {
  1566. SCMStringBuffer scmStr;
  1567. queryWorkUnit().getDebugValue(prop, scmStr);
  1568. return str.append(scmStr.str());
  1569. }
  1570. IBarrier *CJobMaster::createBarrier(mptag_t tag)
  1571. {
  1572. return new CBarrierMaster(*jobComm, tag);
  1573. }
  1574. IGraphTempHandler *CJobMaster::createTempHandler()
  1575. {
  1576. return new CMasterGraphTempHandler(*this);
  1577. }
  1578. bool CJobMaster::fireException(IException *e)
  1579. {
  1580. IThorException *te = QUERYINTERFACE(e, IThorException);
  1581. ThorExceptionAction action;
  1582. if (!te) action = tea_null;
  1583. else action = te->queryAction();
  1584. if (QUERYINTERFACE(e, IMP_Exception) && MPERR_link_closed==e->errorCode())
  1585. action = tea_shutdown;
  1586. else if (QUERYINTERFACE(e, ISEH_Exception))
  1587. action = tea_shutdown;
  1588. CriticalBlock b(exceptCrit);
  1589. switch (action)
  1590. {
  1591. case tea_warning:
  1592. LOG(MCwarning, thorJob, e);
  1593. reportExceptionToWorkunit(*workunit, e);
  1594. break;
  1595. default:
  1596. {
  1597. LOG(MCerror, thorJob, e);
  1598. queryJobManager().replyException(*this, e);
  1599. fatalHandler->inform(LINK(e));
  1600. try { abort(e); }
  1601. catch (IException *e)
  1602. {
  1603. Owned<IThorException> te = ThorWrapException(e, "Error aborting job, will cause thor restart");
  1604. e->Release();
  1605. reportExceptionToWorkunit(*workunit, te);
  1606. action = tea_shutdown;
  1607. }
  1608. if (tea_shutdown == action)
  1609. queryJobManager().stop();
  1610. }
  1611. }
  1612. return true;
  1613. }
  1614. ///////////////////
  1615. //
  1616. // CMasterGraph impl.
  1617. //
  1618. CMasterGraph::CMasterGraph(CJobMaster &_job) : CGraphBase(_job), jobM(_job)
  1619. {
  1620. mpTag = job.allocateMPTag();
  1621. startBarrierTag = job.allocateMPTag();
  1622. waitBarrierTag = job.allocateMPTag();
  1623. startBarrier = job.createBarrier(startBarrierTag);
  1624. waitBarrier = job.createBarrier(waitBarrierTag);
  1625. bcastTag = TAG_NULL;
  1626. }
  1627. CMasterGraph::~CMasterGraph()
  1628. {
  1629. job.freeMPTag(mpTag);
  1630. job.freeMPTag(startBarrierTag);
  1631. job.freeMPTag(waitBarrierTag);
  1632. if (TAG_NULL != doneBarrierTag)
  1633. job.freeMPTag(doneBarrierTag);
  1634. if (TAG_NULL != executeReplyTag)
  1635. job.freeMPTag(executeReplyTag);
  1636. }
  1637. void CMasterGraph::init()
  1638. {
  1639. CGraphBase::init();
  1640. if (queryOwner() && isGlobal())
  1641. {
  1642. doneBarrierTag = job.allocateMPTag();
  1643. doneBarrier = job.createBarrier(doneBarrierTag);
  1644. }
  1645. }
  1646. bool CMasterGraph::fireException(IException *e)
  1647. {
  1648. IThorException *te = QUERYINTERFACE(e, IThorException);
  1649. ThorExceptionAction action;
  1650. if (!te) action = tea_null;
  1651. else action = te->queryAction();
  1652. if (QUERYINTERFACE(e, IMP_Exception) && MPERR_link_closed==e->errorCode())
  1653. action = tea_shutdown;
  1654. else if (QUERYINTERFACE(e, ISEH_Exception))
  1655. action = tea_shutdown;
  1656. CriticalBlock b(exceptCrit);
  1657. switch (action)
  1658. {
  1659. case tea_warning:
  1660. LOG(MCwarning, thorJob, e);
  1661. reportExceptionToWorkunit(job.queryWorkUnit(), e);
  1662. break;
  1663. case tea_abort:
  1664. abort(e);
  1665. // fall through
  1666. default:
  1667. {
  1668. LOG(MCerror, thorJob, e);
  1669. if (NULL != fatalHandler)
  1670. fatalHandler->inform(LINK(e));
  1671. if (owner)
  1672. owner->fireException(e);
  1673. else
  1674. job.fireException(e);
  1675. }
  1676. }
  1677. return true;
  1678. }
  1679. void CMasterGraph::abort(IException *e)
  1680. {
  1681. if (aborted) return;
  1682. try{ CGraphBase::abort(e); }
  1683. catch (IException *e)
  1684. {
  1685. GraphPrintLog(e, "Aborting master graph");
  1686. e->Release();
  1687. }
  1688. if (TAG_NULL != bcastTag)
  1689. job.queryJobComm().cancel(0, bcastTag);
  1690. if (started)
  1691. {
  1692. try
  1693. {
  1694. CMessageBuffer msg;
  1695. msg.append(GraphAbort);
  1696. msg.append(job.queryKey());
  1697. msg.append(queryGraphId());
  1698. jobM.broadcastToSlaves(msg, masterSlaveMpTag, LONGTIMEOUT, "abort", &bcastTag);
  1699. }
  1700. catch (IException *e)
  1701. {
  1702. GraphPrintLog(e, "Aborting slave graph");
  1703. if (abortException)
  1704. throw LINK(abortException);
  1705. throw;
  1706. }
  1707. }
  1708. if (!queryOwner())
  1709. {
  1710. if (globals->getPropBool("@watchdogProgressEnabled"))
  1711. queryJobManager().queryDeMonServer()->endGraph(this, true);
  1712. }
  1713. }
  1714. void CMasterGraph::createForLocal()
  1715. {
  1716. CriticalBlock b(createdCrit);
  1717. Owned<IThorActivityIterator> iter = getIterator();
  1718. ForEach (*iter)
  1719. {
  1720. CMasterGraphElement &element = (CMasterGraphElement &)iter->query();
  1721. element.doCreateActivity();
  1722. }
  1723. }
  1724. void CMasterGraph::serializeCreateContexts(MemoryBuffer &mb, bool created)
  1725. {
  1726. CGraphBase::serializeCreateContexts(mb, created);
  1727. Owned<IThorActivityIterator> iter = created ? getCreatedIterator() : getTraverseIterator(); // JCSMORE - created == true always surely?
  1728. ForEach (*iter)
  1729. {
  1730. CMasterGraphElement &element = (CMasterGraphElement &)iter->query();
  1731. if (reinit || !element.sentCreateCtx)
  1732. element.sentCreateCtx = true;
  1733. }
  1734. }
  1735. void CMasterGraph::serializeStartCtxs(MemoryBuffer &mb)
  1736. {
  1737. Owned<IThorActivityIterator> iter = getTraverseIterator();
  1738. ForEach (*iter)
  1739. {
  1740. CMasterGraphElement &element = (CMasterGraphElement &)iter->query();
  1741. mb.append(element.queryId());
  1742. unsigned pos = mb.length();
  1743. mb.append((size32_t)0);
  1744. element.queryHelper()->serializeStartContext(mb);
  1745. size32_t sz = (mb.length()-pos)-sizeof(size32_t);
  1746. mb.writeDirect(pos, sizeof(sz), &sz);
  1747. }
  1748. mb.append((activity_id)0);
  1749. }
  1750. bool CMasterGraph::serializeActivityInitData(unsigned slave, MemoryBuffer &mb)
  1751. {
  1752. unsigned pos=mb.length();
  1753. mb.append((unsigned)0);
  1754. Owned<IThorActivityIterator> iter = (queryOwner() && !isGlobal()) ? getCreatedIterator() : getTraverseIterator();
  1755. ForEach (*iter)
  1756. {
  1757. CMasterGraphElement &element = (CMasterGraphElement &)iter->query();
  1758. if (!element.sentActInitData->testSet(slave))
  1759. {
  1760. CMasterActivity *activity = (CMasterActivity *)element.queryActivity();
  1761. if (activity)
  1762. {
  1763. mb.append(element.queryId());
  1764. unsigned pos = mb.length();
  1765. mb.append((size32_t)0);
  1766. activity->serializeSlaveData(mb, slave);
  1767. size32_t sz = (mb.length()-pos)-sizeof(size32_t);
  1768. mb.writeDirect(pos, sizeof(sz), &sz);
  1769. }
  1770. }
  1771. }
  1772. if (pos == (mb.length()-sizeof(unsigned)))
  1773. return false;
  1774. mb.append((activity_id)0); // terminator
  1775. unsigned len=mb.length()-pos-sizeof(unsigned);
  1776. mb.writeDirect(pos, sizeof(len), &len);
  1777. return true;
  1778. }
  1779. void CMasterGraph::readActivityInitData(MemoryBuffer &mb, unsigned slave)
  1780. {
  1781. loop
  1782. {
  1783. activity_id id;
  1784. mb.read(id);
  1785. if (0 == id)
  1786. break;
  1787. size32_t dataLen;
  1788. mb.read(dataLen);
  1789. CGraphElementBase *element = queryElement(id);
  1790. MemoryBuffer &mbDst = element->queryActivity()->queryInitializationData(slave);
  1791. mbDst.append(dataLen, mb.readDirect(dataLen));
  1792. }
  1793. }
  1794. bool CMasterGraph::prepare(size32_t parentExtractSz, const byte *parentExtract, bool checkDependencies, bool shortCircuit, bool async)
  1795. {
  1796. if (!CGraphBase::prepare(parentExtractSz, parentExtract, checkDependencies, shortCircuit, async)) return false;
  1797. if (aborted) return false;
  1798. return true;
  1799. }
  1800. void CMasterGraph::create(size32_t parentExtractSz, const byte *parentExtract)
  1801. {
  1802. {
  1803. CriticalBlock b(createdCrit);
  1804. CGraphBase::create(parentExtractSz, parentExtract);
  1805. }
  1806. if (!aborted)
  1807. {
  1808. if (!queryOwner()) // owning graph sends query+child graphs
  1809. {
  1810. jobM.sendQuery(); // if not previously sent
  1811. if (globals->getPropBool("@watchdogProgressEnabled"))
  1812. queryJobManager().queryDeMonServer()->startGraph(this);
  1813. sendGraph(); // sends child graphs at same time
  1814. }
  1815. else
  1816. {
  1817. if (isGlobal())
  1818. {
  1819. ForEachItemIn(i, ifs)
  1820. {
  1821. CGraphElementBase &ifElem = ifs.item(i);
  1822. if (ifElem.newWhichBranch)
  1823. {
  1824. sentInitData = false; // force re-request of create data.
  1825. break;
  1826. }
  1827. }
  1828. if (reinit || !sentInitData)
  1829. {
  1830. sentInitData = true;
  1831. CMessageBuffer msg;
  1832. serializeCreateContexts(msg, false);
  1833. try
  1834. {
  1835. jobM.broadcastToSlaves(msg, mpTag, LONGTIMEOUT, "serializeCreateContexts", &bcastTag);
  1836. }
  1837. catch (IException *e)
  1838. {
  1839. GraphPrintLog(e, "Aborting graph create");
  1840. if (abortException)
  1841. throw LINK(abortException);
  1842. throw;
  1843. }
  1844. }
  1845. else
  1846. {
  1847. CMessageBuffer msg;
  1848. msg.append((unsigned)0);
  1849. try
  1850. {
  1851. jobM.broadcastToSlaves(msg, mpTag, LONGTIMEOUT, "serializeCreateContexts", &bcastTag);
  1852. }
  1853. catch (IException *e)
  1854. {
  1855. GraphPrintLog(e, "Aborting graph create(2)");
  1856. if (abortException)
  1857. throw LINK(abortException);
  1858. throw;
  1859. }
  1860. }
  1861. }
  1862. }
  1863. }
  1864. }
  1865. void CMasterGraph::sendActivityInitData()
  1866. {
  1867. CMessageBuffer msg;
  1868. mptag_t replyTag = createReplyTag();
  1869. msg.setReplyTag(replyTag);
  1870. unsigned pos = msg.length();
  1871. unsigned w=0;
  1872. unsigned sentTo = 0;
  1873. for (; w<queryJob().querySlaves(); w++)
  1874. {
  1875. unsigned needActInit = 0;
  1876. Owned<IThorActivityIterator> iter = (queryOwner() && !isGlobal()) ? getCreatedIterator() : getTraverseIterator();
  1877. ForEach(*iter)
  1878. {
  1879. CGraphElementBase &element = iter->query();
  1880. if (!element.sentActInitData->test(w)) // has it been sent
  1881. ++needActInit;
  1882. }
  1883. if (needActInit)
  1884. {
  1885. try
  1886. {
  1887. msg.rewrite(pos);
  1888. serializeActivityInitData(w, msg);
  1889. }
  1890. catch (IException *e)
  1891. {
  1892. GraphPrintLog(e, NULL);
  1893. throw;
  1894. }
  1895. if (!job.queryJobComm().send(msg, w+1, mpTag, LONGTIMEOUT))
  1896. {
  1897. StringBuffer epStr;
  1898. throw MakeStringException(0, "Timeout sending to slave %s", job.querySlaveGroup().queryNode(w).endpoint().getUrlStr(epStr).str());
  1899. }
  1900. ++sentTo;
  1901. }
  1902. }
  1903. if (sentTo)
  1904. {
  1905. assertex(sentTo == queryJob().querySlaves());
  1906. w=0;
  1907. Owned<IException> e;
  1908. // now get back initialization data from graph tag
  1909. for (; w<queryJob().querySlaves(); w++)
  1910. {
  1911. rank_t sender;
  1912. msg.clear();
  1913. if (!job.queryJobComm().recv(msg, w+1, replyTag, &sender, LONGTIMEOUT))
  1914. throw MakeStringException(0, "Timeout receiving from slaves after graph sent");
  1915. bool error;
  1916. msg.read(error);
  1917. if (error)
  1918. {
  1919. Owned<IThorException> se = deserializeThorException(msg);
  1920. se->setSlave(sender);
  1921. if (!e.get())
  1922. {
  1923. StringBuffer tmpStr("Slave ");
  1924. queryJob().queryJobGroup().queryNode(sender).endpoint().getUrlStr(tmpStr);
  1925. GraphPrintLog(se, "%s", tmpStr.append(": slave initialization error").str());
  1926. e.setown(se.getClear());
  1927. }
  1928. continue; // to read other slave responses.
  1929. }
  1930. readActivityInitData(msg, w);
  1931. }
  1932. if (e.get())
  1933. throw LINK(e);
  1934. }
  1935. }
  1936. void CMasterGraph::serializeGraphInit(MemoryBuffer &mb)
  1937. {
  1938. mb.append(graphId);
  1939. mb.append(reinit);
  1940. serializeMPtag(mb, mpTag);
  1941. mb.append((int)startBarrierTag);
  1942. mb.append((int)waitBarrierTag);
  1943. mb.append((int)doneBarrierTag);
  1944. mb.append(queryChildGraphCount());
  1945. Owned<IThorGraphIterator> childIter = getChildGraphs();
  1946. ForEach (*childIter)
  1947. {
  1948. CMasterGraph &childGraph = (CMasterGraph &)childIter->query();
  1949. childGraph.serializeGraphInit(mb);
  1950. }
  1951. }
  1952. void CMasterGraph::executeSubGraph(size32_t parentExtractSz, const byte *parentExtract)
  1953. {
  1954. if (job.queryResumed()) // skip complete subgraph if resuming. NB: temp spill have been tucked away for this purpose when paused.
  1955. {
  1956. if (!queryOwner())
  1957. {
  1958. Owned<IConstWUGraphProgress> graphProgress = ((CJobMaster &)job).getGraphProgress();
  1959. if (WUGraphComplete == graphProgress->queryNodeState(graphId))
  1960. setCompleteEx();
  1961. }
  1962. }
  1963. if (isComplete())
  1964. return;
  1965. fatalHandler.clear();
  1966. fatalHandler.setown(new CFatalHandler(globals->getPropInt("@fatal_timeout", FATAL_TIMEOUT)));
  1967. CGraphBase::executeSubGraph(parentExtractSz, parentExtract);
  1968. if (TAG_NULL != executeReplyTag)
  1969. {
  1970. rank_t sender;
  1971. unsigned s=0;
  1972. for (; s<queryJob().querySlaves(); s++)
  1973. {
  1974. CMessageBuffer msg;
  1975. if (!queryJob().queryJobComm().recv(msg, RANK_ALL, executeReplyTag, &sender))
  1976. break;
  1977. bool error;
  1978. msg.read(error);
  1979. if (error)
  1980. {
  1981. Owned<IThorException> exception = deserializeThorException(msg);
  1982. exception->setSlave(sender);
  1983. GraphPrintLog(exception, "slave execute reply exception");
  1984. throw exception.getClear();
  1985. }
  1986. }
  1987. if (fatalHandler)
  1988. fatalHandler->clear();
  1989. }
  1990. fatalHandler.clear();
  1991. }
  1992. void CMasterGraph::sendGraph()
  1993. {
  1994. CTimeMon atimer;
  1995. CMessageBuffer msg;
  1996. msg.append(GraphInit);
  1997. msg.append(job.queryKey());
  1998. node->serialize(msg); // everthing
  1999. if (TAG_NULL == executeReplyTag)
  2000. executeReplyTag = queryJob().allocateMPTag();
  2001. serializeMPtag(msg, executeReplyTag);
  2002. serializeCreateContexts(msg, false);
  2003. serializeGraphInit(msg);
  2004. // slave graph data
  2005. try
  2006. {
  2007. jobM.broadcastToSlaves(msg, masterSlaveMpTag, LONGTIMEOUT, "sendGraph", &bcastTag);
  2008. }
  2009. catch (IException *e)
  2010. {
  2011. GraphPrintLog(e, "Aborting sendGraph");
  2012. if (abortException)
  2013. throw LINK(abortException);
  2014. throw;
  2015. }
  2016. GraphPrintLog("sendGraph took %d ms", atimer.elapsed());
  2017. }
  2018. bool CMasterGraph::preStart(size32_t parentExtractSz, const byte *parentExtract)
  2019. {
  2020. started = true;
  2021. GraphPrintLog("Processing graph");
  2022. if (!sentStartCtx || reinit)
  2023. {
  2024. sentStartCtx = true;
  2025. CMessageBuffer msg;
  2026. serializeStartCtxs(msg);
  2027. try
  2028. {
  2029. jobM.broadcastToSlaves(msg, mpTag, LONGTIMEOUT, "startCtx", &bcastTag, true);
  2030. }
  2031. catch (IException *e)
  2032. {
  2033. GraphPrintLog(e, "Aborting preStart");
  2034. if (abortException)
  2035. throw LINK(abortException);
  2036. throw;
  2037. }
  2038. }
  2039. if (!queryOwner() || isGlobal())
  2040. sendActivityInitData(); // has to be done at least once
  2041. CGraphBase::preStart(parentExtractSz, parentExtract);
  2042. if (isGlobal())
  2043. {
  2044. if (!startBarrier->wait(false)) // ensure all graphs are at start point at same time, as some may request data from each other.
  2045. return false;
  2046. }
  2047. if (!queryOwner())
  2048. {
  2049. Owned<IConstWUGraphProgress> graphProgress = ((CJobMaster &)job).getGraphProgress();
  2050. Owned<IWUGraphProgress> progress = graphProgress->update();
  2051. progress->setNodeState(graphId, WUGraphRunning);
  2052. progress.clear();
  2053. }
  2054. return true;
  2055. }
  2056. void CMasterGraph::handleSlaveDone(unsigned node, MemoryBuffer &mb)
  2057. {
  2058. graph_id gid;
  2059. mb.read(gid);
  2060. assertex(gid == queryGraphId());
  2061. unsigned count;
  2062. mb.read(count);
  2063. while (count--)
  2064. {
  2065. activity_id activityId;
  2066. mb.read(activityId);
  2067. CMasterGraphElement *act = (CMasterGraphElement *)queryElement(activityId);
  2068. unsigned len;
  2069. mb.read(len);
  2070. const void *d = mb.readDirect(len);
  2071. MemoryBuffer sdMb;
  2072. sdMb.setBuffer(len, (void *)d);
  2073. act->slaveDone(node, sdMb);
  2074. }
  2075. }
  2076. void CMasterGraph::getFinalProgress()
  2077. {
  2078. offset_t totalDiskUsage = 0;
  2079. offset_t minNodeDiskUsage = 0, maxNodeDiskUsage = 0;
  2080. unsigned maxNode = (unsigned)-1, minNode = (unsigned)-1;
  2081. CMessageBuffer msg;
  2082. mptag_t replyTag = createReplyTag();
  2083. msg.setReplyTag(replyTag);
  2084. msg.append((unsigned)GraphEnd);
  2085. msg.append(job.queryKey());
  2086. msg.append(queryGraphId());
  2087. if (!job.queryJobComm().send(msg, RANK_ALL_OTHER, masterSlaveMpTag, LONGTIMEOUT))
  2088. throw MakeStringException(0, "Timeout sending to slaves");
  2089. unsigned n=queryJob().querySlaves();
  2090. while (n--)
  2091. {
  2092. rank_t sender;
  2093. if (!job.queryJobComm().recv(msg, RANK_ALL, replyTag, &sender, LONGTIMEOUT))
  2094. {
  2095. GraphPrintLog("Timeout receiving final progress from slaves, %d slaves did not respond", n+1);
  2096. return;
  2097. }
  2098. bool error;
  2099. msg.read(error);
  2100. if (error)
  2101. {
  2102. Owned<IThorException> e = deserializeThorException(msg);
  2103. e->setSlave(sender);
  2104. throw e.getClear();
  2105. }
  2106. if (0 == msg.remaining())
  2107. continue;
  2108. handleSlaveDone(sender-1, msg);
  2109. if (!queryOwner())
  2110. {
  2111. if (globals->getPropBool("@watchdogProgressEnabled"))
  2112. {
  2113. HeartBeatPacket &hb = *(HeartBeatPacket *) msg.readDirect(sizeof(hb.packetsize));
  2114. if (hb.packetsize)
  2115. {
  2116. size32_t sz = hb.packetsize-sizeof(hb.packetsize);
  2117. if (sz)
  2118. {
  2119. msg.readDirect(sz);
  2120. try
  2121. {
  2122. queryJobManager().queryDeMonServer()->takeHeartBeat(hb);
  2123. }
  2124. catch (IException *e)
  2125. {
  2126. GraphPrintLog(e, "Failure whilst deserializing stats/progress");
  2127. e->Release();
  2128. }
  2129. }
  2130. }
  2131. }
  2132. }
  2133. offset_t nodeDiskUsage;
  2134. msg.read(nodeDiskUsage);
  2135. jobM.setNodeDiskUsage(n, nodeDiskUsage);
  2136. if (nodeDiskUsage > maxNodeDiskUsage)
  2137. {
  2138. maxNodeDiskUsage = nodeDiskUsage;
  2139. maxNode = n;
  2140. }
  2141. if ((unsigned)-1 == minNode || nodeDiskUsage < minNodeDiskUsage)
  2142. {
  2143. minNodeDiskUsage = nodeDiskUsage;
  2144. minNode = n;
  2145. }
  2146. totalDiskUsage += nodeDiskUsage;
  2147. Owned<ITimeReporter> slaveReport = createStdTimeReporter(msg);
  2148. queryJob().queryTimeReporter().merge(*slaveReport);
  2149. }
  2150. if (totalDiskUsage)
  2151. {
  2152. Owned<IWorkUnit> wu = &job.queryWorkUnit().lock();
  2153. wu->addDiskUsageStats(totalDiskUsage/queryJob().querySlaves(), minNode, minNodeDiskUsage, maxNode, maxNodeDiskUsage, queryGraphId());
  2154. }
  2155. }
  2156. void CMasterGraph::done()
  2157. {
  2158. if (started)
  2159. {
  2160. if (!aborted && (!queryOwner() || isGlobal()))
  2161. getFinalProgress(); // waiting for slave graph to finish and send final progress update + extra act end info.
  2162. }
  2163. CGraphBase::done();
  2164. if (started && !queryOwner())
  2165. {
  2166. if (globals->getPropBool("@watchdogProgressEnabled"))
  2167. queryJobManager().queryDeMonServer()->endGraph(this, true);
  2168. }
  2169. if (!queryOwner())
  2170. {
  2171. if (queryJob().queryTimeReporter().numSections())
  2172. {
  2173. if (globals->getPropBool("@reportTimingsToWorkunit", true))
  2174. {
  2175. struct CReport : implements ITimeReportInfo
  2176. {
  2177. Owned<IWorkUnit> wu;
  2178. CGraphBase &graph;
  2179. CReport(CGraphBase &_graph) : graph(_graph)
  2180. {
  2181. wu.setown(&graph.queryJob().queryWorkUnit().lock());
  2182. }
  2183. virtual void report(const char *name, const __int64 totaltime, const __int64 maxtime, const unsigned count)
  2184. {
  2185. StringBuffer timerStr(graph.queryJob().queryGraphName());
  2186. timerStr.append("(").append(graph.queryGraphId()).append("): ");
  2187. timerStr.append(name);
  2188. wu->setTimerInfo(timerStr.str(), NULL, (unsigned)totaltime, count, (unsigned)maxtime);
  2189. }
  2190. } wureport(*this);
  2191. queryJob().queryTimeReporter().report(wureport);
  2192. }
  2193. else
  2194. queryJob().queryTimeReporter().printTimings();
  2195. }
  2196. }
  2197. }
  2198. void CMasterGraph::setComplete(bool tf)
  2199. {
  2200. CGraphBase::setComplete(tf);
  2201. if (tf && !queryOwner())
  2202. {
  2203. Owned<IConstWUGraphProgress> graphProgress = ((CJobMaster &)job).getGraphProgress();
  2204. Owned<IWUGraphProgress> progress = graphProgress->update();
  2205. progress->setNodeState(graphId, graphDone?WUGraphComplete:WUGraphFailed);
  2206. progress.clear();
  2207. }
  2208. }
  2209. bool CMasterGraph::deserializeStats(unsigned node, MemoryBuffer &mb)
  2210. {
  2211. unsigned count, _count;
  2212. mb.read(count);
  2213. _count = count;
  2214. while (count--)
  2215. {
  2216. activity_id activityId;
  2217. mb.read(activityId);
  2218. CMasterActivity *activity = NULL;
  2219. CMasterGraphElement *element = (CMasterGraphElement *)queryElement(activityId);
  2220. if (element)
  2221. {
  2222. activity = (CMasterActivity *)element->queryActivity();
  2223. if (!activity)
  2224. {
  2225. CGraphBase *parentGraph = element->queryOwner().queryOwner(); // i.e. am I in a childgraph
  2226. if (!parentGraph)
  2227. {
  2228. GraphPrintLog("Activity id=%"ACTPF"d not created in master and not a child query activity", activityId);
  2229. return false; // don't know if or how this could happen, but all bets off with packet if did.
  2230. }
  2231. Owned<IException> e;
  2232. try
  2233. {
  2234. activity = (CMasterActivity *)element->factorySet(TAKnull); // purely as a place holder for graph progress deserialization/serialization
  2235. created = true; // means some activities created within this graph
  2236. }
  2237. catch (IException *_e) { e.setown(_e); GraphPrintLog(_e, NULL); }
  2238. if (!activity || e.get())
  2239. {
  2240. GraphPrintLog("Activity id=%"ACTPF"d failed to created child query activity ready for progress", activityId);
  2241. return false;
  2242. }
  2243. }
  2244. if (activity)
  2245. activity->deserializeStats(node, mb);
  2246. }
  2247. else
  2248. {
  2249. GraphPrintLog("Failed to find activity, during progress deserialization, id=%"ACTPF"d", activityId);
  2250. return false; // don't know if or how this could happen, but all bets off with packet if did.
  2251. }
  2252. }
  2253. unsigned subs, _subs;
  2254. mb.read(subs);
  2255. _subs = subs;
  2256. while (subs--)
  2257. {
  2258. graph_id subId;
  2259. mb.read(subId);
  2260. Owned<CMasterGraph> graph = (CMasterGraph *)job.getGraph(subId);
  2261. if (NULL == graph.get())
  2262. return false;
  2263. if (!graph->deserializeStats(node, mb))
  2264. return false;
  2265. }
  2266. return true;
  2267. }
  2268. ///////////////////////////////////////////////////
  2269. CThorStats::CThorStats()
  2270. {
  2271. unsigned c = queryClusterWidth();
  2272. while (c--) counts.append(0);
  2273. }
  2274. void CThorStats::set(unsigned node, unsigned __int64 count)
  2275. {
  2276. counts.replace(count, node);
  2277. }
  2278. void CThorStats::removeAttribute(IPropertyTree *node, const char *name)
  2279. {
  2280. StringBuffer aName("@");
  2281. node->removeProp(aName.append(name).str());
  2282. }
  2283. void CThorStats::addAttribute(IPropertyTree *node, const char *name, unsigned __int64 val)
  2284. {
  2285. StringBuffer aName("@");
  2286. node->setPropInt64(aName.append(name).str(), val);
  2287. }
  2288. void CThorStats::addAttribute(IPropertyTree *node, const char *name, const char *val)
  2289. {
  2290. StringBuffer aName("@");
  2291. node->setProp(aName.append(name).str(), val);
  2292. }
  2293. void CThorStats::reset()
  2294. {
  2295. tot = max = avg = 0;
  2296. min = (unsigned __int64) -1;
  2297. minNode = maxNode = hi = lo = maxNode = minNode = 0;
  2298. }
  2299. void CThorStats::processInfo()
  2300. {
  2301. reset();
  2302. ForEachItemIn(n, counts)
  2303. {
  2304. unsigned __int64 thiscount = counts.item(n);
  2305. tot += thiscount;
  2306. if (thiscount > max)
  2307. {
  2308. max = thiscount;
  2309. maxNode = n;
  2310. }
  2311. if (thiscount < min)
  2312. {
  2313. min = thiscount;
  2314. minNode = n;
  2315. }
  2316. }
  2317. if (max)
  2318. {
  2319. unsigned count = counts.ordinality();
  2320. avg = tot/count;
  2321. if (avg)
  2322. {
  2323. hi = (unsigned)((100 * (max-avg))/avg);
  2324. lo = (unsigned)((100 * (avg-min))/avg);
  2325. }
  2326. }
  2327. }
  2328. ///////////////////////////////////////////////////
  2329. void CTimingInfo::getXGMML(IPropertyTree *node)
  2330. {
  2331. processInfo();
  2332. addAttribute(node, "timeMaxMs", max);
  2333. addAttribute(node, "timeMinMs", min);
  2334. if (hi == lo)
  2335. {
  2336. removeAttribute(node, "timeMaxEndpoint");
  2337. removeAttribute(node, "timeMinEndpoint");
  2338. removeAttribute(node, "timeMaxSkew");
  2339. removeAttribute(node, "timeMinSkew");
  2340. }
  2341. else
  2342. {
  2343. StringBuffer epStr;
  2344. addAttribute(node, "timeMaxEndpoint", querySlaveGroup().queryNode(maxNode).endpoint().getUrlStr(epStr).str());
  2345. addAttribute(node, "timeMinEndpoint", querySlaveGroup().queryNode(minNode).endpoint().getUrlStr(epStr.clear()).str());
  2346. addAttribute(node, "timeMaxSkew", hi);
  2347. addAttribute(node, "timeMinSkew", lo);
  2348. }
  2349. }
  2350. ///////////////////////////////////////////////////
  2351. void ProgressInfo::processInfo() // reimplement as counts have special flags (i.e. stop/start)
  2352. {
  2353. reset();
  2354. startcount = stopcount = 0;
  2355. ForEachItemIn(n, counts)
  2356. {
  2357. unsigned __int64 thiscount = counts.item(n);
  2358. if (thiscount & THORDATALINK_STARTED)
  2359. startcount++;
  2360. if (thiscount & THORDATALINK_STOPPED)
  2361. stopcount++;
  2362. thiscount = thiscount & THORDATALINK_COUNT_MASK;
  2363. tot += thiscount;
  2364. if (thiscount > max)
  2365. {
  2366. max = thiscount;
  2367. maxNode = n;
  2368. }
  2369. if (thiscount < min)
  2370. {
  2371. min = thiscount;
  2372. minNode = n;
  2373. }
  2374. }
  2375. if (max)
  2376. {
  2377. unsigned count = counts.ordinality();
  2378. avg = tot/count;
  2379. if (avg)
  2380. {
  2381. hi = (unsigned)((100 * (max-avg))/avg);
  2382. lo = (unsigned)((100 * (avg-min))/avg);
  2383. }
  2384. }
  2385. }
  2386. void ProgressInfo::getXGMML(IPropertyTree *node)
  2387. {
  2388. processInfo();
  2389. addAttribute(node, "slaves", counts.ordinality());
  2390. addAttribute(node, "count", tot);
  2391. addAttribute(node, "started", startcount);
  2392. addAttribute(node, "stopped", stopcount);
  2393. if (hi == lo)
  2394. {
  2395. removeAttribute(node, "max");
  2396. removeAttribute(node, "min");
  2397. removeAttribute(node, "maxskew");
  2398. removeAttribute(node, "minskew");
  2399. removeAttribute(node, "maxEndpoint");
  2400. removeAttribute(node, "minEndpoint");
  2401. }
  2402. else
  2403. {
  2404. StringBuffer epStr;
  2405. addAttribute(node, "max", max);
  2406. addAttribute(node, "min", min);
  2407. addAttribute(node, "maxskew", hi);
  2408. addAttribute(node, "minskew", lo);
  2409. addAttribute(node, "maxEndpoint", querySlaveGroup().queryNode(maxNode).endpoint().getUrlStr(epStr).str());
  2410. addAttribute(node, "minEndpoint", querySlaveGroup().queryNode(minNode).endpoint().getUrlStr(epStr.clear()).str());
  2411. }
  2412. }
  2413. ///////////////////////////////////////////////////
  2414. CJobMaster *createThorGraph(const char *graphName, IPropertyTree *xgmml, IConstWorkUnit &workunit, const char *querySo, bool sendSo, const SocketEndpoint &agentEp)
  2415. {
  2416. Owned<CJobMaster> masterJob = new CJobMaster(workunit, graphName, querySo, sendSo, agentEp);
  2417. masterJob->setXGMML(xgmml);
  2418. Owned<IPropertyTreeIterator> iter = xgmml->getElements("node");
  2419. ForEach(*iter)
  2420. {
  2421. IPropertyTree &node = iter->query();
  2422. Owned<CGraphBase> subGraph = masterJob->createGraph();
  2423. subGraph->createFromXGMML(&node, NULL, NULL, NULL);
  2424. masterJob->addSubGraph(*LINK(subGraph));
  2425. }
  2426. masterJob->addDependencies(xgmml);
  2427. return LINK(masterJob);
  2428. }
  2429. static IJobManager *jobManager = NULL;
  2430. void setJobManager(IJobManager *_jobManager)
  2431. {
  2432. CriticalBlock b(*jobManagerCrit);
  2433. jobManager = _jobManager;
  2434. }
  2435. IJobManager *getJobManager()
  2436. {
  2437. CriticalBlock b(*jobManagerCrit);
  2438. return LINK(jobManager);
  2439. }
  2440. IJobManager &queryJobManager()
  2441. {
  2442. CriticalBlock b(*jobManagerCrit);
  2443. assertex(jobManager);
  2444. return *jobManager;
  2445. }