thgraphmanager.cpp 47 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include "jarray.hpp"
  15. #include "jfile.hpp"
  16. #include "jmutex.hpp"
  17. #include "jlog.hpp"
  18. #include "rmtfile.hpp"
  19. #include "portlist.h"
  20. #include "wujobq.hpp"
  21. #include "daclient.hpp"
  22. #include "thgraphmaster.ipp"
  23. #include "thorport.hpp"
  24. #include "thmem.hpp"
  25. #include "thmastermain.hpp"
  26. #include "thexception.hpp"
  27. #include "thcodectx.hpp"
  28. #include "daaudit.hpp"
  29. #include "dadfs.hpp"
  30. #include "dafdesc.hpp"
  31. #include "dautils.hpp"
  32. #include "dllserver.hpp"
  33. #include "eclhelper.hpp"
  34. #include "swapnodelib.hpp"
  35. #include "thactivitymaster.ipp"
  36. #include "thdemonserver.hpp"
  37. #include "thgraphmanager.hpp"
  38. #include "roxiehelper.hpp"
  39. #include "environment.hpp"
  40. class CJobManager : public CSimpleInterface, implements IJobManager, implements IExceptionHandler
  41. {
  42. bool stopped, handlingConversation;
  43. Owned<IConversation> conversation;
  44. StringAttr queueName;
  45. CriticalSection replyCrit, jobCrit;
  46. CFifoFileCache querySoCache;
  47. Owned<IJobQueue> jobq;
  48. ICopyArrayOf<CJobMaster> jobs;
  49. Owned<IException> exitException;
  50. Owned<IDeMonServer> demonServer;
  51. atomic_t activeTasks;
  52. StringAttr currentWuid;
  53. ILogMsgHandler *logHandler;
  54. CJobMaster *getCurrentJob() { CriticalBlock b(jobCrit); return jobs.ordinality() ? &OLINK(jobs.item(0)) : NULL; }
  55. bool executeGraph(IConstWorkUnit &workunit, const char *graphName, const SocketEndpoint &agentEp);
  56. void addJob(CJobMaster &job) { CriticalBlock b(jobCrit); jobs.append(job); }
  57. void removeJob(CJobMaster &job) { CriticalBlock b(jobCrit); jobs.zap(job); }
  58. class CThorDebugListener : public CSimpleInterfaceOf<IInterface>, implements IThreaded
  59. {
  60. protected:
  61. CThreaded threaded;
  62. unsigned port;
  63. Owned<ISocket> sock;
  64. CJobManager &mgr;
  65. private:
  66. volatile bool running;
  67. public:
  68. CThorDebugListener(CJobManager &_mgr) : threaded("CThorDebugListener", this), mgr(_mgr)
  69. {
  70. unsigned defaultThorDebugPort = getFixedPort(getMasterPortBase(), TPORT_debug);
  71. port = globals->getPropInt("DebugPort", defaultThorDebugPort);
  72. running = true;
  73. threaded.start();
  74. }
  75. ~CThorDebugListener()
  76. {
  77. running = false;
  78. if (sock)
  79. sock->cancel_accept();
  80. threaded.join();
  81. }
  82. virtual unsigned getPort() const { return port; }
  83. virtual void processDebugCommand(CSafeSocket &ssock, StringBuffer &rawText)
  84. {
  85. Owned<IPropertyTree> queryXml;
  86. try
  87. {
  88. queryXml.setown(createPTreeFromXMLString(rawText.str(), ipt_caseInsensitive, (PTreeReaderOptions)(ptr_ignoreWhiteSpace|ptr_ignoreNameSpaces)));
  89. }
  90. catch (IException *E)
  91. {
  92. StringBuffer s;
  93. IWARNLOG("processDebugCommand: Invalid XML received from %s:%s", E->errorMessage(s).str(), rawText.str());
  94. throw;
  95. }
  96. Linked<CJobMaster> job = mgr.getCurrentJob();
  97. if (!job)
  98. throw MakeStringException(5300, "Command not available when no job active");
  99. const char *graphId = job->queryGraphName();
  100. if (!graphId)
  101. throw MakeStringException(5300, "Command not available when no graph active");
  102. const char *command = queryXml->queryName();
  103. if (!command) throw MakeStringException(5300, "Invalid debug command");
  104. FlushingStringBuffer response(&ssock, false, MarkupFmt_XML, false, false, queryDummyContextLogger());
  105. response.startDataset("Debug", NULL, (unsigned) -1);
  106. if (strncmp(command,"print", 5) == 0)
  107. {
  108. const char *edgeId = queryXml->queryProp("@edgeId");
  109. if (!edgeId) throw MakeStringException(5300, "Debug command requires edgeId");
  110. ICommunicator &comm = job->queryNodeComm();
  111. CMessageBuffer mbuf;
  112. mbuf.append(DebugRequest);
  113. mbuf.append(job->queryKey());
  114. mptag_t replyTag = createReplyTag();
  115. serializeMPtag(mbuf, replyTag);
  116. mbuf.append(rawText);
  117. if (!comm.send(mbuf, RANK_ALL_OTHER, masterSlaveMpTag, MP_ASYNC_SEND))
  118. {
  119. DBGLOG("Failed to send debug info to slave");
  120. throwUnexpected();
  121. }
  122. unsigned nodes = job->queryNodes();
  123. response.appendf("<print graphId='%s' edgeId='%s'>", graphId, edgeId);
  124. while (nodes)
  125. {
  126. rank_t sender;
  127. mbuf.clear();
  128. comm.recv(mbuf, RANK_ALL, replyTag, &sender, 10000);
  129. while (mbuf.remaining())
  130. {
  131. StringAttr row;
  132. mbuf.read(row);
  133. response.append(row);
  134. }
  135. nodes--;
  136. }
  137. response.append("</print>");
  138. }
  139. else if (strncmp(command,"quit", 4) == 0)
  140. {
  141. LOG(MCwarning, thorJob, "ABORT detected from user during debug session");
  142. Owned<IException> e = MakeThorException(TE_WorkUnitAborting, "User signalled abort during debug session");
  143. job->fireException(e);
  144. response.appendf("<quit state='quit'/>");
  145. }
  146. else
  147. throw MakeStringException(5300, "Command not supported by Thor");
  148. response.flush(true);
  149. }
  150. virtual void threadmain() override
  151. {
  152. sock.setown(ISocket::create(port));
  153. while (running)
  154. {
  155. try
  156. {
  157. Owned<ISocket> client = sock->accept(true);
  158. if (client)
  159. {
  160. client->set_linger(-1);
  161. CSafeSocket ssock(client.getClear());
  162. StringBuffer rawText;
  163. IpAddress peer;
  164. bool continuationNeeded;
  165. bool isStatus;
  166. ssock.querySocket()->getPeerAddress(peer);
  167. DBGLOG("Reading debug command from socket...");
  168. if (!ssock.readBlocktms(rawText, WAIT_FOREVER, NULL, continuationNeeded, isStatus, 1024*1024))
  169. {
  170. DBGLOG("No data reading query from socket");
  171. continue;
  172. }
  173. assertex(!continuationNeeded);
  174. assertex(!isStatus);
  175. try
  176. {
  177. processDebugCommand(ssock,rawText);
  178. }
  179. catch (IException *E)
  180. {
  181. StringBuffer s;
  182. ssock.sendException("Thor", E->errorCode(), E->errorMessage(s), false, queryDummyContextLogger());
  183. E->Release();
  184. }
  185. // Write terminator
  186. unsigned replyLen = 0;
  187. ssock.write(&replyLen, sizeof(replyLen));
  188. }
  189. }
  190. catch (IException *E)
  191. {
  192. EXCLOG(E);
  193. E->Release();
  194. }
  195. catch (...)
  196. {
  197. DBGLOG("Unexpected exception in CThorDebugListener");
  198. }
  199. }
  200. }
  201. };
  202. Owned<CThorDebugListener> debugListener;
  203. public:
  204. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  205. CJobManager(ILogMsgHandler *logHandler);
  206. ~CJobManager();
  207. bool doit(IConstWorkUnit *workunit, const char *graphName, const SocketEndpoint &agentep);
  208. void reply(IConstWorkUnit *workunit, const char *wuid, IException *e, const SocketEndpoint &agentep, bool allDone);
  209. void run();
  210. bool execute(IConstWorkUnit *workunit, const char *wuid, const char *graphName, const SocketEndpoint &agentep);
  211. IException *queryExitException() { return exitException; }
  212. // IExceptionHandler
  213. bool fireException(IException *e);
  214. // IJobManager
  215. virtual void stop();
  216. virtual void replyException(CJobMaster &job, IException *e);
  217. virtual void setWuid(const char *wuid, const char *cluster=NULL);
  218. virtual IDeMonServer *queryDeMonServer() { return demonServer; }
  219. virtual void fatal(IException *e);
  220. virtual void addCachedSo(const char *name);
  221. virtual void updateWorkUnitLog(IWorkUnit &workunit);
  222. };
  223. // CJobManager impl.
  224. CJobManager::CJobManager(ILogMsgHandler *_logHandler) : logHandler(_logHandler)
  225. {
  226. stopped = handlingConversation = false;
  227. addThreadExceptionHandler(this);
  228. if (globals->getPropBool("@watchdogEnabled"))
  229. demonServer.setown(createDeMonServer());
  230. else
  231. globals->setPropBool("@watchdogProgressEnabled", false);
  232. atomic_set(&activeTasks, 0);
  233. setJobManager(this);
  234. debugListener.setown(new CThorDebugListener(*this));
  235. }
  236. CJobManager::~CJobManager()
  237. {
  238. setJobManager(NULL);
  239. removeThreadExceptionHandler(this);
  240. }
  241. void CJobManager::stop()
  242. {
  243. if (!stopped)
  244. {
  245. LOG(MCdebugProgress, thorJob, "Stopping jobManager");
  246. stopped = true;
  247. if (jobq)
  248. {
  249. jobq->cancelWaitStatsChange();
  250. jobq->cancelAcceptConversation();
  251. }
  252. if (conversation && !handlingConversation)
  253. conversation->cancel();
  254. }
  255. }
  256. void CJobManager::fatal(IException *e)
  257. {
  258. try
  259. {
  260. IArrayOf<CJobMaster> jobList;
  261. {
  262. CriticalBlock b(jobCrit);
  263. ForEachItemIn(j, jobs)
  264. jobList.append(*LINK(&jobs.item(j)));
  265. }
  266. ForEachItemIn(j, jobList)
  267. replyException(jobList.item(j), e);
  268. jobList.kill();
  269. if (globals->getPropBool("@watchdogProgressEnabled"))
  270. queryDeMonServer()->endGraphs();
  271. setWuid(NULL); // deactivate workunit status (Shouldn't this logic belong outside of thor?)
  272. }
  273. catch (IException *e)
  274. {
  275. EXCLOG(e, NULL);
  276. e->Release();
  277. }
  278. catch (...)
  279. {
  280. IERRLOG("Unknown exception in CJobManager::fatal");
  281. }
  282. LOG(MCauditInfo,",Progress,Thor,Terminate,%s,%s,%s,exception",
  283. queryServerStatus().queryProperties()->queryProp("@thorname"),
  284. queryServerStatus().queryProperties()->queryProp("@nodeGroup"),
  285. queryServerStatus().queryProperties()->queryProp("@queue"));
  286. queryLogMsgManager()->flushQueue(10*1000);
  287. #ifdef _WIN32
  288. TerminateProcess(GetCurrentProcess(), 1);
  289. #else
  290. kill(getpid(), SIGKILL);
  291. #endif
  292. }
  293. void CJobManager::updateWorkUnitLog(IWorkUnit &workunit)
  294. {
  295. StringBuffer log, logUrl, slaveLogPattern;
  296. logHandler->getLogName(log);
  297. createUNCFilename(log, logUrl, false);
  298. slaveLogPattern.set(THORSLAVELOGSEARCHSTR).append(SLAVEIDSTR);
  299. const char *ptr = strstr(log, THORMASTERLOGSEARCHSTR);
  300. dbgassertex(ptr);
  301. slaveLogPattern.append(ptr + strlen(THORMASTERLOGSEARCHSTR) - 1); //Keep the '.' at the end of the THORMASTERLOGSEARCHSTR.
  302. Owned<IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(workunit.queryClusterName());
  303. unsigned numberOfSlaves = clusterInfo->getNumberOfSlaveLogs();
  304. workunit.addProcess("Thor", globals->queryProp("@name"), 0, numberOfSlaves, slaveLogPattern, false, logUrl.str());
  305. }
  306. #define IDLE_RESTART_PERIOD (8*60) // 8 hours
  307. class CIdleShutdown : public CSimpleInterface, implements IThreaded
  308. {
  309. unsigned timeout;
  310. Semaphore sem;
  311. CThreaded threaded;
  312. public:
  313. CIdleShutdown(unsigned _timeout) : timeout(_timeout*60000), threaded("CIdleShutdown") { threaded.init(this); }
  314. ~CIdleShutdown() { stop(); threaded.join(); }
  315. virtual void threadmain() override
  316. {
  317. if (!sem.wait(timeout)) // feeling neglected, restarting..
  318. abortThor(MakeThorException(TE_IdleRestart, "Thor has been idle for %d minutes, restarting", timeout/60000), TEC_Idle, false);
  319. }
  320. void stop() { sem.signal(); }
  321. };
  322. static int getRunningMaxPriority(const char *qname)
  323. {
  324. int maxpriority = 0; // ignore neg
  325. try {
  326. Owned<IRemoteConnection> conn = querySDS().connect("/Status/Servers",myProcessSession(),RTM_LOCK_READ,30000);
  327. if (conn.get())
  328. {
  329. Owned<IPropertyTreeIterator> it(conn->queryRoot()->getElements("Server"));
  330. ForEach(*it) {
  331. StringBuffer instance;
  332. if(it->query().hasProp("@queue"))
  333. {
  334. const char* queue=it->query().queryProp("@queue");
  335. if(queue&&(strcmp(queue,qname)==0)) {
  336. Owned<IPropertyTreeIterator> wuids = it->query().getElements("WorkUnit");
  337. ForEach(*wuids) {
  338. IPropertyTree &wu = wuids->query();
  339. const char* wuid=wu.queryProp(NULL);
  340. if (wuid&&*wuid) {
  341. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  342. Owned<IConstWorkUnit> workunit = factory->openWorkUnit(wuid);
  343. if (workunit) {
  344. int priority = workunit->getPriorityValue();
  345. if (priority>maxpriority)
  346. maxpriority = priority;
  347. }
  348. }
  349. }
  350. }
  351. }
  352. }
  353. }
  354. }
  355. catch (IException *e)
  356. {
  357. EXCLOG(e,"getRunningMaxPriority");
  358. e->Release();
  359. }
  360. return maxpriority;
  361. }
  362. bool CJobManager::fireException(IException *e)
  363. {
  364. IArrayOf<CJobMaster> jobList;
  365. {
  366. CriticalBlock b(jobCrit);
  367. ForEachItemIn(j, jobs)
  368. jobList.append(*LINK(&jobs.item(j)));
  369. }
  370. ForEachItemIn(j, jobList)
  371. jobList.item(j).fireException(e);
  372. jobList.kill();
  373. return true;
  374. }
  375. bool CJobManager::execute(IConstWorkUnit *workunit, const char *wuid, const char *graphName, const SocketEndpoint &agentep)
  376. {
  377. try
  378. {
  379. if (!workunit) // check workunit is available and ready to run.
  380. throw MakeStringException(0, "Could not locate workunit %s", wuid);
  381. if (workunit->getCodeVersion() == 0)
  382. throw makeStringException(0, "Attempting to execute a workunit that hasn't been compiled");
  383. if ((workunit->getCodeVersion() > ACTIVITY_INTERFACE_VERSION) || (workunit->getCodeVersion() < MIN_ACTIVITY_INTERFACE_VERSION))
  384. throw MakeStringException(0, "Workunit was compiled for eclagent interface version %d, this thor requires version %d..%d", workunit->getCodeVersion(), MIN_ACTIVITY_INTERFACE_VERSION, ACTIVITY_INTERFACE_VERSION);
  385. if (workunit->getCodeVersion() == 652)
  386. {
  387. // Any workunit compiled using eclcc 7.12.0-7.12.18 is not compatible
  388. StringBuffer buildVersion, eclVersion;
  389. workunit->getBuildVersion(StringBufferAdaptor(buildVersion), StringBufferAdaptor(eclVersion));
  390. const char *version = strstr(buildVersion, "7.12.");
  391. if (version)
  392. {
  393. const char *point = version + strlen("7.12.");
  394. unsigned pointVer = atoi(point);
  395. if (pointVer <= 18)
  396. throw MakeStringException(0, "Workunit was compiled by eclcc version %s which is not compatible with this runtime", buildVersion.str());
  397. }
  398. }
  399. if (debugListener)
  400. {
  401. WorkunitUpdate wu(&workunit->lock());
  402. StringBuffer sb;
  403. queryHostIP().getIpText(sb);
  404. wu->setDebugAgentListenerIP(sb); //tells debugger what IP to write commands to
  405. wu->setDebugAgentListenerPort(debugListener->getPort());
  406. }
  407. return doit(workunit, graphName, agentep);
  408. }
  409. catch (IException *e)
  410. {
  411. IThorException *te = QUERYINTERFACE(e, IThorException);
  412. if (te && tea_shutdown==te->queryAction())
  413. stopped = true;
  414. reply(workunit, wuid, e, agentep, false);
  415. }
  416. catch (CATCHALL)
  417. {
  418. reply(workunit, wuid, MakeStringException(0, "Unknown exception"), agentep, false);
  419. }
  420. return false;
  421. }
  422. void CJobManager::run()
  423. {
  424. LOG(MCdebugProgress, thorJob, "Listening for graph");
  425. setWuid(NULL);
  426. StringBuffer soPath;
  427. globals->getProp("@query_so_dir", soPath);
  428. StringBuffer soPattern("*.");
  429. #ifdef _WIN32
  430. soPattern.append("dll");
  431. #else
  432. soPattern.append("so");
  433. #endif
  434. querySoCache.init(soPath.str(), DEFAULT_QUERYSO_LIMIT, soPattern);
  435. SCMStringBuffer _queueNames;
  436. const char *thorName = globals->queryProp("@name");
  437. if (!thorName) thorName = "thor";
  438. getThorQueueNames(_queueNames, thorName);
  439. queueName.set(_queueNames.str());
  440. jobq.setown(createJobQueue(queueName.get()));
  441. struct cdynprio: public IDynamicPriority
  442. {
  443. const char *qn;
  444. int get()
  445. {
  446. int p = getRunningMaxPriority(qn);
  447. if (p)
  448. PROGLOG("Dynamic Min priority = %d",p);
  449. return p;
  450. }
  451. } *dp = NULL;
  452. if (globals->getPropBool("@multiThorPriorityLock")) {
  453. PROGLOG("multiThorPriorityLock enabled");
  454. dp = new cdynprio;
  455. dp->qn = queueName.get();
  456. }
  457. PROGLOG("verifying mp connection to all slaves");
  458. Owned<IMPServer> mpServer = getMPServer();
  459. Owned<ICommunicator> comm = mpServer->createCommunicator(&queryClusterGroup());
  460. if (!comm->verifyAll(false, 1000*60*30, 1000*60))
  461. throwStringExceptionV(0, "Failed to connect to all slaves");
  462. else
  463. PROGLOG("verified mp connection to all slaves");
  464. class CThorListener : public CSimpleInterface, implements IThreaded
  465. {
  466. CThreaded threaded;
  467. mptag_t mptag;
  468. bool stopped = false;
  469. public:
  470. CThorListener(mptag_t _mptag) : threaded("CDaliConnectionValidator"), mptag(_mptag)
  471. {
  472. threaded.init(this);
  473. }
  474. ~CThorListener() { stop(); threaded.join(); }
  475. void stop()
  476. {
  477. stopped = true;
  478. queryWorldCommunicator().cancel(NULL, mptag);
  479. }
  480. virtual void threadmain() override
  481. {
  482. for (;;)
  483. {
  484. CMessageBuffer msg;
  485. if (!queryWorldCommunicator().recv(msg, NULL, mptag))
  486. break;
  487. StringAttr cmd;
  488. msg.read(cmd);
  489. if (0 == stricmp("stop", cmd))
  490. {
  491. bool stopCurrentJob;
  492. msg.read(stopCurrentJob);
  493. abortThor(NULL, TEC_Clean, stopCurrentJob);
  494. break;
  495. }
  496. else
  497. PROGLOG("Unknown cmd = %s", cmd.get());
  498. }
  499. }
  500. } stopThorListener(MPTAG_THOR);
  501. StringBuffer exclusiveLockName;
  502. Owned<IDaliMutex> exclLockDaliMutex;
  503. if (globals->getProp("@multiThorExclusionLockName",exclusiveLockName))
  504. {
  505. if (exclusiveLockName.length())
  506. {
  507. if (globals->getPropBool("@multiThorPriorityLock"))
  508. FLLOG(MCoperatorWarning, thorJob, "multiThorPriorityLock cannot be used in conjunction with multiThorExclusionLockName");
  509. else
  510. {
  511. PROGLOG("Multi-Thor exclusive lock defined: %s", exclusiveLockName.str());
  512. exclLockDaliMutex.setown(createDaliMutex(exclusiveLockName.str()));
  513. }
  514. }
  515. }
  516. bool jobQConnected = false;
  517. while (!stopped)
  518. {
  519. handlingConversation = false;
  520. conversation.clear();
  521. SocketEndpoint masterEp(getMasterPortBase());
  522. StringBuffer url;
  523. PROGLOG("ThorLCR(%s) available, waiting on queue %s",masterEp.getUrlStr(url).str(),queueName.get());
  524. struct CLock
  525. {
  526. IDaliMutex *lock;
  527. StringAttr name;
  528. CLock() : lock(NULL) { }
  529. ~CLock()
  530. {
  531. clear();
  532. }
  533. void set(IDaliMutex *_lock, const char *_name)
  534. {
  535. lock = _lock;
  536. name.set(_name);
  537. PROGLOG("Took exclusive lock %s", name.get());
  538. }
  539. void clear()
  540. {
  541. if (lock)
  542. {
  543. IDaliMutex *_lock = lock;
  544. lock = NULL;
  545. _lock->leave();
  546. PROGLOG("Cleared exclusive lock: %s", name.get());
  547. }
  548. }
  549. } daliLock;
  550. Owned<IJobQueueItem> item;
  551. {
  552. CIdleShutdown idleshutdown(globals->getPropInt("@idleRestartPeriod", IDLE_RESTART_PERIOD));
  553. if (exclLockDaliMutex.get())
  554. {
  555. for (;;)
  556. {
  557. while (!stopped && !jobq->ordinality()) // this is avoid tight loop when nothing on q.
  558. {
  559. if (jobQConnected)
  560. {
  561. jobq->disconnect();
  562. jobQConnected = false;
  563. }
  564. jobq->waitStatsChange(1000);
  565. }
  566. if (stopped)
  567. break;
  568. unsigned connected, waiting, enqueued;
  569. if (exclLockDaliMutex->enter(5000))
  570. {
  571. daliLock.set(exclLockDaliMutex, exclusiveLockName);
  572. if (jobq->ordinality())
  573. {
  574. if (!jobQConnected)
  575. {
  576. jobq->connect(true);
  577. jobQConnected = true;
  578. }
  579. // NB: this is expecting to get an item without delay, timeout JIC.
  580. unsigned t = msTick();
  581. Owned<IJobQueueItem> _item = jobq->dequeue(30*1000);
  582. unsigned e = msTick() - t;
  583. StringBuffer msg;
  584. if (_item.get())
  585. msg.append("Jobqueue item retrieved");
  586. else
  587. msg.append("Nothing found on jobq::dequeue");
  588. if (e>=5000)
  589. msg.append(" - acceptConversation took ").append(e/1000).append(" secs");
  590. PROGLOG("%s", msg.str());
  591. if (_item.get())
  592. {
  593. if (_item->isValidSession())
  594. {
  595. SocketEndpoint ep = _item->queryEndpoint();
  596. ep.port = _item->getPort();
  597. Owned<IConversation> acceptconv = createSingletonSocketConnection(ep.port,&ep);
  598. if (acceptconv->connect(60*1000)) // shouldn't need that long
  599. {
  600. acceptconv->set_keep_alive(true);
  601. item.setown(_item.getClear());
  602. conversation.setown(acceptconv.getClear());
  603. }
  604. break;
  605. }
  606. }
  607. }
  608. daliLock.clear();
  609. }
  610. else
  611. {
  612. jobq->getStats(connected, waiting, enqueued);
  613. if (enqueued)
  614. PROGLOG("Exclusive lock %s in use. Queue state (connected=%d, waiting=%d, enqueued=%d)", exclusiveLockName.str(), connected ,waiting, enqueued);
  615. }
  616. }
  617. }
  618. else
  619. {
  620. if (!jobQConnected)
  621. {
  622. jobq->connect(true);
  623. jobQConnected = true;
  624. }
  625. IJobQueueItem *_item;
  626. conversation.setown(jobq->acceptConversation(_item,30*1000,dp)); // 30s priority transition delay
  627. item.setown(_item);
  628. }
  629. }
  630. if (!conversation.get()||!item.get())
  631. {
  632. if (!stopped)
  633. setExitCode(0);
  634. PROGLOG("acceptConversation aborted - terminating");
  635. break;
  636. }
  637. StringAttr graphName, wuid;
  638. const char *wuidGraph = item->queryWUID(); // actually <wuid>/<graphName>
  639. StringArray sArray;
  640. sArray.appendList(wuidGraph, "/");
  641. assertex(2 == sArray.ordinality());
  642. wuid.set(sArray.item(0));
  643. graphName.set(sArray.item(1));
  644. handlingConversation = true;
  645. SocketEndpoint agentep;
  646. try
  647. {
  648. MemoryBuffer msg;
  649. masterEp.serialize(msg); // only used for tracing
  650. if (!conversation->send(msg))
  651. {
  652. IWARNLOG("send conversation failed");
  653. continue;
  654. }
  655. if (!conversation->recv(msg.clear(),60*1000))
  656. {
  657. IWARNLOG("recv conversation failed");
  658. continue;
  659. }
  660. agentep.deserialize(msg);
  661. }
  662. catch (IException *e)
  663. {
  664. FLLOG(MCoperatorWarning, thorJob, e, "CJobManager::run");
  665. continue;
  666. }
  667. Owned<IWorkUnitFactory> factory;
  668. Owned<IConstWorkUnit> workunit;
  669. bool allDone = false;
  670. try
  671. {
  672. factory.setown(getWorkUnitFactory());
  673. workunit.setown(factory->openWorkUnit(wuid));
  674. unsigned maxLogDetail = workunit->getDebugValueInt("maxlogdetail", DefaultDetail);
  675. ILogMsgFilter *existingLogHandler = queryLogMsgManager()->queryMonitorFilter(logHandler);
  676. dbgassertex(existingLogHandler);
  677. verifyex(queryLogMsgManager()->changeMonitorFilterOwn(logHandler, getCategoryLogMsgFilter(existingLogHandler->queryAudienceMask(), existingLogHandler->queryClassMask(), maxLogDetail)));
  678. allDone = execute(workunit, wuid, graphName, agentep);
  679. daliLock.clear();
  680. reply(workunit, wuid, NULL, agentep, allDone);
  681. }
  682. catch (IException *e)
  683. {
  684. IThorException *te = QUERYINTERFACE(e, IThorException);
  685. if (te && tea_shutdown==te->queryAction())
  686. stopped = true;
  687. reply(workunit, wuid, e, agentep, false);
  688. }
  689. catch (CATCHALL)
  690. {
  691. reply(workunit, wuid, MakeStringException(0, "Unknown exception"), agentep, false);
  692. }
  693. // reset for next job
  694. setProcessAborted(false);
  695. }
  696. delete dp;
  697. jobq.clear();
  698. }
  699. bool CJobManager::doit(IConstWorkUnit *workunit, const char *graphName, const SocketEndpoint &agentep)
  700. {
  701. StringBuffer s;
  702. StringAttr wuid(workunit->queryWuid());
  703. StringAttr user(workunit->queryUser());
  704. LOG(MCdebugInfo, thorJob, "Processing wuid=%s, graph=%s from agent: %s", wuid.str(), graphName, agentep.getUrlStr(s).str());
  705. LOG(MCauditInfo,",Progress,Thor,Start,%s,%s,%s,%s,%s,%s",
  706. queryServerStatus().queryProperties()->queryProp("@thorname"),
  707. wuid.str(),
  708. graphName,
  709. user.str(),
  710. queryServerStatus().queryProperties()->queryProp("@nodeGroup"),
  711. queryServerStatus().queryProperties()->queryProp("@queue"));
  712. Owned<IException> e;
  713. bool allDone = false;
  714. try
  715. {
  716. allDone = executeGraph(*workunit, graphName, agentep);
  717. }
  718. catch (IException *_e) { e.setown(_e); }
  719. LOG(MCauditInfo,",Progress,Thor,Stop,%s,%s,%s,%s,%s,%s",
  720. queryServerStatus().queryProperties()->queryProp("@thorname"),
  721. wuid.str(),
  722. graphName,
  723. user.str(),
  724. queryServerStatus().queryProperties()->queryProp("@nodeGroup"),
  725. queryServerStatus().queryProperties()->queryProp("@queue"));
  726. if (e.get()) throw e.getClear();
  727. return allDone;
  728. }
  729. void CJobManager::setWuid(const char *wuid, const char *cluster)
  730. {
  731. currentWuid.set(wuid);
  732. try
  733. {
  734. if (wuid && *wuid)
  735. {
  736. queryServerStatus().queryProperties()->setProp("WorkUnit", wuid);
  737. queryServerStatus().queryProperties()->setProp("Cluster", cluster);
  738. }
  739. else
  740. {
  741. queryServerStatus().queryProperties()->removeProp("WorkUnit");
  742. queryServerStatus().queryProperties()->removeProp("Cluster");
  743. }
  744. queryServerStatus().commitProperties();
  745. }
  746. catch (IException *e)
  747. {
  748. FLLOG(MCexception(e), thorJob, e, "WARNING: Failed to set wuid in SDS:");
  749. e->Release();
  750. }
  751. catch (CATCHALL)
  752. {
  753. FLLOG(MCerror, thorJob, "WARNING: Failed to set wuid in SDS: Unknown error");
  754. }
  755. }
  756. void CJobManager::replyException(CJobMaster &job, IException *e)
  757. {
  758. reply(&job.queryWorkUnit(), job.queryWorkUnit().queryWuid(), e, job.queryAgentEp(), false);
  759. }
  760. void CJobManager::reply(IConstWorkUnit *workunit, const char *wuid, IException *e, const SocketEndpoint &agentep, bool allDone)
  761. {
  762. CriticalBlock b(replyCrit);
  763. #ifdef _CONTAINERIZED
  764. // JCSMORE ignore pause/resume cases for now.
  765. if (e)
  766. {
  767. if (!exitException)
  768. exitException.setown(e);
  769. return;
  770. }
  771. #else
  772. workunit->forceReload();
  773. if (!conversation)
  774. return;
  775. StringBuffer s;
  776. if (e) {
  777. s.append("Posting exception: ");
  778. e->errorMessage(s);
  779. }
  780. else
  781. s.append("Posting OK");
  782. s.append(" to agent ");
  783. agentep.getUrlStr(s);
  784. s.append(" for workunit(").append(wuid).append(")");
  785. PROGLOG("%s", s.str());
  786. MemoryBuffer replyMb;
  787. workunit->forceReload();
  788. if (!allDone && (WUActionPause == workunit->getAction() || WUActionPauseNow == workunit->getAction()))
  789. {
  790. replyMb.append((unsigned)DAMP_THOR_REPLY_PAUSED);
  791. if (e)
  792. {
  793. // likely if WUActionPauseNow, shouldn't happen if WUActionPause
  794. EXCLOG(e, "Exception at time of pause");
  795. replyMb.append(true);
  796. serializeException(e, replyMb);
  797. }
  798. else
  799. replyMb.append(false);
  800. }
  801. else if (e)
  802. {
  803. IThorException *te = QUERYINTERFACE(e, IThorException);
  804. if (te)
  805. {
  806. switch (te->errorCode())
  807. {
  808. case TE_CostExceeded:
  809. case TE_WorkUnitAborting:
  810. replyMb.append((unsigned)DAMP_THOR_REPLY_ABORT);
  811. break;
  812. default:
  813. replyMb.append((unsigned)DAMP_THOR_REPLY_ERROR);
  814. break;
  815. }
  816. }
  817. else
  818. replyMb.append((unsigned)DAMP_THOR_REPLY_ERROR);
  819. serializeException(e, replyMb);
  820. }
  821. else
  822. replyMb.append((unsigned)DAMP_THOR_REPLY_GOOD);
  823. if (!conversation->send(replyMb)) {
  824. s.clear();
  825. IERRLOG("Failed to reply to agent %s",agentep.getUrlStr(s).str());
  826. }
  827. conversation.clear();
  828. handlingConversation = false;
  829. //GH->JCS Should this be using getEnvironmentFactory()->openEnvironment()?
  830. Owned<IRemoteConnection> conn = querySDS().connect("/Environment", myProcessSession(), RTM_LOCK_READ, MEDIUMTIMEOUT);
  831. if (checkThorNodeSwap(globals->queryProp("@name"),e?wuid:NULL,(unsigned)-1))
  832. abortThor(e, TEC_Swap, false);
  833. #endif
  834. }
  835. bool CJobManager::executeGraph(IConstWorkUnit &workunit, const char *graphName, const SocketEndpoint &agentEp)
  836. {
  837. timestamp_type startTs = getTimeStampNowValue();
  838. {
  839. Owned<IWorkUnit> wu = &workunit.lock();
  840. wu->setTracingValue("ThorBuild", hpccBuildTag);
  841. #ifndef _CONTAINERIZED
  842. updateWorkUnitLog(*wu);
  843. #endif
  844. }
  845. workunit.forceReload();
  846. StringAttr wuid(workunit.queryWuid());
  847. cycle_t startCycles = get_cycles_now();
  848. Owned<IConstWUQuery> query = workunit.getQuery();
  849. SCMStringBuffer soName;
  850. query->getQueryDllName(soName);
  851. unsigned version = query->getQueryDllCrc();
  852. query.clear();
  853. StringBuffer soPath;
  854. globals->getProp("@query_so_dir", soPath);
  855. StringBuffer compoundPath;
  856. compoundPath.append(soPath.str());
  857. soPath.append(soName.str());
  858. getCompoundQueryName(compoundPath, soName.str(), version);
  859. bool sendSo = false;
  860. if (querySoCache.isAvailable(compoundPath.str()))
  861. PROGLOG("Using existing local dll: %s", compoundPath.str()); // It is assumed if present here then _still_ present on slaves from previous send.
  862. else
  863. {
  864. MemoryBuffer file;
  865. queryDllServer().getDll(soName.str(), file);
  866. PROGLOG("Saving dll: %s", compoundPath.str());
  867. OwnedIFile out = createIFile(compoundPath.str());
  868. try
  869. {
  870. out->setCreateFlags(S_IRWXU);
  871. OwnedIFileIO io = out->open(IFOcreate);
  872. io->write(0, file.length(), file.toByteArray());
  873. io.clear();
  874. }
  875. catch (IException *e)
  876. {
  877. FLLOG(MCexception(e), thorJob, e, "Failed to write query dll - ignoring!");
  878. e->Release();
  879. }
  880. sendSo = globals->getPropBool("Debug/@dllsToSlaves", true);
  881. }
  882. Owned<ILoadedDllEntry> querySo = createDllEntry(compoundPath.str(), false, NULL, false);
  883. SCMStringBuffer eclstr;
  884. StringAttr user(workunit.queryUser());
  885. PROGLOG("Started wuid=%s, user=%s, graph=%s\n", wuid.str(), user.str(), graphName);
  886. PROGLOG("Query %s loaded", compoundPath.str());
  887. Owned<CJobMaster> job = createThorGraph(graphName, workunit, querySo, sendSo, agentEp);
  888. unsigned wfid = job->getWfid();
  889. StringBuffer graphScope;
  890. graphScope.append(WorkflowScopePrefix).append(wfid).append(":").append(graphName);
  891. PROGLOG("Graph %s created", graphName);
  892. PROGLOG("Running graph=%s", job->queryGraphName());
  893. addJob(*job);
  894. bool allDone = false;
  895. Owned<IException> exception;
  896. Owned<IFatalHandler> fatalHdlr;
  897. try
  898. {
  899. struct CounterBlock
  900. {
  901. atomic_t &counter;
  902. CounterBlock(atomic_t &_counter) : counter(_counter) { atomic_inc(&counter); }
  903. ~CounterBlock() { atomic_dec(&counter); }
  904. } cBlock(activeTasks);
  905. {
  906. Owned<IWorkUnit> wu = &workunit.lock();
  907. wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, graphScope, StWhenStarted, NULL, startTs, 1, 0, StatsMergeAppend);
  908. //Could use addTimeStamp(wu, SSTgraph, graphName, StWhenStarted, wfid) if start time could be this point
  909. wu->setState(WUStateRunning);
  910. VStringBuffer version("%d.%d", THOR_VERSION_MAJOR, THOR_VERSION_MINOR);
  911. wu->setDebugValue("ThorVersion", version.str(), true);
  912. }
  913. setWuid(workunit.queryWuid(), workunit.queryClusterName());
  914. allDone = job->go();
  915. Owned<IWorkUnit> wu = &workunit.lock();
  916. unsigned __int64 graphTimeNs = cycle_to_nanosec(get_cycles_now()-startCycles);
  917. StringBuffer graphTimeStr;
  918. formatGraphTimerLabel(graphTimeStr, graphName);
  919. updateWorkunitStat(wu, SSTgraph, graphName, StTimeElapsed, graphTimeStr, graphTimeNs, wfid);
  920. addTimeStamp(wu, SSTgraph, graphName, StWhenFinished, wfid);
  921. cost_type cost = money2cost_type(calculateThorCost(nanoToMilli(graphTimeNs), queryNodeClusterWidth()));
  922. if (cost)
  923. wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, graphScope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace);
  924. removeJob(*job);
  925. }
  926. catch (IException *e)
  927. {
  928. exception.setown(e);
  929. }
  930. job->endJob();
  931. removeJob(*job);
  932. if (exception)
  933. {
  934. setWuid(nullptr);
  935. throw exception.getClear();
  936. }
  937. fatalHdlr.setown(job->clearFatalHandler());
  938. job.clear();
  939. PROGLOG("Finished wuid=%s, graph=%s", wuid.str(), graphName);
  940. fatalHdlr->clear();
  941. setWuid(NULL);
  942. return allDone;
  943. }
  944. void CJobManager::addCachedSo(const char *name)
  945. {
  946. querySoCache.add(name);
  947. }
  // Exit code recorded for the process; -1 is the initial "nothing recorded yet" value.
  static int exitCode = -1;

  // Records the exit code, overwriting any previously recorded value.
  void setExitCode(int code)
  {
      exitCode = code;
  }

  // Returns the most recently recorded exit code (-1 if none has been recorded).
  int queryExitCode()
  {
      return exitCode;
  }
  951. static unsigned aborting = 99;
  952. void abortThor(IException *e, unsigned errCode, bool abortCurrentJob)
  953. {
  954. if (-1 == queryExitCode()) setExitCode(errCode);
  955. Owned<CJobManager> jM = ((CJobManager *)getJobManager());
  956. Owned<IException> _e;
  957. if (0 == aborting)
  958. {
  959. aborting = 1;
  960. if (errCode != TEC_Clean)
  961. {
  962. if (!e)
  963. {
  964. _e.setown(MakeThorException(TE_AbortException, "THOR ABORT"));
  965. e = _e;
  966. }
  967. EXCLOG(e,"abortThor");
  968. }
  969. LOG(MCdebugProgress, thorJob, "abortThor called");
  970. if (jM)
  971. jM->stop();
  972. }
  973. if (2 > aborting && abortCurrentJob)
  974. {
  975. aborting = 2;
  976. LOG(MCdebugProgress, thorJob, "aborting any current active job");
  977. if (jM)
  978. {
  979. if (!e)
  980. {
  981. _e.setown(MakeThorException(TE_AbortException, "THOR ABORT"));
  982. e = _e;
  983. }
  984. jM->fireException(e);
  985. }
  986. if (errCode == TEC_Clean)
  987. {
  988. LOG(MCdebugProgress, thorJob, "Removing sentinel upon normal shutdown");
  989. Owned<IFile> sentinelFile = createSentinelTarget();
  990. removeSentinelFile(sentinelFile);
  991. }
  992. }
  993. }
  // Default interval, in seconds, used for "@verifyDaliConnectionInterval" when it is not configured.
  // Parenthesised so the macro expands as a single value inside a larger expression.
  #define DEFAULT_VERIFYDALI_POLL (5*60) // secs
  995. class CDaliConnectionValidator : public CSimpleInterface, implements IThreaded
  996. {
  997. bool stopped;
  998. unsigned pollDelay;
  999. Semaphore poll;
  1000. CThreaded threaded;
  1001. public:
  1002. CDaliConnectionValidator(unsigned _pollDelay) : threaded("CDaliConnectionValidator") { pollDelay = _pollDelay*1000; stopped = false; threaded.init(this); }
  1003. ~CDaliConnectionValidator() { stop(); threaded.join(); }
  1004. virtual void threadmain() override
  1005. {
  1006. for (;;)
  1007. {
  1008. poll.wait(pollDelay);
  1009. if (stopped) break;
  1010. if (!verifyCovenConnection(pollDelay)) // use poll delay time for verify connection timeout
  1011. {
  1012. abortThor(MakeThorOperatorException(TE_AbortException, "Detected lost connectivity with dali server, aborting thor"), TEC_DaliDown);
  1013. break;
  1014. }
  1015. }
  1016. }
  1017. void stop()
  1018. {
  1019. stopped = true;
  1020. poll.signal();
  1021. }
  1022. };
  1023. static CSDSServerStatus *serverStatus = NULL;
  1024. CSDSServerStatus &queryServerStatus() { return *serverStatus; }
  1025. CSDSServerStatus &openThorServerStatus()
  1026. {
  1027. assertex(!serverStatus);
  1028. serverStatus = new CSDSServerStatus("ThorMaster");
  1029. return *serverStatus;
  1030. }
  1031. void closeThorServerStatus()
  1032. {
  1033. if (serverStatus)
  1034. {
  1035. delete serverStatus;
  1036. serverStatus = NULL;
  1037. }
  1038. }
  1039. #ifdef _CONTAINERIZED
  1040. /*
  1041. * Waits on recv for another wuid/graph to run.
  1042. * Return values:
  1043. * -2 = reply to client failed
  1044. * -1 = recv failed/timedout
  1045. * 0 = unrecognised format, or wuid mismatch
  1046. * 1 = success. new graph/wuid received.
  1047. */
  1048. static int recvNextGraph(unsigned timeoutMs, const char *wuid, StringBuffer &retWuid, StringBuffer &retGraphName)
  1049. {
  1050. PROGLOG("Lingering time left: %.2f", ((float)timeoutMs)/1000);
  1051. CMessageBuffer msg;
  1052. if (!queryWorldCommunicator().recv(msg, NULL, MPTAG_THOR, nullptr, timeoutMs))
  1053. return -1;
  1054. StringBuffer next;
  1055. msg.read(next);
  1056. // validate
  1057. StringArray sArray;
  1058. sArray.appendList(next, "/");
  1059. if (2 == sArray.ordinality())
  1060. {
  1061. if (wuid && !streq(sArray.item(0), wuid))
  1062. return 0;
  1063. msg.clear().append(true);
  1064. if (queryWorldCommunicator().reply(msg, 60*1000)) // should be quick!
  1065. {
  1066. retWuid.set(sArray.item(0));
  1067. retGraphName.set(sArray.item(1));
  1068. return 1;
  1069. }
  1070. else
  1071. return -2;
  1072. }
  1073. return 0;
  1074. }
  1075. #endif
  1076. void thorMain(ILogMsgHandler *logHandler, const char *wuid, const char *graphName)
  1077. {
  1078. aborting = 0;
  1079. unsigned multiThorMemoryThreshold = globals->getPropInt("@multiThorMemoryThreshold")*0x100000;
  1080. try
  1081. {
  1082. Owned<CDaliConnectionValidator> daliConnectValidator = new CDaliConnectionValidator(globals->getPropInt("@verifyDaliConnectionInterval", DEFAULT_VERIFYDALI_POLL));
  1083. Owned<ILargeMemLimitNotify> notify;
  1084. if (multiThorMemoryThreshold)
  1085. {
  1086. StringBuffer ngname;
  1087. if (!globals->getProp("@multiThorResourceGroup",ngname))
  1088. globals->getProp("@nodeGroup",ngname);
  1089. if (ngname.length())
  1090. {
  1091. notify.setown(createMultiThorResourceMutex(ngname.str(),serverStatus));
  1092. setMultiThorMemoryNotify(multiThorMemoryThreshold,notify);
  1093. PROGLOG("Multi-Thor resource limit for %s set to %" I64F "d",ngname.str(),(__int64)multiThorMemoryThreshold);
  1094. }
  1095. else
  1096. multiThorMemoryThreshold = 0;
  1097. }
  1098. initFileManager();
  1099. CThorResourceMaster masterResource;
  1100. setIThorResource(masterResource);
  1101. enableForceRemoteReads(); // forces file reads to be remote reads if they match environment setting 'forceRemotePattern' pattern.
  1102. Owned<CJobManager> jobManager = new CJobManager(logHandler);
  1103. try
  1104. {
  1105. #ifndef _CONTAINERIZED
  1106. jobManager->run();
  1107. #else
  1108. unsigned lingerPeriod = globals->getPropInt("@lingerPeriod", DEFAULT_LINGER_SECS)*1000;
  1109. bool multiJobLinger = globals->getPropBool("@multiJobLinger");
  1110. VStringBuffer multiJobLingerQueueName("%s_lingerqueue", globals->queryProp("@name"));
  1111. StringBuffer instance("thorinstance_");
  1112. queryMyNode()->endpoint().getUrlStr(instance);
  1113. StringBuffer currentGraphName(graphName);
  1114. StringBuffer currentWuid(wuid);
  1115. while (true)
  1116. {
  1117. PROGLOG("Executing: wuid=%s, graph=%s", currentWuid.str(), currentGraphName.str());
  1118. {
  1119. Owned<IWorkUnitFactory> factory;
  1120. Owned<IConstWorkUnit> workunit;
  1121. factory.setown(getWorkUnitFactory());
  1122. workunit.setown(factory->openWorkUnit(currentWuid));
  1123. SocketEndpoint dummyAgentEp;
  1124. jobManager->execute(workunit, currentWuid, currentGraphName, dummyAgentEp);
  1125. IException *e = jobManager->queryExitException();
  1126. Owned<IWorkUnit> w = &workunit->lock();
  1127. if (e)
  1128. {
  1129. Owned<IWUException> we = w->createException();
  1130. we->setSeverity(SeverityInformation);
  1131. StringBuffer errStr;
  1132. e->errorMessage(errStr);
  1133. we->setExceptionMessage(errStr);
  1134. we->setExceptionSource("thormasterexception");
  1135. we->setExceptionCode(e->errorCode());
  1136. w->setState(WUStateWait);
  1137. break;
  1138. }
  1139. if (!multiJobLinger && lingerPeriod)
  1140. w->setDebugValue(instance, "1", true);
  1141. w->setState(WUStateWait);
  1142. }
  1143. currentGraphName.clear();
  1144. if (lingerPeriod)
  1145. {
  1146. // Register the idle lingering Thor.
  1147. Owned<IRemoteConnection> multiJobLingerInstanceConn;
  1148. if (multiJobLinger)
  1149. {
  1150. // Global, available to any workunit.
  1151. // Register it in a dali psuedo queue
  1152. VStringBuffer multiJobLingerXPath("/Status/ThorLinger/%s", globals->queryProp("@name"));
  1153. Owned<IRemoteConnection> conn = querySDS().connect(multiJobLingerXPath, myProcessSession(), RTM_CREATE_QUERY | RTM_LOCK_WRITE, 5*60*1000);
  1154. StringBuffer instance;
  1155. queryMyNode()->endpoint().getUrlStr(instance);
  1156. VStringBuffer entryXPath("%s/Thor%" I64F "u", multiJobLingerXPath.str(), myProcessSession());
  1157. /* Establish the instance with a RTM_DELETE_ON_DISCONNECT, so that if process exits for any reason,
  1158. * the instance is also automatically removed.
  1159. * A client (agent) may pick up this instance and consume it.
  1160. * NB: There's a window where it's possible that a client picks up this Thor instance, as it's shutting down,
  1161. * in which case, the client will timeout trying to connect to it and cycle around to try another or queue the job
  1162. * for a new instance.
  1163. */
  1164. multiJobLingerInstanceConn.setown(querySDS().connect(entryXPath, myProcessSession(), RTM_CREATE_QUERY | RTM_DELETE_ON_DISCONNECT | RTM_LOCK_WRITE, 5*1000));
  1165. multiJobLingerInstanceConn->queryRoot()->setProp(nullptr, instance);
  1166. multiJobLingerInstanceConn->changeMode(RTM_NONE); // unlock, because can be read and deleted by a reader
  1167. PROGLOG("Thor %s added to multijob linger queue: %s", instance.str(), multiJobLingerQueueName.str());
  1168. }
  1169. CMessageBuffer msg;
  1170. CTimeMon timer(lingerPeriod);
  1171. unsigned remaining;
  1172. while (!timer.timedout(&remaining))
  1173. {
  1174. StringBuffer wuid;
  1175. int ret = recvNextGraph(remaining, multiJobLinger ? nullptr : currentWuid.str(), wuid, currentGraphName);
  1176. if (ret > 0)
  1177. {
  1178. currentWuid.set(wuid); // NB: will always be same if !multiJobLinger
  1179. break; // success
  1180. }
  1181. else if (ret < 0)
  1182. break; // timeout/abort
  1183. // else - reject/ignore duff message.
  1184. }
  1185. // De-register the idle lingering entry.
  1186. // NB: in the case of multiJobLinger, it is handled by the RTM_DELETE_ON_DISCONNECT on multiJobLingerInstanceConn
  1187. if (!multiJobLinger && (0 == currentGraphName.length()))
  1188. {
  1189. // remove lingering instance from workunit
  1190. Owned<IWorkUnitFactory> factory;
  1191. Owned<IConstWorkUnit> workunit;
  1192. factory.setown(getWorkUnitFactory());
  1193. workunit.setown(factory->openWorkUnit(currentWuid));
  1194. Owned<IWorkUnit> w = &workunit->lock();
  1195. w->setDebugValue(instance, "0", true);
  1196. }
  1197. }
  1198. if (0 == currentGraphName.length())
  1199. break;
  1200. }
  1201. #endif
  1202. }
  1203. catch (IException *e)
  1204. {
  1205. EXCLOG(e, NULL);
  1206. throw;
  1207. }
  1208. }
  1209. catch (IException *e)
  1210. {
  1211. FLLOG(MCexception(e), thorJob, e,"ThorMaster");
  1212. e->Release();
  1213. }
  1214. if (multiThorMemoryThreshold)
  1215. setMultiThorMemoryNotify(0,NULL);
  1216. }