thslavemain.cpp 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. // Entrypoint for ThorSlave.EXE
  14. #include "platform.h"
  15. #include <stddef.h>
  16. #include <stdlib.h>
  17. #include <assert.h>
  18. #include <stdio.h>
  19. #include <stdlib.h>
  20. #include "jlib.hpp"
  21. #include "jdebug.hpp"
  22. #include "jexcept.hpp"
  23. #include "jfile.hpp"
  24. #include "jmisc.hpp"
  25. #include "jprop.hpp"
  26. #include "jthread.hpp"
  27. #include "thormisc.hpp"
  28. #include "slavmain.hpp"
  29. #include "thorport.hpp"
  30. #include "thexception.hpp"
  31. #include "thmem.hpp"
  32. #include "thbuf.hpp"
  33. #include "mpbase.hpp"
  34. #include "mplog.hpp"
  35. #include "daclient.hpp"
  36. #include "dalienv.hpp"
  37. #include "slave.hpp"
  38. #include "portlist.h"
  39. #include "dafdesc.hpp"
  40. #include "rmtfile.hpp"
  41. #include "slavmain.hpp"
  42. #ifdef _CONTAINERIZED
  43. #include "dafsserver.hpp"
  44. #endif
  45. // #define USE_MP_LOG
  46. static INode *masterNode = NULL;
  47. MODULE_INIT(INIT_PRIORITY_STANDARD)
  48. {
  49. return true;
  50. }
  51. MODULE_EXIT()
  52. {
  53. ::Release(masterNode);
  54. }
  55. #ifdef _DEBUG
  56. USE_JLIB_ALLOC_HOOK;
  57. #endif
  58. static SocketEndpoint slfEp;
  59. static unsigned mySlaveNum;
  60. static const unsigned defaultStrandBlockSize = 512;
  61. static const unsigned defaultForceNumStrands = 0;
  62. static const char **cmdArgs;
  63. static void replyError(unsigned errorCode, const char *errorMsg)
  64. {
  65. SocketEndpoint myEp = queryMyNode()->endpoint();
  66. StringBuffer str("Node '");
  67. myEp.getUrlStr(str);
  68. str.append("' exception: ").append(errorMsg);
  69. Owned<IException> e = MakeStringException(errorCode, "%s", str.str());
  70. CMessageBuffer msg;
  71. serializeException(e, msg);
  72. queryNodeComm().send(msg, 0, MPTAG_THORREGISTRATION);
  73. }
  74. static std::atomic<bool> isRegistered {false};
  75. static bool RegisterSelf(SocketEndpoint &masterEp)
  76. {
  77. StringBuffer slfStr;
  78. StringBuffer masterStr;
  79. LOG(MCdebugProgress, thorJob, "registering %s - master %s",slfEp.getUrlStr(slfStr).str(),masterEp.getUrlStr(masterStr).str());
  80. try
  81. {
  82. SocketEndpoint ep = masterEp;
  83. ep.port = getFixedPort(getMasterPortBase(), TPORT_mp);
  84. Owned<INode> masterNode = createINode(ep);
  85. CMessageBuffer msg;
  86. msg.append(mySlaveNum);
  87. queryWorldCommunicator().send(msg, masterNode, MPTAG_THORREGISTRATION);
  88. if (!queryWorldCommunicator().recv(msg, masterNode, MPTAG_THORREGISTRATION))
  89. return false;
  90. PROGLOG("Initialization received");
  91. unsigned vmajor, vminor;
  92. msg.read(vmajor);
  93. msg.read(vminor);
  94. Owned<IGroup> processGroup = deserializeIGroup(msg);
  95. mySlaveNum = (unsigned)processGroup->rank(queryMyNode());
  96. assertex(NotFound != mySlaveNum);
  97. mySlaveNum++; // 1 based;
  98. unsigned configSlaveNum = globals->getPropInt("@slavenum", NotFound);
  99. globals.setown(createPTree(msg));
  100. if (NotFound == configSlaveNum)
  101. globals->setPropInt("@slavenum", mySlaveNum);
  102. else
  103. assertex(mySlaveNum == configSlaveNum);
  104. /* NB: preserve command line option overrides
  105. * Not sure if any cmdline options are actually needed by this stage..
  106. */
  107. loadArgsIntoConfiguration(globals, cmdArgs);
  108. #ifdef _DEBUG
  109. unsigned holdSlave = globals->getPropInt("@holdSlave", NotFound);
  110. if (mySlaveNum == holdSlave)
  111. {
  112. DBGLOG("Thor slave %u paused for debugging purposes, attach and set held=false to release", mySlaveNum);
  113. bool held = true;
  114. while (held)
  115. Sleep(5);
  116. }
  117. #endif
  118. unsigned channelsPerSlave = globals->getPropInt("@channelsPerSlave", 1);
  119. unsigned localThorPortInc = globals->getPropInt("@localThorPortInc", DEFAULT_SLAVEPORTINC);
  120. unsigned slaveBasePort = globals->getPropInt("@slaveport", DEFAULT_THORSLAVEPORT);
  121. setupCluster(masterNode, processGroup, channelsPerSlave, slaveBasePort, localThorPortInc);
  122. if (vmajor != THOR_VERSION_MAJOR || vminor != THOR_VERSION_MINOR)
  123. {
  124. replyError(TE_FailedToRegisterSlave, "Thor master/slave version mismatch");
  125. return false;
  126. }
  127. StringBuffer xpath;
  128. getExpertOptPath(nullptr, xpath); // 'expert' in container world, or 'Debug' in bare-metal
  129. ensurePTree(globals, xpath);
  130. unsigned numStrands, blockSize;
  131. getExpertOptPath("forceNumStrands", xpath.clear());
  132. if (globals->hasProp(xpath))
  133. numStrands = globals->getPropInt(xpath);
  134. else
  135. {
  136. numStrands = defaultForceNumStrands;
  137. globals->setPropInt(xpath, defaultForceNumStrands);
  138. }
  139. getExpertOptPath("strandBlockSize", xpath.clear());
  140. if (globals->hasProp(xpath))
  141. blockSize = globals->getPropInt(xpath);
  142. else
  143. {
  144. blockSize = defaultStrandBlockSize;
  145. globals->setPropInt(xpath, defaultStrandBlockSize);
  146. }
  147. PROGLOG("Strand defaults: numStrands=%u, blockSize=%u", numStrands, blockSize);
  148. const char *_masterBuildTag = globals->queryProp("@masterBuildTag");
  149. const char *masterBuildTag = _masterBuildTag?_masterBuildTag:"no build tag";
  150. PROGLOG("Master build: %s", masterBuildTag);
  151. if (!_masterBuildTag || 0 != strcmp(hpccBuildInfo.buildTag, _masterBuildTag))
  152. {
  153. StringBuffer errStr("Thor master/slave build mismatch, master = ");
  154. errStr.append(masterBuildTag).append(", slave = ").append(hpccBuildInfo.buildTag);
  155. OERRLOG("%s", errStr.str());
  156. #ifndef _DEBUG
  157. replyError(TE_FailedToRegisterSlave, errStr.str());
  158. return false;
  159. #endif
  160. }
  161. readUnderlyingType<mptag_t>(msg, masterSlaveMpTag);
  162. readUnderlyingType<mptag_t>(msg, kjServiceMpTag);
  163. msg.clear();
  164. if (!queryNodeComm().send(msg, 0, MPTAG_THORREGISTRATION))
  165. return false;
  166. PROGLOG("Registration confirmation sent");
  167. if (!queryNodeComm().recv(msg, 0, MPTAG_THORREGISTRATION))
  168. return false;
  169. PROGLOG("Registration confirmation receipt received");
  170. ::masterNode = LINK(masterNode);
  171. PROGLOG("verifying mp connection to rest of cluster");
  172. if (!queryNodeComm().verifyAll())
  173. OERRLOG("Failed to connect to all nodes");
  174. else
  175. PROGLOG("verified mp connection to rest of cluster");
  176. LOG(MCdebugProgress, thorJob, "registered %s",slfStr.str());
  177. }
  178. catch (IException *e)
  179. {
  180. FLLOG(MCexception(e), thorJob, e,"slave registration error");
  181. e->Release();
  182. return false;
  183. }
  184. isRegistered = true;
  185. return true;
  186. }
  187. static bool jobListenerStopped = true;
  188. bool UnregisterSelf(IException *e)
  189. {
  190. if (!hasMPServerStarted())
  191. return false;
  192. if (!isRegistered)
  193. return false;
  194. StringBuffer slfStr;
  195. slfEp.getUrlStr(slfStr);
  196. LOG(MCdebugProgress, thorJob, "Unregistering slave : %s", slfStr.str());
  197. try
  198. {
  199. CMessageBuffer msg;
  200. msg.append(rc_deregister);
  201. serializeException(e, msg); // NB: allows exception to be NULL
  202. if (!queryWorldCommunicator().send(msg, masterNode, MPTAG_THORREGISTRATION, 60*1000))
  203. {
  204. LOG(MCerror, thorJob, "Failed to unregister slave : %s", slfStr.str());
  205. return false;
  206. }
  207. LOG(MCdebugProgress, thorJob, "Unregistered slave : %s", slfStr.str());
  208. isRegistered = false;
  209. return true;
  210. }
  211. catch (IException *e) {
  212. if (!jobListenerStopped)
  213. FLLOG(MCexception(e), thorJob, e,"slave unregistration error");
  214. e->Release();
  215. }
  216. return false;
  217. }
  218. bool ControlHandler(ahType type)
  219. {
  220. if (ahInterrupt == type)
  221. LOG(MCdebugProgress, thorJob, "CTRL-C detected");
  222. else if (!jobListenerStopped)
  223. LOG(MCdebugProgress, thorJob, "SIGTERM detected");
  224. bool unregOK = false;
  225. if (!jobListenerStopped)
  226. {
  227. if (masterNode)
  228. unregOK = UnregisterSelf(NULL);
  229. abortSlave();
  230. }
  231. return !unregOK;
  232. }
  /**
   * Print the command line synopsis to stdout and terminate the process with
   * exit status 1. Does not return.
   */
  void usage()
  {
      fputs("usage: thorslave MASTER=ip:port SLAVE=.:port DALISERVERS=ip:port\n", stdout);
      exit(1);
  }
  #ifdef _WIN32
  // Named Mutex wrapper that calls unlock() from its destructor when the
  // inherited 'owner' member is still set.
  class CReleaseMutex : public CSimpleInterface, public Mutex
  {
  public:
      CReleaseMutex(const char *name) : Mutex(name)
      {
      }

      ~CReleaseMutex()
      {
          if (owner)
              unlock();
      }
  };
  #endif
  246. ILogMsgHandler *startSlaveLog()
  247. {
  248. ILogMsgHandler *logHandler = nullptr;
  249. #ifndef _CONTAINERIZED
  250. StringBuffer fileName("thorslave");
  251. Owned<IComponentLogFileCreator> lf = createComponentLogFileCreator(globals->queryProp("@logDir"), "thor");
  252. StringBuffer slaveNumStr;
  253. lf->setPostfix(slaveNumStr.append(mySlaveNum).str());
  254. lf->setCreateAliasFile(false);
  255. lf->setName(fileName.str());//override default filename
  256. logHandler = lf->beginLogging();
  257. #ifndef _DEBUG
  258. // keep duplicate logging output to stderr to aide debugging
  259. queryLogMsgManager()->removeMonitor(queryStderrLogMsgHandler());
  260. #endif
  261. LOG(MCdebugProgress, thorJob, "Opened log file %s", lf->queryLogFileSpec());
  262. #else
  263. setupContainerizedLogMsgHandler();
  264. logHandler = queryStderrLogMsgHandler();
  265. #endif
  266. //setupContainerizedStorageLocations();
  267. LOG(MCdebugProgress, thorJob, "Build %s", hpccBuildInfo.buildTag);
  268. return logHandler;
  269. }
  270. void setSlaveAffinity(unsigned processOnNode)
  271. {
  272. const char * affinity = globals->queryProp("@affinity");
  273. if (affinity)
  274. setProcessAffinity(affinity);
  275. else if (globals->getPropBool("@autoAffinity", true))
  276. {
  277. const char * nodes = globals->queryProp("@autoNodeAffinityNodes");
  278. unsigned slavesPerNode = globals->getPropInt("@slavesPerNode", 1);
  279. setAutoAffinity(processOnNode, slavesPerNode, nodes);
  280. }
  281. //The default policy is to allocate from the local node, so restricting allocations to the current sockets
  282. //may not buy much once the affinity is set up. It also means it will fail if there is no memory left on
  283. //this socket - even if there is on others.
  284. //Therefore it is not recommended unless you have maybe several independent thors running on the same machines
  285. //with exclusive access to memory.
  286. if (globals->getPropBool("@numaBindLocal", false))
  287. bindMemoryToLocalNodes();
  288. }
  /**
   * Process entry point for the Thor slave.
   *
   * Loads the configuration, starts the MP server, registers with the master via
   * RegisterSelf() and, on success, prepares directories and options before
   * handing control to slaveMain(). Any IException raised along the way is kept
   * and forwarded to the master through UnregisterSelf() during shutdown.
   *
   * @param argc  number of command line arguments
   * @param argv  command line arguments; argv+1 is loaded into the configuration
   * @return EXIT_FAILURE when checkCreateDaemon() fails, otherwise 0 (including
   *         after a failed registration or a caught exception). usage() exits the
   *         process with status 1 when no arguments or no "@master" are supplied.
   */
  int main( int argc, const char *argv[]  )
  {
      if (!checkCreateDaemon(argc, argv))
          return EXIT_FAILURE;
  #if defined(WIN32) && defined(_DEBUG)
      // Turn on CRT leak checking at exit for Windows debug builds.
      int tmpFlag = _CrtSetDbgFlag( _CRTDBG_REPORT_FLAG );
      tmpFlag |= _CRTDBG_LEAK_CHECK_DF;
      _CrtSetDbgFlag( tmpFlag );
  #endif
      InitModuleObjects();
      addAbortHandler(ControlHandler);
      EnableSEHtoExceptionMapping();
      dummyProc();
  #ifndef __64BIT__
      // Restrict stack sizes on 32-bit systems
      Thread::setDefaultStackSize(0x10000); // NB under windows requires linker setting (/stack:)
  #endif
  #ifdef _WIN32
      Owned<CReleaseMutex> globalNamedMutex;
  #endif
      // Start from an empty tree; it is populated from the command line below and
      // later replaced by the tree received from the master in RegisterSelf().
      globals.setown(createPTree("Thor"));
      unsigned multiThorMemoryThreshold = 0;
      // Holds any exception caught below so it can be reported to the master on shutdown.
      Owned<IException> unregisterException;
      try
      {
          if (argc==1)
          {
              // usage() calls exit(1), so the return below is not reached.
              usage();
              return 1;
          }
          cmdArgs = argv+1;
  #ifdef _CONTAINERIZED
          globals.setown(loadConfiguration(thorDefaultConfigYaml, argv, "thor", "THOR", nullptr, nullptr, nullptr, false));
  #else
          loadArgsIntoConfiguration(globals, cmdArgs);
  #endif
          const char *master = globals->queryProp("@master");
          if (!master)
              usage();
          mySlaveNum = globals->getPropInt("@slavenum", NotFound);
          /* NB: in cloud/non-local storage mode, slave number is not known until after registration with the master
           * For the time being log file names are based on their slave number, so can only start when known.
           */
          ILogMsgHandler *slaveLogHandler = nullptr;
          if (NotFound != mySlaveNum)
              slaveLogHandler = startSlaveLog();
          // In container world, SLAVE= will not be used
          const char *slave = globals->queryProp("@slave");
          if (slave)
          {
              slfEp.set(slave);
              localHostToNIC(slfEp);
          }
          else
              slfEp.setLocalHost(0);
          // TBD: use new config/init system for generic handling of init settings vs command line overrides
          if (0 == slfEp.port) // assume default from config if not on command line
              slfEp.port = globals->getPropInt("@slaveport", THOR_BASESLAVE_PORT);
          startMPServer(DCR_ThorSlave, slfEp.port, false, true);
          // If the port is still 0, adopt the one reported by queryMyNode() after the MP server started.
          if (0 == slfEp.port)
              slfEp.port = queryMyNode()->endpoint().port;
          setMachinePortBase(slfEp.port);
          setSlaveAffinity(globals->getPropInt("@slaveprocessnum"));
          if (globals->getPropBool("@MPChannelReconnect"))
              getMPServer()->setOpt(mpsopt_channelreopen, "true");
  #ifdef USE_MP_LOG
          startLogMsgParentReceiver();
          LOG(MCdebugProgress, thorJob, "MPServer started on port %d", getFixedPort(TPORT_mp));
  #endif
          SocketEndpoint masterEp(master);
          localHostToNIC(masterEp);
          setMasterPortBase(masterEp.port);
          markNodeCentral(masterEp);
          if (RegisterSelf(masterEp))
          {
              // The slave number is known after registration, so logging can start now if it has not already.
              if (!slaveLogHandler)
                  slaveLogHandler = startSlaveLog();
              if (getExpertOptBool("slaveDaliClient"))
                  enableThorSlaveAsDaliClient();
              IDaFileSrvHook *daFileSrvHook = queryDaFileSrvHook();
              if (daFileSrvHook) // probably always installed
                  daFileSrvHook->addFilters(globals->queryPropTree("NAS"), &slfEp);
              enableForceRemoteReads(); // forces file reads to be remote reads if they match environment setting 'forceRemotePattern' pattern.
              // Make "@thorPath" the working directory, creating it if necessary.
              StringBuffer thorPath;
              globals->getProp("@thorPath", thorPath);
              recursiveCreateDirectory(thorPath.str());
              int err = _chdir(thorPath.str());
              if (err)
              {
                  // Logged here as well as thrown: the outer handler only logs when a job listener is active.
                  IException *e = makeErrnoExceptionV(-1, "Failed to change dir to '%s'", thorPath.str());
                  FLLOG(MCexception(e), thorJob, e);
                  throw e;
              }
              // Initialization from globals
              setIORetryCount((unsigned)getExpertOptInt64("ioRetries")); // default == 0 == off
              StringBuffer str;
              // Create the configured external program directory, or default it to thorPath.
              if (globals->getProp("@externalProgDir", str.clear()))
                  _mkdir(str.str());
              else
                  globals->setProp("@externalProgDir", thorPath);
  #ifndef _CONTAINERIZED
              // Directories from the "Directories" configuration section take precedence
              // over the "@thorDataDirectory"/"@thorReplicateDirectory" attributes.
              const char * overrideBaseDirectory = globals->queryProp("@thorDataDirectory");
              const char * overrideReplicateDirectory = globals->queryProp("@thorReplicateDirectory");
              StringBuffer datadir;
              StringBuffer repdir;
              if (getConfigurationDirectory(globals->queryPropTree("Directories"),"data","thor",globals->queryProp("@name"),datadir))
                  overrideBaseDirectory = datadir.str();
              if (getConfigurationDirectory(globals->queryPropTree("Directories"),"mirror","thor",globals->queryProp("@name"),repdir))
                  overrideReplicateDirectory = repdir.str();
              if (!isEmptyString(overrideBaseDirectory))
                  setBaseDirectory(overrideBaseDirectory, false);
              if (!isEmptyString(overrideReplicateDirectory))
                  setBaseDirectory(overrideReplicateDirectory, true);
              if (getConfigurationDirectory(globals->queryPropTree("Directories"),"query","thor",globals->queryProp("@name"),str.clear()))
                  globals->setProp("@query_so_dir", str.str());
              else
                  globals->getProp("@query_so_dir", str.clear());
              if (str.length())
              {
                  if (getExpertOptBool("dllsToSlaves", true))
                  {
                      // Build "<dir>_<machine port base>", dropping any trailing path separator first.
                      StringBuffer uniqSoPath;
                      if (PATHSEPCHAR == str.charAt(str.length()-1))
                          uniqSoPath.append(str.length()-1, str.str());
                      else
                          uniqSoPath.append(str);
                      uniqSoPath.append("_").append(getMachinePortBase());
                      str.swapWith(uniqSoPath);
                      globals->setProp("@query_so_dir", str.str());
                  }
                  PROGLOG("Using querySo directory: %s", str.str());
                  recursiveCreateDirectory(str.str());
              }
  #endif
              useMemoryMappedRead(globals->getPropBool("@useMemoryMappedRead"));
              LOG(MCdebugProgress, thorJob, "ThorSlave Version LCR - %d.%d started",THOR_VERSION_MAJOR,THOR_VERSION_MINOR);
  #ifdef _WIN32
              // Log total/free disk space for drive c: on Windows.
              ULARGE_INTEGER userfree;
              ULARGE_INTEGER total;
              ULARGE_INTEGER free;
              if (GetDiskFreeSpaceEx("c:\\",&userfree,&total,&free)&&total.QuadPart) {
                  unsigned pc = (unsigned)(free.QuadPart*100/total.QuadPart);
                  LOG(MCdebugProgress, thorJob, "Total disk space = %" I64F "d k", total.QuadPart/1000);
                  LOG(MCdebugProgress, thorJob, "Free disk space = %" I64F "d k", free.QuadPart/1000);
                  LOG(MCdebugProgress, thorJob, "%d%% disk free\n",pc);
              }
  #endif
              // The configured value is multiplied by 0x100000 (1MB) before use.
              // NOTE(review): the product is stored in an unsigned, so large settings may wrap - confirm the intended range.
              multiThorMemoryThreshold = globals->getPropInt("@multiThorMemoryThreshold")*0x100000;
              if (multiThorMemoryThreshold) {
                  // The resource group name falls back to "@nodeGroup"; with no name the threshold is disabled.
                  StringBuffer lgname;
                  if (!globals->getProp("@multiThorResourceGroup",lgname))
                      globals->getProp("@nodeGroup",lgname);
                  if (lgname.length()) {
                      Owned<ILargeMemLimitNotify> notify = createMultiThorResourceMutex(lgname.str());
                      setMultiThorMemoryNotify(multiThorMemoryThreshold,notify);
                      PROGLOG("Multi-Thor resource limit for %s set to %" I64F "d",lgname.str(),(__int64)multiThorMemoryThreshold);
                  }
                  else
                      multiThorMemoryThreshold = 0;
              }
  #ifndef _CONTAINERIZED
              // An interval of 0 disables the performance monitor.
              unsigned pinterval = globals->getPropInt("@system_monitor_interval",1000*60);
              if (pinterval)
                  startPerformanceMonitor(pinterval, PerfMonStandard, nullptr);
  #endif
  #ifdef _CONTAINERIZED
              // Runs a remote file server on its own thread; the destructor stops the
              // server and joins the thread.
              class CServerThread : public CSimpleInterfaceOf<IThreaded>
              {
                  CThreaded threaded;
                  Owned<IRemoteFileServer> dafsInstance;
              public:
                  CServerThread() : threaded("CServerThread")
                  {
                      dafsInstance.setown(createRemoteFileServer());
                      threaded.init(this);
                  }
                  ~CServerThread()
                  {
                      PROGLOG("Stopping dafilesrv");
                      dafsInstance->stop();
                      threaded.join();
                  }
              // IThreaded
                  virtual void threadmain() override
                  {
                      SocketEndpoint listenEp(DAFILESRV_PORT);
                      try
                      {
                          PROGLOG("Starting dafilesrv");
                          dafsInstance->run(nullptr, SSLNone, listenEp);
                      }
                      catch (IException *e)
                      {
                          // Logged, then rethrown to the caller of threadmain().
                          EXCLOG(e, "dafilesrv error");
                          throw;
                      }
                  }
              };
              // Only created when "@_dafsStorage" is set; destroyed (and so stopped)
              // when this scope exits after slaveMain() returns.
              OwnedPtr<CServerThread> dafsThread;
              if (globals->getPropBool("@_dafsStorage"))
                  dafsThread.setown(new CServerThread);
  #endif
              installDefaultFileHooks(globals);
              slaveMain(jobListenerStopped, slaveLogHandler);
          }
          // NB: also reached when RegisterSelf() returned false.
          LOG(MCdebugProgress, thorJob, "ThorSlave terminated OK");
      }
      catch (IException *e)
      {
          // Only logged while a job listener is active; always kept for UnregisterSelf() below.
          if (!jobListenerStopped)
              FLLOG(MCexception(e), thorJob, e,"ThorSlave");
          unregisterException.setown(e);
      }
      // Shutdown sequence.
  #ifndef _CONTAINERIZED
      stopPerformanceMonitor();
  #endif
      if (multiThorMemoryThreshold)
          setMultiThorMemoryNotify(0,NULL);
      roxiemem::releaseRoxieHeap();
      // The master is only told about unregistration here when an exception was caught above.
      if (unregisterException.get())
          UnregisterSelf(unregisterException);
      if (getExpertOptBool("slaveDaliClient"))
          disableThorSlaveAsDaliClient();
  #ifdef USE_MP_LOG
      stopLogMsgReceivers();
  #endif
      stopMPServer();
      releaseAtoms(); // don't know why we can't use a module_exit to destruct these...
      ExitModuleObjects(); // not necessary, atexit will call, but good for leak checking
      return 0;
  }