// dafilesrv.cpp
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include "portlist.h"
  15. #include "jlib.hpp"
  16. #include "jiface.hpp"
  17. #include "jutil.hpp"
  18. #include "jfile.hpp"
  19. #include "jlog.hpp"
  20. #include "jmisc.hpp"
  21. #include "dalienv.hpp"
  22. #include "dafdesc.hpp"
  23. #ifdef _MSC_VER
  24. #pragma warning (disable : 4355)
  25. #endif
  26. static const bool defaultRowServiceOnStdPort = true;
  27. static const bool defaultDedicatedRowServiceSSL = false;
  28. static const char* defaultRowSericeConfiguration = "RowSvc";
  29. #include "remoteerr.hpp"
  30. #include "dafscommon.hpp"
  31. #include "rmtclient.hpp"
  32. #include "dafsserver.hpp"
  33. void usage()
  34. {
  35. printf("dafilesrv usage:\n");
  36. printf(" dafilesrv [-T<n>] [...] [<port>] [<send-buff-size-kb> <recv-buff-size-kb>]\n");
  37. printf(" -- run test local\n");
  38. printf(" dafilesrv -D [ -L <log-dir> ] [ -LOCAL ] -- run as linux daemon\n");
  39. printf(" dafilesrv -R -- run remote (linux daemon, windows standalone)\n");
  40. printf(" dafilesrv -install -- install windows service\n");
  41. printf(" dafilesrv -remove -- remove windows service\n\n");
  42. printf(" add -I <instance name> to specify an instance name\n");
  43. printf(" add -NOSSL to disable SSL sockets, even when specified in configuration\n\n");
  44. printf(" additional optional args:\n");
  45. printf(" [-p <port>] [-sslp <ssl-port>] [-sbsize <send-buff-size-kb>] [-rbsize <recv-buff-size-kb>]\n");
  46. printf(" [-addr <ip>:<port>]\n\n");
  47. printf(" Standard port is %d\n",DAFILESRV_PORT);
  48. printf(" Standard SSL port is %d (certificate and key required in environment.conf)\n",SECURE_DAFILESRV_PORT);
  49. printf(" Version: %s\n\n",remoteServerVersionString());
  50. }
  51. static Owned<IRemoteFileServer> server;
  52. #ifdef _WIN32
  53. // Service code
  54. #define DAFS_SERVICE_NAME "DaFileSrv"
  55. #define DAFS_SERVICE_DISPLAY_NAME "Dali File Server"
  56. void LogError( const char *s,DWORD dwError )
  57. {
  58. LPTSTR lpBuffer = NULL;
  59. FormatMessage( FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM, NULL, dwError,
  60. MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
  61. (LPTSTR) &lpBuffer, 0, NULL );
  62. OERRLOG( "%s(%d): %s", s, dwError, lpBuffer );
  63. LocalFree( lpBuffer );
  64. }
  65. // Service initialization
  66. static class CService *THIS; // singleton instance
  67. class CService
  68. {
  69. SERVICE_STATUS ServiceStatus;
  70. SERVICE_STATUS_HANDLE hStatus;
  71. // Control handler function
  72. void doControlHandler(DWORD request)
  73. {
  74. switch(request) {
  75. case SERVICE_CONTROL_STOP:
  76. PROGLOG(DAFS_SERVICE_NAME " Control: stopped");
  77. ServiceStatus.dwWin32ExitCode = 0;
  78. ServiceStatus.dwCurrentState = SERVICE_STOPPED;
  79. break;
  80. case SERVICE_CONTROL_SHUTDOWN:
  81. PROGLOG(DAFS_SERVICE_NAME "Control: shutdown");
  82. ServiceStatus.dwWin32ExitCode = 0;
  83. ServiceStatus.dwCurrentState = SERVICE_STOPPED;
  84. break;
  85. case SERVICE_CONTROL_INTERROGATE:
  86. break;
  87. default:
  88. PROGLOG(DAFS_SERVICE_NAME " Control: %d",request);
  89. break;
  90. }
  91. // Report current status
  92. SetServiceStatus (hStatus, &ServiceStatus);
  93. return;
  94. }
  95. void doServiceMain(int argc, char** argv)
  96. {
  97. ServiceStatus.dwServiceType = SERVICE_WIN32;
  98. ServiceStatus.dwCurrentState = SERVICE_START_PENDING;
  99. ServiceStatus.dwControlsAccepted = SERVICE_ACCEPT_STOP | SERVICE_ACCEPT_SHUTDOWN;
  100. ServiceStatus.dwWin32ExitCode = 0;
  101. ServiceStatus.dwServiceSpecificExitCode = 0;
  102. ServiceStatus.dwCheckPoint = 0;
  103. ServiceStatus.dwWaitHint = 0;
  104. hStatus = RegisterServiceCtrlHandler(DAFS_SERVICE_NAME, (LPHANDLER_FUNCTION)ControlHandler);
  105. if (hStatus == (SERVICE_STATUS_HANDLE)0) {
  106. // Registering Control Handler failed
  107. LogError("RegisterServiceCtrlHandler",GetLastError());
  108. return;
  109. }
  110. // Initialize Service
  111. if (!init()) {
  112. // Initialization failed
  113. ServiceStatus.dwCurrentState = SERVICE_STOPPED;
  114. ServiceStatus.dwWin32ExitCode = -1;
  115. SetServiceStatus(hStatus, &ServiceStatus);
  116. return;
  117. }
  118. // We report the running status to SCM.
  119. ServiceStatus.dwCurrentState = SERVICE_RUNNING;
  120. SetServiceStatus (hStatus, &ServiceStatus);
  121. // The worker loop of a service
  122. run();
  123. return;
  124. }
  125. public:
  126. CService()
  127. {
  128. THIS = this;
  129. }
  130. void start()
  131. {
  132. SERVICE_TABLE_ENTRY ServiceTable[2];
  133. ServiceTable[0].lpServiceName = DAFS_SERVICE_NAME;
  134. ServiceTable[0].lpServiceProc = (LPSERVICE_MAIN_FUNCTION)ServiceMain;
  135. ServiceTable[1].lpServiceName = NULL;
  136. ServiceTable[1].lpServiceProc = NULL;
  137. // Start the control dispatcher thread for our service
  138. if (!StartServiceCtrlDispatcher(ServiceTable)) {
  139. LogError("StartServiceCtrlDispatcher",GetLastError());
  140. }
  141. }
  142. void stop()
  143. {
  144. ServiceStatus.dwCurrentState = SERVICE_STOPPED;
  145. ServiceStatus.dwCheckPoint = 0;
  146. ServiceStatus.dwWaitHint = 0;
  147. ServiceStatus.dwWin32ExitCode = 0;
  148. ServiceStatus.dwServiceSpecificExitCode = 0;
  149. SetServiceStatus(hStatus, &ServiceStatus);
  150. }
  151. // Control handler function
  152. static void ControlHandler(DWORD request)
  153. {
  154. THIS->doControlHandler(request) ;
  155. }
  156. static void ServiceMain(int argc, char** argv)
  157. {
  158. THIS->doServiceMain(argc, argv);
  159. }
  160. bool running() { return ServiceStatus.dwCurrentState == SERVICE_RUNNING; }
  161. virtual bool init() = 0;
  162. virtual void run() = 0;
  163. };
  164. bool installService(const char *servicename,const char *servicedisplayname,const char *dependancies)
  165. {
  166. DWORD err = ERROR_SUCCESS;
  167. char path[512];
  168. if (GetModuleFileName( NULL, path, sizeof(path) )) {
  169. SC_HANDLE hSCM = OpenSCManager( NULL, NULL, SC_MANAGER_ALL_ACCESS); // full access rights
  170. if (hSCM) {
  171. SC_HANDLE hService = CreateService(
  172. hSCM, // SCManager database
  173. servicename, // name of service
  174. servicedisplayname, // name to display
  175. SERVICE_ALL_ACCESS, // desired access
  176. SERVICE_WIN32_OWN_PROCESS,//|SERVICE_INTERACTIVE_PROCESS , // service type
  177. SERVICE_AUTO_START, // start type
  178. SERVICE_ERROR_NORMAL, // error control type
  179. path, // service's binary
  180. NULL, // no load ordering group
  181. NULL, // no tag identifier
  182. dependancies, // dependencies
  183. NULL, // LocalSystem account
  184. NULL); // no password
  185. if (hService) {
  186. Sleep(1000);
  187. StartService(hService,0,0);
  188. CloseServiceHandle(hService);
  189. }
  190. else
  191. err = GetLastError();
  192. }
  193. else
  194. err = GetLastError();
  195. CloseServiceHandle(hSCM);
  196. }
  197. else
  198. err = GetLastError();
  199. if (err!=ERROR_SUCCESS) {
  200. LogError("Install failed",err);
  201. return false;
  202. }
  203. return true;
  204. }
  205. bool uninstallService(const char *servicename,const char *servicedisplayname)
  206. {
  207. DWORD err = ERROR_SUCCESS;
  208. SC_HANDLE hSCM = OpenSCManager( NULL, NULL, SC_MANAGER_ALL_ACCESS); // full access rights
  209. if (hSCM) {
  210. SC_HANDLE hService = OpenService(hSCM, servicename, SERVICE_STOP|SERVICE_QUERY_STATUS);
  211. if (hService) {
  212. // try to stop the service
  213. SERVICE_STATUS ss;
  214. if ( ControlService( hService, SERVICE_CONTROL_STOP, &ss ) ) {
  215. PROGLOG("Stopping %s", servicedisplayname);
  216. Sleep( 1000 );
  217. while( QueryServiceStatus( hService, &ss ) ) {
  218. if ( ss.dwCurrentState != SERVICE_STOP_PENDING )
  219. break;
  220. Sleep( 1000 );
  221. }
  222. if ( ss.dwCurrentState == SERVICE_STOPPED )
  223. PROGLOG("%s Service stopped",servicedisplayname);
  224. else
  225. OERRLOG("%s failed to stop",servicedisplayname);
  226. }
  227. CloseServiceHandle(hService);
  228. }
  229. hService = OpenService(hSCM, servicename, DELETE);
  230. if (hService) {
  231. // now remove the service
  232. if (!DeleteService(hService))
  233. err = GetLastError();
  234. CloseServiceHandle(hService);
  235. }
  236. else
  237. err = GetLastError();
  238. CloseServiceHandle(hSCM);
  239. }
  240. else
  241. err = GetLastError();
  242. if (err!=ERROR_SUCCESS) {
  243. LogError("Uninstall failed",err);
  244. return false;
  245. }
  246. return true;
  247. }
  248. #else
  249. void sighandler(int signum, siginfo_t *info, void *extra)
  250. {
  251. PROGLOG("Caught signal %d, %p", signum, info?info->si_addr:0);
  252. if (server)
  253. server->stop();
  254. }
  255. int initDaemon()
  256. {
  257. int ret = daemon(1,0);
  258. if (ret)
  259. return ret;
  260. struct sigaction act;
  261. sigset_t blockset;
  262. sigemptyset(&blockset);
  263. act.sa_mask = blockset;
  264. act.sa_handler = SIG_IGN;
  265. act.sa_flags = 0;
  266. sigaction(SIGHUP, &act, NULL);
  267. act.sa_flags = SA_SIGINFO;
  268. act.sa_sigaction = &sighandler;
  269. sigaction(SIGTERM, &act, NULL);
  270. sigaction(SIGINT, &act, NULL);
  271. return 0;
  272. }
  273. #endif
  274. int main(int argc,char **argv)
  275. {
  276. InitModuleObjects();
  277. EnableSEHtoExceptionMapping();
  278. #ifndef __64BIT__
  279. // Restrict stack sizes on 32-bit systems
  280. Thread::setDefaultStackSize(0x10000); // 64K stack (also set in windows DSP)
  281. #endif
  282. Owned<IFile> sentinelFile = createSentinelTarget();
  283. removeSentinelFile(sentinelFile);
  284. SocketEndpoint listenep;
  285. unsigned sendbufsize = 0;
  286. unsigned recvbufsize = 0;
  287. int i = 1;
  288. bool isdaemon = (memicmp(argv[0]+strlen(argv[0])-4,".exe",4)==0);
  289. // bit of a kludge for windows - if .exe not specified then not daemon
  290. bool locallisten = false;
  291. const char *logdir=NULL;
  292. StringBuffer logDir;
  293. StringBuffer componentName;
  294. // Get SSL Settings
  295. DAFSConnectCfg connectMethod;
  296. unsigned short port;
  297. unsigned short sslport;
  298. const char * sslCertFile;
  299. const char * sslKeyFile;
  300. queryDafsSecSettings(&connectMethod, &port, &sslport, &sslCertFile, &sslKeyFile, nullptr);
  301. unsigned maxThreads = DEFAULT_THREADLIMIT;
  302. unsigned maxThreadsDelayMs = DEFAULT_THREADLIMITDELAYMS;
  303. unsigned maxAsyncCopy = DEFAULT_ASYNCCOPYMAX;
  304. unsigned parallelRequestLimit = DEFAULT_STDCMD_PARALLELREQUESTLIMIT;
  305. unsigned throttleDelayMs = DEFAULT_STDCMD_THROTTLEDELAYMS;
  306. unsigned throttleCPULimit = DEFAULT_STDCMD_THROTTLECPULIMIT;
  307. unsigned throttleQueueLimit = DEFAULT_STDCMD_THROTTLEQUEUELIMIT;
  308. unsigned parallelSlowRequestLimit = DEFAULT_SLOWCMD_PARALLELREQUESTLIMIT;
  309. unsigned throttleSlowDelayMs = DEFAULT_SLOWCMD_THROTTLEDELAYMS;
  310. unsigned throttleSlowCPULimit = DEFAULT_SLOWCMD_THROTTLECPULIMIT;
  311. unsigned throttleSlowQueueLimit = DEFAULT_SLOWCMD_THROTTLEQUEUELIMIT;
  312. unsigned dedicatedRowServicePort = DEFAULT_ROWSERVICE_PORT;
  313. StringAttr rowServiceConfiguration = defaultRowSericeConfiguration;
  314. bool dedicatedRowServiceSSL = defaultDedicatedRowServiceSSL;
  315. bool rowServiceOnStdPort = defaultRowServiceOnStdPort;
  316. // these should really be in env, but currently they are not ...
  317. listenep.port = port;
  318. // get command line arguements, including dafilesrv name
  319. while (argc>i) {
  320. if (stricmp(argv[i],"-D")==0) {
  321. i++;
  322. isdaemon = true;
  323. }
  324. else if (stricmp(argv[i],"-R")==0) { // for remote run
  325. i++;
  326. #ifdef _WIN32
  327. isdaemon = false;
  328. #else
  329. isdaemon = true;
  330. #endif
  331. }
  332. else if ((argv[i][0]=='-')&&(toupper(argv[i][1])=='T')&&(!argv[i][2]||isdigit(argv[i][2]))) {
  333. if (argv[i][2])
  334. setDaliServerTrace((byte)atoi(argv[i]+2));
  335. i++;
  336. isdaemon = false;
  337. }
  338. else if ((argc>i+1)&&(stricmp(argv[i],"-L")==0)) {
  339. i++;
  340. logDir.clear().append(argv[i++]);
  341. }
  342. else if ((argc>i+1)&&(stricmp(argv[i],"-I")==0)) {
  343. i++;
  344. componentName.clear().append(argv[i++]);
  345. }
  346. else if ((argc>i+1)&&(stricmp(argv[i],"-p")==0)) {
  347. i++;
  348. listenep.port = atoi(argv[i++]);
  349. }
  350. else if ((argc>i+1)&&(stricmp(argv[i],"-addr")==0)) {
  351. i++;
  352. if (strchr(argv[i],'.')||!isdigit(argv[i][0]))
  353. listenep.set(argv[i], listenep.port);
  354. else
  355. listenep.port = atoi(argv[i]);
  356. i++;
  357. }
  358. else if ((argc>i+1)&&(stricmp(argv[i],"-sslp")==0)) {
  359. i++;
  360. sslport = atoi(argv[i++]);
  361. }
  362. else if ((argc>i+1)&&(stricmp(argv[i],"-sbsize")==0)) {
  363. i++;
  364. sendbufsize = atoi(argv[i++]);
  365. }
  366. else if ((argc>i+1)&&(stricmp(argv[i],"-rbsize")==0)) {
  367. i++;
  368. recvbufsize = atoi(argv[i++]);
  369. }
  370. else if (stricmp(argv[i],"-h")==0) {
  371. usage();
  372. exit(0);
  373. }
  374. else if (stricmp(argv[i],"-LOCAL")==0) {
  375. i++;
  376. locallisten = true;
  377. }
  378. else if (stricmp(argv[i],"-NOSSL")==0) { // overrides config setting
  379. i++;
  380. if (connectMethod == SSLOnly || connectMethod == SSLFirst || connectMethod == UnsecureFirst)
  381. {
  382. PROGLOG("DaFileSrv SSL specified in config but overridden by -NOSSL in command line");
  383. connectMethod = SSLNone;
  384. }
  385. }
  386. else
  387. break;
  388. }
  389. #ifdef _WIN32
  390. if ((argc>i)&&(stricmp(argv[i],"-install")==0)) {
  391. if (installService(DAFS_SERVICE_NAME,DAFS_SERVICE_DISPLAY_NAME,NULL)) {
  392. PROGLOG(DAFS_SERVICE_DISPLAY_NAME " Installed");
  393. return 0;
  394. }
  395. return 1;
  396. }
  397. if ((argc>i)&&(stricmp(argv[i],"-remove")==0)) {
  398. if (uninstallService(DAFS_SERVICE_NAME,DAFS_SERVICE_DISPLAY_NAME)) {
  399. PROGLOG(DAFS_SERVICE_DISPLAY_NAME " Uninstalled");
  400. return 0;
  401. }
  402. return 1;
  403. }
  404. #endif
  405. if (argc > i) {
  406. if (strchr(argv[i],'.')||!isdigit(argv[i][0]))
  407. listenep.set(argv[i], listenep.port);
  408. else
  409. listenep.port = atoi(argv[i]);
  410. sendbufsize = (argc>i+1)?(atoi(argv[i+1])*1024):0;
  411. recvbufsize = (argc>i+2)?(atoi(argv[i+2])*1024):0;
  412. }
  413. Owned<IPropertyTree> env = getHPCCEnvironment();
  414. IPropertyTree *keyPairInfo = nullptr;
  415. if (env)
  416. {
  417. StringBuffer dafilesrvPath("Software/DafilesrvProcess");
  418. if (componentName.length())
  419. dafilesrvPath.appendf("[@name=\"%s\"]", componentName.str());
  420. else
  421. dafilesrvPath.append("[1]"); // in absence of name, use 1st
  422. IPropertyTree *daFileSrv = env->queryPropTree(dafilesrvPath);
  423. Owned<IPropertyTree> _dafileSrv;
  424. if (daFileSrv)
  425. {
  426. const char *componentGroupName = daFileSrv->queryProp("@group");
  427. if (!isEmptyString(componentGroupName))
  428. {
  429. VStringBuffer dafilesrvGroupPath("Software/DafilesrvGroup[@name=\"%s\"]", componentGroupName);
  430. IPropertyTree *daFileSrvGroup = env->queryPropTree(dafilesrvGroupPath);
  431. if (daFileSrvGroup)
  432. {
  433. // create a copy of the group settings and merge in (overwrite) with the component settings, i.e. any group settings become defaults
  434. _dafileSrv.setown(createPTreeFromIPT(daFileSrvGroup));
  435. synchronizePTree(_dafileSrv, daFileSrv, false, false);
  436. daFileSrv = _dafileSrv;
  437. }
  438. }
  439. // Component level DaFileSrv settings:
  440. maxThreads = daFileSrv->getPropInt("@maxThreads", DEFAULT_THREADLIMIT);
  441. maxThreadsDelayMs = daFileSrv->getPropInt("@maxThreadsDelayMs", DEFAULT_THREADLIMITDELAYMS);
  442. maxAsyncCopy = daFileSrv->getPropInt("@maxAsyncCopy", DEFAULT_ASYNCCOPYMAX);
  443. parallelRequestLimit = daFileSrv->getPropInt("@parallelRequestLimit", DEFAULT_STDCMD_PARALLELREQUESTLIMIT);
  444. throttleDelayMs = daFileSrv->getPropInt("@throttleDelayMs", DEFAULT_STDCMD_THROTTLEDELAYMS);
  445. throttleCPULimit = daFileSrv->getPropInt("@throttleCPULimit", DEFAULT_STDCMD_THROTTLECPULIMIT);
  446. throttleQueueLimit = daFileSrv->getPropInt("@throttleQueueLimit", DEFAULT_STDCMD_THROTTLEQUEUELIMIT);
  447. parallelSlowRequestLimit = daFileSrv->getPropInt("@parallelSlowRequestLimit", DEFAULT_SLOWCMD_PARALLELREQUESTLIMIT);
  448. throttleSlowDelayMs = daFileSrv->getPropInt("@throttleSlowDelayMs", DEFAULT_SLOWCMD_THROTTLEDELAYMS);
  449. throttleSlowCPULimit = daFileSrv->getPropInt("@throttleSlowCPULimit", DEFAULT_SLOWCMD_THROTTLECPULIMIT);
  450. throttleSlowQueueLimit = daFileSrv->getPropInt("@throttleSlowQueueLimit", DEFAULT_SLOWCMD_THROTTLEQUEUELIMIT);
  451. dedicatedRowServicePort = daFileSrv->getPropInt("@rowServicePort", DEFAULT_ROWSERVICE_PORT);
  452. dedicatedRowServiceSSL = daFileSrv->getPropBool("@rowServiceSSL", defaultDedicatedRowServiceSSL);
  453. rowServiceOnStdPort = daFileSrv->getPropBool("@rowServiceOnStdPort", defaultRowServiceOnStdPort);
  454. if (daFileSrv->queryProp("@rowServiceConfiguration"))
  455. rowServiceConfiguration = daFileSrv->queryProp("@rowServiceConfiguration");
  456. // any overrides by Instance definitions?
  457. IPropertyTree *dafileSrvInstance = nullptr;
  458. Owned<IPropertyTreeIterator> iter = daFileSrv->getElements("Instance");
  459. ForEach(*iter)
  460. {
  461. IpAddress instanceIP(iter->query().queryProp("@netAddress"));
  462. if (instanceIP.ipequals(queryHostIP()))
  463. dafileSrvInstance = &iter->query();
  464. }
  465. if (dafileSrvInstance)
  466. {
  467. Owned<IPropertyTree> _dafileSrvInstance;
  468. // check if there's a DaFileSrvGroup
  469. const char *instanceGroupName = dafileSrvInstance->queryProp("@group");
  470. if (!isEmptyString(instanceGroupName) && (isEmptyString(componentGroupName) || !strsame(instanceGroupName, componentGroupName))) // i.e. only if different
  471. {
  472. VStringBuffer dafilesrvGroupPath("Software/DafilesrvGroup[@name=\"%s\"]", instanceGroupName);
  473. IPropertyTree *daFileSrvGroup = env->queryPropTree(dafilesrvGroupPath);
  474. if (daFileSrvGroup)
  475. {
  476. // create a copy of the group settings and merge in (overwrite) with the instance settings, i.e. any group settings become defaults
  477. _dafileSrvInstance.setown(createPTreeFromIPT(daFileSrvGroup));
  478. synchronizePTree(_dafileSrvInstance, dafileSrvInstance, false, false);
  479. dafileSrvInstance = _dafileSrvInstance;
  480. }
  481. }
  482. maxThreads = dafileSrvInstance->getPropInt("@maxThreads", maxThreads);
  483. maxThreadsDelayMs = dafileSrvInstance->getPropInt("@maxThreadsDelayMs", maxThreadsDelayMs);
  484. maxAsyncCopy = dafileSrvInstance->getPropInt("@maxAsyncCopy", maxAsyncCopy);
  485. parallelRequestLimit = dafileSrvInstance->getPropInt("@parallelRequestLimit", parallelRequestLimit);
  486. throttleDelayMs = dafileSrvInstance->getPropInt("@throttleDelayMs", throttleDelayMs);
  487. throttleCPULimit = dafileSrvInstance->getPropInt("@throttleCPULimit", throttleCPULimit);
  488. throttleQueueLimit = dafileSrvInstance->getPropInt("@throttleQueueLimit", throttleQueueLimit);
  489. parallelSlowRequestLimit = dafileSrvInstance->getPropInt("@parallelSlowRequestLimit", parallelSlowRequestLimit);
  490. throttleSlowDelayMs = dafileSrvInstance->getPropInt("@throttleSlowDelayMs", throttleSlowDelayMs);
  491. throttleSlowCPULimit = dafileSrvInstance->getPropInt("@throttleSlowCPULimit", throttleSlowCPULimit);
  492. throttleSlowQueueLimit = dafileSrvInstance->getPropInt("@throttleSlowQueueLimit", throttleSlowQueueLimit);
  493. dedicatedRowServicePort = dafileSrvInstance->getPropInt("@rowServicePort", dedicatedRowServicePort);
  494. dedicatedRowServiceSSL = dafileSrvInstance->getPropBool("@rowServiceSSL", dedicatedRowServiceSSL);
  495. rowServiceOnStdPort = dafileSrvInstance->getPropBool("@rowServiceOnStdPort", rowServiceOnStdPort);
  496. }
  497. }
  498. keyPairInfo = env->queryPropTree("EnvSettings/Keys");
  499. }
  500. #ifndef _USE_OPENSSL
  501. if (dedicatedRowServicePort)
  502. {
  503. dedicatedRowServiceSSL = false;
  504. }
  505. #endif
  506. if (0 == logDir.length())
  507. {
  508. getConfigurationDirectory(NULL,"log","dafilesrv",componentName.str(),logDir);
  509. if (0 == logDir.length())
  510. logDir.append(".");
  511. }
  512. if (componentName.length())
  513. {
  514. addPathSepChar(logDir);
  515. logDir.append(componentName.str());
  516. }
  517. if ( (connectMethod == SSLNone) && (listenep.port == 0) )
  518. {
  519. printf("\nError, port must not be 0\n");
  520. usage();
  521. exit(-1);
  522. }
  523. else if ( (connectMethod == SSLOnly) && (sslport == 0) )
  524. {
  525. printf("\nError, secure port must not be 0\n");
  526. usage();
  527. exit(-1);
  528. }
  529. else if ( ((connectMethod == SSLFirst) || (connectMethod == UnsecureFirst)) && ((listenep.port == 0) || (sslport == 0)) )
  530. {
  531. printf("\nError, both port and secure port must not be 0\n");
  532. usage();
  533. exit(-1);
  534. }
  535. StringBuffer secMethod;
  536. if (connectMethod == SSLNone)
  537. secMethod.append("SSLNone");
  538. else if (connectMethod == SSLOnly)
  539. secMethod.append("SSLOnly");
  540. else if (connectMethod == SSLFirst)
  541. secMethod.append("SSLFirst");
  542. else if (connectMethod == UnsecureFirst)
  543. secMethod.append("UnsecureFirst");
  544. if (isdaemon) {
  545. #ifdef _WIN32
  546. class cserv: public CService
  547. {
  548. bool stopped;
  549. bool started;
  550. DAFSConnectCfg connectMethod;
  551. SocketEndpoint listenep;
  552. unsigned maxThreads;
  553. unsigned maxThreadsDelayMs;
  554. unsigned maxAsyncCopy;
  555. unsigned parallelRequestLimit;
  556. unsigned throttleDelayMs;
  557. unsigned throttleCPULimit;
  558. unsigned parallelSlowRequestLimit;
  559. unsigned throttleSlowDelayMs;
  560. unsigned throttleSlowCPULimit;
  561. unsigned sslport;
  562. StringBuffer secMethod;
  563. Linked<IPropertyTree> keyPairInfo;
  564. StringAttr rowServiceConfiguration;
  565. unsigned dedicatedRowServicePort;
  566. bool dedicatedRowServiceSSL;
  567. bool rowServiceOnStdPort;
  568. class cpollthread: public Thread
  569. {
  570. cserv *parent;
  571. public:
  572. cpollthread( cserv *_parent )
  573. : Thread("CService::cpollthread"), parent(_parent)
  574. {
  575. }
  576. int run()
  577. {
  578. while (parent->poll())
  579. Sleep(1000);
  580. return 1;
  581. }
  582. } pollthread;
  583. Owned<IRemoteFileServer> server;
  584. public:
  585. cserv(DAFSConnectCfg _connectMethod, SocketEndpoint _listenep,
  586. unsigned _maxThreads, unsigned _maxThreadsDelayMs, unsigned _maxAsyncCopy,
  587. unsigned _parallelRequestLimit, unsigned _throttleDelayMs, unsigned _throttleCPULimit,
  588. unsigned _parallelSlowRequestLimit, unsigned _throttleSlowDelayMs, unsigned _throttleSlowCPULimit,
  589. unsigned _sslport, const char * _secMethod,
  590. IPropertyTree *_keyPairInfo,
  591. const char *_rowServiceConfiguration,
  592. unsigned _dedicatedRowServicePort, bool _dedicatedRowServiceSSL, bool _rowServiceOnStdPort)
  593. : connectMethod(_connectMethod), listenep(_listenep), pollthread(this),
  594. maxThreads(_maxThreads), maxThreadsDelayMs(_maxThreadsDelayMs), maxAsyncCopy(_maxAsyncCopy),
  595. parallelRequestLimit(_parallelRequestLimit), throttleDelayMs(_throttleDelayMs), throttleCPULimit(_throttleCPULimit),
  596. parallelSlowRequestLimit(_parallelSlowRequestLimit), throttleSlowDelayMs(_throttleSlowDelayMs), throttleSlowCPULimit(_throttleSlowCPULimit),
  597. sslport(_sslport), secMethod(_secMethod),
  598. keyPairInfo(_keyPairInfo),
  599. rowServiceConfiguration(_rowServiceConfiguration), dedicatedRowServicePort(_dedicatedRowServicePort), dedicatedRowServiceSSL(_dedicatedRowServiceSSL), rowServiceOnStdPort(_rowServiceOnStdPort)
  600. {
  601. stopped = false;
  602. started = false;
  603. }
  604. virtual ~cserv()
  605. {
  606. stopped = true;
  607. if (started)
  608. pollthread.join();
  609. }
  610. bool init()
  611. {
  612. PROGLOG(DAFS_SERVICE_DISPLAY_NAME " Initialized");
  613. started = true;
  614. pollthread.start();
  615. return true;
  616. }
  617. bool poll()
  618. {
  619. if (stopped||!running()) {
  620. PROGLOG(DAFS_SERVICE_DISPLAY_NAME " Stopping");
  621. if (server) {
  622. server->stop();
  623. server.clear();
  624. }
  625. return false;
  626. }
  627. return true;
  628. }
  629. void run()
  630. {
  631. // Get params from HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Services\DaFileSrv\Parameters
  632. StringBuffer eps;
  633. if (listenep.isNull())
  634. eps.append(listenep.port);
  635. else
  636. listenep.getUrlStr(eps);
  637. if (connectMethod != SSLOnly)
  638. PROGLOG("Opening " DAFS_SERVICE_DISPLAY_NAME " on %s", eps.str());
  639. if (connectMethod == SSLOnly || connectMethod == SSLFirst || connectMethod == UnsecureFirst)
  640. {
  641. SocketEndpoint sslep(listenep);
  642. sslep.port = sslport;
  643. eps.kill();
  644. if (sslep.isNull())
  645. eps.append(sslep.port);
  646. else
  647. sslep.getUrlStr(eps);
  648. PROGLOG("Opening " DAFS_SERVICE_DISPLAY_NAME " on SECURE %s", eps.str());
  649. }
  650. PROGLOG("Dali File Server socket security model: %s", secMethod.str());
  651. const char * verstring = remoteServerVersionString();
  652. PROGLOG("Version: %s", verstring);
  653. if (dedicatedRowServicePort)
  654. PROGLOG("Row service(%s) port = %u", rowServiceConfiguration, dedicatedRowServicePort);
  655. PROGLOG(DAFS_SERVICE_DISPLAY_NAME " Running");
  656. server.setown(createRemoteFileServer(maxThreads, maxThreadsDelayMs, maxAsyncCopy, keyPairInfo));
  657. server->setThrottle(ThrottleStd, parallelRequestLimit, throttleDelayMs, throttleCPULimit);
  658. server->setThrottle(ThrottleSlow, parallelSlowRequestLimit, throttleSlowDelayMs, throttleSlowCPULimit);
  659. try
  660. {
  661. if (dedicatedRowServicePort)
  662. {
  663. SocketEndpoint rowServiceEp(listenep); // copy listenep, incase bound by -addr
  664. rowServiceEp.port = dedicatedRowServicePort;
  665. server->run(connectMethod, listenep, sslport, &rowServiceEp, dedicatedRowServiceSSL, rowServiceOnStdPort);
  666. }
  667. else
  668. server->run(connectMethod, listenep, sslport);
  669. }
  670. catch (IException *e) {
  671. EXCLOG(e,DAFS_SERVICE_NAME);
  672. e->Release();
  673. }
  674. PROGLOG(DAFS_SERVICE_DISPLAY_NAME " Stopped");
  675. stopped = true;
  676. }
  677. } service(connectMethod, listenep,
  678. maxThreads, maxThreadsDelayMs, maxAsyncCopy,
  679. parallelRequestLimit, throttleDelayMs, throttleCPULimit,
  680. parallelSlowRequestLimit, throttleSlowDelayMs, throttleSlowCPULimit, sslport, secMethod,
  681. keyPairInfo, rowServiceConfiguration, dedicatedRowServicePort, dedicatedRowServiceSSL, rowServiceOnStdPort);
  682. service.start();
  683. return 0;
  684. #else
  685. int ret = initDaemon();
  686. if (ret)
  687. return ret;
  688. #endif
  689. }
  690. #ifndef _CONTAINERIZED
  691. {
  692. Owned<IComponentLogFileCreator> lf = createComponentLogFileCreator(logDir.str(), "DAFILESRV");
  693. lf->setCreateAliasFile(false);
  694. lf->setMaxDetail(TopDetail);
  695. lf->beginLogging();
  696. }
  697. #else
  698. setupContainerizedLogMsgHandler();
  699. #endif
  700. write_pidfile(componentName.str());
  701. PROGLOG("Dafilesrv starting - Build %s", hpccBuildTag);
  702. PROGLOG("Parallel request limit = %d, throttleDelayMs = %d, throttleCPULimit = %d", parallelRequestLimit, throttleDelayMs, throttleCPULimit);
  703. const char * verstring = remoteServerVersionString();
  704. StringBuffer eps;
  705. if (listenep.isNull())
  706. eps.append(listenep.port);
  707. else
  708. listenep.getUrlStr(eps);
  709. if (connectMethod != SSLOnly)
  710. PROGLOG("Opening Dali File Server on %s", eps.str());
  711. if (connectMethod == SSLOnly || connectMethod == SSLFirst || connectMethod == UnsecureFirst)
  712. {
  713. SocketEndpoint sslep(listenep);
  714. sslep.port = sslport;
  715. eps.kill();
  716. if (sslep.isNull())
  717. eps.append(sslep.port);
  718. else
  719. sslep.getUrlStr(eps);
  720. PROGLOG("Opening Dali File Server on SECURE %s", eps.str());
  721. }
  722. PROGLOG("Dali File Server socket security model: %s", secMethod.str());
  723. PROGLOG("Version: %s", verstring);
  724. if (dedicatedRowServicePort)
  725. PROGLOG("Row service port = %u%s", dedicatedRowServicePort, dedicatedRowServiceSSL ? " SECURE" : "");
  726. server.setown(createRemoteFileServer(maxThreads, maxThreadsDelayMs, maxAsyncCopy, keyPairInfo));
  727. server->setThrottle(ThrottleStd, parallelRequestLimit, throttleDelayMs, throttleCPULimit);
  728. server->setThrottle(ThrottleSlow, parallelSlowRequestLimit, throttleSlowDelayMs, throttleSlowCPULimit);
  729. #ifndef _CONTAINERIZED
  730. class CPerfHook : public CSimpleInterfaceOf<IPerfMonHook>
  731. {
  732. public:
  733. virtual void processPerfStats(unsigned processorUsage, unsigned memoryUsage, unsigned memoryTotal, unsigned __int64 fistDiskUsage, unsigned __int64 firstDiskTotal, unsigned __int64 secondDiskUsage, unsigned __int64 secondDiskTotal, unsigned threadCount)
  734. {
  735. }
  736. virtual StringBuffer &extraLogging(StringBuffer &extra)
  737. {
  738. return server->getStats(extra.newline(), true);
  739. }
  740. virtual void log(int level, const char *msg)
  741. {
  742. PROGLOG("%s", msg);
  743. }
  744. } perfHook;
  745. startPerformanceMonitor(10*60*1000, PerfMonStandard, &perfHook);
  746. #endif
  747. writeSentinelFile(sentinelFile);
  748. try
  749. {
  750. if (dedicatedRowServicePort)
  751. {
  752. SocketEndpoint rowServiceEp(listenep); // copy listenep, incase bound by -addr
  753. rowServiceEp.port = dedicatedRowServicePort;
  754. server->run(connectMethod, listenep, sslport, &rowServiceEp, dedicatedRowServiceSSL, rowServiceOnStdPort);
  755. }
  756. else
  757. server->run(connectMethod, listenep, sslport);
  758. }
  759. catch (IException *e)
  760. {
  761. EXCLOG(e,"DAFILESRV");
  762. if (e->errorCode() == DAFSERR_serverinit_failed)
  763. removeSentinelFile(sentinelFile); // so init does not keep trying to start it ...
  764. e->Release();
  765. }
  766. #ifndef _CONTAINERIZED
  767. stopPerformanceMonitor();
  768. #endif
  769. if (server)
  770. server->stop();
  771. server.clear();
  772. PROGLOG("Stopped Dali File Server");
  773. return 0;
  774. }