thmastermain.cpp 40 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. // Entrypoint for ThorMaster.EXE
  14. #include "platform.h"
  15. #include <stddef.h>
  16. #include <stdlib.h>
  17. #include <assert.h>
  18. #include <stdio.h>
  19. #include <stdlib.h>
  20. #ifdef _WIN32
  21. #include <direct.h>
  22. #endif
  23. #include "jlib.hpp"
  24. #include "jdebug.hpp"
  25. #include "jfile.hpp"
  26. #include "jmisc.hpp"
  27. #include "jmutex.hpp"
  28. #include "jprop.hpp"
  29. #include "jset.hpp"
  30. #include "jsocket.hpp"
  31. #include "jthread.hpp"
  32. #include "jexcept.hpp"
  33. #include "mpbase.hpp"
  34. #include "mplog.hpp"
  35. #include "daaudit.hpp"
  36. #include "daclient.hpp"
  37. #include "dadfs.hpp"
  38. #include "dalienv.hpp"
  39. #include "dasds.hpp"
  40. #include "dllserver.hpp"
  41. #include "workunit.hpp"
  42. #include "rmtfile.hpp"
  43. #include "portlist.h"
  44. #include "thor.hpp"
  45. #include "thorport.hpp"
  46. #include "thormisc.hpp"
  47. #include "thgraph.hpp"
  48. #include "thgraphmaster.hpp"
  49. #include "thgraphmanager.hpp"
  50. #include "thmastermain.hpp"
  51. #include "mawatchdog.hpp"
  52. #include "thexception.hpp"
  53. #include "thmem.hpp"
  54. #ifndef _CONTAINERIED
  55. #define DEFAULT_QUERY_SO_DIR "sodir"
  56. #endif
  57. #define MAX_SLAVEREG_DELAY 60*1000*15 // 15 mins
  58. #define SLAVEREG_VERIFY_DELAY 5*1000
  59. #define SHUTDOWN_IN_PARALLEL 20
  60. class CThorEndHandler : implements IThreaded
  61. {
  62. CThreaded threaded;
  63. unsigned timeout = 30000;
  64. std::atomic<bool> started{false};
  65. std::atomic<bool> stopped{false};
  66. Semaphore sem;
  67. public:
  68. CThorEndHandler() : threaded("CThorEndHandler")
  69. {
  70. threaded.init(this); // starts thread
  71. }
  72. ~CThorEndHandler()
  73. {
  74. stop();
  75. threaded.join(timeout);
  76. }
  77. void start(unsigned timeoutSecs)
  78. {
  79. bool expected = false;
  80. if (started.compare_exchange_strong(expected, true))
  81. {
  82. timeout = timeoutSecs * 1000; // sem_post and sem_wait are mem_barriers
  83. sem.signal();
  84. }
  85. }
  86. void stop()
  87. {
  88. bool expected = false;
  89. if (stopped.compare_exchange_strong(expected, true))
  90. sem.signal();
  91. }
  92. virtual void threadmain() override
  93. {
  94. // wait to be signalled to start timer
  95. sem.wait();
  96. if (stopped)
  97. return;
  98. if (!sem.wait(timeout))
  99. {
  100. // if it wasn't set by now then it's -1 and Thor restarts ...
  101. int eCode = queryExitCode();
  102. _exit(eCode);
  103. }
  104. }
  105. };
  106. static CThorEndHandler *thorEndHandler = nullptr;
  107. MODULE_INIT(INIT_PRIORITY_STANDARD)
  108. {
  109. /* NB: CThorEndHandler starts the thread now, although strictly it is not needed until later.
  110. * This is to avoid requiring the thread to be started in a unsafe context, e.g. a signal handler
  111. */
  112. thorEndHandler = new CThorEndHandler();
  113. return true;
  114. }
  115. MODULE_EXIT()
  116. {
  117. if (thorEndHandler)
  118. delete thorEndHandler;
  119. }
  120. class CRegistryServer : public CSimpleInterface
  121. {
  122. unsigned msgDelay, slavesRegistered;
  123. CriticalSection crit;
  124. bool stopped = false;
  125. static CriticalSection regCrit;
  126. static CRegistryServer *registryServer;
  127. class CDeregistrationWatch : implements IThreaded
  128. {
  129. CThreaded threaded;
  130. CRegistryServer &registry;
  131. std::atomic<bool> running;
  132. public:
  133. CDeregistrationWatch(CRegistryServer &_registry) : threaded("CDeregistrationWatch"), registry(_registry), running(false) { }
  134. ~CDeregistrationWatch()
  135. {
  136. stop();
  137. }
  138. void start() { threaded.init(this); }
  139. void stop()
  140. {
  141. if (running)
  142. {
  143. running = false;
  144. queryWorldCommunicator().cancel(NULL, MPTAG_THORREGISTRATION);
  145. threaded.join();
  146. }
  147. }
  148. virtual void threadmain() override
  149. {
  150. running = true;
  151. for (;;)
  152. {
  153. INode *senderNode;
  154. CMessageBuffer msg;
  155. if (!queryWorldCommunicator().recv(msg, NULL, MPTAG_THORREGISTRATION, &senderNode))
  156. return;
  157. rank_t sender = queryNodeGroup().rank(senderNode);
  158. SocketEndpoint ep = senderNode->endpoint();
  159. StringBuffer url;
  160. ep.getUrlStr(url);
  161. if (RANK_NULL == sender)
  162. {
  163. PROGLOG("Node %s trying to deregister is not part of this cluster", url.str());
  164. continue;
  165. }
  166. RegistryCode code;
  167. readUnderlyingType<RegistryCode>(msg, code);
  168. if (rc_deregister != code)
  169. throwUnexpected();
  170. Owned<IException> e = deserializeException(msg);
  171. if (e.get())
  172. EXCLOG(e, "Slave unregistered with exception");
  173. registry.deregisterNode(sender-1);
  174. }
  175. running = false;
  176. }
  177. } deregistrationWatch;
  178. public:
  179. Linked<CMasterWatchdogBase> watchdog;
  180. IBitSet *status;
  181. CRegistryServer() : deregistrationWatch(*this)
  182. {
  183. status = createThreadSafeBitSet();
  184. msgDelay = SLAVEREG_VERIFY_DELAY;
  185. slavesRegistered = 0;
  186. if (globals->getPropBool("@watchdogEnabled"))
  187. watchdog.setown(createMasterWatchdog(globals->getPropBool("@useUDPWatchdog")));
  188. else
  189. globals->setPropBool("@watchdogProgressEnabled", false);
  190. CriticalBlock b(regCrit);
  191. registryServer = this;
  192. }
  193. ~CRegistryServer()
  194. {
  195. CriticalBlock b(regCrit);
  196. registryServer = NULL;
  197. stop();
  198. if (watchdog)
  199. watchdog->stop();
  200. shutdown();
  201. status->Release();
  202. }
  203. static CRegistryServer *getRegistryServer()
  204. {
  205. CriticalBlock b(regCrit);
  206. return LINK(registryServer);
  207. }
  208. void deregisterNode(unsigned slave)
  209. {
  210. const SocketEndpoint &ep = queryNodeGroup().queryNode(slave+1).endpoint();
  211. StringBuffer url;
  212. ep.getUrlStr(url);
  213. if (!status->test(slave))
  214. {
  215. PROGLOG("Slave %d (%s) trying to unregister, but not currently registered", slave+1, url.str());
  216. return;
  217. }
  218. PROGLOG("Slave %d (%s) unregistered", slave+1, url.str());
  219. status->set(slave, false);
  220. --slavesRegistered;
  221. if (watchdog)
  222. watchdog->removeSlave(ep);
  223. abortThor(MakeThorOperatorException(TE_AbortException, "The machine %s and/or the slave was shutdown. Aborting Thor", url.str()), TEC_SlaveInit);
  224. }
  225. void registerNode(unsigned slave)
  226. {
  227. SocketEndpoint ep = queryNodeGroup().queryNode(slave+1).endpoint();
  228. StringBuffer url;
  229. ep.getUrlStr(url);
  230. if (status->test(slave))
  231. {
  232. PROGLOG("Slave %d (%s) already registered, rejecting", slave+1, url.str());
  233. return;
  234. }
  235. PROGLOG("Slave %d (%s) registered", slave+1, url.str());
  236. status->set(slave);
  237. if (watchdog)
  238. watchdog->addSlave(ep);
  239. ++slavesRegistered;
  240. }
  241. bool connect(unsigned slaves)
  242. {
  243. LOG(MCdebugProgress, thorJob, "Waiting for %d slaves to register", slaves);
  244. IPointerArrayOf<INode> connectedSlaves;
  245. connectedSlaves.ensureCapacity(slaves);
  246. unsigned remaining = slaves;
  247. INode *_sender = nullptr;
  248. CMessageBuffer msg;
  249. while (remaining)
  250. {
  251. if (!queryWorldCommunicator().recv(msg, nullptr, MPTAG_THORREGISTRATION, &_sender, MP_WAIT_FOREVER))
  252. {
  253. ::Release(_sender);
  254. PROGLOG("Failed to initialize slaves");
  255. return false;
  256. }
  257. Owned<INode> sender = _sender;
  258. if (NotFound != connectedSlaves.find(sender))
  259. {
  260. StringBuffer epStr;
  261. PROGLOG("Same slave registered twice!! : %s", sender->endpoint().getUrlStr(epStr).str());
  262. return false;
  263. }
  264. /* NB: in base metal setup, the slaves know which slave number they are in advance, and send their slavenum at registration.
  265. * In non attached storage setup, they do not send a slave by default and instead are given a # once all are registered
  266. */
  267. unsigned slaveNum;
  268. msg.read(slaveNum);
  269. if (NotFound == slaveNum)
  270. {
  271. connectedSlaves.append(sender.getLink());
  272. slaveNum = connectedSlaves.ordinality();
  273. }
  274. else
  275. {
  276. unsigned pos = slaveNum - 1; // NB: slaveNum is 1 based
  277. while (connectedSlaves.ordinality() < pos)
  278. connectedSlaves.append(nullptr);
  279. if (connectedSlaves.ordinality() == pos)
  280. connectedSlaves.append(sender.getLink());
  281. else
  282. connectedSlaves.replace(sender.getLink(), pos);
  283. }
  284. StringBuffer epStr;
  285. PROGLOG("Slave %u connected from %s", slaveNum, sender->endpoint().getUrlStr(epStr).str());
  286. --remaining;
  287. }
  288. assertex(slaves == connectedSlaves.ordinality());
  289. unsigned localThorPortInc = globals->getPropInt("@localThorPortInc", DEFAULT_SLAVEPORTINC);
  290. unsigned slaveBasePort = globals->getPropInt("@slaveport", DEFAULT_THORSLAVEPORT);
  291. unsigned channelsPerWorker = globals->getPropInt("@channelsPerWorker", 1);
  292. Owned<IGroup> processGroup;
  293. // NB: in bare metal Thor is bound to a group and cluster/communicator have alreday been setup (see earlier setClusterGroup call)
  294. if (clusterInitialized())
  295. processGroup.set(&queryProcessGroup());
  296. else
  297. {
  298. /* sort by {port, ip}
  299. * So that workers are not bunched on same node, but striped across the pod ips
  300. */
  301. auto compareINodeOrder = [](IInterface * const *ll, IInterface * const *rr)
  302. {
  303. INode *l = (INode *) *ll;
  304. INode *r = (INode *) *rr;
  305. const SocketEndpoint &lep = l->endpoint();
  306. const SocketEndpoint &rep = r->endpoint();
  307. if (lep.port < rep.port)
  308. return -1;
  309. else if (lep.port > rep.port)
  310. return 1;
  311. return lep.ipcompare(rep);
  312. };
  313. connectedSlaves.sort(compareINodeOrder);
  314. processGroup.setown(createIGroup(connectedSlaves.ordinality(), connectedSlaves.getArray()));
  315. setupCluster(queryMyNode(), processGroup, channelsPerWorker, slaveBasePort, localThorPortInc);
  316. }
  317. PROGLOG("Slaves connected, initializing..");
  318. msg.clear();
  319. msg.append(THOR_VERSION_MAJOR).append(THOR_VERSION_MINOR);
  320. processGroup->serialize(msg);
  321. globals->serialize(msg);
  322. msg.append(masterSlaveMpTag);
  323. msg.append(kjServiceMpTag);
  324. if (!queryNodeComm().send(msg, RANK_ALL_OTHER, MPTAG_THORREGISTRATION, MP_ASYNC_SEND))
  325. {
  326. PROGLOG("Failed to initialize slaves");
  327. return false;
  328. }
  329. // Wait for confirmation from slaves
  330. PROGLOG("Initialization sent to slave group");
  331. try
  332. {
  333. while (slavesRegistered < slaves)
  334. {
  335. rank_t sender;
  336. CMessageBuffer msg;
  337. if (!queryNodeComm().recv(msg, RANK_ALL, MPTAG_THORREGISTRATION, &sender, MAX_SLAVEREG_DELAY))
  338. {
  339. PROGLOG("Slaves not responding to cluster initialization: ");
  340. unsigned s=0;
  341. for (;;)
  342. {
  343. unsigned ns = status->scan(s, false);
  344. if (ns<s || ns >= slaves)
  345. break;
  346. s = ns+1;
  347. StringBuffer str;
  348. PROGLOG("Slave %d (%s)", s, queryNodeGroup().queryNode(s).endpoint().getUrlStr(str.clear()).str());
  349. }
  350. throw MakeThorException(TE_AbortException, "Slaves failed to respond to cluster initialization");
  351. }
  352. StringBuffer str;
  353. PROGLOG("Registration confirmation from %s", queryNodeGroup().queryNode(sender).endpoint().getUrlStr(str).str());
  354. if (msg.length())
  355. {
  356. Owned<IException> e = deserializeException(msg);
  357. EXCLOG(e, "Registration error");
  358. if (TE_FailedToRegisterSlave == e->errorCode())
  359. {
  360. setExitCode(0); // to avoid thor auto-recycling
  361. return false;
  362. }
  363. }
  364. registerNode(sender-1);
  365. }
  366. // this is like a barrier, let slaves know all slaves are now connected
  367. PROGLOG("Slaves initialized");
  368. unsigned s=0;
  369. for (; s<slaves; s++)
  370. {
  371. CMessageBuffer msg;
  372. if (!queryNodeComm().send(msg, s+1, MPTAG_THORREGISTRATION))
  373. {
  374. PROGLOG("Failed to acknowledge slave %d registration", s+1);
  375. return false;
  376. }
  377. }
  378. if (watchdog)
  379. watchdog->start();
  380. deregistrationWatch.start();
  381. return true;
  382. }
  383. catch (IException *e)
  384. {
  385. EXCLOG(e, "Slave registration exception");
  386. e->Release();
  387. }
  388. shutdown();
  389. return false;
  390. }
  391. void stop()
  392. {
  393. if (stopped)
  394. return;
  395. stopped = true;
  396. deregistrationWatch.stop();
  397. queryWorldCommunicator().cancel(NULL, MPTAG_THORREGISTRATION);
  398. }
  399. void shutdown()
  400. {
  401. CriticalBlock block(crit);
  402. unsigned i=0;
  403. mptag_t shutdownTag = createReplyTag();
  404. for (; i<queryNodeClusterWidth(); i++)
  405. {
  406. if (status->test(i))
  407. {
  408. SocketEndpoint ep = queryNodeGroup().queryNode(i+1).endpoint();
  409. CMessageBuffer msg;
  410. msg.append((unsigned)Shutdown);
  411. serializeMPtag(msg, shutdownTag);
  412. try
  413. {
  414. queryNodeComm().send(msg, i+1, masterSlaveMpTag, MP_ASYNC_SEND);
  415. }
  416. catch (IMP_Exception *e) { e->Release(); }
  417. catch (IException *e)
  418. {
  419. EXCLOG(e, "Shutting down slave");
  420. e->Release();
  421. }
  422. if (watchdog)
  423. watchdog->removeSlave(ep);
  424. }
  425. }
  426. CTimeMon tm(20000);
  427. unsigned numReplied = 0;
  428. while (numReplied < slavesRegistered)
  429. {
  430. unsigned remaining;
  431. if (tm.timedout(&remaining))
  432. {
  433. PROGLOG("Timeout waiting for Shutdown reply from slave(s) (%u replied out of %u total)", numReplied, slavesRegistered);
  434. StringBuffer slaveList;
  435. for (i=0;i<slavesRegistered;i++)
  436. {
  437. if (status->test(i))
  438. {
  439. if (slaveList.length())
  440. slaveList.append(",");
  441. slaveList.append(i+1);
  442. }
  443. }
  444. if (slaveList.length())
  445. PROGLOG("Slaves that have not replied: %s", slaveList.str());
  446. break;
  447. }
  448. try
  449. {
  450. rank_t sender;
  451. CMessageBuffer msg;
  452. if (queryNodeComm().recv(msg, RANK_ALL, shutdownTag, &sender, remaining))
  453. {
  454. if (sender) // paranoid, sender should always be > 0
  455. status->set(sender-1, false);
  456. numReplied++;
  457. }
  458. }
  459. catch (IException *e)
  460. {
  461. // do not log MP link closed exceptions from ending slaves
  462. e->Release();
  463. }
  464. }
  465. }
  466. };
  467. CriticalSection CRegistryServer::regCrit;
  468. CRegistryServer *CRegistryServer::registryServer = NULL;
  469. //
  470. //////////////////
  471. bool checkClusterRelicateDAFS(IGroup &grp)
  472. {
  473. // check the dafilesrv is running (and right version)
  474. unsigned start = msTick();
  475. PROGLOG("Checking cluster replicate nodes");
  476. SocketEndpointArray epa;
  477. grp.getSocketEndpoints(epa);
  478. ForEachItemIn(i1,epa) {
  479. epa.element(i1).port = getDaliServixPort();
  480. }
  481. SocketEndpointArray failures;
  482. UnsignedArray failedcodes;
  483. StringArray failedmessages;
  484. validateNodes(epa,NULL,NULL,true,failures,failedcodes,failedmessages);
  485. ForEachItemIn(i,failures) {
  486. SocketEndpoint ep(failures.item(i));
  487. ep.port = 0;
  488. StringBuffer ips;
  489. ep.getIpText(ips);
  490. FLLOG(MCoperatorError, thorJob, "VALIDATE FAILED(%d) %s : %s",failedcodes.item(i),ips.str(),failedmessages.item(i));
  491. }
  492. PROGLOG("Cluster replicate nodes check completed in %dms",msTick()-start);
  493. return (failures.ordinality()==0);
  494. }
  495. static bool auditStartLogged = false;
  496. static bool firstCtrlC = true;
  497. bool ControlHandler(ahType type)
  498. {
  499. // MCK - NOTE: this routine may make calls to non-async-signal safe functions
  500. // (such as malloc) that really should not be made if we are called
  501. // from a signal handler - start end handler timer to always end
  502. if (thorEndHandler)
  503. thorEndHandler->start(120);
  504. if (ahInterrupt == type)
  505. {
  506. if (firstCtrlC)
  507. {
  508. LOG(MCdebugProgress, thorJob, "CTRL-C detected");
  509. firstCtrlC = false;
  510. {
  511. Owned<CRegistryServer> registry = CRegistryServer::getRegistryServer();
  512. if (registry)
  513. registry->stop();
  514. }
  515. abortThor(NULL, TEC_CtrlC);
  516. }
  517. else
  518. {
  519. LOG(MCdebugProgress, thorJob, "2nd CTRL-C detected - terminating process");
  520. if (auditStartLogged)
  521. {
  522. auditStartLogged = false;
  523. LOG(MCauditInfo,",Progress,Thor,Terminate,%s,%s,%s,ctrlc",
  524. queryServerStatus().queryProperties()->queryProp("@thorname"),
  525. queryServerStatus().queryProperties()->queryProp("@nodeGroup"),
  526. queryServerStatus().queryProperties()->queryProp("@queue"));
  527. }
  528. queryLogMsgManager()->flushQueue(10*1000);
  529. _exit(TEC_CtrlC);
  530. }
  531. }
  532. // ahTerminate
  533. else
  534. {
  535. LOG(MCdebugProgress, thorJob, "SIGTERM detected, shutting down");
  536. Owned<CRegistryServer> registry = CRegistryServer::getRegistryServer();
  537. if (registry)
  538. registry->stop();
  539. abortThor(NULL, TEC_Clean);
  540. }
  541. return false;
  542. }
  543. #include "thactivitymaster.hpp"
  544. int main( int argc, const char *argv[] )
  545. {
  546. if (!checkCreateDaemon(argc, argv))
  547. return EXIT_FAILURE;
  548. #if defined(WIN32) && defined(_DEBUG)
  549. int tmpFlag = _CrtSetDbgFlag( _CRTDBG_REPORT_FLAG );
  550. tmpFlag |= _CRTDBG_LEAK_CHECK_DF;
  551. _CrtSetDbgFlag( tmpFlag );
  552. #endif
  553. loadMasters(); // actually just a dummy call to ensure dll linked
  554. InitModuleObjects();
  555. NoQuickEditSection xxx;
  556. {
  557. globals.setown(loadConfiguration(thorDefaultConfigYaml, argv, "thor", "THOR", "thor.xml", nullptr, nullptr, false));
  558. }
  559. #ifdef _DEBUG
  560. unsigned holdSlave = globals->getPropInt("@holdSlave", NotFound);
  561. if (0 == holdSlave) // master
  562. {
  563. DBGLOG("Thor master paused for debugging purposes, attach and set held=false to release");
  564. bool held = true;
  565. while (held)
  566. Sleep(5);
  567. }
  568. #endif
  569. setStatisticsComponentName(SCTthor, globals->queryProp("@name"), true);
  570. globals->setProp("@masterBuildTag", hpccBuildInfo.buildTag);
  571. setIORetryCount((unsigned)getExpertOptInt64("ioRetries")); // default == 0 == off
  572. StringBuffer daliServer;
  573. if (!globals->getProp("@daliServers", daliServer))
  574. {
  575. LOG(MCerror, thorJob, "No Dali server list specified in THOR.XML (daliServers=iport,iport...)\n");
  576. return 0; // no recycle
  577. }
  578. SocketEndpoint thorEp;
  579. const char *master = globals->queryProp("@master");
  580. if (master)
  581. {
  582. thorEp.set(master);
  583. thorEp.setLocalHost(thorEp.port);
  584. }
  585. else
  586. thorEp.setLocalHost(0);
  587. if (0 == thorEp.port)
  588. thorEp.port = globals->getPropInt("@masterport", THOR_BASE_PORT);
  589. // Remove sentinel asap
  590. Owned<IFile> sentinelFile = createSentinelTarget();
  591. removeSentinelFile(sentinelFile);
  592. EnableSEHtoExceptionMapping();
  593. #ifndef __64BIT__
  594. // Restrict stack sizes on 32-bit systems
  595. Thread::setDefaultStackSize(0x10000); // NB under windows requires linker setting (/stack:)
  596. #endif
  597. const char *thorname = NULL;
  598. StringBuffer nodeGroup, logUrl;
  599. #ifndef _CONTAINERIZED
  600. unsigned slavesPerNode = globals->getPropInt("@slavesPerNode", 1);
  601. #endif
  602. unsigned channelsPerWorker;
  603. if (globals->hasProp("@channelsPerWorker"))
  604. channelsPerWorker = globals->getPropInt("@channelsPerWorker", 1);
  605. else
  606. { // for backward compatiblity only
  607. channelsPerWorker = globals->getPropInt("@channelsPerSlave", 1);
  608. globals->setPropInt("@channelsPerWorker", channelsPerWorker);
  609. }
  610. installDefaultFileHooks(globals);
  611. ILogMsgHandler *logHandler;
  612. const char *workunit = nullptr;
  613. const char *graphName = nullptr;
  614. try
  615. {
  616. #ifndef _CONTAINERIZED
  617. {
  618. Owned<IComponentLogFileCreator> lf = createComponentLogFileCreator(globals, "thor");
  619. lf->setName("thormaster");//override default filename
  620. lf->setCreateAliasFile(false);
  621. logHandler = lf->beginLogging();
  622. createUNCFilename(lf->queryLogFileSpec(), logUrl, false);
  623. #ifndef _DEBUG
  624. // keep duplicate logging output to stderr to aide debugging
  625. queryLogMsgManager()->removeMonitor(queryStderrLogMsgHandler());
  626. #endif
  627. LOG(MCdebugProgress, thorJob, "Opened log file %s", logUrl.str());
  628. }
  629. #else
  630. setupContainerizedLogMsgHandler();
  631. logHandler = queryStderrLogMsgHandler();
  632. logUrl.set("stderr");
  633. #endif
  634. LOG(MCdebugProgress, thorJob, "Build %s", hpccBuildInfo.buildTag);
  635. Owned<IGroup> serverGroup = createIGroupRetry(daliServer.str(), DALI_SERVER_PORT);
  636. unsigned retry = 0;
  637. for (;;)
  638. {
  639. try
  640. {
  641. LOG(MCdebugProgress, thorJob, "calling initClientProcess %d", thorEp.port);
  642. initClientProcess(serverGroup, DCR_ThorMaster, thorEp.port, nullptr, nullptr, MP_WAIT_FOREVER, true);
  643. if (0 == thorEp.port)
  644. thorEp.port = queryMyNode()->endpoint().port;
  645. // both same
  646. setMasterPortBase(thorEp.port);
  647. setMachinePortBase(thorEp.port);
  648. break;
  649. }
  650. catch (IJSOCK_Exception *e)
  651. {
  652. if ((e->errorCode()!=JSOCKERR_port_in_use))
  653. throw;
  654. FLLOG(MCexception(e), thorJob, e,"InitClientProcess");
  655. if (retry++>10)
  656. throw;
  657. e->Release();
  658. LOG(MCdebugProgress, thorJob, "Retrying");
  659. Sleep(retry*2000);
  660. }
  661. }
  662. initializeStorageGroups(true);
  663. if (globals->getPropBool("@MPChannelReconnect"))
  664. getMPServer()->setOpt(mpsopt_channelreopen, "true");
  665. if (globals->getPropBool("@enableSysLog",true))
  666. UseSysLogForOperatorMessages();
  667. thorname = globals->queryProp("@name");
  668. if (!thorname)
  669. {
  670. PROGLOG("No 'name' setting, defaulting to \"local\"");
  671. thorname = "local";
  672. globals->setProp("@name", thorname);
  673. }
  674. if (!globals->getProp("@nodeGroup", nodeGroup))
  675. {
  676. nodeGroup.append(thorname);
  677. globals->setProp("@nodeGroup", thorname);
  678. }
  679. #ifndef _CONTAINERIZED
  680. if (globals->getPropBool("@useNASTranslation", true))
  681. {
  682. Owned<IPropertyTree> nasConfig = envGetNASConfiguration();
  683. if (nasConfig)
  684. globals->setPropTree("NAS", nasConfig.getLink()); // for use by slaves
  685. Owned<IPropertyTree> masterNasFilters = envGetInstallNASHooks(nasConfig, &thorEp);
  686. }
  687. #endif
  688. IPropertyTree *managerMemory = ensurePTree(globals, "managerMemory");
  689. IPropertyTree *workerMemory = ensurePTree(globals, "workerMemory");
  690. HardwareInfo hdwInfo;
  691. getHardwareInfo(hdwInfo);
  692. globals->setPropInt("@masterTotalMem", hdwInfo.totalMemory);
  693. unsigned mmemSize = globals->getPropInt("@masterMemorySize"); // in MB
  694. unsigned gmemSize = globals->getPropInt("@globalMemorySize"); // in MB
  695. if (0 == gmemSize)
  696. {
  697. // NB: This could be in a isContainerized(), but the 'workerResources' section only applies to containerized setups
  698. const char *workerResourcedMemory = globals->queryProp("workerResources/@memory");
  699. if (!isEmptyString(workerResourcedMemory))
  700. {
  701. offset_t sizeBytes = friendlyStringToSize(workerResourcedMemory);
  702. gmemSize = (unsigned)(sizeBytes / 0x100000);
  703. }
  704. else
  705. {
  706. gmemSize = hdwInfo.totalMemory;
  707. #ifdef _WIN32
  708. if (gmemSize > 2048)
  709. gmemSize = 2048;
  710. #else
  711. #ifndef __64BIT__
  712. if (gmemSize > 2048)
  713. {
  714. // 32 bit OS doesn't handle whole physically installed RAM
  715. gmemSize = 2048;
  716. }
  717. #ifdef __ARM_ARCH_7A__
  718. // For ChromeBook with 2GB RAM
  719. if (gmemSize <= 2048)
  720. {
  721. // Decrease max memory to 2/3
  722. gmemSize = gmemSize * 2 / 3;
  723. }
  724. #endif
  725. #endif
  726. #endif
  727. }
  728. #ifndef _CONTAINERIZED
  729. // @localThor mode - 25% is used for manager and 50% is used for workers
  730. // overrides recommended max percentage preferences if present
  731. if (globals->getPropBool("@localThor") && (0 == mmemSize))
  732. {
  733. managerMemory->setProp("@maxMemPercentage", "25.0");
  734. workerMemory->setPropReal("@maxMemPercentage", 50.0 / slavesPerNode);
  735. }
  736. else
  737. #endif
  738. if (!workerMemory->hasProp("@maxMemPercentage"))
  739. workerMemory->setPropReal("@maxMemPercentage", defaultPctSysMemForRoxie);
  740. }
  741. workerMemory->setPropInt("@total", gmemSize);
  742. // used by slaves in absence of specific workerMemory settings, to split available memory equally between workers
  743. unsigned numWorkersPerPod = globals->getPropInt("@numWorkersPerPod");
  744. if (numWorkersPerPod)
  745. workerMemory->setPropInt("@sharedInstances", numWorkersPerPod);
  746. if (mmemSize)
  747. {
  748. if (mmemSize > hdwInfo.totalMemory)
  749. OWARNLOG("Configured manager memory size (%u MB) is greater than total hardware memory (%u MB)", mmemSize, hdwInfo.totalMemory);
  750. }
  751. else
  752. {
  753. // NB: This could be in a isContainerized(), but the 'managerResources' section only applies to containerized setups
  754. const char *managerResourcedMemory = globals->queryProp("managerResources/@memory");
  755. if (!isEmptyString(managerResourcedMemory))
  756. {
  757. offset_t sizeBytes = friendlyStringToSize(managerResourcedMemory);
  758. mmemSize = (unsigned)(sizeBytes / 0x100000);
  759. }
  760. else
  761. mmemSize = gmemSize; // default to same as slaves
  762. if (!globals->hasProp("@globalMemorySize"))
  763. {
  764. if (!managerMemory->hasProp("@maxMemPercentage"))
  765. managerMemory->setPropReal("@maxMemPercentage", defaultPctSysMemForRoxie);
  766. }
  767. }
  768. managerMemory->setPropInt("@total", mmemSize);
  769. char thorPath[1024];
  770. if (!GetCurrentDirectory(1024, thorPath))
  771. {
  772. OERRLOG("ThorMaster::main: Current directory path too big, setting it to null");
  773. thorPath[0] = 0;
  774. }
  775. unsigned l = strlen(thorPath);
  776. if (l) { thorPath[l] = PATHSEPCHAR; thorPath[l+1] = '\0'; }
  777. globals->setProp("@thorPath", thorPath);
  778. #ifdef _CONTAINERIZED
  779. workunit = globals->queryProp("@workunit");
  780. graphName = globals->queryProp("@graphName");
  781. if (isEmptyString(workunit))
  782. throw makeStringException(0, "missing --workunit");
  783. if (isEmptyString(graphName))
  784. throw makeStringException(0, "missing --graphName");
  785. #else
  786. const char * overrideBaseDirectory = globals->queryProp("@thorDataDirectory");
  787. const char * overrideReplicateDirectory = globals->queryProp("@thorReplicateDirectory");
  788. StringBuffer datadir;
  789. StringBuffer repdir;
  790. if (getConfigurationDirectory(globals->queryPropTree("Directories"),"data","thor",globals->queryProp("@name"),datadir))
  791. overrideBaseDirectory = datadir.str();
  792. if (getConfigurationDirectory(globals->queryPropTree("Directories"),"mirror","thor",globals->queryProp("@name"),repdir))
  793. overrideReplicateDirectory = repdir.str();
  794. if (overrideBaseDirectory&&*overrideBaseDirectory)
  795. setBaseDirectory(overrideBaseDirectory, false);
  796. if (overrideReplicateDirectory&&*overrideBaseDirectory)
  797. setBaseDirectory(overrideReplicateDirectory, true);
  798. StringBuffer soDir, soPath;
  799. if (getConfigurationDirectory(globals->queryPropTree("Directories"),"query","thor",globals->queryProp("@name"),soDir))
  800. globals->setProp("@query_so_dir", soDir.str());
  801. else if (!globals->getProp("@query_so_dir", soDir)) {
  802. globals->setProp("@query_so_dir", DEFAULT_QUERY_SO_DIR);
  803. soDir.append(DEFAULT_QUERY_SO_DIR);
  804. }
  805. if (isAbsolutePath(soDir.str()))
  806. soPath.append(soDir);
  807. else
  808. {
  809. soPath.append(thorPath);
  810. addPathSepChar(soPath);
  811. soPath.append(soDir);
  812. }
  813. addPathSepChar(soPath);
  814. globals->setProp("@query_so_dir", soPath.str());
  815. recursiveCreateDirectory(soPath.str());
  816. #endif
  817. StringBuffer tempDirStr;
  818. if (!getConfigurationDirectory(globals->queryPropTree("Directories"),"spill","thor",globals->queryProp("@name"), tempDirStr))
  819. {
  820. tempDirStr.append(globals->queryProp("@thorTempDirectory"));
  821. if (0 == tempDirStr.length())
  822. {
  823. appendCurrentDirectory(tempDirStr, true);
  824. if (tempDirStr.length())
  825. addPathSepChar(tempDirStr);
  826. tempDirStr.append("temp");
  827. }
  828. }
  829. // NB: set into globals, serialized and used by worker processes.
  830. globals->setProp("@thorTempDirectory", tempDirStr);
  831. startLogMsgParentReceiver();
  832. connectLogMsgManagerToDali();
  833. if (globals->getPropBool("@cache_dafilesrv_master",false))
  834. setDaliServixSocketCaching(true); // speeds up deletes under linux
  835. }
  836. catch (IException *e)
  837. {
  838. FLLOG(MCexception(e), thorJob, e,"ThorMaster");
  839. e->Release();
  840. return -1;
  841. }
  842. StringBuffer queueName;
  843. #ifndef _CONTAINERIZED
  844. SCMStringBuffer _queueNames;
  845. const char *thorName = globals->queryProp("@name");
  846. if (!thorName) thorName = "thor";
  847. getThorQueueNames(_queueNames, thorName);
  848. queueName.set(_queueNames.str());
  849. #endif
  850. Owned<IException> exception;
  851. StringBuffer cloudJobName;
  852. try
  853. {
  854. CSDSServerStatus &serverStatus = openThorServerStatus();
  855. Owned<CRegistryServer> registry = new CRegistryServer();
  856. serverStatus.queryProperties()->setProp("@thorname", thorname);
  857. serverStatus.queryProperties()->setProp("@cluster", nodeGroup.str()); // JCSMORE rename
  858. serverStatus.queryProperties()->setProp("LogFile", logUrl.str()); // LogFile read by eclwatch (possibly)
  859. serverStatus.queryProperties()->setProp("@nodeGroup", nodeGroup.str());
  860. serverStatus.queryProperties()->setProp("@queue", queueName.str());
  861. serverStatus.commitProperties();
  862. addAbortHandler(ControlHandler);
  863. masterSlaveMpTag = allocateClusterMPTag();
  864. kjServiceMpTag = allocateClusterMPTag();
  865. unsigned numWorkers = 0;
  866. #ifdef _CONTAINERIZED
  867. LogMsgJobId thorJobId = queryLogMsgManager()->addJobId(workunit);
  868. thorJob.setJobID(thorJobId);
  869. setDefaultJobId(thorJobId);
  870. StringBuffer thorEpStr;
  871. LOG(MCdebugProgress, thorJob, "ThorMaster version %d.%d, Started on %s", THOR_VERSION_MAJOR,THOR_VERSION_MINOR,thorEp.getUrlStr(thorEpStr).str());
  872. LOG(MCdebugProgress, thorJob, "Thor name = %s, queue = %s, nodeGroup = %s",thorname,queueName.str(),nodeGroup.str());
  873. unsigned numWorkersPerPod = 1;
  874. if (!globals->hasProp("@numWorkers"))
  875. throw makeStringException(0, "Default number of workers not defined (numWorkers)");
  876. else
  877. {
  878. // check 'numWorkers' workunit option.
  879. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  880. Owned<IConstWorkUnit> wuRead = factory->openWorkUnit(workunit);
  881. if (!wuRead)
  882. throw makeStringExceptionV(0, "Cannot open workunit: %s", workunit);
  883. if (wuRead->hasDebugValue("numWorkers"))
  884. numWorkers = wuRead->getDebugValueInt("numWorkers", 0);
  885. else
  886. numWorkers = globals->getPropInt("@numWorkers", 0);
  887. if (0 == numWorkers)
  888. throw makeStringException(0, "Number of workers must be > 0 (numWorkers)");
  889. if (wuRead->hasDebugValue("numWorkersPerPod"))
  890. numWorkersPerPod = wuRead->getDebugValueInt("numWorkersPerPod", 1);
  891. else
  892. numWorkersPerPod = globals->getPropInt("@numWorkersPerPod", 1); // default to 1
  893. if (numWorkersPerPod < 1)
  894. throw makeStringException(0, "Number of workers per pod must be > 0 (numWorkersPerPod)");
  895. if ((numWorkers % numWorkersPerPod) != 0)
  896. throw makeStringExceptionV(0, "numWorkersPerPod must be a factor of numWorkers. (numWorkers=%u, numWorkersPerPod=%u)", numWorkers, numWorkersPerPod);
  897. }
  898. cloudJobName.appendf("%s-%s", workunit, graphName);
  899. StringBuffer myEp;
  900. queryMyNode()->endpoint().getUrlStr(myEp);
  901. applyK8sYaml("thorworker", workunit, cloudJobName, "jobspec", { { "graphName", graphName}, { "master", myEp.str() }, { "_HPCC_NUM_WORKERS_", std::to_string(numWorkers/numWorkersPerPod)} }, false);
  902. #else
  903. StringBuffer thorEpStr;
  904. LOG(MCdebugProgress, thorJob, "ThorMaster version %d.%d, Started on %s", THOR_VERSION_MAJOR,THOR_VERSION_MINOR,thorEp.getUrlStr(thorEpStr).str());
  905. LOG(MCdebugProgress, thorJob, "Thor name = %s, queue = %s, nodeGroup = %s",thorname,queueName.str(),nodeGroup.str());
  906. unsigned localThorPortInc = globals->getPropInt("@localThorPortInc", DEFAULT_SLAVEPORTINC);
  907. unsigned slaveBasePort = globals->getPropInt("@slaveport", DEFAULT_THORSLAVEPORT);
  908. Owned<IGroup> rawGroup = getClusterNodeGroup(thorname, "ThorCluster");
  909. setClusterGroup(queryMyNode(), rawGroup, slavesPerNode, channelsPerWorker, slaveBasePort, localThorPortInc);
  910. numWorkers = queryNodeClusterWidth();
  911. #endif
  912. if (registry->connect(numWorkers))
  913. {
  914. if (globals->getPropBool("@replicateOutputs")&&globals->getPropBool("@validateDAFS",true)&&!checkClusterRelicateDAFS(queryNodeGroup()))
  915. {
  916. FLLOG(MCoperatorError, thorJob, "ERROR: Validate failure(s) detected, exiting Thor");
  917. return globals->getPropBool("@validateDAFSretCode"); // default is no recycle!
  918. }
  919. unsigned totSlaveProcs = queryNodeClusterWidth();
  920. for (unsigned s=0; s<totSlaveProcs; s++)
  921. {
  922. StringBuffer slaveStr;
  923. for (unsigned c=0; c<channelsPerWorker; c++)
  924. {
  925. unsigned o = s + (c * totSlaveProcs);
  926. if (c)
  927. slaveStr.append(",");
  928. slaveStr.append(o+1);
  929. }
  930. StringBuffer virtStr;
  931. if (channelsPerWorker>1)
  932. virtStr.append("virtual slaves:");
  933. else
  934. virtStr.append("slave:");
  935. PROGLOG("Slave log %u contains %s %s", s+1, virtStr.str(), slaveStr.str());
  936. }
  937. PROGLOG("verifying mp connection to rest of cluster");
  938. if (!queryNodeComm().verifyAll(false, 1000*60*30, 1000*60))
  939. throwStringExceptionV(0, "Failed to connect to all nodes");
  940. PROGLOG("verified mp connection to rest of cluster");
  941. #ifdef _CONTAINERIZED
  942. if (globals->getPropBool("@_dafsStorage"))
  943. {
  944. /* NB: This option is a developer option only.
  945. * It is intended to be used to bring up a temporary Thor instance that uses local node storage,
  946. * as the data plane.
  947. *
  948. * It is likely to be deprecated or need reworking, when DFS is refactored to use SP's properly.
  949. *
  950. * The mechanism works by:
  951. * a) Creating a pseudo StoragePlane (publishes group to Dali).
  952. * b) Spinning up a dafilesrv thread in each slave container.
  953. * c) Changing the default StoragePlane used to publish files, to point to the SP/group created in step (a).
  954. *
  955. * In this way, a Thor instance, whilst up, will act similarly to a bare-metal system, using local disks as storage.
  956. * This allows quick cloud based allocation/simulation of bare-metal type clusters for testing purposes.
  957. *
  958. * NB: This isn't a real StoragePlane, and it will not be accessible by any other component.
  959. *
  960. */
  961. StringBuffer uniqueGrpName;
  962. queryNamedGroupStore().addUnique(&queryProcessGroup(), uniqueGrpName);
  963. // change default plane
  964. getComponentConfigSP()->setProp("@dataPlane", uniqueGrpName);
  965. PROGLOG("Persistent Thor group created with group name: %s", uniqueGrpName.str());
  966. }
  967. #endif
  968. LOG(MCauditInfo, ",Progress,Thor,Startup,%s,%s,%s,%s",nodeGroup.str(),thorname,queueName.str(),logUrl.str());
  969. auditStartLogged = true;
  970. writeSentinelFile(sentinelFile);
  971. #ifndef _CONTAINERIZED
  972. unsigned pinterval = globals->getPropInt("@system_monitor_interval",1000*60);
  973. if (pinterval)
  974. startPerformanceMonitor(pinterval, PerfMonStandard, nullptr);
  975. #endif
  976. // NB: workunit/graphName only set in one-shot mode (if isCloud())
  977. thorMain(logHandler, workunit, graphName);
  978. LOG(MCauditInfo, ",Progress,Thor,Terminate,%s,%s,%s",thorname,nodeGroup.str(),queueName.str());
  979. }
  980. else
  981. PROGLOG("Registration aborted");
  982. registry.clear();
  983. LOG(MCdebugProgress, thorJob, "ThorMaster terminated OK");
  984. }
  985. catch (IException *e)
  986. {
  987. FLLOG(MCexception(e), thorJob, e,"ThorMaster");
  988. exception.setown(e);
  989. }
  990. #ifdef _CONTAINERIZED
  991. if (!cloudJobName.isEmpty())
  992. {
  993. KeepK8sJobs keepJob = translateKeepJobs(globals->queryProp("@keepJobs"));
  994. if (keepJob != KeepK8sJobs::all)
  995. {
  996. // Delete jobs unless the pod failed and keepJob==podfailures
  997. if ((nullptr == exception) || (KeepK8sJobs::podfailures != keepJob))
  998. deleteK8sResource("thorworker", cloudJobName, "job");
  999. }
  1000. }
  1001. setExitCode(0);
  1002. #endif
  1003. // cleanup handler to be sure we end
  1004. thorEndHandler->start(30);
  1005. PROGLOG("Thor closing down 5");
  1006. #ifndef _CONTAINERIZED
  1007. stopPerformanceMonitor();
  1008. #endif
  1009. disconnectLogMsgManagerFromDali();
  1010. closeThorServerStatus();
  1011. PROGLOG("Thor closing down 4");
  1012. closeDllServer();
  1013. PROGLOG("Thor closing down 3");
  1014. closeEnvironment();
  1015. PROGLOG("Thor closing down 2");
  1016. closedownClientProcess();
  1017. PROGLOG("Thor closing down 1");
  1018. UseSysLogForOperatorMessages(false);
  1019. releaseAtoms(); // don't know why we can't use a module_exit to destruct this...
  1020. return queryExitCode();
  1021. }