// thmastermain.cpp (39 KB)
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. // Entrypoint for ThorMaster.EXE
  14. #include "platform.h"
  15. #include <stddef.h>
  16. #include <stdlib.h>
  17. #include <assert.h>
  18. #include <stdio.h>
  19. #include <stdlib.h>
  20. #ifdef _WIN32
  21. #include <direct.h>
  22. #endif
  23. #include "jlib.hpp"
  24. #include "jdebug.hpp"
  25. #include "jfile.hpp"
  26. #include "jmisc.hpp"
  27. #include "jmutex.hpp"
  28. #include "jprop.hpp"
  29. #include "jset.hpp"
  30. #include "jsocket.hpp"
  31. #include "jthread.hpp"
  32. #include "jexcept.hpp"
  33. #include "mpbase.hpp"
  34. #include "mplog.hpp"
  35. #include "daaudit.hpp"
  36. #include "daclient.hpp"
  37. #include "dadfs.hpp"
  38. #include "dalienv.hpp"
  39. #include "dasds.hpp"
  40. #include "dllserver.hpp"
  41. #include "workunit.hpp"
  42. #include "rmtfile.hpp"
  43. #include "portlist.h"
  44. #include "thor.hpp"
  45. #include "thorport.hpp"
  46. #include "thormisc.hpp"
  47. #include "thgraph.hpp"
  48. #include "thgraphmaster.hpp"
  49. #include "thgraphmanager.hpp"
  50. #include "thmastermain.hpp"
  51. #include "mawatchdog.hpp"
  52. #include "thexception.hpp"
  53. #include "thmem.hpp"
  // Directory name used for query shared objects when no "query" directory and no
  // @query_so_dir setting is configured; a relative value is resolved against @thorPath.
  #define DEFAULT_QUERY_SO_DIR "sodir"
  // NB: the arithmetic macros below are parenthesized so that they expand as a single
  // value inside a larger expression (e.g. x / MAX_SLAVEREG_DELAY).
  #define MAX_SLAVEREG_DELAY (60*1000*15) // 15 mins - timeout passed to recv() while waiting for slave registration confirmations
  #define SLAVEREG_VERIFY_DELAY (5*1000) // presumably 5 secs in ms - initial value of CRegistryServer::msgDelay
  #define SHUTDOWN_IN_PARALLEL 20
  58. class CThorEndHandler : implements IThreaded
  59. {
  60. CThreaded threaded;
  61. unsigned timeout = 30000;
  62. std::atomic<bool> started{false};
  63. std::atomic<bool> stopped{false};
  64. Semaphore sem;
  65. public:
  66. CThorEndHandler() : threaded("CThorEndHandler")
  67. {
  68. threaded.init(this); // starts thread
  69. }
  70. ~CThorEndHandler()
  71. {
  72. stop();
  73. threaded.join(timeout);
  74. }
  75. void start(unsigned timeoutSecs)
  76. {
  77. bool expected = false;
  78. if (started.compare_exchange_strong(expected, true))
  79. {
  80. timeout = timeoutSecs * 1000; // sem_post and sem_wait are mem_barriers
  81. sem.signal();
  82. }
  83. }
  84. void stop()
  85. {
  86. bool expected = false;
  87. if (stopped.compare_exchange_strong(expected, true))
  88. sem.signal();
  89. }
  90. virtual void threadmain() override
  91. {
  92. // wait to be signalled to start timer
  93. sem.wait();
  94. if (stopped)
  95. return;
  96. if (!sem.wait(timeout))
  97. {
  98. // if it wasn't set by now then it's -1 and Thor restarts ...
  99. int eCode = queryExitCode();
  100. _exit(eCode);
  101. }
  102. }
  103. };
  104. static CThorEndHandler *thorEndHandler = nullptr;
  105. MODULE_INIT(INIT_PRIORITY_STANDARD)
  106. {
  107. /* NB: CThorEndHandler starts the thread now, although strictly it is not needed until later.
  108. * This is to avoid requiring the thread to be started in a unsafe context, e.g. a signal handler
  109. */
  110. thorEndHandler = new CThorEndHandler();
  111. return true;
  112. }
  113. MODULE_EXIT()
  114. {
  115. if (thorEndHandler)
  116. delete thorEndHandler;
  117. }
  118. class CRegistryServer : public CSimpleInterface
  119. {
  120. unsigned msgDelay, slavesRegistered;
  121. CriticalSection crit;
  122. bool stopped = false;
  123. static CriticalSection regCrit;
  124. static CRegistryServer *registryServer;
  125. class CDeregistrationWatch : implements IThreaded
  126. {
  127. CThreaded threaded;
  128. CRegistryServer &registry;
  129. bool running;
  130. public:
  131. CDeregistrationWatch(CRegistryServer &_registry) : threaded("CDeregistrationWatch"), registry(_registry), running(false) { }
  132. ~CDeregistrationWatch()
  133. {
  134. stop();
  135. }
  136. void start() { threaded.init(this); }
  137. void stop()
  138. {
  139. if (running)
  140. {
  141. running = false;
  142. queryWorldCommunicator().cancel(NULL, MPTAG_THORREGISTRATION);
  143. threaded.join();
  144. }
  145. }
  146. virtual void threadmain() override
  147. {
  148. running = true;
  149. for (;;)
  150. {
  151. INode *senderNode;
  152. CMessageBuffer msg;
  153. if (!queryWorldCommunicator().recv(msg, NULL, MPTAG_THORREGISTRATION, &senderNode))
  154. return;
  155. rank_t sender = queryNodeGroup().rank(senderNode);
  156. SocketEndpoint ep = senderNode->endpoint();
  157. StringBuffer url;
  158. ep.getUrlStr(url);
  159. if (RANK_NULL == sender)
  160. {
  161. PROGLOG("Node %s trying to deregister is not part of this cluster", url.str());
  162. continue;
  163. }
  164. RegistryCode code;
  165. readUnderlyingType<RegistryCode>(msg, code);
  166. if (rc_deregister != code)
  167. throwUnexpected();
  168. Owned<IException> e = deserializeException(msg);
  169. if (e.get())
  170. EXCLOG(e, "Slave unregistered with exception");
  171. registry.deregisterNode(sender-1);
  172. }
  173. running = false;
  174. }
  175. } deregistrationWatch;
  176. public:
  177. Linked<CMasterWatchdogBase> watchdog;
  178. IBitSet *status;
  179. CRegistryServer() : deregistrationWatch(*this)
  180. {
  181. status = createThreadSafeBitSet();
  182. msgDelay = SLAVEREG_VERIFY_DELAY;
  183. slavesRegistered = 0;
  184. if (globals->getPropBool("@watchdogEnabled"))
  185. watchdog.setown(createMasterWatchdog(globals->getPropBool("@useUDPWatchdog")));
  186. else
  187. globals->setPropBool("@watchdogProgressEnabled", false);
  188. CriticalBlock b(regCrit);
  189. registryServer = this;
  190. }
  191. ~CRegistryServer()
  192. {
  193. CriticalBlock b(regCrit);
  194. registryServer = NULL;
  195. stop();
  196. if (watchdog)
  197. watchdog->stop();
  198. shutdown();
  199. status->Release();
  200. }
  201. static CRegistryServer *getRegistryServer()
  202. {
  203. CriticalBlock b(regCrit);
  204. return LINK(registryServer);
  205. }
  206. void deregisterNode(unsigned slave)
  207. {
  208. const SocketEndpoint &ep = queryNodeGroup().queryNode(slave+1).endpoint();
  209. StringBuffer url;
  210. ep.getUrlStr(url);
  211. if (!status->test(slave))
  212. {
  213. PROGLOG("Slave %d (%s) trying to unregister, but not currently registered", slave+1, url.str());
  214. return;
  215. }
  216. PROGLOG("Slave %d (%s) unregistered", slave+1, url.str());
  217. status->set(slave, false);
  218. --slavesRegistered;
  219. if (watchdog)
  220. watchdog->removeSlave(ep);
  221. abortThor(MakeThorOperatorException(TE_AbortException, "The machine %s and/or the slave was shutdown. Aborting Thor", url.str()), TEC_SlaveInit);
  222. }
  223. void registerNode(unsigned slave)
  224. {
  225. SocketEndpoint ep = queryNodeGroup().queryNode(slave+1).endpoint();
  226. StringBuffer url;
  227. ep.getUrlStr(url);
  228. if (status->test(slave))
  229. {
  230. PROGLOG("Slave %d (%s) already registered, rejecting", slave+1, url.str());
  231. return;
  232. }
  233. PROGLOG("Slave %d (%s) registered", slave+1, url.str());
  234. status->set(slave);
  235. if (watchdog)
  236. watchdog->addSlave(ep);
  237. ++slavesRegistered;
  238. }
  239. bool connect(unsigned slaves)
  240. {
  241. LOG(MCdebugProgress, thorJob, "Waiting for %d slaves to register", slaves);
  242. IPointerArrayOf<INode> connectedSlaves;
  243. connectedSlaves.ensureCapacity(slaves);
  244. unsigned remaining = slaves;
  245. INode *_sender = nullptr;
  246. CMessageBuffer msg;
  247. while (remaining)
  248. {
  249. if (!queryWorldCommunicator().recv(msg, nullptr, MPTAG_THORREGISTRATION, &_sender, MP_WAIT_FOREVER))
  250. {
  251. ::Release(_sender);
  252. PROGLOG("Failed to initialize slaves");
  253. return false;
  254. }
  255. Owned<INode> sender = _sender;
  256. if (NotFound != connectedSlaves.find(sender))
  257. {
  258. StringBuffer epStr;
  259. PROGLOG("Same slave registered twice!! : %s", sender->endpoint().getUrlStr(epStr).str());
  260. return false;
  261. }
  262. /* NB: in base metal setup, the slaves know which slave number they are in advance, and send their slavenum at registration.
  263. * In non attached storage setup, they do not send a slave by default and instead are given a # once all are registered
  264. */
  265. unsigned slaveNum;
  266. msg.read(slaveNum);
  267. if (NotFound == slaveNum)
  268. {
  269. connectedSlaves.append(sender.getLink());
  270. slaveNum = connectedSlaves.ordinality();
  271. }
  272. else
  273. {
  274. unsigned pos = slaveNum - 1; // NB: slaveNum is 1 based
  275. while (connectedSlaves.ordinality() < pos)
  276. connectedSlaves.append(nullptr);
  277. if (connectedSlaves.ordinality() == pos)
  278. connectedSlaves.append(sender.getLink());
  279. else
  280. connectedSlaves.replace(sender.getLink(), pos);
  281. }
  282. StringBuffer epStr;
  283. PROGLOG("Slave %u connected from %s", slaveNum, sender->endpoint().getUrlStr(epStr).str());
  284. --remaining;
  285. }
  286. assertex(slaves == connectedSlaves.ordinality());
  287. unsigned localThorPortInc = globals->getPropInt("@localThorPortInc", DEFAULT_SLAVEPORTINC);
  288. unsigned slaveBasePort = globals->getPropInt("@slaveport", DEFAULT_THORSLAVEPORT);
  289. unsigned channelsPerWorker = globals->getPropInt("@channelsPerWorker", 1);
  290. Owned<IGroup> processGroup;
  291. // NB: in bare metal Thor is bound to a group and cluster/communicator have alreday been setup (see earlier setClusterGroup call)
  292. if (clusterInitialized())
  293. processGroup.set(&queryProcessGroup());
  294. else
  295. {
  296. processGroup.setown(createIGroup(connectedSlaves.ordinality(), connectedSlaves.getArray()));
  297. setupCluster(queryMyNode(), processGroup, channelsPerWorker, slaveBasePort, localThorPortInc);
  298. }
  299. PROGLOG("Slaves connected, initializing..");
  300. msg.clear();
  301. msg.append(THOR_VERSION_MAJOR).append(THOR_VERSION_MINOR);
  302. processGroup->serialize(msg);
  303. globals->serialize(msg);
  304. msg.append(masterSlaveMpTag);
  305. msg.append(kjServiceMpTag);
  306. if (!queryNodeComm().send(msg, RANK_ALL_OTHER, MPTAG_THORREGISTRATION, MP_ASYNC_SEND))
  307. {
  308. PROGLOG("Failed to initialize slaves");
  309. return false;
  310. }
  311. // Wait for confirmation from slaves
  312. PROGLOG("Initialization sent to slave group");
  313. try
  314. {
  315. while (slavesRegistered < slaves)
  316. {
  317. rank_t sender;
  318. CMessageBuffer msg;
  319. if (!queryNodeComm().recv(msg, RANK_ALL, MPTAG_THORREGISTRATION, &sender, MAX_SLAVEREG_DELAY))
  320. {
  321. PROGLOG("Slaves not responding to cluster initialization: ");
  322. unsigned s=0;
  323. for (;;)
  324. {
  325. unsigned ns = status->scan(s, false);
  326. if (ns<s || ns >= slaves)
  327. break;
  328. s = ns+1;
  329. StringBuffer str;
  330. PROGLOG("Slave %d (%s)", s, queryNodeGroup().queryNode(s).endpoint().getUrlStr(str.clear()).str());
  331. }
  332. throw MakeThorException(TE_AbortException, "Slaves failed to respond to cluster initialization");
  333. }
  334. StringBuffer str;
  335. PROGLOG("Registration confirmation from %s", queryNodeGroup().queryNode(sender).endpoint().getUrlStr(str).str());
  336. if (msg.length())
  337. {
  338. Owned<IException> e = deserializeException(msg);
  339. EXCLOG(e, "Registration error");
  340. if (TE_FailedToRegisterSlave == e->errorCode())
  341. {
  342. setExitCode(0); // to avoid thor auto-recycling
  343. return false;
  344. }
  345. }
  346. registerNode(sender-1);
  347. }
  348. // this is like a barrier, let slaves know all slaves are now connected
  349. PROGLOG("Slaves initialized");
  350. unsigned s=0;
  351. for (; s<slaves; s++)
  352. {
  353. CMessageBuffer msg;
  354. if (!queryNodeComm().send(msg, s+1, MPTAG_THORREGISTRATION))
  355. {
  356. PROGLOG("Failed to acknowledge slave %d registration", s+1);
  357. return false;
  358. }
  359. }
  360. if (watchdog)
  361. watchdog->start();
  362. deregistrationWatch.start();
  363. return true;
  364. }
  365. catch (IException *e)
  366. {
  367. EXCLOG(e, "Slave registration exception");
  368. e->Release();
  369. }
  370. shutdown();
  371. return false;
  372. }
  373. void stop()
  374. {
  375. if (stopped)
  376. return;
  377. stopped = true;
  378. deregistrationWatch.stop();
  379. queryWorldCommunicator().cancel(NULL, MPTAG_THORREGISTRATION);
  380. }
  381. void shutdown()
  382. {
  383. CriticalBlock block(crit);
  384. unsigned i=0;
  385. mptag_t shutdownTag = createReplyTag();
  386. for (; i<queryNodeClusterWidth(); i++)
  387. {
  388. if (status->test(i))
  389. {
  390. SocketEndpoint ep = queryNodeGroup().queryNode(i+1).endpoint();
  391. CMessageBuffer msg;
  392. msg.append((unsigned)Shutdown);
  393. serializeMPtag(msg, shutdownTag);
  394. try
  395. {
  396. queryNodeComm().send(msg, i+1, masterSlaveMpTag, MP_ASYNC_SEND);
  397. }
  398. catch (IMP_Exception *e) { e->Release(); }
  399. catch (IException *e)
  400. {
  401. EXCLOG(e, "Shutting down slave");
  402. e->Release();
  403. }
  404. if (watchdog)
  405. watchdog->removeSlave(ep);
  406. }
  407. }
  408. CTimeMon tm(20000);
  409. unsigned numReplied = 0;
  410. while (numReplied < slavesRegistered)
  411. {
  412. unsigned remaining;
  413. if (tm.timedout(&remaining))
  414. {
  415. PROGLOG("Timeout waiting for Shutdown reply from slave(s) (%u replied out of %u total)", numReplied, slavesRegistered);
  416. StringBuffer slaveList;
  417. for (i=0;i<slavesRegistered;i++)
  418. {
  419. if (status->test(i))
  420. {
  421. if (slaveList.length())
  422. slaveList.append(",");
  423. slaveList.append(i+1);
  424. }
  425. }
  426. if (slaveList.length())
  427. PROGLOG("Slaves that have not replied: %s", slaveList.str());
  428. break;
  429. }
  430. try
  431. {
  432. rank_t sender;
  433. CMessageBuffer msg;
  434. if (queryNodeComm().recv(msg, RANK_ALL, shutdownTag, &sender, remaining))
  435. {
  436. if (sender) // paranoid, sender should always be > 0
  437. status->set(sender-1, false);
  438. numReplied++;
  439. }
  440. }
  441. catch (IException *e)
  442. {
  443. // do not log MP link closed exceptions from ending slaves
  444. e->Release();
  445. }
  446. }
  447. }
  448. };
  449. CriticalSection CRegistryServer::regCrit;
  450. CRegistryServer *CRegistryServer::registryServer = NULL;
  451. //
  452. //////////////////
  453. bool checkClusterRelicateDAFS(IGroup &grp)
  454. {
  455. // check the dafilesrv is running (and right version)
  456. unsigned start = msTick();
  457. PROGLOG("Checking cluster replicate nodes");
  458. SocketEndpointArray epa;
  459. grp.getSocketEndpoints(epa);
  460. ForEachItemIn(i1,epa) {
  461. epa.element(i1).port = getDaliServixPort();
  462. }
  463. SocketEndpointArray failures;
  464. UnsignedArray failedcodes;
  465. StringArray failedmessages;
  466. validateNodes(epa,NULL,NULL,true,failures,failedcodes,failedmessages);
  467. ForEachItemIn(i,failures) {
  468. SocketEndpoint ep(failures.item(i));
  469. ep.port = 0;
  470. StringBuffer ips;
  471. ep.getIpText(ips);
  472. FLLOG(MCoperatorError, thorJob, "VALIDATE FAILED(%d) %s : %s",failedcodes.item(i),ips.str(),failedmessages.item(i));
  473. }
  474. PROGLOG("Cluster replicate nodes check completed in %dms",msTick()-start);
  475. return (failures.ordinality()==0);
  476. }
  477. static bool auditStartLogged = false;
  478. static bool firstCtrlC = true;
  479. bool ControlHandler(ahType type)
  480. {
  481. // MCK - NOTE: this routine may make calls to non-async-signal safe functions
  482. // (such as malloc) that really should not be made if we are called
  483. // from a signal handler - start end handler timer to always end
  484. if (thorEndHandler)
  485. thorEndHandler->start(120);
  486. if (ahInterrupt == type)
  487. {
  488. if (firstCtrlC)
  489. {
  490. LOG(MCdebugProgress, thorJob, "CTRL-C detected");
  491. firstCtrlC = false;
  492. {
  493. Owned<CRegistryServer> registry = CRegistryServer::getRegistryServer();
  494. if (registry)
  495. registry->stop();
  496. }
  497. abortThor(NULL, TEC_CtrlC);
  498. }
  499. else
  500. {
  501. LOG(MCdebugProgress, thorJob, "2nd CTRL-C detected - terminating process");
  502. if (auditStartLogged)
  503. {
  504. auditStartLogged = false;
  505. LOG(MCauditInfo,",Progress,Thor,Terminate,%s,%s,%s,ctrlc",
  506. queryServerStatus().queryProperties()->queryProp("@thorname"),
  507. queryServerStatus().queryProperties()->queryProp("@nodeGroup"),
  508. queryServerStatus().queryProperties()->queryProp("@queue"));
  509. }
  510. queryLogMsgManager()->flushQueue(10*1000);
  511. _exit(TEC_CtrlC);
  512. }
  513. }
  514. // ahTerminate
  515. else
  516. {
  517. LOG(MCdebugProgress, thorJob, "SIGTERM detected, shutting down");
  518. Owned<CRegistryServer> registry = CRegistryServer::getRegistryServer();
  519. if (registry)
  520. registry->stop();
  521. abortThor(NULL, TEC_Clean);
  522. }
  523. return false;
  524. }
  525. #include "thactivitymaster.hpp"
  526. int main( int argc, const char *argv[] )
  527. {
  528. #ifndef _CONTAINERIZED
  529. for (unsigned i=0;i<(unsigned)argc;i++) {
  530. if (streq(argv[i],"--daemon") || streq(argv[i],"-d")) {
  531. if (daemon(1,0) || write_pidfile(argv[++i])) {
  532. perror("Failed to daemonize");
  533. return EXIT_FAILURE;
  534. }
  535. break;
  536. }
  537. }
  538. #endif
  539. #if defined(WIN32) && defined(_DEBUG)
  540. int tmpFlag = _CrtSetDbgFlag( _CRTDBG_REPORT_FLAG );
  541. tmpFlag |= _CRTDBG_LEAK_CHECK_DF;
  542. _CrtSetDbgFlag( tmpFlag );
  543. #endif
  544. loadMasters(); // actually just a dummy call to ensure dll linked
  545. InitModuleObjects();
  546. NoQuickEditSection xxx;
  547. {
  548. globals.setown(loadConfiguration(thorDefaultConfigYaml, argv, "thor", "THOR", "thor.xml", nullptr));
  549. }
  550. #ifdef _DEBUG
  551. unsigned holdSlave = globals->getPropInt("@holdSlave", NotFound);
  552. if (0 == holdSlave) // master
  553. {
  554. DBGLOG("Thor master paused for debugging purposes, attach and set held=false to release");
  555. bool held = true;
  556. while (held)
  557. Sleep(5);
  558. }
  559. #endif
  560. setStatisticsComponentName(SCTthor, globals->queryProp("@name"), true);
  561. globals->setProp("@masterBuildTag", hpccBuildTag);
  562. setIORetryCount(globals->getPropInt("Debug/@ioRetries")); // default == 0 == off
  563. StringBuffer daliServer;
  564. if (!globals->getProp("@daliServers", daliServer))
  565. {
  566. LOG(MCerror, thorJob, "No Dali server list specified in THOR.XML (daliServers=iport,iport...)\n");
  567. return 0; // no recycle
  568. }
  569. SocketEndpoint thorEp;
  570. const char *master = globals->queryProp("@master");
  571. if (master)
  572. {
  573. thorEp.set(master);
  574. thorEp.setLocalHost(thorEp.port);
  575. }
  576. else
  577. thorEp.setLocalHost(0);
  578. if (0 == thorEp.port)
  579. thorEp.port = globals->getPropInt("@masterport", THOR_BASE_PORT);
  580. // Remove sentinel asap
  581. Owned<IFile> sentinelFile = createSentinelTarget();
  582. removeSentinelFile(sentinelFile);
  583. EnableSEHtoExceptionMapping();
  584. #ifndef __64BIT__
  585. // Restrict stack sizes on 32-bit systems
  586. Thread::setDefaultStackSize(0x10000); // NB under windows requires linker setting (/stack:)
  587. #endif
  588. const char *thorname = NULL;
  589. StringBuffer nodeGroup, logUrl;
  590. #ifndef _CONTAINERIZED
  591. unsigned slavesPerNode = globals->getPropInt("@slavesPerNode", 1);
  592. #endif
  593. unsigned channelsPerWorker;
  594. if (globals->hasProp("@channelsPerWorker"))
  595. channelsPerWorker = globals->getPropInt("@channelsPerWorker", 1);
  596. else
  597. { // for backward compatiblity only
  598. channelsPerWorker = globals->getPropInt("@channelsPerSlave", 1);
  599. globals->setPropInt("@channelsPerWorker", channelsPerWorker);
  600. }
  601. installDefaultFileHooks(globals);
  602. ILogMsgHandler *logHandler;
  603. try
  604. {
  605. #ifndef _CONTAINERIZED
  606. {
  607. Owned<IComponentLogFileCreator> lf = createComponentLogFileCreator(globals, "thor");
  608. lf->setName("thormaster");//override default filename
  609. lf->setCreateAliasFile(false);
  610. logHandler = lf->beginLogging();
  611. createUNCFilename(lf->queryLogFileSpec(), logUrl, false);
  612. #ifndef _DEBUG
  613. // keep duplicate logging output to stderr to aide debugging
  614. queryLogMsgManager()->removeMonitor(queryStderrLogMsgHandler());
  615. #endif
  616. LOG(MCdebugProgress, thorJob, "Opened log file %s", logUrl.str());
  617. }
  618. #else
  619. setupContainerizedLogMsgHandler();
  620. logHandler = queryStderrLogMsgHandler();
  621. logUrl.set("stderr");
  622. #endif
  623. LOG(MCdebugProgress, thorJob, "Build %s", hpccBuildTag);
  624. Owned<IGroup> serverGroup = createIGroupRetry(daliServer.str(), DALI_SERVER_PORT);
  625. unsigned retry = 0;
  626. for (;;)
  627. {
  628. try
  629. {
  630. LOG(MCdebugProgress, thorJob, "calling initClientProcess %d", thorEp.port);
  631. initClientProcess(serverGroup, DCR_ThorMaster, thorEp.port, nullptr, nullptr, MP_WAIT_FOREVER, true);
  632. if (0 == thorEp.port)
  633. thorEp.port = queryMyNode()->endpoint().port;
  634. // both same
  635. setMasterPortBase(thorEp.port);
  636. setMachinePortBase(thorEp.port);
  637. break;
  638. }
  639. catch (IJSOCK_Exception *e)
  640. {
  641. if ((e->errorCode()!=JSOCKERR_port_in_use))
  642. throw;
  643. FLLOG(MCexception(e), thorJob, e,"InitClientProcess");
  644. if (retry++>10)
  645. throw;
  646. e->Release();
  647. LOG(MCdebugProgress, thorJob, "Retrying");
  648. Sleep(retry*2000);
  649. }
  650. }
  651. initializeStorageGroups(true);
  652. if (globals->getPropBool("@MPChannelReconnect"))
  653. getMPServer()->setOpt(mpsopt_channelreopen, "true");
  654. if (globals->getPropBool("@enableSysLog",true))
  655. UseSysLogForOperatorMessages();
  656. thorname = globals->queryProp("@name");
  657. if (!thorname)
  658. {
  659. PROGLOG("No 'name' setting, defaulting to \"local\"");
  660. thorname = "local";
  661. globals->setProp("@name", thorname);
  662. }
  663. if (!globals->getProp("@nodeGroup", nodeGroup))
  664. {
  665. nodeGroup.append(thorname);
  666. globals->setProp("@nodeGroup", thorname);
  667. }
  668. if (globals->getPropBool("@useNASTranslation", true))
  669. {
  670. Owned<IPropertyTree> nasConfig = envGetNASConfiguration();
  671. if (nasConfig)
  672. globals->setPropTree("NAS", nasConfig.getLink()); // for use by slaves
  673. Owned<IPropertyTree> masterNasFilters = envGetInstallNASHooks(nasConfig, &thorEp);
  674. }
  675. HardwareInfo hdwInfo;
  676. getHardwareInfo(hdwInfo);
  677. globals->setPropInt("@masterTotalMem", hdwInfo.totalMemory);
  678. unsigned mmemSize = globals->getPropInt("@masterMemorySize"); // in MB
  679. unsigned gmemSize = globals->getPropInt("@globalMemorySize"); // in MB
  680. if (0 == gmemSize)
  681. {
  682. const char *workerResourcedMemory = globals->queryProp("workerResources/@memory");
  683. if (!isEmptyString(workerResourcedMemory))
  684. {
  685. offset_t sizeBytes = friendlyStringToSize(workerResourcedMemory);
  686. gmemSize = (unsigned)(sizeBytes / 0x100000);
  687. }
  688. else
  689. {
  690. unsigned maxMem = hdwInfo.totalMemory;
  691. #ifdef _WIN32
  692. if (maxMem > 2048)
  693. maxMem = 2048;
  694. #else
  695. #ifndef __64BIT__
  696. if (maxMem > 2048)
  697. {
  698. // 32 bit OS doesn't handle whole physically installed RAM
  699. maxMem = 2048;
  700. }
  701. #ifdef __ARM_ARCH_7A__
  702. // For ChromeBook with 2GB RAM
  703. if (maxMem <= 2048)
  704. {
  705. // Decrease max memory to 2/3
  706. maxMem = maxMem * 2 / 3;
  707. }
  708. #endif
  709. #endif
  710. #endif
  711. #ifndef _CONTAINERIZED
  712. if (globals->getPropBool("@localThor") && 0 == mmemSize)
  713. {
  714. gmemSize = maxMem / 2; // 50% of total for slaves
  715. mmemSize = maxMem / 4; // 25% of total for master
  716. }
  717. else
  718. #endif
  719. {
  720. gmemSize = maxMem * 3 / 4; // 75% of total for slaves
  721. }
  722. }
  723. unsigned perSlaveSize = gmemSize;
  724. #ifndef _CONTAINERIZED
  725. if (slavesPerNode>1)
  726. {
  727. PROGLOG("Sharing globalMemorySize(%d MB), between %d slave processes. %d MB each", perSlaveSize, slavesPerNode, perSlaveSize / slavesPerNode);
  728. perSlaveSize /= slavesPerNode;
  729. }
  730. #endif
  731. globals->setPropInt("@globalMemorySize", perSlaveSize);
  732. }
  733. else
  734. {
  735. if (gmemSize >= hdwInfo.totalMemory)
  736. {
  737. // should prob. error here
  738. }
  739. }
  740. if (0 == mmemSize)
  741. {
  742. const char *managerResourcedMemory = globals->queryProp("managerResources/@memory");
  743. if (!isEmptyString(managerResourcedMemory))
  744. {
  745. offset_t sizeBytes = friendlyStringToSize(managerResourcedMemory);
  746. mmemSize = (unsigned)(sizeBytes / 0x100000);
  747. }
  748. else
  749. mmemSize = gmemSize; // default to same as slaves
  750. }
  751. bool gmemAllowHugePages = globals->getPropBool("@heapUseHugePages", false);
  752. gmemAllowHugePages = globals->getPropBool("@heapMasterUseHugePages", gmemAllowHugePages);
  753. bool gmemAllowTransparentHugePages = globals->getPropBool("@heapUseTransparentHugePages", true);
  754. bool gmemRetainMemory = globals->getPropBool("@heapRetainMemory", false);
  755. // if @masterMemorySize and @globalMemorySize unspecified gmemSize will be default based on h/w
  756. globals->setPropInt("@masterMemorySize", mmemSize);
  757. PROGLOG("Global memory size = %d MB", mmemSize);
  758. roxiemem::setTotalMemoryLimit(gmemAllowHugePages, gmemAllowTransparentHugePages, gmemRetainMemory, ((memsize_t)mmemSize) * 0x100000, 0, thorAllocSizes, NULL);
  759. const char * overrideBaseDirectory = globals->queryProp("@thorDataDirectory");
  760. const char * overrideReplicateDirectory = globals->queryProp("@thorReplicateDirectory");
  761. StringBuffer datadir;
  762. StringBuffer repdir;
  763. if (getConfigurationDirectory(globals->queryPropTree("Directories"),"data","thor",globals->queryProp("@name"),datadir))
  764. overrideBaseDirectory = datadir.str();
  765. if (getConfigurationDirectory(globals->queryPropTree("Directories"),"mirror","thor",globals->queryProp("@name"),repdir))
  766. overrideReplicateDirectory = repdir.str();
  767. if (overrideBaseDirectory&&*overrideBaseDirectory)
  768. setBaseDirectory(overrideBaseDirectory, false);
  769. if (overrideReplicateDirectory&&*overrideBaseDirectory)
  770. setBaseDirectory(overrideReplicateDirectory, true);
  771. StringBuffer tempDirStr;
  772. if (!getConfigurationDirectory(globals->queryPropTree("Directories"),"temp","thor",globals->queryProp("@name"), tempDirStr))
  773. {
  774. tempDirStr.append(globals->queryProp("@thorTempDirectory"));
  775. if (0 == tempDirStr.length())
  776. {
  777. appendCurrentDirectory(tempDirStr, true);
  778. if (tempDirStr.length())
  779. addPathSepChar(tempDirStr);
  780. tempDirStr.append("temp");
  781. }
  782. }
  783. globals->setProp("@thorTempDirectory", tempDirStr);
  784. logDiskSpace(); // Log before temp space is cleared
  785. StringBuffer tempPrefix("thtmp");
  786. tempPrefix.append(getMasterPortBase()).append("_");
  787. SetTempDir(0, tempDirStr.str(), tempPrefix.str(), true);
  788. DBGLOG("Temp directory: %s", queryTempDir());
  789. char thorPath[1024];
  790. if (!GetCurrentDirectory(1024, thorPath))
  791. {
  792. OERRLOG("ThorMaster::main: Current directory path too big, setting it to null");
  793. thorPath[0] = 0;
  794. }
  795. unsigned l = strlen(thorPath);
  796. if (l) { thorPath[l] = PATHSEPCHAR; thorPath[l+1] = '\0'; }
  797. globals->setProp("@thorPath", thorPath);
  798. StringBuffer soDir, soPath;
  799. if (getConfigurationDirectory(globals->queryPropTree("Directories"),"query","thor",globals->queryProp("@name"),soDir))
  800. globals->setProp("@query_so_dir", soDir.str());
  801. else if (!globals->getProp("@query_so_dir", soDir)) {
  802. globals->setProp("@query_so_dir", DEFAULT_QUERY_SO_DIR);
  803. soDir.append(DEFAULT_QUERY_SO_DIR);
  804. }
  805. if (isAbsolutePath(soDir.str()))
  806. soPath.append(soDir);
  807. else
  808. {
  809. soPath.append(thorPath);
  810. addPathSepChar(soPath);
  811. soPath.append(soDir);
  812. }
  813. addPathSepChar(soPath);
  814. globals->setProp("@query_so_dir", soPath.str());
  815. recursiveCreateDirectory(soPath.str());
  816. startLogMsgParentReceiver();
  817. connectLogMsgManagerToDali();
  818. if (globals->getPropBool("@cache_dafilesrv_master",false))
  819. setDaliServixSocketCaching(true); // speeds up deletes under linux
  820. }
  821. catch (IException *e)
  822. {
  823. FLLOG(MCexception(e), thorJob, e,"ThorMaster");
  824. e->Release();
  825. return -1;
  826. }
  827. StringBuffer queueName;
  828. SCMStringBuffer _queueNames;
  829. const char *thorName = globals->queryProp("@name");
  830. if (!thorName) thorName = "thor";
  831. getThorQueueNames(_queueNames, thorName);
  832. queueName.set(_queueNames.str());
  833. Owned<IException> exception;
  834. StringBuffer cloudJobName;
  835. try
  836. {
  837. CSDSServerStatus &serverStatus = openThorServerStatus();
  838. Owned<CRegistryServer> registry = new CRegistryServer();
  839. serverStatus.queryProperties()->setProp("@thorname", thorname);
  840. serverStatus.queryProperties()->setProp("@cluster", nodeGroup.str()); // JCSMORE rename
  841. serverStatus.queryProperties()->setProp("LogFile", logUrl.str()); // LogFile read by eclwatch (possibly)
  842. serverStatus.queryProperties()->setProp("@nodeGroup", nodeGroup.str());
  843. serverStatus.queryProperties()->setProp("@queue", queueName.str());
  844. serverStatus.commitProperties();
  845. addAbortHandler(ControlHandler);
  846. masterSlaveMpTag = allocateClusterMPTag();
  847. kjServiceMpTag = allocateClusterMPTag();
  848. unsigned numWorkers = 0;
  849. const char *workunit = nullptr;
  850. const char *graphName = nullptr;
  851. #ifdef _CONTAINERIZED
  852. workunit = globals->queryProp("@workunit");
  853. graphName = globals->queryProp("@graphName");
  854. if (isEmptyString(workunit))
  855. throw makeStringException(0, "missing --workunit");
  856. if (isEmptyString(graphName))
  857. throw makeStringException(0, "missing --graphName");
  858. setDefaultJobId(workunit);
  859. StringBuffer thorEpStr;
  860. LOG(MCdebugProgress, thorJob, "ThorMaster version %d.%d, Started on %s", THOR_VERSION_MAJOR,THOR_VERSION_MINOR,thorEp.getUrlStr(thorEpStr).str());
  861. LOG(MCdebugProgress, thorJob, "Thor name = %s, queue = %s, nodeGroup = %s",thorname,queueName.str(),nodeGroup.str());
  862. if (!globals->hasProp("@numWorkers"))
  863. throw makeStringException(0, "Default number of workers not defined (numWorkers)");
  864. else
  865. {
  866. // check 'numWorkers' workunit option.
  867. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  868. Owned<IConstWorkUnit> wuRead = factory->openWorkUnit(workunit);
  869. if (!wuRead)
  870. throw makeStringExceptionV(0, "Cannot open workunit: %s", workunit);
  871. if (wuRead->hasDebugValue("numWorkers"))
  872. numWorkers = wuRead->getDebugValueInt("numWorkers", 0);
  873. else
  874. numWorkers = globals->getPropInt("@numWorkers", 0);
  875. if (0 == numWorkers)
  876. throw makeStringException(0, "Number of workers must be > 0 (numWorkers)");
  877. }
  878. cloudJobName.appendf("%s-%s", workunit, graphName);
  879. StringBuffer myEp;
  880. queryMyNode()->endpoint().getUrlStr(myEp);
  881. applyK8sYaml("thorworker", workunit, cloudJobName, "jobspec", { { "graphName", graphName}, { "master", myEp.str() }, { "%numWorkers", std::to_string(numWorkers)} }, false);
  882. #else
  883. StringBuffer thorEpStr;
  884. LOG(MCdebugProgress, thorJob, "ThorMaster version %d.%d, Started on %s", THOR_VERSION_MAJOR,THOR_VERSION_MINOR,thorEp.getUrlStr(thorEpStr).str());
  885. LOG(MCdebugProgress, thorJob, "Thor name = %s, queue = %s, nodeGroup = %s",thorname,queueName.str(),nodeGroup.str());
  886. unsigned localThorPortInc = globals->getPropInt("@localThorPortInc", DEFAULT_SLAVEPORTINC);
  887. unsigned slaveBasePort = globals->getPropInt("@slaveport", DEFAULT_THORSLAVEPORT);
  888. Owned<IGroup> rawGroup = getClusterNodeGroup(thorname, "ThorCluster");
  889. setClusterGroup(queryMyNode(), rawGroup, slavesPerNode, channelsPerWorker, slaveBasePort, localThorPortInc);
  890. numWorkers = queryNodeClusterWidth();
  891. #endif
  892. if (registry->connect(numWorkers))
  893. {
  894. if (globals->getPropBool("@replicateOutputs")&&globals->getPropBool("@validateDAFS",true)&&!checkClusterRelicateDAFS(queryNodeGroup()))
  895. {
  896. FLLOG(MCoperatorError, thorJob, "ERROR: Validate failure(s) detected, exiting Thor");
  897. return globals->getPropBool("@validateDAFSretCode"); // default is no recycle!
  898. }
  899. unsigned totSlaveProcs = queryNodeClusterWidth();
  900. for (unsigned s=0; s<totSlaveProcs; s++)
  901. {
  902. StringBuffer slaveStr;
  903. for (unsigned c=0; c<channelsPerWorker; c++)
  904. {
  905. unsigned o = s + (c * totSlaveProcs);
  906. if (c)
  907. slaveStr.append(",");
  908. slaveStr.append(o+1);
  909. }
  910. StringBuffer virtStr;
  911. if (channelsPerWorker>1)
  912. virtStr.append("virtual slaves:");
  913. else
  914. virtStr.append("slave:");
  915. PROGLOG("Slave log %u contains %s %s", s+1, virtStr.str(), slaveStr.str());
  916. }
  917. PROGLOG("verifying mp connection to rest of cluster");
  918. if (!queryNodeComm().verifyAll(false, 1000*60*30, 1000*60))
  919. throwStringExceptionV(0, "Failed to connect to all nodes");
  920. PROGLOG("verified mp connection to rest of cluster");
  921. #ifdef _CONTAINERIZED
  922. if (globals->getPropBool("@_dafsStorage"))
  923. {
  924. /* NB: This option is a developer option only.
  925. * It is intended to be used to bring up a temporary Thor instance that uses local node storage,
  926. * as the data plane.
  927. *
  928. * It is likely to be deprecated or to need reworking when DFS is refactored to use StoragePlanes (SPs) properly.
  929. *
  930. * The mechanism works by:
  931. * a) Creating a pseudo StoragePlane (publishes group to Dali).
  932. * b) Spinning up a dafilesrv thread in each slave container.
  933. * c) Changing the default StoragePlane used to publish files to point to the SP/group created in step (a).
  934. *
  935. * In this way, a Thor instance, whilst up, will act similarly to a bare-metal system, using local disks as storage.
  936. * This allows quick cloud based allocation/simulation of bare-metal type clusters for testing purposes.
  937. *
  938. * NB: This isn't a real StoragePlane, and it will not be accessible by any other component.
  939. *
  940. */
  941. StringBuffer uniqueGrpName;
  942. queryNamedGroupStore().addUnique(&queryProcessGroup(), uniqueGrpName);
  943. // change default plane
  944. queryComponentConfig().setProp("storagePlane", uniqueGrpName);
  945. PROGLOG("Persistent Thor group created with group name: %s", uniqueGrpName.str());
  946. }
  947. #endif
  948. LOG(MCauditInfo, ",Progress,Thor,Startup,%s,%s,%s,%s",nodeGroup.str(),thorname,queueName.str(),logUrl.str());
  949. auditStartLogged = true;
  950. writeSentinelFile(sentinelFile);
  951. #ifndef _CONTAINERIZED
  952. unsigned pinterval = globals->getPropInt("@system_monitor_interval",1000*60);
  953. if (pinterval)
  954. startPerformanceMonitor(pinterval, PerfMonStandard, nullptr);
  955. #endif
  956. // NB: workunit/graphName only set in one-shot mode (if isCloud())
  957. thorMain(logHandler, workunit, graphName);
  958. LOG(MCauditInfo, ",Progress,Thor,Terminate,%s,%s,%s",thorname,nodeGroup.str(),queueName.str());
  959. }
  960. else
  961. PROGLOG("Registration aborted");
  962. registry.clear();
  963. LOG(MCdebugProgress, thorJob, "ThorMaster terminated OK");
  964. }
  965. catch (IException *e)
  966. {
  967. FLLOG(MCexception(e), thorJob, e,"ThorMaster");
  968. exception.setown(e);
  969. }
  970. #ifdef _CONTAINERIZED
  971. if (!cloudJobName.isEmpty())
  972. {
  973. KeepK8sJobs keepJob = translateKeepJobs(globals->queryProp("@keepJobs"));
  974. if (keepJob != KeepK8sJobs::all)
  975. {
  976. // Delete jobs unless the pod failed and keepJob==podfailures
  977. if ((nullptr == exception) || (KeepK8sJobs::podfailures != keepJob))
  978. deleteK8sResource("thorworker", cloudJobName, "job");
  979. }
  980. }
  981. setExitCode(0);
  982. #endif
  983. // cleanup handler to be sure we end
  984. thorEndHandler->start(30);
  985. PROGLOG("Thor closing down 5");
  986. #ifndef _CONTAINERIZED
  987. stopPerformanceMonitor();
  988. #endif
  989. disconnectLogMsgManagerFromDali();
  990. closeThorServerStatus();
  991. PROGLOG("Thor closing down 4");
  992. closeDllServer();
  993. PROGLOG("Thor closing down 3");
  994. closeEnvironment();
  995. PROGLOG("Thor closing down 2");
  996. closedownClientProcess();
  997. PROGLOG("Thor closing down 1");
  998. UseSysLogForOperatorMessages(false);
  999. releaseAtoms(); // don't know why we can't use a module_exit to destruct this...
  1000. return queryExitCode();
  1001. }