ccdmain.cpp 59 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include <platform.h>
  14. #include <signal.h>
  15. #include <algorithm>
  16. #include <jlib.hpp>
  17. #include <jio.hpp>
  18. #include <jmisc.hpp>
  19. #include <jqueue.tpp>
  20. #include <jsocket.hpp>
  21. #include <jlog.hpp>
  22. #include <jprop.hpp>
  23. #include <jfile.hpp>
  24. #include <jencrypt.hpp>
  25. #include "jutil.hpp"
  26. #include <build-config.h>
  27. #include <udptopo.hpp>
  28. #include "rtlformat.hpp"
  29. #include "dalienv.hpp"
  30. #include "rmtfile.hpp"
  31. #include "ccd.hpp"
  32. #include "ccdquery.hpp"
  33. #include "ccdstate.hpp"
  34. #include "ccdqueue.ipp"
  35. #include "ccdserver.hpp"
  36. #include "ccdlistener.hpp"
  37. #include "ccdsnmp.hpp"
  38. #include "thorplugin.hpp"
  39. #if defined (__linux__)
  40. #include <sys/syscall.h>
  41. #include "ioprio.h"
  42. #endif
  43. #ifdef _USE_CPPUNIT
  44. #include <cppunit/extensions/TestFactoryRegistry.h>
  45. #include <cppunit/ui/text/TestRunner.h>
  46. #endif
  47. //=================================================================================
  48. bool shuttingDown = false;
  49. unsigned numChannels;
  50. unsigned callbackRetries = 3;
  51. unsigned callbackTimeout = 5000;
  52. unsigned lowTimeout = 10000;
  53. unsigned highTimeout = 2000;
  54. unsigned slaTimeout = 2000;
  55. unsigned numServerThreads = 30;
  56. unsigned numSlaveThreads = 30;
  57. bool prestartSlaveThreads = false;
  58. unsigned numRequestArrayThreads = 5;
  59. unsigned headRegionSize;
  60. unsigned ccdMulticastPort;
  61. bool enableHeartBeat = true;
  62. unsigned parallelLoopFlowLimit = 100;
  63. unsigned perChannelFlowLimit = 10;
  64. time_t startupTime;
  65. unsigned statsExpiryTime = 3600;
  66. unsigned miscDebugTraceLevel = 0; // separate trace settings purely for debugging specific items (i.e. all possible locations to look for files at startup)
  67. unsigned readTimeout = 300;
  68. unsigned indexReadChunkSize = 60000;
  69. unsigned maxBlockSize = 10000000;
  70. unsigned maxLockAttempts = 5;
  71. bool pretendAllOpt = false;
  72. bool traceStartStop = false;
  73. bool traceServerSideCache = false;
  74. bool defaultTimeActivities = true;
  75. bool defaultTraceEnabled = false;
  76. bool traceTranslations = true;
  77. unsigned defaultTraceLimit = 10;
  78. unsigned watchActivityId = 0;
  79. unsigned testSlaveFailure = 0;
  80. RelaxedAtomic<unsigned> restarts;
  81. RecordTranslationMode fieldTranslationEnabled = RecordTranslationMode::Payload;
  82. bool mergeSlaveStatistics = true;
  83. PTreeReaderOptions defaultXmlReadFlags = ptr_ignoreWhiteSpace;
  84. bool runOnce = false;
  85. unsigned udpMulticastBufferSize = 262142;
  86. bool roxieMulticastEnabled = true;
  87. IPropertyTree *topology;
  88. MapStringTo<int> *preferredClusters;
  89. StringBuffer topologyFile;
  90. CriticalSection ccdChannelsCrit;
  91. StringArray allQuerySetNames;
  92. bool alwaysTrustFormatCrcs;
  93. bool allFilesDynamic;
  94. bool lockSuperFiles;
  95. bool useRemoteResources;
  96. bool checkFileDate;
  97. bool lazyOpen;
  98. bool localSlave;
  99. bool useAeron;
  100. bool ignoreOrphans;
  101. bool doIbytiDelay = true;
  102. bool copyResources;
  103. bool enableKeyDiff = true;
  104. bool chunkingHeap = true;
  105. bool logFullQueries;
  106. bool blindLogging = false;
  107. bool debugPermitted = true;
  108. bool checkCompleted = true;
  109. unsigned preabortKeyedJoinsThreshold = 100;
  110. unsigned preabortIndexReadsThreshold = 100;
  111. bool preloadOnceData;
  112. bool reloadRetriesFailed;
  113. bool selfTestMode = false;
  114. bool defaultCollectFactoryStatistics = true;
  115. int backgroundCopyClass = 0;
  116. int backgroundCopyPrio = 0;
  117. unsigned memoryStatsInterval = 0;
  118. memsize_t defaultMemoryLimit;
  119. unsigned defaultTimeLimit[3] = {0, 0, 0};
  120. unsigned defaultWarnTimeLimit[3] = {0, 5000, 5000};
  121. unsigned defaultThorConnectTimeout;
  122. unsigned defaultParallelJoinPreload = 0;
  123. unsigned defaultPrefetchProjectPreload = 10;
  124. unsigned defaultConcatPreload = 0;
  125. unsigned defaultFetchPreload = 0;
  126. unsigned defaultFullKeyedJoinPreload = 0;
  127. unsigned defaultKeyedJoinPreload = 0;
  128. unsigned dafilesrvLookupTimeout = 10000;
  129. bool defaultCheckingHeap = false;
  130. bool defaultDisableLocalOptimizations = false;
  131. unsigned defaultStrandBlockSize = 512;
  132. unsigned defaultForceNumStrands = 0;
  133. unsigned defaultHeapFlags = roxiemem::RHFnone;
  134. unsigned slaveQueryReleaseDelaySeconds = 60;
  135. unsigned coresPerQuery = 0;
  136. unsigned logQueueLen;
  137. unsigned logQueueDrop;
  138. bool useLogQueue;
  139. bool fastLaneQueue;
  140. unsigned mtu_size = 1400; // upper limit on outbound buffer size - allow some header room too
  141. StringBuffer fileNameServiceDali;
  142. StringBuffer roxieName;
  143. bool trapTooManyActiveQueries;
  144. unsigned maxEmptyLoopIterations;
  145. unsigned maxGraphLoopIterations;
  146. bool probeAllRows;
  147. bool steppingEnabled = true;
  148. bool simpleLocalKeyedJoins = true;
  149. unsigned __int64 minFreeDiskSpace = 1024 * 0x100000; // default to 1 GB
  150. unsigned socketCheckInterval = 5000;
  151. StringBuffer logDirectory;
  152. StringBuffer pluginDirectory;
  153. StringBuffer queryDirectory;
  154. StringBuffer codeDirectory;
  155. StringBuffer tempDirectory;
  156. ClientCertificate clientCert;
  157. bool useHardLink;
  158. unsigned maxFileAge[2] = {0xffffffff, 60*60*1000}; // local files don't expire, remote expire in 1 hour, by default
  159. unsigned minFilesOpen[2] = {2000, 500};
  160. unsigned maxFilesOpen[2] = {4000, 1000};
  161. SocketEndpoint debugEndpoint;
  162. HardwareInfo hdwInfo;
  163. unsigned parallelAggregate;
  164. bool inMemoryKeysEnabled = true;
  165. unsigned serverSideCacheSize = 0;
  166. bool nodeCachePreload = false;
  167. unsigned nodeCacheMB = 100;
  168. unsigned leafCacheMB = 50;
  169. unsigned blobCacheMB = 0;
  170. unsigned roxiePort = 0;
  171. Owned<IPerfMonHook> perfMonHook;
  172. MODULE_INIT(INIT_PRIORITY_STANDARD)
  173. {
  174. topology = NULL;
  175. return true;
  176. }
  177. MODULE_EXIT()
  178. {
  179. ::Release(topology);
  180. }
  181. //=========================================================================================
  182. //////////////////////////////////////////////////////////////////////////////////////////////
  183. extern "C" void caughtSIGPIPE(int sig)
  184. {
  185. DBGLOG("Caught sigpipe %d", sig);
  186. }
  187. extern "C" void caughtSIGHUP(int sig)
  188. {
  189. DBGLOG("Caught sighup %d", sig);
  190. }
  191. extern "C" void caughtSIGALRM(int sig)
  192. {
  193. DBGLOG("Caught sigalrm %d", sig);
  194. }
  195. extern "C" void caughtSIGTERM(int sig)
  196. {
  197. DBGLOG("Caught sigterm %d", sig);
  198. }
  199. void init_signals()
  200. {
  201. // signal(SIGTERM, caughtSIGTERM);
  202. #ifndef _WIN32
  203. signal(SIGPIPE, caughtSIGPIPE);
  204. signal(SIGHUP, caughtSIGHUP);
  205. signal(SIGALRM, caughtSIGALRM);
  206. #endif
  207. }
  208. //=========================================================================================
  209. class Waiter : public CInterface, implements IAbortHandler
  210. {
  211. Semaphore aborted;
  212. public:
  213. IMPLEMENT_IINTERFACE;
  214. bool wait(unsigned timeout)
  215. {
  216. return aborted.wait(timeout);
  217. }
  218. void wait()
  219. {
  220. aborted.wait();
  221. }
  222. bool onAbort()
  223. {
  224. aborted.signal();
  225. roxieMetrics.clear();
  226. #ifdef _DEBUG
  227. return false; // we want full leak checking info
  228. #else
  229. return true; // we don't care - just exit as fast as we can
  230. #endif
  231. }
  232. } waiter;
  233. void closedown()
  234. {
  235. Owned<IFile> sentinelFile = createSentinelTarget();
  236. removeSentinelFile(sentinelFile);
  237. waiter.onAbort();
  238. }
  239. void getAccessList(const char *aclName, const IPropertyTree *topology, IPropertyTree *aclInfo)
  240. {
  241. StringBuffer xpath;
  242. xpath.append("ACL[@name='").append(aclName).append("']");
  243. if (aclInfo->queryPropTree(xpath))
  244. throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - recursive ACL definition of %s", aclName);
  245. aclInfo->addPropTree("ACL")->setProp("@name", aclName);
  246. Owned<IPropertyTree> acl = topology->getPropTree(xpath.str());
  247. if (!acl)
  248. throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - acl %s not found", aclName);
  249. Owned<IPropertyTreeIterator> access = acl->getElements("Access");
  250. ForEach(*access)
  251. {
  252. IPropertyTree &child = access->query();
  253. const char *base = child.queryProp("@base");
  254. if (base)
  255. getAccessList(base, topology, aclInfo);
  256. else
  257. aclInfo->addPropTree(child.queryName(), LINK(&child));
  258. }
  259. aclInfo->removeProp(xpath);
  260. }
  261. bool ipMatch(IpAddress &ip)
  262. {
  263. return ip.isLocal();
  264. }
  265. extern void doUNIMPLEMENTED(unsigned line, const char *file)
  266. {
  267. throw MakeStringException(ROXIE_UNIMPLEMENTED_ERROR, "UNIMPLEMENTED at %s:%d", sanitizeSourceFile(file), line);
  268. }
  269. void FatalError(const char *format, ...)
  270. {
  271. va_list args;
  272. va_start(args, format);
  273. StringBuffer errMsg;
  274. errMsg.valist_appendf(format, args);
  275. va_end(args);
  276. Owned<IException> E = MakeStringException(MSGAUD_operator, ROXIE_INTERNAL_ERROR, "%s", errMsg.str());
  277. EXCLOG(E, "Fatal error");
  278. Sleep(5000);
  279. _exit(1);
  280. }
  281. // If changing these, please change roxie.cpp's roxie_server_usage() as well
  282. static void roxie_common_usage(const char * progName)
  283. {
  284. StringBuffer program;
  285. program.append(progName);
  286. getFileNameOnly(program, false);
  287. // Things that are also relevant to stand-alone executables
  288. printf("Usage: %s [options]\n", program.str());
  289. printf("\nOptions:\n");
  290. printf(" -[xml|csv|raw] : Output format (default ascii)\n");
  291. printf(" --daliServers=[host1,...] : List of Dali servers to use\n");
  292. printf(" --tracelevel=[integer] : Amount of information to dump on logs\n");
  293. printf(" --stdlog=[boolean] : Standard log format (based on tracelevel)\n");
  294. printf(" --logfile=[filename] : Outputs to logfile, rather than stdout\n");
  295. printf(" --topology=[filename] : Load configuration from named xml file (default RoxieTopology.xml)\n");
  296. printf(" --help|-h : This message\n");
  297. printf("\n");
  298. }
  299. class MAbortHandler : implements IExceptionHandler
  300. {
  301. public:
  302. virtual bool fireException(IException *e)
  303. {
  304. ForEachItemIn(idx, socketListeners)
  305. {
  306. socketListeners.item(idx).stopListening();
  307. }
  308. return false; // It returns to excsighandler() to abort!
  309. }
  310. } abortHandler;
  311. #ifdef _WIN32
  312. int myhook(int alloctype, void *, size_t nSize, int p1, long allocSeq, const unsigned char *file, int line)
  313. {
  314. // Handy place to put breakpoints when tracking down obscure memory leaks...
  315. if (nSize==68 && !file)
  316. {
  317. DBGLOG("memory hook matched");
  318. }
  319. return true;
  320. }
  321. #endif
  322. void saveTopology()
  323. {
  324. // Write back changes that have been made via certain control:xxx changes, so that they survive a roxie restart
  325. // Note that they are overwritten when Roxie is manually stopped/started via hpcc-init service - these changes
  326. // are only intended to be temporary for the current session
  327. try
  328. {
  329. saveXML(topologyFile.str(), topology);
  330. }
  331. catch (IException *E)
  332. {
  333. // If we can't save the topology, then tough. Carry on without it. Changes will not survive an unexpected roxie restart
  334. EXCLOG(E, "Error saving topology file");
  335. E->Release();
  336. }
  337. }
  338. class CHpccProtocolPluginCtx : implements IHpccProtocolPluginContext, public CInterface
  339. {
  340. public:
  341. IMPLEMENT_IINTERFACE;
  342. virtual int ctxGetPropInt(const char *propName, int defaultValue) const
  343. {
  344. return topology->getPropInt(propName, defaultValue);
  345. }
  346. virtual bool ctxGetPropBool(const char *propName, bool defaultValue) const
  347. {
  348. return topology->getPropBool(propName, defaultValue);
  349. }
  350. virtual const char *ctxQueryProp(const char *propName) const
  351. {
  352. return topology->queryProp(propName);
  353. }
  354. };
  355. static SocketEndpointArray topologyServers;
  356. static std::vector<RoxieEndpointInfo> myRoles;
  357. static std::vector<unsigned> farmerPorts;
  358. static std::vector<std::pair<unsigned, unsigned>> slaveChannels;
  359. static bool splitarg(const char *arg, std::string &name, std::string &value)
  360. {
  361. const char *eq = strchr(arg, '=');
  362. if (eq)
  363. {
  364. name.append(arg, eq-arg);
  365. value.append(eq+1);
  366. return true;
  367. }
  368. else
  369. return false;
  370. }
  371. void readStaticTopology()
  372. {
  373. // If dynamicServers not set, we read a list of all servers form the topology file, and deduce which ones are on which channel
  374. // and the total number of channels
  375. std::vector<RoxieEndpointInfo> allRoles;
  376. IpAddressArray nodeTable;
  377. unsigned numNodes = topology->getCount("./RoxieServerProcess");
  378. Owned<IPropertyTreeIterator> roxieServers = topology->getElements("./RoxieServerProcess");
  379. bool myNodeSet = false;
  380. unsigned calcNumChannels = 0;
  381. ForEach(*roxieServers)
  382. {
  383. IPropertyTree &roxieServer = roxieServers->query();
  384. const char *iptext = roxieServer.queryProp("@netAddress");
  385. IpAddress ip(iptext);
  386. if (ip.isNull())
  387. throw MakeStringException(ROXIE_UDP_ERROR, "Could not resolve address %s", iptext);
  388. if (ip.isLocal() && !myNodeSet)
  389. {
  390. myNodeSet = true;
  391. myNode.setIp(ip);
  392. mySlaveEP.set(ccdMulticastPort, myNode.getNodeAddress());
  393. }
  394. ForEachItemIn(idx, nodeTable)
  395. {
  396. if (ip.ipequals(nodeTable.item(idx)))
  397. throw MakeStringException(ROXIE_UDP_ERROR, "Duplicated node %s in RoxieServerProcess list", iptext);
  398. }
  399. nodeTable.append(ip);
  400. Owned<IPropertyTreeIterator> roxieFarms = topology->getElements("./RoxieFarmProcess");
  401. ForEach(*roxieFarms)
  402. {
  403. IPropertyTree &roxieFarm = roxieFarms->query();
  404. unsigned port = roxieFarm.getPropInt("@port", ROXIE_SERVER_PORT);
  405. RoxieEndpointInfo server = {RoxieEndpointInfo::RoxieServer, 0, { (unsigned short) port, ip }, 0};
  406. allRoles.push_back(server);
  407. }
  408. }
  409. if (!myNodeSet)
  410. throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - current node is not in server list");
  411. // Generate the slave channels
  412. unsigned numDataCopies = topology->getPropInt("@numDataCopies", 1);
  413. if (!numDataCopies)
  414. throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - numDataCopies should be > 0");
  415. unsigned channelsPerNode = topology->getPropInt("@channelsPerNode", 1);
  416. const char *slaveConfig = topology->queryProp("@slaveConfig");
  417. if (!slaveConfig)
  418. slaveConfig = "simple";
  419. if (strnicmp(slaveConfig, "cyclic", 6) == 0)
  420. {
  421. calcNumChannels = numNodes;
  422. unsigned cyclicOffset = topology->getPropInt("@cyclicOffset", 1);
  423. for (unsigned copy=0; copy<numDataCopies; copy++)
  424. {
  425. // Note this code is a little confusing - easy to get the cyclic offset backwards
  426. // cyclic offset means node n+offset has copy 2 for channel n, so node n has copy 2 for channel n-offset
  427. for (unsigned i=0; i<numNodes; i++)
  428. {
  429. int channel = (int)i+1 - (copy * cyclicOffset);
  430. while (channel < 1)
  431. channel = channel + numNodes;
  432. RoxieEndpointInfo slave = {RoxieEndpointInfo::RoxieSlave, (unsigned) channel, { (unsigned short) ccdMulticastPort, nodeTable.item(i) }, copy};
  433. allRoles.push_back(slave);
  434. }
  435. }
  436. }
  437. else if (strnicmp(slaveConfig, "overloaded", 10) == 0)
  438. {
  439. if (!channelsPerNode)
  440. throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - channelsPerNode should be > 0");
  441. calcNumChannels = numNodes * channelsPerNode;
  442. for (unsigned copy=0; copy<channelsPerNode; copy++)
  443. {
  444. for (unsigned i=0; i<numNodes; i++)
  445. {
  446. unsigned channel = i+1;
  447. RoxieEndpointInfo slave = {RoxieEndpointInfo::RoxieSlave, channel, { (unsigned short) ccdMulticastPort, nodeTable.item(i) }, copy};
  448. allRoles.push_back(slave);
  449. channel += numNodes;
  450. }
  451. }
  452. }
  453. else // 'Full redundancy' or 'simple' mode
  454. {
  455. if (numNodes % numDataCopies)
  456. throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - numChannels not an integer");
  457. calcNumChannels = numNodes / numDataCopies;
  458. unsigned channel = 1;
  459. for (unsigned i=0; i<numNodes; i++)
  460. {
  461. RoxieEndpointInfo slave = {RoxieEndpointInfo::RoxieSlave, channel, { (unsigned short) ccdMulticastPort, nodeTable.item(i) }, 0 };
  462. allRoles.push_back(slave);
  463. channel++;
  464. if (channel > calcNumChannels)
  465. channel = 1;
  466. }
  467. }
  468. if (numChannels && numChannels != calcNumChannels)
  469. throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - numChannels calculated at %u but specified as %u", calcNumChannels, numChannels);
  470. if (!calcNumChannels)
  471. throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - numChannels calculated at 0");
  472. if (calcNumChannels > 1 && localSlave)
  473. throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - localSlave requires single channel (%d channels specified)", calcNumChannels);
  474. numChannels = calcNumChannels;
  475. createStaticTopology(allRoles, traceLevel);
  476. }
  477. int STARTQUERY_API start_query(int argc, const char *argv[])
  478. {
  479. for (unsigned i=0;i<(unsigned)argc;i++) {
  480. if (streq(argv[i],"--daemon") || streq(argv[i],"-d")) {
  481. if (daemon(1,0) || write_pidfile(argv[++i])) {
  482. perror("Failed to daemonize");
  483. return EXIT_FAILURE;
  484. }
  485. break;
  486. }
  487. }
  488. EnableSEHtoExceptionMapping();
  489. setTerminateOnSEH();
  490. init_signals();
  491. // We need to do the above BEFORE we call InitModuleObjects
  492. try
  493. {
  494. InitModuleObjects();
  495. }
  496. catch (IException *E)
  497. {
  498. EXCLOG(E);
  499. E->Release();
  500. return EXIT_FAILURE;
  501. }
  502. init_signals();
  503. // stand alone usage only, not server
  504. for (unsigned i=0; i<(unsigned)argc; i++)
  505. {
  506. if (stricmp(argv[i], "--help")==0 ||
  507. stricmp(argv[i], "-h")==0)
  508. {
  509. roxie_common_usage(argv[0]);
  510. return EXIT_SUCCESS;
  511. }
  512. }
  513. #ifdef _USE_CPPUNIT
  514. if (argc>=2 && stricmp(argv[1], "-selftest")==0)
  515. {
  516. selfTestMode = true;
  517. queryStderrLogMsgHandler()->setMessageFields(MSGFIELD_time | MSGFIELD_milliTime | MSGFIELD_prefix);
  518. CppUnit::TextUi::TestRunner runner;
  519. if (argc==2)
  520. {
  521. CppUnit::TestFactoryRegistry &registry = CppUnit::TestFactoryRegistry::getRegistry();
  522. runner.addTest( registry.makeTest() );
  523. }
  524. else
  525. {
  526. // MORE - maybe add a 'list' function here?
  527. for (int name = 2; name < argc; name++)
  528. {
  529. if (stricmp(argv[name], "-q")==0)
  530. {
  531. traceLevel = 0;
  532. roxiemem::setMemTraceLevel(0);
  533. removeLog();
  534. }
  535. else
  536. {
  537. CppUnit::TestFactoryRegistry &registry = CppUnit::TestFactoryRegistry::getRegistry(argv[name]);
  538. runner.addTest( registry.makeTest() );
  539. }
  540. }
  541. }
  542. bool wasSucessful = runner.run( "", false );
  543. releaseAtoms();
  544. return wasSucessful;
  545. }
  546. #endif
  547. #ifdef _DEBUG
  548. #ifdef _WIN32
  549. _CrtSetAllocHook(myhook);
  550. #endif
  551. #endif
  552. #ifndef __64BIT__
  553. // Restrict stack sizes on 32-bit systems
  554. Thread::setDefaultStackSize(0x10000); // NB under windows requires linker setting (/stack:)
  555. #endif
  556. srand( (unsigned)time( NULL ) );
  557. char currentDirectory[_MAX_DIR];
  558. if (!getcwd(currentDirectory, sizeof(currentDirectory)))
  559. throw MakeStringException(ROXIE_INTERNAL_ERROR, "getcwd failed (%d)", errno);
  560. codeDirectory.set(currentDirectory);
  561. addNonEmptyPathSepChar(codeDirectory);
  562. try
  563. {
  564. Owned<IProperties> globals = createProperties(true);
  565. for (int i = 1; i < argc; i++)
  566. {
  567. std::string name, value;
  568. if (splitarg(argv[i], name, value))
  569. {
  570. if (name=="--topologyServer")
  571. {
  572. topologyServers.append(SocketEndpoint(value.c_str()));
  573. continue;
  574. }
  575. else if (name=="--serverPort")
  576. {
  577. farmerPorts.push_back(atoi(value.c_str()));
  578. continue;
  579. }
  580. else if (name=="--channel")
  581. {
  582. char *tail = nullptr;
  583. unsigned channel = strtoul(value.c_str(), &tail, 10);
  584. unsigned repl = 0;
  585. if (*tail==':')
  586. {
  587. tail++;
  588. repl = atoi(tail);
  589. }
  590. slaveChannels.push_back(std::pair<unsigned, unsigned>(channel, repl));
  591. continue;
  592. }
  593. }
  594. globals->loadProp(argv[i], true);
  595. }
  596. Owned<IFile> sentinelFile = createSentinelTarget();
  597. removeSentinelFile(sentinelFile);
  598. if (globals->hasProp("--topology"))
  599. globals->getProp("--topology", topologyFile);
  600. else
  601. topologyFile.append(codeDirectory).append(PATHSEPCHAR).append("RoxieTopology.xml");
  602. if (checkFileExists(topologyFile.str()))
  603. {
  604. DBGLOG("Loading topology file %s", topologyFile.str());
  605. topology = createPTreeFromXMLFile(topologyFile.str(), ipt_lowmem);
  606. saveTopology();
  607. if (globals->hasProp("--udpTraceLevel"))
  608. topology->setProp("@udpTraceLevel", globals->queryProp("--udpTraceLevel"));
  609. if (globals->hasProp("--traceLevel"))
  610. topology->setProp("@traceLevel", globals->queryProp("--traceLevel"));
  611. if (globals->hasProp("--prestartSlaveThreads"))
  612. topology->setProp("@prestartSlaveThreads", globals->queryProp("--prestartSlaveThreads"));
  613. }
  614. else
  615. {
  616. if (globals->hasProp("--topology"))
  617. {
  618. // Explicitly-named topology file SHOULD exist...
  619. throw MakeStringException(ROXIE_INVALID_TOPOLOGY, "topology file %s not found", topologyFile.str());
  620. }
  621. topology=createPTreeFromXMLString(
  622. "<RoxieTopology allFilesDynamic='1' localSlave='1' resolveLocally='1'>"
  623. " <RoxieFarmProcess/>"
  624. " <RoxieServerProcess netAddress='.'/>"
  625. "</RoxieTopology>"
  626. , ipt_lowmem
  627. );
  628. int port = globals->getPropInt("--port", 9876);
  629. topology->setPropInt("RoxieFarmProcess/@port", port);
  630. topology->setProp("@daliServers", globals->queryProp("--daliServers"));
  631. topology->setProp("@traceLevel", globals->queryProp("--traceLevel"));
  632. topology->setPropInt("@allFilesDynamic", globals->getPropInt("--allFilesDynamic", 1));
  633. topology->setProp("@memTraceLevel", globals->queryProp("--memTraceLevel"));
  634. topology->setPropInt64("@totalMemoryLimit", globals->getPropInt("--totalMemoryLimitMb", 0) * (memsize_t) 0x100000);
  635. topology->setProp("@disableLocalOptimizations", globals->queryProp("--disableLocalOptimizations"));
  636. topology->setPropInt("@indexReadChunkSize", globals->getPropInt("--indexReadChunkSize", 60000));
  637. }
  638. if (topology->hasProp("PreferredCluster"))
  639. {
  640. preferredClusters = new MapStringTo<int>(true);
  641. Owned<IPropertyTreeIterator> clusters = topology->getElements("PreferredCluster");
  642. ForEach(*clusters)
  643. {
  644. IPropertyTree &child = clusters->query();
  645. const char *name = child.queryProp("@name");
  646. int priority = child.getPropInt("@priority", 100);
  647. if (name && *name)
  648. preferredClusters->setValue(name, priority);
  649. }
  650. }
  651. topology->getProp("@name", roxieName);
  652. if (roxieName.length())
  653. setStatisticsComponentName(SCTroxie, roxieName, true);
  654. else
  655. setStatisticsComponentName(SCTroxie, "roxie", true);
  656. Owned<const IQueryDll> standAloneDll;
  657. if (globals->hasProp("--loadWorkunit"))
  658. {
  659. StringBuffer workunitName;
  660. globals->getProp("--loadWorkunit", workunitName);
  661. standAloneDll.setown(createQueryDll(workunitName));
  662. }
  663. else
  664. {
  665. Owned<ILoadedDllEntry> dll = createExeDllEntry(argv[0]);
  666. if (checkEmbeddedWorkUnitXML(dll))
  667. {
  668. standAloneDll.setown(createExeQueryDll(argv[0]));
  669. runOnce = globals->getPropInt("--port", 0) == 0;
  670. }
  671. }
  672. traceLevel = topology->getPropInt("@traceLevel", runOnce ? 0 : 1);
  673. if (traceLevel > MAXTRACELEVEL)
  674. traceLevel = MAXTRACELEVEL;
  675. udpTraceLevel = topology->getPropInt("@udpTraceLevel", runOnce ? 0 : 1);
  676. roxiemem::setMemTraceLevel(topology->getPropInt("@memTraceLevel", runOnce ? 0 : 1));
  677. soapTraceLevel = topology->getPropInt("@soapTraceLevel", runOnce ? 0 : 1);
  678. miscDebugTraceLevel = topology->getPropInt("@miscDebugTraceLevel", 0);
  679. Linked<IPropertyTree> directoryTree = topology->queryPropTree("Directories");
  680. if (!directoryTree)
  681. {
  682. Owned<IPropertyTree> envFile = getHPCCEnvironment();
  683. if (envFile)
  684. directoryTree.set(envFile->queryPropTree("Software/Directories"));
  685. }
  686. if (directoryTree)
  687. {
  688. getConfigurationDirectory(directoryTree, "query", "roxie", roxieName, queryDirectory);
  689. for (unsigned replicationLevel = 0; replicationLevel < MAX_REPLICATION_LEVELS; replicationLevel++)
  690. {
  691. StringBuffer dataDir;
  692. StringBuffer dirId("data");
  693. if (replicationLevel)
  694. dirId.append(replicationLevel+1);
  695. if (getConfigurationDirectory(directoryTree, dirId, "roxie", roxieName, dataDir))
  696. setBaseDirectory(dataDir, replicationLevel, DFD_OSdefault);
  697. }
  698. }
  699. directoryTree.clear();
  700. //Logging stuff
  701. if (globals->getPropBool("--stdlog", traceLevel != 0) || topology->getPropBool("@forceStdLog", false))
  702. queryStderrLogMsgHandler()->setMessageFields(MSGFIELD_time | MSGFIELD_milliTime | MSGFIELD_thread | MSGFIELD_prefix);
  703. else
  704. removeLog();
  705. if (globals->hasProp("--logfile"))
  706. {
  707. Owned<IComponentLogFileCreator> lf = createComponentLogFileCreator(topology, "roxie");
  708. lf->setMaxDetail(TopDetail);
  709. lf->beginLogging();
  710. logDirectory.set(lf->queryLogDir());
  711. #ifdef _DEBUG
  712. unsigned useLogQueue = topology->getPropBool("@useLogQueue", false);
  713. #else
  714. unsigned useLogQueue = topology->getPropBool("@useLogQueue", true);
  715. #endif
  716. if (useLogQueue)
  717. {
  718. unsigned logQueueLen = topology->getPropInt("@logQueueLen", 512);
  719. unsigned logQueueDrop = topology->getPropInt("@logQueueDrop", 32);
  720. queryLogMsgManager()->enterQueueingMode();
  721. queryLogMsgManager()->setQueueDroppingLimit(logQueueLen, logQueueDrop);
  722. }
  723. if (globals->getPropBool("--enableSysLog",true))
  724. UseSysLogForOperatorMessages();
  725. }
  726. roxieMetrics.setown(createRoxieMetricsManager());
  727. Owned<IPropertyTreeIterator> userMetrics = topology->getElements("./UserMetric");
  728. ForEach(*userMetrics)
  729. {
  730. IPropertyTree &metric = userMetrics->query();
  731. const char *name = metric.queryProp("@name");
  732. const char *regex= metric.queryProp("@regex");
  733. if (name && regex)
  734. roxieMetrics->addUserMetric(name, regex);
  735. else
  736. throw MakeStringException(ROXIE_INTERNAL_ERROR, "Invalid UserMetric element in topology file - name or regex missing");
  737. }
  738. restarts = globals->getPropInt("--restarts", 0);
  739. const char *preferredSubnet = topology->queryProp("@preferredSubnet");
  740. if (preferredSubnet)
  741. {
  742. const char *preferredSubnetMask = topology->queryProp("@preferredSubnetMask");
  743. if (!preferredSubnetMask) preferredSubnetMask = "255.255.255.0";
  744. if (!setPreferredSubnet(preferredSubnet, preferredSubnetMask))
  745. throw MakeStringException(ROXIE_INTERNAL_ERROR, "Error setting preferred subnet %s mask %s", preferredSubnet, preferredSubnetMask);
  746. }
  747. if (restarts)
  748. {
  749. if (traceLevel)
  750. DBGLOG("Roxie restarting: restarts = %d build = %s", restarts.load(), BUILD_TAG);
  751. setStartRuid(restarts);
  752. }
  753. else
  754. {
  755. if (traceLevel)
  756. {
  757. DBGLOG("Roxie starting, build = %s", BUILD_TAG);
  758. }
  759. }
  760. headRegionSize = topology->getPropInt("@headRegionSize", 50);
  761. ccdMulticastPort = topology->getPropInt("@multicastPort", CCD_MULTICAST_PORT);
  762. statsExpiryTime = topology->getPropInt("@statsExpiryTime", 3600);
  763. roxiemem::setMemTraceSizeLimit((memsize_t) topology->getPropInt64("@memTraceSizeLimit", 0));
  764. callbackRetries = topology->getPropInt("@callbackRetries", 3);
  765. callbackTimeout = topology->getPropInt("@callbackTimeout", 5000);
  766. lowTimeout = topology->getPropInt("@lowTimeout", 10000);
  767. highTimeout = topology->getPropInt("@highTimeout", 2000);
  768. slaTimeout = topology->getPropInt("@slaTimeout", 2000);
  769. parallelLoopFlowLimit = topology->getPropInt("@parallelLoopFlowLimit", 100);
  770. perChannelFlowLimit = topology->getPropInt("@perChannelFlowLimit", 10);
  771. copyResources = topology->getPropBool("@copyResources", true);
  772. useRemoteResources = topology->getPropBool("@useRemoteResources", true);
  773. checkFileDate = topology->getPropBool("@checkFileDate", true);
  774. const char *lazyOpenMode = topology->queryProp("@lazyOpen");
  775. if (!lazyOpenMode || stricmp(lazyOpenMode, "smart")==0)
  776. lazyOpen = (restarts > 0);
  777. else
  778. lazyOpen = topology->getPropBool("@lazyOpen", false);
  779. bool useNasTranslation = topology->getPropBool("@useNASTranslation", true);
  780. if (useNasTranslation)
  781. {
  782. Owned<IPropertyTree> nas = envGetNASConfiguration(topology);
  783. envInstallNASHooks(nas);
  784. }
  785. useDynamicServers = topology->getPropBool("@useDynamicServers", topologyServers.length()>0);
  786. useAeron = topology->getPropBool("@useAeron", useDynamicServers);
  787. localSlave = topology->getPropBool("@localSlave", false);
  788. numChannels = topology->getPropInt("@numChannels", 0);
  789. doIbytiDelay = topology->getPropBool("@doIbytiDelay", true);
  790. minIbytiDelay = topology->getPropInt("@minIbytiDelay", 2);
  791. initIbytiDelay = topology->getPropInt("@initIbytiDelay", 50);
  792. alwaysTrustFormatCrcs = topology->getPropBool("@alwaysTrustFormatCrcs", true);
  793. allFilesDynamic = topology->getPropBool("@allFilesDynamic", false);
  794. lockSuperFiles = topology->getPropBool("@lockSuperFiles", false);
  795. ignoreOrphans = topology->getPropBool("@ignoreOrphans", true);
  796. chunkingHeap = topology->getPropBool("@chunkingHeap", true);
  797. readTimeout = topology->getPropInt("@readTimeout", 300);
  798. logFullQueries = topology->getPropBool("@logFullQueries", false);
  799. debugPermitted = topology->getPropBool("@debugPermitted", true);
  800. blindLogging = topology->getPropBool("@blindLogging", false);
  801. preloadOnceData = topology->getPropBool("@preloadOnceData", true);
  802. reloadRetriesFailed = topology->getPropBool("@reloadRetriesSuspended", true);
  803. #if defined(__linux__) && defined(SYS_ioprio_set)
  804. const char *backgroundCopyClassString = topology->queryProp("@backgroundCopyClass");
  805. if (!isEmptyString(backgroundCopyClassString))
  806. {
  807. if (strieq(backgroundCopyClassString, "best-effort"))
  808. backgroundCopyClass = IOPRIO_CLASS_BE;
  809. else if (strieq(backgroundCopyClassString, "idle"))
  810. backgroundCopyClass = IOPRIO_CLASS_IDLE;
  811. else if (strieq(backgroundCopyClassString, "none"))
  812. backgroundCopyClass = IOPRIO_CLASS_NONE;
  813. else
  814. DBGLOG("Invalid backgroundCopyClass %s specified - ignored", backgroundCopyClassString);
  815. }
  816. backgroundCopyPrio = topology->getPropInt("@backgroundCopyPrio", 0);
  817. if (backgroundCopyPrio >= IOPRIO_BE_NR)
  818. {
  819. DBGLOG("Invalid backgroundCopyPrio %d specified - using %d", backgroundCopyPrio, (int) (IOPRIO_BE_NR-1));
  820. backgroundCopyPrio = IOPRIO_BE_NR-1;
  821. }
  822. else if (backgroundCopyPrio < 0)
  823. {
  824. DBGLOG("Invalid backgroundCopyPrio %d specified - using 0", backgroundCopyPrio);
  825. backgroundCopyPrio = 0;
  826. }
  827. #endif
  828. linuxYield = topology->getPropBool("@linuxYield", false);
  829. traceSmartStepping = topology->getPropBool("@traceSmartStepping", false);
  830. useMemoryMappedIndexes = topology->getPropBool("@useMemoryMappedIndexes", false);
  831. flushJHtreeCacheOnOOM = topology->getPropBool("@flushJHtreeCacheOnOOM", true);
  832. fastLaneQueue = topology->getPropBool("@fastLaneQueue", true);
  833. udpOutQsPriority = topology->getPropInt("@udpOutQsPriority", 0);
  834. udpSnifferEnabled = topology->getPropBool("@udpSnifferEnabled", true);
  835. udpRetryBusySenders = topology->getPropInt("@udpRetryBusySenders", 0);
  836. // Historically, this was specified in seconds. Assume any value <= 10 is a legacy value specified in seconds!
  837. udpMaxRetryTimedoutReqs = topology->getPropInt("@udpMaxRetryTimedoutReqs", 0);
  838. udpRequestToSendTimeout = topology->getPropInt("@udpRequestToSendTimeout", 0);
  839. if (udpRequestToSendTimeout<=10)
  840. udpRequestToSendTimeout *= 1000;
  841. if (udpRequestToSendTimeout == 0)
  842. {
  843. if (slaTimeout)
  844. udpRequestToSendTimeout = (slaTimeout*3) / 4;
  845. else
  846. udpRequestToSendTimeout = 5000;
  847. }
  848. udpRequestToSendAckTimeout = topology->getPropInt("@udpRequestToSendAckTimeout", 100);
  849. // MORE: might want to check socket buffer sizes against sys max here instead of udp threads ?
  850. udpSnifferReadThreadPriority = topology->getPropInt("@udpSnifferReadThreadPriority", 3);
  851. udpSnifferSendThreadPriority = topology->getPropInt("@udpSnifferSendThreadPriority", 3);
  852. udpMulticastBufferSize = topology->getPropInt("@udpMulticastBufferSize", 262142);
  853. udpFlowSocketsSize = topology->getPropInt("@udpFlowSocketsSize", 131072);
  854. udpLocalWriteSocketSize = topology->getPropInt("@udpLocalWriteSocketSize", 1024000);
  855. roxieMulticastEnabled = topology->getPropBool("@roxieMulticastEnabled", true); // enable use of multicast for sending requests to slaves
  856. if (udpSnifferEnabled && !roxieMulticastEnabled)
  857. {
  858. DBGLOG("WARNING: ignoring udpSnifferEnabled setting as multicast not enabled");
  859. udpSnifferEnabled = false;
  860. }
  861. int ttlTmp = topology->getPropInt("@multicastTTL", 1);
  862. if (ttlTmp < 0)
  863. {
  864. multicastTTL = 1;
  865. IWARNLOG("multicastTTL value (%d) invalid, must be >=0, resetting to %u", ttlTmp, multicastTTL);
  866. }
  867. else if (ttlTmp > 255)
  868. {
  869. multicastTTL = 255;
  870. IWARNLOG("multicastTTL value (%d) invalid, must be <=%u, resetting to maximum", ttlTmp, multicastTTL);
  871. }
  872. else
  873. multicastTTL = ttlTmp;
  874. indexReadChunkSize = topology->getPropInt("@indexReadChunkSize", 60000);
  875. numSlaveThreads = topology->getPropInt("@slaveThreads", 30);
  876. numServerThreads = topology->getPropInt("@serverThreads", 30);
  877. numRequestArrayThreads = topology->getPropInt("@requestArrayThreads", 5);
  878. maxBlockSize = topology->getPropInt("@maxBlockSize", 10000000);
  879. maxLockAttempts = topology->getPropInt("@maxLockAttempts", 5);
  880. enableHeartBeat = topology->getPropBool("@enableHeartBeat", true);
  881. checkCompleted = topology->getPropBool("@checkCompleted", true);
  882. prestartSlaveThreads = topology->getPropBool("@prestartSlaveThreads", false);
  883. preabortKeyedJoinsThreshold = topology->getPropInt("@preabortKeyedJoinsThreshold", 100);
  884. preabortIndexReadsThreshold = topology->getPropInt("@preabortIndexReadsThreshold", 100);
  885. defaultMemoryLimit = (memsize_t) topology->getPropInt64("@defaultMemoryLimit", 0);
  886. defaultTimeLimit[0] = (unsigned) topology->getPropInt64("@defaultLowPriorityTimeLimit", 0);
  887. defaultTimeLimit[1] = (unsigned) topology->getPropInt64("@defaultHighPriorityTimeLimit", 0);
  888. defaultTimeLimit[2] = (unsigned) topology->getPropInt64("@defaultSLAPriorityTimeLimit", 0);
  889. defaultWarnTimeLimit[0] = (unsigned) topology->getPropInt64("@defaultLowPriorityTimeWarning", 0);
  890. defaultWarnTimeLimit[1] = (unsigned) topology->getPropInt64("@defaultHighPriorityTimeWarning", 0);
  891. defaultWarnTimeLimit[2] = (unsigned) topology->getPropInt64("@defaultSLAPriorityTimeWarning", 0);
  892. defaultThorConnectTimeout = (unsigned) topology->getPropInt64("@defaultThorConnectTimeout", 60);
  893. defaultXmlReadFlags = topology->getPropBool("@defaultStripLeadingWhitespace", true) ? ptr_ignoreWhiteSpace : ptr_none;
  894. defaultParallelJoinPreload = topology->getPropInt("@defaultParallelJoinPreload", 0);
  895. defaultConcatPreload = topology->getPropInt("@defaultConcatPreload", 0);
  896. defaultFetchPreload = topology->getPropInt("@defaultFetchPreload", 0);
  897. defaultFullKeyedJoinPreload = topology->getPropInt("@defaultFullKeyedJoinPreload", 0);
  898. defaultKeyedJoinPreload = topology->getPropInt("@defaultKeyedJoinPreload", 0);
  899. defaultPrefetchProjectPreload = topology->getPropInt("@defaultPrefetchProjectPreload", 10);
  900. defaultStrandBlockSize = topology->getPropInt("@defaultStrandBlockSize", 512);
  901. defaultForceNumStrands = topology->getPropInt("@defaultForceNumStrands", 0);
  902. defaultCheckingHeap = topology->getPropBool("@checkingHeap", false); // NOTE - not in configmgr - too dangerous!
  903. defaultDisableLocalOptimizations = topology->getPropBool("@disableLocalOptimizations", false); // NOTE - not in configmgr - too dangerous!
  904. slaveQueryReleaseDelaySeconds = topology->getPropInt("@slaveQueryReleaseDelaySeconds", 60);
  905. coresPerQuery = topology->getPropInt("@coresPerQuery", 0);
  906. diskReadBufferSize = topology->getPropInt("@diskReadBufferSize", 0x10000);
  907. fieldTranslationEnabled = RecordTranslationMode::Payload;
  908. const char *val = topology->queryProp("@fieldTranslationEnabled");
  909. if (val)
  910. fieldTranslationEnabled = getTranslationMode(val);
  911. pretendAllOpt = topology->getPropBool("@ignoreMissingFiles", false);
  912. memoryStatsInterval = topology->getPropInt("@memoryStatsInterval", 60);
  913. roxiemem::setMemoryStatsInterval(memoryStatsInterval);
  914. pingInterval = topology->getPropInt("@pingInterval", 0);
  915. socketCheckInterval = topology->getPropInt("@socketCheckInterval", runOnce ? 0 : 5000);
  916. memsize_t totalMemoryLimit = (memsize_t) topology->getPropInt64("@totalMemoryLimit", 0);
  917. bool allowHugePages = topology->getPropBool("@heapUseHugePages", false);
  918. bool allowTransparentHugePages = topology->getPropBool("@heapUseTransparentHugePages", true);
  919. bool retainMemory = topology->getPropBool("@heapRetainMemory", false);
  920. if (!totalMemoryLimit)
  921. totalMemoryLimit = 1024 * 0x100000; // 1 Gb;
  922. roxiemem::setTotalMemoryLimit(allowHugePages, allowTransparentHugePages, retainMemory, totalMemoryLimit, 0, NULL, NULL);
  923. traceStartStop = topology->getPropBool("@traceStartStop", false);
  924. traceServerSideCache = topology->getPropBool("@traceServerSideCache", false);
  925. traceTranslations = topology->getPropBool("@traceTranslations", true);
  926. defaultTimeActivities = topology->getPropBool("@timeActivities", true);
  927. defaultTraceEnabled = topology->getPropBool("@traceEnabled", false);
  928. defaultTraceLimit = topology->getPropInt("@traceLimit", 10);
  929. clientCert.certificate.set(topology->queryProp("@certificateFileName"));
  930. clientCert.privateKey.set(topology->queryProp("@privateKeyFileName"));
  931. clientCert.passphrase.set(topology->queryProp("@passphrase"));
  932. useHardLink = topology->getPropBool("@useHardLink", false);
  933. maxFileAge[false] = topology->getPropInt("@localFilesExpire", (unsigned) -1);
  934. maxFileAge[true] = topology->getPropInt("@remoteFilesExpire", 60*60*1000);
  935. minFilesOpen[false] = topology->getPropInt("@minLocalFilesOpen", 2000);
  936. minFilesOpen[true] = topology->getPropInt("@minRemoteFilesOpen", 500);
  937. maxFilesOpen[false] = topology->getPropInt("@maxLocalFilesOpen", 4000);
  938. maxFilesOpen[true] = topology->getPropInt("@maxRemoteFilesOpen", 1000);
  939. dafilesrvLookupTimeout = topology->getPropInt("@dafilesrvLookupTimeout", 10000);
  940. setRemoteFileTimeouts(dafilesrvLookupTimeout, 0);
  941. topology->getProp("@daliServers", fileNameServiceDali);
  942. trapTooManyActiveQueries = topology->getPropBool("@trapTooManyActiveQueries", true);
  943. maxEmptyLoopIterations = topology->getPropInt("@maxEmptyLoopIterations", 1000);
  944. maxGraphLoopIterations = topology->getPropInt("@maxGraphLoopIterations", 1000);
  945. mergeSlaveStatistics = topology->getPropBool("@mergeSlaveStatistics", true);
  946. defaultCollectFactoryStatistics = topology->getPropBool("@collectFactoryStatistics", true);
  947. enableKeyDiff = topology->getPropBool("@enableKeyDiff", true);
  948. // NB: these directories will have been setup by topology earlier
  949. const char *primaryDirectory = queryBaseDirectory(grp_unknown, 0);
  950. const char *secondaryDirectory = queryBaseDirectory(grp_unknown, 1);
  951. // MORE: Get parms from topology after it is populated from Hardware/computer types section in configenv
  952. // Then if does not match and based on desired action in topolgy, either warn, or fatal exit or .... etc
  953. getHardwareInfo(hdwInfo, primaryDirectory, secondaryDirectory);
  954. if (traceLevel)
  955. {
  956. DBGLOG("Current Hardware Info: CPUs=%i, speed=%i MHz, Mem=%i MB , primDisk=%i GB, primFree=%i GB, secDisk=%i GB, secFree=%i GB, NIC=%i",
  957. hdwInfo.numCPUs, hdwInfo.CPUSpeed, hdwInfo.totalMemory,
  958. hdwInfo.primDiskSize, hdwInfo.primFreeSize, hdwInfo.secDiskSize, hdwInfo.secFreeSize, hdwInfo.NICSpeed);
  959. }
  960. parallelAggregate = topology->getPropInt("@parallelAggregate", 0);
  961. if (!parallelAggregate)
  962. parallelAggregate = hdwInfo.numCPUs;
  963. if (!parallelAggregate)
  964. parallelAggregate = 1;
  965. simpleLocalKeyedJoins = topology->getPropBool("@simpleLocalKeyedJoins", true);
  966. inMemoryKeysEnabled = topology->getPropBool("@inMemoryKeysEnabled", true);
  967. serverSideCacheSize = topology->getPropInt("@serverSideCacheSize", 0);
  968. setKeyIndexCacheSize((unsigned)-1); // unbound
  969. nodeCachePreload = topology->getPropBool("@nodeCachePreload", false);
  970. setNodeCachePreload(nodeCachePreload);
  971. nodeCacheMB = topology->getPropInt("@nodeCacheMem", 100);
  972. setNodeCacheMem(nodeCacheMB * 0x100000);
  973. leafCacheMB = topology->getPropInt("@leafCacheMem", 50);
  974. setLeafCacheMem(leafCacheMB * 0x100000);
  975. blobCacheMB = topology->getPropInt("@blobCacheMem", 0);
  976. setBlobCacheMem(blobCacheMB * 0x100000);
  977. unsigned __int64 affinity = topology->getPropInt64("@affinity", 0);
  978. updateAffinity(affinity);
  979. minFreeDiskSpace = topology->getPropInt64("@minFreeDiskSpace", (1024 * 0x100000)); // default to 1 GB
  980. if (topology->getPropBool("@jumboFrames", false))
  981. {
  982. mtu_size = 9000; // upper limit on outbound buffer size - allow some header room too
  983. roxiemem::setDataAlignmentSize(0x2000);
  984. }
  985. else
  986. {
  987. mtu_size = 1400; // upper limit on outbound buffer size - allow some header room too
  988. roxiemem::setDataAlignmentSize(0x400);
  989. }
  990. unsigned pinterval = topology->getPropInt("@systemMonitorInterval",1000*60);
  991. perfMonHook.setown(roxiemem::createRoxieMemStatsPerfMonHook()); // Note - we create even if pinterval is 0, as can be enabled via control message
  992. if (pinterval)
  993. startPerformanceMonitor(pinterval, PerfMonStandard, perfMonHook);
  994. topology->getProp("@pluginDirectory", pluginDirectory);
  995. StringBuffer packageDirectory;
  996. getPackageFolder(packageDirectory);
  997. if (pluginDirectory.length() == 0 && packageDirectory.length() != 0)
  998. {
  999. pluginDirectory.append(packageDirectory).append("plugins");
  1000. }
  1001. getAdditionalPluginsPath(pluginDirectory, packageDirectory);
  1002. if (queryDirectory.length() == 0)
  1003. {
  1004. topology->getProp("@queryDir", queryDirectory);
  1005. if (queryDirectory.length() == 0)
  1006. queryDirectory.append(codeDirectory).append("queries");
  1007. }
  1008. addNonEmptyPathSepChar(queryDirectory);
  1009. queryFileCache().start();
  1010. getTempFilePath(tempDirectory, "roxie", topology);
  1011. #ifdef _WIN32
  1012. topology->addPropBool("@linuxOS", false);
  1013. #else
  1014. topology->addPropBool("@linuxOS", true);
  1015. #endif
  1016. allQuerySetNames.appendListUniq(topology->queryProp("@querySets"), ",");
  1017. // Set multicast base addresses - must be done before generating slave channels
  1018. if (roxieMulticastEnabled && !localSlave)
  1019. {
  1020. if (topology->queryProp("@multicastBase"))
  1021. multicastBase.ipset(topology->queryProp("@multicastBase"));
  1022. else
  1023. throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - multicastBase not set");
  1024. if (topology->queryProp("@multicastLast"))
  1025. multicastLast.ipset(topology->queryProp("@multicastLast"));
  1026. else
  1027. throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - multicastLast not set");
  1028. }
  1029. if (useDynamicServers)
  1030. {
  1031. if (!numChannels)
  1032. throw makeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - numChannels not set");
  1033. IpAddress myIP(".");
  1034. for (unsigned port: farmerPorts)
  1035. {
  1036. VStringBuffer xpath("./RoxieFarmProcess[@port='%u']", port);
  1037. if (!topology->hasProp(xpath))
  1038. topology->addPropTree("./RoxieFarmProcess")->setPropInt("@port", port);
  1039. RoxieEndpointInfo me = {RoxieEndpointInfo::RoxieServer, 0, { (unsigned short) port, myIP }, 0};
  1040. myRoles.push_back(me);
  1041. }
  1042. for (std::pair<unsigned, unsigned> channel: slaveChannels)
  1043. {
  1044. mySlaveEP.set(ccdMulticastPort, myIP);
  1045. RoxieEndpointInfo me = { RoxieEndpointInfo::RoxieSlave, channel.first, mySlaveEP, channel.second };
  1046. myRoles.push_back(me);
  1047. }
  1048. }
  1049. else
  1050. {
  1051. readStaticTopology();
  1052. }
  1053. // Now we know all the channels, we can open and subscribe the multicast channels
  1054. if (!localSlave)
  1055. {
  1056. openMulticastSocket();
  1057. if (roxieMulticastEnabled)
  1058. setMulticastEndpoints(numChannels);
  1059. }
  1060. setDaliServixSocketCaching(true); // enable daliservix caching
  1061. enableForceRemoteReads(); // forces file reads to be remote reads if they match environment setting 'forceRemotePattern' pattern.
  1062. loadPlugins();
  1063. createDelayedReleaser();
  1064. globalPackageSetManager = createRoxiePackageSetManager(standAloneDll.getClear());
  1065. globalPackageSetManager->load();
  1066. unsigned snifferChannel = numChannels+2; // MORE - why +2 not +1??
  1067. if (useDynamicServers && topologyServers.length())
  1068. {
  1069. startTopoThread(topologyServers, myRoles, traceLevel);
  1070. }
  1071. ROQ = createOutputQueueManager(snifferChannel, numSlaveThreads);
  1072. ROQ->setHeadRegionSize(headRegionSize);
  1073. ROQ->start();
  1074. Owned<IPacketDiscarder> packetDiscarder = createPacketDiscarder();
  1075. #if defined(WIN32) && defined(_DEBUG) && defined(_DEBUG_HEAP_FULL)
  1076. int tmpFlag = _CrtSetDbgFlag( _CRTDBG_REPORT_FLAG );
  1077. tmpFlag |= _CRTDBG_CHECK_ALWAYS_DF;
  1078. _CrtSetDbgFlag( tmpFlag );
  1079. #endif
  1080. EnableSEHtoExceptionMapping();
  1081. setSEHtoExceptionHandler(&abortHandler);
  1082. Owned<IHpccProtocolPluginContext> protocolCtx = new CHpccProtocolPluginCtx();
  1083. if (runOnce)
  1084. {
  1085. if (globals->hasProp("--wu"))
  1086. {
  1087. Owned<IHpccProtocolListener> roxieServer = createRoxieWorkUnitListener(1, false);
  1088. try
  1089. {
  1090. VStringBuffer x("-%s", argv[0]);
  1091. roxieServer->runOnce(x);
  1092. fflush(stdout); // in windows if output is redirected results don't appear without flushing
  1093. }
  1094. catch (IException *E)
  1095. {
  1096. EXCLOG(E);
  1097. E->Release();
  1098. }
  1099. }
  1100. else
  1101. {
  1102. Owned<IHpccProtocolPlugin> protocolPlugin = loadHpccProtocolPlugin(protocolCtx, NULL);
  1103. Owned<IHpccProtocolListener> roxieServer = protocolPlugin->createListener("runOnce", createRoxieProtocolMsgSink(myNode.getNodeAddress(), 0, 1, false), 0, 0, NULL);
  1104. try
  1105. {
  1106. const char *format = globals->queryProp("-format");
  1107. if (!format)
  1108. {
  1109. if (globals->hasProp("--xml") || globals->hasProp("-xml")) // Support - versions for compatibility
  1110. format = "xml";
  1111. else if (globals->hasProp("--csv") || globals->hasProp("-csv"))
  1112. format = "csv";
  1113. else if (globals->hasProp("-raw") || globals->hasProp("--raw"))
  1114. format = "raw";
  1115. else
  1116. format = "ascii";
  1117. }
  1118. StringBuffer query;
  1119. query.appendf("<roxie format='%s'/>", format);
  1120. roxieServer->runOnce(query.str()); // MORE - should use the wu listener instead I suspect
  1121. fflush(stdout); // in windows if output is redirected results don't appear without flushing
  1122. }
  1123. catch (IException *E)
  1124. {
  1125. EXCLOG(E);
  1126. E->Release();
  1127. }
  1128. }
  1129. }
  1130. else
  1131. {
  1132. try
  1133. {
  1134. Owned<IPropertyTreeIterator> roxieFarms = topology->getElements("./RoxieFarmProcess");
  1135. ForEach(*roxieFarms)
  1136. {
  1137. IPropertyTree &roxieFarm = roxieFarms->query();
  1138. unsigned listenQueue = roxieFarm.getPropInt("@listenQueue", DEFAULT_LISTEN_QUEUE_SIZE);
  1139. unsigned numThreads = roxieFarm.getPropInt("@numThreads", numServerThreads);
  1140. unsigned port = roxieFarm.getPropInt("@port", ROXIE_SERVER_PORT);
  1141. if (useDynamicServers)
  1142. {
  1143. if (std::find(std::begin(farmerPorts), std::end(farmerPorts), port) == std::end(farmerPorts))
  1144. continue;
  1145. }
  1146. //unsigned requestArrayThreads = roxieFarm.getPropInt("@requestArrayThreads", 5);
  1147. // NOTE: farmer name [@name=] is not copied into topology
  1148. const IpAddress &ip = myNode.getNodeAddress();
  1149. if (!roxiePort)
  1150. {
  1151. roxiePort = port;
  1152. debugEndpoint.set(roxiePort, ip);
  1153. }
  1154. bool suspended = roxieFarm.getPropBool("@suspended", false);
  1155. Owned <IHpccProtocolListener> roxieServer;
  1156. if (port)
  1157. {
  1158. const char *protocol = roxieFarm.queryProp("@protocol");
  1159. StringBuffer certFileName;
  1160. StringBuffer keyFileName;
  1161. StringBuffer passPhraseStr;
  1162. if (protocol && streq(protocol, "ssl"))
  1163. {
  1164. #ifdef _USE_OPENSSL
  1165. const char *certFile = roxieFarm.queryProp("@certificateFileName");
  1166. if (!certFile)
  1167. throw MakeStringException(ROXIE_FILE_ERROR, "Roxie SSL Farm Listener on port %d missing certificateFileName tag", port);
  1168. if (isAbsolutePath(certFile))
  1169. certFileName.append(certFile);
  1170. else
  1171. certFileName.append(codeDirectory.str()).append(certFile);
  1172. if (!checkFileExists(certFileName.str()))
  1173. throw MakeStringException(ROXIE_FILE_ERROR, "Roxie SSL Farm Listener on port %d missing certificateFile (%s)", port, certFileName.str());
  1174. const char *keyFile = roxieFarm.queryProp("@privateKeyFileName");
  1175. if (!keyFile)
  1176. throw MakeStringException(ROXIE_FILE_ERROR, "Roxie SSL Farm Listener on port %d missing privateKeyFileName tag", port);
  1177. if (isAbsolutePath(keyFile))
  1178. keyFileName.append(keyFile);
  1179. else
  1180. keyFileName.append(codeDirectory.str()).append(keyFile);
  1181. if (!checkFileExists(keyFileName.str()))
  1182. throw MakeStringException(ROXIE_FILE_ERROR, "Roxie SSL Farm Listener on port %d missing privateKeyFile (%s)", port, keyFileName.str());
  1183. const char *passPhrase = roxieFarm.queryProp("@passphrase");
  1184. if (!isEmptyString(passPhrase))
  1185. decrypt(passPhraseStr, passPhrase);
  1186. #else
  1187. OWARNLOG("Skipping Roxie SSL Farm Listener on port %d : OpenSSL disabled in build", port);
  1188. continue;
  1189. #endif
  1190. }
  1191. const char *soname = roxieFarm.queryProp("@so");
  1192. const char *config = roxieFarm.queryProp("@config");
  1193. Owned<IHpccProtocolPlugin> protocolPlugin = ensureProtocolPlugin(*protocolCtx, soname);
  1194. roxieServer.setown(protocolPlugin->createListener(protocol ? protocol : "native", createRoxieProtocolMsgSink(ip, port, numThreads, suspended), port, listenQueue, config, certFileName.str(), keyFileName.str(), passPhraseStr.str()));
  1195. }
  1196. else
  1197. roxieServer.setown(createRoxieWorkUnitListener(numThreads, suspended));
  1198. IHpccProtocolMsgSink *sink = roxieServer->queryMsgSink();
  1199. const char *aclName = roxieFarm.queryProp("@aclName");
  1200. if (aclName && *aclName)
  1201. {
  1202. Owned<IPropertyTree> aclInfo = createPTree("AccessInfo", ipt_lowmem);
  1203. getAccessList(aclName, topology, aclInfo);
  1204. Owned<IPropertyTreeIterator> accesses = aclInfo->getElements("Access");
  1205. ForEach(*accesses)
  1206. {
  1207. IPropertyTree &access = accesses->query();
  1208. try
  1209. {
  1210. sink->addAccess(access.getPropBool("@allow", true), access.getPropBool("@allowBlind", true), access.queryProp("@ip"), access.queryProp("@mask"), access.queryProp("@query"), access.queryProp("@error"), access.getPropInt("@errorCode"));
  1211. }
  1212. catch (IException *E)
  1213. {
  1214. StringBuffer s, x;
  1215. E->errorMessage(s);
  1216. E->Release();
  1217. toXML(&access, x, 0, 0);
  1218. throw MakeStringException(ROXIE_ACL_ERROR, "Error in access statement %s: %s", x.str(), s.str());
  1219. }
  1220. }
  1221. }
  1222. socketListeners.append(*roxieServer.getLink());
  1223. time(&startupTime);
  1224. roxieServer->start();
  1225. }
  1226. writeSentinelFile(sentinelFile);
  1227. DBGLOG("Startup completed - LPT=%u APT=%u", queryNumLocalTrees(), queryNumAtomTrees());
  1228. DBGLOG("Waiting for queries");
  1229. if (pingInterval)
  1230. startPingTimer();
  1231. LocalIAbortHandler abortHandler(waiter);
  1232. waiter.wait();
  1233. }
  1234. catch (IException *E)
  1235. {
  1236. StringBuffer x;
  1237. IERRLOG("EXCEPTION: (%d): %s", E->errorCode(), E->errorMessage(x).str());
  1238. E->Release();
  1239. }
  1240. }
  1241. shuttingDown = true;
  1242. if (pingInterval)
  1243. stopPingTimer();
  1244. setSEHtoExceptionHandler(NULL);
  1245. while (socketListeners.isItem(0))
  1246. {
  1247. socketListeners.item(0).stop(1000);
  1248. socketListeners.remove(0);
  1249. }
  1250. packetDiscarder->stop();
  1251. packetDiscarder.clear();
  1252. ROQ->stop();
  1253. ROQ->join();
  1254. ROQ->Release();
  1255. ROQ = NULL;
  1256. }
  1257. catch (IException *E)
  1258. {
  1259. StringBuffer x;
  1260. IERRLOG("EXCEPTION: (%d): %s", E->errorCode(), E->errorMessage(x).str());
  1261. E->Release();
  1262. }
  1263. roxieMetrics.clear();
  1264. stopPerformanceMonitor();
  1265. ::Release(globalPackageSetManager);
  1266. globalPackageSetManager = NULL;
  1267. stopDelayedReleaser();
  1268. cleanupPlugins();
  1269. unloadHpccProtocolPlugin();
  1270. closeMulticastSockets();
  1271. releaseSlaveDynamicFileCache();
  1272. releaseRoxieStateCache();
  1273. setDaliServixSocketCaching(false); // make sure it cleans up or you get bogus memleak reports
  1274. setNodeCaching(false); // ditto
  1275. perfMonHook.clear();
  1276. stopAeronDriver();
  1277. strdup("Make sure leak checking is working");
  1278. roxiemem::releaseRoxieHeap();
  1279. UseSysLogForOperatorMessages(false);
  1280. ExitModuleObjects();
  1281. releaseAtoms();
  1282. strdup("Make sure leak checking is working");
  1283. #ifdef _WIN32
  1284. #ifdef _DEBUG
  1285. #if 1
  1286. StringBuffer leakFileDir(logDirectory.str());
  1287. leakFileDir.append("roxieleaks.log");
  1288. HANDLE h = CreateFile(leakFileDir.str(), GENERIC_READ|GENERIC_WRITE, 0, NULL, CREATE_ALWAYS, 0, 0);
  1289. _CrtSetReportMode( _CRT_WARN, _CRTDBG_MODE_FILE|_CRTDBG_MODE_DEBUG);
  1290. _CrtSetReportFile( _CRT_WARN, h);
  1291. _CrtSetReportMode( _CRT_ERROR, _CRTDBG_MODE_FILE|_CRTDBG_MODE_DEBUG);
  1292. _CrtSetReportFile( _CRT_ERROR, h);
  1293. _CrtSetReportMode( _CRT_ASSERT, _CRTDBG_MODE_FILE|_CRTDBG_MODE_DEBUG);
  1294. _CrtSetReportFile( _CRT_ASSERT, h);
  1295. // _CrtDumpMemoryLeaks(); if you uncomment these lines you get to see the leaks sooner (so can look in debugger at full memory)
  1296. // CloseHandle(h); but there will be additional leaks reported that are not really leaks
  1297. #endif
  1298. #endif
  1299. #endif
  1300. return 0;
  1301. }