123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653 |
- /*##############################################################################
- Copyright (C) 2011 HPCC Systems.
- All rights reserved. This program is free software: you can redistribute it and/or modify
- it under the terms of the GNU Affero General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Affero General Public License for more details.
- You should have received a copy of the GNU Affero General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
- ############################################################################## */
- #include <platform.h>
- #include "jlib.hpp"
- #include "jexcept.hpp"
- #include "jthread.hpp"
- #include "jprop.hpp"
- #include "jiter.ipp"
- #include "jlzw.hpp"
- #include "jhtree.hpp"
- #include "mpcomm.hpp"
- #include "portlist.h"
- #include "rmtfile.hpp"
- #include "daclient.hpp"
- #include "dafdesc.hpp"
- #include "slwatchdog.hpp"
- #include "thbuf.hpp"
- #include "thmem.hpp"
- #include "thexception.hpp"
- #include "backup.hpp"
- #include "slave.hpp"
- #include "thormisc.hpp"
- #include "thorport.hpp"
- #include "thgraphslave.hpp"
- #include "slave.ipp"
- #include "thcompressutil.hpp"
- //---------------------------------------------------------------------------
- //---------------------------------------------------------------------------
- class CJobListener : public CSimpleInterface
- {
- bool stopped;
- CriticalSection crit;
- OwningStringSuperHashTableOf<CJobSlave> jobs;
- CFifoFileCache querySoCache; // used to mirror master cache
- class CThreadExceptionCatcher : implements IExceptionHandler
- {
- CJobListener &jobListener;
- public:
- CThreadExceptionCatcher(CJobListener &_jobListener) : jobListener(_jobListener)
- {
- addThreadExceptionHandler(this);
- }
- ~CThreadExceptionCatcher()
- {
- removeThreadExceptionHandler(this);
- }
- virtual bool fireException(IException *e)
- {
- mptag_t mptag;
- {
- CriticalBlock b(jobListener.crit);
- if (0 == jobListener.jobs.count())
- {
- EXCLOG(e, "No job active exception: ");
- return true;
- }
- IThorException *te = QUERYINTERFACE(e, IThorException);
- CJobSlave *job = NULL;
- if (te && te->queryJobId())
- job = jobListener.jobs.find(te->queryJobId());
- if (!job)
- {
- // JCSMORE - exception fallen through to thread exception handler, from unknown job, fire up to 1st job for now.
- job = (CJobSlave *)jobListener.jobs.next(NULL);
- }
- mptag = job->querySlaveMpTag();
- }
- CMessageBuffer msg;
- msg.append(smt_errorMsg);
- serializeThorException(e, msg);
- try
- {
- if (!queryClusterComm().sendRecv(msg, 0, mptag, LONGTIMEOUT))
- EXCLOG(e, "Failed to send exception to master");
- }
- catch (IException *e2)
- {
- StringBuffer str("Error whilst sending exception '");
- e->errorMessage(str);
- str.append("' to master");
- EXCLOG(e2, str.str());
- e2->Release();
- }
- return true;
- }
- } excptHandler;
- public:
- CJobListener() : excptHandler(*this)
- {
- stopped = true;
- }
- ~CJobListener()
- {
- stop();
- }
- void stop()
- {
- queryClusterComm().cancel(0, masterSlaveMpTag);
- }
- virtual void main()
- {
- StringBuffer soPath;
- globals->getProp("@query_so_dir", soPath);
- StringBuffer soPattern("*.");
- #ifdef _WIN32
- soPattern.append("dll");
- #else
- soPattern.append("so");
- #endif
- if (globals->getPropBool("Debug/@dllsToSlaves",true))
- querySoCache.init(soPath.str(), DEFAULT_QUERYSO_LIMIT, soPattern);
- Owned<ISlaveWatchdog> watchdog;
- if (globals->getPropBool("@watchdogEnabled"))
- watchdog.setown(createProgressHandler(globals->getPropBool("@useUDPWatchdog")));
- CMessageBuffer msg;
- stopped = false;
- bool doReply;
- rank_t sendert;
- while (!stopped && queryClusterComm().recv(msg, 0, masterSlaveMpTag, &sendert))
- {
- doReply = true;
- msgids cmd;
- try
- {
- msg.read((unsigned &)cmd);
- switch (cmd)
- {
- case QueryInit:
- {
- MemoryBuffer mb;
- decompressToBuffer(mb, msg);
- msg.swapWith(mb);
- mptag_t mptag, slaveMsgTag;
- deserializeMPtag(msg, mptag);
- queryClusterComm().flush(mptag);
- deserializeMPtag(msg, slaveMsgTag);
- queryClusterComm().flush(slaveMsgTag);
- StringAttr wuid, graphName, soPath;
- msg.read(wuid);
- msg.read(graphName);
- msg.read(soPath);
- bool sendSo;
- msg.read(sendSo);
- if (sendSo)
- {
- size32_t size;
- msg.read(size);
- Owned<IFile> iFile = createIFile(soPath);
- try
- {
- const void *soPtr = msg.readDirect(size);
- #ifdef _DEBUG
- if (!iFile->exists())
- #else
- if (1)
- #endif
- {
- iFile->setCreateFlags(S_IRWXU);
- Owned<IFileIO> iFileIO = iFile->open(IFOwrite);
- iFileIO->write(0, size, soPtr);
- }
- }
- catch (IException *e)
- {
- StringBuffer msg("Failed to save dll, cwd = ");
- char buf[255];
- if (!GetCurrentDirectory(sizeof(buf), buf)) {
- ERRLOG("CJobListener::main: Current directory path too big, setting it to null");
- buf[0] = 0;
- }
- msg.append(buf).append(", path = ").append(soPath);
- EXCLOG(e, msg.str());
- e->Release();
- }
- assertex(globals->getPropBool("Debug/@dllsToSlaves", true));
- querySoCache.add(soPath);
- }
- else
- {
- RemoteFilename rfn;
- SocketEndpoint masterEp = queryClusterGroup().queryNode(0).endpoint();
- masterEp.port = 0;
- rfn.setPath(masterEp, soPath);
- StringBuffer rpath;
- if (rfn.isLocal())
- rfn.getLocalPath(rpath);
- else
- rfn.getRemotePath(rpath);
- if (globals->getPropBool("Debug/@dllsToSlaves", true))
- {
- // i.e. should have previously been sent.
- OwnedIFile iFile = createIFile(soPath);
- if (!iFile->exists())
- {
- WARNLOG("Slave cached query dll missing: %s, will attempt to fetch from master", soPath.get());
- copyFile(soPath, rpath.str());
- }
- querySoCache.add(soPath);
- }
- else
- soPath.set(rpath.str());
- }
- Owned<IPropertyTree> workUnitInfo = createPTree(msg);
- StringBuffer user;
- workUnitInfo->getProp("user", user);
- PROGLOG("Started wuid=%s, user=%s, graph=%s\n", wuid.get(), user.str(), graphName.get());
- PROGLOG("Using query: %s", soPath.get());
- Owned<CJobSlave> job = new CJobSlave(watchdog, workUnitInfo, graphName, soPath, mptag, slaveMsgTag);
- jobs.replace(*LINK(job));
- Owned<IPropertyTree> deps = createPTree(msg);
- job->setXGMML(deps);
- msg.clear();
- msg.append(false);
- break;
- }
- case QueryDone:
- {
- StringAttr key;
- msg.read(key);
- CJobSlave *job = jobs.find(key.get());
- StringAttr wuid = job->queryWuid();
- StringAttr graphName = job->queryGraphName();
- PROGLOG("QueryDone, removing %s from jobs", key.get());
- jobs.removeExact(job);
- PROGLOG("QueryDone, removed %s from jobs", key.get());
- PROGLOG("Finished wuid=%s, graph=%s", wuid.get(), graphName.get());
- msg.clear();
- msg.append(false);
- break;
- }
- case GraphInit:
- {
- StringAttr jobKey;
- msg.read(jobKey);
- CJobSlave *job = jobs.find(jobKey.get());
- if (!job)
- throw MakeStringException(0, "Job not found: %s", jobKey.get());
- Owned<IPropertyTree> graphNode = createPTree(msg);
- Owned<CSlaveGraph> subGraph = (CSlaveGraph *)job->createGraph();
- subGraph->createFromXGMML(graphNode, NULL, NULL, NULL);
- PROGLOG("GraphInit: %s, graphId=%"GIDPF"d", jobKey.get(), subGraph->queryGraphId());
- subGraph->setExecuteReplyTag(subGraph->queryJob().deserializeMPTag(msg));
- size32_t len;
- msg.read(len);
- MemoryBuffer initData;
- initData.append(len, msg.readDirect(len));
- subGraph->deserializeCreateContexts(initData);
- graph_id gid;
- msg.read(gid);
- assertex(gid == subGraph->queryGraphId());
- subGraph->init(msg);
- job->addSubGraph(*LINK(subGraph));
- job->addDependencies(job->queryXGMML(), false);
- subGraph->execute(0, NULL, true, true);
- msg.clear();
- msg.append(false);
- break;
- }
- case GraphEnd:
- {
- StringAttr jobKey;
- msg.read(jobKey);
- CJobSlave *job = jobs.find(jobKey.get());
- if (job)
- {
- graph_id gid;
- msg.read(gid);
- msg.clear();
- msg.append(false);
- Owned<CSlaveGraph> graph = (CSlaveGraph *)job->getGraph(gid);
- if (graph)
- {
- graph->getDone(msg);
- graph->join(); // graph will wind-up.
- }
- else
- {
- msg.clear();
- msg.append(false);
- }
- }
- else
- {
- msg.clear();
- msg.append(false);
- }
- break;
- }
- case GraphAbort:
- {
- StringAttr jobKey;
- msg.read(jobKey);
- PROGLOG("GraphAbort: %s", jobKey.get());
- CJobSlave *job = jobs.find(jobKey.get());
- if (job)
- {
- graph_id gid;
- msg.read(gid);
- Owned<CGraphBase> graph = job->getGraph(gid);
- if (graph)
- {
- Owned<IThorException> e = MakeThorException(0, "GraphAbort");
- e->setGraphId(gid);
- graph->abort(e);
- }
- }
- msg.clear();
- msg.append(false);
- break;
- }
- case Shutdown:
- {
- doReply = false;
- stopped = true;
- break;
- }
- case GraphGetResult:
- {
- StringAttr jobKey;
- msg.read(jobKey);
- PROGLOG("GraphGetResult: %s", jobKey.get());
- CJobSlave *job = jobs.find(jobKey.get());
- if (job)
- {
- graph_id gid;
- msg.read(gid);
- Owned<CGraphBase> graph = job->getGraph(gid);
- if (!graph)
- {
- Owned<IThorException> e = MakeThorException(0, "GraphGetResult: graph not found");
- e->setGraphId(gid);
- throw e.getClear();
- }
- unsigned resultId;
- msg.read(resultId);
- mptag_t replyTag = job->deserializeMPTag(msg);
- msg.setReplyTag(replyTag);
- Owned<IThorResult> result = graph->getResult(resultId);
- if (!result)
- throw MakeGraphException(graph, 0, "GraphGetResult: result not found: %d", resultId);
- msg.clear();
- Owned<IRowStream> resultStream = result->getRowStream();
- sendInChunks(job->queryJobComm(), 0, replyTag, resultStream, result->queryRowInterfaces());
- doReply = false;
- }
- break;
- }
- default:
- throwUnexpected();
- }
- }
- catch (IException *e)
- {
- EXCLOG(e, NULL);
- if (doReply && TAG_NULL != msg.getReplyTag())
- {
- doReply = false;
- msg.clear();
- msg.append(true);
- serializeThorException(e, msg);
- queryClusterComm().reply(msg);
- }
- e->Release();
- }
- if (doReply && msg.getReplyTag()!=TAG_NULL)
- queryClusterComm().reply(msg);
- }
- }
- friend class CThreadExceptionCatcher;
- };
- //////////////////////////
- class CStringAttr : public StringAttr, public CSimpleInterface
- {
- public:
- CStringAttr(const char *str) : StringAttr(str) { }
- const char *queryFindString() const { return get(); }
- };
- class CFileInProgressHandler : public CSimpleInterface, implements IFileInProgressHandler
- {
- CriticalSection crit;
- StringSuperHashTableOf<CStringAttr> lookup;
- QueueOf<CStringAttr, false> fipList;
- OwnedIFileIO iFileIO;
- static const char *formatV;
- void write()
- {
- if (0 == fipList.ordinality())
- iFileIO->setSize(0);
- else
- {
- Owned<IFileIOStream> stream = createBufferedIOStream(iFileIO);
- stream->write(3, formatV); // 3 byte format definition, incase of change later
- ForEachItemIn(i, fipList)
- {
- writeStringToStream(*stream, fipList.item(i)->get());
- writeCharToStream(*stream, '\n');
- }
- offset_t pos = stream->tell();
- stream.clear();
- iFileIO->setSize(pos);
- }
- }
- void doDelete(const char *fip)
- {
- OwnedIFile iFile = createIFile(fip);
- try
- {
- iFile->remove();
- }
- catch (IException *e)
- {
- StringBuffer errStr("FileInProgressHandler, failed to remove: ");
- EXCLOG(e, errStr.append(fip).str());
- e->Release();
- }
- }
- void backup(const char *dir, IFile *iFile)
- {
- StringBuffer origName(iFile->queryFilename());
- StringBuffer bakName("fiplist_");
- CDateTime dt;
- dt.setNow();
- bakName.append((unsigned)dt.getSimple()).append("_").append((unsigned)GetCurrentProcessId()).append(".bak");
- iFileIO.clear(); // close old for rename
- iFile->rename(bakName.str());
- WARNLOG("Renamed to %s", bakName.str());
- OwnedIFile newIFile = createIFile(origName);
- iFileIO.setown(newIFile->open(IFOreadwrite)); // reopen
- }
- public:
- IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
- CFileInProgressHandler()
- {
- init();
- }
- ~CFileInProgressHandler()
- {
- deinit();
- }
- void deinit()
- {
- loop
- {
- CStringAttr *item = fipList.dequeue();
- if (!item) break;
- doDelete(item->get());
- item->Release();
- }
- lookup.kill();
- }
- void init()
- {
- StringBuffer dir;
- globals->getProp("@thorPath", dir);
- StringBuffer path(dir);
- addPathSepChar(path);
- path.append("fiplist_");
- globals->getProp("@name", path);
- path.append("_");
- path.append(queryClusterGroup().rank(queryMyNode()));
- path.append(".lst");
- ensureDirectoryForFile(path.str());
- Owned<IFile> iFile = createIFile(path.str());
- iFileIO.setown(iFile->open(IFOreadwrite));
- if (!iFileIO)
- {
- PROGLOG("Failed to open/create backup file: %s", path.str());
- return;
- }
- MemoryBuffer mb;
- size32_t sz = read(iFileIO, 0, (size32_t)iFileIO->size(), mb);
- const char *mem = mb.toByteArray();
- if (mem)
- {
- if (sz<=3)
- {
- WARNLOG("Corrupt files-in-progress file detected: %s", path.str());
- backup(dir, iFile);
- }
- else
- {
- const char *endMem = mem+mb.length();
- mem += 3; // formatV header
- do
- {
- const char *eol = strchr(mem, '\n');
- if (!eol)
- {
- WARNLOG("Corrupt files-in-progress file detected: %s", path.str());
- backup(dir, iFile);
- break;
- }
- StringAttr fip(mem, eol-mem);
- doDelete(fip);
- mem = eol+1;
- }
- while (mem != endMem);
- }
- }
- write();
- }
-
- // IFileInProgressHandler
- virtual void add(const char *fip)
- {
- CriticalBlock b(crit);
- CStringAttr *item = lookup.find(fip);
- assertex(!item);
- item = new CStringAttr(fip);
- fipList.enqueue(item);
- lookup.add(* item);
- write();
- }
- virtual void remove(const char *fip)
- {
- CriticalBlock b(crit);
- CStringAttr *item = lookup.find(fip);
- if (item)
- {
- lookup.removeExact(item);
- unsigned pos = fipList.find(item);
- fipList.dequeue(item);
- item->Release();
- write();
- }
- }
- };
- const char *CFileInProgressHandler::formatV = "01\n";
- class CThorResourceSlave : public CThorResourceBase
- {
- Owned<IThorFileCache> fileCache;
- Owned<IBackup> backupHandler;
- Owned<IFileInProgressHandler> fipHandler;
- public:
- IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
- CThorResourceSlave()
- {
- backupHandler.setown(createBackupHandler(queryBaseDirectory()));
- fileCache.setown(createFileCache(globals->getPropInt("@fileCacheLimit", 1800)));
- fipHandler.setown(new CFileInProgressHandler());
- }
- ~CThorResourceSlave()
- {
- fileCache.clear();
- backupHandler.clear();
- fipHandler.clear();
- }
- // IThorResource
- virtual IThorFileCache &queryFileCache() { return *fileCache.get(); }
- virtual IBackup &queryBackup() { return *backupHandler.get(); }
- virtual IFileInProgressHandler &queryFileInProgressHandler() { return *fipHandler.get(); }
- };
- void slaveMain()
- {
- unsigned masterMemMB = globals->getPropInt("@masterTotalMem");
- HardwareInfo hdwInfo;
- getHardwareInfo(hdwInfo);
- if (hdwInfo.totalMemory < masterMemMB)
- WARNLOG("Slave has less memory than master node"); // JCSMORE, error?
- unsigned gmemSize = globals->getPropInt("@globalMemorySize");
- if (gmemSize >= hdwInfo.totalMemory)
- {
- // should prob. error here
- }
- roxiemem::setTotalMemoryLimit(((memsize_t)gmemSize) * 0x100000, 0, NULL);
- CJobListener jobListener;
- CThorResourceSlave slaveResource;
- setIThorResource(slaveResource);
- #ifdef __linux__
- bool useMirrorMount = globals->getPropBool("Debug/@useMirrorMount", false);
- if (useMirrorMount && queryClusterGroup().ordinality() > 2)
- {
- unsigned slaves = queryClusterGroup().ordinality()-1;
- rank_t next = queryClusterGroup().rank()%slaves; // note 0 = master
- const IpAddress &ip = queryClusterGroup().queryNode(next+1).endpoint();
- StringBuffer ipStr;
- ip.getIpText(ipStr);
- PROGLOG("Redirecting local mount to %s", ipStr.str());
- const char * overrideReplicateDirectory = globals->queryProp("@thorReplicateDirectory");
- StringBuffer repdir;
- if (getConfigurationDirectory(globals->queryPropTree("Directories"),"mirror","thor",globals->queryProp("@name"),repdir))
- overrideReplicateDirectory = repdir.str();
- else
- overrideReplicateDirectory = "/d$";
- setLocalMountRedirect(ip, overrideReplicateDirectory, "/mnt/mirror");
- }
- #endif
- jobListener.main();
- }
- void abortSlave()
- {
- if (clusterInitialized())
- queryClusterComm().cancel(0, masterSlaveMpTag);
- }
|