ccdmain.cpp 63 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include <platform.h>
  14. #include <signal.h>
  15. #include <algorithm>
  16. #include <jlib.hpp>
  17. #include <jio.hpp>
  18. #include <jmisc.hpp>
  19. #include <jqueue.tpp>
  20. #include <jsocket.hpp>
  21. #include <jlog.hpp>
  22. #include <jprop.hpp>
  23. #include <jfile.hpp>
  24. #include <jencrypt.hpp>
  25. #include "jutil.hpp"
  26. #include "jsecrets.hpp"
  27. #include <build-config.h>
  28. #include <udptopo.hpp>
  29. #include "rtlformat.hpp"
  30. #include "dalienv.hpp"
  31. #include "rmtfile.hpp"
  32. #include "ccd.hpp"
  33. #include "ccdquery.hpp"
  34. #include "ccdstate.hpp"
  35. #include "ccdqueue.ipp"
  36. #include "ccdserver.hpp"
  37. #include "ccdlistener.hpp"
  38. #include "ccdsnmp.hpp"
  39. #include "thorplugin.hpp"
  40. #if defined (__linux__)
  41. #include <sys/syscall.h>
  42. #include "ioprio.h"
  43. #endif
  44. #ifdef _USE_CPPUNIT
  45. #include <cppunit/extensions/TestFactoryRegistry.h>
  46. #include <cppunit/ui/text/TestRunner.h>
  47. #endif
  48. //=================================================================================
  // Process-wide Roxie configuration defaults and shared state.
  // Globals without an initializer are zero-initialized (static storage). Several are later set from
  // the topology by roxie_main() (e.g. numChannels, localAgent, encryptInTransit, roxieName, runOnce).
  // NOTE(review): units for most timeouts/intervals below are not stated here - TODO confirm.
  49. bool shuttingDown = false;
  // From the topology's @numChannels in roxie_main(); in bare-metal builds readStaticTopology() validates it or computes it.
  50. unsigned numChannels;
  51. unsigned callbackRetries = 3;
  52. unsigned callbackTimeout = 5000;
  // NOTE(review): low/high/SLA values presumably correspond to query priorities - confirm.
  53. unsigned lowTimeout = 10000;
  54. unsigned highTimeout = 2000;
  55. unsigned slaTimeout = 2000;
  56. unsigned numServerThreads = 30;
  57. unsigned numAgentThreads = 30;
  58. bool prestartAgentThreads = false;
  59. unsigned numRequestArrayThreads = 5;
  60. unsigned headRegionSize;
  // Port used for agent endpoints (see myAgentEP / agent roles in readStaticTopology()).
  61. unsigned ccdMulticastPort;
  62. bool enableHeartBeat = true;
  63. unsigned parallelLoopFlowLimit = 100;
  64. unsigned perChannelFlowLimit = 10;
  65. time_t startupTime;
  66. unsigned statsExpiryTime = 3600;
  67. unsigned miscDebugTraceLevel = 0; // separate trace settings purely for debugging specific items (i.e. all possible locations to look for files at startup)
  68. unsigned readTimeout = 300;
  69. unsigned indexReadChunkSize = 60000;
  70. unsigned maxBlockSize = 10000000;
  71. unsigned maxLockAttempts = 5;
  72. bool pretendAllOpt = false;
  73. bool traceStartStop = false;
  74. bool traceRoxiePackets = false;
  75. bool delaySubchannelPackets = false; // For debugging/testing purposes only
  76. bool defaultTimeActivities = true;
  77. bool defaultTraceEnabled = false;
  78. bool traceTranslations = true;
  79. unsigned IBYTIbufferSize = 0;
  80. unsigned IBYTIbufferLifetime = 50; // In milliseconds
  81. unsigned defaultTraceLimit = 10;
  82. unsigned watchActivityId = 0;
  83. unsigned testAgentFailure = 0;
  84. RelaxedAtomic<unsigned> restarts;
  85. RecordTranslationMode fieldTranslationEnabled = RecordTranslationMode::Payload;
  86. bool mergeAgentStatistics = true;
  87. PTreeReaderOptions defaultXmlReadFlags = ptr_ignoreWhiteSpace;
  // runOnce/oneShotRoxie are set by roxie_main() when a workunit (@workunit/@loadWorkunit) or an
  // embedded query is being run; runOnce is cleared again if @server is set.
  88. bool runOnce = false;
  89. bool oneShotRoxie = false;
  // NOTE(review): 262142 is 2 less than 256 KiB (262144) - confirm the value is intentional.
  90. unsigned udpMulticastBufferSize = 262142;
  // Bare-metal (non-containerized) builds only.
  91. #ifndef _CONTAINERIZED
  92. bool roxieMulticastEnabled = true;
  93. #endif
  // Configuration tree: nulled in MODULE_INIT, loaded by roxie_main() via loadConfiguration(),
  // released in MODULE_EXIT.
  94. IPropertyTree *topology;
  // Allocated by roxie_main() only when the topology has PreferredCluster entries; maps
  // cluster @name -> @priority (default 100). Null otherwise.
  95. MapStringTo<int> *preferredClusters;
  // Path of RoxieTopology.xml in the startup directory; written back by saveTopology().
  // NOTE(review): roxie_main() appends PATHSEPCHAR after codeDirectory, which already ends in a
  // separator, so the path contains a doubled separator - harmless on POSIX, but worth tidying.
  96. StringBuffer topologyFile;
  97. CriticalSection ccdChannelsCrit;
  98. StringArray allQuerySetNames;
  99. bool alwaysTrustFormatCrcs;
  // Forced true by roxie_main() for one-shot (workunit / embedded query) runs.
  100. bool allFilesDynamic;
  101. bool lockSuperFiles;
  102. bool useRemoteResources;
  103. bool checkFileDate;
  104. bool lazyOpen;
  // From @localAgent (legacy @localSlave) in roxie_main(); readStaticTopology() requires a single channel when set.
  105. bool localAgent;
  // From @encryptInTransit, but always false when localAgent is set (roxie_main()).
  106. bool encryptInTransit;
  107. bool useAeron;
  108. bool ignoreOrphans;
  109. bool doIbytiDelay = true;
  110. bool copyResources;
  111. bool enableKeyDiff = true;
  112. bool chunkingHeap = true;
  113. bool logFullQueries;
  114. bool blindLogging = false;
  115. bool debugPermitted = true;
  116. bool checkCompleted = true;
  117. unsigned preabortKeyedJoinsThreshold = 100;
  118. unsigned preabortIndexReadsThreshold = 100;
  119. bool preloadOnceData;
  120. bool reloadRetriesFailed;
  121. bool selfTestMode = false;
  122. bool defaultCollectFactoryStatistics = true;
  123. bool defaultNoSeekBuildIndex = false;
  124. unsigned parallelLoadQueries = 8;
  125. bool alwaysFailOnLeaks = false;
  126. unsigned continuationCompressThreshold = 1024;
  // True when RoxieTopology.xml exists in the startup directory (roxie_main()); saveTopology() only writes when set.
  127. bool useOldTopology = false;
  128. int backgroundCopyClass = 0;
  129. int backgroundCopyPrio = 0;
  130. unsigned memoryStatsInterval = 0;
  131. memsize_t defaultMemoryLimit;
  // NOTE(review): three entries each, presumably indexed like lowTimeout/highTimeout/slaTimeout - confirm.
  132. unsigned defaultTimeLimit[3] = {0, 0, 0};
  133. unsigned defaultWarnTimeLimit[3] = {0, 5000, 5000};
  134. unsigned defaultThorConnectTimeout;
  135. unsigned defaultParallelJoinPreload = 0;
  136. unsigned defaultPrefetchProjectPreload = 10;
  137. unsigned defaultConcatPreload = 0;
  138. unsigned defaultFetchPreload = 0;
  139. unsigned defaultFullKeyedJoinPreload = 0;
  140. unsigned defaultKeyedJoinPreload = 0;
  141. unsigned dafilesrvLookupTimeout = 10000;
  142. bool defaultCheckingHeap = false;
  143. bool defaultDisableLocalOptimizations = false;
  144. unsigned defaultStrandBlockSize = 512;
  145. unsigned defaultForceNumStrands = 0;
  146. unsigned defaultHeapFlags = roxiemem::RHFnone;
  147. unsigned agentQueryReleaseDelaySeconds = 60;
  148. unsigned coresPerQuery = 0;
  149. unsigned logQueueLen;
  150. unsigned logQueueDrop;
  151. bool useLogQueue;
  152. bool fastLaneQueue;
  153. unsigned mtu_size = 1400; // upper limit on outbound buffer size - allow some header room too
  154. StringBuffer fileNameServiceDali;
  // From the topology's @name (roxie_main()); also used as the statistics component name when non-empty.
  155. StringBuffer roxieName;
  156. bool trapTooManyActiveQueries;
  157. unsigned maxEmptyLoopIterations;
  158. unsigned maxGraphLoopIterations;
  159. bool probeAllRows;
  160. bool steppingEnabled = true;
  161. bool simpleLocalKeyedJoins = true;
  162. bool adhocRoxie = false;
  163. unsigned __int64 minFreeDiskSpace = 1024 * 0x100000; // default to 1 GB
  164. unsigned socketCheckInterval = 5000;
  165. unsigned cacheReportPeriodSeconds = 5*60;
  // Directories: logDirectory comes from the component log file creator, queryDirectory from the
  // "query" configuration directory, and codeDirectory is the startup working directory with a
  // trailing path separator (all set in roxie_main()).
  166. StringBuffer logDirectory;
  167. StringBuffer pluginDirectory;
  168. StringBuffer queryDirectory;
  169. StringBuffer codeDirectory;
  170. StringBuffer tempDirectory;
  171. ClientCertificate clientCert;
  172. bool useHardLink;
  // Two-element arrays: per the maxFileAge comment, index 0 = local files and index 1 = remote files.
  // minFilesOpen/maxFilesOpen presumably use the same indexing - confirm.
  173. unsigned maxFileAge[2] = {0xffffffff, 60*60*1000}; // local files don't expire, remote expire in 1 hour, by default
  174. unsigned minFilesOpen[2] = {2000, 500};
  175. unsigned maxFilesOpen[2] = {4000, 1000};
  176. SocketEndpoint debugEndpoint;
  177. HardwareInfo hdwInfo;
  178. unsigned parallelAggregate;
  179. bool inMemoryKeysEnabled = true;
  180. bool nodeCachePreload = false;
  181. unsigned nodeCacheMB = 100;
  182. unsigned leafCacheMB = 50;
  183. unsigned blobCacheMB = 0;
  184. unsigned roxiePort = 0;
  // Bare-metal (non-containerized) builds only.
  185. #ifndef _CONTAINERIZED
  186. Owned<IPerfMonHook> perfMonHook;
  187. #endif
  188. MODULE_INIT(INIT_PRIORITY_STANDARD)
  189. {
  190. topology = NULL;
  191. return true;
  192. }
  193. MODULE_EXIT()
  194. {
  195. ::Release(topology);
  196. }
  197. //=========================================================================================
  198. //////////////////////////////////////////////////////////////////////////////////////////////
  199. extern "C" void caughtSIGPIPE(int sig)
  200. {
  201. DBGLOG("Caught sigpipe %d", sig);
  202. }
  203. extern "C" void caughtSIGHUP(int sig)
  204. {
  205. DBGLOG("Caught sighup %d", sig);
  206. }
  207. extern "C" void caughtSIGALRM(int sig)
  208. {
  209. DBGLOG("Caught sigalrm %d", sig);
  210. }
  211. extern "C" void caughtSIGTERM(int sig)
  212. {
  213. DBGLOG("Caught sigterm %d", sig);
  214. }
  215. void init_signals()
  216. {
  217. // signal(SIGTERM, caughtSIGTERM);
  218. #ifndef _WIN32
  219. signal(SIGPIPE, caughtSIGPIPE);
  220. signal(SIGHUP, caughtSIGHUP);
  221. signal(SIGALRM, caughtSIGALRM);
  222. #endif
  223. }
  224. //=========================================================================================
  225. class Waiter : public CInterface, implements IAbortHandler
  226. {
  227. Semaphore aborted;
  228. public:
  229. IMPLEMENT_IINTERFACE;
  230. bool wait(unsigned timeout)
  231. {
  232. return aborted.wait(timeout);
  233. }
  234. void wait()
  235. {
  236. aborted.wait();
  237. }
  238. bool onAbort()
  239. {
  240. aborted.signal();
  241. roxieMetrics.clear();
  242. #ifdef _DEBUG
  243. return false; // we want full leak checking info
  244. #else
  245. return true; // we don't care - just exit as fast as we can
  246. #endif
  247. }
  248. } waiter;
  249. void closedown()
  250. {
  251. Owned<IFile> sentinelFile = createSentinelTarget();
  252. removeSentinelFile(sentinelFile);
  253. waiter.onAbort();
  254. }
  255. void getAccessList(const char *aclName, const IPropertyTree *topology, IPropertyTree *aclInfo)
  256. {
  257. StringBuffer xpath;
  258. xpath.append("ACL[@name='").append(aclName).append("']");
  259. if (aclInfo->queryPropTree(xpath))
  260. throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - recursive ACL definition of %s", aclName);
  261. aclInfo->addPropTree("ACL")->setProp("@name", aclName);
  262. Owned<IPropertyTree> acl = topology->getPropTree(xpath.str());
  263. if (!acl)
  264. throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - acl %s not found", aclName);
  265. Owned<IPropertyTreeIterator> access = acl->getElements("Access");
  266. ForEach(*access)
  267. {
  268. IPropertyTree &child = access->query();
  269. const char *base = child.queryProp("@base");
  270. if (base)
  271. getAccessList(base, topology, aclInfo);
  272. else
  273. aclInfo->addPropTree(child.queryName(), LINK(&child));
  274. }
  275. aclInfo->removeProp(xpath);
  276. }
  277. bool ipMatch(IpAddress &ip)
  278. {
  279. return ip.isLocal();
  280. }
  281. extern void doUNIMPLEMENTED(unsigned line, const char *file)
  282. {
  283. throw MakeStringException(ROXIE_UNIMPLEMENTED_ERROR, "UNIMPLEMENTED at %s:%d", sanitizeSourceFile(file), line);
  284. }
  285. void FatalError(const char *format, ...)
  286. {
  287. va_list args;
  288. va_start(args, format);
  289. StringBuffer errMsg;
  290. errMsg.valist_appendf(format, args);
  291. va_end(args);
  292. Owned<IException> E = MakeStringException(MSGAUD_operator, ROXIE_INTERNAL_ERROR, "%s", errMsg.str());
  293. EXCLOG(E, "Fatal error");
  294. Sleep(5000);
  295. _exit(1);
  296. }
  297. // If changing these, please change roxie.cpp's roxie_server_usage() as well
  298. static void roxie_common_usage(const char * progName)
  299. {
  300. StringBuffer program;
  301. program.append(progName);
  302. getFileNameOnly(program, false);
  303. // Things that are also relevant to stand-alone executables
  304. printf("Usage: %s [options]\n", program.str());
  305. printf("\nOptions:\n");
  306. printf(" --[xml|csv|raw] : Output format (default ascii)\n");
  307. printf(" --daliServers=[host1,...] : List of Dali servers to use\n");
  308. printf(" --tracelevel=[integer] : Amount of information to dump on logs\n");
  309. printf(" --stdlog=[boolean] : Standard log format (based on tracelevel)\n");
  310. printf(" --logfile=[filename] : Outputs to logfile, rather than stdout\n");
  311. printf(" --help|-h : This message\n");
  312. printf("\n");
  313. }
  314. class MAbortHandler : implements IExceptionHandler
  315. {
  316. public:
  317. virtual bool fireException(IException *e)
  318. {
  319. ForEachItemIn(idx, socketListeners)
  320. {
  321. socketListeners.item(idx).stopListening();
  322. }
  323. return false; // It returns to excsighandler() to abort!
  324. }
  325. } abortHandler;
  #ifdef _WIN32
  // CRT allocation hook (installed via _CrtSetAllocHook in debug Windows builds).
  // Handy place to put breakpoints when tracking down obscure memory leaks...
  // Logs when a 68-byte allocation arrives with no source file; always returns true.
  int myhook(int alloctype, void *, size_t nSize, int p1, long allocSeq, const unsigned char *file, int line)
  {
      const bool interesting = (file == nullptr) && (nSize == 68);
      if (interesting)
          DBGLOG("memory hook matched");
      return true;
  }
  #endif
  332. DBGLOG("memory hook matched");
  333. }
  334. return true;
  335. }
  336. #endif
  337. void saveTopology()
  338. {
  339. // Write back changes that have been made via certain control:xxx changes, so that they survive a roxie restart
  340. // Note that they are overwritten when Roxie is manually stopped/started via hpcc-init service - these changes
  341. // are only intended to be temporary for the current session
  342. if (!useOldTopology)
  343. return;
  344. try
  345. {
  346. saveXML(topologyFile.str(), topology);
  347. }
  348. catch (IException *E)
  349. {
  350. // If we can't save the topology, then tough. Carry on without it. Changes will not survive an unexpected roxie restart
  351. EXCLOG(E, "Error saving topology file");
  352. E->Release();
  353. }
  354. }
  355. class CHpccProtocolPluginCtx : implements IHpccProtocolPluginContext, public CInterface
  356. {
  357. public:
  358. IMPLEMENT_IINTERFACE;
  359. virtual int ctxGetPropInt(const char *propName, int defaultValue) const
  360. {
  361. return topology->getPropInt(propName, defaultValue);
  362. }
  363. virtual bool ctxGetPropBool(const char *propName, bool defaultValue) const
  364. {
  365. return topology->getPropBool(propName, defaultValue);
  366. }
  367. virtual const char *ctxQueryProp(const char *propName) const
  368. {
  369. return topology->queryProp(propName);
  370. }
  371. };
  372. static std::vector<RoxieEndpointInfo> myRoles;
  373. static std::vector<std::pair<unsigned, unsigned>> agentChannels;
  374. #ifndef _CONTAINERIZED
  375. void readStaticTopology()
  376. {
  377. // If dynamicServers not set, we read a list of all servers form the topology file, and deduce which ones are on which channel
  378. // and the total number of channels
  379. std::vector<RoxieEndpointInfo> allRoles;
  380. IpAddressArray nodeTable;
  381. unsigned numNodes = topology->getCount("./RoxieServerProcess");
  382. if (!numNodes && localAgent)
  383. {
  384. topology->addPropTree("RoxieServerProcess")->setProp("@netAddress", ".");
  385. numNodes = 1;
  386. }
  387. Owned<IPropertyTreeIterator> roxieServers = topology->getElements("./RoxieServerProcess");
  388. bool myNodeSet = false;
  389. unsigned calcNumChannels = 0;
  390. ForEach(*roxieServers)
  391. {
  392. IPropertyTree &roxieServer = roxieServers->query();
  393. const char *iptext = roxieServer.queryProp("@netAddress");
  394. IpAddress ip(iptext);
  395. if (ip.isNull())
  396. throw MakeStringException(ROXIE_UDP_ERROR, "Could not resolve address %s", iptext);
  397. if (ip.isLocal() && !myNodeSet)
  398. {
  399. myNodeSet = true;
  400. myNode.setIp(ip);
  401. myAgentEP.set(ccdMulticastPort, myNode.getIpAddress());
  402. }
  403. ForEachItemIn(idx, nodeTable)
  404. {
  405. if (ip.ipequals(nodeTable.item(idx)))
  406. throw MakeStringException(ROXIE_UDP_ERROR, "Duplicated node %s in RoxieServerProcess list", iptext);
  407. }
  408. nodeTable.append(ip);
  409. Owned<IPropertyTreeIterator> roxieFarms = topology->getElements("./RoxieFarmProcess");
  410. ForEach(*roxieFarms)
  411. {
  412. IPropertyTree &roxieFarm = roxieFarms->query();
  413. unsigned port = roxieFarm.getPropInt("@port", ROXIE_SERVER_PORT);
  414. RoxieEndpointInfo server = {RoxieEndpointInfo::RoxieServer, 0, { (unsigned short) port, ip }, 0};
  415. allRoles.push_back(server);
  416. }
  417. }
  418. if (!myNodeSet)
  419. throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - current node is not in server list");
  420. // Generate the agent channels
  421. unsigned numDataCopies = topology->getPropInt("@numDataCopies", 1);
  422. if (!numDataCopies)
  423. throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - numDataCopies should be > 0");
  424. unsigned channelsPerNode = topology->getPropInt("@channelsPerNode", 1);
  425. const char *agentConfig = topology->queryProp("@agentConfig");
  426. if (!agentConfig)
  427. agentConfig = topology->queryProp("@slaveConfig"); // legacy name
  428. if (!agentConfig)
  429. agentConfig = "simple";
  430. if (strnicmp(agentConfig, "cyclic", 6) == 0)
  431. {
  432. calcNumChannels = numNodes;
  433. unsigned cyclicOffset = topology->getPropInt("@cyclicOffset", 1);
  434. for (unsigned copy=0; copy<numDataCopies; copy++)
  435. {
  436. // Note this code is a little confusing - easy to get the cyclic offset backwards
  437. // cyclic offset means node n+offset has copy 2 for channel n, so node n has copy 2 for channel n-offset
  438. for (unsigned i=0; i<numNodes; i++)
  439. {
  440. int channel = (int)i+1 - (copy * cyclicOffset);
  441. while (channel < 1)
  442. channel = channel + numNodes;
  443. RoxieEndpointInfo agent = {RoxieEndpointInfo::RoxieAgent, (unsigned) channel, { (unsigned short) ccdMulticastPort, nodeTable.item(i) }, copy};
  444. allRoles.push_back(agent);
  445. }
  446. }
  447. }
  448. else if (strnicmp(agentConfig, "overloaded", 10) == 0)
  449. {
  450. if (!channelsPerNode)
  451. throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - channelsPerNode should be > 0");
  452. calcNumChannels = numNodes * channelsPerNode;
  453. for (unsigned copy=0; copy<channelsPerNode; copy++)
  454. {
  455. for (unsigned i=0; i<numNodes; i++)
  456. {
  457. unsigned channel = i+1;
  458. RoxieEndpointInfo agent = {RoxieEndpointInfo::RoxieAgent, channel, { (unsigned short) ccdMulticastPort, nodeTable.item(i) }, copy};
  459. allRoles.push_back(agent);
  460. channel += numNodes;
  461. }
  462. }
  463. }
  464. else // 'Full redundancy' or 'simple' mode
  465. {
  466. if (numNodes % numDataCopies)
  467. throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - numChannels not an integer");
  468. calcNumChannels = numNodes / numDataCopies;
  469. unsigned channel = 1;
  470. for (unsigned i=0; i<numNodes; i++)
  471. {
  472. RoxieEndpointInfo agent = {RoxieEndpointInfo::RoxieAgent, channel, { (unsigned short) ccdMulticastPort, nodeTable.item(i) }, 0 };
  473. allRoles.push_back(agent);
  474. channel++;
  475. if (channel > calcNumChannels)
  476. channel = 1;
  477. }
  478. }
  479. if (numChannels && numChannels != calcNumChannels)
  480. throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - numChannels calculated at %u but specified as %u", calcNumChannels, numChannels);
  481. if (!calcNumChannels)
  482. throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - numChannels calculated at 0");
  483. if (calcNumChannels > 1 && localAgent)
  484. throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - localAgent requires single channel (%d channels specified)", calcNumChannels);
  485. numChannels = calcNumChannels;
  486. createStaticTopology(allRoles, traceLevel);
  487. }
  488. #endif
  489. int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
  490. {
  491. #ifndef _CONTAINERIZED
  492. for (unsigned i=0;i<(unsigned)argc;i++) {
  493. if (streq(argv[i],"--daemon") || streq(argv[i],"-d")) {
  494. if (daemon(1,0) || write_pidfile(argv[++i])) {
  495. perror("Failed to daemonize");
  496. return EXIT_FAILURE;
  497. }
  498. break;
  499. }
  500. }
  501. #endif
  502. EnableSEHtoExceptionMapping();
  503. setTerminateOnSEH();
  504. init_signals();
  505. // We need to do the above BEFORE we call InitModuleObjects
  506. try
  507. {
  508. InitModuleObjects();
  509. }
  510. catch (IException *E)
  511. {
  512. EXCLOG(E);
  513. E->Release();
  514. return EXIT_FAILURE;
  515. }
  516. init_signals();
  517. for (unsigned i=0; i<(unsigned)argc; i++)
  518. {
  519. if (stricmp(argv[i], "--help")==0 ||
  520. stricmp(argv[i], "-h")==0)
  521. {
  522. roxie_common_usage(argv[0]);
  523. return EXIT_SUCCESS;
  524. }
  525. else if (strsame(argv[i], "-xml")) // Back compatibility
  526. argv[i] = "--xml";
  527. else if (strsame(argv[i], "-csv"))
  528. argv[i] = "--csv";
  529. else if (strsame(argv[i], "-raw"))
  530. argv[i] = "--raw";
  531. }
  532. #ifdef _USE_CPPUNIT
  533. if (argc>=2 && (stricmp(argv[1], "-selftest")==0 || stricmp(argv[1], "--selftest")==0))
  534. {
  535. selfTestMode = true;
  536. queryStderrLogMsgHandler()->setMessageFields(MSGFIELD_time | MSGFIELD_milliTime | MSGFIELD_prefix);
  537. CppUnit::TextUi::TestRunner runner;
  538. if (argc==2)
  539. {
  540. CppUnit::TestFactoryRegistry &registry = CppUnit::TestFactoryRegistry::getRegistry();
  541. runner.addTest( registry.makeTest() );
  542. }
  543. else
  544. {
  545. // MORE - maybe add a 'list' function here?
  546. for (int name = 2; name < argc; name++)
  547. {
  548. if (stricmp(argv[name], "-q")==0)
  549. {
  550. traceLevel = 0;
  551. roxiemem::setMemTraceLevel(0);
  552. removeLog();
  553. }
  554. else
  555. {
  556. CppUnit::TestFactoryRegistry &registry = CppUnit::TestFactoryRegistry::getRegistry(argv[name]);
  557. runner.addTest( registry.makeTest() );
  558. }
  559. }
  560. }
  561. bool wasSucessful = runner.run( "", false );
  562. releaseAtoms();
  563. return wasSucessful;
  564. }
  565. #endif
  566. #ifdef _DEBUG
  567. #ifdef _WIN32
  568. _CrtSetAllocHook(myhook);
  569. #endif
  570. #endif
  571. #ifndef __64BIT__
  572. // Restrict stack sizes on 32-bit systems
  573. Thread::setDefaultStackSize(0x10000); // NB under windows requires linker setting (/stack:)
  574. #endif
  575. srand( (unsigned)time( NULL ) );
  576. char currentDirectory[_MAX_DIR];
  577. if (!getcwd(currentDirectory, sizeof(currentDirectory)))
  578. throw MakeStringException(ROXIE_INTERNAL_ERROR, "getcwd failed (%d)", errno);
  579. codeDirectory.set(currentDirectory);
  580. addNonEmptyPathSepChar(codeDirectory);
  581. try
  582. {
  583. Owned<IFile> sentinelFile = createSentinelTarget();
  584. removeSentinelFile(sentinelFile);
  585. topologyFile.append(codeDirectory).append(PATHSEPCHAR).append("RoxieTopology.xml");
  586. useOldTopology = checkFileExists(topologyFile.str());
  587. topology = loadConfiguration(useOldTopology ? nullptr : defaultYaml, argv, "roxie", "ROXIE", topologyFile, nullptr, "@netAddress");
  588. saveTopology();
  589. localAgent = topology->getPropBool("@localAgent", topology->getPropBool("@localSlave", false)); // legacy name
  590. encryptInTransit = topology->getPropBool("@encryptInTransit", false) && !localAgent;
  591. numChannels = topology->getPropInt("@numChannels", 0);
  592. #ifdef _CONTAINERIZED
  593. if (!numChannels)
  594. throw makeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - numChannels not set");
  595. #endif
  596. const char *channels = topology->queryProp("@channels");
  597. if (channels)
  598. {
  599. StringArray channelSpecs;
  600. channelSpecs.appendList(channels, ",", true);
  601. ForEachItemIn(idx, channelSpecs)
  602. {
  603. char *tail = nullptr;
  604. unsigned channel = strtoul(channelSpecs.item(idx), &tail, 10);
  605. unsigned repl = 0;
  606. if (*tail==':')
  607. {
  608. tail++;
  609. repl = atoi(tail);
  610. }
  611. else if (*tail)
  612. throw makeStringExceptionV(ROXIE_INTERNAL_ERROR, "Invalid channel specification %s", channels);
  613. agentChannels.push_back(std::pair<unsigned, unsigned>(channel, repl));
  614. }
  615. }
  616. #ifdef _CONTAINERIZED
  617. else if (localAgent)
  618. {
  619. for (unsigned channel = 1; channel <= numChannels; channel++)
  620. agentChannels.push_back(std::pair<unsigned, unsigned>(channel, 0));
  621. }
  622. #endif
  623. const char *topos = topology->queryProp("@topologyServers");
  624. StringArray topoValues;
  625. if (topos)
  626. topoValues.appendList(topos, ",", true);
  627. if (topology->hasProp("PreferredCluster"))
  628. {
  629. preferredClusters = new MapStringTo<int>(true);
  630. Owned<IPropertyTreeIterator> clusters = topology->getElements("PreferredCluster");
  631. ForEach(*clusters)
  632. {
  633. IPropertyTree &child = clusters->query();
  634. const char *name = child.queryProp("@name");
  635. int priority = child.getPropInt("@priority", 100);
  636. if (name && *name)
  637. preferredClusters->setValue(name, priority);
  638. }
  639. }
  640. topology->getProp("@name", roxieName);
  641. if (roxieName.length())
  642. setStatisticsComponentName(SCTroxie, roxieName, true);
  643. else
  644. setStatisticsComponentName(SCTroxie, "roxie", true);
  645. installDefaultFileHooks(topology);
  646. Owned<const IQueryDll> standAloneDll;
  647. const char *wuid = topology->queryProp("@workunit");
  648. if (wuid)
  649. setDefaultJobId(wuid);
  650. if (topology->hasProp("@loadWorkunit"))
  651. {
  652. StringBuffer workunitName;
  653. topology->getProp("@loadWorkunit", workunitName);
  654. standAloneDll.setown(createQueryDll(workunitName));
  655. }
  656. else
  657. {
  658. Owned<ILoadedDllEntry> dll = createExeDllEntry(argv[0]);
  659. if (checkEmbeddedWorkUnitXML(dll))
  660. standAloneDll.setown(createExeQueryDll(argv[0]));
  661. }
  662. if (standAloneDll || wuid)
  663. {
  664. oneShotRoxie = true;
  665. if (wuid)
  666. DBGLOG("Starting roxie - wuid=%s", wuid);
  667. allFilesDynamic = true;
  668. if (topology->getPropBool("@server", false))
  669. {
  670. #ifdef _CONTAINERIZED
  671. if (!topology->getCount("services"))
  672. {
  673. // Makes debugging easier...
  674. IPropertyTree *service = topology->addPropTree("services");
  675. service->setProp("@name", "query");
  676. service->setPropInt("@port", 9876);
  677. }
  678. #else
  679. if (!topology->getCount("RoxieFarmProcess"))
  680. {
  681. // Makes debugging easier...
  682. IPropertyTree *service = topology->addPropTree("RoxieFarmProcess");
  683. service->setPropInt("@port", 9876);
  684. }
  685. #endif
  686. runOnce = false;
  687. }
  688. else
  689. runOnce = true;
  690. }
  691. if (!topology->hasProp("@resolveLocally"))
  692. topology->setPropBool("@resolveLocally", !topology->hasProp("@daliServers"));
  693. traceLevel = topology->getPropInt("@traceLevel", runOnce ? 0 : 1);
  694. if (traceLevel > MAXTRACELEVEL)
  695. traceLevel = MAXTRACELEVEL;
  696. if (traceLevel && topology->hasProp("logging/@disabled"))
  697. topology->setPropBool("logging/@disabled", false);
  698. udpStatsReportInterval = topology->getPropInt("@udpStatsReportInterval", traceLevel ? 60000 : 0);
  699. udpTraceFlow = topology->getPropBool("@udpTraceFlow", false);
  700. udpTraceTimeouts = topology->getPropBool("@udpTraceTimeouts", false);
  701. udpTraceLevel = topology->getPropInt("@udpTraceLevel", runOnce ? 0 : 1);
  702. roxiemem::setMemTraceLevel(topology->getPropInt("@memTraceLevel", runOnce ? 0 : 1));
  703. soapTraceLevel = topology->getPropInt("@soapTraceLevel", runOnce ? 0 : 1);
  704. miscDebugTraceLevel = topology->getPropInt("@miscDebugTraceLevel", 0);
  705. Linked<IPropertyTree> directoryTree = topology->queryPropTree("Directories");
  706. if (!directoryTree)
  707. {
  708. Owned<IPropertyTree> envFile = getHPCCEnvironment();
  709. if (envFile)
  710. directoryTree.set(envFile->queryPropTree("Software/Directories"));
  711. }
  712. if (directoryTree)
  713. {
  714. getConfigurationDirectory(directoryTree, "query", "roxie", roxieName, queryDirectory);
  715. for (unsigned replicationLevel = 0; replicationLevel < MAX_REPLICATION_LEVELS; replicationLevel++)
  716. {
  717. StringBuffer dataDir;
  718. StringBuffer dirId("data");
  719. if (replicationLevel)
  720. dirId.append(replicationLevel+1);
  721. if (getConfigurationDirectory(directoryTree, dirId, "roxie", roxieName, dataDir))
  722. setBaseDirectory(dataDir, replicationLevel, DFD_OSdefault);
  723. }
  724. }
  725. directoryTree.clear();
  726. //Logging stuff
  727. #ifndef _CONTAINERIZED
  728. if (topology->getPropBool("@stdlog", traceLevel != 0) || topology->getPropBool("@forceStdLog", false))
  729. queryStderrLogMsgHandler()->setMessageFields(MSGFIELD_time | MSGFIELD_milliTime | MSGFIELD_thread | MSGFIELD_prefix);
  730. else
  731. removeLog();
  732. if (topology->hasProp("@logfile"))
  733. {
  734. Owned<IComponentLogFileCreator> lf = createComponentLogFileCreator(topology, "roxie");
  735. lf->setMaxDetail(TopDetail);
  736. lf->beginLogging();
  737. logDirectory.set(lf->queryLogDir());
  738. #ifdef _DEBUG
  739. unsigned useLogQueue = topology->getPropBool("@useLogQueue", false);
  740. #else
  741. unsigned useLogQueue = topology->getPropBool("@useLogQueue", true);
  742. #endif
  743. if (useLogQueue)
  744. {
  745. unsigned logQueueLen = topology->getPropInt("@logQueueLen", 512);
  746. unsigned logQueueDrop = topology->getPropInt("@logQueueDrop", 32);
  747. queryLogMsgManager()->enterQueueingMode();
  748. queryLogMsgManager()->setQueueDroppingLimit(logQueueLen, logQueueDrop);
  749. }
  750. if (topology->getPropBool("@enableSysLog",true))
  751. UseSysLogForOperatorMessages();
  752. }
  753. #else
  754. setupContainerizedLogMsgHandler();
  755. #endif
  756. roxieMetrics.setown(createRoxieMetricsManager());
  757. Owned<IPropertyTreeIterator> userMetrics = topology->getElements("./UserMetric");
  758. ForEach(*userMetrics)
  759. {
  760. IPropertyTree &metric = userMetrics->query();
  761. const char *name = metric.queryProp("@name");
  762. const char *regex= metric.queryProp("@regex");
  763. if (name && regex)
  764. roxieMetrics->addUserMetric(name, regex);
  765. else
  766. throw MakeStringException(ROXIE_INTERNAL_ERROR, "Invalid UserMetric element in topology file - name or regex missing");
  767. }
  768. restarts = topology->getPropInt("@restarts", 0);
  769. const char *preferredSubnet = topology->queryProp("@preferredSubnet");
  770. if (preferredSubnet)
  771. {
  772. const char *preferredSubnetMask = topology->queryProp("@preferredSubnetMask");
  773. if (!preferredSubnetMask) preferredSubnetMask = "255.255.255.0";
  774. if (!setPreferredSubnet(preferredSubnet, preferredSubnetMask))
  775. throw MakeStringException(ROXIE_INTERNAL_ERROR, "Error setting preferred subnet %s mask %s", preferredSubnet, preferredSubnetMask);
  776. }
  777. if (restarts)
  778. {
  779. if (traceLevel)
  780. DBGLOG("Roxie restarting: restarts = %d build = %s", restarts.load(), hpccBuildTag);
  781. setStartRuid(restarts);
  782. }
  783. else
  784. {
  785. if (traceLevel)
  786. {
  787. DBGLOG("Roxie starting, build = %s", hpccBuildTag);
  788. }
  789. }
  790. headRegionSize = topology->getPropInt("@headRegionSize", 50);
  791. ccdMulticastPort = topology->getPropInt("@multicastPort", CCD_MULTICAST_PORT);
  792. statsExpiryTime = topology->getPropInt("@statsExpiryTime", 3600);
  793. roxiemem::setMemTraceSizeLimit((memsize_t) topology->getPropInt64("@memTraceSizeLimit", 0));
  794. callbackRetries = topology->getPropInt("@callbackRetries", 3);
  795. callbackTimeout = topology->getPropInt("@callbackTimeout", 5000);
  796. lowTimeout = topology->getPropInt("@lowTimeout", 10000);
  797. highTimeout = topology->getPropInt("@highTimeout", 2000);
  798. slaTimeout = topology->getPropInt("@slaTimeout", 2000);
  799. parallelLoopFlowLimit = topology->getPropInt("@parallelLoopFlowLimit", 100);
  800. perChannelFlowLimit = topology->getPropInt("@perChannelFlowLimit", 10);
  801. copyResources = topology->getPropBool("@copyResources", true);
  802. useRemoteResources = topology->getPropBool("@useRemoteResources", true);
  803. checkFileDate = topology->getPropBool("@checkFileDate", true);
  804. const char *lazyOpenMode = topology->queryProp("@lazyOpen");
  805. if (!lazyOpenMode || stricmp(lazyOpenMode, "smart")==0)
  806. lazyOpen = (restarts > 0);
  807. else
  808. lazyOpen = topology->getPropBool("@lazyOpen", false);
  809. bool useNasTranslation = topology->getPropBool("@useNASTranslation", true);
  810. if (useNasTranslation)
  811. {
  812. Owned<IPropertyTree> nas = envGetNASConfiguration(topology);
  813. envInstallNASHooks(nas);
  814. }
  815. useAeron = topology->getPropBool("@useAeron", false);
  816. doIbytiDelay = topology->getPropBool("@doIbytiDelay", true);
  817. minIbytiDelay = topology->getPropInt("@minIbytiDelay", 2);
  818. initIbytiDelay = topology->getPropInt("@initIbytiDelay", 50);
  819. alwaysTrustFormatCrcs = topology->getPropBool("@alwaysTrustFormatCrcs", true);
  820. allFilesDynamic = topology->getPropBool("@allFilesDynamic", false);
  821. lockSuperFiles = topology->getPropBool("@lockSuperFiles", false);
  822. ignoreOrphans = topology->getPropBool("@ignoreOrphans", true);
  823. chunkingHeap = topology->getPropBool("@chunkingHeap", true);
  824. readTimeout = topology->getPropInt("@readTimeout", 300);
  825. logFullQueries = topology->getPropBool("@logFullQueries", false);
  826. debugPermitted = topology->getPropBool("@debugPermitted", true);
  827. blindLogging = topology->getPropBool("@blindLogging", false);
  828. preloadOnceData = topology->getPropBool("@preloadOnceData", true);
  829. reloadRetriesFailed = topology->getPropBool("@reloadRetriesSuspended", true);
  830. #if defined(__linux__) && defined(SYS_ioprio_set)
  831. const char *backgroundCopyClassString = topology->queryProp("@backgroundCopyClass");
  832. if (!isEmptyString(backgroundCopyClassString))
  833. {
  834. if (strieq(backgroundCopyClassString, "best-effort"))
  835. backgroundCopyClass = IOPRIO_CLASS_BE;
  836. else if (strieq(backgroundCopyClassString, "idle"))
  837. backgroundCopyClass = IOPRIO_CLASS_IDLE;
  838. else if (strieq(backgroundCopyClassString, "none"))
  839. backgroundCopyClass = IOPRIO_CLASS_NONE;
  840. else
  841. DBGLOG("Invalid backgroundCopyClass %s specified - ignored", backgroundCopyClassString);
  842. }
  843. backgroundCopyPrio = topology->getPropInt("@backgroundCopyPrio", 0);
  844. if (backgroundCopyPrio >= IOPRIO_BE_NR)
  845. {
  846. DBGLOG("Invalid backgroundCopyPrio %d specified - using %d", backgroundCopyPrio, (int) (IOPRIO_BE_NR-1));
  847. backgroundCopyPrio = IOPRIO_BE_NR-1;
  848. }
  849. else if (backgroundCopyPrio < 0)
  850. {
  851. DBGLOG("Invalid backgroundCopyPrio %d specified - using 0", backgroundCopyPrio);
  852. backgroundCopyPrio = 0;
  853. }
  854. #endif
  855. linuxYield = topology->getPropBool("@linuxYield", false);
  856. traceSmartStepping = topology->getPropBool("@traceSmartStepping", false);
  857. useMemoryMappedIndexes = topology->getPropBool("@useMemoryMappedIndexes", false);
  858. flushJHtreeCacheOnOOM = topology->getPropBool("@flushJHtreeCacheOnOOM", true);
  859. fastLaneQueue = topology->getPropBool("@fastLaneQueue", true);
  860. udpOutQsPriority = topology->getPropInt("@udpOutQsPriority", 0);
  861. udpRetryBusySenders = topology->getPropInt("@udpRetryBusySenders", 0);
  862. // Historically, this was specified in seconds. Assume any value <= 10 is a legacy value specified in seconds!
  863. udpMaxRetryTimedoutReqs = topology->getPropInt("@udpMaxRetryTimedoutReqs", 0);
  864. udpRequestToSendTimeout = topology->getPropInt("@udpRequestToSendTimeout", 0);
  865. if (udpRequestToSendTimeout<=10)
  866. udpRequestToSendTimeout *= 1000;
  867. if (udpRequestToSendTimeout == 0)
  868. {
  869. if (slaTimeout)
  870. {
  871. udpRequestToSendTimeout = (slaTimeout*3) / 4;
  872. if (udpRequestToSendTimeout < 10)
  873. udpRequestToSendTimeout = 10;
  874. }
  875. else
  876. udpRequestToSendTimeout = 5000;
  877. }
  878. udpRequestToSendAckTimeout = topology->getPropInt("@udpRequestToSendAckTimeout", 100);
  879. // MORE: might want to check socket buffer sizes against sys max here instead of udp threads ?
  880. udpSnifferReadThreadPriority = topology->getPropInt("@udpSnifferReadThreadPriority", 3);
  881. udpSnifferSendThreadPriority = topology->getPropInt("@udpSnifferSendThreadPriority", 3);
  882. udpMulticastBufferSize = topology->getPropInt("@udpMulticastBufferSize", 262142);
  883. udpFlowSocketsSize = topology->getPropInt("@udpFlowSocketsSize", 131072);
  884. udpLocalWriteSocketSize = topology->getPropInt("@udpLocalWriteSocketSize", 1024000);
  885. #ifndef _CONTAINERIZED
  886. roxieMulticastEnabled = topology->getPropBool("@roxieMulticastEnabled", true) && !useAeron; // enable use of multicast for sending requests to agents
  887. udpSnifferEnabled = topology->getPropBool("@udpSnifferEnabled", roxieMulticastEnabled);
  888. if (udpSnifferEnabled && !roxieMulticastEnabled)
  889. {
  890. DBGLOG("WARNING: ignoring udpSnifferEnabled setting as multicast not enabled");
  891. udpSnifferEnabled = false;
  892. }
  893. #endif
  894. udpResendEnabled = topology->getPropBool("@udpResendEnabled", true);
  895. udpResendTimeout = topology->getPropInt("@udpResendTimeout", 10); // milliseconds
  896. udpAssumeSequential = topology->getPropBool("@udpAssumeSequential", false);
  897. udpMaxPendingPermits = topology->getPropInt("@udpMaxPendingPermits", 1);
  898. int ttlTmp = topology->getPropInt("@multicastTTL", 1);
  899. if (ttlTmp < 0)
  900. {
  901. multicastTTL = 1;
  902. IWARNLOG("multicastTTL value (%d) invalid, must be >=0, resetting to %u", ttlTmp, multicastTTL);
  903. }
  904. else if (ttlTmp > 255)
  905. {
  906. multicastTTL = 255;
  907. IWARNLOG("multicastTTL value (%d) invalid, must be <=%u, resetting to maximum", ttlTmp, multicastTTL);
  908. }
  909. else
  910. multicastTTL = ttlTmp;
  911. indexReadChunkSize = topology->getPropInt("@indexReadChunkSize", 60000);
  912. numAgentThreads = topology->getPropInt("@agentThreads", topology->getPropInt("@slaveThreads", 30)); // legacy name
  913. numServerThreads = topology->getPropInt("@serverThreads", 30);
  914. numRequestArrayThreads = topology->getPropInt("@requestArrayThreads", 5);
  915. maxBlockSize = topology->getPropInt("@maxBlockSize", 10000000);
  916. maxLockAttempts = topology->getPropInt("@maxLockAttempts", 5);
  917. enableHeartBeat = topology->getPropBool("@enableHeartBeat", true);
  918. checkCompleted = topology->getPropBool("@checkCompleted", true);
  919. prestartAgentThreads = topology->getPropBool("@prestartAgentThreads", topology->getPropBool("@prestartSlaveThreads", false)); // legacy name
  920. preabortKeyedJoinsThreshold = topology->getPropInt("@preabortKeyedJoinsThreshold", 100);
  921. preabortIndexReadsThreshold = topology->getPropInt("@preabortIndexReadsThreshold", 100);
  922. defaultMemoryLimit = (memsize_t) topology->getPropInt64("@defaultMemoryLimit", 0);
  923. defaultTimeLimit[0] = (unsigned) topology->getPropInt64("@defaultLowPriorityTimeLimit", 0);
  924. defaultTimeLimit[1] = (unsigned) topology->getPropInt64("@defaultHighPriorityTimeLimit", 0);
  925. defaultTimeLimit[2] = (unsigned) topology->getPropInt64("@defaultSLAPriorityTimeLimit", 0);
  926. defaultWarnTimeLimit[0] = (unsigned) topology->getPropInt64("@defaultLowPriorityTimeWarning", 0);
  927. defaultWarnTimeLimit[1] = (unsigned) topology->getPropInt64("@defaultHighPriorityTimeWarning", 0);
  928. defaultWarnTimeLimit[2] = (unsigned) topology->getPropInt64("@defaultSLAPriorityTimeWarning", 0);
  929. defaultThorConnectTimeout = (unsigned) topology->getPropInt64("@defaultThorConnectTimeout", 60);
  930. continuationCompressThreshold = (unsigned) topology->getPropInt64("@continuationCompressThreshold", 1024);
  931. defaultXmlReadFlags = topology->getPropBool("@defaultStripLeadingWhitespace", true) ? ptr_ignoreWhiteSpace : ptr_none;
  932. defaultParallelJoinPreload = topology->getPropInt("@defaultParallelJoinPreload", 0);
  933. defaultConcatPreload = topology->getPropInt("@defaultConcatPreload", 0);
  934. defaultFetchPreload = topology->getPropInt("@defaultFetchPreload", 0);
  935. defaultFullKeyedJoinPreload = topology->getPropInt("@defaultFullKeyedJoinPreload", 0);
  936. defaultKeyedJoinPreload = topology->getPropInt("@defaultKeyedJoinPreload", 0);
  937. defaultPrefetchProjectPreload = topology->getPropInt("@defaultPrefetchProjectPreload", 10);
  938. defaultStrandBlockSize = topology->getPropInt("@defaultStrandBlockSize", 512);
  939. defaultForceNumStrands = topology->getPropInt("@defaultForceNumStrands", 0);
  940. defaultCheckingHeap = topology->getPropBool("@checkingHeap", false); // NOTE - not in configmgr - too dangerous!
  941. defaultDisableLocalOptimizations = topology->getPropBool("@disableLocalOptimizations", false); // NOTE - not in configmgr - too dangerous!
  942. agentQueryReleaseDelaySeconds = topology->getPropInt("@agentQueryReleaseDelaySeconds", topology->getPropInt("@slaveQueryReleaseDelaySeconds", 60)); // legacy name
  943. coresPerQuery = topology->getPropInt("@coresPerQuery", 0);
  944. diskReadBufferSize = topology->getPropInt("@diskReadBufferSize", 0x10000);
  945. fieldTranslationEnabled = RecordTranslationMode::Payload;
  946. const char *val = topology->queryProp("@fieldTranslationEnabled");
  947. if (val)
  948. fieldTranslationEnabled = getTranslationMode(val, false);
  949. pretendAllOpt = topology->getPropBool("@ignoreMissingFiles", false);
  950. memoryStatsInterval = topology->getPropInt("@memoryStatsInterval", 60);
  951. roxiemem::setMemoryStatsInterval(memoryStatsInterval);
  952. pingInterval = topology->getPropInt("@pingInterval", 0);
  953. socketCheckInterval = topology->getPropInt("@socketCheckInterval", runOnce ? 0 : 5000);
  954. memsize_t totalMemoryLimit = (memsize_t) topology->getPropInt64("@totalMemoryLimit", 0);
  955. bool allowHugePages = topology->getPropBool("@heapUseHugePages", false);
  956. bool allowTransparentHugePages = topology->getPropBool("@heapUseTransparentHugePages", true);
  957. bool retainMemory = topology->getPropBool("@heapRetainMemory", false);
  958. if (!totalMemoryLimit)
  959. totalMemoryLimit = 1024 * 0x100000; // 1 Gb;
  960. roxiemem::setTotalMemoryLimit(allowHugePages, allowTransparentHugePages, retainMemory, totalMemoryLimit, 0, NULL, NULL);
  961. traceStartStop = topology->getPropBool("@traceStartStop", false);
  962. watchActivityId = topology->getPropInt("@watchActivityId", 0);
  963. traceRoxiePackets = topology->getPropBool("@traceRoxiePackets", false);
  964. delaySubchannelPackets = topology->getPropBool("@delaySubchannelPackets", false);
  965. IBYTIbufferSize = topology->getPropInt("@IBYTIbufferSize", roxieMulticastEnabled ? 0 : 10);
  966. IBYTIbufferLifetime = topology->getPropInt("@IBYTIbufferLifetime", initIbytiDelay);
  967. traceTranslations = topology->getPropBool("@traceTranslations", true);
  968. defaultTimeActivities = topology->getPropBool("@timeActivities", true);
  969. defaultTraceEnabled = topology->getPropBool("@traceEnabled", false);
  970. defaultTraceLimit = topology->getPropInt("@traceLimit", 10);
  971. clientCert.certificate.set(topology->queryProp("@certificateFileName"));
  972. clientCert.privateKey.set(topology->queryProp("@privateKeyFileName"));
  973. clientCert.passphrase.set(topology->queryProp("@passphrase"));
  974. useHardLink = topology->getPropBool("@useHardLink", false);
  975. maxFileAge[false] = topology->getPropInt("@localFilesExpire", (unsigned) -1);
  976. maxFileAge[true] = topology->getPropInt("@remoteFilesExpire", 60*60*1000);
  977. minFilesOpen[false] = topology->getPropInt("@minLocalFilesOpen", 2000);
  978. minFilesOpen[true] = topology->getPropInt("@minRemoteFilesOpen", 500);
  979. maxFilesOpen[false] = topology->getPropInt("@maxLocalFilesOpen", 4000);
  980. maxFilesOpen[true] = topology->getPropInt("@maxRemoteFilesOpen", 1000);
  981. dafilesrvLookupTimeout = topology->getPropInt("@dafilesrvLookupTimeout", 10000);
  982. setRemoteFileTimeouts(dafilesrvLookupTimeout, 0);
  983. topology->getProp("@daliServers", fileNameServiceDali);
  984. trapTooManyActiveQueries = topology->getPropBool("@trapTooManyActiveQueries", true);
  985. maxEmptyLoopIterations = topology->getPropInt("@maxEmptyLoopIterations", 1000);
  986. maxGraphLoopIterations = topology->getPropInt("@maxGraphLoopIterations", 1000);
  987. mergeAgentStatistics = topology->getPropBool("@mergeAgentStatistics", topology->getPropBool("@mergeSlaveStatistics", true)); // legacy name
  988. defaultCollectFactoryStatistics = topology->getPropBool("@collectFactoryStatistics", true);
  989. defaultNoSeekBuildIndex = topology->getPropBool("@noSeekBuildIndex", isContainerized());
  990. parallelLoadQueries = topology->getPropInt("@parallelLoadQueries", 8);
  991. if (!parallelLoadQueries)
  992. parallelLoadQueries = 1;
  993. alwaysFailOnLeaks = topology->getPropBool("@alwaysFailOnLeaks", false);
  994. enableKeyDiff = topology->getPropBool("@enableKeyDiff", true);
  995. cacheReportPeriodSeconds = topology->getPropInt("@cacheReportPeriodSeconds", 5*60);
  996. // NB: these directories will have been setup by topology earlier
  997. const char *primaryDirectory = queryBaseDirectory(grp_unknown, 0);
  998. const char *secondaryDirectory = queryBaseDirectory(grp_unknown, 1);
  999. // MORE: Get parms from topology after it is populated from Hardware/computer types section in configenv
  1000. // Then if does not match and based on desired action in topolgy, either warn, or fatal exit or .... etc
  1001. getHardwareInfo(hdwInfo, primaryDirectory, secondaryDirectory);
  1002. if (traceLevel)
  1003. {
  1004. DBGLOG("Current Hardware Info: CPUs=%i, speed=%i MHz, Mem=%i MB , primDisk=%i GB, primFree=%i GB, secDisk=%i GB, secFree=%i GB, NIC=%i",
  1005. hdwInfo.numCPUs, hdwInfo.CPUSpeed, hdwInfo.totalMemory,
  1006. hdwInfo.primDiskSize, hdwInfo.primFreeSize, hdwInfo.secDiskSize, hdwInfo.secFreeSize, hdwInfo.NICSpeed);
  1007. }
  1008. parallelAggregate = topology->getPropInt("@parallelAggregate", 0);
  1009. if (!parallelAggregate)
  1010. parallelAggregate = hdwInfo.numCPUs;
  1011. if (!parallelAggregate)
  1012. parallelAggregate = 1;
  1013. simpleLocalKeyedJoins = topology->getPropBool("@simpleLocalKeyedJoins", true);
  1014. inMemoryKeysEnabled = topology->getPropBool("@inMemoryKeysEnabled", true);
  1015. setKeyIndexCacheSize((unsigned)-1); // unbound
  1016. nodeCachePreload = topology->getPropBool("@nodeCachePreload", false);
  1017. setNodeCachePreload(nodeCachePreload);
  1018. nodeCacheMB = topology->getPropInt("@nodeCacheMem", 100);
  1019. setNodeCacheMem(nodeCacheMB * 0x100000);
  1020. leafCacheMB = topology->getPropInt("@leafCacheMem", 50);
  1021. setLeafCacheMem(leafCacheMB * 0x100000);
  1022. blobCacheMB = topology->getPropInt("@blobCacheMem", 0);
  1023. setBlobCacheMem(blobCacheMB * 0x100000);
  1024. unsigned __int64 affinity = topology->getPropInt64("@affinity", 0);
  1025. updateAffinity(affinity);
  1026. minFreeDiskSpace = topology->getPropInt64("@minFreeDiskSpace", (1024 * 0x100000)); // default to 1 GB
  1027. mtu_size = topology->getPropInt("@mtuPayload", 0);
  1028. if (mtu_size)
  1029. {
  1030. if (mtu_size < 1400 || mtu_size > 9000)
  1031. throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid settings - mtuPayload should be between 1400 and 9000");
  1032. unsigned alignment = 0x400;
  1033. while (alignment*2 < mtu_size)
  1034. alignment *= 2;
  1035. roxiemem::setDataAlignmentSize(alignment); // smallest power of two under mtuSize
  1036. }
  1037. else if (topology->getPropBool("@jumboFrames", false))
  1038. {
  1039. mtu_size = 9000; // upper limit on outbound buffer size - allow some header room too
  1040. roxiemem::setDataAlignmentSize(0x2000);
  1041. }
  1042. else
  1043. {
  1044. mtu_size = 1400; // upper limit on outbound buffer size - allow some header room too
  1045. roxiemem::setDataAlignmentSize(0x400);
  1046. }
  1047. #ifndef _CONTAINERIZED
  1048. perfMonHook.setown(roxiemem::createRoxieMemStatsPerfMonHook()); // Note - we create even if pinterval is 0, as can be enabled via control message
  1049. unsigned pinterval = topology->getPropInt("@systemMonitorInterval",1000*60);
  1050. if (pinterval)
  1051. startPerformanceMonitor(pinterval, PerfMonStandard, perfMonHook);
  1052. #endif
  1053. topology->getProp("@pluginDirectory", pluginDirectory);
  1054. StringBuffer packageDirectory;
  1055. getPackageFolder(packageDirectory);
  1056. if (pluginDirectory.length() == 0 && packageDirectory.length() != 0)
  1057. {
  1058. pluginDirectory.append(packageDirectory).append("plugins");
  1059. }
  1060. getAdditionalPluginsPath(pluginDirectory, packageDirectory);
  1061. if (queryDirectory.length() == 0)
  1062. {
  1063. topology->getProp("@queryDir", queryDirectory);
  1064. if (queryDirectory.length() == 0)
  1065. queryDirectory.append(codeDirectory).append("queries");
  1066. }
  1067. addNonEmptyPathSepChar(queryDirectory);
  1068. getTempFilePath(tempDirectory, "roxie", topology);
  1069. #ifdef _WIN32
  1070. topology->addPropBool("@linuxOS", false);
  1071. #else
  1072. topology->addPropBool("@linuxOS", true);
  1073. #endif
  1074. if (useAeron)
  1075. setAeronProperties(topology);
  1076. #ifdef _CONTAINERIZED
  1077. allQuerySetNames.append(roxieName);
  1078. #else
  1079. allQuerySetNames.appendListUniq(topology->queryProp("@querySets"), ",");
  1080. #endif
  1081. #ifdef _CONTAINERIZED
  1082. IpAddress myIP(".");
  1083. myNode.setIp(myIP);
  1084. if (topology->getPropBool("@server", true))
  1085. {
  1086. Owned<IPropertyTreeIterator> roxieFarms = topology->getElements("./services");
  1087. ForEach(*roxieFarms)
  1088. {
  1089. IPropertyTree &roxieFarm = roxieFarms->query();
  1090. unsigned port = roxieFarm.getPropInt("@port", ROXIE_SERVER_PORT);
  1091. RoxieEndpointInfo me = {RoxieEndpointInfo::RoxieServer, 0, { (unsigned short) port, myIP }, 0};
  1092. myRoles.push_back(me);
  1093. }
  1094. }
  1095. for (std::pair<unsigned, unsigned> channel: agentChannels)
  1096. {
  1097. myAgentEP.set(ccdMulticastPort, myIP);
  1098. RoxieEndpointInfo me = { RoxieEndpointInfo::RoxieAgent, channel.first, myAgentEP, channel.second };
  1099. myRoles.push_back(me);
  1100. }
  1101. #else
  1102. // Set multicast base addresses - must be done before generating agent channels
  1103. if (roxieMulticastEnabled && !localAgent)
  1104. {
  1105. if (topology->queryProp("@multicastBase"))
  1106. multicastBase.ipset(topology->queryProp("@multicastBase"));
  1107. else
  1108. throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - multicastBase not set");
  1109. if (topology->queryProp("@multicastLast"))
  1110. multicastLast.ipset(topology->queryProp("@multicastLast"));
  1111. else
  1112. throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - multicastLast not set");
  1113. }
  1114. readStaticTopology();
  1115. #endif
  1116. // Now we know all the channels, we can open and subscribe the multicast channels
  1117. if (!localAgent)
  1118. {
  1119. openMulticastSocket();
  1120. if (roxieMulticastEnabled)
  1121. setMulticastEndpoints(numChannels);
  1122. }
  1123. setDaliServixSocketCaching(true); // enable daliservix caching
  1124. enableForceRemoteReads(); // forces file reads to be remote reads if they match environment setting 'forceRemotePattern' pattern.
  1125. if (!oneShotRoxie)
  1126. {
  1127. queryFileCache().start();
  1128. loadPlugins();
  1129. }
  1130. unsigned snifferChannel = numChannels+2; // MORE - why +2 not +1??
  1131. #ifdef _CONTAINERIZED
  1132. initializeTopology(topoValues, myRoles);
  1133. #endif
  1134. createDelayedReleaser();
  1135. globalPackageSetManager = createRoxiePackageSetManager(standAloneDll.getClear());
  1136. globalPackageSetManager->load();
  1137. ROQ = createOutputQueueManager(snifferChannel, numAgentThreads, encryptInTransit);
  1138. ROQ->setHeadRegionSize(headRegionSize);
  1139. ROQ->start();
  1140. Owned<IPacketDiscarder> packetDiscarder = createPacketDiscarder();
  1141. #if defined(WIN32) && defined(_DEBUG) && defined(_DEBUG_HEAP_FULL)
  1142. int tmpFlag = _CrtSetDbgFlag( _CRTDBG_REPORT_FLAG );
  1143. tmpFlag |= _CRTDBG_CHECK_ALWAYS_DF;
  1144. _CrtSetDbgFlag( tmpFlag );
  1145. #endif
  1146. //MORE: I'm not sure where this should go, or how it fits in. Possibly the function needs to be split in two.
  1147. initializeStorageGroups(false);
  1148. EnableSEHtoExceptionMapping();
  1149. setSEHtoExceptionHandler(&abortHandler);
  1150. Owned<IHpccProtocolPluginContext> protocolCtx = new CHpccProtocolPluginCtx();
  1151. if (runOnce)
  1152. {
  1153. if (wuid)
  1154. {
  1155. Owned<IHpccProtocolListener> roxieServer = createRoxieWorkUnitListener(1, false);
  1156. try
  1157. {
  1158. roxieServer->runOnce(wuid);
  1159. }
  1160. catch (IException *E)
  1161. {
  1162. EXCLOG(E);
  1163. E->Release();
  1164. }
  1165. }
  1166. else
  1167. {
  1168. Owned<IHpccProtocolPlugin> protocolPlugin = loadHpccProtocolPlugin(protocolCtx, NULL);
  1169. Owned<IHpccProtocolListener> roxieServer = protocolPlugin->createListener("runOnce", createRoxieProtocolMsgSink(myNode.getIpAddress(), 0, 1, false), 0, 0, NULL);
  1170. try
  1171. {
  1172. const char *format = topology->queryProp("@format");
  1173. if (!format)
  1174. {
  1175. if (topology->getPropBool("@xml", false))
  1176. format = "xml";
  1177. else if (topology->getPropBool("@csv", false))
  1178. format = "csv";
  1179. else if (topology->getPropBool("@raw", false))
  1180. format = "raw";
  1181. else
  1182. format = "ascii";
  1183. }
  1184. StringBuffer query;
  1185. query.appendf("<roxie format='%s'/>", format);
  1186. roxieServer->runOnce(query.str()); // MORE - should use the wu listener instead I suspect
  1187. fflush(stdout); // in windows if output is redirected results don't appear without flushing
  1188. }
  1189. catch (IException *E)
  1190. {
  1191. EXCLOG(E);
  1192. E->Release();
  1193. }
  1194. }
  1195. }
  1196. else
  1197. {
  1198. try
  1199. {
  1200. #ifdef _CONTAINERIZED
  1201. Owned<IPropertyTreeIterator> roxieFarms = topology->getElements("./services");
  1202. #else
  1203. Owned<IPropertyTreeIterator> roxieFarms = topology->getElements("./RoxieFarmProcess");
  1204. #endif
  1205. ForEach(*roxieFarms)
  1206. {
  1207. IPropertyTree &roxieFarm = roxieFarms->query();
  1208. unsigned listenQueue = roxieFarm.getPropInt("@listenQueue", DEFAULT_LISTEN_QUEUE_SIZE);
  1209. unsigned numThreads = roxieFarm.getPropInt("@numThreads", 0);
  1210. if (!numThreads)
  1211. numThreads = numServerThreads;
  1212. unsigned port = roxieFarm.getPropInt("@port", ROXIE_SERVER_PORT);
  1213. //unsigned requestArrayThreads = roxieFarm.getPropInt("@requestArrayThreads", 5);
  1214. // NOTE: farmer name [@name=] is not copied into topology
  1215. const IpAddress ip = myNode.getIpAddress();
  1216. if (!roxiePort)
  1217. {
  1218. roxiePort = port;
  1219. debugEndpoint.set(roxiePort, ip);
  1220. }
  1221. bool suspended = roxieFarm.getPropBool("@suspended", false);
  1222. Owned <IHpccProtocolListener> roxieServer;
  1223. if (port)
  1224. {
  1225. const char *protocol = roxieFarm.queryProp("@protocol");
  1226. StringBuffer certFileName;
  1227. StringBuffer keyFileName;
  1228. StringBuffer passPhraseStr;
  1229. if (protocol && streq(protocol, "ssl"))
  1230. {
  1231. #ifdef _USE_OPENSSL
  1232. const char *certFile = roxieFarm.queryProp("@certificateFileName");
  1233. if (!certFile)
  1234. throw MakeStringException(ROXIE_FILE_ERROR, "Roxie SSL Farm Listener on port %d missing certificateFileName tag", port);
  1235. if (isAbsolutePath(certFile))
  1236. certFileName.append(certFile);
  1237. else
  1238. certFileName.append(codeDirectory.str()).append(certFile);
  1239. if (!checkFileExists(certFileName.str()))
  1240. throw MakeStringException(ROXIE_FILE_ERROR, "Roxie SSL Farm Listener on port %d missing certificateFile (%s)", port, certFileName.str());
  1241. const char *keyFile = roxieFarm.queryProp("@privateKeyFileName");
  1242. if (!keyFile)
  1243. throw MakeStringException(ROXIE_FILE_ERROR, "Roxie SSL Farm Listener on port %d missing privateKeyFileName tag", port);
  1244. if (isAbsolutePath(keyFile))
  1245. keyFileName.append(keyFile);
  1246. else
  1247. keyFileName.append(codeDirectory.str()).append(keyFile);
  1248. if (!checkFileExists(keyFileName.str()))
  1249. throw MakeStringException(ROXIE_FILE_ERROR, "Roxie SSL Farm Listener on port %d missing privateKeyFile (%s)", port, keyFileName.str());
  1250. const char *passPhrase = roxieFarm.queryProp("@passphrase");
  1251. if (!isEmptyString(passPhrase))
  1252. decrypt(passPhraseStr, passPhrase);
  1253. #else
  1254. OWARNLOG("Skipping Roxie SSL Farm Listener on port %d : OpenSSL disabled in build", port);
  1255. continue;
  1256. #endif
  1257. }
  1258. const char *soname = roxieFarm.queryProp("@so");
  1259. const char *config = roxieFarm.queryProp("@config");
  1260. Owned<IHpccProtocolPlugin> protocolPlugin = ensureProtocolPlugin(*protocolCtx, soname);
  1261. roxieServer.setown(protocolPlugin->createListener(protocol ? protocol : "native", createRoxieProtocolMsgSink(ip, port, numThreads, suspended), port, listenQueue, config, certFileName.str(), keyFileName.str(), passPhraseStr.str()));
  1262. }
  1263. else
  1264. roxieServer.setown(createRoxieWorkUnitListener(numThreads, suspended));
  1265. IHpccProtocolMsgSink *sink = roxieServer->queryMsgSink();
  1266. const char *aclName = roxieFarm.queryProp("@aclName");
  1267. if (aclName && *aclName)
  1268. {
  1269. Owned<IPropertyTree> aclInfo = createPTree("AccessInfo", ipt_lowmem);
  1270. getAccessList(aclName, topology, aclInfo);
  1271. Owned<IPropertyTreeIterator> accesses = aclInfo->getElements("Access");
  1272. ForEach(*accesses)
  1273. {
  1274. IPropertyTree &access = accesses->query();
  1275. try
  1276. {
  1277. sink->addAccess(access.getPropBool("@allow", true), access.getPropBool("@allowBlind", true), access.queryProp("@ip"), access.queryProp("@mask"), access.queryProp("@query"), access.queryProp("@error"), access.getPropInt("@errorCode"));
  1278. }
  1279. catch (IException *E)
  1280. {
  1281. StringBuffer s, x;
  1282. E->errorMessage(s);
  1283. E->Release();
  1284. toXML(&access, x, 0, 0);
  1285. throw MakeStringException(ROXIE_ACL_ERROR, "Error in access statement %s: %s", x.str(), s.str());
  1286. }
  1287. }
  1288. }
  1289. socketListeners.append(*roxieServer.getLink());
  1290. time(&startupTime);
  1291. roxieServer->start();
  1292. }
  1293. #ifdef _CONTAINERIZED
  1294. queryFileCache().loadSavedOsCacheInfo();
  1295. queryFileCache().startCacheReporter();
  1296. publishTopology(traceLevel);
  1297. #endif
  1298. writeSentinelFile(sentinelFile);
  1299. DBGLOG("Startup completed - LPT=%u APT=%u", queryNumLocalTrees(), queryNumAtomTrees());
  1300. DBGLOG("Waiting for queries");
  1301. if (pingInterval)
  1302. startPingTimer();
  1303. LocalIAbortHandler abortHandler(waiter);
  1304. waiter.wait();
  1305. }
  1306. catch (IException *E)
  1307. {
  1308. StringBuffer x;
  1309. IERRLOG("EXCEPTION: (%d): %s", E->errorCode(), E->errorMessage(x).str());
  1310. E->Release();
  1311. }
  1312. }
  1313. DBGLOG("Roxie closing down");
  1314. shuttingDown = true;
  1315. if (pingInterval)
  1316. stopPingTimer();
  1317. setSEHtoExceptionHandler(NULL);
  1318. while (socketListeners.isItem(0))
  1319. {
  1320. socketListeners.item(0).stop();
  1321. socketListeners.remove(0);
  1322. }
  1323. packetDiscarder->stop();
  1324. packetDiscarder.clear();
  1325. ROQ->stop();
  1326. ROQ->join();
  1327. ROQ->Release();
  1328. ROQ = NULL;
  1329. }
  1330. catch (IException *E)
  1331. {
  1332. StringBuffer x;
  1333. IERRLOG("EXCEPTION: (%d): %s", E->errorCode(), E->errorMessage(x).str());
  1334. if (!queryLogMsgManager()->isActiveMonitor(queryStderrLogMsgHandler()))
  1335. fprintf(stderr, "EXCEPTION: (%d): %s\n", E->errorCode(), x.str());
  1336. E->Release();
  1337. }
  1338. roxieMetrics.clear();
  1339. #ifndef _CONTAINERIZED
  1340. stopPerformanceMonitor();
  1341. #endif
  1342. ::Release(globalPackageSetManager);
  1343. globalPackageSetManager = NULL;
  1344. stopDelayedReleaser();
  1345. cleanupPlugins();
  1346. unloadHpccProtocolPlugin();
  1347. closeMulticastSockets();
  1348. releaseAgentDynamicFileCache();
  1349. releaseRoxieStateCache();
  1350. setDaliServixSocketCaching(false); // make sure it cleans up or you get bogus memleak reports
  1351. setNodeCaching(false); // ditto
  1352. #ifndef _CONTAINERIZED
  1353. perfMonHook.clear();
  1354. #endif
  1355. stopAeronDriver();
  1356. stopTopoThread();
  1357. strdup("Make sure leak checking is working");
  1358. roxiemem::releaseRoxieHeap();
  1359. UseSysLogForOperatorMessages(false);
  1360. ExitModuleObjects();
  1361. releaseAtoms();
  1362. strdup("Make sure leak checking is working");
  1363. #ifdef _WIN32
  1364. #ifdef _DEBUG
  1365. #if 1
  1366. StringBuffer leakFileDir(logDirectory.str());
  1367. leakFileDir.append("roxieleaks.log");
  1368. HANDLE h = CreateFile(leakFileDir.str(), GENERIC_READ|GENERIC_WRITE, 0, NULL, CREATE_ALWAYS, 0, 0);
  1369. _CrtSetReportMode( _CRT_WARN, _CRTDBG_MODE_FILE|_CRTDBG_MODE_DEBUG);
  1370. _CrtSetReportFile( _CRT_WARN, h);
  1371. _CrtSetReportMode( _CRT_ERROR, _CRTDBG_MODE_FILE|_CRTDBG_MODE_DEBUG);
  1372. _CrtSetReportFile( _CRT_ERROR, h);
  1373. _CrtSetReportMode( _CRT_ASSERT, _CRTDBG_MODE_FILE|_CRTDBG_MODE_DEBUG);
  1374. _CrtSetReportFile( _CRT_ASSERT, h);
  1375. // _CrtDumpMemoryLeaks(); if you uncomment these lines you get to see the leaks sooner (so can look in debugger at full memory)
  1376. // CloseHandle(h); but there will be additional leaks reported that are not really leaks
  1377. #endif
  1378. #endif
  1379. #endif
  1380. return 0;
  1381. }
  1382. // These defaults only apply when roxie is linked into a standalone executable
  1383. // Note that the defaults for roxie executable (in whatever mode) are set in roxie.cpp
  1384. static constexpr const char * standaloneDefaultYaml = R"!!(
  1385. version: "1.0"
  1386. roxie:
  1387. allFilesDynamic: true
  1388. localAgent: true
  1389. numChannels: 1
  1390. queueNames: roxie.roxie
  1391. traceLevel: 0
  1392. server: false
  1393. logging:
  1394. disabled: true
  1395. )!!";
  1396. int STARTQUERY_API start_query(int argc, const char *argv[])
  1397. {
  1398. return roxie_main(argc, argv, standaloneDefaultYaml);
  1399. }