thmastermain.cpp 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. // Entrypoint for ThorMaster.EXE
  14. #include "platform.h"
  15. #include <stddef.h>
  16. #include <stdlib.h>
  17. #include <assert.h>
  18. #include <stdio.h>
  19. #include <stdlib.h>
  20. #ifdef _WIN32
  21. #include <direct.h>
  22. #endif
  23. #include "build-config.h"
  24. #include "jlib.hpp"
  25. #include "jdebug.hpp"
  26. #include "jfile.hpp"
  27. #include "jmisc.hpp"
  28. #include "jmutex.hpp"
  29. #include "jprop.hpp"
  30. #include "jset.hpp"
  31. #include "jsocket.hpp"
  32. #include "jthread.hpp"
  33. #include "jexcept.hpp"
  34. #include "mpbase.hpp"
  35. #include "mplog.hpp"
  36. #include "daaudit.hpp"
  37. #include "daclient.hpp"
  38. #include "dadfs.hpp"
  39. #include "dalienv.hpp"
  40. #include "dasds.hpp"
  41. #include "dllserver.hpp"
  42. #include "rmtfile.hpp"
  43. #include "portlist.h"
  44. #include "thor.hpp"
  45. #include "thorport.hpp"
  46. #include "thormisc.hpp"
  47. #include "thgraph.hpp"
  48. #include "thgraphmaster.hpp"
  49. #include "thgraphmanager.hpp"
  50. #include "thmastermain.hpp"
  51. #include "mawatchdog.hpp"
  52. #include "thexception.hpp"
  53. #include "thmem.hpp"
  54. #define DEFAULT_QUERY_SO_DIR "sodir"
  55. #define MAX_SLAVEREG_DELAY 60*1000*15 // 15 mins
  56. #define SLAVEREG_VERIFY_DELAY 5*1000
  57. #define SHUTDOWN_IN_PARALLEL 20
  58. class CRegistryServer : public CSimpleInterface
  59. {
  60. unsigned msgDelay, slavesRegistered;
  61. CriticalSection crit;
  62. bool stopped;
  63. static CriticalSection regCrit;
  64. static CRegistryServer *registryServer;
  65. class CDeregistrationWatch : implements IThreaded
  66. {
  67. CThreaded threaded;
  68. CRegistryServer &registry;
  69. bool running;
  70. public:
  71. CDeregistrationWatch(CRegistryServer &_registry) : threaded("CDeregistrationWatch"), registry(_registry), running(false) { }
  72. ~CDeregistrationWatch()
  73. {
  74. stop();
  75. }
  76. void start() { threaded.init(this); }
  77. void stop()
  78. {
  79. if (running)
  80. {
  81. running = false;
  82. queryWorldCommunicator().cancel(NULL, MPTAG_THORREGISTRATION);
  83. threaded.join();
  84. }
  85. }
  86. virtual void main()
  87. {
  88. running = true;
  89. loop
  90. {
  91. INode *senderNode;
  92. CMessageBuffer msg;
  93. if (!queryWorldCommunicator().recv(msg, NULL, MPTAG_THORREGISTRATION, &senderNode))
  94. return;
  95. rank_t sender = queryNodeGroup().rank(senderNode);
  96. SocketEndpoint ep = senderNode->endpoint();
  97. StringBuffer url;
  98. ep.getUrlStr(url);
  99. if (RANK_NULL == sender)
  100. {
  101. PROGLOG("Node %s trying to deregister is not part of this cluster", url.str());
  102. continue;
  103. }
  104. RegistryCode code;
  105. msg.read((int &)code);
  106. if (rc_deregister != code)
  107. throwUnexpected();
  108. Owned<IException> e = deserializeException(msg);
  109. if (e.get())
  110. EXCLOG(e, "Slave unregistered with exception");
  111. registry.deregisterNode(sender-1);
  112. }
  113. running = false;
  114. }
  115. } deregistrationWatch;
  116. public:
  117. Linked<CMasterWatchdogBase> watchdog;
  118. IBitSet *status;
  119. CRegistryServer() : deregistrationWatch(*this), stopped(false)
  120. {
  121. status = createThreadSafeBitSet();
  122. msgDelay = SLAVEREG_VERIFY_DELAY;
  123. slavesRegistered = 0;
  124. if (globals->getPropBool("@watchdogEnabled"))
  125. watchdog.setown(createMasterWatchdog(globals->getPropBool("@useUDPWatchdog")));
  126. else
  127. globals->setPropBool("@watchdogProgressEnabled", false);
  128. CriticalBlock b(regCrit);
  129. registryServer = this;
  130. }
  131. ~CRegistryServer()
  132. {
  133. CriticalBlock b(regCrit);
  134. registryServer = NULL;
  135. stop();
  136. if (watchdog)
  137. watchdog->stop();
  138. shutdown();
  139. status->Release();
  140. }
  141. static CRegistryServer *getRegistryServer()
  142. {
  143. CriticalBlock b(regCrit);
  144. return LINK(registryServer);
  145. }
  146. void deregisterNode(unsigned slave)
  147. {
  148. const SocketEndpoint &ep = queryNodeGroup().queryNode(slave+1).endpoint();
  149. StringBuffer url;
  150. ep.getUrlStr(url);
  151. if (!status->test(slave))
  152. {
  153. PROGLOG("Slave %d (%s) trying to unregister, but not currently registered", slave+1, url.str());
  154. return;
  155. }
  156. PROGLOG("Slave %d (%s) unregistered", slave+1, url.str());
  157. status->set(slave, false);
  158. --slavesRegistered;
  159. if (watchdog)
  160. watchdog->removeSlave(ep);
  161. abortThor(MakeThorOperatorException(TE_AbortException, "The machine %s and/or the slave was shutdown. Aborting Thor", url.str()), TEC_SlaveInit);
  162. }
  163. void registerNode(unsigned slave)
  164. {
  165. SocketEndpoint ep = queryNodeGroup().queryNode(slave+1).endpoint();
  166. StringBuffer url;
  167. ep.getUrlStr(url);
  168. if (status->test(slave))
  169. {
  170. PROGLOG("Slave %d (%s) already registered, rejecting", slave+1, url.str());
  171. return;
  172. }
  173. PROGLOG("Slave %d (%s) registered", slave+1, url.str());
  174. status->set(slave);
  175. if (watchdog)
  176. watchdog->addSlave(ep);
  177. ++slavesRegistered;
  178. }
  179. bool connect()
  180. {
  181. unsigned slaves = queryNodeClusterWidth();
  182. LOG(MCdebugProgress, thorJob, "Waiting for %d slaves to register", slaves);
  183. unsigned timeWaited = 0;
  184. unsigned connected = 0;
  185. Owned<IBitSet> connectedSet = createThreadSafeBitSet();
  186. loop
  187. {
  188. CTimeMon tm(msgDelay);
  189. UnsignedArray todo;
  190. unsigned s = 0;
  191. while ((s=connectedSet->scan(s, false)) < slaves)
  192. todo.append(s++);
  193. Owned<IShuffledIterator> shuffled = createShuffledIterator(todo.ordinality());
  194. ForEach(*shuffled)
  195. {
  196. s = todo.item(shuffled->get());
  197. unsigned remaining;
  198. if (tm.timedout(&remaining))
  199. break;
  200. PROGLOG("Verifying connection to slave %d", s+1);
  201. if (queryWorldCommunicator().verifyConnection(&queryNodeGroup().queryNode(s+1), remaining))
  202. {
  203. StringBuffer str;
  204. PROGLOG("verified connection with %s", queryNodeGroup().queryNode(s+1).endpoint().getUrlStr(str.clear()).str());
  205. ++connected;
  206. connectedSet->set(s);
  207. }
  208. if (stopped)
  209. return false;
  210. }
  211. timeWaited += tm.elapsed();
  212. if (connected == slaves)
  213. break;
  214. else
  215. {
  216. if (timeWaited >= MAX_SLAVEREG_DELAY)
  217. throw MakeThorException(TE_AbortException, "Have waited over %d minutes for all slaves to connect, quitting.", MAX_SLAVEREG_DELAY/1000/60);
  218. unsigned outstanding = slaves - connected;
  219. PROGLOG("Still Waiting for minimum %d slaves to connect", outstanding);
  220. if ((outstanding) <= 5)
  221. {
  222. unsigned s=0;
  223. loop
  224. {
  225. unsigned ns = connectedSet->scan(s, false);
  226. if (ns<s || ns >= slaves)
  227. break;
  228. s = ns+1;
  229. StringBuffer str;
  230. PROGLOG("waiting for slave %d (%s)", s, queryNodeGroup().queryNode(s).endpoint().getUrlStr(str.clear()).str());
  231. }
  232. }
  233. msgDelay = (unsigned) ((float)msgDelay * 1.5);
  234. if (timeWaited+msgDelay > MAX_SLAVEREG_DELAY)
  235. msgDelay = MAX_SLAVEREG_DELAY - timeWaited;
  236. }
  237. }
  238. PROGLOG("Slaves connected, initializing..");
  239. CMessageBuffer msg;
  240. msg.append(THOR_VERSION_MAJOR).append(THOR_VERSION_MINOR);
  241. queryRawGroup().serialize(msg);
  242. globals->serialize(msg);
  243. msg.append(masterSlaveMpTag);
  244. if (!queryNodeComm().send(msg, RANK_ALL_OTHER, MPTAG_THORREGISTRATION, MP_ASYNC_SEND))
  245. {
  246. PROGLOG("Failed to initialize slaves");
  247. return false;
  248. }
  249. PROGLOG("Initialization sent to slave group");
  250. try
  251. {
  252. while (slavesRegistered < slaves)
  253. {
  254. rank_t sender;
  255. CMessageBuffer msg;
  256. if (!queryNodeComm().recv(msg, RANK_ALL, MPTAG_THORREGISTRATION, &sender, MAX_SLAVEREG_DELAY))
  257. {
  258. PROGLOG("Slaves not responding to cluster initialization: ");
  259. unsigned s=0;
  260. loop
  261. {
  262. unsigned ns = status->scan(s, false);
  263. if (ns<s || ns >= slaves)
  264. break;
  265. s = ns+1;
  266. StringBuffer str;
  267. PROGLOG("Slave %d (%s)", s, queryNodeGroup().queryNode(s).endpoint().getUrlStr(str.clear()).str());
  268. }
  269. throw MakeThorException(TE_AbortException, "Slaves failed to respond to cluster initialization");
  270. }
  271. StringBuffer str;
  272. PROGLOG("Registration confirmation from %s", queryNodeGroup().queryNode(sender).endpoint().getUrlStr(str).str());
  273. if (msg.length())
  274. {
  275. Owned<IException> e = deserializeException(msg);
  276. EXCLOG(e, "Registration error");
  277. if (TE_FailedToRegisterSlave == e->errorCode())
  278. {
  279. setExitCode(0); // to avoid thor auto-recycling
  280. return false;
  281. }
  282. }
  283. registerNode(sender-1);
  284. }
  285. PROGLOG("Slaves initialized");
  286. unsigned s=0;
  287. for (; s<slaves; s++)
  288. {
  289. CMessageBuffer msg;
  290. if (!queryNodeComm().send(msg, s+1, MPTAG_THORREGISTRATION))
  291. {
  292. PROGLOG("Failed to acknowledge slave %d registration", s+1);
  293. return false;
  294. }
  295. }
  296. deregistrationWatch.start();
  297. return true;
  298. }
  299. catch (IException *e)
  300. {
  301. EXCLOG(e, "Slave registration exception");
  302. e->Release();
  303. }
  304. shutdown();
  305. return false;
  306. }
  307. void stop()
  308. {
  309. if (stopped)
  310. return;
  311. stopped = true;
  312. deregistrationWatch.stop();
  313. queryWorldCommunicator().cancel(NULL, MPTAG_THORREGISTRATION);
  314. }
  315. void shutdown()
  316. {
  317. CriticalBlock block(crit);
  318. unsigned i=0;
  319. mptag_t shutdownTag = createReplyTag();
  320. for (; i<queryNodeClusterWidth(); i++)
  321. {
  322. if (status->test(i))
  323. {
  324. SocketEndpoint ep = queryNodeGroup().queryNode(i+1).endpoint();
  325. CMessageBuffer msg;
  326. msg.append((unsigned)Shutdown);
  327. serializeMPtag(msg, shutdownTag);
  328. try
  329. {
  330. queryNodeComm().send(msg, i+1, masterSlaveMpTag, MP_ASYNC_SEND);
  331. }
  332. catch (IMP_Exception *e) { e->Release(); }
  333. catch (IException *e)
  334. {
  335. EXCLOG(e, "Shutting down slave");
  336. e->Release();
  337. }
  338. if (watchdog)
  339. watchdog->removeSlave(ep);
  340. }
  341. }
  342. CTimeMon tm(20000);
  343. unsigned numReplied = 0;
  344. while (numReplied < slavesRegistered)
  345. {
  346. unsigned remaining;
  347. if (tm.timedout(&remaining))
  348. {
  349. PROGLOG("Timeout waiting for Shutdown reply from slave(s) (%u replied out of %u total)", numReplied, slavesRegistered);
  350. StringBuffer slaveList;
  351. for (i=0;i<slavesRegistered;i++)
  352. {
  353. if (status->test(i))
  354. {
  355. if (slaveList.length())
  356. slaveList.append(",");
  357. slaveList.append(i+1);
  358. }
  359. }
  360. if (slaveList.length())
  361. PROGLOG("Slaves that have not replied: %s", slaveList.str());
  362. break;
  363. }
  364. try
  365. {
  366. rank_t sender;
  367. CMessageBuffer msg;
  368. if (queryNodeComm().recv(msg, RANK_ALL, shutdownTag, &sender, remaining))
  369. {
  370. if (sender) // paranoid, sender should always be > 0
  371. status->set(sender-1, false);
  372. numReplied++;
  373. }
  374. }
  375. catch (IException *e)
  376. {
  377. // do not log MP link closed exceptions from ending slaves
  378. e->Release();
  379. }
  380. }
  381. }
  382. };
  383. CriticalSection CRegistryServer::regCrit;
  384. CRegistryServer *CRegistryServer::registryServer = NULL;
  385. //
  386. //////////////////
  387. bool checkClusterRelicateDAFS(IGroup &grp)
  388. {
  389. // check the dafilesrv is running (and right version)
  390. unsigned start = msTick();
  391. PROGLOG("Checking cluster replicate nodes");
  392. SocketEndpointArray epa;
  393. grp.getSocketEndpoints(epa);
  394. ForEachItemIn(i1,epa) {
  395. epa.element(i1).port = getDaliServixPort();
  396. }
  397. SocketEndpointArray failures;
  398. UnsignedArray failedcodes;
  399. StringArray failedmessages;
  400. validateNodes(epa,NULL,NULL,true,NULL,0,failures,failedcodes,failedmessages);
  401. ForEachItemIn(i,failures) {
  402. SocketEndpoint ep(failures.item(i));
  403. ep.port = 0;
  404. StringBuffer ips;
  405. ep.getIpText(ips);
  406. FLLOG(MCoperatorError, thorJob, "VALIDATE FAILED(%d) %s : %s",failedcodes.item(i),ips.str(),failedmessages.item(i));
  407. }
  408. PROGLOG("Cluster replicate nodes check completed in %dms",msTick()-start);
  409. return (failures.ordinality()==0);
  410. }
  411. static bool auditStartLogged = false;
  412. static bool firstCtrlC = true;
  413. bool ControlHandler(ahType type)
  414. {
  415. if (ahInterrupt == type)
  416. {
  417. if (firstCtrlC)
  418. {
  419. LOG(MCdebugProgress, thorJob, "CTRL-C detected");
  420. firstCtrlC = false;
  421. {
  422. Owned<CRegistryServer> registry = CRegistryServer::getRegistryServer();
  423. if (registry)
  424. registry->stop();
  425. }
  426. abortThor(NULL, TEC_CtrlC);
  427. }
  428. else
  429. {
  430. LOG(MCdebugProgress, thorJob, "2nd CTRL-C detected - terminating process");
  431. if (auditStartLogged)
  432. {
  433. auditStartLogged = false;
  434. LOG(daliAuditLogCat,",Progress,Thor,Terminate,%s,%s,%s,ctrlc",
  435. queryServerStatus().queryProperties()->queryProp("@thorname"),
  436. queryServerStatus().queryProperties()->queryProp("@nodeGroup"),
  437. queryServerStatus().queryProperties()->queryProp("@queue"));
  438. }
  439. queryLogMsgManager()->flushQueue(10*1000);
  440. #ifdef _WIN32
  441. TerminateProcess(GetCurrentProcess(), 1);
  442. #else
  443. //MORE- verify this
  444. // why not just raise(SIGKILL); ?
  445. kill(getpid(), SIGKILL);
  446. #endif
  447. _exit(1);
  448. }
  449. }
  450. // ahTerminate
  451. else
  452. {
  453. LOG(MCdebugProgress, thorJob, "SIGTERM detected, shutting down");
  454. Owned<CRegistryServer> registry = CRegistryServer::getRegistryServer();
  455. if (registry)
  456. registry->stop();
  457. abortThor(NULL, TEC_Clean);
  458. }
  459. return false;
  460. }
  461. #include "thactivitymaster.hpp"
  462. int main( int argc, char *argv[] )
  463. {
  464. #if defined(WIN32) && defined(_DEBUG)
  465. int tmpFlag = _CrtSetDbgFlag( _CRTDBG_REPORT_FLAG );
  466. tmpFlag |= _CRTDBG_LEAK_CHECK_DF;
  467. _CrtSetDbgFlag( tmpFlag );
  468. #endif
  469. loadMasters(); // actually just a dummy call to ensure dll linked
  470. InitModuleObjects();
  471. NoQuickEditSection xxx;
  472. {
  473. Owned<IFile> iFile = createIFile("thor.xml");
  474. globals = iFile->exists() ? createPTree(*iFile, ipt_caseInsensitive) : createPTree("Thor", ipt_caseInsensitive);
  475. }
  476. setStatisticsComponentName(SCTthor, globals->queryProp("@name"), true);
  477. globals->setProp("@masterBuildTag", BUILD_TAG);
  478. char **pp = argv+1;
  479. while (*pp)
  480. loadCmdProp(globals, *pp++);
  481. setIORetryCount(globals->getPropInt("Debug/@ioRetries")); // default == 0 == off
  482. StringBuffer daliServer;
  483. if (!globals->getProp("@DALISERVERS", daliServer))
  484. {
  485. LOG(MCerror, thorJob, "No Dali server list specified in THOR.XML (DALISERVERS=iport,iport...)\n");
  486. return 0; // no recycle
  487. }
  488. SocketEndpoint thorEp;
  489. const char *master = globals->queryProp("@MASTER");
  490. if (master)
  491. {
  492. thorEp.set(master);
  493. thorEp.setLocalHost(thorEp.port);
  494. }
  495. else
  496. thorEp.setLocalHost(0);
  497. setMasterPortBase(thorEp.port); // both same
  498. thorEp.port = getMasterPortBase();
  499. // Remove sentinel asap
  500. Owned<IFile> sentinelFile = createSentinelTarget();
  501. removeSentinelFile(sentinelFile);
  502. setMachinePortBase(thorEp.port);
  503. EnableSEHtoExceptionMapping();
  504. #ifndef __64BIT__
  505. // Restrict stack sizes on 32-bit systems
  506. Thread::setDefaultStackSize(0x10000); // NB under windows requires linker setting (/stack:)
  507. #endif
  508. const char *thorname = NULL;
  509. StringBuffer nodeGroup, logUrl;
  510. unsigned channelsPerSlave = 1;
  511. ILogMsgHandler *logHandler;
  512. try
  513. {
  514. {
  515. Owned<IComponentLogFileCreator> lf = createComponentLogFileCreator(globals, "thor");
  516. lf->setName("thormaster");//override default filename
  517. lf->setCreateAliasFile(false);
  518. logHandler = lf->beginLogging();
  519. createUNCFilename(lf->queryLogFileSpec(), logUrl, false);
  520. }
  521. LOG(MCdebugProgress, thorJob, "Opened log file %s", logUrl.str());
  522. LOG(MCdebugProgress, thorJob, "Build %s", BUILD_TAG);
  523. globals->setProp("@logURL", logUrl.str());
  524. Owned<IGroup> serverGroup = createIGroup(daliServer.str(), DALI_SERVER_PORT);
  525. unsigned retry = 0;
  526. loop {
  527. try {
  528. unsigned port = getFixedPort(TPORT_mp);
  529. LOG(MCdebugProgress, thorJob, "calling initClientProcess Port %d", port);
  530. initClientProcess(serverGroup, DCR_ThorMaster, port);
  531. break;
  532. }
  533. catch (IJSOCK_Exception *e) {
  534. if ((e->errorCode()!=JSOCKERR_port_in_use))
  535. throw;
  536. FLLOG(MCexception(e), thorJob, e,"InitClientProcess");
  537. if (retry++>10)
  538. throw;
  539. e->Release();
  540. LOG(MCdebugProgress, thorJob, "Retrying");
  541. Sleep(retry*2000);
  542. }
  543. }
  544. if (globals->getPropBool("@MPChannelReconnect"))
  545. getMPServer()->setOpt(mpsopt_channelreopen, "true");
  546. setPasswordsFromSDS();
  547. if (globals->getPropBool("@enableSysLog",true))
  548. UseSysLogForOperatorMessages();
  549. thorname = globals->queryProp("@name");
  550. if (!thorname)
  551. {
  552. PROGLOG("No 'name' setting, defaulting to \"local\"");
  553. thorname = "local";
  554. globals->setProp("@name", thorname);
  555. }
  556. if (!globals->getProp("@nodeGroup", nodeGroup))
  557. {
  558. nodeGroup.append(thorname);
  559. globals->setProp("@nodeGroup", thorname);
  560. }
  561. unsigned slavesPerNode = globals->getPropInt("@slavesPerNode", 1);
  562. channelsPerSlave = globals->getPropInt("@channelsPerSlave", 1);
  563. unsigned localThorPortInc = globals->getPropInt("@localThorPortInc", DEFAULT_SLAVEPORTINC);
  564. unsigned slaveBasePort = globals->getPropInt("@slaveport", DEFAULT_THORSLAVEPORT);
  565. Owned<IGroup> rawGroup = getClusterNodeGroup(thorname, "ThorCluster");
  566. setClusterGroup(queryMyNode(), rawGroup, slavesPerNode, channelsPerSlave, slaveBasePort, localThorPortInc);
  567. if (globals->getPropBool("@replicateOutputs")&&globals->getPropBool("@validateDAFS",true)&&!checkClusterRelicateDAFS(queryNodeGroup()))
  568. {
  569. FLLOG(MCoperatorError, thorJob, "ERROR: Validate failure(s) detected, exiting Thor");
  570. return globals->getPropBool("@validateDAFSretCode"); // default is no recycle!
  571. }
  572. if (globals->getPropBool("@useNASTranslation", true))
  573. {
  574. Owned<IPropertyTree> nasConfig = envGetNASConfiguration();
  575. if (nasConfig)
  576. globals->setPropTree("NAS", nasConfig.getLink()); // for use by slaves
  577. Owned<IPropertyTree> masterNasFilters = envGetInstallNASHooks(nasConfig, &thorEp);
  578. }
  579. HardwareInfo hdwInfo;
  580. getHardwareInfo(hdwInfo);
  581. globals->setPropInt("@masterTotalMem", hdwInfo.totalMemory);
  582. unsigned mmemSize = globals->getPropInt("@masterMemorySize"); // in MB
  583. unsigned gmemSize = globals->getPropInt("@globalMemorySize"); // in MB
  584. if (0 == gmemSize)
  585. {
  586. unsigned maxMem = hdwInfo.totalMemory;
  587. #ifdef _WIN32
  588. if (maxMem > 2048)
  589. maxMem = 2048;
  590. #else
  591. #ifndef __64BIT__
  592. if (maxMem > 2048)
  593. {
  594. // 32 bit OS doesn't handle whole physically installed RAM
  595. maxMem = 2048;
  596. }
  597. #ifdef __ARM_ARCH_7A__
  598. // For ChromeBook with 2GB RAM
  599. if (maxMem <= 2048)
  600. {
  601. // Decrease max memory to 2/3
  602. maxMem = maxMem * 2 / 3;
  603. }
  604. #endif
  605. #endif
  606. #endif
  607. if (globals->getPropBool("@localThor") && 0 == mmemSize)
  608. {
  609. gmemSize = maxMem / 2; // 50% of total for slaves
  610. mmemSize = maxMem / 4; // 25% of total for master
  611. }
  612. else
  613. {
  614. gmemSize = maxMem * 3 / 4; // 75% of total for slaves
  615. if (0 == mmemSize)
  616. mmemSize = gmemSize; // default to same as slaves
  617. }
  618. unsigned perSlaveSize = gmemSize;
  619. if (slavesPerNode>1)
  620. {
  621. PROGLOG("Sharing globalMemorySize(%d MB), between %d slave processes. %d MB each", perSlaveSize, slavesPerNode, perSlaveSize / slavesPerNode);
  622. perSlaveSize /= slavesPerNode;
  623. }
  624. globals->setPropInt("@globalMemorySize", perSlaveSize);
  625. }
  626. else
  627. {
  628. if (gmemSize >= hdwInfo.totalMemory)
  629. {
  630. // should prob. error here
  631. }
  632. if (0 == mmemSize)
  633. mmemSize = gmemSize;
  634. }
  635. bool gmemAllowHugePages = globals->getPropBool("@heapUseHugePages", false);
  636. gmemAllowHugePages = globals->getPropBool("@heapMasterUseHugePages", gmemAllowHugePages);
  637. bool gmemAllowTransparentHugePages = globals->getPropBool("@heapUseTransparentHugePages", true);
  638. bool gmemRetainMemory = globals->getPropBool("@heapRetainMemory", false);
  639. // if @masterMemorySize and @globalMemorySize unspecified gmemSize will be default based on h/w
  640. globals->setPropInt("@masterMemorySize", mmemSize);
  641. PROGLOG("Global memory size = %d MB", mmemSize);
  642. roxiemem::setTotalMemoryLimit(gmemAllowHugePages, gmemAllowTransparentHugePages, gmemRetainMemory, ((memsize_t)mmemSize) * 0x100000, 0, thorAllocSizes, NULL);
  643. const char * overrideBaseDirectory = globals->queryProp("@thorDataDirectory");
  644. const char * overrideReplicateDirectory = globals->queryProp("@thorReplicateDirectory");
  645. StringBuffer datadir;
  646. StringBuffer repdir;
  647. if (getConfigurationDirectory(globals->queryPropTree("Directories"),"data","thor",globals->queryProp("@name"),datadir))
  648. overrideBaseDirectory = datadir.str();
  649. if (getConfigurationDirectory(globals->queryPropTree("Directories"),"mirror","thor",globals->queryProp("@name"),repdir))
  650. overrideReplicateDirectory = repdir.str();
  651. if (overrideBaseDirectory&&*overrideBaseDirectory)
  652. setBaseDirectory(overrideBaseDirectory, false);
  653. if (overrideReplicateDirectory&&*overrideBaseDirectory)
  654. setBaseDirectory(overrideReplicateDirectory, true);
  655. StringBuffer tempDirStr;
  656. if (getConfigurationDirectory(globals->queryPropTree("Directories"),"temp","thor",globals->queryProp("@name"), tempDirStr))
  657. globals->setProp("@thorTempDirectory", tempDirStr.str());
  658. else
  659. tempDirStr.append(globals->queryProp("@thorTempDirectory"));
  660. logDiskSpace(); // Log before temp space is cleared
  661. StringBuffer tempPrefix("thtmp");
  662. tempPrefix.append(getMasterPortBase()).append("_");
  663. SetTempDir(tempDirStr.str(), tempPrefix.str(), true);
  664. char thorPath[1024];
  665. if (!GetCurrentDirectory(1024, thorPath)) {
  666. ERRLOG("ThorMaster::main: Current directory path too big, setting it to null");
  667. thorPath[0] = 0;
  668. }
  669. unsigned l = strlen(thorPath);
  670. if (l) { thorPath[l] = PATHSEPCHAR; thorPath[l+1] = '\0'; }
  671. globals->setProp("@thorPath", thorPath);
  672. StringBuffer soDir, soPath;
  673. if (getConfigurationDirectory(globals->queryPropTree("Directories"),"query","thor",globals->queryProp("@name"),soDir))
  674. globals->setProp("@query_so_dir", soDir.str());
  675. else if (!globals->getProp("@query_so_dir", soDir)) {
  676. globals->setProp("@query_so_dir", DEFAULT_QUERY_SO_DIR);
  677. soDir.append(DEFAULT_QUERY_SO_DIR);
  678. }
  679. if (isAbsolutePath(soDir.str()))
  680. soPath.append(soDir);
  681. else
  682. {
  683. soPath.append(thorPath);
  684. addPathSepChar(soPath);
  685. soPath.append(soDir);
  686. }
  687. addPathSepChar(soPath);
  688. globals->setProp("@query_so_dir", soPath.str());
  689. recursiveCreateDirectory(soPath.str());
  690. startLogMsgParentReceiver();
  691. connectLogMsgManagerToDali();
  692. if (globals->getPropBool("@cache_dafilesrv_master",false))
  693. setDaliServixSocketCaching(true); // speeds up deletes under linux
  694. }
  695. catch (IException *e)
  696. {
  697. FLLOG(MCexception(e), thorJob, e,"ThorMaster");
  698. e->Release();
  699. return -1;
  700. }
  701. StringBuffer queueName;
  702. SCMStringBuffer _queueNames;
  703. const char *thorName = globals->queryProp("@name");
  704. if (!thorName) thorName = "thor";
  705. getThorQueueNames(_queueNames, thorName);
  706. queueName.set(_queueNames.str());
  707. try {
  708. CSDSServerStatus &serverStatus = openThorServerStatus();
  709. Owned<CRegistryServer> registry = new CRegistryServer();
  710. StringBuffer thorEpStr;
  711. LOG(MCdebugProgress, thorJob, "ThorMaster version %d.%d, Started on %s", THOR_VERSION_MAJOR,THOR_VERSION_MINOR,thorEp.getUrlStr(thorEpStr).str());
  712. LOG(MCdebugProgress, thorJob, "Thor name = %s, queue = %s, nodeGroup = %s",thorname,queueName.str(),nodeGroup.str());
  713. serverStatus.queryProperties()->setProp("@thorname", thorname);
  714. serverStatus.queryProperties()->setProp("@cluster", nodeGroup.str()); // JCSMORE rename
  715. serverStatus.queryProperties()->setProp("LogFile", logUrl.str()); // LogFile read by eclwatch (possibly)
  716. serverStatus.queryProperties()->setProp("@nodeGroup", nodeGroup.str());
  717. serverStatus.queryProperties()->setProp("@queue", queueName.str());
  718. serverStatus.commitProperties();
  719. addAbortHandler(ControlHandler);
  720. masterSlaveMpTag = allocateClusterMPTag();
  721. if (registry->connect())
  722. {
  723. unsigned totSlaveProcs = queryNodeClusterWidth();
  724. for (unsigned s=0; s<totSlaveProcs; s++)
  725. {
  726. StringBuffer slaveStr;
  727. for (unsigned c=0; c<channelsPerSlave; c++)
  728. {
  729. unsigned o = s + (c * totSlaveProcs);
  730. if (c)
  731. slaveStr.append(",");
  732. slaveStr.append(o+1);
  733. }
  734. StringBuffer virtStr;
  735. if (channelsPerSlave>1)
  736. virtStr.append("virtual slaves:");
  737. else
  738. virtStr.append("slave:");
  739. PROGLOG("Slave log %u contains %s %s", s+1, virtStr.str(), slaveStr.str());
  740. }
  741. PROGLOG("verifying mp connection to rest of cluster");
  742. if (!queryNodeComm().verifyAll())
  743. ERRLOG("Failed to connect to all nodes");
  744. else
  745. PROGLOG("verified mp connection to rest of cluster");
  746. LOG(daliAuditLogCat, ",Progress,Thor,Startup,%s,%s,%s,%s",nodeGroup.str(),thorname,queueName.str(),logUrl.str());
  747. auditStartLogged = true;
  748. writeSentinelFile(sentinelFile);
  749. unsigned pinterval = globals->getPropInt("@system_monitor_interval",1000*60);
  750. if (pinterval)
  751. startPerformanceMonitor(pinterval, PerfMonStandard, nullptr);
  752. thorMain(logHandler);
  753. LOG(daliAuditLogCat, ",Progress,Thor,Terminate,%s,%s,%s",thorname,nodeGroup.str(),queueName.str());
  754. }
  755. else
  756. PROGLOG("Registration aborted");
  757. LOG(MCdebugProgress, thorJob, "ThorMaster terminated OK");
  758. }
  759. catch (IException *e)
  760. {
  761. FLLOG(MCexception(e), thorJob, e,"ThorMaster");
  762. e->Release();
  763. }
  764. stopPerformanceMonitor();
  765. disconnectLogMsgManagerFromDali();
  766. closeThorServerStatus();
  767. if (globals) globals->Release();
  768. PROGLOG("Thor closing down 5");
  769. PROGLOG("Thor closing down 4");
  770. closeDllServer();
  771. PROGLOG("Thor closing down 3");
  772. closeEnvironment();
  773. PROGLOG("Thor closing down 2");
  774. closedownClientProcess();
  775. PROGLOG("Thor closing down 1");
  776. UseSysLogForOperatorMessages(false);
  777. releaseAtoms(); // don't know why we can't use a module_exit to destruct this...
  778. return queryExitCode();
  779. }