daserver.cpp 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "build-config.h"
  14. #include "platform.h"
  15. #include "thirdparty.h"
  16. #include "jlib.hpp"
  17. #include "jlog.ipp"
  18. #include "jptree.hpp"
  19. #include "jmisc.hpp"
  20. #include "jutil.hpp"
  21. #include "mpbase.hpp"
  22. #include "mpcomm.hpp"
  23. #include "mplog.hpp" //MORE: deprecated feature, not used by any new clients, remove once all deployed clients that depend on it are upgraded
  24. #include "rmtfile.hpp"
  25. #include "dacoven.hpp"
  26. #include "dadfs.hpp"
  27. #include "dasess.hpp"
  28. #include "daaudit.hpp"
  29. #include "dasds.hpp"
  30. #include "daclient.hpp"
  31. #include "dasubs.ipp"
  32. #include "danqs.hpp"
  33. #include "dadiags.hpp"
  34. #ifdef _DEBUG
  35. //#define DALI_MIN
  36. #endif
  37. #ifdef DALI_MIN
  38. #define _NO_LDAP
  39. #endif
  40. #include "daserver.hpp"
  41. #ifndef _NO_LDAP
  42. #include "daldap.hpp"
  43. #endif
  44. Owned<IPropertyTree> serverConfig;
  45. static IArrayOf<IDaliServer> servers;
  46. static CriticalSection *stopServerCrit;
  47. MODULE_INIT(INIT_PRIORITY_DALI_DASERVER)
  48. {
  49. stopServerCrit = new CriticalSection;
  50. return true;
  51. }
  52. MODULE_EXIT()
  53. {
  54. servers.kill(); // should already be clear when stopped
  55. serverConfig.clear();
  56. delete stopServerCrit;
  57. }
  58. #define DEFAULT_PERF_REPORT_DELAY 60
  59. #define DEFAULT_MOUNT_POINT "/mnt/dalimirror/"
  60. void setMsgLevel(ILogMsgHandler * fileMsgHandler, unsigned level)
  61. {
  62. ILogMsgFilter *filter = getSwitchLogMsgFilterOwn(getComponentLogMsgFilter(3), getCategoryLogMsgFilter(MSGAUD_all, MSGCLS_all, level, true), getDefaultLogMsgFilter());
  63. queryLogMsgManager()->changeMonitorFilter(queryStderrLogMsgHandler(), filter);
  64. queryLogMsgManager()->changeMonitorFilterOwn(fileMsgHandler, filter);
  65. }
  66. void AddServers(const char *auditdir)
  67. {
  68. // order significant
  69. servers.append(*createDaliSessionServer());
  70. servers.append(*createDaliPublisherServer());
  71. servers.append(*createDaliSDSServer(serverConfig));
  72. servers.append(*createDaliNamedQueueServer());
  73. servers.append(*createDaliDFSServer(serverConfig));
  74. servers.append(*createDaliAuditServer(auditdir));
  75. servers.append(*createDaliDiagnosticsServer());
  76. // add new coven servers here
  77. }
  78. static bool serverStopped = false;
  79. static void stopServer()
  80. {
  81. CriticalBlock b(*stopServerCrit); // NB: will not protect against abort handler, which will interrupt thread and be on same TID.
  82. if (serverStopped) return;
  83. serverStopped = true;
  84. ForEachItemInRev(h,servers)
  85. {
  86. IDaliServer &server=servers.item(h);
  87. LOG(MCprogress, unknownJob, "Suspending %d",h);
  88. server.suspend();
  89. }
  90. ForEachItemInRev(i,servers)
  91. {
  92. IDaliServer &server=servers.item(i);
  93. LOG(MCprogress, unknownJob, "Stopping %d",i);
  94. server.stop();
  95. }
  96. closeCoven();
  97. ForEachItemInRev(j,servers)
  98. {
  99. servers.remove(j); // ensure correct order for destruction
  100. }
  101. stopLogMsgReceivers(); //MORE: deprecated feature, not used by any new clients, remove once all deployed clients that depend on it are upgraded
  102. stopMPServer();
  103. }
  104. bool actionOnAbort()
  105. {
  106. stopServer();
  107. return true;
  108. }
  109. USE_JLIB_ALLOC_HOOK;
  /* Prints the command-line synopsis for daserver to stdout. */
  void usage(void)
  {
      /* One write of the concatenated help text; no format directives are involved. */
      fputs("daserver (option)\n"
            "--rank|-r <value>\t: dali ranking value\n"
            "--server|-s <value>\t: server ip if not local host\n"
            "--port|-p <value>\t: server port only effective if --server set\n"
            "--daemon|-d <instanceName>\t: run daemon as instance\n",
            stdout);
  }
  118. int main(int argc, char* argv[])
  119. {
  120. rank_t myrank = 0;
  121. char *server = nullptr;
  122. int port = 0;
  123. for (unsigned i=1;i<(unsigned)argc;i++) {
  124. if (streq(argv[i],"--daemon") || streq(argv[i],"-d")) {
  125. if (daemon(1,0) || write_pidfile(argv[++i])) {
  126. perror("Failed to daemonize");
  127. return EXIT_FAILURE;
  128. }
  129. }
  130. else if (streq(argv[i],"--server") || streq(argv[i],"-s"))
  131. server = argv[++i];
  132. else if (streq(argv[i],"--port") || streq(argv[i],"-p"))
  133. port = atoi(argv[++i]);
  134. else if (streq(argv[i],"--rank") || streq(argv[i],"-r"))
  135. myrank = atoi(argv[++i]);
  136. else {
  137. usage();
  138. return EXIT_FAILURE;
  139. }
  140. }
  141. InitModuleObjects();
  142. NoQuickEditSection x;
  143. try {
  144. EnableSEHtoExceptionMapping();
  145. #ifndef __64BIT__
  146. // Restrict stack sizes on 32-bit systems
  147. Thread::setDefaultStackSize(0x20000);
  148. #endif
  149. setAllocHook(true);
  150. Owned<IFile> sentinelFile = createSentinelTarget();
  151. removeSentinelFile(sentinelFile);
  152. OwnedIFile confIFile = createIFile(DALICONF);
  153. if (confIFile->exists())
  154. serverConfig.setown(createPTreeFromXMLFile(DALICONF));
  155. ILogMsgHandler * fileMsgHandler;
  156. {
  157. Owned<IComponentLogFileCreator> lf = createComponentLogFileCreator(serverConfig, "dali");
  158. lf->setLogDirSubdir("server");//add to tail of config log dir
  159. lf->setName("DaServer");//override default filename
  160. fileMsgHandler = lf->beginLogging();
  161. }
  162. PROGLOG("Build %s", BUILD_TAG);
  163. if (serverConfig)
  164. {
  165. StringBuffer dataPath;
  166. if (getConfigurationDirectory(serverConfig->queryPropTree("Directories"),"data","dali",serverConfig->queryProp("@name"),dataPath))
  167. serverConfig->setProp("@dataPath",dataPath.str());
  168. else
  169. serverConfig->getProp("@dataPath",dataPath);
  170. if (dataPath.length()) {
  171. RemoteFilename rfn;
  172. rfn.setRemotePath(dataPath);
  173. if (!rfn.isLocal()) {
  174. OERRLOG("if a dataPath is specified, it must be on local machine");
  175. return 0;
  176. }
  177. addPathSepChar(dataPath);
  178. serverConfig->setProp("@dataPath", dataPath.str());
  179. if (dataPath.length())
  180. recursiveCreateDirectory(dataPath.str());
  181. }
  182. // JCSMORE remoteBackupLocation should not be a property of SDS section really.
  183. StringBuffer mirrorPath;
  184. if (!getConfigurationDirectory(serverConfig->queryPropTree("Directories"),"mirror","dali",serverConfig->queryProp("@name"),mirrorPath))
  185. serverConfig->getProp("SDS/@remoteBackupLocation",mirrorPath);
  186. if (mirrorPath.length())
  187. {
  188. try
  189. {
  190. addPathSepChar(mirrorPath);
  191. try
  192. {
  193. StringBuffer backupURL;
  194. if (mirrorPath.length()<=2 || !isPathSepChar(mirrorPath.charAt(0)) || !isPathSepChar(mirrorPath.charAt(1)))
  195. { // local machine path, convert to url
  196. const char *backupnode = serverConfig->queryProp("SDS/@backupComputer");
  197. RemoteFilename rfn;
  198. if (backupnode&&*backupnode) {
  199. SocketEndpoint ep(backupnode);
  200. rfn.setPath(ep,mirrorPath.str());
  201. }
  202. else {
  203. OWARNLOG("Local path used for backup url: %s", mirrorPath.str());
  204. rfn.setLocalPath(mirrorPath.str());
  205. }
  206. rfn.getRemotePath(backupURL);
  207. mirrorPath.clear().append(backupURL);
  208. }
  209. else
  210. backupURL.append(mirrorPath);
  211. recursiveCreateDirectory(backupURL.str());
  212. addPathSepChar(backupURL);
  213. serverConfig->setProp("SDS/@remoteBackupLocation", backupURL.str());
  214. PROGLOG("Backup URL = %s", backupURL.str());
  215. }
  216. catch (IException *e)
  217. {
  218. EXCLOG(e, "Failed to create remote backup directory, disabling backups", MSGCLS_warning);
  219. serverConfig->removeProp("SDS/@remoteBackupLocation");
  220. mirrorPath.clear();
  221. e->Release();
  222. }
  223. if (mirrorPath.length())
  224. {
  225. PROGLOG("Checking backup location: %s", mirrorPath.str());
  226. #if defined(__linux__)
  227. if (serverConfig->getPropBool("@useNFSBackupMount", false))
  228. {
  229. RemoteFilename rfn;
  230. if (mirrorPath.length()<=2 || !isPathSepChar(mirrorPath.charAt(0)) || !isPathSepChar(mirrorPath.charAt(1)))
  231. rfn.setLocalPath(mirrorPath.str());
  232. else
  233. rfn.setRemotePath(mirrorPath.str());
  234. if (!rfn.getPort() && !rfn.isLocal())
  235. {
  236. StringBuffer mountPoint;
  237. serverConfig->getProp("@mountPoint", mountPoint);
  238. if (!mountPoint.length())
  239. mountPoint.append(DEFAULT_MOUNT_POINT);
  240. addPathSepChar(mountPoint);
  241. recursiveCreateDirectory(mountPoint.str());
  242. PROGLOG("Mounting url \"%s\" on mount point \"%s\"", mirrorPath.str(), mountPoint.str());
  243. bool ub = unmountDrive(mountPoint.str());
  244. if (!mountDrive(mountPoint.str(), rfn))
  245. {
  246. if (!ub)
  247. PROGLOG("Failed to remount mount point \"%s\", possibly in use?", mountPoint.str());
  248. else
  249. PROGLOG("Failed to mount \"%s\"", mountPoint.str());
  250. return 0;
  251. }
  252. else
  253. serverConfig->setProp("SDS/@remoteBackupLocation", mountPoint.str());
  254. mirrorPath.clear().append(mountPoint);
  255. }
  256. }
  257. #endif
  258. StringBuffer backupCheck(dataPath);
  259. backupCheck.append("bakchk.").append((unsigned)GetCurrentProcessId());
  260. OwnedIFile iFileDataDir = createIFile(backupCheck.str());
  261. OwnedIFileIO iFileIO = iFileDataDir->open(IFOcreate);
  262. iFileIO.clear();
  263. try
  264. {
  265. backupCheck.clear().append(mirrorPath).append("bakchk.").append((unsigned)GetCurrentProcessId());
  266. OwnedIFile iFileBackup = createIFile(backupCheck.str());
  267. if (iFileBackup->exists())
  268. {
  269. PROGLOG("remoteBackupLocation and dali data path point to same location! : %s", mirrorPath.str());
  270. iFileDataDir->remove();
  271. return 0;
  272. }
  273. }
  274. catch (IException *)
  275. {
  276. try { iFileDataDir->remove(); } catch (IException *e) { EXCLOG(e, NULL); e->Release(); }
  277. throw;
  278. }
  279. iFileDataDir->remove();
  280. StringBuffer dest(mirrorPath.str());
  281. dest.append(DALICONF);
  282. copyFile(dest.str(), DALICONF);
  283. StringBuffer covenPath(dataPath);
  284. OwnedIFile ifile = createIFile(covenPath.append(DALICOVEN).str());
  285. if (ifile->exists())
  286. {
  287. dest.clear().append(mirrorPath.str()).append(DALICOVEN);
  288. copyFile(dest.str(), covenPath.str());
  289. }
  290. }
  291. if (serverConfig->getPropBool("@daliServixCaching", true))
  292. setDaliServixSocketCaching(true);
  293. }
  294. catch (IException *e)
  295. {
  296. StringBuffer s("Failure whilst preparing dali backup location: ");
  297. LOG(MCoperatorError, unknownJob, e, s.append(mirrorPath).append(". Backup disabled").str());
  298. serverConfig->removeProp("SDS/@remoteBackupLocation");
  299. e->Release();
  300. }
  301. }
  302. }
  303. else
  304. serverConfig.setown(createPTree());
  305. write_pidfile(serverConfig->queryProp("@name"));
  306. NamedMutex globalNamedMutex("DASERVER");
  307. if (!serverConfig->getPropBool("allowMultipleDalis"))
  308. {
  309. PROGLOG("Checking for existing daserver instances");
  310. if (!globalNamedMutex.lockWait(0))
  311. {
  312. OWARNLOG("Another DASERVER process is currently running");
  313. return 0;
  314. }
  315. }
  316. SocketEndpoint ep;
  317. SocketEndpointArray epa;
  318. if (!server) {
  319. ep.setLocalHost(DALI_SERVER_PORT);
  320. epa.append(ep);
  321. }
  322. else {
  323. if (!port)
  324. ep.set(server,DALI_SERVER_PORT);
  325. else
  326. ep.set(server,port);
  327. epa.append(ep);
  328. }
  329. unsigned short myport = epa.item(myrank).port;
  330. startMPServer(myport,true);
  331. setMsgLevel(fileMsgHandler, serverConfig->getPropInt("SDS/@msgLevel", 100));
  332. startLogMsgChildReceiver();
  333. startLogMsgParentReceiver();
  334. IGroup *group = createIGroup(epa);
  335. initCoven(group,serverConfig);
  336. group->Release();
  337. epa.kill();
  338. // Audit logging
  339. StringBuffer auditDir;
  340. {
  341. Owned<IComponentLogFileCreator> lf = createComponentLogFileCreator(serverConfig, "dali");
  342. lf->setLogDirSubdir("audit");//add to tail of config log dir
  343. lf->setName("DaAudit");//override default filename
  344. lf->setCreateAliasFile(false);
  345. lf->setMsgFields(MSGFIELD_timeDate | MSGFIELD_code);
  346. lf->setMsgAudiences(MSGAUD_audit);
  347. lf->setMaxDetail(TopDetail);
  348. lf->beginLogging();
  349. auditDir.set(lf->queryLogDir());
  350. }
  351. // SNMP logging
  352. bool enableSNMP = serverConfig->getPropBool("SDS/@enableSNMP");
  353. if (serverConfig->getPropBool("SDS/@enableSysLog",true))
  354. UseSysLogForOperatorMessages();
  355. AddServers(auditDir.str());
  356. addAbortHandler(actionOnAbort);
  357. startPerformanceMonitor(serverConfig->getPropInt("Coven/@perfReportDelay", DEFAULT_PERF_REPORT_DELAY)*1000);
  358. StringBuffer absPath;
  359. StringBuffer dataPath;
  360. serverConfig->getProp("@dataPath",dataPath);
  361. makeAbsolutePath(dataPath.str(), absPath);
  362. setPerformanceMonitorPrimaryFileSystem(absPath.str());
  363. if(serverConfig->hasProp("SDS/@remoteBackupLocation"))
  364. {
  365. absPath.clear();
  366. serverConfig->getProp("SDS/@remoteBackupLocation",dataPath.clear());
  367. makeAbsolutePath(dataPath.str(), absPath);
  368. setPerformanceMonitorSecondaryFileSystem(absPath.str());
  369. }
  370. try
  371. {
  372. ForEachItemIn(i1,servers)
  373. {
  374. IDaliServer &server=servers.item(i1);
  375. server.start();
  376. }
  377. }
  378. catch (IException *e)
  379. {
  380. EXCLOG(e, "Failed whilst starting servers");
  381. stopServer();
  382. stopPerformanceMonitor();
  383. throw;
  384. }
  385. try {
  386. #ifndef _NO_LDAP
  387. setLDAPconnection(createDaliLdapConnection(serverConfig->getPropTree("Coven/ldapSecurity")));
  388. #endif
  389. }
  390. catch (IException *e) {
  391. EXCLOG(e, "LDAP initialization error");
  392. stopServer();
  393. stopPerformanceMonitor();
  394. throw;
  395. }
  396. PROGLOG("DASERVER[%d] starting - listening to port %d",myrank,queryMyNode()->endpoint().port);
  397. startMPServer(myport,false);
  398. bool ok = true;
  399. ForEachItemIn(i2,servers)
  400. {
  401. IDaliServer &server=servers.item(i2);
  402. try {
  403. server.ready();
  404. }
  405. catch (IException *e) {
  406. EXCLOG(e,"Exception starting Dali Server");
  407. ok = false;
  408. }
  409. }
  410. if (ok) {
  411. writeSentinelFile(sentinelFile);
  412. covenMain();
  413. removeAbortHandler(actionOnAbort);
  414. }
  415. stopLogMsgListener();
  416. stopServer();
  417. stopPerformanceMonitor();
  418. }
  419. catch (IException *e) {
  420. EXCLOG(e, "Exception");
  421. }
  422. UseSysLogForOperatorMessages(false);
  423. return 0;
  424. }