thgraphmaster.cpp 106 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include <limits.h>
  14. #include <stdlib.h>
  15. #include "jprop.hpp"
  16. #include "jexcept.hpp"
  17. #include "jiter.ipp"
  18. #include "jlzw.hpp"
  19. #include "jsocket.hpp"
  20. #include "jset.hpp"
  21. #include "jsort.hpp"
  22. #include "portlist.h"
  23. #include "jhtree.hpp"
  24. #include "mputil.hpp"
  25. #include "dllserver.hpp"
  26. #include "dautils.hpp"
  27. #include "danqs.hpp"
  28. #include "daclient.hpp"
  29. #include "daaudit.hpp"
  30. #include "wujobq.hpp"
  31. #include "thorport.hpp"
  32. #include "commonext.hpp"
  33. #include "thorxmlread.hpp"
  34. #include "thorplugin.hpp"
  35. #include "thormisc.hpp"
  36. #include "thgraphmaster.ipp"
  37. #include "thdemonserver.hpp"
  38. #include "rtlds_imp.hpp"
  39. #include "eclhelper.hpp"
  40. #include "thexception.hpp"
  41. #include "thactivitymaster.ipp"
  42. #include "thmem.hpp"
  43. #include "thcompressutil.hpp"
  44. using roxiemem::OwnedRoxieString;
  45. static CriticalSection *jobManagerCrit;
  46. MODULE_INIT(INIT_PRIORITY_STANDARD)
  47. {
  48. jobManagerCrit = new CriticalSection;
  49. return true;
  50. }
  51. MODULE_EXIT()
  52. {
  53. delete jobManagerCrit;
  54. }
  55. unsigned uniqGraphId = 1;
  56. #define FATAL_TIMEOUT 60
  57. class CFatalHandler : public CTimeoutTrigger, implements IFatalHandler
  58. {
  59. public:
  60. IMPLEMENT_IINTERFACE;
  61. CFatalHandler(unsigned timeout) : CTimeoutTrigger(timeout, "EXCEPTION")
  62. {
  63. start();
  64. }
  65. virtual bool action() override
  66. {
  67. StringBuffer s("FAILED TO RECOVER FROM EXCEPTION, STOPPING THOR");
  68. FLLOG(MCoperatorWarning, thorJob, exception, s.str());
  69. Owned<IJobManager> jobManager = getJobManager();
  70. if (jobManager)
  71. {
  72. jobManager->fatal(exception);
  73. jobManager.clear();
  74. }
  75. return true;
  76. }
  77. // IFatalHandler
  78. virtual void inform(IException *e) override
  79. {
  80. CTimeoutTrigger::inform(e);
  81. }
  82. virtual void clear() override
  83. {
  84. CTimeoutTrigger::clear();
  85. }
  86. };
  87. /////
  88. CSlaveMessageHandler::CSlaveMessageHandler(CJobMaster &_job, mptag_t _mptag) : threaded("CSlaveMessageHandler"), job(_job), mptag(_mptag)
  89. {
  90. stopped = false;
  91. threaded.init(this);
  92. childGraphInitTimeout = job.getOptUInt(THOROPT_CHILD_GRAPH_INIT_TIMEOUT, 5*60) * 1000; // default 5 minutes
  93. }
  94. CSlaveMessageHandler::~CSlaveMessageHandler()
  95. {
  96. stop();
  97. }
  98. void CSlaveMessageHandler::stop()
  99. {
  100. if (!stopped)
  101. {
  102. stopped = true;
  103. job.queryJobChannel(0).queryJobComm().cancel(0, mptag);
  104. threaded.join();
  105. }
  106. }
  107. void CSlaveMessageHandler::threadmain()
  108. {
  109. try
  110. {
  111. for (;;)
  112. {
  113. rank_t sender;
  114. CMessageBuffer msg;
  115. if (stopped || !job.queryJobChannel(0).queryJobComm().recv(msg, RANK_ALL, mptag, &sender))
  116. break;
  117. SlaveMsgTypes msgType;
  118. readUnderlyingType(msg, msgType);
  119. switch (msgType)
  120. {
  121. case smt_errorMsg:
  122. {
  123. unsigned slave;
  124. msg.read(slave);
  125. Owned<IThorException> e = deserializeThorException(msg);
  126. e->setSlave(slave+1);
  127. Owned<CGraphBase> graph = job.queryJobChannel(0).getGraph(e->queryGraphId());
  128. if (graph)
  129. {
  130. activity_id id = e->queryActivityId();
  131. if (id)
  132. {
  133. CGraphElementBase *elem = graph->queryElement(id);
  134. CActivityBase *act = elem->queryActivity();
  135. if (act)
  136. act->fireException(e);
  137. else
  138. graph->fireException(e);
  139. }
  140. else
  141. graph->fireException(e);
  142. }
  143. else
  144. job.fireException(e);
  145. if (msg.getReplyTag() <= TAG_REPLY_BASE)
  146. {
  147. msg.clear();
  148. job.queryJobChannel(0).queryJobComm().reply(msg);
  149. }
  150. break;
  151. }
  152. case smt_dataReq:
  153. {
  154. graph_id gid;
  155. activity_id id;
  156. unsigned slave;
  157. msg.read(slave);
  158. msg.read(gid);
  159. msg.read(id);
  160. msg.clear();
  161. Owned<CGraphBase> graph = job.queryJobChannel(0).getGraph(gid);
  162. if (graph)
  163. {
  164. CMasterGraphElement *e = (CMasterGraphElement *)graph->queryElement(id);
  165. e->queryActivity()->getInitializationData(slave, msg);
  166. }
  167. job.queryJobChannel(0).queryJobComm().reply(msg);
  168. break;
  169. }
  170. case smt_initGraphReq:
  171. {
  172. graph_id gid;
  173. msg.read(gid);
  174. Owned<CMasterGraph> graph = (CMasterGraph *)job.queryJobChannel(0).getGraph(gid);
  175. assertex(graph);
  176. {
  177. CriticalBlock b(graph->queryCreateLock());
  178. Owned<IThorActivityIterator> iter = graph->getIterator();
  179. // onCreate all
  180. ForEach (*iter)
  181. {
  182. CMasterGraphElement &element = (CMasterGraphElement &)iter->query();
  183. element.onCreate();
  184. }
  185. }
  186. msg.clear();
  187. graph->serializeCreateContexts(msg);
  188. job.queryJobChannel(0).queryJobComm().reply(msg);
  189. break;
  190. }
  191. case smt_initActDataReq:
  192. {
  193. graph_id gid;
  194. msg.read(gid);
  195. unsigned slave;
  196. msg.read(slave);
  197. Owned<CMasterGraph> graph = (CMasterGraph *)job.queryJobChannel(0).getGraph(gid);
  198. assertex(graph);
  199. CGraphElementArray toSerialize;
  200. CriticalBlock b(graph->queryCreateLock());
  201. size32_t parentExtractSz;
  202. msg.read(parentExtractSz);
  203. const byte *parentExtract = NULL;
  204. if (parentExtractSz)
  205. {
  206. parentExtract = msg.readDirect(parentExtractSz);
  207. StringBuffer msg("Graph(");
  208. msg.append(graph->queryGraphId()).append(") - initializing master graph with parentExtract ").append(parentExtractSz).append(" bytes");
  209. DBGLOG("%s", msg.str());
  210. parentExtract = graph->setParentCtx(parentExtractSz, parentExtract);
  211. }
  212. Owned<IException> exception;
  213. for (;;)
  214. {
  215. activity_id id;
  216. msg.read(id);
  217. if (!id)
  218. break;
  219. CMasterGraphElement *element = (CMasterGraphElement *)graph->queryElement(id);
  220. assertex(element);
  221. try
  222. {
  223. element->reset();
  224. size32_t startCtxLen;
  225. msg.read(startCtxLen);
  226. element->doCreateActivity(parentExtractSz, parentExtract, startCtxLen ? &msg : nullptr);
  227. if (element->queryActivity())
  228. element->preStart(parentExtractSz, parentExtract);
  229. }
  230. catch (IException *e)
  231. {
  232. EXCLOG(e, NULL);
  233. exception.setown(e);
  234. break;
  235. }
  236. CActivityBase *activity = element->queryActivity();
  237. if (activity)
  238. element->sentActInitData->set(slave, 0); // clear to permit serializeActivityInitData to resend
  239. toSerialize.append(*LINK(element));
  240. }
  241. msg.clear();
  242. mptag_t replyTag = job.queryJobChannel(0).queryMPServer().createReplyTag();
  243. msg.append(replyTag); // second reply
  244. if (exception)
  245. {
  246. msg.append(true);
  247. serializeException(exception, msg);
  248. }
  249. else
  250. {
  251. msg.append(false);
  252. CGraphElementArrayIterator iter(toSerialize);
  253. graph->serializeActivityInitData(slave, msg, iter);
  254. }
  255. job.queryJobChannel(0).queryJobComm().reply(msg);
  256. if (!job.queryJobChannel(0).queryJobComm().recv(msg, slave+1, replyTag, NULL, childGraphInitTimeout))
  257. throwUnexpected();
  258. if (exception)
  259. throw exception.getClear();
  260. bool error;
  261. msg.read(error);
  262. if (error)
  263. {
  264. Owned<IThorException> e = deserializeThorException(msg);
  265. e->setSlave(slave);
  266. StringBuffer tmpStr("Slave ");
  267. job.queryJobGroup().queryNode(slave).endpoint().getUrlStr(tmpStr);
  268. GraphPrintLog(graph, e, "%s", tmpStr.append(": slave initialization error").str());
  269. throw e.getClear();
  270. }
  271. break;
  272. }
  273. case smt_getPhysicalName:
  274. {
  275. LOG(MCdebugProgress, thorJob, "getPhysicalName called from node %d", sender-1);
  276. StringAttr logicalName;
  277. unsigned partNo;
  278. bool create;
  279. msg.read(logicalName);
  280. msg.read(partNo);
  281. msg.read(create);
  282. msg.clear();
  283. StringBuffer phys;
  284. if (create && !job.queryCreatedFile(logicalName)) // not sure who would do this ever??
  285. queryThorFileManager().getPublishPhysicalName(job, logicalName, partNo, phys);
  286. else
  287. queryThorFileManager().getPhysicalName(job, logicalName, partNo, phys);
  288. msg.append(phys);
  289. job.queryJobChannel(0).queryJobComm().reply(msg);
  290. break;
  291. }
  292. case smt_getFileOffset:
  293. {
  294. LOG(MCdebugProgress, thorJob, "getFileOffset called from node %d", sender-1);
  295. StringAttr logicalName;
  296. unsigned partNo;
  297. msg.read(logicalName);
  298. msg.read(partNo);
  299. msg.clear();
  300. offset_t offset = queryThorFileManager().getFileOffset(job, logicalName, partNo);
  301. msg.append(offset);
  302. job.queryJobChannel(0).queryJobComm().reply(msg);
  303. break;
  304. }
  305. case smt_actMsg:
  306. {
  307. LOG(MCdebugProgress, thorJob, "smt_actMsg called from node %d", sender-1);
  308. graph_id gid;
  309. msg.read(gid);
  310. activity_id id;
  311. msg.read(id);
  312. Owned<CMasterGraph> graph = (CMasterGraph *)job.queryJobChannel(0).getGraph(gid);
  313. assertex(graph);
  314. CMasterGraphElement *container = (CMasterGraphElement *)graph->queryElement(id);
  315. assertex(container);
  316. CMasterActivity *activity = (CMasterActivity *)container->queryActivity();
  317. assertex(activity);
  318. activity->handleSlaveMessage(msg); // don't block
  319. break;
  320. }
  321. case smt_getresult:
  322. {
  323. unsigned slave;
  324. msg.read(slave);
  325. LOG(MCdebugProgress, thorJob, "smt_getresult called from slave %d", slave);
  326. graph_id gid;
  327. msg.read(gid);
  328. activity_id ownerId;
  329. msg.read(ownerId);
  330. unsigned resultId;
  331. msg.read(resultId);
  332. CJobChannel &jobChannel = job.queryJobChannel(0);
  333. mptag_t replyTag = jobChannel.deserializeMPTag(msg);
  334. Owned<IThorResult> result = jobChannel.getOwnedResult(gid, ownerId, resultId);
  335. Owned<IRowStream> resultStream = result->getRowStream();
  336. sendInChunks(job.queryJobChannel(0).queryJobComm(), slave+1, replyTag, resultStream, result->queryRowInterfaces());
  337. break;
  338. }
  339. }
  340. }
  341. }
  342. catch (IException *e)
  343. {
  344. job.fireException(e);
  345. e->Release();
  346. }
  347. }
  348. //////////////////////
  349. CMasterActivity::CMasterActivity(CGraphElementBase *_container, const StatisticsMapping &statsMapping)
  350. : CActivityBase(_container, statsMapping), threaded("CMasterActivity", this), statsCollection(statsMapping)
  351. {
  352. notedWarnings = createThreadSafeBitSet();
  353. mpTag = TAG_NULL;
  354. data = new MemoryBuffer[container.queryJob().querySlaves()];
  355. asyncStart = false;
  356. if (container.isSink())
  357. edgeStatsVector.push_back(new CThorEdgeCollection);
  358. else
  359. {
  360. for (unsigned o=0; o<container.getOutputs(); o++)
  361. edgeStatsVector.push_back(new CThorEdgeCollection);
  362. }
  363. }
  364. CMasterActivity::~CMasterActivity()
  365. {
  366. if (asyncStart)
  367. threaded.join();
  368. notedWarnings->Release();
  369. queryJob().freeMPTag(mpTag);
  370. delete [] data;
  371. }
  372. IDistributedFile *CMasterActivity::queryReadFile(unsigned f)
  373. {
  374. if (f>=readFiles.ordinality())
  375. return NULL;
  376. return &readFiles.item(f);
  377. }
  378. IDistributedFile *CMasterActivity::findReadFile(const char *lfnName)
  379. {
  380. auto it = readFilesMap.find(lfnName);
  381. if (it != readFilesMap.end())
  382. return LINK(it->second);
  383. return nullptr;
  384. }
  385. IDistributedFile *CMasterActivity::lookupReadFile(const char *lfnName, bool jobTemp, bool temp, bool opt)
  386. {
  387. StringBuffer normalizedFileName;
  388. queryThorFileManager().addScope(container.queryJob(), lfnName, normalizedFileName, jobTemp|temp);
  389. Owned<IDistributedFile> file = findReadFile(normalizedFileName);
  390. if (!file)
  391. {
  392. file.setown(queryThorFileManager().lookup(container.queryJob(), lfnName, jobTemp|temp, opt, true, container.activityIsCodeSigned()));
  393. if (file)
  394. {
  395. readFiles.append(*LINK(file));
  396. readFilesMap[normalizedFileName.str()] = file;
  397. if (!temp) // NB: Temps not listed in workunit
  398. queryThorFileManager().noteFileRead(container.queryJob(), file);
  399. }
  400. }
  401. return file.getClear();
  402. }
  403. MemoryBuffer &CMasterActivity::queryInitializationData(unsigned slave) const
  404. { // NB: not intended to be called by multiple threads.
  405. return data[slave].reset();
  406. }
  407. MemoryBuffer &CMasterActivity::getInitializationData(unsigned slave, MemoryBuffer &dst) const
  408. {
  409. return dst.append(data[slave]);
  410. }
  411. void CMasterActivity::threadmain()
  412. {
  413. try
  414. {
  415. process();
  416. }
  417. catch (IException *e)
  418. {
  419. Owned<IException> e2;
  420. if (QUERYINTERFACE(e, ISEH_Exception))
  421. e2.setown(MakeThorFatal(e, TE_SEH, "(SEH)"));
  422. else
  423. e2.setown(MakeActivityException(this, e, "Master exception"));
  424. e->Release();
  425. ActPrintLog(e2, "In CMasterActivity::threadmain");
  426. fireException(e2);
  427. }
  428. catch (CATCHALL)
  429. {
  430. Owned<IException> e = MakeThorFatal(NULL, TE_MasterProcessError, "FATAL: Unknown master process exception kind=%s, id=%" ACTPF "d", activityKindStr(container.getKind()), container.queryId());
  431. ActPrintLog(e, "In CMasterActivity::threadmain");
  432. fireException(e);
  433. }
  434. }
  435. void CMasterActivity::init()
  436. {
  437. // Files are added to readFiles during initialization,
  438. // If this is a CQ query act., then it will be repeatedly re-initialized if it depends on master context,
  439. // e.g. due to a variable filename.
  440. // Therefore, do not clear readFiles on reinitialization, to avoid repeatedly [expensively] looking them up.
  441. bool inCQ = container.queryOwner().queryOwner() && !container.queryOwner().isGlobal();
  442. if (!inCQ)
  443. {
  444. readFiles.kill();
  445. readFilesMap.clear();
  446. }
  447. }
  448. void CMasterActivity::startProcess(bool async)
  449. {
  450. if (async)
  451. {
  452. asyncStart = true;
  453. threaded.start();
  454. }
  455. else
  456. threadmain();
  457. }
  458. bool CMasterActivity::wait(unsigned timeout)
  459. {
  460. if (!asyncStart)
  461. return true;
  462. return threaded.join(timeout);
  463. }
  464. void CMasterActivity::kill()
  465. {
  466. CActivityBase::kill();
  467. readFiles.kill();
  468. readFilesMap.clear();
  469. }
  470. bool CMasterActivity::fireException(IException *_e)
  471. {
  472. IThorException *e = QUERYINTERFACE(_e, IThorException);
  473. if (!e) return false;
  474. switch (e->errorCode())
  475. {
  476. case TE_LargeBufferWarning:
  477. case TE_MoxieIndarOverflow:
  478. case TE_BuildIndexFewExcess:
  479. case TE_FetchMisaligned:
  480. case TE_FetchOutOfRange:
  481. case TE_CouldNotCreateLookAhead:
  482. case TE_SpillAdded:
  483. case TE_ReadPartialFromPipe:
  484. case TE_LargeAggregateTable:
  485. case TE_RemoteReadFailure:
  486. {
  487. if (!notedWarnings->testSet(e->errorCode()))
  488. CActivityBase::fireException(e);
  489. return true;
  490. }
  491. }
  492. return container.queryOwner().fireException(e);
  493. }
  494. void CMasterActivity::reset()
  495. {
  496. asyncStart = false;
  497. CActivityBase::reset();
  498. }
  499. void CMasterActivity::deserializeStats(unsigned node, MemoryBuffer &mb)
  500. {
  501. CriticalBlock b(progressCrit); // don't think needed
  502. statsCollection.deserialize(node, mb);
  503. rowcount_t count;
  504. for (auto &collection: edgeStatsVector)
  505. {
  506. mb.read(count);
  507. collection->set(node, count);
  508. }
  509. }
  510. void CMasterActivity::getActivityStats(IStatisticGatherer & stats)
  511. {
  512. statsCollection.getStats(stats);
  513. if (diskAccessCost)
  514. stats.addStatistic(StCostFileAccess, diskAccessCost);
  515. }
  516. void CMasterActivity::getEdgeStats(IStatisticGatherer & stats, unsigned idx)
  517. {
  518. CriticalBlock b(progressCrit);
  519. if (idx < edgeStatsVector.size())
  520. edgeStatsVector[idx]->getStats(stats);
  521. }
  522. void CMasterActivity::done()
  523. {
  524. CActivityBase::done();
  525. if (readFiles.ordinality())
  526. {
  527. IArrayOf<IDistributedFile> tmpFiles;
  528. ForEachItemIn(f, readFiles)
  529. {
  530. IDistributedFile &file = readFiles.item(f);
  531. IDistributedSuperFile *super = file.querySuperFile();
  532. if (super)
  533. {
  534. getSuperFileSubs(super, tmpFiles, true);
  535. tmpFiles.append(*LINK(&file));
  536. }
  537. else
  538. tmpFiles.append(*LINK(&file));
  539. }
  540. ForEachItemIn(s, tmpFiles)
  541. {
  542. IDistributedFile &file = tmpFiles.item(s);
  543. file.setAccessed();
  544. }
  545. }
  546. }
  547. void CMasterActivity::updateFileReadCostStats(std::vector<OwnedPtr<CThorStatsCollection>> & subFileStats)
  548. {
  549. /* JCSMORE->SHAMSER: (separate JIRA needed)
  550. * there can be >1 read file if this act. is in a child query/loop, it could be processing a different file per iteration,
  551. * meaning there could be an arbitrary number of readfiles, in that case activity::init would be called multiple times.
  552. *
  553. * I hit an assert during testing reading a super in a CQ:
  554. libjlib.so!raiseAssertException(const char * assertion, const char * file, unsigned int line) (\home\jsmith\git\HPCC-Platform\system\jlib\jexcept.cpp:660)
  555. libjlib.so!MemoryBuffer::read(MemoryBuffer * const this, unsigned char & value) (\home\jsmith\git\HPCC-Platform\system\jlib\jbuff.cpp:693)
  556. libjlib.so!MemoryBuffer::readPacked(MemoryBuffer * const this) (\home\jsmith\git\HPCC-Platform\system\jlib\jbuff.cpp:813)
  557. libjlib.so!MemoryBuffer::readPacked(MemoryBuffer * const this, unsigned int & value) (\home\jsmith\git\HPCC-Platform\system\jlib\jbuff.cpp:824)
  558. libjlib.so!CRuntimeStatisticCollection::deserialize(CRuntimeStatisticCollection * const this, MemoryBuffer & in) (\home\jsmith\git\HPCC-Platform\system\jlib\jstats.cpp:2524)
  559. libactivitymasters_lcr.so!CThorStatsCollection::deserialize(CThorStatsCollection * const this, unsigned int node, MemoryBuffer & mb) (\home\jsmith\git\HPCC-Platform\thorlcr\graph\thgraphmaster.ipp:53)
  560. libactivitymasters_lcr.so!CDiskReadMasterBase::deserializeStats(CDiskReadMasterBase * const this, unsigned int node, MemoryBuffer & mb) (\home\jsmith\git\HPCC-Platform\thorlcr\activities\thdiskbase.cpp:139)
  561. libgraphmaster_lcr.so!CMasterGraph::deserializeStats(CMasterGraph * const this, unsigned int node, MemoryBuffer & mb) (\home\jsmith\git\HPCC-Platform\thorlcr\graph\thgraphmaster.cpp:2781)
  562. *
  563. * (would be a crash in a Release build)
  564. * it's because the diskread init is adding new CThorStatsCollection per init (per execution of the CQ),
  565. * which means when deserializing there are too many. The code assumes there is only 1 file being read.
  566. *
  567. * I've temporarily changed the code where subFileStats's are added, to prevent more being added per iteration,
  568. * but it needs re-thinking to handle the workers potentially dealing with different logical files
  569. * (+ index read to handle super files, and case where act. is reading >1 file, i.e. KJ)
  570. * I've changed this code rely on the 1st readFiles for now (not the # of subFileStats, which may be more)
  571. * NB: also changed when the expansion of readFiles (from supers to subfiles) happens, a new super was being added
  572. * each CQ iteration and re-expanded, meaing readFiles kept growing.
  573. *
  574. * Also, superkey1.ecl hits a dbgasserex whilst deserializing stats (before and after these PR changes),
  575. * but is caught/ignored. I haven't investigated further.
  576. */
  577. IDistributedFile *file = queryReadFile(0);
  578. if (file)
  579. {
  580. IDistributedSuperFile *super = file->querySuperFile();
  581. if (super)
  582. {
  583. unsigned numSubFiles = super->numSubFiles(true); //subFileStats.size();
  584. if (subFileStats.size())
  585. {
  586. assertex(numSubFiles <= subFileStats.size());
  587. for (unsigned i=0; i<subFileStats.size(); i++)
  588. {
  589. IDistributedFile &subFile = super->querySubFile(i, true);
  590. const char *subName = subFile.queryLogicalName();
  591. PROGLOG("subName = %s", subName);
  592. stat_type numDiskReads = subFileStats[i]->getStatisticSum(StNumDiskReads);
  593. StringBuffer clusterName;
  594. subFile.getClusterName(0, clusterName);
  595. diskAccessCost += money2cost_type(calcFileAccessCost(clusterName, 0, numDiskReads));
  596. subFile.addAttrValue("@numDiskReads", numDiskReads);
  597. }
  598. }
  599. }
  600. else
  601. {
  602. stat_type numDiskReads = statsCollection.getStatisticSum(StNumDiskReads);
  603. StringBuffer clusterName;
  604. file->getClusterName(0, clusterName);
  605. diskAccessCost += money2cost_type(calcFileAccessCost(clusterName, 0, numDiskReads));
  606. file->addAttrValue("@numDiskReads", numDiskReads);
  607. }
  608. }
  609. }
  610. void CMasterActivity::updateFileWriteCostStats(IFileDescriptor & fileDesc, IPropertyTree &props, stat_type numDiskWrites)
  611. {
  612. if (numDiskWrites)
  613. {
  614. props.setPropInt64("@numDiskWrites", numDiskWrites);
  615. assertex(fileDesc.numClusters()>=1);
  616. StringBuffer clusterName;
  617. fileDesc.getClusterGroupName(0, clusterName);// Note: calculating for 1st cluster. (Future: calc for >1 clusters)
  618. diskAccessCost = money2cost_type(calcFileAccessCost(clusterName, numDiskWrites, 0));
  619. }
  620. }
  621. //////////////////////
  622. // CMasterGraphElement impl.
  623. //
  624. CMasterGraphElement::CMasterGraphElement(CGraphBase &_owner, IPropertyTree &_xgmml) : CGraphElementBase(_owner, _xgmml, nullptr)
  625. {
  626. }
  627. bool CMasterGraphElement::checkUpdate()
  628. {
  629. if (!onlyUpdateIfChanged)
  630. return false;
  631. if (!globals->getPropBool("@updateEnabled", true) || 0 != queryJob().getWorkUnitValueInt("disableUpdate", 0))
  632. return false;
  633. bool doCheckUpdate = false;
  634. OwnedRoxieString filename;
  635. unsigned eclCRC;
  636. unsigned __int64 totalCRC;
  637. bool temporary = false;
  638. switch (getKind())
  639. {
  640. case TAKindexwrite:
  641. {
  642. IHThorIndexWriteArg *helper = (IHThorIndexWriteArg *)queryHelper();
  643. doCheckUpdate = 0 != (helper->getFlags() & TIWupdate);
  644. filename.setown(helper->getFileName());
  645. helper->getUpdateCRCs(eclCRC, totalCRC);
  646. break;
  647. }
  648. case TAKdiskwrite:
  649. case TAKcsvwrite:
  650. case TAKxmlwrite:
  651. case TAKjsonwrite:
  652. case TAKspillwrite:
  653. {
  654. IHThorDiskWriteArg *helper = (IHThorDiskWriteArg *)queryHelper();
  655. doCheckUpdate = 0 != (helper->getFlags() & TDWupdate);
  656. filename.setown(helper->getFileName());
  657. helper->getUpdateCRCs(eclCRC, totalCRC);
  658. if (TAKdiskwrite == getKind())
  659. temporary = 0 != (helper->getFlags() & (TDXtemporary|TDXjobtemp));
  660. else if (TAKspillwrite == getKind())
  661. temporary = true;
  662. break;
  663. }
  664. default:
  665. break;
  666. }
  667. if (doCheckUpdate)
  668. {
  669. StringAttr lfn;
  670. Owned<IDistributedFile> file = queryThorFileManager().lookup(queryJob(), filename, temporary, true, false, defaultPrivilegedUser);
  671. if (file)
  672. {
  673. IPropertyTree &props = file->queryAttributes();
  674. if ((eclCRC == (unsigned)props.getPropInt("@eclCRC")) && (totalCRC == (unsigned __int64)props.getPropInt64("@totalCRC")))
  675. {
  676. // so this needs pruning
  677. Owned<IThorException> e = MakeActivityWarning(this, TE_UpToDate, "output file = '%s' - is up to date - it will not be rebuilt", file->queryLogicalName());
  678. queryOwner().fireException(e);
  679. return true;
  680. }
  681. }
  682. }
  683. return false;
  684. }
  685. void CMasterGraphElement::initActivity()
  686. {
  687. CriticalBlock b(crit);
  688. if (!initialized)
  689. {
  690. initialized = true;
  691. ((CMasterActivity *)queryActivity())->init();
  692. }
  693. }
  694. void CMasterGraphElement::reset()
  695. {
  696. CGraphElementBase::reset();
  697. if (activity && activity->needReInit())
  698. initialized = false;
  699. }
  700. void CMasterGraphElement::doCreateActivity(size32_t parentExtractSz, const byte *parentExtract, MemoryBuffer *startCtx)
  701. {
  702. createActivity();
  703. onStart(parentExtractSz, parentExtract, startCtx);
  704. initActivity();
  705. }
  706. void CMasterGraphElement::slaveDone(size32_t slaveIdx, MemoryBuffer &mb)
  707. {
  708. ((CMasterActivity *)queryActivity())->slaveDone(slaveIdx, mb);
  709. }
  710. //////
  711. ///////
  712. class CBarrierMaster : public CInterface, implements IBarrier
  713. {
  714. mptag_t tag;
  715. Linked<ICommunicator> comm;
  716. bool receiving;
  717. public:
  718. IMPLEMENT_IINTERFACE;
  719. CBarrierMaster(ICommunicator &_comm, mptag_t _tag) : comm(&_comm), tag(_tag)
  720. {
  721. receiving = false;
  722. }
  723. virtual mptag_t queryTag() const override { return tag; }
  724. virtual bool wait(bool exception, unsigned timeout) override
  725. {
  726. Owned<IException> e;
  727. CTimeMon tm(timeout);
  728. unsigned s=comm->queryGroup().ordinality()-1;
  729. bool aborted = false;
  730. CMessageBuffer msg;
  731. Owned<IBitSet> raisedSet = createThreadSafeBitSet();
  732. unsigned remaining = timeout;
  733. while (s--)
  734. {
  735. rank_t sender;
  736. msg.clear();
  737. if (INFINITE != timeout && tm.timedout(&remaining))
  738. {
  739. if (exception)
  740. throw createBarrierAbortException();
  741. else
  742. return false;
  743. }
  744. {
  745. BooleanOnOff onOff(receiving);
  746. if (!comm->recv(msg, RANK_ALL, tag, &sender, remaining))
  747. break;
  748. }
  749. msg.read(aborted);
  750. bool hasExcept;
  751. msg.read(hasExcept);
  752. if (hasExcept && !e.get())
  753. e.setown(deserializeException(msg));
  754. sender = sender - 1; // 0 = master
  755. if (raisedSet->testSet(sender, true) && !aborted)
  756. IWARNLOG("CBarrierMaster, raise barrier message on tag %d, already received from slave %d", tag, sender);
  757. if (aborted) break;
  758. }
  759. msg.clear();
  760. msg.append(aborted);
  761. if (e)
  762. {
  763. msg.append(true);
  764. serializeException(e, msg);
  765. }
  766. else
  767. msg.append(false);
  768. if (INFINITE != timeout && tm.timedout(&remaining))
  769. {
  770. if (exception)
  771. throw createBarrierAbortException();
  772. else
  773. return false;
  774. }
  775. if (!comm->send(msg, RANK_ALL_OTHER, tag, INFINITE != timeout ? remaining : LONGTIMEOUT))
  776. throw MakeStringException(0, "CBarrierMaster::wait - Timeout sending to slaves");
  777. if (aborted)
  778. {
  779. if (!exception)
  780. return false;
  781. if (e)
  782. throw e.getClear();
  783. else
  784. throw createBarrierAbortException();
  785. }
  786. return true;
  787. }
  788. virtual void cancel(IException *e) override
  789. {
  790. if (receiving)
  791. comm->cancel(RANK_ALL, tag);
  792. CMessageBuffer msg;
  793. msg.append(true);
  794. if (e)
  795. {
  796. msg.append(true);
  797. serializeException(e, msg);
  798. }
  799. else
  800. msg.append(false);
  801. if (!comm->send(msg, RANK_ALL_OTHER, tag, LONGTIMEOUT))
  802. throw MakeStringException(0, "CBarrierMaster::cancel - Timeout sending to slaves");
  803. }
  804. };
  805. /////////////
  806. class CMasterGraphTempHandler : public CGraphTempHandler
  807. {
  808. public:
  809. CMasterGraphTempHandler(CJobBase &job, bool errorOnMissing) : CGraphTempHandler(job, errorOnMissing) { }
  810. virtual bool removeTemp(const char *name)
  811. {
  812. queryThorFileManager().clearCacheEntry(name);
  813. return true;
  814. }
  815. virtual void registerFile(const char *name, graph_id graphId, unsigned usageCount, bool temp, WUFileKind fileKind, StringArray *clusters)
  816. {
  817. if (!temp || job.queryUseCheckpoints())
  818. {
  819. Owned<IWorkUnit> wu = &job.queryWorkUnit().lock();
  820. wu->addFile(name, clusters, usageCount, fileKind, job.queryGraphName());
  821. }
  822. else
  823. CGraphTempHandler::registerFile(name, graphId, usageCount, temp, fileKind, clusters);
  824. }
  825. virtual void deregisterFile(const char *name, bool kept) // NB: only called for temp files
  826. {
  827. if (kept || job.queryUseCheckpoints())
  828. {
  829. Owned<IWorkUnit> wu = &job.queryWorkUnit().lock();
  830. wu->releaseFile(name);
  831. }
  832. else
  833. CGraphTempHandler::deregisterFile(name);
  834. }
  835. virtual void clearTemps()
  836. {
  837. try
  838. {
  839. if (!job.queryPausing()) // temps of completed workunit will have been preserved and want to keep
  840. {
  841. Owned<IWorkUnit> lwu = &job.queryWorkUnit().lock();
  842. lwu->deleteTempFiles(job.queryGraphName(), false, false);
  843. }
  844. }
  845. catch (IException *e)
  846. {
  847. EXCLOG(e, "Problem deleting temp files");
  848. e->Release();
  849. }
  850. CGraphTempHandler::clearTemps();
  851. }
  852. };
  853. static const char * getResultText(StringBuffer & s, const char * stepname, unsigned sequence)
  854. {
  855. switch ((int)sequence)
  856. {
  857. case -1: return s.append("STORED('").append(stepname).append("')");
  858. case -2: return s.append("PERSIST('").append(stepname).append("')");
  859. case -3: return s.append("global('").append(stepname).append("')");
  860. default:
  861. if (stepname)
  862. return s.append(stepname);
  863. return s.append('#').append(sequence);
  864. }
  865. }
  866. class CThorCodeContextMaster : public CThorCodeContextBase
  867. {
  868. Linked<IConstWorkUnit> workunit;
  869. Owned<IDistributedFileTransaction> superfiletransaction;
  870. IConstWUResult * getResult(const char * name, unsigned sequence)
  871. {
  872. return getWorkUnitResult(workunit, name, sequence);
  873. }
  874. #define PROTECTED_GETRESULT(STEPNAME, SEQUENCE, KIND, KINDTEXT, ACTION) \
  875. LOG(MCdebugProgress, thorJob, "getResult%s(%s,%d)", KIND, STEPNAME?STEPNAME:"", SEQUENCE); \
  876. Owned<IConstWUResult> r = getResultForGet(STEPNAME, SEQUENCE); \
  877. try \
  878. { \
  879. ACTION \
  880. } \
  881. catch (IException * e) { \
  882. StringBuffer s, text; e->errorMessage(text); e->Release(); \
  883. throw MakeStringException(TE_FailedToRetrieveWorkunitValue, "result %s in workunit contains an invalid " KINDTEXT " value [%s]", getResultText(s, STEPNAME, SEQUENCE), text.str()); \
  884. } \
  885. catch (CATCHALL) { StringBuffer s; throw MakeStringException(TE_FailedToRetrieveWorkunitValue, "value %s in workunit contains an invalid " KINDTEXT " value", getResultText(s, STEPNAME, SEQUENCE)); }
  886. public:
  887. CThorCodeContextMaster(CJobChannel &jobChannel, IConstWorkUnit &_workunit, ILoadedDllEntry &querySo, IUserDescriptor &userDesc) : CThorCodeContextBase(jobChannel, querySo, userDesc), workunit(&_workunit)
  888. {
  889. }
  890. // ICodeContext
  891. virtual unsigned getGraphLoopCounter() const override { return 0; }
  892. virtual IDebuggableContext *queryDebugContext() const override { return nullptr; }
  893. virtual void setResultBool(const char *name, unsigned sequence, bool result) override
  894. {
  895. WorkunitUpdate w(&workunit->lock());
  896. Owned<IWUResult> r = updateWorkUnitResult(w, name, sequence);
  897. if (r)
  898. {
  899. r->setResultBool(result);
  900. r->setResultStatus(ResultStatusCalculated);
  901. }
  902. else
  903. throw MakeStringException(TE_UnexpectedParameters, "Unexpected parameters to setResultBool");
  904. }
  905. virtual void setResultData(const char *name, unsigned sequence, int len, const void *result) override
  906. {
  907. WorkunitUpdate w(&workunit->lock());
  908. Owned<IWUResult> r = updateWorkUnitResult(w, name, sequence);
  909. if (r)
  910. {
  911. r->setResultData(result, len);
  912. r->setResultStatus(ResultStatusCalculated);
  913. }
  914. else
  915. throw MakeStringException(TE_UnexpectedParameters, "Unexpected parameters to setResultData");
  916. }
  917. virtual void setResultDecimal(const char * name, unsigned sequence, int len, int precision, bool isSigned, const void *val) override
  918. {
  919. WorkunitUpdate w(&workunit->lock());
  920. Owned<IWUResult> r = updateWorkUnitResult(w, name, sequence);
  921. if (r)
  922. {
  923. r->setResultDecimal(val, len);
  924. r->setResultStatus(ResultStatusCalculated);
  925. }
  926. else
  927. throw MakeStringException(TE_UnexpectedParameters, "Unexpected parameters to setResultDecimal");
  928. }
  929. virtual void setResultInt(const char *name, unsigned sequence, __int64 result, unsigned size) override
  930. {
  931. WorkunitUpdate w(&workunit->lock());
  932. Owned<IWUResult> r = updateWorkUnitResult(w, name, sequence);
  933. if (r)
  934. {
  935. r->setResultInt(result);
  936. r->setResultStatus(ResultStatusCalculated);
  937. }
  938. else
  939. throw MakeStringException(TE_UnexpectedParameters, "Unexpected parameters to setResultInt");
  940. }
  941. virtual void setResultRaw(const char *name, unsigned sequence, int len, const void *result) override
  942. {
  943. WorkunitUpdate w(&workunit->lock());
  944. Owned<IWUResult> r = updateWorkUnitResult(w, name, sequence);
  945. if (r)
  946. {
  947. r->setResultRaw(len, result, ResultFormatRaw);
  948. r->setResultStatus(ResultStatusCalculated);
  949. }
  950. else
  951. throw MakeStringException(TE_UnexpectedParameters, "Unexpected parameters to setResultData");
  952. }
  953. virtual void setResultReal(const char *name, unsigned sequence, double result) override
  954. {
  955. WorkunitUpdate w(&workunit->lock());
  956. Owned<IWUResult> r = updateWorkUnitResult(w, name, sequence);
  957. if (r)
  958. {
  959. r->setResultReal(result);
  960. r->setResultStatus(ResultStatusCalculated);
  961. }
  962. else
  963. throw MakeStringException(TE_UnexpectedParameters, "Unexpected parameters to setResultReal");
  964. }
  965. virtual void setResultSet(const char *name, unsigned sequence, bool isAll, size32_t len, const void *result, ISetToXmlTransformer *) override
  966. {
  967. WorkunitUpdate w(&workunit->lock());
  968. Owned<IWUResult> r = updateWorkUnitResult(w, name, sequence);
  969. if (r)
  970. {
  971. r->setResultIsAll(isAll);
  972. r->setResultRaw(len, result, ResultFormatRaw);
  973. r->setResultStatus(ResultStatusCalculated);
  974. }
  975. else
  976. throw MakeStringException(TE_UnexpectedParameters, "Unexpected parameters to setResultData");
  977. }
  978. virtual void setResultString(const char *name, unsigned sequence, int len, const char *result) override
  979. {
  980. WorkunitUpdate w(&workunit->lock());
  981. Owned<IWUResult> r = updateWorkUnitResult(w, name, sequence);
  982. if (r)
  983. {
  984. r->setResultString(result, len);
  985. r->setResultStatus(ResultStatusCalculated);
  986. }
  987. else
  988. throw MakeStringException(TE_UnexpectedParameters, "Unexpected parameters to setResultString");
  989. }
  990. virtual void setResultUnicode(const char * name, unsigned sequence, int len, UChar const * result) override
  991. {
  992. WorkunitUpdate w(&workunit->lock());
  993. Owned<IWUResult> r = updateWorkUnitResult(w, name, sequence);
  994. if (r)
  995. {
  996. r->setResultUnicode(result, len);
  997. r->setResultStatus(ResultStatusCalculated);
  998. }
  999. else
  1000. throw MakeStringException(TE_UnexpectedParameters, "Unexpected parameters to setResultUnicode");
  1001. }
  1002. virtual void setResultVarString(const char * name, unsigned sequence, const char *result) override
  1003. {
  1004. WorkunitUpdate w(&workunit->lock());
  1005. Owned<IWUResult> r = updateWorkUnitResult(w, name, sequence);
  1006. if (r)
  1007. {
  1008. r->setResultString(result, strlen(result));
  1009. r->setResultStatus(ResultStatusCalculated);
  1010. }
  1011. else
  1012. throw MakeStringException(TE_UnexpectedParameters, "Unexpected parameters to setResultVarString");
  1013. }
  1014. virtual void setResultUInt(const char *name, unsigned sequence, unsigned __int64 result, unsigned size) override
  1015. {
  1016. WorkunitUpdate w(&workunit->lock());
  1017. Owned<IWUResult> r = updateWorkUnitResult(w, name, sequence);
  1018. if (r)
  1019. {
  1020. r->setResultUInt(result);
  1021. r->setResultStatus(ResultStatusCalculated);
  1022. }
  1023. else
  1024. throw MakeStringException(TE_UnexpectedParameters, "Unexpected parameters to setResultUInt");
  1025. }
  1026. virtual void setResultVarUnicode(const char * stepname, unsigned sequence, UChar const *val) override
  1027. {
  1028. setResultUnicode(stepname, sequence, rtlUnicodeStrlen(val), val);
  1029. }
  1030. virtual bool getResultBool(const char * stepname, unsigned sequence) override
  1031. {
  1032. PROTECTED_GETRESULT(stepname, sequence, "Bool", "bool",
  1033. return r->getResultBool();
  1034. );
  1035. }
  1036. virtual void getResultData(unsigned & tlen, void * & tgt, const char * stepname, unsigned sequence) override
  1037. {
  1038. PROTECTED_GETRESULT(stepname, sequence, "Data", "data",
  1039. SCMStringBuffer result;
  1040. r->getResultString(result, false);
  1041. tlen = result.length();
  1042. tgt = (char *)result.s.detach();
  1043. );
  1044. }
  1045. virtual void getResultDecimal(unsigned tlen, int precision, bool isSigned, void * tgt, const char * stepname, unsigned sequence) override
  1046. {
  1047. PROTECTED_GETRESULT(stepname, sequence, "Decimal", "decimal",
  1048. r->getResultDecimal(tgt, tlen, precision, isSigned);
  1049. );
  1050. }
  1051. virtual void getResultRaw(unsigned & tlen, void * & tgt, const char * stepname, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer) override
  1052. {
  1053. tgt = NULL;
  1054. PROTECTED_GETRESULT(stepname, sequence, "Raw", "raw",
  1055. Variable2IDataVal result(&tlen, &tgt);
  1056. Owned<IXmlToRawTransformer> rawXmlTransformer = createXmlRawTransformer(xmlTransformer);
  1057. Owned<ICsvToRawTransformer> rawCsvTransformer = createCsvRawTransformer(csvTransformer);
  1058. r->getResultRaw(result, rawXmlTransformer, rawCsvTransformer);
  1059. );
  1060. }
  1061. virtual void getResultSet(bool & isAll, unsigned & tlen, void * & tgt, const char * stepname, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer) override
  1062. {
  1063. tgt = NULL;
  1064. PROTECTED_GETRESULT(stepname, sequence, "Raw", "raw",
  1065. Variable2IDataVal result(&tlen, &tgt);
  1066. Owned<IXmlToRawTransformer> rawXmlTransformer = createXmlRawTransformer(xmlTransformer);
  1067. Owned<ICsvToRawTransformer> rawCsvTransformer = createCsvRawTransformer(csvTransformer);
  1068. isAll = r->getResultIsAll();
  1069. r->getResultRaw(result, rawXmlTransformer, rawCsvTransformer);
  1070. );
  1071. }
  1072. virtual __int64 getResultInt(const char * name, unsigned sequence) override
  1073. {
  1074. PROTECTED_GETRESULT(name, sequence, "Int", "integer",
  1075. return r->getResultInt();
  1076. );
  1077. }
  1078. virtual double getResultReal(const char * name, unsigned sequence) override
  1079. {
  1080. PROTECTED_GETRESULT(name, sequence, "Real", "real",
  1081. return r->getResultReal();
  1082. );
  1083. }
  1084. virtual void getResultString(unsigned & tlen, char * & tgt, const char * stepname, unsigned sequence) override
  1085. {
  1086. PROTECTED_GETRESULT(stepname, sequence, "String", "string",
  1087. SCMStringBuffer result;
  1088. r->getResultString(result, false);
  1089. tlen = result.length();
  1090. tgt = (char *)result.s.detach();
  1091. );
  1092. }
  1093. virtual void getResultStringF(unsigned tlen, char * tgt, const char * stepname, unsigned sequence) override
  1094. {
  1095. PROTECTED_GETRESULT(stepname, sequence, "String", "string",
  1096. SCMStringBuffer result;
  1097. r->getResultString(result, false);
  1098. rtlStrToStr(tlen, tgt, result.length(), result.s.str());
  1099. );
  1100. }
  1101. virtual void getResultUnicode(unsigned & tlen, UChar * & tgt, const char * stepname, unsigned sequence) override
  1102. {
  1103. PROTECTED_GETRESULT(stepname, sequence, "Unicode", "unicode",
  1104. MemoryBuffer result;
  1105. r->getResultUnicode(MemoryBuffer2IDataVal(result));
  1106. tlen = result.length()/2;
  1107. tgt = (UChar *)malloc(tlen*2);
  1108. memcpy_iflen(tgt, result.toByteArray(), tlen*2);
  1109. );
  1110. }
  1111. virtual char * getResultVarString(const char * stepname, unsigned sequence) override
  1112. {
  1113. PROTECTED_GETRESULT(stepname, sequence, "VarString", "string",
  1114. SCMStringBuffer result;
  1115. r->getResultString(result, false);
  1116. return result.s.detach();
  1117. );
  1118. }
  1119. virtual UChar * getResultVarUnicode(const char * stepname, unsigned sequence) override
  1120. {
  1121. PROTECTED_GETRESULT(stepname, sequence, "VarUnicode", "unicode",
  1122. MemoryBuffer result;
  1123. r->getResultUnicode(MemoryBuffer2IDataVal(result));
  1124. result.append((UChar)0);
  1125. return (UChar *)result.detach();
  1126. );
  1127. }
  1128. virtual unsigned getResultHash(const char * name, unsigned sequence) override
  1129. {
  1130. PROTECTED_GETRESULT(name, sequence, "Hash", "hash",
  1131. return r->getResultHash();
  1132. );
  1133. }
  1134. virtual unsigned getExternalResultHash(const char * wuid, const char * stepname, unsigned sequence) override
  1135. {
  1136. try
  1137. {
  1138. LOG(MCdebugProgress, thorJob, "getExternalResultRaw %s", stepname);
  1139. Owned<IConstWUResult> r = getExternalResult(wuid, stepname, sequence);
  1140. return r->getResultHash();
  1141. }
  1142. catch (CATCHALL)
  1143. {
  1144. throw MakeStringException(TE_FailedToRetrieveWorkunitValue, "Failed to retrieve external data hash %s from workunit %s", stepname, wuid);
  1145. }
  1146. }
  1147. virtual void getResultRowset(size32_t & tcount, const byte * * & tgt, const char * stepname, unsigned sequence, IEngineRowAllocator * _rowAllocator, bool isGrouped, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer) override
  1148. {
  1149. tgt = NULL;
  1150. PROTECTED_GETRESULT(stepname, sequence, "Rowset", "rowset",
  1151. MemoryBuffer datasetBuffer;
  1152. MemoryBuffer2IDataVal result(datasetBuffer);
  1153. Owned<IXmlToRawTransformer> rawXmlTransformer = createXmlRawTransformer(xmlTransformer);
  1154. Owned<ICsvToRawTransformer> rawCsvTransformer = createCsvRawTransformer(csvTransformer);
  1155. r->getResultRaw(result, rawXmlTransformer, rawCsvTransformer);
  1156. Owned<IOutputRowDeserializer> deserializer = _rowAllocator->createDiskDeserializer(this);
  1157. rtlDataset2RowsetX(tcount, tgt, _rowAllocator, deserializer, datasetBuffer.length(), datasetBuffer.toByteArray(), isGrouped);
  1158. );
  1159. }
  1160. virtual void getResultDictionary(size32_t & tcount, const byte * * & tgt, IEngineRowAllocator * _rowAllocator, const char * stepname, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer, IHThorHashLookupInfo * hasher) override
  1161. {
  1162. tcount = 0;
  1163. tgt = NULL;
  1164. PROTECTED_GETRESULT(stepname, sequence, "Dictionary", "dictionary",
  1165. MemoryBuffer datasetBuffer;
  1166. MemoryBuffer2IDataVal result(datasetBuffer);
  1167. Owned<IXmlToRawTransformer> rawXmlTransformer = createXmlRawTransformer(xmlTransformer);
  1168. Owned<ICsvToRawTransformer> rawCsvTransformer = createCsvRawTransformer(csvTransformer);
  1169. r->getResultRaw(result, rawXmlTransformer, rawCsvTransformer);
  1170. Owned<IOutputRowDeserializer> deserializer = _rowAllocator->createDiskDeserializer(this);
  1171. rtlDeserializeDictionary(tcount, tgt, _rowAllocator, deserializer, datasetBuffer.length(), datasetBuffer.toByteArray());
  1172. );
  1173. }
  1174. virtual void getExternalResultRaw(unsigned & tlen, void * & tgt, const char * wuid, const char * stepname, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer) override
  1175. {
  1176. tgt = NULL;
  1177. try
  1178. {
  1179. LOG(MCdebugProgress, thorJob, "getExternalResultRaw %s", stepname);
  1180. Variable2IDataVal result(&tlen, &tgt);
  1181. Owned<IConstWUResult> r = getExternalResult(wuid, stepname, sequence);
  1182. Owned<IXmlToRawTransformer> rawXmlTransformer = createXmlRawTransformer(xmlTransformer);
  1183. Owned<ICsvToRawTransformer> rawCsvTransformer = createCsvRawTransformer(csvTransformer);
  1184. r->getResultRaw(result, rawXmlTransformer, rawCsvTransformer);
  1185. }
  1186. catch (CATCHALL)
  1187. {
  1188. throw MakeStringException(TE_FailedToRetrieveWorkunitValue, "Failed to retrieve external data value %s from workunit %s", stepname, wuid);
  1189. }
  1190. }
  1191. virtual void addWuException(const char * text, unsigned code, unsigned severity, const char * source) override
  1192. {
  1193. addWuExceptionEx(text, code, severity, MSGAUD_programmer, source);
  1194. }
  1195. virtual void addWuAssertFailure(unsigned code, const char * text, const char * filename, unsigned lineno, unsigned column, bool isAbort) override
  1196. {
  1197. DBGLOG("%s", text);
  1198. try
  1199. {
  1200. Owned<IWorkUnit> w = updateWorkUnit();
  1201. unsigned activity = 0;
  1202. addExceptionToWorkunit(w, SeverityError, "user", code, text, filename, lineno, column, activity);
  1203. }
  1204. catch (IException *E)
  1205. {
  1206. StringBuffer m;
  1207. E->errorMessage(m);
  1208. IERRLOG("Unable to record exception in workunit: %s", m.str());
  1209. E->Release();
  1210. }
  1211. catch (...)
  1212. {
  1213. IERRLOG("Unable to record exception in workunit: unknown exception");
  1214. }
  1215. if (isAbort)
  1216. throw makeStringException(MSGAUD_user, code, text);
  1217. }
  1218. virtual unsigned __int64 getFileOffset(const char *logicalName) override { assertex(false); return 0; }
  1219. virtual unsigned getNodes() override { return jobChannel.queryJob().querySlaves(); }
  1220. virtual unsigned getNodeNum() override { throw MakeThorException(0, "Unsupported. getNodeNum() called in master"); return (unsigned)-1; }
  1221. virtual char *getFilePart(const char *logicalName, bool create=false) override { assertex(false); return NULL; }
  1222. virtual unsigned __int64 getDatasetHash(const char * name, unsigned __int64 hash) override
  1223. {
  1224. StringBuffer fullname('~');
  1225. expandLogicalName(fullname, name);
  1226. Owned<IDistributedFile> iDfsFile = queryThorFileManager().lookup(jobChannel.queryJob(), fullname, false, true, false, defaultPrivilegedUser); // NB: do not update accessed
  1227. if (iDfsFile.get())
  1228. {
  1229. // NB: if the file or any of the subfiles are compressed, then we cannot rely on the checksum
  1230. // use the hash of the modified time instead (which is what roxie and hthor do for all files).
  1231. bool useModifiedTime = false;
  1232. IDistributedSuperFile * super = iDfsFile->querySuperFile();
  1233. if (super)
  1234. {
  1235. Owned<IDistributedFileIterator> iter = super->getSubFileIterator(true);
  1236. ForEach(*iter)
  1237. {
  1238. if (iter->query().isCompressed())
  1239. {
  1240. useModifiedTime = true;
  1241. break;
  1242. }
  1243. }
  1244. }
  1245. else if (iDfsFile->isCompressed())
  1246. useModifiedTime = true;
  1247. unsigned checkSum = 0;
  1248. if (!useModifiedTime && iDfsFile->getFileCheckSum(checkSum))
  1249. hash ^= checkSum;
  1250. else
  1251. hash = crcLogicalFileTime(iDfsFile, hash, fullname);
  1252. }
  1253. return hash;
  1254. }
  1255. virtual IDistributedFileTransaction *querySuperFileTransaction() override
  1256. {
  1257. if (!superfiletransaction.get())
  1258. superfiletransaction.setown(createDistributedFileTransaction(userDesc, this));
  1259. return superfiletransaction.get();
  1260. }
  1261. virtual char *getJobName() override
  1262. {
  1263. return strdup(workunit->queryJobName());
  1264. }
  1265. virtual char *getClusterName() override
  1266. {
  1267. return strdup(workunit->queryClusterName());
  1268. }
  1269. virtual char *getGroupName() override
  1270. {
  1271. StringBuffer out;
  1272. if (globals)
  1273. globals->getProp("@nodeGroup",out);
  1274. return out.detach();
  1275. }
  1276. // ICodeContextExt impl.
  1277. virtual IConstWUResult *getExternalResult(const char * wuid, const char *name, unsigned sequence) override
  1278. {
  1279. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  1280. Owned<IConstWorkUnit> externalWU = factory->openWorkUnit(wuid);
  1281. if (!externalWU)
  1282. throw MakeStringException(TE_FailedToRetrieveWorkunitValue, "workunit %s not found, retrieving value %s", wuid, name);
  1283. externalWU->remoteCheckAccess(userDesc, false);
  1284. return getWorkUnitResult(externalWU, name, sequence);
  1285. }
  1286. virtual IConstWUResult *getResultForGet(const char *name, unsigned sequence) override
  1287. {
  1288. Owned<IConstWUResult> r = getResult(name, sequence);
  1289. if (!r || (r->getResultStatus() == ResultStatusUndefined))
  1290. {
  1291. StringBuffer s;
  1292. throw MakeStringException(TE_FailedToRetrieveWorkunitValue, "value %s in workunit is undefined", getResultText(s,name,sequence));
  1293. }
  1294. return r.getClear();
  1295. }
  1296. virtual IWorkUnit *updateWorkUnit() const override
  1297. {
  1298. return &workunit->lock();
  1299. }
  1300. virtual void addWuExceptionEx(const char * text, unsigned code, unsigned severity, unsigned audience, const char * source) override
  1301. {
  1302. LOG(mapToLogMsgCategory((ErrorSeverity)severity, (MessageAudience)audience), "%s", text);
  1303. try
  1304. {
  1305. Owned<IWorkUnit> w = updateWorkUnit();
  1306. Owned<IWUException> we = w->createException();
  1307. we->setSeverity((ErrorSeverity)severity);
  1308. we->setExceptionMessage(text);
  1309. we->setExceptionSource(source);
  1310. if (code)
  1311. we->setExceptionCode(code);
  1312. }
  1313. catch (IException *E)
  1314. {
  1315. StringBuffer m;
  1316. E->errorMessage(m);
  1317. IERRLOG("Unable to record exception in workunit: %s", m.str());
  1318. E->Release();
  1319. }
  1320. catch (...)
  1321. {
  1322. IERRLOG("Unable to record exception in workunit: unknown exception");
  1323. }
  1324. }
  1325. };
  1326. class CThorCodeContextMasterSharedMem : public CThorCodeContextMaster
  1327. {
  1328. IThorAllocator *sharedAllocator;
  1329. public:
  1330. CThorCodeContextMasterSharedMem(CJobChannel &jobChannel, IThorAllocator *_sharedAllocator, IConstWorkUnit &_workunit, ILoadedDllEntry &querySo, IUserDescriptor &userDesc)
  1331. : CThorCodeContextMaster(jobChannel, _workunit, querySo, userDesc)
  1332. {
  1333. sharedAllocator = _sharedAllocator;
  1334. }
  1335. virtual IEngineRowAllocator *getRowAllocator(IOutputMetaData * meta, unsigned activityId) const
  1336. {
  1337. return sharedAllocator->getRowAllocator(meta, activityId);
  1338. }
  1339. };
  1340. /////////////
  1341. //
  1342. // CJobMaster
  1343. //
  1344. void loadPlugin(SafePluginMap *pluginMap, const char *_path, const char *name)
  1345. {
  1346. StringBuffer path(_path);
  1347. path.append(name);
  1348. pluginMap->addPlugin(path.str(), name);
  1349. }
  1350. CJobMaster::CJobMaster(IConstWorkUnit &_workunit, const char *graphName, ILoadedDllEntry *querySo, bool _sendSo, const SocketEndpoint &_agentEp)
  1351. : CJobBase(querySo, graphName), workunit(&_workunit), sendSo(_sendSo), agentEp(_agentEp)
  1352. {
  1353. SCMStringBuffer _token, _scope;
  1354. workunit->getScope(_scope);
  1355. workunit->getWorkunitDistributedAccessToken(_token);
  1356. wuid.set(workunit->queryWuid());
  1357. user.set(workunit->queryUser());
  1358. token.append(_token.str());
  1359. scope.append(_scope.str());
  1360. numChannels = 1;
  1361. init();
  1362. if (workunit->hasDebugValue("GlobalId"))
  1363. {
  1364. SCMStringBuffer txId;
  1365. workunit->getDebugValue("GlobalId", txId);
  1366. if (txId.length())
  1367. {
  1368. SocketEndpoint thorEp;
  1369. thorEp.setLocalHost(getMachinePortBase());
  1370. logctx->setGlobalId(txId.str(), thorEp, 0);
  1371. SCMStringBuffer callerId;
  1372. workunit->getDebugValue("CallerId", callerId);
  1373. if (callerId.length())
  1374. logctx->setCallerId(callerId.str());
  1375. VStringBuffer msg("GlobalId: %s", txId.str());
  1376. workunit->getDebugValue("CallerId", txId);
  1377. if (txId.length())
  1378. msg.append(", CallerId: ").append(txId.str());
  1379. txId.set(logctx->queryLocalId());
  1380. if (txId.length())
  1381. msg.append(", LocalId: ").append(txId.str());
  1382. logctx->CTXLOG("%s", msg.str());
  1383. }
  1384. }
  1385. resumed = WUActionResume == workunit->getAction();
  1386. fatalHandler.setown(new CFatalHandler(globals->getPropInt("@fatal_timeout", FATAL_TIMEOUT)));
  1387. querySent = spillsSaved = false;
  1388. StringBuffer pluginsDir;
  1389. globals->getProp("@pluginsPath", pluginsDir);
  1390. if (pluginsDir.length())
  1391. addPathSepChar(pluginsDir);
  1392. Owned<IConstWUPluginIterator> pluginIter = &workunit->getPlugins();
  1393. ForEach(*pluginIter)
  1394. {
  1395. IConstWUPlugin &plugin = pluginIter->query();
  1396. SCMStringBuffer name;
  1397. plugin.getPluginName(name);
  1398. loadPlugin(pluginMap, pluginsDir.str(), name.str());
  1399. }
  1400. applyMemorySettings("manager");
  1401. sharedAllocator.setown(::createThorAllocator(queryMemoryMB, 0, 1, memorySpillAtPercentage, *logctx, crcChecking, usePackedAllocator));
  1402. Owned<IMPServer> mpServer = getMPServer();
  1403. CJobChannel *channel = addChannel(mpServer);
  1404. channel->reservePortKind(TPORT_mp);
  1405. channel->reservePortKind(TPORT_watchdog);
  1406. channel->reservePortKind(TPORT_debug);
  1407. slavemptag = allocateMPTag();
  1408. slaveMsgHandler.setown(new CSlaveMessageHandler(*this, slavemptag));
  1409. tmpHandler.setown(createTempHandler(true));
  1410. xgmml.set(graphXGMML);
  1411. StringBuffer tempDir(globals->queryProp("@thorTempDirectory"));
  1412. #ifdef _CONTAINERIZED
  1413. // multiple thor jobs can be running on same node, sharing same local disk for temp storage.
  1414. // make unique by adding wuid+graphName+worker-num
  1415. VStringBuffer uniqueSubDir("%s_%s_0", workunit->queryWuid(), graphName); // 0 denotes master (workers = 1..N)
  1416. SetTempDir(tempDir, uniqueSubDir, "thtmp", false); // no point in clearing as dir. is unique per job+graph
  1417. #else
  1418. // in bare-metal where only 1 Thor instance of each name, re-use same dir, in order to guarantee it will be cleared on startup
  1419. VStringBuffer uniqueSubDir("%s_0", globals->queryProp("@name")); // 0 denotes master (workers = 1..N)
  1420. SetTempDir(tempDir, uniqueSubDir, "thtmp", true);
  1421. #endif
  1422. }
  1423. void CJobMaster::endJob()
  1424. {
  1425. if (jobEnded)
  1426. return;
  1427. slaveMsgHandler.clear();
  1428. freeMPTag(slavemptag);
  1429. tmpHandler.clear();
  1430. PARENT::endJob();
  1431. }
  1432. CJobChannel *CJobMaster::addChannel(IMPServer *mpServer)
  1433. {
  1434. CJobChannel *channel = new CJobMasterChannel(*this, mpServer, jobChannels.ordinality());
  1435. jobChannels.append(*channel);
  1436. return channel;
  1437. }
  1438. static IException *createBCastException(unsigned slave, const char *errorMsg)
  1439. {
  1440. // think this should always be fatal, could check link down here, or in general and flag as _shutdown.
  1441. StringBuffer msg("General failure communicating to slave");
  1442. if (slave)
  1443. msg.append("(").append(slave).append(") ");
  1444. else
  1445. msg.append("s ");
  1446. Owned<IThorException> e = MakeThorException(0, "%s", msg.append(" [").append(errorMsg).append("]").str());
  1447. e->setAction(tea_shutdown);
  1448. return e.getClear();
  1449. }
  1450. mptag_t CJobMaster::allocateMPTag()
  1451. {
  1452. mptag_t tag = allocateClusterMPTag();
  1453. queryJobChannel(0).queryJobComm().flush(tag);
  1454. LOG(MCthorDetailedDebugInfo, thorJob, "allocateMPTag: tag = %d", (int)tag);
  1455. return tag;
  1456. }
  1457. void CJobMaster::freeMPTag(mptag_t tag)
  1458. {
  1459. if (TAG_NULL != tag)
  1460. {
  1461. freeClusterMPTag(tag);
  1462. LOG(MCthorDetailedDebugInfo, thorJob, "freeMPTag: tag = %d", (int)tag);
  1463. queryJobChannel(0).queryJobComm().flush(tag);
  1464. }
  1465. }
  1466. void CJobMaster::broadcast(ICommunicator &comm, CMessageBuffer &msg, mptag_t mptag, unsigned timeout, const char *errorMsg, CReplyCancelHandler *msgHandler, bool sendOnly)
  1467. {
  1468. unsigned groupSizeExcludingMaster = comm.queryGroup().ordinality() - 1;
  1469. mptag_t replyTag = TAG_NULL;
  1470. if (!sendOnly)
  1471. {
  1472. replyTag = queryJobChannel(0).queryMPServer().createReplyTag();
  1473. msg.setReplyTag(replyTag);
  1474. }
  1475. if (globals->getPropBool("@broadcastSendAsync", true)) // only here in case of problems/debugging.
  1476. {
  1477. class CSendAsyncfor : public CAsyncFor
  1478. {
  1479. CMessageBuffer &msg;
  1480. mptag_t mptag;
  1481. unsigned timeout;
  1482. StringAttr errorMsg;
  1483. ICommunicator &comm;
  1484. public:
  1485. CSendAsyncfor(ICommunicator &_comm, CMessageBuffer &_msg, mptag_t _mptag, unsigned _timeout, const char *_errorMsg)
  1486. : comm(_comm), msg(_msg), mptag(_mptag), timeout(_timeout), errorMsg(_errorMsg)
  1487. {
  1488. }
  1489. void Do(unsigned i)
  1490. {
  1491. if (!comm.send(msg, i+1, mptag, timeout))
  1492. throw createBCastException(i+1, errorMsg);
  1493. }
  1494. } afor(comm, msg, mptag, timeout, errorMsg);
  1495. try
  1496. {
  1497. afor.For(groupSizeExcludingMaster, groupSizeExcludingMaster);
  1498. }
  1499. catch (IException *e)
  1500. {
  1501. EXCLOG(e, "broadcastSendAsync");
  1502. abort(e);
  1503. throw;
  1504. }
  1505. }
  1506. else if (!comm.send(msg, RANK_ALL_OTHER, mptag, timeout))
  1507. {
  1508. Owned<IException> e = createBCastException(0, errorMsg);
  1509. EXCLOG(e, NULL);
  1510. abort(e);
  1511. throw e.getClear();
  1512. }
  1513. if (sendOnly) return;
  1514. unsigned respondents = 0;
  1515. Owned<IBitSet> bitSet = createThreadSafeBitSet();
  1516. for (;;)
  1517. {
  1518. rank_t sender;
  1519. CMessageBuffer msg;
  1520. bool r = msgHandler ? msgHandler->recv(comm, msg, RANK_ALL, replyTag, &sender, LONGTIMEOUT)
  1521. : comm.recv(msg, RANK_ALL, replyTag, &sender, LONGTIMEOUT);
  1522. if (!r)
  1523. {
  1524. StringBuffer tmpStr;
  1525. if (errorMsg)
  1526. tmpStr.append(": ").append(errorMsg).append(" - ");
  1527. tmpStr.append("Timeout receiving from slaves - no reply from: [");
  1528. unsigned s = bitSet->scan(0, false);
  1529. assertex(s<querySlaves()); // must be at least one
  1530. tmpStr.append(s+1);
  1531. for (;;)
  1532. {
  1533. s = bitSet->scan(s+1, false);
  1534. if (s>=querySlaves())
  1535. break;
  1536. tmpStr.append(",").append(s+1);
  1537. }
  1538. tmpStr.append("]");
  1539. Owned<IException> e = MakeThorFatal(NULL, 0, " %s", tmpStr.str());
  1540. EXCLOG(e, NULL);
  1541. throw e.getClear();
  1542. }
  1543. bool error;
  1544. msg.read(error);
  1545. if (error)
  1546. {
  1547. Owned<IThorException> e = deserializeThorException(msg);
  1548. e->setSlave(sender);
  1549. throw e.getClear();
  1550. }
  1551. ++respondents;
  1552. bitSet->set((unsigned)sender-1);
  1553. if (respondents == groupSizeExcludingMaster)
  1554. break;
  1555. }
  1556. }
  1557. IPropertyTree *CJobMaster::prepareWorkUnitInfo()
  1558. {
  1559. Owned<IPropertyTree> workUnitInfo = createPTree("workUnitInfo");
  1560. workUnitInfo->setProp("wuid", wuid);
  1561. workUnitInfo->setProp("user", user);
  1562. workUnitInfo->setProp("token", token);
  1563. workUnitInfo->setProp("scope", scope);
  1564. Owned<IConstWUPluginIterator> pluginIter = &queryWorkUnit().getPlugins();
  1565. IPropertyTree *plugins = NULL;
  1566. ForEach(*pluginIter)
  1567. {
  1568. IConstWUPlugin &thisplugin = pluginIter->query();
  1569. if (!plugins)
  1570. plugins = workUnitInfo->addPropTree("plugins", createPTree());
  1571. SCMStringBuffer name;
  1572. thisplugin.getPluginName(name);
  1573. IPropertyTree *plugin = plugins->addPropTree("plugin", createPTree());
  1574. plugin->setProp("@name", name.str());
  1575. }
  1576. IPropertyTree *debug = workUnitInfo->addPropTree("Debug", createPTree(ipt_caseInsensitive));
  1577. SCMStringBuffer debugStr, valueStr;
  1578. Owned<IStringIterator> debugIter = &queryWorkUnit().getDebugValues();
  1579. ForEach (*debugIter)
  1580. {
  1581. debugIter->str(debugStr);
  1582. queryWorkUnit().getDebugValue(debugStr.str(), valueStr);
  1583. debug->setProp(debugStr.str(), valueStr.str());
  1584. }
  1585. return workUnitInfo.getClear();
  1586. }
  1587. void CJobMaster::sendQuery()
  1588. {
  1589. CriticalBlock b(sendQueryCrit);
  1590. if (querySent) return;
  1591. CMessageBuffer tmp;
  1592. tmp.append(slavemptag);
  1593. tmp.append(queryWuid());
  1594. tmp.append(graphName);
  1595. const char *soName = queryDllEntry().queryName();
  1596. PROGLOG("Query dll: %s", soName);
  1597. tmp.append(soName);
  1598. #ifndef _CONTAINERIZED
  1599. tmp.append(sendSo);
  1600. if (sendSo)
  1601. {
  1602. CTimeMon atimer;
  1603. OwnedIFile iFile = createIFile(soName);
  1604. OwnedIFileIO iFileIO = iFile->open(IFOread);
  1605. size32_t sz = (size32_t)iFileIO->size();
  1606. tmp.append(sz);
  1607. read(iFileIO, 0, sz, tmp);
  1608. PROGLOG("Loading query for serialization to slaves took %d ms", atimer.elapsed());
  1609. }
  1610. queryJobManager().addCachedSo(soName);
  1611. #endif
  1612. Owned<IPropertyTree> deps = createPTree(queryXGMML()->queryName());
  1613. Owned<IPropertyTreeIterator> edgeIter = queryXGMML()->getElements("edge"); // JCSMORE trim to those actually needed
  1614. ForEach (*edgeIter)
  1615. {
  1616. IPropertyTree &edge = edgeIter->query();
  1617. deps->addPropTree("edge", LINK(&edge));
  1618. }
  1619. Owned<IPropertyTree> workUnitInfo = prepareWorkUnitInfo();
  1620. workUnitInfo->serialize(tmp);
  1621. deps->serialize(tmp);
  1622. CMessageBuffer msg;
  1623. msg.append(QueryInit);
  1624. compressToBuffer(msg, tmp.length(), tmp.toByteArray());
  1625. CTimeMon queryToSlavesTimer;
  1626. querySent = true;
  1627. broadcast(queryNodeComm(), msg, masterSlaveMpTag, LONGTIMEOUT, "sendQuery");
  1628. PROGLOG("Serialization of query init info (%d bytes) to slaves took %d ms", msg.length(), queryToSlavesTimer.elapsed());
  1629. }
  1630. void CJobMaster::jobDone()
  1631. {
  1632. if (!querySent) return;
  1633. CMessageBuffer msg;
  1634. msg.append(QueryDone);
  1635. msg.append(queryKey());
  1636. broadcast(queryNodeComm(), msg, masterSlaveMpTag, LONGTIMEOUT, "jobDone");
  1637. }
  1638. void CJobMaster::saveSpills()
  1639. {
  1640. CriticalBlock b(spillCrit);
  1641. if (spillsSaved)
  1642. return;
  1643. spillsSaved = true;
  1644. PROGLOG("Paused, saving spills..");
  1645. assertex(!queryUseCheckpoints()); // JCSMORE - checkpoints probably need revisiting
  1646. unsigned numSavedSpills = 0;
  1647. // stash away spills ready for resume, make them owned by workunit in event of abort/delete
  1648. IGraphTempHandler *tempHandler = queryTempHandler(false);
  1649. if (tempHandler)
  1650. {
  1651. Owned<IFileUsageIterator> iter = tempHandler->getIterator();
  1652. ForEach(*iter)
  1653. {
  1654. CFileUsageEntry &entry = iter->query();
  1655. StringAttr tmpName = entry.queryName();
  1656. if (WUGraphComplete == workunit->queryNodeState(queryGraphName(), entry.queryGraphId()))
  1657. {
  1658. IArrayOf<IGroup> groups;
  1659. StringArray clusters;
  1660. fillClusterArray(*this, tmpName, clusters, groups);
  1661. Owned<IFileDescriptor> fileDesc = queryThorFileManager().create(*this, tmpName, clusters, groups, true, TDXtemporary|TDWnoreplicate);
  1662. fileDesc->queryProperties().setPropBool("@pausefile", true); // JCSMORE - mark to keep, may be able to distinguish via other means
  1663. fileDesc->queryProperties().setProp("@kind", "flat");
  1664. IPropertyTree &props = fileDesc->queryProperties();
  1665. props.setPropBool("@owned", true);
  1666. bool blockCompressed=true; // JCSMORE, should come from helper really
  1667. if (blockCompressed)
  1668. props.setPropBool("@blockCompressed", true);
  1669. Owned<IDistributedFile> file = queryDistributedFileDirectory().createNew(fileDesc);
  1670. // NB: This is renaming/moving from temp path
  1671. StringBuffer newName;
  1672. queryThorFileManager().addScope(*this, tmpName, newName, true, true);
  1673. verifyex(file->renamePhysicalPartFiles(newName.str(), NULL, NULL, queryBaseDirectory(grp_unknown)));
  1674. file->attach(newName,userDesc);
  1675. Owned<IWorkUnit> wu = &queryWorkUnit().lock();
  1676. wu->addFile(newName, &clusters, entry.queryUsage(), entry.queryKind(), queryGraphName());
  1677. ++numSavedSpills;
  1678. }
  1679. }
  1680. }
  1681. PROGLOG("Paused, %d spill(s) saved.", numSavedSpills);
  1682. }
  1683. bool CJobMaster::go()
  1684. {
  1685. class CWorkunitPauseHandler : public CInterface, implements IWorkUnitSubscriber
  1686. {
  1687. CJobMaster &job;
  1688. IConstWorkUnit &wu;
  1689. Owned<IWorkUnitWatcher> watcher;
  1690. CriticalSection crit;
  1691. public:
  1692. IMPLEMENT_IINTERFACE;
  1693. CWorkunitPauseHandler(CJobMaster &_job, IConstWorkUnit &_wu) : job(_job), wu(_wu)
  1694. {
  1695. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  1696. watcher.setown(factory->getWatcher(this, (WUSubscribeOptions) (SubscribeOptionAction | SubscribeOptionAbort), wu.queryWuid()));
  1697. }
  1698. ~CWorkunitPauseHandler() { stop(); }
  1699. void stop()
  1700. {
  1701. Owned<IWorkUnitWatcher> _watcher;
  1702. {
  1703. CriticalBlock b(crit);
  1704. if (!watcher)
  1705. return;
  1706. _watcher.setown(watcher.getClear());
  1707. }
  1708. _watcher->unsubscribe();
  1709. }
  1710. virtual void notify(WUSubscribeOptions flags, unsigned valueLen, const void *valueData) override
  1711. {
  1712. CriticalBlock b(crit);
  1713. if (!watcher)
  1714. return;
  1715. if (flags & SubscribeOptionAbort)
  1716. {
  1717. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  1718. if (factory->isAborting(wu.queryWuid()))
  1719. {
  1720. LOG(MCwarning, thorJob, "ABORT detected from user");
  1721. unsigned code = TE_WorkUnitAborting; // default
  1722. if (job.getOptBool("dumpInfoOnUserAbort", false))
  1723. code = TE_WorkUnitAbortingDumpInfo;
  1724. else if ((1 == valueLen) && ('2' == *((const char *)valueData)))
  1725. {
  1726. /* NB: Standard abort mechanism will trigger abort subscriber with "1"
  1727. * If "2" is signalled, it means - Abort with dump info.
  1728. */
  1729. code = TE_WorkUnitAbortingDumpInfo;
  1730. }
  1731. Owned <IException> e = MakeThorException(code, "User signalled abort");
  1732. job.fireException(e);
  1733. }
  1734. else
  1735. PROGLOG("CWorkunitPauseHandler [SubscribeOptionAbort] notifier called, workunit was not aborting");
  1736. }
  1737. if (flags & SubscribeOptionAction)
  1738. {
  1739. job.markWuDirty();
  1740. bool abort = false;
  1741. bool pause = false;
  1742. wu.forceReload();
  1743. WUAction action = wu.getAction();
  1744. if (action==WUActionPause)
  1745. {
  1746. // pause after current subgraph
  1747. pause = true;
  1748. }
  1749. else if (action==WUActionPauseNow)
  1750. {
  1751. // abort current subgraph
  1752. abort = true;
  1753. pause = true;
  1754. }
  1755. if (pause)
  1756. {
  1757. PROGLOG("Pausing job%s", abort?" [now]":"");
  1758. job.pause(abort);
  1759. }
  1760. }
  1761. }
  1762. } workunitPauseHandler(*this, *workunit);
  1763. class CQueryTimeoutHandler : public CTimeoutTrigger
  1764. {
  1765. CJobMaster &job;
  1766. public:
  1767. CQueryTimeoutHandler(CJobMaster &_job, unsigned timeout) : CTimeoutTrigger(timeout, "QUERY"), job(_job)
  1768. {
  1769. start();
  1770. inform(MakeThorException(TE_QueryTimeoutError, "Query took greater than %d seconds", timeout));
  1771. }
  1772. virtual bool action() override
  1773. {
  1774. job.fireException(exception);
  1775. return true;
  1776. }
  1777. };
  1778. Owned<CTimeoutTrigger> qtHandler;
  1779. int guillotineTimeout = workunit->getDebugValueInt("maxRunTime", 0);
  1780. if (guillotineTimeout > 0)
  1781. qtHandler.setown(new CQueryTimeoutHandler(*this, guillotineTimeout));
  1782. else if (guillotineTimeout < 0)
  1783. {
  1784. Owned<IException> e = MakeStringException(0, "Ignoring negative maxRunTime: %d", guillotineTimeout);
  1785. reportExceptionToWorkunit(*workunit, e);
  1786. }
  1787. if (WUActionPause == workunit->getAction() || WUActionPauseNow == workunit->getAction())
  1788. throw MakeStringException(0, "Job paused at start, exiting");
  1789. bool allDone = true;
  1790. unsigned concurrentSubGraphs = (unsigned)getWorkUnitValueInt("concurrentSubGraphs", globals->getPropInt("@concurrentSubGraphs", 1));
  1791. try
  1792. {
  1793. startJob();
  1794. workunit->setGraphState(queryGraphName(), getWfid(), WUGraphRunning);
  1795. Owned<IThorGraphIterator> iter = queryJobChannel(0).getSubGraphs();
  1796. CICopyArrayOf<CMasterGraph> toRun;
  1797. ForEach(*iter)
  1798. {
  1799. CMasterGraph &graph = (CMasterGraph &)iter->query();
  1800. if ((queryResumed() || queryUseCheckpoints()) && WUGraphComplete == workunit->queryNodeState(queryGraphName(), graph.queryGraphId()))
  1801. graph.setCompleteEx();
  1802. else
  1803. toRun.append(graph);
  1804. }
  1805. ForEachItemInRev(g, toRun)
  1806. {
  1807. if (aborted) break;
  1808. CMasterGraph &graph = toRun.item(g);
  1809. if (graph.isSink())
  1810. graph.execute(0, NULL, true, concurrentSubGraphs>1);
  1811. if (queryPausing()) break;
  1812. }
  1813. queryJobChannel(0).wait();
  1814. workunitPauseHandler.stop();
  1815. ForEachItemIn(tr, toRun)
  1816. {
  1817. CMasterGraph &graph = toRun.item(tr);
  1818. if (!graph.isComplete())
  1819. {
  1820. allDone = false;
  1821. break;
  1822. }
  1823. }
  1824. }
  1825. catch (IException *e) { fireException(e); e->Release(); }
  1826. catch (CATCHALL) { Owned<IException> e = MakeThorException(0, "Unknown exception running sub graphs"); fireException(e); }
  1827. workunit->setGraphState(queryGraphName(), getWfid(), aborted?WUGraphFailed:(allDone?WUGraphComplete:(pausing?WUGraphPaused:WUGraphComplete)));
  1828. if (queryPausing())
  1829. saveSpills();
  1830. Owned<IException> jobDoneException;
  1831. try { jobDone(); }
  1832. catch (IException *e)
  1833. {
  1834. EXCLOG(e, NULL);
  1835. jobDoneException.setown(e);
  1836. }
  1837. queryTempHandler()->clearTemps();
  1838. slaveMsgHandler->stop();
  1839. if (jobDoneException.get())
  1840. throw LINK(jobDoneException);
  1841. return allDone;
  1842. }
  1843. void CJobMaster::pause(bool doAbort)
  1844. {
  1845. pausing = true;
  1846. if (doAbort)
  1847. {
  1848. // reply will trigger DAMP_THOR_REPLY_PAUSED to agent, unless all graphs are already complete.
  1849. queryJobManager().replyException(*this, NULL);
  1850. // abort current graph asynchronously.
  1851. // After spill files have been saved, trigger timeout handler in case abort doesn't succeed.
  1852. Owned<IException> e = MakeThorException(0, "Unable to recover from pausenow");
  1853. class CAbortThread : implements IThreaded
  1854. {
  1855. CJobMaster &owner;
  1856. CThreaded threaded;
  1857. Linked<IException> exception;
  1858. public:
  1859. CAbortThread(CJobMaster &_owner, IException *_exception) : owner(_owner), exception(_exception), threaded("SaveSpillThread", this)
  1860. {
  1861. threaded.start();
  1862. }
  1863. ~CAbortThread()
  1864. {
  1865. threaded.join();
  1866. }
  1867. // IThreaded
  1868. virtual void threadmain() override
  1869. {
  1870. owner.abort(exception);
  1871. }
  1872. } abortThread(*this, e);
  1873. saveSpills();
  1874. fatalHandler->inform(e.getClear());
  1875. }
  1876. }
  1877. bool CJobMaster::queryCreatedFile(const char *file)
  1878. {
  1879. StringBuffer scopedName;
  1880. queryThorFileManager().addScope(*this, file, scopedName, false);
  1881. return (NotFound != createdFiles.find(scopedName.str()));
  1882. }
  1883. void CJobMaster::addCreatedFile(const char *file)
  1884. {
  1885. StringBuffer scopedName;
  1886. queryThorFileManager().addScope(*this, file, scopedName, false);
  1887. createdFiles.append(scopedName.str());
  1888. }
  1889. __int64 CJobMaster::getWorkUnitValueInt(const char *prop, __int64 defVal) const
  1890. {
  1891. return queryWorkUnit().getDebugValueInt64(prop, defVal);
  1892. }
  1893. bool CJobMaster::getWorkUnitValueBool(const char *prop, bool defVal) const
  1894. {
  1895. return queryWorkUnit().getDebugValueBool(prop, defVal);
  1896. }
  1897. StringBuffer &CJobMaster::getWorkUnitValue(const char *prop, StringBuffer &str) const
  1898. {
  1899. SCMStringBuffer scmStr;
  1900. queryWorkUnit().getDebugValue(prop, scmStr);
  1901. return str.append(scmStr.str());
  1902. }
  1903. IGraphTempHandler *CJobMaster::createTempHandler(bool errorOnMissing)
  1904. {
  1905. return new CMasterGraphTempHandler(*this, errorOnMissing);
  1906. }
  1907. bool CJobMaster::fireException(IException *e)
  1908. {
  1909. IThorException *te = QUERYINTERFACE(e, IThorException);
  1910. ThorExceptionAction action;
  1911. if (!te) action = tea_null;
  1912. else action = te->queryAction();
  1913. if (QUERYINTERFACE(e, IMP_Exception) && MPERR_link_closed==e->errorCode())
  1914. action = tea_shutdown;
  1915. else if (QUERYINTERFACE(e, ISEH_Exception))
  1916. action = tea_shutdown;
  1917. CriticalBlock b(exceptCrit);
  1918. switch (action)
  1919. {
  1920. case tea_warning:
  1921. {
  1922. LOG(MCwarning, thorJob, e);
  1923. reportExceptionToWorkunitCheckIgnore(*workunit, e);
  1924. break;
  1925. }
  1926. default:
  1927. {
  1928. LOG(MCerror, thorJob, e);
  1929. queryJobManager().replyException(*this, e);
  1930. fatalHandler->inform(LINK(e));
  1931. try { abort(e); }
  1932. catch (IException *e)
  1933. {
  1934. Owned<IThorException> te = ThorWrapException(e, "Error aborting job, will cause thor restart");
  1935. e->Release();
  1936. reportExceptionToWorkunit(*workunit, te);
  1937. action = tea_shutdown;
  1938. }
  1939. if (tea_shutdown == action)
  1940. queryJobManager().stop();
  1941. }
  1942. }
  1943. return true;
  1944. }
  1945. IFatalHandler *CJobMaster::clearFatalHandler()
  1946. {
  1947. return fatalHandler.getClear();
  1948. }
  1949. // CJobMasterChannel
  1950. CJobMasterChannel::CJobMasterChannel(CJobBase &job, IMPServer *mpServer, unsigned channel) : CJobChannel(job, mpServer, channel)
  1951. {
  1952. codeCtx.setown(new CThorCodeContextMaster(*this, job.queryWorkUnit(), job.queryDllEntry(), *job.queryUserDescriptor()));
  1953. sharedMemCodeCtx.setown(new CThorCodeContextMasterSharedMem(*this, job.querySharedAllocator(), job.queryWorkUnit(), job.queryDllEntry(), *job.queryUserDescriptor()));
  1954. }
  1955. CGraphBase *CJobMasterChannel::createGraph()
  1956. {
  1957. return new CMasterGraph(*this);
  1958. }
  1959. IBarrier *CJobMasterChannel::createBarrier(mptag_t tag)
  1960. {
  1961. return new CBarrierMaster(*jobComm, tag);
  1962. }
  1963. ///////////////////
  1964. class CCollatedResult : implements IThorResult, public CSimpleInterface
  1965. {
  1966. CMasterGraph &graph;
  1967. CActivityBase &activity;
  1968. IThorRowInterfaces *rowIf;
  1969. unsigned id;
  1970. CriticalSection crit;
  1971. PointerArrayOf<CThorExpandingRowArray> results;
  1972. Owned<IThorResult> result;
  1973. unsigned spillPriority;
  1974. activity_id ownerId;
  1975. void ensure()
  1976. {
  1977. CriticalBlock b(crit);
  1978. if (result)
  1979. return;
  1980. mptag_t replyTag = graph.queryMPServer().createReplyTag();
  1981. CMessageBuffer msg;
  1982. msg.append(GraphGetResult);
  1983. msg.append(activity.queryJob().queryKey());
  1984. msg.append(graph.queryGraphId());
  1985. msg.append(ownerId);
  1986. msg.append(id);
  1987. msg.append(replyTag);
  1988. ((CJobMaster &)graph.queryJob()).broadcast(queryNodeComm(), msg, masterSlaveMpTag, LONGTIMEOUT, "CCollectResult", NULL, true);
  1989. unsigned numSlaves = graph.queryJob().querySlaves();
  1990. for (unsigned n=0; n<numSlaves; n++)
  1991. results.item(n)->kill();
  1992. rank_t sender;
  1993. MemoryBuffer mb;
  1994. Owned<ISerialStream> stream = createMemoryBufferSerialStream(mb);
  1995. CThorStreamDeserializerSource rowSource(stream);
  1996. unsigned todo = numSlaves;
  1997. for (;;)
  1998. {
  1999. for (;;)
  2000. {
  2001. if (activity.queryAbortSoon())
  2002. return;
  2003. msg.clear();
  2004. if (activity.receiveMsg(msg, RANK_ALL, replyTag, &sender, 60*1000))
  2005. break;
  2006. ActPrintLog(&activity, "WARNING: tag %d timedout, retrying", (unsigned)replyTag);
  2007. }
  2008. sender = sender - 1; // 0 = master
  2009. if (!msg.length())
  2010. {
  2011. --todo;
  2012. if (0 == todo)
  2013. break; // done
  2014. }
  2015. else
  2016. {
  2017. bool error;
  2018. msg.read(error);
  2019. if (error)
  2020. {
  2021. Owned<IThorException> e = deserializeThorException(msg);
  2022. e->setSlave(sender);
  2023. throw e.getClear();
  2024. }
  2025. ThorExpand(msg, mb.clear());
  2026. CThorExpandingRowArray *slaveResults = results.item(sender);
  2027. while (!rowSource.eos())
  2028. {
  2029. RtlDynamicRowBuilder rowBuilder(rowIf->queryRowAllocator());
  2030. size32_t sz = rowIf->queryRowDeserializer()->deserialize(rowBuilder, rowSource);
  2031. slaveResults->append(rowBuilder.finalizeRowClear(sz));
  2032. }
  2033. }
  2034. }
  2035. Owned<IThorResult> _result = ::createResult(activity, rowIf, thorgraphresult_nul, spillPriority);
  2036. Owned<IRowWriter> resultWriter = _result->getWriter();
  2037. for (unsigned s=0; s<numSlaves; s++)
  2038. {
  2039. CThorExpandingRowArray *slaveResult = results.item(s);
  2040. ForEachItemIn(r, *slaveResult)
  2041. {
  2042. const void *row = slaveResult->query(r);
  2043. LinkThorRow(row);
  2044. resultWriter->putRow(row);
  2045. }
  2046. }
  2047. result.setown(_result.getClear());
  2048. }
  2049. public:
  2050. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  2051. CCollatedResult(CMasterGraph &_graph, CActivityBase &_activity, IThorRowInterfaces *_rowIf, unsigned _id, activity_id _ownerId, unsigned _spillPriority)
  2052. : graph(_graph), activity(_activity), rowIf(_rowIf), id(_id), ownerId(_ownerId), spillPriority(_spillPriority)
  2053. {
  2054. for (unsigned n=0; n<graph.queryJob().querySlaves(); n++)
  2055. results.append(new CThorExpandingRowArray(activity, rowIf));
  2056. }
  2057. ~CCollatedResult()
  2058. {
  2059. ForEachItemIn(l, results)
  2060. {
  2061. CThorExpandingRowArray *result = results.item(l);
  2062. delete result;
  2063. }
  2064. }
  2065. void setId(unsigned _id)
  2066. {
  2067. id = _id;
  2068. }
  2069. // IThorResult
  2070. virtual IRowWriter *getWriter() { throwUnexpected(); }
  2071. virtual void setResultStream(IRowWriterMultiReader *stream, rowcount_t count)
  2072. {
  2073. throwUnexpected();
  2074. }
  2075. virtual IRowStream *getRowStream()
  2076. {
  2077. ensure();
  2078. return result->getRowStream();
  2079. }
  2080. virtual IThorRowInterfaces *queryRowInterfaces()
  2081. {
  2082. return rowIf;
  2083. }
  2084. virtual CActivityBase *queryActivity()
  2085. {
  2086. return &activity;
  2087. }
  2088. virtual bool isDistributed() const { return false; }
  2089. virtual void serialize(MemoryBuffer &mb)
  2090. {
  2091. ensure();
  2092. result->serialize(mb);
  2093. }
  2094. virtual void getLinkedResult(unsigned & count, const byte * * & ret) override
  2095. {
  2096. ensure();
  2097. result->getLinkedResult(count, ret);
  2098. }
  2099. virtual const void * getLinkedRowResult()
  2100. {
  2101. ensure();
  2102. return result->getLinkedRowResult();
  2103. }
  2104. };
  2105. ///////////////////
  2106. //
  2107. // CMasterGraph impl.
  2108. //
  2109. CMasterGraph::CMasterGraph(CJobChannel &jobChannel) : CGraphBase(jobChannel), graphStats(graphStatistics)
  2110. {
  2111. jobM = (CJobMaster *)&jobChannel.queryJob();
  2112. mpTag = queryJob().allocateMPTag();
  2113. startBarrierTag = queryJob().allocateMPTag();
  2114. waitBarrierTag = queryJob().allocateMPTag();
  2115. startBarrier = jobChannel.createBarrier(startBarrierTag);
  2116. waitBarrier = jobChannel.createBarrier(waitBarrierTag);
  2117. }
  2118. CMasterGraph::~CMasterGraph()
  2119. {
  2120. queryJob().freeMPTag(mpTag);
  2121. queryJob().freeMPTag(startBarrierTag);
  2122. queryJob().freeMPTag(waitBarrierTag);
  2123. if (TAG_NULL != doneBarrierTag)
  2124. queryJob().freeMPTag(doneBarrierTag);
  2125. if (TAG_NULL != executeReplyTag)
  2126. queryJob().freeMPTag(executeReplyTag);
  2127. }
  2128. void CMasterGraph::init()
  2129. {
  2130. CGraphBase::init();
  2131. if (queryOwner() && isGlobal())
  2132. {
  2133. doneBarrierTag = queryJob().allocateMPTag();
  2134. doneBarrier = jobChannel.createBarrier(doneBarrierTag);
  2135. }
  2136. }
  2137. bool CMasterGraph::fireException(IException *e)
  2138. {
  2139. IThorException *te = QUERYINTERFACE(e, IThorException);
  2140. ThorExceptionAction action;
  2141. if (!te) action = tea_null;
  2142. else action = te->queryAction();
  2143. if (QUERYINTERFACE(e, IMP_Exception) && MPERR_link_closed==e->errorCode())
  2144. action = tea_shutdown;
  2145. else if (QUERYINTERFACE(e, ISEH_Exception))
  2146. action = tea_shutdown;
  2147. CriticalBlock b(exceptCrit);
  2148. switch (action)
  2149. {
  2150. case tea_warning:
  2151. {
  2152. LOG(MCwarning, thorJob, e);
  2153. reportExceptionToWorkunitCheckIgnore(job.queryWorkUnit(), e);
  2154. break;
  2155. }
  2156. default:
  2157. {
  2158. LOG(MCerror, thorJob, e);
  2159. if (NULL != fatalHandler)
  2160. fatalHandler->inform(LINK(e));
  2161. if (owner)
  2162. owner->fireException(e);
  2163. else
  2164. queryJobChannel().fireException(e);
  2165. }
  2166. }
  2167. return true;
  2168. }
  2169. void CMasterGraph::reset()
  2170. {
  2171. CGraphBase::reset();
  2172. bcastMsgHandler.reset();
  2173. activityInitMsgHandler.reset();
  2174. executeReplyMsgHandler.reset();
  2175. }
  2176. void CMasterGraph::abort(IException *e)
  2177. {
  2178. if (aborted) return;
  2179. bool _graphDone = graphDone; // aborting master activities can trigger master graphDone, but want to fire GraphAbort to slaves if graphDone=false at start.
  2180. bool dumpInfo = TE_WorkUnitAbortingDumpInfo == e->errorCode() || job.getOptBool("dumpInfoOnAbort");
  2181. if (dumpInfo)
  2182. {
  2183. StringBuffer dumpInfoCmd;
  2184. checkAndDumpAbortInfo(queryJob().getOpt("dumpInfoCmd", dumpInfoCmd));
  2185. }
  2186. try { CGraphBase::abort(e); }
  2187. catch (IException *e)
  2188. {
  2189. GraphPrintLog(e, "Aborting master graph");
  2190. e->Release();
  2191. }
  2192. bcastMsgHandler.cancel(0);
  2193. activityInitMsgHandler.cancel(RANK_ALL);
  2194. executeReplyMsgHandler.cancel(RANK_ALL);
  2195. if (started && !_graphDone)
  2196. {
  2197. try
  2198. {
  2199. CMessageBuffer msg;
  2200. msg.append(GraphAbort);
  2201. msg.append(job.queryKey());
  2202. msg.append(dumpInfo);
  2203. msg.append(queryGraphId());
  2204. jobM->broadcast(queryNodeComm(), msg, masterSlaveMpTag, LONGTIMEOUT, "abort");
  2205. }
  2206. catch (IException *e)
  2207. {
  2208. GraphPrintLog(e, "Aborting slave graph");
  2209. if (abortException)
  2210. {
  2211. e->Release();
  2212. throw LINK(abortException);
  2213. }
  2214. throw;
  2215. }
  2216. }
  2217. if (!queryOwner())
  2218. {
  2219. if (globals->getPropBool("@watchdogProgressEnabled"))
  2220. queryJobManager().queryDeMonServer()->endGraph(this, !queryAborted());
  2221. }
  2222. }
  2223. bool CMasterGraph::serializeActivityInitData(unsigned slave, MemoryBuffer &mb, IThorActivityIterator &iter)
  2224. {
  2225. CriticalBlock b(createdCrit);
  2226. DelayedSizeMarker sizeMark1(mb);
  2227. ForEach (iter)
  2228. {
  2229. CMasterGraphElement &element = (CMasterGraphElement &)iter.query();
  2230. if (!element.sentActInitData->testSet(slave))
  2231. {
  2232. CMasterActivity *activity = (CMasterActivity *)element.queryActivity();
  2233. if (activity)
  2234. {
  2235. mb.append(element.queryId());
  2236. DelayedSizeMarker sizeMark2(mb);
  2237. activity->serializeSlaveData(mb, slave);
  2238. sizeMark2.write();
  2239. }
  2240. }
  2241. }
  2242. if (0 == sizeMark1.size())
  2243. return false;
  2244. mb.append((activity_id)0); // terminator
  2245. sizeMark1.write();
  2246. return true;
  2247. }
  2248. void CMasterGraph::readActivityInitData(MemoryBuffer &mb, unsigned slave)
  2249. {
  2250. for (;;)
  2251. {
  2252. activity_id id;
  2253. mb.read(id);
  2254. if (0 == id)
  2255. break;
  2256. size32_t dataLen;
  2257. mb.read(dataLen);
  2258. CGraphElementBase *element = queryElement(id);
  2259. MemoryBuffer &mbDst = element->queryActivity()->queryInitializationData(slave);
  2260. mbDst.append(dataLen, mb.readDirect(dataLen));
  2261. }
  2262. }
  2263. bool CMasterGraph::prepare(size32_t parentExtractSz, const byte *parentExtract, bool checkDependencies, bool shortCircuit, bool async)
  2264. {
  2265. if (!CGraphBase::prepare(parentExtractSz, parentExtract, checkDependencies, shortCircuit, async)) return false;
  2266. if (aborted) return false;
  2267. return true;
  2268. }
  2269. void CMasterGraph::execute(size32_t _parentExtractSz, const byte *parentExtract, bool checkDependencies, bool async)
  2270. {
  2271. if (isComplete())
  2272. return;
  2273. if (!queryOwner()) // owning graph sends query+child graphs
  2274. jobM->sendQuery(); // if not previously sent
  2275. CGraphBase::execute(parentExtractSz, parentExtract, checkDependencies, async);
  2276. }
  2277. void CMasterGraph::start()
  2278. {
  2279. if (!queryOwner())
  2280. {
  2281. if (globals->getPropBool("@watchdogProgressEnabled"))
  2282. queryJobManager().queryDeMonServer()->startGraph(this);
  2283. }
  2284. Owned<IThorActivityIterator> iter = getConnectedIterator();
  2285. ForEach (*iter)
  2286. iter->query().queryActivity()->startProcess();
  2287. }
  2288. void CMasterGraph::sendActivityInitData()
  2289. {
  2290. CMessageBuffer msg;
  2291. mptag_t replyTag = queryJobChannel().queryMPServer().createReplyTag();
  2292. msg.setReplyTag(replyTag);
  2293. unsigned pos = msg.length();
  2294. unsigned w=0;
  2295. unsigned sentTo = 0;
  2296. Owned<IThorActivityIterator> iter = getConnectedIterator();
  2297. for (; w<queryJob().querySlaves(); w++)
  2298. {
  2299. unsigned needActInit = 0;
  2300. ForEach(*iter)
  2301. {
  2302. CGraphElementBase &element = iter->query();
  2303. CActivityBase *activity = element.queryActivity();
  2304. if (activity && activity->needReInit())
  2305. element.sentActInitData->set(w, false); // force act init to be resent
  2306. if (!element.sentActInitData->test(w)) // has it been sent
  2307. ++needActInit;
  2308. }
  2309. if (needActInit)
  2310. {
  2311. try
  2312. {
  2313. msg.rewrite(pos);
  2314. serializeActivityInitData(w, msg, *iter);
  2315. }
  2316. catch (IException *e)
  2317. {
  2318. GraphPrintLog(e);
  2319. throw;
  2320. }
  2321. if (!queryJobChannel().queryJobComm().send(msg, w+1, mpTag, LONGTIMEOUT))
  2322. {
  2323. StringBuffer epStr;
  2324. throw MakeStringException(0, "Timeout sending to slave %s", job.querySlaveGroup().queryNode(w).endpoint().getUrlStr(epStr).str());
  2325. }
  2326. ++sentTo;
  2327. }
  2328. }
  2329. if (sentTo)
  2330. {
  2331. assertex(sentTo == queryJob().querySlaves());
  2332. w=0;
  2333. Owned<IException> e;
  2334. // now get back initialization data from graph tag
  2335. for (; w<queryJob().querySlaves(); w++)
  2336. {
  2337. rank_t sender;
  2338. msg.clear();
  2339. if (!activityInitMsgHandler.recv(queryJobChannel().queryJobComm(), msg, w+1, replyTag, &sender, LONGTIMEOUT))
  2340. throw MakeGraphException(this, 0, "Timeout receiving from slaves after graph sent");
  2341. bool error;
  2342. msg.read(error);
  2343. if (error)
  2344. {
  2345. Owned<IThorException> se = deserializeThorException(msg);
  2346. se->setSlave(sender);
  2347. if (!e.get())
  2348. {
  2349. StringBuffer tmpStr("Slave ");
  2350. queryJob().queryJobGroup().queryNode(sender).endpoint().getUrlStr(tmpStr);
  2351. GraphPrintLog(se, "%s", tmpStr.append(": slave initialization error").str());
  2352. e.setown(se.getClear());
  2353. }
  2354. continue; // to read other slave responses.
  2355. }
  2356. readActivityInitData(msg, w);
  2357. }
  2358. if (e.get())
  2359. throw LINK(e);
  2360. }
  2361. }
  2362. void CMasterGraph::serializeGraphInit(MemoryBuffer &mb)
  2363. {
  2364. mb.append(graphId);
  2365. serializeMPtag(mb, mpTag);
  2366. mb.append((int)startBarrierTag);
  2367. mb.append((int)waitBarrierTag);
  2368. mb.append((int)doneBarrierTag);
  2369. sourceActDependents->serialize(mb);
  2370. mb.append(queryChildGraphCount());
  2371. Owned<IThorGraphIterator> childIter = getChildGraphIterator();
  2372. ForEach (*childIter)
  2373. {
  2374. CMasterGraph &childGraph = (CMasterGraph &)childIter->query();
  2375. childGraph.serializeGraphInit(mb);
  2376. }
  2377. }
  2378. // IThorChildGraph impl.
  2379. IEclGraphResults *CMasterGraph::evaluate(unsigned _parentExtractSz, const byte *parentExtract)
  2380. {
  2381. throw MakeGraphException(this, 0, "Thor master does not support the execution of child queries");
  2382. }
  2383. void CMasterGraph::executeSubGraph(size32_t parentExtractSz, const byte *parentExtract)
  2384. {
  2385. if (job.queryResumed()) // skip complete subgraph if resuming. NB: temp spill have been tucked away for this purpose when paused.
  2386. {
  2387. if (!queryOwner())
  2388. {
  2389. if (WUGraphComplete == job.queryWorkUnit().queryNodeState(job.queryGraphName(), graphId))
  2390. setCompleteEx();
  2391. }
  2392. }
  2393. if (isComplete())
  2394. return;
  2395. if (!sentGlobalInit)
  2396. {
  2397. sentGlobalInit = true;
  2398. if (!queryOwner())
  2399. sendGraph();
  2400. else
  2401. {
  2402. if (isGlobal())
  2403. {
  2404. CMessageBuffer msg;
  2405. serializeCreateContexts(msg);
  2406. try
  2407. {
  2408. jobM->broadcast(queryJobChannel().queryJobComm(), msg, mpTag, LONGTIMEOUT, "serializeCreateContexts", &bcastMsgHandler);
  2409. }
  2410. catch (IException *e)
  2411. {
  2412. GraphPrintLog(e, "Aborting graph create(2)");
  2413. if (abortException)
  2414. {
  2415. e->Release();
  2416. throw LINK(abortException);
  2417. }
  2418. throw;
  2419. }
  2420. }
  2421. }
  2422. }
  2423. fatalHandler.clear();
  2424. fatalHandler.setown(new CFatalHandler(globals->getPropInt("@fatal_timeout", FATAL_TIMEOUT)));
  2425. CGraphBase::executeSubGraph(parentExtractSz, parentExtract);
  2426. if (TAG_NULL != executeReplyTag)
  2427. {
  2428. rank_t sender;
  2429. unsigned s=0;
  2430. for (; s<queryJob().querySlaves(); s++)
  2431. {
  2432. CMessageBuffer msg;
  2433. if (!executeReplyMsgHandler.recv(queryJobChannel().queryJobComm(), msg, RANK_ALL, executeReplyTag, &sender))
  2434. break;
  2435. bool error;
  2436. msg.read(error);
  2437. if (error)
  2438. {
  2439. Owned<IThorException> exception = deserializeThorException(msg);
  2440. exception->setSlave(sender);
  2441. GraphPrintLog(exception, "slave execute reply exception");
  2442. throw exception.getClear();
  2443. }
  2444. }
  2445. if (fatalHandler)
  2446. fatalHandler->clear();
  2447. }
  2448. fatalHandler.clear();
  2449. #ifndef _CONTAINERIZED
  2450. Owned<IWorkUnit> wu = &job.queryWorkUnit().lock();
  2451. queryJobManager().updateWorkUnitLog(*wu);
  2452. #endif
  2453. }
  2454. void CMasterGraph::sendGraph()
  2455. {
  2456. CTimeMon atimer;
  2457. CMessageBuffer msg;
  2458. msg.append(GraphInit);
  2459. msg.append(job.queryKey());
  2460. if (TAG_NULL == executeReplyTag)
  2461. executeReplyTag = jobM->allocateMPTag();
  2462. serializeMPtag(msg, executeReplyTag);
  2463. serializeCreateContexts(msg);
  2464. serializeGraphInit(msg);
  2465. // slave graph data
  2466. try
  2467. {
  2468. jobM->broadcast(queryNodeComm(), msg, masterSlaveMpTag, LONGTIMEOUT, "sendGraph", &bcastMsgHandler);
  2469. }
  2470. catch (IException *e)
  2471. {
  2472. GraphPrintLog(e, "Aborting sendGraph");
  2473. if (abortException)
  2474. {
  2475. e->Release();
  2476. throw LINK(abortException);
  2477. }
  2478. throw;
  2479. }
  2480. GraphPrintLog("sendGraph took %d ms", atimer.elapsed());
  2481. }
  2482. bool CMasterGraph::preStart(size32_t parentExtractSz, const byte *parentExtract)
  2483. {
  2484. GraphPrintLog("Processing graph");
  2485. if (syncInitData())
  2486. {
  2487. sendActivityInitData(); // has to be done at least once
  2488. // NB: At this point, on the slaves, the graphs will start
  2489. }
  2490. CGraphBase::preStart(parentExtractSz, parentExtract);
  2491. if (isGlobal())
  2492. {
  2493. if (!startBarrier->wait(true)) // ensure all graphs are at start point at same time, as some may request data from each other.
  2494. return false;
  2495. }
  2496. if (!queryOwner())
  2497. job.queryWorkUnit().setNodeState(job.queryGraphName(), graphId, WUGraphRunning);
  2498. return true;
  2499. }
  2500. void CMasterGraph::handleSlaveDone(unsigned node, MemoryBuffer &mb)
  2501. {
  2502. graph_id gid;
  2503. mb.read(gid);
  2504. assertex(gid == queryGraphId());
  2505. unsigned count;
  2506. mb.read(count);
  2507. while (count--)
  2508. {
  2509. activity_id activityId;
  2510. mb.read(activityId);
  2511. CMasterGraphElement *act = (CMasterGraphElement *)queryElement(activityId);
  2512. unsigned len;
  2513. mb.read(len);
  2514. const void *d = mb.readDirect(len);
  2515. MemoryBuffer sdMb;
  2516. sdMb.setBuffer(len, (void *)d);
  2517. act->slaveDone(node, sdMb);
  2518. }
  2519. }
  2520. void CMasterGraph::getFinalProgress()
  2521. {
  2522. CMessageBuffer msg;
  2523. mptag_t replyTag = queryJobChannel().queryMPServer().createReplyTag();
  2524. msg.setReplyTag(replyTag);
  2525. msg.append((unsigned)GraphEnd);
  2526. msg.append(job.queryKey());
  2527. msg.append(queryGraphId());
  2528. jobM->broadcast(queryNodeComm(), msg, masterSlaveMpTag, LONGTIMEOUT, "graphEnd", NULL, true);
  2529. Owned<IBitSet> respondedBitSet = createBitSet();
  2530. unsigned n=queryJob().queryNodes();
  2531. while (n--)
  2532. {
  2533. rank_t sender;
  2534. if (!queryNodeComm().recv(msg, RANK_ALL, replyTag, &sender, LONGTIMEOUT))
  2535. {
  2536. StringBuffer slaveList;
  2537. n=queryJob().queryNodes();
  2538. unsigned s = 0;
  2539. for (;;)
  2540. {
  2541. unsigned ns = respondedBitSet->scan(s, true); // scan for next respondent
  2542. // list all non-respondents up to this found respondent (or rest of slaves)
  2543. while (s<ns && s<n) // NB: if scan find nothing else set, ns = (unsigned)-1
  2544. {
  2545. if (slaveList.length())
  2546. slaveList.append(",");
  2547. ++s; // inc. 1st as slaveList is list of slaves that are 1 based.
  2548. slaveList.append(s);
  2549. }
  2550. ++s;
  2551. if (s>=n)
  2552. break;
  2553. }
  2554. throw MakeGraphException(this, 0, "Timeout receiving final progress from slaves - these slaves failed to respond: %s", slaveList.str());
  2555. }
  2556. bool error;
  2557. msg.read(error);
  2558. if (error)
  2559. {
  2560. Owned<IThorException> e = deserializeThorException(msg);
  2561. e->setSlave(sender);
  2562. throw e.getClear();
  2563. }
  2564. respondedBitSet->set(((unsigned)sender)-1);
  2565. if (0 == msg.remaining())
  2566. continue;
  2567. unsigned channelsPerSlave = globals->getPropInt("@channelsPerSlave", 1); // JCSMORE - should move somewhere common
  2568. for (unsigned sc=0; sc<channelsPerSlave; sc++)
  2569. {
  2570. unsigned slave;
  2571. msg.read(slave);
  2572. handleSlaveDone(slave, msg);
  2573. if (!queryOwner())
  2574. {
  2575. if (globals->getPropBool("@watchdogProgressEnabled"))
  2576. {
  2577. try
  2578. {
  2579. size32_t progressLen;
  2580. msg.read(progressLen);
  2581. MemoryBuffer progressData;
  2582. progressData.setBuffer(progressLen, (void *)msg.readDirect(progressLen));
  2583. queryJobManager().queryDeMonServer()->takeHeartBeat(progressData);
  2584. }
  2585. catch (IException *e)
  2586. {
  2587. GraphPrintLog(e, "Failure whilst deserializing stats/progress");
  2588. e->Release();
  2589. }
  2590. }
  2591. }
  2592. }
  2593. }
  2594. }
  2595. void CMasterGraph::done()
  2596. {
  2597. if (started)
  2598. {
  2599. if (!aborted && (!queryOwner() || isGlobal()))
  2600. getFinalProgress(); // waiting for slave graph to finish and send final progress update + extra act end info.
  2601. }
  2602. CGraphBase::done();
  2603. if (started && !queryOwner())
  2604. {
  2605. if (globals->getPropBool("@watchdogProgressEnabled"))
  2606. queryJobManager().queryDeMonServer()->endGraph(this, true);
  2607. }
  2608. }
  2609. void CMasterGraph::setComplete(bool tf)
  2610. {
  2611. CGraphBase::setComplete(tf);
  2612. if (tf && !queryOwner())
  2613. {
  2614. job.queryWorkUnit().setNodeState(job.queryGraphName(), graphId, graphDone?WUGraphComplete:WUGraphFailed);
  2615. }
  2616. }
  2617. bool CMasterGraph::deserializeStats(unsigned node, MemoryBuffer &mb)
  2618. {
  2619. CriticalBlock b(createdCrit);
  2620. graphStats.deserialize(node, mb);
  2621. unsigned count;
  2622. mb.read(count);
  2623. if (count)
  2624. setProgressUpdated();
  2625. while (count--)
  2626. {
  2627. activity_id activityId;
  2628. mb.read(activityId);
  2629. CMasterActivity *activity = NULL;
  2630. CMasterGraphElement *element = (CMasterGraphElement *)queryElement(activityId);
  2631. if (element)
  2632. {
  2633. activity = (CMasterActivity *)element->queryActivity();
  2634. if (!activity)
  2635. {
  2636. CGraphBase *parentGraph = element->queryOwner().queryOwner(); // i.e. am I in a childgraph
  2637. if (!parentGraph)
  2638. {
  2639. GraphPrintLog("Activity id=%" ACTPF "d not created in master and not a child query activity", activityId);
  2640. return false; // don't know if or how this could happen, but all bets off with packet if did.
  2641. }
  2642. Owned<IException> e;
  2643. try
  2644. {
  2645. element->createActivity();
  2646. activity = (CMasterActivity *)element->queryActivity();
  2647. }
  2648. catch (IException *_e)
  2649. {
  2650. e.setown(_e);
  2651. GraphPrintLog(_e, "In deserializeStats");
  2652. }
  2653. if (!activity || e.get())
  2654. {
  2655. GraphPrintLog("Activity id=%" ACTPF "d failed to created child query activity ready for progress", activityId);
  2656. return false;
  2657. }
  2658. }
  2659. if (activity)
  2660. activity->deserializeStats(node, mb);
  2661. }
  2662. else
  2663. {
  2664. GraphPrintLog("Failed to find activity, during progress deserialization, id=%" ACTPF "d", activityId);
  2665. return false; // don't know if or how this could happen, but all bets off with packet if did.
  2666. }
  2667. }
  2668. unsigned subs;
  2669. mb.read(subs);
  2670. while (subs--)
  2671. {
  2672. graph_id subId;
  2673. mb.read(subId);
  2674. Owned<CMasterGraph> graph = (CMasterGraph *)job.queryJobChannel(0).getGraph(subId);
  2675. if (NULL == graph.get())
  2676. return false;
  2677. if (!graph->deserializeStats(node, mb))
  2678. return false;
  2679. }
  2680. return true;
  2681. }
  2682. void CMasterGraph::getStats(IStatisticGatherer &stats)
  2683. {
  2684. stats.addStatistic(StNumSlaves, queryClusterWidth());
  2685. // graph specific stats
  2686. graphStats.getStats(stats);
  2687. Owned<IThorActivityIterator> iter;
  2688. if (queryOwner() && !isGlobal())
  2689. iter.setown(getIterator()); // Local child graphs still send progress, but aren't connected in master
  2690. else
  2691. iter.setown(getConnectedIterator());
  2692. ForEach (*iter)
  2693. {
  2694. CMasterGraphElement &container = (CMasterGraphElement &)iter->query();
  2695. CMasterActivity *activity = (CMasterActivity *)container.queryActivity();
  2696. if (activity) // may not be created (if within child query)
  2697. {
  2698. activity_id id = container.queryId();
  2699. unsigned outputs = container.getOutputs();
  2700. unsigned oid = 0;
  2701. for (; oid < outputs; oid++)
  2702. {
  2703. StatsEdgeScope edgeScope(stats, id, oid);
  2704. activity->getEdgeStats(stats, oid); // for subgraph, may recursively call reportGraph
  2705. }
  2706. StatsActivityScope scope(stats, id);
  2707. activity->getActivityStats(stats);
  2708. }
  2709. }
  2710. }
  2711. cost_type CMasterGraph::getDiskAccessCost()
  2712. {
  2713. cost_type totalDiskAccessCost = 0;
  2714. Owned<IThorGraphIterator> iterChildGraph = getChildGraphIterator();
  2715. ForEach(*iterChildGraph)
  2716. {
  2717. CGraphBase &graph = iterChildGraph->query();
  2718. totalDiskAccessCost += graph.getDiskAccessCost();
  2719. }
  2720. Owned<IThorActivityIterator> iter;
  2721. if (queryOwner() && !isGlobal())
  2722. iter.setown(getIterator()); // Local child graphs still send progress, but aren't connected in master
  2723. else
  2724. iter.setown(getConnectedIterator());
  2725. ForEach (*iter)
  2726. {
  2727. CMasterGraphElement &container = (CMasterGraphElement &)iter->query();
  2728. CMasterActivity *activity = (CMasterActivity *)container.queryActivity();
  2729. if (activity) // may not be created (if within child query)
  2730. totalDiskAccessCost += activity->getDiskAccessCost();
  2731. }
  2732. return totalDiskAccessCost;
  2733. }
  2734. IThorResult *CMasterGraph::createResult(CActivityBase &activity, unsigned id, IThorGraphResults *results, IThorRowInterfaces *rowIf, ThorGraphResultType resultType, unsigned spillPriority)
  2735. {
  2736. Owned<CCollatedResult> result = new CCollatedResult(*this, activity, rowIf, id, results->queryOwnerId(), spillPriority);
  2737. results->setResult(id, result);
  2738. return result;
  2739. }
  2740. IThorResult *CMasterGraph::createResult(CActivityBase &activity, unsigned id, IThorRowInterfaces *rowIf, ThorGraphResultType resultType, unsigned spillPriority)
  2741. {
  2742. Owned<CCollatedResult> result = new CCollatedResult(*this, activity, rowIf, id, localResults->queryOwnerId(), spillPriority);
  2743. localResults->setResult(id, result);
  2744. return result;
  2745. }
  2746. IThorResult *CMasterGraph::createGraphLoopResult(CActivityBase &activity, IThorRowInterfaces *rowIf, ThorGraphResultType resultType, unsigned spillPriority)
  2747. {
  2748. Owned<CCollatedResult> result = new CCollatedResult(*this, activity, rowIf, 0, localResults->queryOwnerId(), spillPriority);
  2749. unsigned id = graphLoopResults->addResult(result);
  2750. result->setId(id);
  2751. return result;
  2752. }
  2753. ///////////////////////////////////////////////////
  2754. const StatisticsMapping CThorEdgeCollection::edgeStatsMapping({StNumRowsProcessed, StNumStarts, StNumStops});
  2755. void CThorEdgeCollection::set(unsigned node, unsigned __int64 value)
  2756. {
  2757. unsigned __int64 numRows = value & THORDATALINK_COUNT_MASK;
  2758. byte stop = (value & THORDATALINK_STOPPED) ? 1 : 0;
  2759. byte start = stop || (value & THORDATALINK_STARTED) ? 1 : 0; // start implied if stopped, keeping previous semantics
  2760. setStatistic(node, StNumRowsProcessed, numRows);
  2761. setStatistic(node, StNumStops, stop);
  2762. setStatistic(node, StNumStarts, start);
  2763. }
  2764. ///////////////////////////////////////////////////
  2765. CJobMaster *createThorGraph(const char *graphName, IConstWorkUnit &workunit, ILoadedDllEntry *querySo, bool sendSo, const SocketEndpoint &agentEp)
  2766. {
  2767. Owned<CJobMaster> jobMaster = new CJobMaster(workunit, graphName, querySo, sendSo, agentEp);
  2768. IPropertyTree *graphXGMML = jobMaster->queryGraphXGMML();
  2769. Owned<IPropertyTreeIterator> iter = graphXGMML->getElements("node");
  2770. ForEach(*iter)
  2771. jobMaster->addSubGraph(iter->query());
  2772. jobMaster->addDependencies(graphXGMML);
  2773. return LINK(jobMaster);
  2774. }
  2775. static IJobManager *jobManager = NULL;
  2776. void setJobManager(IJobManager *_jobManager)
  2777. {
  2778. CriticalBlock b(*jobManagerCrit);
  2779. jobManager = _jobManager;
  2780. }
  2781. IJobManager *getJobManager()
  2782. {
  2783. CriticalBlock b(*jobManagerCrit);
  2784. return LINK(jobManager);
  2785. }
  2786. IJobManager &queryJobManager()
  2787. {
  2788. CriticalBlock b(*jobManagerCrit);
  2789. assertex(jobManager);
  2790. return *jobManager;
  2791. }