123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600 |
- /*##############################################################################
- HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ############################################################################## */
- // Entrypoint for ThorSlave.EXE
- #include "platform.h"
- #include <stddef.h>
- #include <stdlib.h>
- #include <assert.h>
- #include <stdio.h>
- #include <stdlib.h>
- #include "jlib.hpp"
- #include "jdebug.hpp"
- #include "jexcept.hpp"
- #include "jfile.hpp"
- #include "jmisc.hpp"
- #include "jprop.hpp"
- #include "jthread.hpp"
- #include "thormisc.hpp"
- #include "slavmain.hpp"
- #include "thorport.hpp"
- #include "thexception.hpp"
- #include "thmem.hpp"
- #include "thbuf.hpp"
- #include "mpbase.hpp"
- #include "mplog.hpp"
- #include "daclient.hpp"
- #include "dalienv.hpp"
- #include "slave.hpp"
- #include "portlist.h"
- #include "dafdesc.hpp"
- #include "rmtfile.hpp"
- #include "slavmain.hpp"
- #ifdef _CONTAINERIZED
- #include "dafsserver.hpp"
- #endif
- // #define USE_MP_LOG
- static INode *masterNode = NULL;
- MODULE_INIT(INIT_PRIORITY_STANDARD)
- {
- return true;
- }
- MODULE_EXIT()
- {
- ::Release(masterNode);
- }
- #ifdef _DEBUG
- USE_JLIB_ALLOC_HOOK;
- #endif
- static SocketEndpoint slfEp;
- static unsigned mySlaveNum;
- static const unsigned defaultStrandBlockSize = 512;
- static const unsigned defaultForceNumStrands = 0;
- static const char **cmdArgs;
- static void replyError(unsigned errorCode, const char *errorMsg)
- {
- SocketEndpoint myEp = queryMyNode()->endpoint();
- StringBuffer str("Node '");
- myEp.getUrlStr(str);
- str.append("' exception: ").append(errorMsg);
- Owned<IException> e = MakeStringException(errorCode, "%s", str.str());
- CMessageBuffer msg;
- serializeException(e, msg);
- queryNodeComm().send(msg, 0, MPTAG_THORREGISTRATION);
- }
- static std::atomic<bool> isRegistered {false};
- static bool RegisterSelf(SocketEndpoint &masterEp)
- {
- StringBuffer slfStr;
- StringBuffer masterStr;
- LOG(MCdebugProgress, thorJob, "registering %s - master %s",slfEp.getUrlStr(slfStr).str(),masterEp.getUrlStr(masterStr).str());
- try
- {
- SocketEndpoint ep = masterEp;
- ep.port = getFixedPort(getMasterPortBase(), TPORT_mp);
- Owned<INode> masterNode = createINode(ep);
- CMessageBuffer msg;
- msg.append(mySlaveNum);
- queryWorldCommunicator().send(msg, masterNode, MPTAG_THORREGISTRATION);
- if (!queryWorldCommunicator().recv(msg, masterNode, MPTAG_THORREGISTRATION))
- return false;
- PROGLOG("Initialization received");
- unsigned vmajor, vminor;
- msg.read(vmajor);
- msg.read(vminor);
- Owned<IGroup> processGroup = deserializeIGroup(msg);
- mySlaveNum = (unsigned)processGroup->rank(queryMyNode());
- assertex(NotFound != mySlaveNum);
- mySlaveNum++; // 1 based;
- unsigned configSlaveNum = globals->getPropInt("@slavenum", NotFound);
- globals.setown(createPTree(msg));
- if (NotFound == configSlaveNum)
- globals->setPropInt("@slavenum", mySlaveNum);
- else
- assertex(mySlaveNum == configSlaveNum);
- /* NB: preserve command line option overrides
- * Not sure if any cmdline options are actually needed by this stage..
- */
- loadArgsIntoConfiguration(globals, cmdArgs);
- #ifdef _DEBUG
- unsigned holdSlave = globals->getPropInt("@holdSlave", NotFound);
- if (mySlaveNum == holdSlave)
- {
- DBGLOG("Thor slave %u paused for debugging purposes, attach and set held=false to release", mySlaveNum);
- bool held = true;
- while (held)
- Sleep(5);
- }
- #endif
- unsigned channelsPerSlave = globals->getPropInt("@channelsPerSlave", 1);
- unsigned localThorPortInc = globals->getPropInt("@localThorPortInc", DEFAULT_SLAVEPORTINC);
- unsigned slaveBasePort = globals->getPropInt("@slaveport", DEFAULT_THORSLAVEPORT);
- setupCluster(masterNode, processGroup, channelsPerSlave, slaveBasePort, localThorPortInc);
- if (vmajor != THOR_VERSION_MAJOR || vminor != THOR_VERSION_MINOR)
- {
- replyError(TE_FailedToRegisterSlave, "Thor master/slave version mismatch");
- return false;
- }
- StringBuffer xpath;
- getExpertOptPath(nullptr, xpath); // 'expert' in container world, or 'Debug' in bare-metal
- ensurePTree(globals, xpath);
- unsigned numStrands, blockSize;
- getExpertOptPath("forceNumStrands", xpath.clear());
- if (globals->hasProp(xpath))
- numStrands = globals->getPropInt(xpath);
- else
- {
- numStrands = defaultForceNumStrands;
- globals->setPropInt(xpath, defaultForceNumStrands);
- }
- getExpertOptPath("strandBlockSize", xpath.clear());
- if (globals->hasProp(xpath))
- blockSize = globals->getPropInt(xpath);
- else
- {
- blockSize = defaultStrandBlockSize;
- globals->setPropInt(xpath, defaultStrandBlockSize);
- }
- PROGLOG("Strand defaults: numStrands=%u, blockSize=%u", numStrands, blockSize);
- const char *_masterBuildTag = globals->queryProp("@masterBuildTag");
- const char *masterBuildTag = _masterBuildTag?_masterBuildTag:"no build tag";
- PROGLOG("Master build: %s", masterBuildTag);
- if (!_masterBuildTag || 0 != strcmp(hpccBuildInfo.buildTag, _masterBuildTag))
- {
- StringBuffer errStr("Thor master/slave build mismatch, master = ");
- errStr.append(masterBuildTag).append(", slave = ").append(hpccBuildInfo.buildTag);
- OERRLOG("%s", errStr.str());
- #ifndef _DEBUG
- replyError(TE_FailedToRegisterSlave, errStr.str());
- return false;
- #endif
- }
- readUnderlyingType<mptag_t>(msg, masterSlaveMpTag);
- readUnderlyingType<mptag_t>(msg, kjServiceMpTag);
- msg.clear();
- if (!queryNodeComm().send(msg, 0, MPTAG_THORREGISTRATION))
- return false;
- PROGLOG("Registration confirmation sent");
- if (!queryNodeComm().recv(msg, 0, MPTAG_THORREGISTRATION))
- return false;
- PROGLOG("Registration confirmation receipt received");
- ::masterNode = LINK(masterNode);
- PROGLOG("verifying mp connection to rest of cluster");
- if (!queryNodeComm().verifyAll())
- OERRLOG("Failed to connect to all nodes");
- else
- PROGLOG("verified mp connection to rest of cluster");
- LOG(MCdebugProgress, thorJob, "registered %s",slfStr.str());
- }
- catch (IException *e)
- {
- FLLOG(MCexception(e), thorJob, e,"slave registration error");
- e->Release();
- return false;
- }
- isRegistered = true;
- return true;
- }
- static bool jobListenerStopped = true;
- bool UnregisterSelf(IException *e)
- {
- if (!hasMPServerStarted())
- return false;
- if (!isRegistered)
- return false;
- StringBuffer slfStr;
- slfEp.getUrlStr(slfStr);
- LOG(MCdebugProgress, thorJob, "Unregistering slave : %s", slfStr.str());
- try
- {
- CMessageBuffer msg;
- msg.append(rc_deregister);
- serializeException(e, msg); // NB: allows exception to be NULL
- if (!queryWorldCommunicator().send(msg, masterNode, MPTAG_THORREGISTRATION, 60*1000))
- {
- LOG(MCerror, thorJob, "Failed to unregister slave : %s", slfStr.str());
- return false;
- }
- LOG(MCdebugProgress, thorJob, "Unregistered slave : %s", slfStr.str());
- isRegistered = false;
- return true;
- }
- catch (IException *e) {
- if (!jobListenerStopped)
- FLLOG(MCexception(e), thorJob, e,"slave unregistration error");
- e->Release();
- }
- return false;
- }
- bool ControlHandler(ahType type)
- {
- if (ahInterrupt == type)
- LOG(MCdebugProgress, thorJob, "CTRL-C detected");
- else if (!jobListenerStopped)
- LOG(MCdebugProgress, thorJob, "SIGTERM detected");
- bool unregOK = false;
- if (!jobListenerStopped)
- {
- if (masterNode)
- unregOK = UnregisterSelf(NULL);
- abortSlave();
- }
- return !unregOK;
- }
/* Print the command-line synopsis to stdout and terminate with exit status 1.
 * Does not return.
 */
void usage()
{
    fputs("usage: thorslave MASTER=ip:port SLAVE=.:port DALISERVERS=ip:port\n", stdout);
    exit(1);
}
#ifdef _WIN32
// Reference-counted named mutex. On destruction it calls unlock() if the Mutex 'owner'
// member is set (presumably meaning the mutex is still held).
class CReleaseMutex : public CSimpleInterface, public Mutex
{
public:
    CReleaseMutex(const char *mutexName)
        : Mutex(mutexName)
    {
    }

    ~CReleaseMutex()
    {
        if (owner)
            unlock();
    }
};
#endif
- ILogMsgHandler *startSlaveLog()
- {
- ILogMsgHandler *logHandler = nullptr;
- #ifndef _CONTAINERIZED
- StringBuffer fileName("thorslave");
- Owned<IComponentLogFileCreator> lf = createComponentLogFileCreator(globals->queryProp("@logDir"), "thor");
- StringBuffer slaveNumStr;
- lf->setPostfix(slaveNumStr.append(mySlaveNum).str());
- lf->setCreateAliasFile(false);
- lf->setName(fileName.str());//override default filename
- logHandler = lf->beginLogging();
- #ifndef _DEBUG
- // keep duplicate logging output to stderr to aide debugging
- queryLogMsgManager()->removeMonitor(queryStderrLogMsgHandler());
- #endif
- LOG(MCdebugProgress, thorJob, "Opened log file %s", lf->queryLogFileSpec());
- #else
- setupContainerizedLogMsgHandler();
- logHandler = queryStderrLogMsgHandler();
- #endif
- //setupContainerizedStorageLocations();
- LOG(MCdebugProgress, thorJob, "Build %s", hpccBuildInfo.buildTag);
- return logHandler;
- }
- void setSlaveAffinity(unsigned processOnNode)
- {
- const char * affinity = globals->queryProp("@affinity");
- if (affinity)
- setProcessAffinity(affinity);
- else if (globals->getPropBool("@autoAffinity", true))
- {
- const char * nodes = globals->queryProp("@autoNodeAffinityNodes");
- unsigned slavesPerNode = globals->getPropInt("@slavesPerNode", 1);
- setAutoAffinity(processOnNode, slavesPerNode, nodes);
- }
- //The default policy is to allocate from the local node, so restricting allocations to the current sockets
- //may not buy much once the affinity is set up. It also means it will fail if there is no memory left on
- //this socket - even if there is on others.
- //Therefore it is not recommended unless you have maybe several independent thors running on the same machines
- //with exclusive access to memory.
- if (globals->getPropBool("@numaBindLocal", false))
- bindMemoryToLocalNodes();
- }
- int main( int argc, const char *argv[] )
- {
- if (!checkCreateDaemon(argc, argv))
- return EXIT_FAILURE;
- #if defined(WIN32) && defined(_DEBUG)
- int tmpFlag = _CrtSetDbgFlag( _CRTDBG_REPORT_FLAG );
- tmpFlag |= _CRTDBG_LEAK_CHECK_DF;
- _CrtSetDbgFlag( tmpFlag );
- #endif
- InitModuleObjects();
- addAbortHandler(ControlHandler);
- EnableSEHtoExceptionMapping();
- dummyProc();
- #ifndef __64BIT__
- // Restrict stack sizes on 32-bit systems
- Thread::setDefaultStackSize(0x10000); // NB under windows requires linker setting (/stack:)
- #endif
- #ifdef _WIN32
- Owned<CReleaseMutex> globalNamedMutex;
- #endif
- globals.setown(createPTree("Thor"));
- unsigned multiThorMemoryThreshold = 0;
- Owned<IException> unregisterException;
- try
- {
- if (argc==1)
- {
- usage();
- return 1;
- }
- cmdArgs = argv+1;
- #ifdef _CONTAINERIZED
- globals.setown(loadConfiguration(thorDefaultConfigYaml, argv, "thor", "THOR", nullptr, nullptr, nullptr, false));
- #else
- loadArgsIntoConfiguration(globals, cmdArgs);
- #endif
- const char *master = globals->queryProp("@master");
- if (!master)
- usage();
- mySlaveNum = globals->getPropInt("@slavenum", NotFound);
- /* NB: in cloud/non-local storage mode, slave number is not known until after registration with the master
- * For the time being log file names are based on their slave number, so can only start when known.
- */
- ILogMsgHandler *slaveLogHandler = nullptr;
- if (NotFound != mySlaveNum)
- slaveLogHandler = startSlaveLog();
- // In container world, SLAVE= will not be used
- const char *slave = globals->queryProp("@slave");
- if (slave)
- {
- slfEp.set(slave);
- localHostToNIC(slfEp);
- }
- else
- slfEp.setLocalHost(0);
- // TBD: use new config/init system for generic handling of init settings vs command line overrides
- if (0 == slfEp.port) // assume default from config if not on command line
- slfEp.port = globals->getPropInt("@slaveport", THOR_BASESLAVE_PORT);
- startMPServer(DCR_ThorSlave, slfEp.port, false, true);
- if (0 == slfEp.port)
- slfEp.port = queryMyNode()->endpoint().port;
- setMachinePortBase(slfEp.port);
- setSlaveAffinity(globals->getPropInt("@slaveprocessnum"));
- if (globals->getPropBool("@MPChannelReconnect"))
- getMPServer()->setOpt(mpsopt_channelreopen, "true");
- #ifdef USE_MP_LOG
- startLogMsgParentReceiver();
- LOG(MCdebugProgress, thorJob, "MPServer started on port %d", getFixedPort(TPORT_mp));
- #endif
- SocketEndpoint masterEp(master);
- localHostToNIC(masterEp);
- setMasterPortBase(masterEp.port);
- markNodeCentral(masterEp);
- if (RegisterSelf(masterEp))
- {
- if (!slaveLogHandler)
- slaveLogHandler = startSlaveLog();
- if (getExpertOptBool("slaveDaliClient"))
- enableThorSlaveAsDaliClient();
- IDaFileSrvHook *daFileSrvHook = queryDaFileSrvHook();
- if (daFileSrvHook) // probably always installed
- daFileSrvHook->addFilters(globals->queryPropTree("NAS"), &slfEp);
- enableForceRemoteReads(); // forces file reads to be remote reads if they match environment setting 'forceRemotePattern' pattern.
- StringBuffer thorPath;
- globals->getProp("@thorPath", thorPath);
- recursiveCreateDirectory(thorPath.str());
- int err = _chdir(thorPath.str());
- if (err)
- {
- IException *e = makeErrnoExceptionV(-1, "Failed to change dir to '%s'", thorPath.str());
- FLLOG(MCexception(e), thorJob, e);
- throw e;
- }
- // Initialization from globals
- setIORetryCount((unsigned)getExpertOptInt64("ioRetries")); // default == 0 == off
- StringBuffer str;
- if (globals->getProp("@externalProgDir", str.clear()))
- _mkdir(str.str());
- else
- globals->setProp("@externalProgDir", thorPath);
- #ifndef _CONTAINERIZED
- const char * overrideBaseDirectory = globals->queryProp("@thorDataDirectory");
- const char * overrideReplicateDirectory = globals->queryProp("@thorReplicateDirectory");
- StringBuffer datadir;
- StringBuffer repdir;
- if (getConfigurationDirectory(globals->queryPropTree("Directories"),"data","thor",globals->queryProp("@name"),datadir))
- overrideBaseDirectory = datadir.str();
- if (getConfigurationDirectory(globals->queryPropTree("Directories"),"mirror","thor",globals->queryProp("@name"),repdir))
- overrideReplicateDirectory = repdir.str();
- if (!isEmptyString(overrideBaseDirectory))
- setBaseDirectory(overrideBaseDirectory, false);
- if (!isEmptyString(overrideReplicateDirectory))
- setBaseDirectory(overrideReplicateDirectory, true);
- if (getConfigurationDirectory(globals->queryPropTree("Directories"),"query","thor",globals->queryProp("@name"),str.clear()))
- globals->setProp("@query_so_dir", str.str());
- else
- globals->getProp("@query_so_dir", str.clear());
- if (str.length())
- {
- if (getExpertOptBool("dllsToSlaves", true))
- {
- StringBuffer uniqSoPath;
- if (PATHSEPCHAR == str.charAt(str.length()-1))
- uniqSoPath.append(str.length()-1, str.str());
- else
- uniqSoPath.append(str);
- uniqSoPath.append("_").append(getMachinePortBase());
- str.swapWith(uniqSoPath);
- globals->setProp("@query_so_dir", str.str());
- }
- PROGLOG("Using querySo directory: %s", str.str());
- recursiveCreateDirectory(str.str());
- }
- #endif
- useMemoryMappedRead(globals->getPropBool("@useMemoryMappedRead"));
- LOG(MCdebugProgress, thorJob, "ThorSlave Version LCR - %d.%d started",THOR_VERSION_MAJOR,THOR_VERSION_MINOR);
- #ifdef _WIN32
- ULARGE_INTEGER userfree;
- ULARGE_INTEGER total;
- ULARGE_INTEGER free;
- if (GetDiskFreeSpaceEx("c:\\",&userfree,&total,&free)&&total.QuadPart) {
- unsigned pc = (unsigned)(free.QuadPart*100/total.QuadPart);
- LOG(MCdebugProgress, thorJob, "Total disk space = %" I64F "d k", total.QuadPart/1000);
- LOG(MCdebugProgress, thorJob, "Free disk space = %" I64F "d k", free.QuadPart/1000);
- LOG(MCdebugProgress, thorJob, "%d%% disk free\n",pc);
- }
- #endif
-
- multiThorMemoryThreshold = globals->getPropInt("@multiThorMemoryThreshold")*0x100000;
- if (multiThorMemoryThreshold) {
- StringBuffer lgname;
- if (!globals->getProp("@multiThorResourceGroup",lgname))
- globals->getProp("@nodeGroup",lgname);
- if (lgname.length()) {
- Owned<ILargeMemLimitNotify> notify = createMultiThorResourceMutex(lgname.str());
- setMultiThorMemoryNotify(multiThorMemoryThreshold,notify);
- PROGLOG("Multi-Thor resource limit for %s set to %" I64F "d",lgname.str(),(__int64)multiThorMemoryThreshold);
- }
- else
- multiThorMemoryThreshold = 0;
- }
- #ifndef _CONTAINERIZED
- unsigned pinterval = globals->getPropInt("@system_monitor_interval",1000*60);
- if (pinterval)
- startPerformanceMonitor(pinterval, PerfMonStandard, nullptr);
- #endif
- #ifdef _CONTAINERIZED
- class CServerThread : public CSimpleInterfaceOf<IThreaded>
- {
- CThreaded threaded;
- Owned<IRemoteFileServer> dafsInstance;
- public:
- CServerThread() : threaded("CServerThread")
- {
- dafsInstance.setown(createRemoteFileServer());
- threaded.init(this);
- }
- ~CServerThread()
- {
- PROGLOG("Stopping dafilesrv");
- dafsInstance->stop();
- threaded.join();
- }
- // IThreaded
- virtual void threadmain() override
- {
- SocketEndpoint listenEp(DAFILESRV_PORT);
- try
- {
- PROGLOG("Starting dafilesrv");
- dafsInstance->run(nullptr, SSLNone, listenEp);
- }
- catch (IException *e)
- {
- EXCLOG(e, "dafilesrv error");
- throw;
- }
- }
- };
- OwnedPtr<CServerThread> dafsThread;
- if (globals->getPropBool("@_dafsStorage"))
- dafsThread.setown(new CServerThread);
- #endif
- installDefaultFileHooks(globals);
- slaveMain(jobListenerStopped, slaveLogHandler);
- }
- LOG(MCdebugProgress, thorJob, "ThorSlave terminated OK");
- }
- catch (IException *e)
- {
- if (!jobListenerStopped)
- FLLOG(MCexception(e), thorJob, e,"ThorSlave");
- unregisterException.setown(e);
- }
- #ifndef _CONTAINERIZED
- stopPerformanceMonitor();
- #endif
- if (multiThorMemoryThreshold)
- setMultiThorMemoryNotify(0,NULL);
- roxiemem::releaseRoxieHeap();
- if (unregisterException.get())
- UnregisterSelf(unregisterException);
- if (getExpertOptBool("slaveDaliClient"))
- disableThorSlaveAsDaliClient();
- #ifdef USE_MP_LOG
- stopLogMsgReceivers();
- #endif
- stopMPServer();
- releaseAtoms(); // don't know why we can't use a module_exit to destruct these...
- ExitModuleObjects(); // not necessary, atexit will call, but good for leak checking
- return 0;
- }
|