ccdmain.cpp 46 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include <platform.h>
  14. #include <signal.h>
  15. #include <jlib.hpp>
  16. #include <jio.hpp>
  17. #include <jmisc.hpp>
  18. #include <jqueue.tpp>
  19. #include <jsocket.hpp>
  20. #include <jlog.hpp>
  21. #include <jprop.hpp>
  22. #include <jfile.hpp>
  23. #include "jutil.hpp"
  24. #include <build-config.h>
  25. #include "rmtfile.hpp"
  26. #include "ccd.hpp"
  27. #include "ccdquery.hpp"
  28. #include "ccdstate.hpp"
  29. #include "ccdqueue.ipp"
  30. #include "ccdserver.hpp"
  31. #include "ccdlistener.hpp"
  32. #include "ccdsnmp.hpp"
  33. #include "thorplugin.hpp"
  34. #ifdef _USE_CPPUNIT
  35. #include <cppunit/extensions/TestFactoryRegistry.h>
  36. #include <cppunit/ui/text/TestRunner.h>
  37. #endif
  38. //=================================================================================
  39. bool shuttingDown = false;
  40. unsigned numChannels;
  41. unsigned numActiveChannels;
  42. unsigned callbackRetries = 3;
  43. unsigned callbackTimeout = 500;
  44. unsigned lowTimeout = 10000;
  45. unsigned highTimeout = 2000;
  46. unsigned slaTimeout = 2000;
  47. unsigned numServerThreads = 30;
  48. unsigned numSlaveThreads = 30;
  49. unsigned numRequestArrayThreads = 5;
  50. unsigned headRegionSize;
  51. bool enableHeartBeat = true;
  52. unsigned keyedJoinFlowLimit = 1000;
  53. unsigned parallelLoopFlowLimit = 100;
  54. unsigned perChannelFlowLimit = 10;
  55. time_t startupTime;
  56. unsigned statsExpiryTime = 3600;
  57. unsigned miscDebugTraceLevel = 0; // separate trace settings purely for debugging specific items (i.e. all possible locations to look for files at startup)
  58. unsigned readTimeout = 300;
  59. unsigned indexReadChunkSize = 60000;
  60. unsigned smartSteppingChunkRows = 100;
  61. unsigned maxBlockSize = 10000000;
  62. unsigned maxLockAttempts = 5;
  63. bool checkVersion = true;
  64. bool deleteUnneededFiles = true;
  65. bool checkPrimaries = true;
  66. bool pretendAllOpt = false;
  67. bool traceStartStop = false;
  68. bool traceServerSideCache = false;
  69. bool timeActivities = true;
  70. unsigned watchActivityId = 0;
  71. unsigned testSlaveFailure = 0;
  72. unsigned restarts = 0;
  73. bool heapSort = false;
  74. bool insertionSort = false;
  75. bool fieldTranslationEnabled = false;
  76. bool useTreeCopy = true;
  77. bool mergeSlaveStatistics = true;
  78. XmlReaderOptions defaultXmlReadFlags = xr_ignoreWhiteSpace;
  79. bool runOnce = false;
  80. unsigned udpMulticastBufferSize = 262142;
  81. bool roxieMulticastEnabled = true;
  82. IPropertyTree* topology;
  83. CriticalSection ccdChannelsCrit;
  84. IPropertyTree* ccdChannels;
  85. StringArray allQuerySetNames;
  86. bool crcResources;
  87. bool useRemoteResources;
  88. bool checkFileDate;
  89. bool lazyOpen;
  90. bool localSlave;
  91. bool doIbytiDelay = true;
  92. unsigned initIbytiDelay; // In MillSec
  93. unsigned minIbytiDelay; // In MillSec
  94. bool copyResources;
  95. bool enableKeyDiff = true;
  96. bool enableForceKeyDiffCopy = true;
  97. bool chunkingHeap = true;
  98. bool logFullQueries;
  99. bool blindLogging = false;
  100. bool debugPermitted = true;
  101. bool checkCompleted = true;
  102. unsigned preabortKeyedJoinsThreshold = 100;
  103. unsigned preabortIndexReadsThreshold = 100;
  104. bool preloadOnceData;
  105. unsigned memoryStatsInterval = 0;
  106. memsize_t defaultMemoryLimit;
  107. unsigned defaultTimeLimit[3] = {0, 0, 0};
  108. unsigned defaultWarnTimeLimit[3] = {0, 5000, 5000};
  109. int defaultCheckingHeap = 0;
  110. unsigned defaultParallelJoinPreload = 0;
  111. unsigned defaultPrefetchProjectPreload = 10;
  112. unsigned defaultConcatPreload = 0;
  113. unsigned defaultFetchPreload = 0;
  114. unsigned defaultFullKeyedJoinPreload = 0;
  115. unsigned defaultKeyedJoinPreload = 0;
  116. unsigned dafilesrvLookupTimeout = 10000;
  117. unsigned logQueueLen;
  118. unsigned logQueueDrop;
  119. bool useLogQueue;
  120. bool fastLaneQueue;
  121. unsigned mtu_size = 1400; // upper limit on outbound buffer size - allow some header room too
  122. StringBuffer fileNameServiceDali;
  123. StringBuffer roxieName;
  124. bool trapTooManyActiveQueries;
  125. bool allowRoxieOnDemand;
  126. unsigned maxEmptyLoopIterations;
  127. unsigned maxGraphLoopIterations;
  128. bool probeAllRows;
  129. bool steppingEnabled = true;
  130. bool simpleLocalKeyedJoins = true;
  131. unsigned __int64 minFreeDiskSpace = 1024 * 0x100000; // default to 1 GB
  132. unsigned socketCheckInterval = 5000;
  133. StringBuffer logDirectory;
  134. StringBuffer pluginDirectory;
  135. StringBuffer queryDirectory;
  136. StringBuffer codeDirectory;
  137. StringBuffer baseDataDirectory;
  138. StringBuffer tempDirectory;
  139. ClientCertificate clientCert;
  140. bool useHardLink;
  141. unsigned maxFileAge[2] = {0xffffffff, 60*60*1000}; // local files don't expire, remote expire in 1 hour, by default
  142. unsigned minFilesOpen[2] = {2000, 500};
  143. unsigned maxFilesOpen[2] = {4000, 1000};
  144. SocketEndpoint ownEP;
  145. SocketEndpointArray allRoxieServers;
  146. HardwareInfo hdwInfo;
  147. unsigned parallelAggregate;
  148. bool inMemoryKeysEnabled = true;
  149. unsigned serverSideCacheSize = 0;
  150. bool nodeCachePreload = false;
  151. unsigned nodeCacheMB = 100;
  152. unsigned leafCacheMB = 50;
  153. unsigned blobCacheMB = 0;
  154. MODULE_INIT(INIT_PRIORITY_STANDARD)
  155. {
  156. topology = NULL;
  157. ccdChannels = NULL;
  158. return true;
  159. }
  160. MODULE_EXIT()
  161. {
  162. ::Release(topology);
  163. ::Release(ccdChannels);
  164. }
  165. //=========================================================================================
  166. //////////////////////////////////////////////////////////////////////////////////////////////
  167. extern "C" void caughtSIGPIPE(int sig)
  168. {
  169. DBGLOG("Caught sigpipe %d", sig);
  170. }
  171. extern "C" void caughtSIGHUP(int sig)
  172. {
  173. DBGLOG("Caught sighup %d", sig);
  174. }
  175. extern "C" void caughtSIGALRM(int sig)
  176. {
  177. DBGLOG("Caught sigalrm %d", sig);
  178. }
  179. extern "C" void caughtSIGTERM(int sig)
  180. {
  181. DBGLOG("Caught sigterm %d", sig);
  182. }
  183. void init_signals()
  184. {
  185. // signal(SIGTERM, caughtSIGTERM);
  186. #ifndef _WIN32
  187. signal(SIGPIPE, caughtSIGPIPE);
  188. signal(SIGHUP, caughtSIGHUP);
  189. signal(SIGALRM, caughtSIGALRM);
  190. #endif
  191. }
  192. //=========================================================================================
  193. class Waiter : public CInterface, implements IAbortHandler
  194. {
  195. Semaphore aborted;
  196. public:
  197. IMPLEMENT_IINTERFACE;
  198. bool wait(unsigned timeout)
  199. {
  200. return aborted.wait(timeout);
  201. }
  202. void wait()
  203. {
  204. aborted.wait();
  205. }
  206. bool onAbort()
  207. {
  208. aborted.signal();
  209. roxieMetrics.clear();
  210. #ifdef _DEBUG
  211. return false; // we want full leak checking info
  212. #else
  213. return true; // we don't care - just exit as fast as we can
  214. #endif
  215. }
  216. } waiter;
  217. void closedown()
  218. {
  219. Owned<IFile> sentinelFile = createSentinelTarget();
  220. removeSentinelFile(sentinelFile);
  221. waiter.onAbort();
  222. }
  223. void getAccessList(const char *aclName, IPropertyTree *topology, IPropertyTree *serverInfo)
  224. {
  225. StringBuffer xpath;
  226. xpath.append("ACL[@name='").append(aclName).append("']");
  227. if (serverInfo->queryPropTree(xpath))
  228. throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - recursive ACL definition of %s", aclName);
  229. Owned<IPropertyTree> X = createPTree("ACL");
  230. X->setProp("@name", aclName);
  231. serverInfo->addPropTree("ACL", X.getClear());
  232. Owned<IPropertyTree> acl = topology->getPropTree(xpath.str());
  233. if (!acl)
  234. throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - acl %s not found", aclName);
  235. Owned<IPropertyTreeIterator> access = acl->getElements("Access");
  236. ForEach(*access)
  237. {
  238. IPropertyTree &child = access->query();
  239. const char *base = child.queryProp("@base");
  240. if (base)
  241. getAccessList(base, topology, serverInfo);
  242. else
  243. serverInfo->addPropTree(child.queryName(), LINK(&child));
  244. }
  245. serverInfo->removeProp(xpath);
  246. }
  247. void addServerChannel(const char *dataDirectory, unsigned port, unsigned threads, const char *access, IPropertyTree *topology)
  248. {
  249. if (!ownEP.port)
  250. ownEP.set(port, queryHostIP());
  251. Owned<IPropertyTreeIterator> servers = ccdChannels->getElements("RoxieServerProcess");
  252. ForEach(*servers)
  253. {
  254. IPropertyTree &f = servers->query();
  255. if (strcmp(f.queryProp("@dataDirectory"), dataDirectory) != 0)
  256. throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - Roxie server dataDirectory respecified");
  257. if (f.getPropInt("@port", 0) == port)
  258. throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - Roxie server port repeated");
  259. }
  260. IPropertyTree *ci = createPTree("RoxieServerProcess");
  261. ci->setProp("@dataDirectory", dataDirectory);
  262. ci->setPropInt("@port", port);
  263. ci->setPropInt("@numThreads", threads);
  264. if (access && *access)
  265. {
  266. getAccessList(access, topology, ci);
  267. }
  268. ccdChannels->addPropTree("RoxieServerProcess", ci);
  269. }
  270. bool ipMatch(IpAddress &ip)
  271. {
  272. return ip.isLocal();
  273. }
  274. void addSlaveChannel(unsigned channel, const char *dataDirectory, bool suspended)
  275. {
  276. StringBuffer xpath;
  277. xpath.appendf("RoxieSlaveProcess[@channel=\"%d\"]", channel);
  278. if (ccdChannels->hasProp(xpath.str()))
  279. throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - channel %d repeated", channel);
  280. IPropertyTree *ci = createPTree("RoxieSlaveProcess");
  281. ci->setPropInt("@channel", channel);
  282. ci->setPropBool("@suspended", suspended);
  283. ci->setPropInt("@subChannel", numSlaves[channel]);
  284. suspendedChannels[channel] = suspended;
  285. ci->setProp("@dataDirectory", dataDirectory);
  286. ccdChannels->addPropTree("RoxieSlaveProcess", ci);
  287. }
  288. void addChannel(unsigned channel, const char *dataDirectory, bool isMe, bool suspended, IpAddress& slaveIp)
  289. {
  290. numSlaves[channel]++;
  291. if (isMe && channel > 0 && channel <= numChannels)
  292. {
  293. addSlaveChannel(channel, dataDirectory, suspended);
  294. }
  295. if (!localSlave)
  296. {
  297. addEndpoint(channel, slaveIp, CCD_MULTICAST_PORT);
  298. }
  299. }
  300. extern void doUNIMPLEMENTED(unsigned line, const char *file)
  301. {
  302. throw MakeStringException(ROXIE_UNIMPLEMENTED_ERROR, "UNIMPLEMENTED at %s:%d", file, line);
  303. }
  304. void FatalError(const char *format, ...)
  305. {
  306. va_list args;
  307. va_start(args, format);
  308. StringBuffer errMsg;
  309. errMsg.valist_appendf(format, args);
  310. va_end(args);
  311. Owned<IException> E = MakeStringException(MSGAUD_operator, ROXIE_INTERNAL_ERROR, "%s", errMsg.str());
  312. EXCLOG(E, "Fatal error");
  313. Sleep(5000);
  314. _exit(1);
  315. }
  316. // If changing these, please change roxie.cpp's roxie_server_usage() as well
  317. static void roxie_common_usage(const char * progName)
  318. {
  319. StringBuffer program;
  320. program.append(progName);
  321. getFileNameOnly(program, false);
  322. // Things that are also relevant to stand-alone executables
  323. printf("Usage: %s [options]\n", program.str());
  324. printf("\nOptions:\n");
  325. printf("\t--daliServers=[host1,...]\t: List of Dali servers to use\n");
  326. printf("\t--tracelevel=[integer]\t: Amount of information to dump on logs\n");
  327. printf("\t--stdlog=[boolean]\t: Standard log format (based on tracelevel)\n");
  328. printf("\t--logfile=[format]\t: Outputs to logfile, rather than stdout\n");
  329. printf("\t--help|-h\t: This message\n");
  330. printf("\n");
  331. }
  332. class MAbortHandler : implements IExceptionHandler
  333. {
  334. unsigned dummy; // to avoid complaints about an empty class...
  335. public:
  336. MAbortHandler() : dummy(0) {};
  337. virtual bool fireException(IException *e)
  338. {
  339. ForEachItemIn(idx, socketListeners)
  340. {
  341. socketListeners.item(idx).stopListening();
  342. }
  343. return false;
  344. }
  345. } abortHandler;
  #ifdef _WIN32
  // CRT allocation hook, installed with _CrtSetAllocHook in _DEBUG Windows builds.
  // Logs when an allocation of exactly 68 bytes has no source-file information.
  // It never vetoes an allocation (always returns true).
  // Handy place to put breakpoints when tracking down obscure memory leaks...
  int myhook(int alloctype, void *, size_t nSize, int p1, long allocSeq, const unsigned char *file, int line)
  {
      const bool interesting = (file == NULL) && (nSize == 68);
      if (interesting)
          DBGLOG("memory hook matched");
      return true;
  }
  #endif
  357. static class RoxieRowCallbackHook : implements IRtlRowCallback
  358. {
  359. public:
  360. virtual void releaseRow(const void * row) const
  361. {
  362. ReleaseRoxieRow(row);
  363. }
  364. virtual void releaseRowset(unsigned count, byte * * rowset) const
  365. {
  366. ReleaseRoxieRowset(count, rowset);
  367. }
  368. virtual void * linkRow(const void * row) const
  369. {
  370. LinkRoxieRow(row);
  371. return const_cast<void *>(row);
  372. }
  373. virtual byte * * linkRowset(byte * * rowset) const
  374. {
  375. LinkRoxieRow(rowset);
  376. return const_cast<byte * *>(rowset);
  377. }
  378. } callbackHook;
  379. int STARTQUERY_API start_query(int argc, const char *argv[])
  380. {
  381. EnableSEHtoExceptionMapping();
  382. setTerminateOnSEH();
  383. init_signals();
  384. // We need to do the above BEFORE we call InitModuleObjects
  385. InitModuleObjects();
  386. getDaliServixPort();
  387. init_signals();
  388. // stand alone usage only, not server
  389. for (unsigned i=0; i<argc; i++)
  390. {
  391. if (stricmp(argv[i], "--help")==0 ||
  392. stricmp(argv[i], "-h")==0)
  393. {
  394. roxie_common_usage(argv[0]);
  395. return EXIT_SUCCESS;
  396. }
  397. }
  398. #ifdef _USE_CPPUNIT
  399. if (argc>=2 && stricmp(argv[1], "-selftest")==0)
  400. {
  401. queryStderrLogMsgHandler()->setMessageFields(MSGFIELD_time | MSGFIELD_prefix);
  402. CppUnit::TextUi::TestRunner runner;
  403. if (argc==2)
  404. {
  405. CppUnit::TestFactoryRegistry &registry = CppUnit::TestFactoryRegistry::getRegistry();
  406. runner.addTest( registry.makeTest() );
  407. }
  408. else
  409. {
  410. // MORE - maybe add a 'list' function here?
  411. for (int name = 2; name < argc; name++)
  412. {
  413. if (stricmp(argv[name], "-q")==0)
  414. {
  415. traceLevel = 0;
  416. roxiemem::memTraceLevel = 0;
  417. removeLog();
  418. }
  419. else
  420. {
  421. CppUnit::TestFactoryRegistry &registry = CppUnit::TestFactoryRegistry::getRegistry(argv[name]);
  422. runner.addTest( registry.makeTest() );
  423. }
  424. }
  425. }
  426. bool wasSucessful = runner.run( "", false );
  427. releaseAtoms();
  428. return wasSucessful;
  429. }
  430. #endif
  431. #ifdef _DEBUG
  432. #ifdef _WIN32
  433. _CrtSetAllocHook(myhook);
  434. #endif
  435. #endif
  436. #ifndef __64BIT__
  437. Thread::setDefaultStackSize(0x10000); // NB under windows requires linker setting (/stack:)
  438. #endif
  439. srand( (unsigned)time( NULL ) );
  440. ccdChannels = createPTree("Channels");
  441. char currentDirectory[_MAX_DIR];
  442. if (!getcwd(currentDirectory, sizeof(currentDirectory)))
  443. throw MakeStringException(ROXIE_INTERNAL_ERROR, "getcwd failed (%d)", errno);
  444. codeDirectory.set(currentDirectory);
  445. addNonEmptyPathSepChar(codeDirectory);
  446. try
  447. {
  448. Owned<IProperties> globals = createProperties(true);
  449. for (int i = 1; i < argc; i++)
  450. globals->loadProp(argv[i], true);
  451. Owned<IFile> sentinelFile = createSentinelTarget();
  452. removeSentinelFile(sentinelFile);
  453. StringBuffer topologyFile;
  454. if (globals->hasProp("--topology"))
  455. globals->getProp("--topology", topologyFile);
  456. else
  457. topologyFile.append(codeDirectory).append(PATHSEPCHAR).append("RoxieTopology.xml");
  458. if (checkFileExists(topologyFile.str()))
  459. {
  460. DBGLOG("Loading topology file %s", topologyFile.str());
  461. topology = createPTreeFromXMLFile(topologyFile.str());
  462. }
  463. else
  464. {
  465. if (globals->hasProp("--topology"))
  466. {
  467. // Explicitly-named topology file SHOULD exist...
  468. throw MakeStringException(ROXIE_INVALID_TOPOLOGY, "topology file %s not found", topologyFile.str());
  469. }
  470. topology=createPTreeFromXMLString(
  471. "<RoxieTopology numChannels='1' localSlave='1'>"
  472. "<RoxieServerProcess dataDirectory='.' netAddress='.'/>"
  473. "<RoxieSlaveProcess dataDirectory='.' netAddress='.' channel='1'/>"
  474. "</RoxieTopology>"
  475. );
  476. int port = globals->getPropInt("--port", 9876);
  477. topology->setPropInt("RoxieServerProcess/@port", port);
  478. topology->setProp("@daliServers", globals->queryProp("--daliServers"));
  479. topology->setProp("@traceLevel", globals->queryProp("--traceLevel"));
  480. topology->setProp("@memTraceLevel", globals->queryProp("--memTraceLevel"));
  481. }
  482. topology->getProp("@name", roxieName);
  483. Owned<const IQueryDll> standAloneDll;
  484. if (globals->hasProp("--loadWorkunit"))
  485. {
  486. StringBuffer workunitName;
  487. globals->getProp("--loadWorkunit", workunitName);
  488. standAloneDll.setown(createQueryDll(workunitName));
  489. }
  490. else
  491. {
  492. Owned<ILoadedDllEntry> dll = createExeDllEntry(argv[0]);
  493. if (checkEmbeddedWorkUnitXML(dll))
  494. {
  495. standAloneDll.setown(createExeQueryDll(argv[0]));
  496. runOnce = globals->getPropInt("--port", 0) == 0;
  497. }
  498. }
  499. traceLevel = topology->getPropInt("@traceLevel", runOnce ? 0 : 1);
  500. if (traceLevel > MAXTRACELEVEL)
  501. traceLevel = MAXTRACELEVEL;
  502. udpTraceLevel = topology->getPropInt("@udpTraceLevel", runOnce ? 0 : 1);
  503. roxiemem::memTraceLevel = topology->getPropInt("@memTraceLevel", runOnce ? 0 : 1);
  504. soapTraceLevel = topology->getPropInt("@soapTraceLevel", runOnce ? 0 : 1);
  505. miscDebugTraceLevel = topology->getPropInt("@miscDebugTraceLevel", 0);
  506. IPropertyTree *directoryTree = topology->queryPropTree("Directories");
  507. if (directoryTree)
  508. getConfigurationDirectory(directoryTree,"query","roxie", roxieName, queryDirectory);
  509. //Logging stuff
  510. if (globals->hasProp("--logfile"))
  511. {
  512. Owned<IComponentLogFileCreator> lf = createComponentLogFileCreator(topology, "roxie");
  513. if (globals->getPropBool("--stdlog", traceLevel != 0) || topology->getPropBool("@forceStdLog", false))
  514. lf->setMsgFields(MSGFIELD_time | MSGFIELD_thread | MSGFIELD_prefix);
  515. else
  516. removeLog();
  517. lf->setMaxDetail(TopDetail);
  518. lf->beginLogging();
  519. logDirectory.set(lf->queryLogDir());
  520. #ifdef _DEBUG
  521. unsigned useLogQueue = topology->getPropBool("@useLogQueue", false);
  522. #else
  523. unsigned useLogQueue = topology->getPropBool("@useLogQueue", true);
  524. #endif
  525. if (useLogQueue)
  526. {
  527. unsigned logQueueLen = topology->getPropInt("@logQueueLen", 512);
  528. unsigned logQueueDrop = topology->getPropInt("@logQueueDrop", 32);
  529. queryLogMsgManager()->enterQueueingMode();
  530. queryLogMsgManager()->setQueueDroppingLimit(logQueueLen, logQueueDrop);
  531. }
  532. if (globals->getPropBool("--enableSysLog",true))
  533. UseSysLogForOperatorMessages();
  534. }
  535. roxieMetrics.setown(createRoxieMetricsManager());
  536. Owned<IPropertyTreeIterator> userMetrics = topology->getElements("./UserMetric");
  537. ForEach(*userMetrics)
  538. {
  539. IPropertyTree &metric = userMetrics->query();
  540. const char *name = metric.queryProp("@name");
  541. const char *regex= metric.queryProp("@regex");
  542. if (name && regex)
  543. roxieMetrics->addUserMetric(name, regex);
  544. else
  545. throw MakeStringException(ROXIE_INTERNAL_ERROR, "Invalid UserMetric element in topology file - name or regex missing");
  546. }
  547. restarts = globals->getPropInt("--restarts", 0);
  548. const char *preferredSubnet = topology->queryProp("@preferredSubnet");
  549. if (preferredSubnet)
  550. {
  551. const char *preferredSubnetMask = topology->queryProp("@preferredSubnetMask");
  552. if (!preferredSubnetMask) preferredSubnetMask = "255.255.255.0";
  553. if (!setPreferredSubnet(preferredSubnet, preferredSubnetMask))
  554. throw MakeStringException(ROXIE_INTERNAL_ERROR, "Error setting preferred subnet %s mask %s", preferredSubnet, preferredSubnetMask);
  555. }
  556. bool multiHostMode = globals->hasProp("--host");
  557. unsigned myHostNumber = globals->getPropInt("--host", 0);
  558. if (restarts)
  559. {
  560. if (traceLevel)
  561. DBGLOG("Roxie restarting: restarts = %d build = %s", restarts, BUILD_TAG);
  562. setStartRuid(restarts);
  563. }
  564. else
  565. {
  566. if (traceLevel)
  567. {
  568. DBGLOG("Roxie starting, build = %s", BUILD_TAG);
  569. }
  570. }
  571. bool isCCD = false;
  572. headRegionSize = topology->getPropInt("@headRegionSize", 50);
  573. numChannels = topology->getPropInt("@numChannels", 0);
  574. numActiveChannels = topology->getPropInt("@numActiveChannels", numChannels);
  575. statsExpiryTime = topology->getPropInt("@statsExpiryTime", 3600);
  576. roxiemem::memTraceSizeLimit = (memsize_t) topology->getPropInt64("@memTraceSizeLimit", 0);
  577. callbackRetries = topology->getPropInt("@callbackRetries", 3);
  578. callbackTimeout = topology->getPropInt("@callbackTimeout", 5000);
  579. lowTimeout = topology->getPropInt("@lowTimeout", 10000);
  580. highTimeout = topology->getPropInt("@highTimeout", 2000);
  581. slaTimeout = topology->getPropInt("@slaTimeout", 2000);
  582. keyedJoinFlowLimit = topology->getPropInt("@keyedJoinFlowLimit", 1000);
  583. parallelLoopFlowLimit = topology->getPropInt("@parallelLoopFlowLimit", 100);
  584. perChannelFlowLimit = topology->getPropInt("@perChannelFlowLimit", 10);
  585. copyResources = topology->getPropBool("@copyResources", true);
  586. useRemoteResources = topology->getPropBool("@useRemoteResources", true);
  587. checkFileDate = topology->getPropBool("@checkFileDate", true);
  588. const char *lazyOpenMode = topology->queryProp("@lazyOpen");
  589. if (!lazyOpenMode)
  590. lazyOpen = false;
  591. else if (stricmp(lazyOpenMode, "smart")==0)
  592. lazyOpen = (restarts > 0);
  593. else
  594. lazyOpen = topology->getPropBool("@lazyOpen", false);
  595. localSlave = topology->getPropBool("@localSlave", false);
  596. doIbytiDelay = topology->getPropBool("@doIbytiDelay", true);
  597. minIbytiDelay = topology->getPropInt("@minIbytiDelay", 2);
  598. initIbytiDelay = topology->getPropInt("@initIbytiDelay", 50);
  599. crcResources = topology->getPropBool("@crcResources", false);
  600. chunkingHeap = topology->getPropBool("@chunkingHeap", true);
  601. readTimeout = topology->getPropInt("@readTimeout", 300);
  602. logFullQueries = topology->getPropBool("@logFullQueries", false);
  603. debugPermitted = topology->getPropBool("@debugPermitted", true);
  604. blindLogging = topology->getPropBool("@blindLogging", false);
  605. if (!blindLogging)
  606. logExcessiveSeeks = true;
  607. preloadOnceData = topology->getPropBool("@preloadOnceData", true);
  608. linuxYield = topology->getPropBool("@linuxYield", false);
  609. traceSmartStepping = topology->getPropBool("@traceSmartStepping", false);
  610. useMemoryMappedIndexes = topology->getPropBool("@useMemoryMappedIndexes", false);
  611. traceJHtreeAllocations = topology->getPropBool("@traceJHtreeAllocations", false);
  612. flushJHtreeCacheOnOOM = topology->getPropBool("@flushJHtreeCacheOnOOM", true);
  613. fastLaneQueue = topology->getPropBool("@fastLaneQueue", true);
  614. udpOutQsPriority = topology->getPropInt("@udpOutQsPriority", 0);
  615. udpSnifferEnabled = topology->getPropBool("@udpSnifferEnabled", true);
  616. udpInlineCollation = topology->getPropBool("@udpInlineCollation", false);
  617. udpInlineCollationPacketLimit = topology->getPropInt("@udpInlineCollationPacketLimit", 50);
  618. udpSendCompletedInData = topology->getPropBool("@udpSendCompletedInData", false);
  619. udpRetryBusySenders = topology->getPropInt("@udpRetryBusySenders", 0);
  620. udpMaxRetryTimedoutReqs = topology->getPropInt("@udpMaxRetryTimedoutReqs", 0);
  621. udpRequestToSendTimeout = topology->getPropInt("@udpRequestToSendTimeout", 5);
  622. // MORE: think of a better way/value/check maybe/and/or based on Roxie server timeout
  623. if (udpRequestToSendTimeout == 0)
  624. udpRequestToSendTimeout = 5;
  625. // This is not added to deployment\xmlenv\roxie.xsd on purpose
  626. enableSocketMaxSetting = topology->getPropBool("@enableSocketMaxSetting", false);
  627. // MORE: might want to check socket buffer sizes against sys max here instead of udp threads ?
  628. udpMulticastBufferSize = topology->getPropInt("@udpMulticastBufferSize", 262142);
  629. udpFlowSocketsSize = topology->getPropInt("@udpFlowSocketsSize", 131072);
  630. udpLocalWriteSocketSize = topology->getPropInt("@udpLocalWriteSocketSize", 1024000);
  631. roxieMulticastEnabled = topology->getPropBool("@roxieMulticastEnabled", true); // enable use of multicast for sending requests to slaves
  632. if (udpSnifferEnabled && !roxieMulticastEnabled)
  633. DBGLOG("WARNING: ignoring udpSnifferEnabled setting as multicast not enabled");
  634. indexReadChunkSize = topology->getPropInt("@indexReadChunkSize", 60000);
  635. smartSteppingChunkRows = topology->getPropInt("@smartSteppingChunkRows", 100);
  636. numSlaveThreads = topology->getPropInt("@slaveThreads", 30);
  637. numServerThreads = topology->getPropInt("@serverThreads", 30);
  638. numRequestArrayThreads = topology->getPropInt("@requestArrayThreads", 5);
  639. maxBlockSize = topology->getPropInt("@maxBlockSize", 10000000);
  640. maxLockAttempts = topology->getPropInt("@maxLockAttempts", 5);
  641. enableHeartBeat = topology->getPropBool("@enableHeartBeat", true);
  642. checkCompleted = topology->getPropBool("@checkCompleted", true);
  643. preabortKeyedJoinsThreshold = topology->getPropInt("@preabortKeyedJoinsThreshold", 100);
  644. preabortIndexReadsThreshold = topology->getPropInt("@preabortIndexReadsThreshold", 100);
  645. defaultMemoryLimit = (memsize_t) topology->getPropInt64("@defaultMemoryLimit", 0);
  646. defaultTimeLimit[0] = (unsigned) topology->getPropInt64("@defaultLowPriorityTimeLimit", 0);
  647. defaultTimeLimit[1] = (unsigned) topology->getPropInt64("@defaultHighPriorityTimeLimit", 0);
  648. defaultTimeLimit[2] = (unsigned) topology->getPropInt64("@defaultSLAPriorityTimeLimit", 0);
  649. defaultWarnTimeLimit[0] = (unsigned) topology->getPropInt64("@defaultLowPriorityTimeWarning", 0);
  650. defaultWarnTimeLimit[1] = (unsigned) topology->getPropInt64("@defaultHighPriorityTimeWarning", 0);
  651. defaultWarnTimeLimit[2] = (unsigned) topology->getPropInt64("@defaultSLAPriorityTimeWarning", 0);
  652. defaultXmlReadFlags = topology->getPropBool("@defaultStripLeadingWhitespace", true) ? xr_ignoreWhiteSpace : xr_none;
  653. defaultParallelJoinPreload = topology->getPropInt("@defaultParallelJoinPreload", 0);
  654. defaultConcatPreload = topology->getPropInt("@defaultConcatPreload", 0);
  655. defaultFetchPreload = topology->getPropInt("@defaultFetchPreload", 0);
  656. defaultFullKeyedJoinPreload = topology->getPropInt("@defaultFullKeyedJoinPreload", 0);
  657. defaultKeyedJoinPreload = topology->getPropInt("@defaultKeyedJoinPreload", 0);
  658. defaultPrefetchProjectPreload = topology->getPropInt("@defaultPrefetchProjectPreload", 10);
  659. fieldTranslationEnabled = topology->getPropBool("@fieldTranslationEnabled", false);
  660. defaultCheckingHeap = topology->getPropInt("@checkingHeap", 0);
  661. checkVersion = topology->getPropBool("@checkVersion", true);
  662. deleteUnneededFiles = topology->getPropBool("@deleteUnneededFiles", true);
  663. checkPrimaries = topology->getPropBool("@checkPrimaries", true);
  664. pretendAllOpt = topology->getPropBool("@ignoreMissingFiles", false);
  665. memoryStatsInterval = topology->getPropInt("@memoryStatsInterval", 60);
  666. roxiemem::setMemoryStatsInterval(memoryStatsInterval);
  667. pingInterval = topology->getPropInt("@pingInterval", 0);
  668. socketCheckInterval = topology->getPropInt("@socketCheckInterval", 5000);
  669. memsize_t totalMemoryLimit = (memsize_t) topology->getPropInt64("@totalMemoryLimit", 0);
  670. if (!totalMemoryLimit)
  671. totalMemoryLimit = 1024 * 0x100000; // 1 Gb;
  672. roxiemem::setTotalMemoryLimit(totalMemoryLimit, 0, NULL);
  673. rtlSetReleaseRowHook(&callbackHook);
  674. traceStartStop = topology->getPropBool("@traceStartStop", false);
  675. traceServerSideCache = topology->getPropBool("@traceServerSideCache", false);
  676. timeActivities = topology->getPropBool("@timeActivities", true);
  677. clientCert.certificate.set(topology->queryProp("@certificateFileName"));
  678. clientCert.privateKey.set(topology->queryProp("@privateKeyFileName"));
  679. clientCert.passphrase.set(topology->queryProp("@passphrase"));
  680. useHardLink = topology->getPropBool("@useHardLink", false);
  681. maxFileAge[false] = topology->getPropInt("@localFilesExpire", (unsigned) -1);
  682. maxFileAge[true] = topology->getPropInt("@remoteFilesExpire", 60*60*1000);
  683. minFilesOpen[false] = topology->getPropInt("@minLocalFilesOpen", 2000);
  684. minFilesOpen[true] = topology->getPropInt("@minRemoteFilesOpen", 500);
  685. maxFilesOpen[false] = topology->getPropInt("@maxLocalFilesOpen", 4000);
  686. maxFilesOpen[true] = topology->getPropInt("@maxRemoteFilesOpen", 1000);
  687. dafilesrvLookupTimeout = topology->getPropInt("@dafilesrvLookupTimeout", 10000);
  688. topology->getProp("@daliServers", fileNameServiceDali);
  689. trapTooManyActiveQueries = topology->getPropBool("@trapTooManyActiveQueries", true);
  690. allowRoxieOnDemand = topology->getPropBool("@allowRoxieOnDemand", false);
  691. maxEmptyLoopIterations = topology->getPropInt("@maxEmptyLoopIterations", 1000);
  692. maxGraphLoopIterations = topology->getPropInt("@maxGraphLoopIterations", 1000);
  693. useTreeCopy = topology->getPropBool("@useTreeCopy", true);
  694. mergeSlaveStatistics = topology->getPropBool("@mergeSlaveStatistics", true);
  695. enableKeyDiff = topology->getPropBool("@enableKeyDiff", true);
  696. enableForceKeyDiffCopy = topology->getPropBool("@enableForceKeyDiffCopy", false);
  697. // MORE: Get parms from topology after it is populated from Hardware/computer types section in configenv
  698. // Then if does not match and based on desired action in topolgy, either warn, or fatal exit or .... etc
  699. // Also get prim path and sec from topology
  700. #ifdef _WIN32
  701. getHardwareInfo(hdwInfo, "C:", "D:");
  702. #else // linux
  703. getHardwareInfo(hdwInfo, "/c$", "/d$");
  704. #endif
  705. if (traceLevel)
  706. {
  707. DBGLOG("Current Hardware Info: CPUs=%i, speed=%i MHz, Mem=%i MB , primDisk=%i GB, primFree=%i GB, secDisk=%i GB, secFree=%i GB, NIC=%i",
  708. hdwInfo.numCPUs, hdwInfo.CPUSpeed, hdwInfo.totalMemory,
  709. hdwInfo.primDiskSize, hdwInfo.primFreeSize, hdwInfo.secDiskSize, hdwInfo.secFreeSize, hdwInfo.NICSpeed);
  710. }
  711. parallelAggregate = topology->getPropInt("@parallelAggregate", 0);
  712. if (!parallelAggregate)
  713. parallelAggregate = hdwInfo.numCPUs;
  714. if (!parallelAggregate)
  715. parallelAggregate = 1;
  716. simpleLocalKeyedJoins = topology->getPropBool("@simpleLocalKeyedJoins", true);
  717. inMemoryKeysEnabled = topology->getPropBool("@inMemoryKeysEnabled", true);
  718. serverSideCacheSize = topology->getPropInt("@serverSideCacheSize", 0);
  719. setKeyIndexCacheSize((unsigned)-1); // unbound
  720. nodeCachePreload = topology->getPropBool("@nodeCachePreload", false);
  721. setNodeCachePreload(nodeCachePreload);
  722. nodeCacheMB = topology->getPropInt("@nodeCacheMem", 100);
  723. setNodeCacheMem(nodeCacheMB * 0x100000);
  724. leafCacheMB = topology->getPropInt("@leafCacheMem", 50);
  725. setLeafCacheMem(leafCacheMB * 0x100000);
  726. blobCacheMB = topology->getPropInt("@blobCacheMem", 0);
  727. setBlobCacheMem(blobCacheMB * 0x100000);
  728. minFreeDiskSpace = topology->getPropInt64("@minFreeDiskSpace", (1024 * 0x100000)); // default to 1 GB
  729. if (topology->getPropBool("@jumboFrames", false))
  730. {
  731. mtu_size = 9000; // upper limit on outbound buffer size - allow some header room too
  732. roxiemem::setDataAlignmentSize(0x2000);
  733. }
  734. else
  735. {
  736. mtu_size = 1400; // upper limit on outbound buffer size - allow some header room too
  737. roxiemem::setDataAlignmentSize(0x400);
  738. }
  739. unsigned pinterval = topology->getPropInt("@systemMonitorInterval",1000*60);
  740. if (pinterval)
  741. startPerformanceMonitor(pinterval);
  742. topology->getProp("@pluginDirectory", pluginDirectory);
  743. if (pluginDirectory.length() == 0)
  744. pluginDirectory.append(codeDirectory).append("plugins");
  745. if (queryDirectory.length() == 0)
  746. {
  747. topology->getProp("@queryDir", queryDirectory);
  748. if (queryDirectory.length() == 0)
  749. queryDirectory.append(codeDirectory).append("queries");
  750. }
  751. addNonEmptyPathSepChar(queryDirectory);
  752. // if no Dali, files are local
  753. if (fileNameServiceDali.length() == 0)
  754. baseDataDirectory.append("./"); // Path separator will be replaced, if necessary
  755. else
  756. baseDataDirectory.append(topology->queryProp("@baseDataDir"));
  757. queryFileCache().start();
  758. getTempFilePath(tempDirectory, "roxie", topology);
  759. #ifdef _WIN32
  760. topology->addPropBool("@linuxOS", false);
  761. #else
  762. topology->addPropBool("@linuxOS", true);
  763. #endif
  764. allQuerySetNames.appendListUniq(topology->queryProp("@querySets"), ",");
  765. if (!numChannels)
  766. throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - numChannels attribute must be specified");
  767. bool myIPadded = false;
  768. Owned<IPropertyTreeIterator> roxieServers = topology->getElements("./RoxieServerProcess");
  769. ForEach(*roxieServers)
  770. {
  771. IPropertyTree &roxieServer = roxieServers->query();
  772. const char *iptext = roxieServer.queryProp("@netAddress");
  773. unsigned nodeIndex = addRoxieNode(iptext);
  774. unsigned port = roxieServer.getPropInt("@port", ROXIE_SERVER_PORT);
  775. if (iptext)
  776. {
  777. SocketEndpoint ep(iptext, port);
  778. unsigned roxieServerHost = roxieServer.getPropInt("@multihost", 0);
  779. if (ipMatch(ep) && ((roxieServerHost == myHostNumber) || (myHostNumber==-1)))
  780. {
  781. if (baseDataDirectory.length() == 0) // if not set by other topology settings default to this ...
  782. baseDataDirectory.append(roxieServer.queryProp("@baseDataDirectory"));
  783. unsigned numThreads = roxieServer.getPropInt("@numThreads", numServerThreads);
  784. const char *aclName = roxieServer.queryProp("@aclName");
  785. addServerChannel(roxieServers->query().queryProp("@dataDirectory"), port, numThreads, aclName, topology);
  786. if (!myIPadded || (myHostNumber==-1))
  787. {
  788. myNodeIndex = nodeIndex;
  789. allRoxieServers.append(ep);
  790. myIPadded = true;
  791. }
  792. }
  793. else if (multiHostMode || !roxieServerHost)
  794. {
  795. bool found = false;
  796. ForEachItemIn(idx, allRoxieServers)
  797. {
  798. if (multiHostMode)
  799. {
  800. if (ep.equals(allRoxieServers.item(idx)))
  801. {
  802. found = true;
  803. break;
  804. }
  805. }
  806. else
  807. {
  808. if (ep.ipequals(allRoxieServers.item(idx)))
  809. {
  810. found = true;
  811. break;
  812. }
  813. }
  814. }
  815. if (!found)
  816. allRoxieServers.append(ep);
  817. }
  818. }
  819. else
  820. throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - missing netAddress or port specification on RoxieServerProcess element");
  821. }
  822. if (!localSlave)
  823. {
  824. if (roxieMulticastEnabled)
  825. {
  826. if (topology->queryProp("@multicastBase"))
  827. multicastBase.ipset(topology->queryProp("@multicastBase"));
  828. else
  829. throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - multicastBase not set");
  830. if (topology->queryProp("@multicastLast"))
  831. multicastLast.ipset(topology->queryProp("@multicastLast"));
  832. else
  833. throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - multicastLast not set");
  834. }
  835. }
  836. Owned<IPropertyTreeIterator> slaves = topology->getElements("./RoxieSlaveProcess");
  837. unsigned slaveCount = 0;
  838. IpAddress *primaries = new IpAddress[numChannels+1]; // check each channel has a different primary, if possible. Leaks on fatal errors, but who cares
  839. ForEach(*slaves)
  840. {
  841. IPropertyTree &slave = slaves->query();
  842. const char *iptext = slave.queryProp("@netAddress");
  843. if (iptext)
  844. {
  845. addRoxieNode(iptext);
  846. IpAddress slaveIp(iptext);
  847. bool isMe = ipMatch(slaveIp) && slave.getPropInt("@multihost", 0) == myHostNumber;
  848. bool suspended = slave.getPropBool("@suspended", false);
  849. unsigned channel = slave.getPropInt("@channel", 0);
  850. if (!channel)
  851. channel = slave.getPropInt("@channels", 0); // legacy support
  852. const char *dataDirectory = slave.queryProp("@dataDirectory");
  853. if (channel && channel <= numChannels)
  854. {
  855. if (isMe)
  856. isCCD = true;
  857. if (!numSlaves[channel])
  858. {
  859. primaries[channel] = slaveIp;
  860. slaveCount++;
  861. }
  862. addChannel(channel, dataDirectory, isMe, suspended, slaveIp);
  863. if (isMe)
  864. joinMulticastChannel(channel);
  865. }
  866. else
  867. throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - missing or invalid channel attribute on RoxieSlaveProcess element");
  868. }
  869. else
  870. throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - missing netAddress attribute on RoxieSlaveProcess element");
  871. }
  872. if (numActiveChannels)
  873. joinMulticastChannel(0); // all slaves also listen on channel 0
  874. for (unsigned n = 1; n < numActiveChannels; n++)
  875. {
  876. if (primaries[n].isNull())
  877. throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - no slaves for channel %d", n);
  878. if (checkPrimaries)
  879. {
  880. for (unsigned m = n+1; m <= numChannels; m++)
  881. if (primaries[n].ipequals(primaries[m]))
  882. {
  883. StringBuffer s;
  884. primaries[n].getIpText(s);
  885. throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - slave %s is primary for multiple channels", s.str());
  886. }
  887. }
  888. }
  889. delete [] primaries;
  890. setDaliServixSocketCaching(true); // enable daliservix caching
  891. loadPlugins();
  892. globalPackageSetManager = createRoxiePackageSetManager(standAloneDll.getClear());
  893. globalPackageSetManager->load();
  894. unsigned snifferChannel = numChannels+2; // MORE - why +2 not +1 ??
  895. ROQ = createOutputQueueManager(snifferChannel, isCCD ? numSlaveThreads : 1);
  896. ROQ->setHeadRegionSize(headRegionSize);
  897. ROQ->start();
  898. Owned<IPacketDiscarder> packetDiscarder = createPacketDiscarder();
  899. #if defined(WIN32) && defined(_DEBUG) && defined(_DEBUG_HEAP_FULL)
  900. int tmpFlag = _CrtSetDbgFlag( _CRTDBG_REPORT_FLAG );
  901. tmpFlag |= _CRTDBG_CHECK_ALWAYS_DF;
  902. _CrtSetDbgFlag( tmpFlag );
  903. #endif
  904. setSEHtoExceptionHandler(&abortHandler);
  905. if (runOnce)
  906. {
  907. Owned <IRoxieListener> roxieServer = createRoxieSocketListener(0, 1, 0, false);
  908. try
  909. {
  910. const char *format = globals->queryProp("format");
  911. if (!format)
  912. {
  913. if (globals->hasProp("-xml"))
  914. format = "xml";
  915. else if (globals->hasProp("-csv"))
  916. format = "csv";
  917. else if (globals->hasProp("-raw"))
  918. format = "raw";
  919. else
  920. format = "ascii";
  921. }
  922. StringBuffer query;
  923. query.appendf("<roxie format='%s'/>", format);
  924. roxieServer->runOnce(query.str()); // MORE - should use the wu listener instead I suspect
  925. fflush(stdout); // in windows if output is redirected results don't appear without flushing
  926. }
  927. catch (IException *E)
  928. {
  929. EXCLOG(E);
  930. E->Release();
  931. }
  932. }
  933. else
  934. {
  935. Owned<IPropertyTreeIterator> it = ccdChannels->getElements("RoxieServerProcess");
  936. ForEach(*it)
  937. {
  938. // MORE - there are assumptions that everyone is a server (in deployment)
  939. IPropertyTree &serverInfo = it->query();
  940. unsigned port = serverInfo.getPropInt("@port", -1);
  941. bool suspended = serverInfo.getPropBool("@suspended", false);
  942. unsigned numThreads = serverInfo.getPropInt("@numThreads", -1);
  943. unsigned listenQueue = serverInfo.getPropInt("@listenQueue", DEFAULT_LISTEN_QUEUE_SIZE);
  944. Owned <IRoxieListener> roxieServer;
  945. if (port)
  946. roxieServer.setown(createRoxieSocketListener(port, numThreads, listenQueue, suspended));
  947. else
  948. roxieServer.setown(createRoxieWorkUnitListener(numThreads, suspended));
  949. Owned<IPropertyTreeIterator> accesses = serverInfo.getElements("Access");
  950. ForEach(*accesses)
  951. {
  952. IPropertyTree &access = accesses->query();
  953. try
  954. {
  955. roxieServer->addAccess(access.getPropBool("@allow", true), access.getPropBool("@allowBlind", true), access.queryProp("@ip"), access.queryProp("@mask"), access.queryProp("@query"), access.queryProp("@error"), access.getPropInt("@errorCode"));
  956. }
  957. catch (IException *E)
  958. {
  959. StringBuffer s, x;
  960. E->errorMessage(s);
  961. E->Release();
  962. toXML(&access, x, 0, 0);
  963. throw MakeStringException(ROXIE_ACL_ERROR, "Error in access statement %s: %s", x.str(), s.str());
  964. }
  965. }
  966. socketListeners.append(*roxieServer.getLink());
  967. time(&startupTime);
  968. roxieServer->start();
  969. }
  970. writeSentinelFile(sentinelFile);
  971. DBGLOG("Waiting for queries");
  972. if (pingInterval)
  973. startPingTimer();
  974. LocalIAbortHandler abortHandler(waiter);
  975. waiter.wait();
  976. }
  977. shuttingDown = true;
  978. if (pingInterval)
  979. stopPingTimer();
  980. setSEHtoExceptionHandler(NULL);
  981. while (socketListeners.isItem(0))
  982. {
  983. socketListeners.item(0).stop(1000);
  984. socketListeners.remove(0);
  985. }
  986. packetDiscarder->stop();
  987. packetDiscarder.clear();
  988. ROQ->stop();
  989. ROQ->join();
  990. ROQ->Release();
  991. ROQ = NULL;
  992. }
  993. catch (IException *E)
  994. {
  995. StringBuffer x;
  996. DBGLOG("EXCEPTION: (%d): %s", E->errorCode(), E->errorMessage(x).str());
  997. E->Release();
  998. }
  999. roxieMetrics.clear();
  1000. allRoxieServers.kill();
  1001. stopPerformanceMonitor();
  1002. ::Release(globalPackageSetManager);
  1003. globalPackageSetManager = NULL;
  1004. cleanupPlugins();
  1005. closeMulticastSockets();
  1006. releaseSlaveDynamicFileCache();
  1007. releaseDiffFileInfoCache();
  1008. releaseRoxieStateCache();
  1009. setDaliServixSocketCaching(false); // make sure it cleans up or you get bogus memleak reports
  1010. setNodeCaching(false); // ditto
  1011. strdup("Make sure leak checking is working");
  1012. UseSysLogForOperatorMessages(false);
  1013. ExitModuleObjects();
  1014. releaseAtoms();
  1015. strdup("Make sure leak checking is working");
  1016. #ifdef _WIN32
  1017. #ifdef _DEBUG
  1018. #if 1
  1019. StringBuffer leakFileDir(logDirectory.str());
  1020. leakFileDir.append("roxieleaks.log");
  1021. HANDLE h = CreateFile(leakFileDir.str(), GENERIC_READ|GENERIC_WRITE, 0, NULL, CREATE_ALWAYS, 0, 0);
  1022. _CrtSetReportMode( _CRT_WARN, _CRTDBG_MODE_FILE|_CRTDBG_MODE_DEBUG);
  1023. _CrtSetReportFile( _CRT_WARN, h);
  1024. _CrtSetReportMode( _CRT_ERROR, _CRTDBG_MODE_FILE|_CRTDBG_MODE_DEBUG);
  1025. _CrtSetReportFile( _CRT_ERROR, h);
  1026. _CrtSetReportMode( _CRT_ASSERT, _CRTDBG_MODE_FILE|_CRTDBG_MODE_DEBUG);
  1027. _CrtSetReportFile( _CRT_ASSERT, h);
  1028. // _CrtDumpMemoryLeaks(); if you uncomment these lines you get to see the leaks sooner (so can look in debugger at full memory)
  1029. // CloseHandle(h); but there will be additional leaks reported that are not really leaks
  1030. #endif
  1031. #endif
  1032. #endif
  1033. return 0;
  1034. }