thmastermain.cpp 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. // Entrypoint for ThorMaster.EXE
  14. #include "platform.h"
  15. #include <stddef.h>
  16. #include <stdlib.h>
  17. #include <assert.h>
  18. #include <stdio.h>
  19. #include <stdlib.h>
  20. #ifdef _WIN32
  21. #include <direct.h>
  22. #endif
  23. #include "build-config.h"
  24. #include "jlib.hpp"
  25. #include "jdebug.hpp"
  26. #include "jfile.hpp"
  27. #include "jmisc.hpp"
  28. #include "jmutex.hpp"
  29. #include "jprop.hpp"
  30. #include "jset.hpp"
  31. #include "jsocket.hpp"
  32. #include "jthread.hpp"
  33. #include "jexcept.hpp"
  34. #include "mpbase.hpp"
  35. #include "mplog.hpp"
  36. #include "daaudit.hpp"
  37. #include "daclient.hpp"
  38. #include "dadfs.hpp"
  39. #include "dalienv.hpp"
  40. #include "dasds.hpp"
  41. #include "dllserver.hpp"
  42. #include "rmtfile.hpp"
  43. #include "portlist.h"
  44. #include "thor.hpp"
  45. #include "thorport.hpp"
  46. #include "thormisc.hpp"
  47. #include "thgraph.hpp"
  48. #include "thgraphmaster.hpp"
  49. #include "thgraphmanager.hpp"
  50. #include "thmastermain.hpp"
  51. #include "mawatchdog.hpp"
  52. #include "thexception.hpp"
  53. #include "thmem.hpp"
  54. #define DEFAULT_QUERY_SO_DIR "sodir"
  55. #define MAX_SLAVEREG_DELAY 60*1000*15 // 15 mins
  56. #define SLAVEREG_VERIFY_DELAY 5*1000
  57. #define SHUTDOWN_IN_PARALLEL 20
  58. class CRegistryServer : public CSimpleInterface
  59. {
  60. unsigned msgDelay, slavesRegistered;
  61. CriticalSection crit;
  62. bool stopped;
  63. static CriticalSection regCrit;
  64. static CRegistryServer *registryServer;
  65. class CDeregistrationWatch : implements IThreaded
  66. {
  67. CThreaded threaded;
  68. CRegistryServer &registry;
  69. bool running;
  70. public:
  71. CDeregistrationWatch(CRegistryServer &_registry) : threaded("CDeregistrationWatch"), registry(_registry), running(false) { }
  72. ~CDeregistrationWatch()
  73. {
  74. stop();
  75. }
  76. void start() { threaded.init(this); }
  77. void stop()
  78. {
  79. if (running)
  80. {
  81. running = false;
  82. queryWorldCommunicator().cancel(NULL, MPTAG_THORREGISTRATION);
  83. threaded.join();
  84. }
  85. }
  86. virtual void main()
  87. {
  88. running = true;
  89. loop
  90. {
  91. INode *senderNode;
  92. CMessageBuffer msg;
  93. if (!queryWorldCommunicator().recv(msg, NULL, MPTAG_THORREGISTRATION, &senderNode))
  94. return;
  95. rank_t sender = queryClusterGroup().rank(senderNode);
  96. SocketEndpoint ep = senderNode->endpoint();
  97. StringBuffer url;
  98. ep.getUrlStr(url);
  99. if (RANK_NULL == sender)
  100. {
  101. PROGLOG("Node %s trying to deregister is not part of this cluster", url.str());
  102. continue;
  103. }
  104. RegistryCode code;
  105. msg.read((int &)code);
  106. if (!rc_deregister == code)
  107. throwUnexpected();
  108. registry.deregisterNode(sender);
  109. }
  110. running = false;
  111. }
  112. } deregistrationWatch;
  113. public:
  114. Linked<CMasterWatchdogBase> watchdog;
  115. IBitSet *status;
  116. CRegistryServer() : deregistrationWatch(*this), stopped(false)
  117. {
  118. status = createBitSet();
  119. msgDelay = SLAVEREG_VERIFY_DELAY;
  120. slavesRegistered = 0;
  121. if (globals->getPropBool("@watchdogEnabled"))
  122. watchdog.setown(createMasterWatchdog(globals->getPropBool("@useUDPWatchdog")));
  123. else
  124. globals->setPropBool("@watchdogProgressEnabled", false);
  125. CriticalBlock b(regCrit);
  126. registryServer = this;
  127. }
  128. ~CRegistryServer()
  129. {
  130. CriticalBlock b(regCrit);
  131. registryServer = NULL;
  132. stop();
  133. if (watchdog)
  134. watchdog->stop();
  135. shutdown();
  136. status->Release();
  137. }
  138. static CRegistryServer *getRegistryServer()
  139. {
  140. CriticalBlock b(regCrit);
  141. return LINK(registryServer);
  142. }
  143. void deregisterNode(unsigned slave)
  144. {
  145. const SocketEndpoint &ep = queryClusterGroup().queryNode(slave).endpoint();
  146. StringBuffer url;
  147. ep.getUrlStr(url);
  148. if (!status->test(slave-1))
  149. {
  150. PROGLOG("Slave %d (%s) trying to unregister, but not currently registered", slave, url.str());
  151. return;
  152. }
  153. PROGLOG("Slave %d (%s) unregistered", slave, url.str());
  154. status->set(slave-1, false);
  155. --slavesRegistered;
  156. if (watchdog)
  157. watchdog->removeSlave(ep);
  158. abortThor(MakeThorOperatorException(TE_AbortException, "The machine %s and/or the slave was shutdown. Aborting Thor", url.str()));
  159. }
  160. void registerNode(unsigned slave)
  161. {
  162. SocketEndpoint ep = queryClusterGroup().queryNode(slave).endpoint();
  163. StringBuffer url;
  164. ep.getUrlStr(url);
  165. if (status->test(slave-1))
  166. {
  167. PROGLOG("Slave %d (%s) already registered, rejecting", slave, url.str());
  168. return;
  169. }
  170. PROGLOG("Slave %d (%s) registered", slave, url.str());
  171. status->set(slave-1);
  172. if (watchdog)
  173. watchdog->addSlave(ep);
  174. ++slavesRegistered;
  175. }
  176. bool connect()
  177. {
  178. LOG(MCdebugProgress, thorJob, "Waiting for %d slaves to register", queryClusterWidth());
  179. unsigned timeWaited = 0;
  180. unsigned connected = 0;
  181. unsigned slaves = queryClusterWidth();
  182. Owned<IBitSet> connectedSet = createBitSet();
  183. loop
  184. {
  185. CTimeMon tm(msgDelay);
  186. UnsignedArray todo;
  187. unsigned s = 0;
  188. while ((s=connectedSet->scan(s, false)) < slaves)
  189. todo.append(s++);
  190. Owned<IShuffledIterator> shuffled = createShuffledIterator(todo.ordinality());
  191. ForEach(*shuffled)
  192. {
  193. s = todo.item(shuffled->get());
  194. unsigned remaining;
  195. if (tm.timedout(&remaining))
  196. break;
  197. PROGLOG("Verifying connection to slave %d", s+1);
  198. if (queryWorldCommunicator().verifyConnection(&queryClusterGroup().queryNode(s+1), remaining))
  199. {
  200. StringBuffer str;
  201. PROGLOG("verified connection with %s", queryClusterGroup().queryNode(s+1).endpoint().getUrlStr(str.clear()).str());
  202. ++connected;
  203. connectedSet->set(s);
  204. }
  205. if (stopped)
  206. return false;
  207. }
  208. timeWaited += tm.elapsed();
  209. if (connected == slaves)
  210. break;
  211. else
  212. {
  213. if (timeWaited >= MAX_SLAVEREG_DELAY)
  214. throw MakeThorException(TE_AbortException, "Have waited over %d minutes for all slaves to connect, quitting.", MAX_SLAVEREG_DELAY/1000/60);
  215. unsigned outstanding = slaves - connected;
  216. PROGLOG("Still Waiting for minimum %d slaves to connect", outstanding);
  217. if ((outstanding) <= 5)
  218. {
  219. unsigned s=0;
  220. loop
  221. {
  222. unsigned ns = connectedSet->scan(s, false);
  223. if (ns<s || ns >= slaves)
  224. break;
  225. s = ns+1;
  226. StringBuffer str;
  227. PROGLOG("waiting for slave %d (%s)", s, queryClusterGroup().queryNode(s).endpoint().getUrlStr(str.clear()).str());
  228. }
  229. }
  230. msgDelay = (unsigned) ((float)msgDelay * 1.5);
  231. if (timeWaited+msgDelay > MAX_SLAVEREG_DELAY)
  232. msgDelay = MAX_SLAVEREG_DELAY - timeWaited;
  233. }
  234. }
  235. PROGLOG("Slaves connected, initializing..");
  236. CMessageBuffer msg;
  237. msg.append(THOR_VERSION_MAJOR).append(THOR_VERSION_MINOR);
  238. queryClusterGroup().serialize(msg);
  239. globals->serialize(msg);
  240. msg.append(masterSlaveMpTag);
  241. if (!queryClusterComm().send(msg, RANK_ALL_OTHER, MPTAG_THORREGISTRATION, MP_ASYNC_SEND))
  242. {
  243. PROGLOG("Failed to initialize slaves");
  244. return false;
  245. }
  246. PROGLOG("Initialization sent to slave group");
  247. try
  248. {
  249. while (slavesRegistered < slaves)
  250. {
  251. rank_t sender;
  252. CMessageBuffer msg;
  253. if (!queryClusterComm().recv(msg, RANK_ALL, MPTAG_THORREGISTRATION, &sender, MAX_SLAVEREG_DELAY))
  254. {
  255. PROGLOG("Slaves not responding to cluster initialization: ");
  256. unsigned s=0;
  257. loop
  258. {
  259. unsigned ns = status->scan(s, false);
  260. if (ns<s || ns >= slaves)
  261. break;
  262. s = ns+1;
  263. StringBuffer str;
  264. PROGLOG("Slave %d (%s)", s, queryClusterGroup().queryNode(s).endpoint().getUrlStr(str.clear()).str());
  265. }
  266. throw MakeThorException(TE_AbortException, "Slaves failed to respond to cluster initialization");
  267. }
  268. StringBuffer str;
  269. PROGLOG("Registration confirmation from %s", queryClusterGroup().queryNode(sender).endpoint().getUrlStr(str).str());
  270. if (msg.length())
  271. {
  272. Owned<IException> e = deserializeException(msg);
  273. EXCLOG(e, "Registration error");
  274. if (TE_FailedToRegisterSlave == e->errorCode())
  275. {
  276. setExitCode(0); // to avoid thor auto-recycling
  277. return false;
  278. }
  279. }
  280. registerNode(sender);
  281. }
  282. PROGLOG("Slaves initialized");
  283. unsigned s=0;
  284. for (; s<slaves; s++)
  285. {
  286. CMessageBuffer msg;
  287. if (!queryClusterComm().send(msg, s+1, MPTAG_THORREGISTRATION))
  288. {
  289. PROGLOG("Failed to acknowledge slave %d registration", s+1);
  290. return false;
  291. }
  292. }
  293. deregistrationWatch.start();
  294. return true;
  295. }
  296. catch (IException *e)
  297. {
  298. EXCLOG(e, "Slave registration exception");
  299. e->Release();
  300. }
  301. shutdown();
  302. return false;
  303. }
  304. void stop()
  305. {
  306. if (stopped)
  307. return;
  308. stopped = true;
  309. deregistrationWatch.stop();
  310. queryWorldCommunicator().cancel(NULL, MPTAG_THORREGISTRATION);
  311. }
  312. void shutdown()
  313. {
  314. CriticalBlock block(crit);
  315. unsigned i=0;
  316. for (; i<queryClusterWidth(); i++)
  317. {
  318. if (status->test(i))
  319. {
  320. status->set(i, false);
  321. SocketEndpoint ep = queryClusterGroup().queryNode(i+1).endpoint();
  322. if (watchdog)
  323. watchdog->removeSlave(ep);
  324. CMessageBuffer msg;
  325. msg.append((unsigned)Shutdown);
  326. try
  327. {
  328. queryClusterComm().send(msg, i+1, masterSlaveMpTag, MP_ASYNC_SEND);
  329. }
  330. catch (IMP_Exception *e) { e->Release(); }
  331. catch (IException *e)
  332. {
  333. EXCLOG(e, "Shutting down slave");
  334. e->Release();
  335. }
  336. }
  337. }
  338. }
  339. };
  340. CriticalSection CRegistryServer::regCrit;
  341. CRegistryServer *CRegistryServer::registryServer = NULL;
  342. //
  343. //////////////////
  344. bool checkClusterRelicateDAFS(IGroup *grp)
  345. {
  346. // check the dafilesrv is running (and right version)
  347. unsigned start = msTick();
  348. PROGLOG("Checking cluster replicate nodes");
  349. SocketEndpointArray epa;
  350. grp->getSocketEndpoints(epa);
  351. ForEachItemIn(i1,epa) {
  352. epa.item(i1).port = getDaliServixPort();
  353. }
  354. SocketEndpointArray failures;
  355. UnsignedArray failedcodes;
  356. StringArray failedmessages;
  357. validateNodes(epa,NULL,NULL,true,NULL,0,failures,failedcodes,failedmessages);
  358. ForEachItemIn(i,failures) {
  359. SocketEndpoint ep(failures.item(i));
  360. ep.port = 0;
  361. StringBuffer ips;
  362. ep.getIpText(ips);
  363. FLLOG(MCoperatorError, thorJob, "VALIDATE FAILED(%d) %s : %s",failedcodes.item(i),ips.str(),failedmessages.item(i));
  364. }
  365. PROGLOG("Cluster replicate nodes check completed in %dms",msTick()-start);
  366. return (failures.ordinality()==0);
  367. }
  368. static bool auditStartLogged = false;
  369. static bool firstCtrlC = true;
  370. bool ControlHandler()
  371. {
  372. if (firstCtrlC)
  373. {
  374. LOG(MCdebugProgress, thorJob, "CTRL-C detected");
  375. firstCtrlC = false;
  376. {
  377. Owned<CRegistryServer> registry = CRegistryServer::getRegistryServer();
  378. if (registry)
  379. registry->stop();
  380. }
  381. abortThor();
  382. }
  383. else
  384. {
  385. LOG(MCdebugProgress, thorJob, "2nd CTRL-C detected - terminating process");
  386. if (auditStartLogged)
  387. {
  388. auditStartLogged = false;
  389. LOG(daliAuditLogCat,",Progress,Thor,Terminate,%s,%s,%s,ctrlc",
  390. queryServerStatus().queryProperties()->queryProp("@thorname"),
  391. queryServerStatus().queryProperties()->queryProp("@nodeGroup"),
  392. queryServerStatus().queryProperties()->queryProp("@queue"));
  393. }
  394. queryLogMsgManager()->flushQueue(10*1000);
  395. #ifdef _WIN32
  396. TerminateProcess(GetCurrentProcess(), 1);
  397. #else
  398. //MORE- verify this
  399. kill(getpid(), SIGKILL);
  400. #endif
  401. _exit(1);
  402. }
  403. return false;
  404. }
  405. #include "thactivitymaster.hpp"
  406. int main( int argc, char *argv[] )
  407. {
  408. #if defined(WIN32) && defined(_DEBUG)
  409. int tmpFlag = _CrtSetDbgFlag( _CRTDBG_REPORT_FLAG );
  410. tmpFlag |= _CRTDBG_LEAK_CHECK_DF;
  411. _CrtSetDbgFlag( tmpFlag );
  412. #endif
  413. loadMasters(); // actually just a dummy call to ensure dll linked
  414. InitModuleObjects();
  415. NoQuickEditSection xxx;
  416. {
  417. Owned<IFile> iFile = createIFile("thor.xml");
  418. globals = iFile->exists() ? createPTree(*iFile, ipt_caseInsensitive) : createPTree("Thor", ipt_caseInsensitive);
  419. }
  420. globals->setProp("@masterBuildTag", BUILD_TAG);
  421. char **pp = argv+1;
  422. while (*pp)
  423. loadCmdProp(globals, *pp++);
  424. setIORetryCount(globals->getPropInt("Debug/@ioRetries")); // default == 0 == off
  425. StringBuffer daliServer;
  426. if (!globals->getProp("@DALISERVERS", daliServer))
  427. {
  428. LOG(MCerror, thorJob, "No Dali server list specified in THOR.XML (DALISERVERS=iport,iport...)\n");
  429. return 0; // no recycle
  430. }
  431. SocketEndpoint thorEp;
  432. const char *master = globals->queryProp("@MASTER");
  433. if (master)
  434. {
  435. thorEp.set(master);
  436. thorEp.setLocalHost(thorEp.port);
  437. }
  438. else
  439. thorEp.setLocalHost(0);
  440. setMasterPortBase(thorEp.port); // both same
  441. thorEp.port = getMasterPortBase();
  442. // Remove sentinel asap
  443. Owned<IFile> sentinelFile = createSentinelTarget();
  444. removeSentinelFile(sentinelFile);
  445. setMachinePortBase(thorEp.port);
  446. unsigned slavePort = globals->hasProp("@slaveport")?atoi(globals->queryProp("@slaveport")):THOR_BASESLAVE_PORT;
  447. EnableSEHtoExceptionMapping();
  448. #ifndef __64BIT__
  449. // Restrict stack sizes on 32-bit systems
  450. Thread::setDefaultStackSize(0x10000); // NB under windows requires linker setting (/stack:)
  451. #endif
  452. const char *thorname = NULL;
  453. StringBuffer nodeGroup, logUrl;
  454. Owned<IPerfMonHook> perfmonhook;
  455. ILogMsgHandler *logHandler;
  456. try
  457. {
  458. {
  459. Owned<IComponentLogFileCreator> lf = createComponentLogFileCreator(globals, "thor");
  460. lf->setName("thormaster");//override default filename
  461. lf->setCreateAliasFile(false);
  462. logHandler = lf->beginLogging();
  463. createUNCFilename(lf->queryLogFileSpec(), logUrl, false);
  464. }
  465. LOG(MCdebugProgress, thorJob, "Opened log file %s", logUrl.toCharArray());
  466. LOG(MCdebugProgress, thorJob, "Build %s", BUILD_TAG);
  467. globals->setProp("@logURL", logUrl.str());
  468. Owned<IGroup> serverGroup = createIGroup(daliServer.str(), DALI_SERVER_PORT);
  469. unsigned retry = 0;
  470. loop {
  471. try {
  472. unsigned port = getFixedPort(TPORT_mp);
  473. LOG(MCdebugProgress, thorJob, "calling initClientProcess Port %d", port);
  474. initClientProcess(serverGroup, DCR_ThorMaster, port);
  475. break;
  476. }
  477. catch (IJSOCK_Exception *e) {
  478. if ((e->errorCode()!=JSOCKERR_port_in_use))
  479. throw;
  480. FLLOG(MCexception(e), thorJob, e,"InitClientProcess");
  481. if (retry++>10)
  482. throw;
  483. e->Release();
  484. LOG(MCdebugProgress, thorJob, "Retrying");
  485. Sleep(retry*2000);
  486. }
  487. }
  488. setPasswordsFromSDS();
  489. if (globals->getPropBool("@enableSysLog",true))
  490. UseSysLogForOperatorMessages();
  491. thorname = globals->queryProp("@name");
  492. if (!thorname)
  493. {
  494. PROGLOG("No 'name' setting, defaulting to \"local\"");
  495. thorname = "local";
  496. globals->setProp("@name", thorname);
  497. }
  498. Owned<IGroup> thorGroup;
  499. OwnedIFile iFile = createIFile("thorgroup");
  500. if (iFile->exists())
  501. {
  502. PROGLOG("Found file 'thorgroup', using to form thor group");
  503. OwnedIFileIO iFileIO = iFile->open(IFOread);
  504. MemoryBuffer slaveListMb;
  505. size32_t sz = (size32_t)iFileIO->size();
  506. if (sz)
  507. {
  508. void *b = slaveListMb.reserveTruncate(sz);
  509. verifyex(sz = iFileIO->read(0, sz, b));
  510. IArrayOf<INode> nodes;
  511. nodes.append(*LINK(queryMyNode()));
  512. const char *n = (const char *)b;
  513. const char *e = n + sz;
  514. loop
  515. {
  516. while (n < e && isspace(*n)) n++;
  517. if (n == e || !*n) break;
  518. const char *start = n;
  519. while (!isspace(*n)) n++;
  520. StringBuffer nodeStr;
  521. nodeStr.append(n-start, start);
  522. SocketEndpoint ep = nodeStr.str();
  523. ep.port = getFixedPort(ep.port, TPORT_mp);
  524. INode *node = createINode(ep);
  525. nodes.append(*node);
  526. }
  527. assertex(nodes.ordinality());
  528. thorGroup.setown(createIGroup(nodes.ordinality(), nodes.getArray()));
  529. }
  530. else
  531. {
  532. ERRLOG("'thorgroup' file is empty");
  533. return 0; // no recycle
  534. }
  535. }
  536. if (!globals->getProp("@nodeGroup", nodeGroup)) {
  537. nodeGroup.append(thorname);
  538. globals->setProp("@nodeGroup", thorname);
  539. }
  540. if (!thorGroup)
  541. {
  542. thorGroup.setown(queryNamedGroupStore().lookup(nodeGroup.str()));
  543. if (!thorGroup)
  544. {
  545. ERRLOG("Named group '%s' not found", nodeGroup.str());
  546. return 0; // no recycle
  547. }
  548. else
  549. {
  550. IArrayOf<INode> nodes;
  551. nodes.append(*LINK(queryMyNode()));
  552. unsigned r=0;
  553. for (; r<thorGroup->ordinality(); r++)
  554. {
  555. SocketEndpoint ep = thorGroup->queryNode(r).endpoint();
  556. if (!ep.port)
  557. ep.port = getFixedPort(slavePort, TPORT_mp);
  558. nodes.append(*createINode(ep));
  559. }
  560. thorGroup.setown(createIGroup(nodes.ordinality(), nodes.getArray()));
  561. }
  562. }
  563. setClusterGroup(thorGroup);
  564. if (globals->getPropBool("@replicateOutputs")&&globals->getPropBool("@validateDAFS",true)&&!checkClusterRelicateDAFS(thorGroup)) {
  565. FLLOG(MCoperatorError, thorJob, "ERROR: Validate failure(s) detected, exiting Thor");
  566. return globals->getPropBool("@validateDAFSretCode"); // default is no recycle!
  567. }
  568. if (globals->getPropBool("@useNASTranslation", true))
  569. {
  570. Owned<IPropertyTree> nasConfig = envGetNASConfiguration();
  571. if (nasConfig)
  572. globals->setPropTree("NAS", nasConfig.getLink()); // for use by slaves
  573. Owned<IPropertyTree> masterNasFilters = envGetInstallNASHooks(nasConfig, &thorEp);
  574. }
  575. HardwareInfo hdwInfo;
  576. getHardwareInfo(hdwInfo);
  577. globals->setPropInt("@masterTotalMem", hdwInfo.totalMemory);
  578. unsigned mmemSize = globals->getPropInt("@masterMemorySize"); // in MB
  579. unsigned gmemSize = globals->getPropInt("@globalMemorySize"); // in MB
  580. if (0 == gmemSize)
  581. {
  582. unsigned maxMem = hdwInfo.totalMemory;
  583. #ifdef _WIN32
  584. if (maxMem > 2048)
  585. maxMem = 2048;
  586. #else
  587. #ifndef __64BIT__
  588. if (maxMem > 2048)
  589. {
  590. // 32 bit OS doesn't handle whole physically installed RAM
  591. maxMem = 2048;
  592. }
  593. #ifdef __ARM_ARCH_7A__
  594. // For ChromeBook with 2GB RAM
  595. if (maxMem <= 2048)
  596. {
  597. // Decrease max memory to 2/3
  598. maxMem = maxMem * 2 / 3;
  599. }
  600. #endif
  601. #endif
  602. #endif
  603. if (globals->getPropBool("@localThor") && 0 == mmemSize)
  604. {
  605. gmemSize = maxMem / 2; // 50% of total for slaves
  606. mmemSize = maxMem / 4; // 25% of total for master
  607. }
  608. else
  609. {
  610. gmemSize = maxMem * 3 / 4; // 75% of total for slaves
  611. if (0 == mmemSize)
  612. mmemSize = gmemSize; // default to same as slaves
  613. }
  614. unsigned perSlaveSize = gmemSize;
  615. unsigned slavesPerNode = globals->getPropInt("@slavesPerNode", 1);
  616. if (slavesPerNode>1)
  617. {
  618. PROGLOG("Sharing globalMemorySize(%d MB), between %d slave. %d MB each", perSlaveSize, slavesPerNode, perSlaveSize / slavesPerNode);
  619. perSlaveSize /= slavesPerNode;
  620. }
  621. globals->setPropInt("@globalMemorySize", perSlaveSize);
  622. }
  623. else
  624. {
  625. if (gmemSize >= hdwInfo.totalMemory)
  626. {
  627. // should prob. error here
  628. }
  629. if (0 == mmemSize)
  630. mmemSize = gmemSize;
  631. }
  632. bool gmemAllowHugePages = globals->getPropBool("@heapUseHugePages", false);
  633. // if @masterMemorySize and @globalMemorySize unspecified gmemSize will be default based on h/w
  634. globals->setPropInt("@masterMemorySize", mmemSize);
  635. PROGLOG("Global memory size = %d MB", mmemSize);
  636. roxiemem::setTotalMemoryLimit(gmemAllowHugePages, ((memsize_t)mmemSize) * 0x100000, 0, NULL);
  637. const char * overrideBaseDirectory = globals->queryProp("@thorDataDirectory");
  638. const char * overrideReplicateDirectory = globals->queryProp("@thorReplicateDirectory");
  639. StringBuffer datadir;
  640. StringBuffer repdir;
  641. if (getConfigurationDirectory(globals->queryPropTree("Directories"),"data","thor",globals->queryProp("@name"),datadir))
  642. overrideBaseDirectory = datadir.str();
  643. if (getConfigurationDirectory(globals->queryPropTree("Directories"),"mirror","thor",globals->queryProp("@name"),repdir))
  644. overrideReplicateDirectory = repdir.str();
  645. if (overrideBaseDirectory&&*overrideBaseDirectory)
  646. setBaseDirectory(overrideBaseDirectory, false);
  647. if (overrideReplicateDirectory&&*overrideBaseDirectory)
  648. setBaseDirectory(overrideReplicateDirectory, true);
  649. StringBuffer tempDirStr;
  650. if (getConfigurationDirectory(globals->queryPropTree("Directories"),"temp","thor",globals->queryProp("@name"), tempDirStr))
  651. globals->setProp("@thorTempDirectory", tempDirStr.str());
  652. else
  653. tempDirStr.append(globals->queryProp("@thorTempDirectory"));
  654. logDiskSpace(); // Log before temp space is cleared
  655. StringBuffer tempPrefix("thtmp");
  656. tempPrefix.append(getMasterPortBase()).append("_");
  657. SetTempDir(tempDirStr.str(), tempPrefix.str(), true);
  658. char thorPath[1024];
  659. if (!GetCurrentDirectory(1024, thorPath)) {
  660. ERRLOG("ThorMaster::main: Current directory path too big, setting it to null");
  661. thorPath[0] = 0;
  662. }
  663. unsigned l = strlen(thorPath);
  664. if (l) { thorPath[l] = PATHSEPCHAR; thorPath[l+1] = '\0'; }
  665. globals->setProp("@thorPath", thorPath);
  666. StringBuffer soDir, soPath;
  667. if (getConfigurationDirectory(globals->queryPropTree("Directories"),"query","thor",globals->queryProp("@name"),soDir))
  668. globals->setProp("@query_so_dir", soDir.str());
  669. else if (!globals->getProp("@query_so_dir", soDir)) {
  670. globals->setProp("@query_so_dir", DEFAULT_QUERY_SO_DIR);
  671. soDir.append(DEFAULT_QUERY_SO_DIR);
  672. }
  673. if (isAbsolutePath(soDir.str()))
  674. soPath.append(soDir);
  675. else
  676. {
  677. soPath.append(thorPath);
  678. addPathSepChar(soPath);
  679. soPath.append(soDir);
  680. }
  681. addPathSepChar(soPath);
  682. globals->setProp("@query_so_dir", soPath.str());
  683. recursiveCreateDirectory(soPath.str());
  684. startLogMsgParentReceiver();
  685. connectLogMsgManagerToDali();
  686. if (globals->getPropBool("@cache_dafilesrv_master",false))
  687. setDaliServixSocketCaching(true); // speeds up deletes under linux
  688. }
  689. catch (IException *e)
  690. {
  691. FLLOG(MCexception(e), thorJob, e,"ThorMaster");
  692. e->Release();
  693. return -1;
  694. }
  695. StringBuffer queueName;
  696. SCMStringBuffer _queueNames;
  697. const char *thorName = globals->queryProp("@name");
  698. if (!thorName) thorName = "thor";
  699. getThorQueueNames(_queueNames, thorName);
  700. queueName.set(_queueNames.str());
  701. try {
  702. CSDSServerStatus &serverStatus = openThorServerStatus();
  703. Owned<CRegistryServer> registry = new CRegistryServer();
  704. StringBuffer thorEpStr;
  705. LOG(MCdebugProgress, thorJob, "ThorMaster version %d.%d, Started on %s", THOR_VERSION_MAJOR,THOR_VERSION_MINOR,thorEp.getUrlStr(thorEpStr).toCharArray());
  706. LOG(MCdebugProgress, thorJob, "Thor name = %s, queue = %s, nodeGroup = %s",thorname,queueName.str(),nodeGroup.str());
  707. serverStatus.queryProperties()->setProp("@thorname", thorname);
  708. serverStatus.queryProperties()->setProp("@cluster", nodeGroup.str()); // JCSMORE rename
  709. serverStatus.queryProperties()->setProp("LogFile", logUrl.str()); // LogFile read by eclwatch (possibly)
  710. serverStatus.queryProperties()->setProp("@nodeGroup", nodeGroup.str());
  711. serverStatus.queryProperties()->setProp("@queue", queueName.str());
  712. serverStatus.commitProperties();
  713. addAbortHandler(ControlHandler);
  714. masterSlaveMpTag = allocateClusterMPTag();
  715. writeSentinelFile(sentinelFile);
  716. if (registry->connect())
  717. {
  718. PROGLOG("verifying mp connection to rest of cluster");
  719. if (!queryClusterComm().verifyAll())
  720. ERRLOG("Failed to connect to all nodes");
  721. else
  722. PROGLOG("verified mp connection to rest of cluster");
  723. LOG(daliAuditLogCat, ",Progress,Thor,Startup,%s,%s,%s,%s",nodeGroup.str(),thorname,queueName.str(),logUrl.str());
  724. auditStartLogged = true;
  725. thorMain(logHandler);
  726. LOG(daliAuditLogCat, ",Progress,Thor,Terminate,%s,%s,%s",thorname,nodeGroup.str(),queueName.str());
  727. }
  728. else
  729. PROGLOG("Registration aborted");
  730. LOG(MCdebugProgress, thorJob, "ThorMaster terminated OK");
  731. }
  732. catch (IException *e)
  733. {
  734. FLLOG(MCexception(e), thorJob, e,"ThorMaster");
  735. e->Release();
  736. }
  737. disconnectLogMsgManagerFromDali();
  738. closeThorServerStatus();
  739. if (globals) globals->Release();
  740. PROGLOG("Thor closing down 5");
  741. PROGLOG("Thor closing down 4");
  742. closeDllServer();
  743. PROGLOG("Thor closing down 3");
  744. closeEnvironment();
  745. PROGLOG("Thor closing down 2");
  746. closedownClientProcess();
  747. PROGLOG("Thor closing down 1");
  748. UseSysLogForOperatorMessages(false);
  749. releaseAtoms(); // don't know why we can't use a module_exit to destruct this...
  750. return queryExitCode();
  751. }