ccdmain.cpp 48 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include <platform.h>
  14. #include <signal.h>
  15. #include <jlib.hpp>
  16. #include <jio.hpp>
  17. #include <jmisc.hpp>
  18. #include <jqueue.tpp>
  19. #include <jsocket.hpp>
  20. #include <jlog.hpp>
  21. #include <jprop.hpp>
  22. #include <jfile.hpp>
  23. #include "jutil.hpp"
  24. #include <build-config.h>
  25. #include "dalienv.hpp"
  26. #include "rmtfile.hpp"
  27. #include "ccd.hpp"
  28. #include "ccdquery.hpp"
  29. #include "ccdstate.hpp"
  30. #include "ccdqueue.ipp"
  31. #include "ccdserver.hpp"
  32. #include "ccdlistener.hpp"
  33. #include "ccdsnmp.hpp"
  34. #include "thorplugin.hpp"
  35. #ifdef _USE_CPPUNIT
  36. #include <cppunit/extensions/TestFactoryRegistry.h>
  37. #include <cppunit/ui/text/TestRunner.h>
  38. #endif
  39. //=================================================================================
  // Process-wide Roxie configuration and state.
  // Many of the initial values below are overwritten from RoxieTopology attributes in
  // start_query() (for example callbackRetries, lowTimeout, readTimeout, numSlaveThreads,
  // maxBlockSize, defaultTimeLimit[]), so these initialisers are only the values seen
  // before that code runs.
  // NOTE(review): globals with constructors (StringBuffer, StringArray, CriticalSection,
  // ClientCertificate, SocketEndpoint, HardwareInfo, Owned<>) are constructed in the order
  // they are defined in this file - keep that in mind before reordering anything here.
  40. bool shuttingDown = false;
  41. unsigned numChannels; // addChannel() asserts this node's channels are <= numChannels
  42. unsigned callbackRetries = 3;
  43. unsigned callbackTimeout = 5000;
  44. unsigned lowTimeout = 10000;
  45. unsigned highTimeout = 2000;
  46. unsigned slaTimeout = 2000;
  47. unsigned numServerThreads = 30;
  48. unsigned numSlaveThreads = 30;
  49. unsigned numRequestArrayThreads = 5;
  50. unsigned headRegionSize; // topology @headRegionSize (default 50)
  51. unsigned ccdMulticastPort; // topology @multicastPort; addChannel() registers endpoints on this port
  52. bool enableHeartBeat = true;
  53. unsigned parallelLoopFlowLimit = 100;
  54. unsigned perChannelFlowLimit = 10;
  55. time_t startupTime;
  56. unsigned statsExpiryTime = 3600;
  57. unsigned miscDebugTraceLevel = 0; // separate trace settings purely for debugging specific items (i.e. all possible locations to look for files at startup)
  58. unsigned readTimeout = 300;
  59. unsigned indexReadChunkSize = 60000;
  60. unsigned maxBlockSize = 10000000;
  61. unsigned maxLockAttempts = 5;
  62. bool pretendAllOpt = false;
  63. bool traceStartStop = false;
  64. bool traceServerSideCache = false;
  65. bool defaultTimeActivities = true;
  66. bool defaultTraceEnabled = false;
  67. unsigned defaultTraceLimit = 10;
  68. unsigned watchActivityId = 0;
  69. unsigned testSlaveFailure = 0;
  70. unsigned restarts = 0; // from --restarts; non-zero makes the default ("smart") lazyOpen mode true
  71. bool fieldTranslationEnabled = false;
  72. bool mergeSlaveStatistics = true;
  73. PTreeReaderOptions defaultXmlReadFlags = ptr_ignoreWhiteSpace; // topology @defaultStripLeadingWhitespace chooses ptr_ignoreWhiteSpace or ptr_none
  74. bool runOnce = false; // set when an embedded workunit is run without a --port
  75. unsigned udpMulticastBufferSize = 262142;
  76. bool roxieMulticastEnabled = true;
  77. IPropertyTree *topology; // loaded (or synthesised from a default XML string) in start_query(); released in MODULE_EXIT
  78. MapStringTo<int> *preferredClusters; // allocated in start_query() only when PreferredCluster elements exist; NOTE(review): no delete is visible in this part of the file
  79. StringBuffer topologyFile; // --topology value, else RoxieTopology.xml under codeDirectory; written back by saveTopology()
  80. CriticalSection ccdChannelsCrit; // presumably guards ccdChannels - not used in the code visible here
  81. IPropertyTree* ccdChannels; // <Channels> tree created in start_query(); addSlaveChannel() appends RoxieSlaveProcess entries
  82. StringArray allQuerySetNames;
  83. IProperties *targetAliases;
  84. bool allFilesDynamic;
  85. bool lockSuperFiles;
  86. bool crcResources;
  87. bool useRemoteResources;
  88. bool checkFileDate;
  89. bool lazyOpen; // topology @lazyOpen; when unset or "smart" it is true only after a restart
  90. bool localSlave; // when true, addChannel() does not register channel endpoints
  91. bool ignoreOrphans;
  92. bool doIbytiDelay = true;
  93. unsigned initIbytiDelay; // In MillSec
  94. unsigned minIbytiDelay; // In MillSec
  95. bool copyResources;
  96. bool enableKeyDiff = true;
  97. bool chunkingHeap = true;
  98. bool logFullQueries;
  99. bool blindLogging = false;
  100. bool debugPermitted = true;
  101. bool checkCompleted = true;
  102. unsigned preabortKeyedJoinsThreshold = 100;
  103. unsigned preabortIndexReadsThreshold = 100;
  104. bool preloadOnceData;
  105. bool reloadRetriesFailed;
  106. bool selfTestMode = false; // set by the -selftest command-line option (_USE_CPPUNIT builds)
  107. unsigned memoryStatsInterval = 0;
  108. memsize_t defaultMemoryLimit;
  // The two arrays below are indexed [0] = low, [1] = high, [2] = SLA priority,
  // matching the @default{Low,High,SLA}Priority* topology attributes read in start_query().
  109. unsigned defaultTimeLimit[3] = {0, 0, 0};
  110. unsigned defaultWarnTimeLimit[3] = {0, 5000, 5000}; // NOTE(review): start_query() overwrites all three with topology values defaulting to 0
  111. unsigned defaultThorConnectTimeout;
  112. unsigned defaultParallelJoinPreload = 0;
  113. unsigned defaultPrefetchProjectPreload = 10;
  114. unsigned defaultConcatPreload = 0;
  115. unsigned defaultFetchPreload = 0;
  116. unsigned defaultFullKeyedJoinPreload = 0;
  117. unsigned defaultKeyedJoinPreload = 0;
  118. unsigned dafilesrvLookupTimeout = 10000;
  119. bool defaultCheckingHeap = false;
  120. unsigned defaultStrandBlockSize = 512;
  121. unsigned defaultForceNumStrands = 0;
  122. unsigned slaveQueryReleaseDelaySeconds = 60;
  123. unsigned coresPerQuery = 0;
  // NOTE(review): start_query() declares locals with these same three names inside its
  // --logfile branch, so the globals below are never assigned by the code visible here.
  124. unsigned logQueueLen;
  125. unsigned logQueueDrop;
  126. bool useLogQueue;
  127. bool fastLaneQueue;
  128. unsigned mtu_size = 1400; // upper limit on outbound buffer size - allow some header room too
  129. StringBuffer fileNameServiceDali;
  130. StringBuffer roxieName; // topology @name; when empty the statistics component name falls back to "roxie"
  131. bool trapTooManyActiveQueries;
  132. unsigned maxEmptyLoopIterations;
  133. unsigned maxGraphLoopIterations;
  134. bool probeAllRows;
  135. bool steppingEnabled = true;
  136. bool simpleLocalKeyedJoins = true;
  137. unsigned __int64 minFreeDiskSpace = 1024 * 0x100000; // default to 1 GB
  138. unsigned socketCheckInterval = 5000;
  139. StringBuffer logDirectory; // set from the component log file creator when --logfile is given
  140. StringBuffer pluginDirectory;
  141. StringBuffer queryDirectory; // the "query" entry of the Directories configuration
  142. StringBuffer codeDirectory; // current working directory at startup, with a trailing path separator
  143. StringBuffer tempDirectory;
  144. ClientCertificate clientCert;
  145. bool useHardLink;
  146. unsigned maxFileAge[2] = {0xffffffff, 60*60*1000}; // local files don't expire, remote expire in 1 hour, by default
  147. unsigned minFilesOpen[2] = {2000, 500}; // presumably indexed like maxFileAge (local, remote) - confirm
  148. unsigned maxFilesOpen[2] = {4000, 1000}; // presumably indexed like maxFileAge (local, remote) - confirm
  149. unsigned myNodeIndex = (unsigned) -1; // presumably "not yet known"; compared against node numbers in addChannel()
  150. SocketEndpoint ownEP;
  151. HardwareInfo hdwInfo;
  152. unsigned parallelAggregate;
  153. bool inMemoryKeysEnabled = true;
  154. unsigned serverSideCacheSize = 0;
  155. bool nodeCachePreload = false;
  156. unsigned nodeCacheMB = 100;
  157. unsigned leafCacheMB = 50;
  158. unsigned blobCacheMB = 0;
  159. unsigned roxiePort = 0;
  160. Owned<IPerfMonHook> perfMonHook;
  161. MODULE_INIT(INIT_PRIORITY_STANDARD)
  162. {
  163. topology = NULL;
  164. ccdChannels = NULL;
  165. return true;
  166. }
  167. MODULE_EXIT()
  168. {
  169. ::Release(topology);
  170. ::Release(ccdChannels);
  171. }
  172. //=========================================================================================
  173. //////////////////////////////////////////////////////////////////////////////////////////////
  174. extern "C" void caughtSIGPIPE(int sig)
  175. {
  176. DBGLOG("Caught sigpipe %d", sig);
  177. }
  178. extern "C" void caughtSIGHUP(int sig)
  179. {
  180. DBGLOG("Caught sighup %d", sig);
  181. }
  182. extern "C" void caughtSIGALRM(int sig)
  183. {
  184. DBGLOG("Caught sigalrm %d", sig);
  185. }
  186. extern "C" void caughtSIGTERM(int sig)
  187. {
  188. DBGLOG("Caught sigterm %d", sig);
  189. }
  190. void init_signals()
  191. {
  192. // signal(SIGTERM, caughtSIGTERM);
  193. #ifndef _WIN32
  194. signal(SIGPIPE, caughtSIGPIPE);
  195. signal(SIGHUP, caughtSIGHUP);
  196. signal(SIGALRM, caughtSIGALRM);
  197. #endif
  198. }
  199. //=========================================================================================
  200. class Waiter : public CInterface, implements IAbortHandler
  201. {
  202. Semaphore aborted;
  203. public:
  204. IMPLEMENT_IINTERFACE;
  205. bool wait(unsigned timeout)
  206. {
  207. return aborted.wait(timeout);
  208. }
  209. void wait()
  210. {
  211. aborted.wait();
  212. }
  213. bool onAbort()
  214. {
  215. aborted.signal();
  216. roxieMetrics.clear();
  217. #ifdef _DEBUG
  218. return false; // we want full leak checking info
  219. #else
  220. return true; // we don't care - just exit as fast as we can
  221. #endif
  222. }
  223. } waiter;
  224. void closedown()
  225. {
  226. Owned<IFile> sentinelFile = createSentinelTarget();
  227. removeSentinelFile(sentinelFile);
  228. waiter.onAbort();
  229. }
  230. void getAccessList(const char *aclName, const IPropertyTree *topology, IPropertyTree *aclInfo)
  231. {
  232. StringBuffer xpath;
  233. xpath.append("ACL[@name='").append(aclName).append("']");
  234. if (aclInfo->queryPropTree(xpath))
  235. throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - recursive ACL definition of %s", aclName);
  236. Owned<IPropertyTree> X = createPTree("ACL");
  237. X->setProp("@name", aclName);
  238. aclInfo->addPropTree("ACL", X.getClear());
  239. Owned<IPropertyTree> acl = topology->getPropTree(xpath.str());
  240. if (!acl)
  241. throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - acl %s not found", aclName);
  242. Owned<IPropertyTreeIterator> access = acl->getElements("Access");
  243. ForEach(*access)
  244. {
  245. IPropertyTree &child = access->query();
  246. const char *base = child.queryProp("@base");
  247. if (base)
  248. getAccessList(base, topology, aclInfo);
  249. else
  250. aclInfo->addPropTree(child.queryName(), LINK(&child));
  251. }
  252. aclInfo->removeProp(xpath);
  253. }
  254. bool ipMatch(IpAddress &ip)
  255. {
  256. return ip.isLocal();
  257. }
  258. void addSlaveChannel(unsigned channel, unsigned level)
  259. {
  260. StringBuffer xpath;
  261. xpath.appendf("RoxieSlaveProcess[@channel=\"%d\"]", channel);
  262. if (ccdChannels->hasProp(xpath.str()))
  263. throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - channel %d repeated", channel);
  264. IPropertyTree *ci = createPTree("RoxieSlaveProcess");
  265. ci->setPropInt("@channel", channel);
  266. ci->setPropInt("@subChannel", numSlaves[channel]);
  267. ccdChannels->addPropTree("RoxieSlaveProcess", ci);
  268. }
  269. void addChannel(unsigned nodeNumber, unsigned channel, unsigned level)
  270. {
  271. numSlaves[channel]++;
  272. if (nodeNumber == myNodeIndex && channel > 0)
  273. {
  274. assertex(channel <= numChannels);
  275. assertex(!replicationLevel[channel]);
  276. replicationLevel[channel] = level;
  277. addSlaveChannel(channel, level);
  278. }
  279. if (!localSlave)
  280. {
  281. addEndpoint(channel, getNodeAddress(nodeNumber), ccdMulticastPort);
  282. }
  283. }
  284. extern void doUNIMPLEMENTED(unsigned line, const char *file)
  285. {
  286. throw MakeStringException(ROXIE_UNIMPLEMENTED_ERROR, "UNIMPLEMENTED at %s:%d", file, line);
  287. }
  288. void FatalError(const char *format, ...)
  289. {
  290. va_list args;
  291. va_start(args, format);
  292. StringBuffer errMsg;
  293. errMsg.valist_appendf(format, args);
  294. va_end(args);
  295. Owned<IException> E = MakeStringException(MSGAUD_operator, ROXIE_INTERNAL_ERROR, "%s", errMsg.str());
  296. EXCLOG(E, "Fatal error");
  297. Sleep(5000);
  298. _exit(1);
  299. }
  300. // If changing these, please change roxie.cpp's roxie_server_usage() as well
  301. static void roxie_common_usage(const char * progName)
  302. {
  303. StringBuffer program;
  304. program.append(progName);
  305. getFileNameOnly(program, false);
  306. // Things that are also relevant to stand-alone executables
  307. printf("Usage: %s [options]\n", program.str());
  308. printf("\nOptions:\n");
  309. printf("\t--daliServers=[host1,...]\t: List of Dali servers to use\n");
  310. printf("\t--tracelevel=[integer]\t: Amount of information to dump on logs\n");
  311. printf("\t--stdlog=[boolean]\t: Standard log format (based on tracelevel)\n");
  312. printf("\t--logfile\t: Outputs to logfile, rather than stdout\n");
  313. printf("\t--help|-h\t: This message\n");
  314. printf("\n");
  315. }
  316. class MAbortHandler : implements IExceptionHandler
  317. {
  318. unsigned dummy; // to avoid complaints about an empty class...
  319. public:
  320. MAbortHandler() : dummy(0) {};
  321. virtual bool fireException(IException *e)
  322. {
  323. ForEachItemIn(idx, socketListeners)
  324. {
  325. socketListeners.item(idx).stopListening();
  326. }
  327. return false; // It returns to excsighandler() to abort!
  328. }
  329. } abortHandler;
  #ifdef _WIN32
  // MSVC debug-heap allocation hook (installed with _CrtSetAllocHook in _DEBUG Windows builds).
  // Handy place to put breakpoints when tracking down obscure memory leaks...
  // It logs when a 68-byte allocation with no source-file information is seen, and it
  // always returns true so the allocation proceeds.
  int myhook(int alloctype, void *, size_t nSize, int p1, long allocSeq, const unsigned char *file, int line)
  {
      const bool interesting = (nSize == 68) && (file == NULL);
      if (interesting)
          DBGLOG("memory hook matched");
      return true;
  }
  #endif
  341. void saveTopology()
  342. {
  343. // Write back changes that have been made via certain control:xxx changes, so that they survive a roxie restart
  344. // Note that they are overwritten when Roxie is manually stopped/started via hpcc-init service - these changes
  345. // are only intended to be temporary for the current session
  346. try
  347. {
  348. saveXML(topologyFile.str(), topology);
  349. }
  350. catch (IException *E)
  351. {
  352. // If we can't save the topology, then tough. Carry on without it. Changes will not survive an unexpected roxie restart
  353. EXCLOG(E, "Error saving topology file");
  354. E->Release();
  355. }
  356. }
  357. class CHpccProtocolPluginCtx : public CInterface, implements IHpccProtocolPluginContext
  358. {
  359. public:
  360. IMPLEMENT_IINTERFACE;
  361. virtual int ctxGetPropInt(const char *propName, int defaultValue) const
  362. {
  363. return topology->getPropInt(propName, defaultValue);
  364. }
  365. virtual bool ctxGetPropBool(const char *propName, bool defaultValue) const
  366. {
  367. return topology->getPropBool(propName, defaultValue);
  368. }
  369. virtual const char *ctxQueryProp(const char *propName) const
  370. {
  371. return topology->queryProp(propName);
  372. }
  373. };
  374. int STARTQUERY_API start_query(int argc, const char *argv[])
  375. {
  376. EnableSEHtoExceptionMapping();
  377. setTerminateOnSEH();
  378. init_signals();
  379. // We need to do the above BEFORE we call InitModuleObjects
  380. InitModuleObjects();
  381. getDaliServixPort();
  382. init_signals();
  383. // stand alone usage only, not server
  384. for (unsigned i=0; i<(unsigned)argc; i++)
  385. {
  386. if (stricmp(argv[i], "--help")==0 ||
  387. stricmp(argv[i], "-h")==0)
  388. {
  389. roxie_common_usage(argv[0]);
  390. return EXIT_SUCCESS;
  391. }
  392. }
  393. #ifdef _USE_CPPUNIT
  394. if (argc>=2 && stricmp(argv[1], "-selftest")==0)
  395. {
  396. selfTestMode = true;
  397. queryStderrLogMsgHandler()->setMessageFields(MSGFIELD_time | MSGFIELD_milliTime | MSGFIELD_prefix);
  398. CppUnit::TextUi::TestRunner runner;
  399. if (argc==2)
  400. {
  401. CppUnit::TestFactoryRegistry &registry = CppUnit::TestFactoryRegistry::getRegistry();
  402. runner.addTest( registry.makeTest() );
  403. }
  404. else
  405. {
  406. // MORE - maybe add a 'list' function here?
  407. for (int name = 2; name < argc; name++)
  408. {
  409. if (stricmp(argv[name], "-q")==0)
  410. {
  411. traceLevel = 0;
  412. roxiemem::memTraceLevel = 0;
  413. removeLog();
  414. }
  415. else
  416. {
  417. CppUnit::TestFactoryRegistry &registry = CppUnit::TestFactoryRegistry::getRegistry(argv[name]);
  418. runner.addTest( registry.makeTest() );
  419. }
  420. }
  421. }
  422. bool wasSucessful = runner.run( "", false );
  423. releaseAtoms();
  424. return wasSucessful;
  425. }
  426. #endif
  427. #ifdef _DEBUG
  428. #ifdef _WIN32
  429. _CrtSetAllocHook(myhook);
  430. #endif
  431. #endif
  432. #ifndef __64BIT__
  433. // Restrict stack sizes on 32-bit systems
  434. Thread::setDefaultStackSize(0x10000); // NB under windows requires linker setting (/stack:)
  435. #endif
  436. srand( (unsigned)time( NULL ) );
  437. ccdChannels = createPTree("Channels");
  438. char currentDirectory[_MAX_DIR];
  439. if (!getcwd(currentDirectory, sizeof(currentDirectory)))
  440. throw MakeStringException(ROXIE_INTERNAL_ERROR, "getcwd failed (%d)", errno);
  441. codeDirectory.set(currentDirectory);
  442. addNonEmptyPathSepChar(codeDirectory);
  443. try
  444. {
  445. Owned<IProperties> globals = createProperties(true);
  446. for (int i = 1; i < argc; i++)
  447. globals->loadProp(argv[i], true);
  448. Owned<IFile> sentinelFile = createSentinelTarget();
  449. removeSentinelFile(sentinelFile);
  450. if (globals->hasProp("--topology"))
  451. globals->getProp("--topology", topologyFile);
  452. else
  453. topologyFile.append(codeDirectory).append(PATHSEPCHAR).append("RoxieTopology.xml");
  454. if (checkFileExists(topologyFile.str()))
  455. {
  456. DBGLOG("Loading topology file %s", topologyFile.str());
  457. topology = createPTreeFromXMLFile(topologyFile.str());
  458. saveTopology();
  459. }
  460. else
  461. {
  462. if (globals->hasProp("--topology"))
  463. {
  464. // Explicitly-named topology file SHOULD exist...
  465. throw MakeStringException(ROXIE_INVALID_TOPOLOGY, "topology file %s not found", topologyFile.str());
  466. }
  467. topology=createPTreeFromXMLString(
  468. "<RoxieTopology allFilesDynamic='1' localSlave='1' resolveLocally='1'>"
  469. " <RoxieFarmProcess/>"
  470. " <RoxieServerProcess netAddress='.'/>"
  471. "</RoxieTopology>"
  472. );
  473. int port = globals->getPropInt("--port", 9876);
  474. topology->setPropInt("RoxieFarmProcess/@port", port);
  475. topology->setProp("@daliServers", globals->queryProp("--daliServers"));
  476. topology->setProp("@traceLevel", globals->queryProp("--traceLevel"));
  477. topology->setProp("@memTraceLevel", globals->queryProp("--memTraceLevel"));
  478. }
  479. if (topology->hasProp("PreferredCluster"))
  480. {
  481. preferredClusters = new MapStringTo<int>(true);
  482. Owned<IPropertyTreeIterator> clusters = topology->getElements("PreferredCluster");
  483. ForEach(*clusters)
  484. {
  485. IPropertyTree &child = clusters->query();
  486. const char *name = child.queryProp("@name");
  487. int priority = child.getPropInt("@priority", 100);
  488. if (name && *name)
  489. preferredClusters->setValue(name, priority);
  490. }
  491. }
  492. topology->getProp("@name", roxieName);
  493. if (roxieName.length())
  494. setStatisticsComponentName(SCTroxie, roxieName, true);
  495. else
  496. setStatisticsComponentName(SCTroxie, "roxie", true);
  497. Owned<const IQueryDll> standAloneDll;
  498. if (globals->hasProp("--loadWorkunit"))
  499. {
  500. StringBuffer workunitName;
  501. globals->getProp("--loadWorkunit", workunitName);
  502. standAloneDll.setown(createQueryDll(workunitName));
  503. }
  504. else
  505. {
  506. Owned<ILoadedDllEntry> dll = createExeDllEntry(argv[0]);
  507. if (checkEmbeddedWorkUnitXML(dll))
  508. {
  509. standAloneDll.setown(createExeQueryDll(argv[0]));
  510. runOnce = globals->getPropInt("--port", 0) == 0;
  511. }
  512. }
  513. traceLevel = topology->getPropInt("@traceLevel", runOnce ? 0 : 1);
  514. if (traceLevel > MAXTRACELEVEL)
  515. traceLevel = MAXTRACELEVEL;
  516. udpTraceLevel = topology->getPropInt("@udpTraceLevel", runOnce ? 0 : 1);
  517. roxiemem::memTraceLevel = topology->getPropInt("@memTraceLevel", runOnce ? 0 : 1);
  518. soapTraceLevel = topology->getPropInt("@soapTraceLevel", runOnce ? 0 : 1);
  519. miscDebugTraceLevel = topology->getPropInt("@miscDebugTraceLevel", 0);
  520. Linked<IPropertyTree> directoryTree = topology->queryPropTree("Directories");
  521. if (!directoryTree)
  522. {
  523. Owned<IPropertyTree> envFile = getHPCCEnvironment();
  524. if (envFile)
  525. directoryTree.set(envFile->queryPropTree("Software/Directories"));
  526. }
  527. if (directoryTree)
  528. {
  529. getConfigurationDirectory(directoryTree, "query", "roxie", roxieName, queryDirectory);
  530. for (unsigned replicationLevel = 0; replicationLevel < MAX_REPLICATION_LEVELS; replicationLevel++)
  531. {
  532. StringBuffer dataDir;
  533. StringBuffer dirId("data");
  534. if (replicationLevel)
  535. dirId.append(replicationLevel+1);
  536. if (getConfigurationDirectory(directoryTree, dirId, "roxie", roxieName, dataDir))
  537. setBaseDirectory(dataDir, replicationLevel, DFD_OSdefault);
  538. }
  539. }
  540. directoryTree.clear();
  541. //Logging stuff
  542. if (globals->getPropBool("--stdlog", traceLevel != 0) || topology->getPropBool("@forceStdLog", false))
  543. queryStderrLogMsgHandler()->setMessageFields(MSGFIELD_time | MSGFIELD_milliTime | MSGFIELD_thread | MSGFIELD_prefix);
  544. else
  545. removeLog();
  546. if (globals->hasProp("--logfile"))
  547. {
  548. Owned<IComponentLogFileCreator> lf = createComponentLogFileCreator(topology, "roxie");
  549. lf->setMaxDetail(TopDetail);
  550. lf->beginLogging();
  551. logDirectory.set(lf->queryLogDir());
  552. #ifdef _DEBUG
  553. unsigned useLogQueue = topology->getPropBool("@useLogQueue", false);
  554. #else
  555. unsigned useLogQueue = topology->getPropBool("@useLogQueue", true);
  556. #endif
  557. if (useLogQueue)
  558. {
  559. unsigned logQueueLen = topology->getPropInt("@logQueueLen", 512);
  560. unsigned logQueueDrop = topology->getPropInt("@logQueueDrop", 32);
  561. queryLogMsgManager()->enterQueueingMode();
  562. queryLogMsgManager()->setQueueDroppingLimit(logQueueLen, logQueueDrop);
  563. }
  564. if (globals->getPropBool("--enableSysLog",true))
  565. UseSysLogForOperatorMessages();
  566. }
  567. roxieMetrics.setown(createRoxieMetricsManager());
  568. Owned<IPropertyTreeIterator> userMetrics = topology->getElements("./UserMetric");
  569. ForEach(*userMetrics)
  570. {
  571. IPropertyTree &metric = userMetrics->query();
  572. const char *name = metric.queryProp("@name");
  573. const char *regex= metric.queryProp("@regex");
  574. if (name && regex)
  575. roxieMetrics->addUserMetric(name, regex);
  576. else
  577. throw MakeStringException(ROXIE_INTERNAL_ERROR, "Invalid UserMetric element in topology file - name or regex missing");
  578. }
  579. restarts = globals->getPropInt("--restarts", 0);
  580. const char *preferredSubnet = topology->queryProp("@preferredSubnet");
  581. if (preferredSubnet)
  582. {
  583. const char *preferredSubnetMask = topology->queryProp("@preferredSubnetMask");
  584. if (!preferredSubnetMask) preferredSubnetMask = "255.255.255.0";
  585. if (!setPreferredSubnet(preferredSubnet, preferredSubnetMask))
  586. throw MakeStringException(ROXIE_INTERNAL_ERROR, "Error setting preferred subnet %s mask %s", preferredSubnet, preferredSubnetMask);
  587. }
  588. if (restarts)
  589. {
  590. if (traceLevel)
  591. DBGLOG("Roxie restarting: restarts = %d build = %s", restarts, BUILD_TAG);
  592. setStartRuid(restarts);
  593. }
  594. else
  595. {
  596. if (traceLevel)
  597. {
  598. DBGLOG("Roxie starting, build = %s", BUILD_TAG);
  599. }
  600. }
  601. headRegionSize = topology->getPropInt("@headRegionSize", 50);
  602. ccdMulticastPort = topology->getPropInt("@multicastPort", CCD_MULTICAST_PORT);
  603. statsExpiryTime = topology->getPropInt("@statsExpiryTime", 3600);
  604. roxiemem::memTraceSizeLimit = (memsize_t) topology->getPropInt64("@memTraceSizeLimit", 0);
  605. callbackRetries = topology->getPropInt("@callbackRetries", 3);
  606. callbackTimeout = topology->getPropInt("@callbackTimeout", 5000);
  607. lowTimeout = topology->getPropInt("@lowTimeout", 10000);
  608. highTimeout = topology->getPropInt("@highTimeout", 2000);
  609. slaTimeout = topology->getPropInt("@slaTimeout", 2000);
  610. parallelLoopFlowLimit = topology->getPropInt("@parallelLoopFlowLimit", 100);
  611. perChannelFlowLimit = topology->getPropInt("@perChannelFlowLimit", 10);
  612. copyResources = topology->getPropBool("@copyResources", true);
  613. useRemoteResources = topology->getPropBool("@useRemoteResources", true);
  614. checkFileDate = topology->getPropBool("@checkFileDate", true);
  615. const char *lazyOpenMode = topology->queryProp("@lazyOpen");
  616. if (!lazyOpenMode || stricmp(lazyOpenMode, "smart")==0)
  617. lazyOpen = (restarts > 0);
  618. else
  619. lazyOpen = topology->getPropBool("@lazyOpen", false);
  620. bool useNasTranslation = topology->getPropBool("@useNASTranslation", true);
  621. if (useNasTranslation)
  622. {
  623. Owned<IPropertyTree> nas = envGetNASConfiguration(topology);
  624. envInstallNASHooks(nas);
  625. }
  626. localSlave = topology->getPropBool("@localSlave", false);
  627. doIbytiDelay = topology->getPropBool("@doIbytiDelay", true);
  628. minIbytiDelay = topology->getPropInt("@minIbytiDelay", 2);
  629. initIbytiDelay = topology->getPropInt("@initIbytiDelay", 50);
  630. allFilesDynamic = topology->getPropBool("@allFilesDynamic", false);
  631. lockSuperFiles = topology->getPropBool("@lockSuperFiles", false);
  632. crcResources = topology->getPropBool("@crcResources", false);
  633. ignoreOrphans = topology->getPropBool("@ignoreOrphans", true);
  634. chunkingHeap = topology->getPropBool("@chunkingHeap", true);
  635. readTimeout = topology->getPropInt("@readTimeout", 300);
  636. logFullQueries = topology->getPropBool("@logFullQueries", false);
  637. debugPermitted = topology->getPropBool("@debugPermitted", true);
  638. blindLogging = topology->getPropBool("@blindLogging", false);
  639. if (!blindLogging)
  640. logExcessiveSeeks = true;
  641. preloadOnceData = topology->getPropBool("@preloadOnceData", true);
  642. reloadRetriesFailed = topology->getPropBool("@reloadRetriesSuspended", true);
  643. linuxYield = topology->getPropBool("@linuxYield", false);
  644. traceSmartStepping = topology->getPropBool("@traceSmartStepping", false);
  645. useMemoryMappedIndexes = topology->getPropBool("@useMemoryMappedIndexes", false);
  646. traceJHtreeAllocations = topology->getPropBool("@traceJHtreeAllocations", false);
  647. flushJHtreeCacheOnOOM = topology->getPropBool("@flushJHtreeCacheOnOOM", true);
  648. fastLaneQueue = topology->getPropBool("@fastLaneQueue", true);
  649. udpOutQsPriority = topology->getPropInt("@udpOutQsPriority", 0);
  650. udpSnifferEnabled = topology->getPropBool("@udpSnifferEnabled", true);
  651. udpInlineCollation = topology->getPropBool("@udpInlineCollation", false);
  652. udpInlineCollationPacketLimit = topology->getPropInt("@udpInlineCollationPacketLimit", 50);
  653. udpSendCompletedInData = topology->getPropBool("@udpSendCompletedInData", false);
  654. udpRetryBusySenders = topology->getPropInt("@udpRetryBusySenders", 0);
  655. udpMaxRetryTimedoutReqs = topology->getPropInt("@udpMaxRetryTimedoutReqs", 0);
  656. udpRequestToSendTimeout = topology->getPropInt("@udpRequestToSendTimeout", 5);
  657. // MORE: think of a better way/value/check maybe/and/or based on Roxie server timeout
  658. if (udpRequestToSendTimeout == 0)
  659. udpRequestToSendTimeout = 5;
  660. // MORE: might want to check socket buffer sizes against sys max here instead of udp threads ?
  661. udpSnifferReadThreadPriority = topology->getPropInt("@udpSnifferReadThreadPriority", 3);
  662. udpSnifferSendThreadPriority = topology->getPropInt("@udpSnifferSendThreadPriority", 3);
  663. udpMulticastBufferSize = topology->getPropInt("@udpMulticastBufferSize", 262142);
  664. udpFlowSocketsSize = topology->getPropInt("@udpFlowSocketsSize", 131072);
  665. udpLocalWriteSocketSize = topology->getPropInt("@udpLocalWriteSocketSize", 1024000);
  666. roxieMulticastEnabled = topology->getPropBool("@roxieMulticastEnabled", true); // enable use of multicast for sending requests to slaves
  667. if (udpSnifferEnabled && !roxieMulticastEnabled)
  668. {
  669. DBGLOG("WARNING: ignoring udpSnifferEnabled setting as multicast not enabled");
  670. udpSnifferEnabled = false;
  671. }
  672. int ttlTmp = topology->getPropInt("@multicastTTL", 1);
  673. if (ttlTmp < 0)
  674. {
  675. multicastTTL = 1;
  676. WARNLOG("multicastTTL value (%d) invalid, must be >=0, resetting to %u", ttlTmp, multicastTTL);
  677. }
  678. else if (ttlTmp > 255)
  679. {
  680. multicastTTL = 255;
  681. WARNLOG("multicastTTL value (%d) invalid, must be <=%u, resetting to maximum", ttlTmp, multicastTTL);
  682. }
  683. else
  684. multicastTTL = ttlTmp;
  685. indexReadChunkSize = topology->getPropInt("@indexReadChunkSize", 60000);
  686. numSlaveThreads = topology->getPropInt("@slaveThreads", 30);
  687. numServerThreads = topology->getPropInt("@serverThreads", 30);
  688. numRequestArrayThreads = topology->getPropInt("@requestArrayThreads", 5);
  689. maxBlockSize = topology->getPropInt("@maxBlockSize", 10000000);
  690. maxLockAttempts = topology->getPropInt("@maxLockAttempts", 5);
  691. enableHeartBeat = topology->getPropBool("@enableHeartBeat", true);
  692. checkCompleted = topology->getPropBool("@checkCompleted", true);
  693. preabortKeyedJoinsThreshold = topology->getPropInt("@preabortKeyedJoinsThreshold", 100);
  694. preabortIndexReadsThreshold = topology->getPropInt("@preabortIndexReadsThreshold", 100);
  695. defaultMemoryLimit = (memsize_t) topology->getPropInt64("@defaultMemoryLimit", 0);
  696. defaultTimeLimit[0] = (unsigned) topology->getPropInt64("@defaultLowPriorityTimeLimit", 0);
  697. defaultTimeLimit[1] = (unsigned) topology->getPropInt64("@defaultHighPriorityTimeLimit", 0);
  698. defaultTimeLimit[2] = (unsigned) topology->getPropInt64("@defaultSLAPriorityTimeLimit", 0);
  699. defaultWarnTimeLimit[0] = (unsigned) topology->getPropInt64("@defaultLowPriorityTimeWarning", 0);
  700. defaultWarnTimeLimit[1] = (unsigned) topology->getPropInt64("@defaultHighPriorityTimeWarning", 0);
  701. defaultWarnTimeLimit[2] = (unsigned) topology->getPropInt64("@defaultSLAPriorityTimeWarning", 0);
  702. defaultThorConnectTimeout = (unsigned) topology->getPropInt64("@defaultThorConnectTimeout", 60);
  703. defaultXmlReadFlags = topology->getPropBool("@defaultStripLeadingWhitespace", true) ? ptr_ignoreWhiteSpace : ptr_none;
  704. defaultParallelJoinPreload = topology->getPropInt("@defaultParallelJoinPreload", 0);
  705. defaultConcatPreload = topology->getPropInt("@defaultConcatPreload", 0);
  706. defaultFetchPreload = topology->getPropInt("@defaultFetchPreload", 0);
  707. defaultFullKeyedJoinPreload = topology->getPropInt("@defaultFullKeyedJoinPreload", 0);
  708. defaultKeyedJoinPreload = topology->getPropInt("@defaultKeyedJoinPreload", 0);
  709. defaultPrefetchProjectPreload = topology->getPropInt("@defaultPrefetchProjectPreload", 10);
  710. defaultStrandBlockSize = topology->getPropInt("@defaultStrandBlockSize", 512);
  711. defaultForceNumStrands = topology->getPropInt("@defaultForceNumStrands", 0);
  712. defaultCheckingHeap = topology->getPropBool("@checkingHeap", false); // NOTE - not in configmgr - too dangerous!
  713. slaveQueryReleaseDelaySeconds = topology->getPropInt("@slaveQueryReleaseDelaySeconds", 60);
  714. coresPerQuery = topology->getPropInt("@coresPerQuery", 0);
  715. diskReadBufferSize = topology->getPropInt("@diskReadBufferSize", 0x10000);
  716. fieldTranslationEnabled = topology->getPropBool("@fieldTranslationEnabled", false);
  717. pretendAllOpt = topology->getPropBool("@ignoreMissingFiles", false);
  718. memoryStatsInterval = topology->getPropInt("@memoryStatsInterval", 60);
  719. roxiemem::setMemoryStatsInterval(memoryStatsInterval);
  720. pingInterval = topology->getPropInt("@pingInterval", 0);
  721. socketCheckInterval = topology->getPropInt("@socketCheckInterval", runOnce ? 0 : 5000);
  722. memsize_t totalMemoryLimit = (memsize_t) topology->getPropInt64("@totalMemoryLimit", 0);
  723. bool allowHugePages = topology->getPropBool("@heapUseHugePages", false);
  724. bool allowTransparentHugePages = topology->getPropBool("@heapUseTransparentHugePages", true);
  725. bool retainMemory = topology->getPropBool("@heapRetainMemory", false);
  726. if (!totalMemoryLimit)
  727. totalMemoryLimit = 1024 * 0x100000; // 1 Gb;
  728. roxiemem::setTotalMemoryLimit(allowHugePages, allowTransparentHugePages, retainMemory, totalMemoryLimit, 0, NULL, NULL);
  729. traceStartStop = topology->getPropBool("@traceStartStop", false);
  730. traceServerSideCache = topology->getPropBool("@traceServerSideCache", false);
  731. defaultTimeActivities = topology->getPropBool("@timeActivities", true);
  732. defaultTraceEnabled = topology->getPropBool("@traceEnabled", false);
  733. defaultTraceLimit = topology->getPropInt("@traceLimit", 10);
  734. clientCert.certificate.set(topology->queryProp("@certificateFileName"));
  735. clientCert.privateKey.set(topology->queryProp("@privateKeyFileName"));
  736. clientCert.passphrase.set(topology->queryProp("@passphrase"));
  737. useHardLink = topology->getPropBool("@useHardLink", false);
  738. maxFileAge[false] = topology->getPropInt("@localFilesExpire", (unsigned) -1);
  739. maxFileAge[true] = topology->getPropInt("@remoteFilesExpire", 60*60*1000);
  740. minFilesOpen[false] = topology->getPropInt("@minLocalFilesOpen", 2000);
  741. minFilesOpen[true] = topology->getPropInt("@minRemoteFilesOpen", 500);
  742. maxFilesOpen[false] = topology->getPropInt("@maxLocalFilesOpen", 4000);
  743. maxFilesOpen[true] = topology->getPropInt("@maxRemoteFilesOpen", 1000);
  744. dafilesrvLookupTimeout = topology->getPropInt("@dafilesrvLookupTimeout", 10000);
  745. setRemoteFileTimeouts(dafilesrvLookupTimeout, 0);
  746. topology->getProp("@daliServers", fileNameServiceDali);
  747. trapTooManyActiveQueries = topology->getPropBool("@trapTooManyActiveQueries", true);
  748. maxEmptyLoopIterations = topology->getPropInt("@maxEmptyLoopIterations", 1000);
  749. maxGraphLoopIterations = topology->getPropInt("@maxGraphLoopIterations", 1000);
  750. mergeSlaveStatistics = topology->getPropBool("@mergeSlaveStatistics", true);
  751. enableKeyDiff = topology->getPropBool("@enableKeyDiff", true);
  752. // MORE: Get parms from topology after it is populated from Hardware/computer types section in configenv
  753. // Then if does not match and based on desired action in topolgy, either warn, or fatal exit or .... etc
  754. // Also get prim path and sec from topology
  755. #ifdef _WIN32
  756. getHardwareInfo(hdwInfo, "C:", "D:");
  757. #else // linux
  758. getHardwareInfo(hdwInfo, "/c$", "/d$");
  759. #endif
  760. if (traceLevel)
  761. {
  762. DBGLOG("Current Hardware Info: CPUs=%i, speed=%i MHz, Mem=%i MB , primDisk=%i GB, primFree=%i GB, secDisk=%i GB, secFree=%i GB, NIC=%i",
  763. hdwInfo.numCPUs, hdwInfo.CPUSpeed, hdwInfo.totalMemory,
  764. hdwInfo.primDiskSize, hdwInfo.primFreeSize, hdwInfo.secDiskSize, hdwInfo.secFreeSize, hdwInfo.NICSpeed);
  765. }
  766. parallelAggregate = topology->getPropInt("@parallelAggregate", 0);
  767. if (!parallelAggregate)
  768. parallelAggregate = hdwInfo.numCPUs;
  769. if (!parallelAggregate)
  770. parallelAggregate = 1;
  771. simpleLocalKeyedJoins = topology->getPropBool("@simpleLocalKeyedJoins", true);
  772. inMemoryKeysEnabled = topology->getPropBool("@inMemoryKeysEnabled", true);
  773. serverSideCacheSize = topology->getPropInt("@serverSideCacheSize", 0);
  774. setKeyIndexCacheSize((unsigned)-1); // unbound
  775. nodeCachePreload = topology->getPropBool("@nodeCachePreload", false);
  776. setNodeCachePreload(nodeCachePreload);
  777. nodeCacheMB = topology->getPropInt("@nodeCacheMem", 100);
  778. setNodeCacheMem(nodeCacheMB * 0x100000);
  779. leafCacheMB = topology->getPropInt("@leafCacheMem", 50);
  780. setLeafCacheMem(leafCacheMB * 0x100000);
  781. blobCacheMB = topology->getPropInt("@blobCacheMem", 0);
  782. setBlobCacheMem(blobCacheMB * 0x100000);
  783. unsigned __int64 affinity = topology->getPropInt64("@affinity", 0);
  784. updateAffinity(affinity);
  785. minFreeDiskSpace = topology->getPropInt64("@minFreeDiskSpace", (1024 * 0x100000)); // default to 1 GB
  786. if (topology->getPropBool("@jumboFrames", false))
  787. {
  788. mtu_size = 9000; // upper limit on outbound buffer size - allow some header room too
  789. roxiemem::setDataAlignmentSize(0x2000);
  790. }
  791. else
  792. {
  793. mtu_size = 1400; // upper limit on outbound buffer size - allow some header room too
  794. roxiemem::setDataAlignmentSize(0x400);
  795. }
  796. unsigned pinterval = topology->getPropInt("@systemMonitorInterval",1000*60);
  797. perfMonHook.setown(roxiemem::createRoxieMemStatsPerfMonHook()); // Note - we create even if pinterval is 0, as can be enabled via control message
  798. if (pinterval)
  799. startPerformanceMonitor(pinterval, PerfMonStandard, perfMonHook);
  800. topology->getProp("@pluginDirectory", pluginDirectory);
  801. if (pluginDirectory.length() == 0)
  802. pluginDirectory.append(codeDirectory).append("plugins");
  803. if (queryDirectory.length() == 0)
  804. {
  805. topology->getProp("@queryDir", queryDirectory);
  806. if (queryDirectory.length() == 0)
  807. queryDirectory.append(codeDirectory).append("queries");
  808. }
  809. addNonEmptyPathSepChar(queryDirectory);
  810. queryFileCache().start();
  811. getTempFilePath(tempDirectory, "roxie", topology);
  812. #ifdef _WIN32
  813. topology->addPropBool("@linuxOS", false);
  814. #else
  815. topology->addPropBool("@linuxOS", true);
  816. #endif
  817. allQuerySetNames.appendListUniq(topology->queryProp("@querySets"), ",");
  818. targetAliases = createProperties();
  819. StringArray tempList;
  820. tempList.appendListUniq(topology->queryProp("@targetAliases"), ",");
  821. ForEachItemIn(i, tempList)
  822. {
  823. const char *alias = tempList.item(i);
  824. const char *eq = strchr(alias, '=');
  825. if (eq)
  826. {
  827. StringAttr name(alias, eq-alias);
  828. if (!allQuerySetNames.contains(name))
  829. targetAliases->setProp(name.str(), ++eq);
  830. }
  831. }
  832. Owned<IPropertyTreeIterator> roxieServers = topology->getElements("./RoxieServerProcess");
  833. ForEach(*roxieServers)
  834. {
  835. IPropertyTree &roxieServer = roxieServers->query();
  836. const char *iptext = roxieServer.queryProp("@netAddress");
  837. unsigned nodeIndex = addRoxieNode(iptext);
  838. if (getNodeAddress(nodeIndex).isLocal())
  839. myNodeIndex = nodeIndex;
  840. if (traceLevel > 3)
  841. DBGLOG("Roxie server %u is at %s", nodeIndex, iptext);
  842. }
  843. if (myNodeIndex == -1)
  844. throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - current node is not in server list");
  845. // Set multicast base addresses - must be done before generating slave channels
  846. if (roxieMulticastEnabled && !localSlave)
  847. {
  848. if (topology->queryProp("@multicastBase"))
  849. multicastBase.ipset(topology->queryProp("@multicastBase"));
  850. else
  851. throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - multicastBase not set");
  852. if (topology->queryProp("@multicastLast"))
  853. multicastLast.ipset(topology->queryProp("@multicastLast"));
  854. else
  855. throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - multicastLast not set");
  856. }
  857. // Generate the slave channels
  858. unsigned numDataCopies = topology->getPropInt("@numDataCopies", 1);
  859. if (!numDataCopies)
  860. throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - numDataCopies should be > 0");
  861. unsigned channelsPerNode = topology->getPropInt("@channelsPerNode", 1);
  862. unsigned numNodes = getNumNodes();
  863. const char *slaveConfig = topology->queryProp("@slaveConfig");
  864. if (!slaveConfig)
  865. slaveConfig = "simple";
  866. if (strnicmp(slaveConfig, "cyclic", 6) == 0)
  867. {
  868. numChannels = numNodes;
  869. unsigned cyclicOffset = topology->getPropInt("@cyclicOffset", 1);
  870. for (unsigned i=0; i<numNodes; i++)
  871. {
  872. // Note this code is a little confusing - easy to get the cyclic offset backwards
  873. // cyclic offset means node n+offset has copy 2 for channel n, so node n has copy 2 for channel n-offset
  874. int channel = (int)i+1;
  875. for (unsigned copy=0; copy<numDataCopies; copy++)
  876. {
  877. if (channel < 1)
  878. channel = channel + numNodes;
  879. addChannel(i, channel, copy);
  880. channel -= cyclicOffset;
  881. }
  882. }
  883. }
  884. else if (strnicmp(slaveConfig, "overloaded", 10) == 0)
  885. {
  886. if (!channelsPerNode)
  887. throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - channelsPerNode should be > 0");
  888. numChannels = numNodes * channelsPerNode;
  889. for (unsigned i=0; i<numNodes; i++)
  890. {
  891. int channel = (int)(i+1);
  892. for (unsigned copy=0; copy<channelsPerNode; copy++)
  893. {
  894. addChannel(i, channel, copy);
  895. channel += numNodes;
  896. }
  897. }
  898. }
  899. else // 'Full redundancy' or 'simple' mode
  900. {
  901. if (numNodes % numDataCopies)
  902. throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - numChannels not an integer");
  903. numChannels = numNodes / numDataCopies;
  904. int channel = 1;
  905. for (unsigned i=0; i<numNodes; i++)
  906. {
  907. addChannel(i, channel, 0);
  908. channel++;
  909. if ((unsigned)channel > numChannels)
  910. channel = 1;
  911. }
  912. }
  913. if (!numChannels)
  914. throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - numChannels calculated at 0");
  915. if (numChannels > 1 && localSlave)
  916. throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - localSlave requires single channel (%d channels specified)", numChannels);
  917. // Now we know all the channels, we can open and subscribe the multicast channels
  918. if (!localSlave)
  919. openMulticastSocket();
  920. setDaliServixSocketCaching(true); // enable daliservix caching
  921. loadPlugins();
  922. createDelayedReleaser();
  923. globalPackageSetManager = createRoxiePackageSetManager(standAloneDll.getClear());
  924. globalPackageSetManager->load();
  925. unsigned snifferChannel = numChannels+2; // MORE - why +2 not +1 ??
  926. ROQ = createOutputQueueManager(snifferChannel, numSlaveThreads);
  927. ROQ->setHeadRegionSize(headRegionSize);
  928. ROQ->start();
  929. Owned<IPacketDiscarder> packetDiscarder = createPacketDiscarder();
  930. #if defined(WIN32) && defined(_DEBUG) && defined(_DEBUG_HEAP_FULL)
  931. int tmpFlag = _CrtSetDbgFlag( _CRTDBG_REPORT_FLAG );
  932. tmpFlag |= _CRTDBG_CHECK_ALWAYS_DF;
  933. _CrtSetDbgFlag( tmpFlag );
  934. #endif
  935. EnableSEHtoExceptionMapping();
  936. setSEHtoExceptionHandler(&abortHandler);
  937. Owned<IHpccProtocolPluginContext> protocolCtx = new CHpccProtocolPluginCtx();
  938. if (runOnce)
  939. {
  940. Owned<IHpccProtocolPlugin> protocolPlugin = loadHpccProtocolPlugin(protocolCtx, NULL);
  941. Owned<IHpccProtocolListener> roxieServer = protocolPlugin->createListener("runOnce", createRoxieProtocolMsgSink(getNodeAddress(myNodeIndex), 0, 1, false), 0, 0, NULL);
  942. try
  943. {
  944. const char *format = globals->queryProp("format");
  945. if (!format)
  946. {
  947. if (globals->hasProp("-xml"))
  948. format = "xml";
  949. else if (globals->hasProp("-csv"))
  950. format = "csv";
  951. else if (globals->hasProp("-raw"))
  952. format = "raw";
  953. else
  954. format = "ascii";
  955. }
  956. StringBuffer query;
  957. query.appendf("<roxie format='%s'/>", format);
  958. roxieServer->runOnce(query.str()); // MORE - should use the wu listener instead I suspect
  959. fflush(stdout); // in windows if output is redirected results don't appear without flushing
  960. }
  961. catch (IException *E)
  962. {
  963. EXCLOG(E);
  964. E->Release();
  965. }
  966. }
  967. else
  968. {
  969. Owned<IPropertyTreeIterator> roxieFarms = topology->getElements("./RoxieFarmProcess");
  970. ForEach(*roxieFarms)
  971. {
  972. IPropertyTree &roxieFarm = roxieFarms->query();
  973. unsigned listenQueue = roxieFarm.getPropInt("@listenQueue", DEFAULT_LISTEN_QUEUE_SIZE);
  974. unsigned numThreads = roxieFarm.getPropInt("@numThreads", numServerThreads);
  975. unsigned port = roxieFarm.getPropInt("@port", ROXIE_SERVER_PORT);
  976. unsigned requestArrayThreads = roxieFarm.getPropInt("@requestArrayThreads", 5);
  977. const IpAddress &ip = getNodeAddress(myNodeIndex);
  978. if (!roxiePort)
  979. {
  980. roxiePort = port;
  981. ownEP.set(roxiePort, ip);
  982. }
  983. bool suspended = roxieFarm.getPropBool("@suspended", false);
  984. Owned <IHpccProtocolListener> roxieServer;
  985. if (port)
  986. {
  987. const char *protocol = roxieFarm.queryProp("@protocol");
  988. const char *soname = roxieFarm.queryProp("@so");
  989. const char *config = roxieFarm.queryProp("@config");
  990. Owned<IHpccProtocolPlugin> protocolPlugin = ensureProtocolPlugin(*protocolCtx, soname);
  991. roxieServer.setown(protocolPlugin->createListener(protocol ? protocol : "native", createRoxieProtocolMsgSink(ip, port, numThreads, suspended), port, listenQueue, config));
  992. }
  993. else
  994. roxieServer.setown(createRoxieWorkUnitListener(numThreads, suspended));
  995. IHpccProtocolMsgSink *sink = roxieServer->queryMsgSink();
  996. const char *aclName = roxieFarm.queryProp("@aclName");
  997. if (aclName && *aclName)
  998. {
  999. Owned<IPropertyTree> aclInfo = createPTree("AccessInfo");
  1000. getAccessList(aclName, topology, aclInfo);
  1001. Owned<IPropertyTreeIterator> accesses = aclInfo->getElements("Access");
  1002. ForEach(*accesses)
  1003. {
  1004. IPropertyTree &access = accesses->query();
  1005. try
  1006. {
  1007. sink->addAccess(access.getPropBool("@allow", true), access.getPropBool("@allowBlind", true), access.queryProp("@ip"), access.queryProp("@mask"), access.queryProp("@query"), access.queryProp("@error"), access.getPropInt("@errorCode"));
  1008. }
  1009. catch (IException *E)
  1010. {
  1011. StringBuffer s, x;
  1012. E->errorMessage(s);
  1013. E->Release();
  1014. toXML(&access, x, 0, 0);
  1015. throw MakeStringException(ROXIE_ACL_ERROR, "Error in access statement %s: %s", x.str(), s.str());
  1016. }
  1017. }
  1018. }
  1019. socketListeners.append(*roxieServer.getLink());
  1020. time(&startupTime);
  1021. roxieServer->start();
  1022. }
  1023. writeSentinelFile(sentinelFile);
  1024. DBGLOG("Waiting for queries");
  1025. if (pingInterval)
  1026. startPingTimer();
  1027. LocalIAbortHandler abortHandler(waiter);
  1028. waiter.wait();
  1029. }
  1030. shuttingDown = true;
  1031. if (pingInterval)
  1032. stopPingTimer();
  1033. setSEHtoExceptionHandler(NULL);
  1034. while (socketListeners.isItem(0))
  1035. {
  1036. socketListeners.item(0).stop(1000);
  1037. socketListeners.remove(0);
  1038. }
  1039. packetDiscarder->stop();
  1040. packetDiscarder.clear();
  1041. ROQ->stop();
  1042. ROQ->join();
  1043. ROQ->Release();
  1044. ROQ = NULL;
  1045. }
  1046. catch (IException *E)
  1047. {
  1048. StringBuffer x;
  1049. DBGLOG("EXCEPTION: (%d): %s", E->errorCode(), E->errorMessage(x).str());
  1050. E->Release();
  1051. }
  1052. roxieMetrics.clear();
  1053. stopPerformanceMonitor();
  1054. ::Release(globalPackageSetManager);
  1055. globalPackageSetManager = NULL;
  1056. stopDelayedReleaser();
  1057. cleanupPlugins();
  1058. closeMulticastSockets();
  1059. releaseSlaveDynamicFileCache();
  1060. releaseRoxieStateCache();
  1061. setDaliServixSocketCaching(false); // make sure it cleans up or you get bogus memleak reports
  1062. setNodeCaching(false); // ditto
  1063. perfMonHook.clear();
  1064. strdup("Make sure leak checking is working");
  1065. UseSysLogForOperatorMessages(false);
  1066. ExitModuleObjects();
  1067. releaseAtoms();
  1068. strdup("Make sure leak checking is working");
  1069. #ifdef _WIN32
  1070. #ifdef _DEBUG
  1071. #if 1
  1072. StringBuffer leakFileDir(logDirectory.str());
  1073. leakFileDir.append("roxieleaks.log");
  1074. HANDLE h = CreateFile(leakFileDir.str(), GENERIC_READ|GENERIC_WRITE, 0, NULL, CREATE_ALWAYS, 0, 0);
  1075. _CrtSetReportMode( _CRT_WARN, _CRTDBG_MODE_FILE|_CRTDBG_MODE_DEBUG);
  1076. _CrtSetReportFile( _CRT_WARN, h);
  1077. _CrtSetReportMode( _CRT_ERROR, _CRTDBG_MODE_FILE|_CRTDBG_MODE_DEBUG);
  1078. _CrtSetReportFile( _CRT_ERROR, h);
  1079. _CrtSetReportMode( _CRT_ASSERT, _CRTDBG_MODE_FILE|_CRTDBG_MODE_DEBUG);
  1080. _CrtSetReportFile( _CRT_ASSERT, h);
  1081. // _CrtDumpMemoryLeaks(); if you uncomment these lines you get to see the leaks sooner (so can look in debugger at full memory)
  1082. // CloseHandle(h); but there will be additional leaks reported that are not really leaks
  1083. #endif
  1084. #endif
  1085. #endif
  1086. return 0;
  1087. }