slavmain.cpp 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653
  1. /*##############################################################################
  2. Copyright (C) 2011 HPCC Systems.
  3. All rights reserved. This program is free software: you can redistribute it and/or modify
  4. it under the terms of the GNU Affero General Public License as
  5. published by the Free Software Foundation, either version 3 of the
  6. License, or (at your option) any later version.
  7. This program is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. GNU Affero General Public License for more details.
  11. You should have received a copy of the GNU Affero General Public License
  12. along with this program. If not, see <http://www.gnu.org/licenses/>.
  13. ############################################################################## */
  14. #include <platform.h>
  15. #include "jlib.hpp"
  16. #include "jexcept.hpp"
  17. #include "jthread.hpp"
  18. #include "jprop.hpp"
  19. #include "jiter.ipp"
  20. #include "jlzw.hpp"
  21. #include "jhtree.hpp"
  22. #include "mpcomm.hpp"
  23. #include "portlist.h"
  24. #include "rmtfile.hpp"
  25. #include "daclient.hpp"
  26. #include "dafdesc.hpp"
  27. #include "slwatchdog.hpp"
  28. #include "thbuf.hpp"
  29. #include "thmem.hpp"
  30. #include "thexception.hpp"
  31. #include "backup.hpp"
  32. #include "slave.hpp"
  33. #include "thormisc.hpp"
  34. #include "thorport.hpp"
  35. #include "thgraphslave.hpp"
  36. #include "slave.ipp"
  37. #include "thcompressutil.hpp"
  38. //---------------------------------------------------------------------------
  39. //---------------------------------------------------------------------------
  40. class CJobListener : public CSimpleInterface
  41. {
  42. bool stopped;
  43. CriticalSection crit;
  44. OwningStringSuperHashTableOf<CJobSlave> jobs;
  45. CFifoFileCache querySoCache; // used to mirror master cache
  46. class CThreadExceptionCatcher : implements IExceptionHandler
  47. {
  48. CJobListener &jobListener;
  49. public:
  50. CThreadExceptionCatcher(CJobListener &_jobListener) : jobListener(_jobListener)
  51. {
  52. addThreadExceptionHandler(this);
  53. }
  54. ~CThreadExceptionCatcher()
  55. {
  56. removeThreadExceptionHandler(this);
  57. }
  58. virtual bool fireException(IException *e)
  59. {
  60. mptag_t mptag;
  61. {
  62. CriticalBlock b(jobListener.crit);
  63. if (0 == jobListener.jobs.count())
  64. {
  65. EXCLOG(e, "No job active exception: ");
  66. return true;
  67. }
  68. IThorException *te = QUERYINTERFACE(e, IThorException);
  69. CJobSlave *job = NULL;
  70. if (te && te->queryJobId())
  71. job = jobListener.jobs.find(te->queryJobId());
  72. if (!job)
  73. {
  74. // JCSMORE - exception fallen through to thread exception handler, from unknown job, fire up to 1st job for now.
  75. job = (CJobSlave *)jobListener.jobs.next(NULL);
  76. }
  77. mptag = job->querySlaveMpTag();
  78. }
  79. CMessageBuffer msg;
  80. msg.append(smt_errorMsg);
  81. serializeThorException(e, msg);
  82. try
  83. {
  84. if (!queryClusterComm().sendRecv(msg, 0, mptag, LONGTIMEOUT))
  85. EXCLOG(e, "Failed to send exception to master");
  86. }
  87. catch (IException *e2)
  88. {
  89. StringBuffer str("Error whilst sending exception '");
  90. e->errorMessage(str);
  91. str.append("' to master");
  92. EXCLOG(e2, str.str());
  93. e2->Release();
  94. }
  95. return true;
  96. }
  97. } excptHandler;
  98. public:
  99. CJobListener() : excptHandler(*this)
  100. {
  101. stopped = true;
  102. }
  103. ~CJobListener()
  104. {
  105. stop();
  106. }
  107. void stop()
  108. {
  109. queryClusterComm().cancel(0, masterSlaveMpTag);
  110. }
  111. virtual void main()
  112. {
  113. StringBuffer soPath;
  114. globals->getProp("@query_so_dir", soPath);
  115. StringBuffer soPattern("*.");
  116. #ifdef _WIN32
  117. soPattern.append("dll");
  118. #else
  119. soPattern.append("so");
  120. #endif
  121. if (globals->getPropBool("Debug/@dllsToSlaves",true))
  122. querySoCache.init(soPath.str(), DEFAULT_QUERYSO_LIMIT, soPattern);
  123. Owned<ISlaveWatchdog> watchdog;
  124. if (globals->getPropBool("@watchdogEnabled"))
  125. watchdog.setown(createProgressHandler(globals->getPropBool("@useUDPWatchdog")));
  126. CMessageBuffer msg;
  127. stopped = false;
  128. bool doReply;
  129. rank_t sendert;
  130. while (!stopped && queryClusterComm().recv(msg, 0, masterSlaveMpTag, &sendert))
  131. {
  132. doReply = true;
  133. msgids cmd;
  134. try
  135. {
  136. msg.read((unsigned &)cmd);
  137. switch (cmd)
  138. {
  139. case QueryInit:
  140. {
  141. MemoryBuffer mb;
  142. decompressToBuffer(mb, msg);
  143. msg.swapWith(mb);
  144. mptag_t mptag, slaveMsgTag;
  145. deserializeMPtag(msg, mptag);
  146. queryClusterComm().flush(mptag);
  147. deserializeMPtag(msg, slaveMsgTag);
  148. queryClusterComm().flush(slaveMsgTag);
  149. StringAttr wuid, graphName, soPath;
  150. msg.read(wuid);
  151. msg.read(graphName);
  152. msg.read(soPath);
  153. bool sendSo;
  154. msg.read(sendSo);
  155. if (sendSo)
  156. {
  157. size32_t size;
  158. msg.read(size);
  159. Owned<IFile> iFile = createIFile(soPath);
  160. try
  161. {
  162. const void *soPtr = msg.readDirect(size);
  163. #ifdef _DEBUG
  164. if (!iFile->exists())
  165. #else
  166. if (1)
  167. #endif
  168. {
  169. iFile->setCreateFlags(S_IRWXU);
  170. Owned<IFileIO> iFileIO = iFile->open(IFOwrite);
  171. iFileIO->write(0, size, soPtr);
  172. }
  173. }
  174. catch (IException *e)
  175. {
  176. StringBuffer msg("Failed to save dll, cwd = ");
  177. char buf[255];
  178. if (!GetCurrentDirectory(sizeof(buf), buf)) {
  179. ERRLOG("CJobListener::main: Current directory path too big, setting it to null");
  180. buf[0] = 0;
  181. }
  182. msg.append(buf).append(", path = ").append(soPath);
  183. EXCLOG(e, msg.str());
  184. e->Release();
  185. }
  186. assertex(globals->getPropBool("Debug/@dllsToSlaves", true));
  187. querySoCache.add(soPath);
  188. }
  189. else
  190. {
  191. RemoteFilename rfn;
  192. SocketEndpoint masterEp = queryClusterGroup().queryNode(0).endpoint();
  193. masterEp.port = 0;
  194. rfn.setPath(masterEp, soPath);
  195. StringBuffer rpath;
  196. if (rfn.isLocal())
  197. rfn.getLocalPath(rpath);
  198. else
  199. rfn.getRemotePath(rpath);
  200. if (globals->getPropBool("Debug/@dllsToSlaves", true))
  201. {
  202. // i.e. should have previously been sent.
  203. OwnedIFile iFile = createIFile(soPath);
  204. if (!iFile->exists())
  205. {
  206. WARNLOG("Slave cached query dll missing: %s, will attempt to fetch from master", soPath.get());
  207. copyFile(soPath, rpath.str());
  208. }
  209. querySoCache.add(soPath);
  210. }
  211. else
  212. soPath.set(rpath.str());
  213. }
  214. Owned<IPropertyTree> workUnitInfo = createPTree(msg);
  215. StringBuffer user;
  216. workUnitInfo->getProp("user", user);
  217. PROGLOG("Started wuid=%s, user=%s, graph=%s\n", wuid.get(), user.str(), graphName.get());
  218. PROGLOG("Using query: %s", soPath.get());
  219. Owned<CJobSlave> job = new CJobSlave(watchdog, workUnitInfo, graphName, soPath, mptag, slaveMsgTag);
  220. jobs.replace(*LINK(job));
  221. Owned<IPropertyTree> deps = createPTree(msg);
  222. job->setXGMML(deps);
  223. msg.clear();
  224. msg.append(false);
  225. break;
  226. }
  227. case QueryDone:
  228. {
  229. StringAttr key;
  230. msg.read(key);
  231. CJobSlave *job = jobs.find(key.get());
  232. StringAttr wuid = job->queryWuid();
  233. StringAttr graphName = job->queryGraphName();
  234. PROGLOG("QueryDone, removing %s from jobs", key.get());
  235. jobs.removeExact(job);
  236. PROGLOG("QueryDone, removed %s from jobs", key.get());
  237. PROGLOG("Finished wuid=%s, graph=%s", wuid.get(), graphName.get());
  238. msg.clear();
  239. msg.append(false);
  240. break;
  241. }
  242. case GraphInit:
  243. {
  244. StringAttr jobKey;
  245. msg.read(jobKey);
  246. CJobSlave *job = jobs.find(jobKey.get());
  247. if (!job)
  248. throw MakeStringException(0, "Job not found: %s", jobKey.get());
  249. Owned<IPropertyTree> graphNode = createPTree(msg);
  250. Owned<CSlaveGraph> subGraph = (CSlaveGraph *)job->createGraph();
  251. subGraph->createFromXGMML(graphNode, NULL, NULL, NULL);
  252. PROGLOG("GraphInit: %s, graphId=%"GIDPF"d", jobKey.get(), subGraph->queryGraphId());
  253. subGraph->setExecuteReplyTag(subGraph->queryJob().deserializeMPTag(msg));
  254. size32_t len;
  255. msg.read(len);
  256. MemoryBuffer initData;
  257. initData.append(len, msg.readDirect(len));
  258. subGraph->deserializeCreateContexts(initData);
  259. graph_id gid;
  260. msg.read(gid);
  261. assertex(gid == subGraph->queryGraphId());
  262. subGraph->init(msg);
  263. job->addSubGraph(*LINK(subGraph));
  264. job->addDependencies(job->queryXGMML(), false);
  265. subGraph->execute(0, NULL, true, true);
  266. msg.clear();
  267. msg.append(false);
  268. break;
  269. }
  270. case GraphEnd:
  271. {
  272. StringAttr jobKey;
  273. msg.read(jobKey);
  274. CJobSlave *job = jobs.find(jobKey.get());
  275. if (job)
  276. {
  277. graph_id gid;
  278. msg.read(gid);
  279. msg.clear();
  280. msg.append(false);
  281. Owned<CSlaveGraph> graph = (CSlaveGraph *)job->getGraph(gid);
  282. if (graph)
  283. {
  284. graph->getDone(msg);
  285. graph->join(); // graph will wind-up.
  286. }
  287. else
  288. {
  289. msg.clear();
  290. msg.append(false);
  291. }
  292. }
  293. else
  294. {
  295. msg.clear();
  296. msg.append(false);
  297. }
  298. break;
  299. }
  300. case GraphAbort:
  301. {
  302. StringAttr jobKey;
  303. msg.read(jobKey);
  304. PROGLOG("GraphAbort: %s", jobKey.get());
  305. CJobSlave *job = jobs.find(jobKey.get());
  306. if (job)
  307. {
  308. graph_id gid;
  309. msg.read(gid);
  310. Owned<CGraphBase> graph = job->getGraph(gid);
  311. if (graph)
  312. {
  313. Owned<IThorException> e = MakeThorException(0, "GraphAbort");
  314. e->setGraphId(gid);
  315. graph->abort(e);
  316. }
  317. }
  318. msg.clear();
  319. msg.append(false);
  320. break;
  321. }
  322. case Shutdown:
  323. {
  324. doReply = false;
  325. stopped = true;
  326. break;
  327. }
  328. case GraphGetResult:
  329. {
  330. StringAttr jobKey;
  331. msg.read(jobKey);
  332. PROGLOG("GraphGetResult: %s", jobKey.get());
  333. CJobSlave *job = jobs.find(jobKey.get());
  334. if (job)
  335. {
  336. graph_id gid;
  337. msg.read(gid);
  338. Owned<CGraphBase> graph = job->getGraph(gid);
  339. if (!graph)
  340. {
  341. Owned<IThorException> e = MakeThorException(0, "GraphGetResult: graph not found");
  342. e->setGraphId(gid);
  343. throw e.getClear();
  344. }
  345. unsigned resultId;
  346. msg.read(resultId);
  347. mptag_t replyTag = job->deserializeMPTag(msg);
  348. msg.setReplyTag(replyTag);
  349. Owned<IThorResult> result = graph->getResult(resultId);
  350. if (!result)
  351. throw MakeGraphException(graph, 0, "GraphGetResult: result not found: %d", resultId);
  352. msg.clear();
  353. Owned<IRowStream> resultStream = result->getRowStream();
  354. sendInChunks(job->queryJobComm(), 0, replyTag, resultStream, result->queryRowInterfaces());
  355. doReply = false;
  356. }
  357. break;
  358. }
  359. default:
  360. throwUnexpected();
  361. }
  362. }
  363. catch (IException *e)
  364. {
  365. EXCLOG(e, NULL);
  366. if (doReply && TAG_NULL != msg.getReplyTag())
  367. {
  368. doReply = false;
  369. msg.clear();
  370. msg.append(true);
  371. serializeThorException(e, msg);
  372. queryClusterComm().reply(msg);
  373. }
  374. e->Release();
  375. }
  376. if (doReply && msg.getReplyTag()!=TAG_NULL)
  377. queryClusterComm().reply(msg);
  378. }
  379. }
  380. friend class CThreadExceptionCatcher;
  381. };
  382. //////////////////////////
  383. class CStringAttr : public StringAttr, public CSimpleInterface
  384. {
  385. public:
  386. CStringAttr(const char *str) : StringAttr(str) { }
  387. const char *queryFindString() const { return get(); }
  388. };
  389. class CFileInProgressHandler : public CSimpleInterface, implements IFileInProgressHandler
  390. {
  391. CriticalSection crit;
  392. StringSuperHashTableOf<CStringAttr> lookup;
  393. QueueOf<CStringAttr, false> fipList;
  394. OwnedIFileIO iFileIO;
  395. static const char *formatV;
  396. void write()
  397. {
  398. if (0 == fipList.ordinality())
  399. iFileIO->setSize(0);
  400. else
  401. {
  402. Owned<IFileIOStream> stream = createBufferedIOStream(iFileIO);
  403. stream->write(3, formatV); // 3 byte format definition, incase of change later
  404. ForEachItemIn(i, fipList)
  405. {
  406. writeStringToStream(*stream, fipList.item(i)->get());
  407. writeCharToStream(*stream, '\n');
  408. }
  409. offset_t pos = stream->tell();
  410. stream.clear();
  411. iFileIO->setSize(pos);
  412. }
  413. }
  414. void doDelete(const char *fip)
  415. {
  416. OwnedIFile iFile = createIFile(fip);
  417. try
  418. {
  419. iFile->remove();
  420. }
  421. catch (IException *e)
  422. {
  423. StringBuffer errStr("FileInProgressHandler, failed to remove: ");
  424. EXCLOG(e, errStr.append(fip).str());
  425. e->Release();
  426. }
  427. }
  428. void backup(const char *dir, IFile *iFile)
  429. {
  430. StringBuffer origName(iFile->queryFilename());
  431. StringBuffer bakName("fiplist_");
  432. CDateTime dt;
  433. dt.setNow();
  434. bakName.append((unsigned)dt.getSimple()).append("_").append((unsigned)GetCurrentProcessId()).append(".bak");
  435. iFileIO.clear(); // close old for rename
  436. iFile->rename(bakName.str());
  437. WARNLOG("Renamed to %s", bakName.str());
  438. OwnedIFile newIFile = createIFile(origName);
  439. iFileIO.setown(newIFile->open(IFOreadwrite)); // reopen
  440. }
  441. public:
  442. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  443. CFileInProgressHandler()
  444. {
  445. init();
  446. }
  447. ~CFileInProgressHandler()
  448. {
  449. deinit();
  450. }
  451. void deinit()
  452. {
  453. loop
  454. {
  455. CStringAttr *item = fipList.dequeue();
  456. if (!item) break;
  457. doDelete(item->get());
  458. item->Release();
  459. }
  460. lookup.kill();
  461. }
  462. void init()
  463. {
  464. StringBuffer dir;
  465. globals->getProp("@thorPath", dir);
  466. StringBuffer path(dir);
  467. addPathSepChar(path);
  468. path.append("fiplist_");
  469. globals->getProp("@name", path);
  470. path.append("_");
  471. path.append(queryClusterGroup().rank(queryMyNode()));
  472. path.append(".lst");
  473. ensureDirectoryForFile(path.str());
  474. Owned<IFile> iFile = createIFile(path.str());
  475. iFileIO.setown(iFile->open(IFOreadwrite));
  476. if (!iFileIO)
  477. {
  478. PROGLOG("Failed to open/create backup file: %s", path.str());
  479. return;
  480. }
  481. MemoryBuffer mb;
  482. size32_t sz = read(iFileIO, 0, (size32_t)iFileIO->size(), mb);
  483. const char *mem = mb.toByteArray();
  484. if (mem)
  485. {
  486. if (sz<=3)
  487. {
  488. WARNLOG("Corrupt files-in-progress file detected: %s", path.str());
  489. backup(dir, iFile);
  490. }
  491. else
  492. {
  493. const char *endMem = mem+mb.length();
  494. mem += 3; // formatV header
  495. do
  496. {
  497. const char *eol = strchr(mem, '\n');
  498. if (!eol)
  499. {
  500. WARNLOG("Corrupt files-in-progress file detected: %s", path.str());
  501. backup(dir, iFile);
  502. break;
  503. }
  504. StringAttr fip(mem, eol-mem);
  505. doDelete(fip);
  506. mem = eol+1;
  507. }
  508. while (mem != endMem);
  509. }
  510. }
  511. write();
  512. }
  513. // IFileInProgressHandler
  514. virtual void add(const char *fip)
  515. {
  516. CriticalBlock b(crit);
  517. CStringAttr *item = lookup.find(fip);
  518. assertex(!item);
  519. item = new CStringAttr(fip);
  520. fipList.enqueue(item);
  521. lookup.add(* item);
  522. write();
  523. }
  524. virtual void remove(const char *fip)
  525. {
  526. CriticalBlock b(crit);
  527. CStringAttr *item = lookup.find(fip);
  528. if (item)
  529. {
  530. lookup.removeExact(item);
  531. unsigned pos = fipList.find(item);
  532. fipList.dequeue(item);
  533. item->Release();
  534. write();
  535. }
  536. }
  537. };
  538. const char *CFileInProgressHandler::formatV = "01\n";
  539. class CThorResourceSlave : public CThorResourceBase
  540. {
  541. Owned<IThorFileCache> fileCache;
  542. Owned<IBackup> backupHandler;
  543. Owned<IFileInProgressHandler> fipHandler;
  544. public:
  545. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  546. CThorResourceSlave()
  547. {
  548. backupHandler.setown(createBackupHandler(queryBaseDirectory()));
  549. fileCache.setown(createFileCache(globals->getPropInt("@fileCacheLimit", 1800)));
  550. fipHandler.setown(new CFileInProgressHandler());
  551. }
  552. ~CThorResourceSlave()
  553. {
  554. fileCache.clear();
  555. backupHandler.clear();
  556. fipHandler.clear();
  557. }
  558. // IThorResource
  559. virtual IThorFileCache &queryFileCache() { return *fileCache.get(); }
  560. virtual IBackup &queryBackup() { return *backupHandler.get(); }
  561. virtual IFileInProgressHandler &queryFileInProgressHandler() { return *fipHandler.get(); }
  562. };
  563. void slaveMain()
  564. {
  565. unsigned masterMemMB = globals->getPropInt("@masterTotalMem");
  566. HardwareInfo hdwInfo;
  567. getHardwareInfo(hdwInfo);
  568. if (hdwInfo.totalMemory < masterMemMB)
  569. WARNLOG("Slave has less memory than master node"); // JCSMORE, error?
  570. unsigned gmemSize = globals->getPropInt("@globalMemorySize");
  571. if (gmemSize >= hdwInfo.totalMemory)
  572. {
  573. // should prob. error here
  574. }
  575. roxiemem::setTotalMemoryLimit(((memsize_t)gmemSize) * 0x100000, 0, NULL);
  576. CJobListener jobListener;
  577. CThorResourceSlave slaveResource;
  578. setIThorResource(slaveResource);
  579. #ifdef __linux__
  580. bool useMirrorMount = globals->getPropBool("Debug/@useMirrorMount", false);
  581. if (useMirrorMount && queryClusterGroup().ordinality() > 2)
  582. {
  583. unsigned slaves = queryClusterGroup().ordinality()-1;
  584. rank_t next = queryClusterGroup().rank()%slaves; // note 0 = master
  585. const IpAddress &ip = queryClusterGroup().queryNode(next+1).endpoint();
  586. StringBuffer ipStr;
  587. ip.getIpText(ipStr);
  588. PROGLOG("Redirecting local mount to %s", ipStr.str());
  589. const char * overrideReplicateDirectory = globals->queryProp("@thorReplicateDirectory");
  590. StringBuffer repdir;
  591. if (getConfigurationDirectory(globals->queryPropTree("Directories"),"mirror","thor",globals->queryProp("@name"),repdir))
  592. overrideReplicateDirectory = repdir.str();
  593. else
  594. overrideReplicateDirectory = "/d$";
  595. setLocalMountRedirect(ip, overrideReplicateDirectory, "/mnt/mirror");
  596. }
  597. #endif
  598. jobListener.main();
  599. }
  600. void abortSlave()
  601. {
  602. if (clusterInitialized())
  603. queryClusterComm().cancel(0, masterSlaveMpTag);
  604. }