eclccserver.cpp 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jlib.hpp"
  14. #include "jmisc.hpp"
  15. #include "jisem.hpp"
  16. #include "jfile.hpp"
  17. #include "jencrypt.hpp"
  18. #include "jregexp.hpp"
  19. #include "mpbase.hpp"
  20. #include "daclient.hpp"
  21. #include "dasess.hpp"
  22. #include "danqs.hpp"
  23. #include "dalienv.hpp"
  24. #include "workunit.hpp"
  25. #include "wujobq.hpp"
  26. #include "dllserver.hpp"
  27. #include "thorplugin.hpp"
  28. static StringAttr dllPath;
  29. Owned<IPropertyTree> globals;
  30. //------------------------------------------------------------------------------------------------------------------
  31. // We use a separate thread for reading eclcc's stderr output. This prevents the thread that is
  32. // writing to its stdin from being blocked because eclcc is trying to write to stderr...
  33. //------------------------------------------------------------------------------------------------------------------
  34. interface IErrorReporter
  35. {
  36. virtual void reportError(IException *e) = 0;
  37. virtual void reportError(const char *errStr, unsigned retcode) = 0;
  38. };
  39. class ErrorReader : public Thread
  40. {
  41. public:
  42. ErrorReader(IPipeProcess *_pipe, IErrorReporter *_errorReporter)
  43. : Thread("EclccCompileThread::ErrorReader"), pipe(_pipe), errorReporter(_errorReporter), errors(0)
  44. {
  45. }
  46. virtual int run()
  47. {
  48. MemoryAttr buf;
  49. const size32_t incSize = 512;
  50. size32_t bufferSize = 0;
  51. char * buffer = NULL;
  52. size_t remaining = 0;
  53. bool eof = false;
  54. while (!eof)
  55. {
  56. if (remaining == bufferSize)
  57. {
  58. bufferSize += incSize;
  59. buffer = (char *) buf.reallocate(bufferSize);
  60. }
  61. size32_t read = pipe->readError(bufferSize-remaining, buffer+remaining);
  62. if ((read == 0) || (read == (size32_t)-1))
  63. eof = true;
  64. else
  65. remaining += read;
  66. char *finger = buffer;
  67. while (remaining)
  68. {
  69. char *eolpos = (char *) memchr(finger, '\n', remaining);
  70. if (eolpos)
  71. {
  72. *eolpos = '\0';
  73. if (eolpos > finger && eolpos[-1]=='\r')
  74. eolpos[-1] = '\0';
  75. if (errorReporter)
  76. errorReporter->reportError(finger, 0);
  77. else
  78. DBGLOG("%s", finger);
  79. errors++;
  80. remaining -= (eolpos-finger) + 1;
  81. finger = eolpos + 1;
  82. }
  83. else if (eof)
  84. {
  85. StringBuffer e(remaining, finger);
  86. if (errorReporter)
  87. errorReporter->reportError(e, 0);
  88. else
  89. DBGLOG("%s", e.str());
  90. errors++;
  91. break;
  92. }
  93. else
  94. break;
  95. }
  96. if (!eof && (finger != buffer))
  97. memmove(buffer, finger, remaining);
  98. }
  99. return 0;
  100. }
  101. unsigned errCount() const
  102. {
  103. return errors;
  104. }
  105. private:
  106. IPipeProcess *pipe;
  107. IErrorReporter *errorReporter;
  108. unsigned errors;
  109. };
  110. //------------------------------------------------------------------------------------------------------------------
  111. // Check for aborts of the workunit as it is compiling
  112. //------------------------------------------------------------------------------------------------------------------
  113. class AbortWaiter : public Thread
  114. {
  115. public:
  116. AbortWaiter(IPipeProcess *_pipe, IConstWorkUnit *_wu)
  117. : Thread("EclccCompileThread::AbortWaiter"), pipe(_pipe), wu(_wu)
  118. {
  119. }
  120. virtual int run()
  121. {
  122. wu->subscribe(SubscribeOptionAbort);
  123. try
  124. {
  125. for (;;)
  126. {
  127. if (sem.wait(2000))
  128. break;
  129. if (wu->aborting())
  130. {
  131. pipe->abort();
  132. break;
  133. }
  134. }
  135. }
  136. catch (IException *E)
  137. {
  138. ::Release(E);
  139. }
  140. return 0;
  141. }
  142. void stop()
  143. {
  144. sem.interrupt(NULL);
  145. join();
  146. }
  147. private:
  148. IPipeProcess *pipe;
  149. IConstWorkUnit *wu;
  150. InterruptableSemaphore sem;
  151. };
  152. //------------------------------------------------------------------------------------------------------------------
  153. // Class EclccCompileThread does the work of compiling workunits (using eclcc), and optionally then enqueueing them for execution by agentexec.
  154. // A threadpool is used to allow multiple compiles to be submitted at once. Threads are reused when compilation completes.
  155. //------------------------------------------------------------------------------------------------------------------
  156. class EclccCompileThread : implements IPooledThread, implements IErrorReporter, public CInterface
  157. {
  158. StringAttr wuid;
  159. Owned<IWorkUnit> workunit;
  160. StringBuffer idxStr;
  161. StringArray filesSeen;
  162. virtual void reportError(IException *e)
  163. {
  164. StringBuffer s;
  165. reportError(e->errorMessage(s).str(), 2);
  166. }
  167. virtual void reportError(const char *errStr, unsigned retcode)
  168. {
  169. // A typical error looks like this: stdin:(385,29): warning C1041: Record doesn't have an explicit maximum record size
  170. // we will also see (and want to skip) nn error(s), nn warning(s)
  171. // Errors reported in generated c++ files are a bit special - we want to add the generated c++ file (if it still exists) to the workunit
  172. RegExpr errCount, errParse, timings;
  173. timings.init("^<stat");
  174. errParse.init("^<exception");
  175. if (timings.find(errStr))
  176. {
  177. OwnedPTree timing = createPTreeFromXMLString(errStr, ipt_fast);
  178. if (timing)
  179. {
  180. unsigned __int64 nval = timing->getPropInt64("@value");
  181. unsigned __int64 nmax = timing->getPropInt64("@max");
  182. unsigned __int64 cnt = timing->getPropInt64("@count");
  183. const char * scope = timing->queryProp("@scope");
  184. StatisticScopeType scopeType = (StatisticScopeType)timing->getPropInt("@scopeType");
  185. StatisticKind kind = queryStatisticKind(timing->queryProp("@kind"), StKindNone);
  186. workunit->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), scopeType, scope, kind, NULL, nval, cnt, nmax, StatsMergeReplace);
  187. }
  188. else
  189. DBGLOG("Unrecognised timing: %s", errStr);
  190. }
  191. else if (errParse.find(errStr))
  192. {
  193. OwnedPTree exception = createPTreeFromXMLString(errStr, ipt_fast);
  194. if (exception)
  195. {
  196. Owned<IError> error = createError(exception);
  197. addWorkunitException(workunit, error, false);
  198. const char *filename = exception->queryProp("@filename");
  199. StringArray filesSeen;
  200. if (filename && filesSeen.appendUniq(filename) && endsWithIgnoreCase(filename, ".cpp") && checkFileExists(filename))
  201. {
  202. Owned<IWUQuery> query = workunit->updateQuery();
  203. associateLocalFile(query, FileTypeCpp, filename, pathTail(filename), 0, 0, 0);
  204. }
  205. }
  206. else
  207. DBGLOG("Unrecognised error: %s", errStr);
  208. }
  209. }
  210. void processOption(const char *option, const char *value, StringBuffer &eclccCmd, StringBuffer &eclccProgName, IPipeProcess &pipe, bool isLocal)
  211. {
  212. if (memicmp(option, "eclcc-", 6) == 0 || *option=='-')
  213. {
  214. //Allow eclcc-xx-<n> so that multiple values can be passed through for the same named debug symbol
  215. const char * start = option + (*option=='-' ? 1 : 6);
  216. const char * finger = (*start=='-') ? start+1 : start; //support leading double dash
  217. const char * dash = strrchr(finger, '-'); // position of trailing dash, if present
  218. StringAttr optName;
  219. if (dash && (dash != start))
  220. optName.set(start, dash-start);
  221. else
  222. optName.set(start);
  223. if (!optName)
  224. return;
  225. if (stricmp(optName, "hook") == 0)
  226. {
  227. if (isLocal)
  228. throw MakeStringException(0, "eclcc-hook option can not be set per-workunit"); // for security reasons
  229. eclccProgName.set(value);
  230. }
  231. else if (stricmp(optName, "compileOption") == 0)
  232. eclccCmd.appendf(" -Wc,%s", value);
  233. else if (stricmp(optName, "linkOption") == 0)
  234. eclccCmd.appendf(" -Wl,%s", value);
  235. else if (stricmp(optName, "includeLibraryPath") == 0)
  236. eclccCmd.appendf(" -I%s", value);
  237. else if (stricmp(optName, "libraryPath") == 0)
  238. eclccCmd.appendf(" -L%s", value);
  239. else if (strnicmp(optName, "-allow", 6)==0)
  240. {
  241. if (isLocal)
  242. throw MakeStringException(0, "eclcc-allow option can not be set per-workunit"); // for security reasons
  243. eclccCmd.appendf(" -%s=%s", optName.get(), value);
  244. }
  245. else if (*optName == 'd')
  246. {
  247. //Short term work around for the problem that all debug names get lower-cased
  248. eclccCmd.appendf(" -D%s=%s", optName.get()+1, value);
  249. }
  250. else
  251. eclccCmd.appendf(" -%s=%s", optName.get(), value);
  252. }
  253. else if (strchr(option, '-'))
  254. {
  255. StringBuffer envVar;
  256. if (isLocal)
  257. envVar.append("WU_");
  258. envVar.append(option);
  259. envVar.toUpperCase();
  260. envVar.replace('-','_');
  261. pipe.setenv(envVar, value);
  262. }
  263. else
  264. eclccCmd.appendf(" -f%s=%s", option, value);
  265. }
  266. bool compile(const char *wuid, const char *target, const char *targetCluster)
  267. {
  268. Owned<IConstWUQuery> query = workunit->getQuery();
  269. if (!query)
  270. {
  271. reportError("Workunit does not contain a query", 2);
  272. return false;
  273. }
  274. addTimeStamp(workunit, SSTglobal, NULL, StWhenCompiled);
  275. SCMStringBuffer mainDefinition;
  276. SCMStringBuffer eclQuery;
  277. query->getQueryText(eclQuery);
  278. query->getQueryMainDefinition(mainDefinition);
  279. StringBuffer eclccProgName;
  280. splitDirTail(queryCurrentProcessPath(), eclccProgName);
  281. eclccProgName.append("eclcc");
  282. StringBuffer eclccCmd(" -shared");
  283. if (eclQuery.length())
  284. eclccCmd.append(" -");
  285. if (mainDefinition.length())
  286. eclccCmd.append(" -main ").append(mainDefinition);
  287. eclccCmd.append(" --timings --xml");
  288. eclccCmd.append(" --nostdinc");
  289. if (globals->getPropBool("@enableEclccDali", true))
  290. {
  291. const char *daliServers = globals->queryProp("@daliServers");
  292. if (!daliServers)
  293. daliServers = ".";
  294. eclccCmd.appendf(" -dfs=%s", daliServers);
  295. const char *wuScope = workunit->queryWuScope();
  296. if (!isEmptyString(wuScope))
  297. eclccCmd.appendf(" -scope=%s", wuScope);
  298. eclccCmd.appendf(" -cluster=%s", targetCluster);
  299. SCMStringBuffer token;
  300. workunit->getSecurityToken(token);
  301. if (token.length())
  302. eclccCmd.appendf(" -wuid=%s -token=%s", workunit->queryWuid(), token.str());
  303. }
  304. Owned<IPipeProcess> pipe = createPipeProcess();
  305. pipe->setenv("ECLCCSERVER_THREAD_INDEX", idxStr.str());
  306. Owned<IPropertyTreeIterator> options = globals->getElements("./Option");
  307. ForEach(*options)
  308. {
  309. IPropertyTree &option = options->query();
  310. const char *name = option.queryProp("@name");
  311. const char *value = option.queryProp("@value");
  312. const char *cluster = option.queryProp("@cluster"); // if cluster is set it's specific to a particular target
  313. if (name && (cluster==NULL || cluster[0]==0 || strcmp(cluster, targetCluster)==0))
  314. processOption(name, value, eclccCmd, eclccProgName, *pipe, false);
  315. }
  316. eclccCmd.appendf(" -o%s", wuid);
  317. eclccCmd.appendf(" -platform=%s", target);
  318. eclccCmd.appendf(" --component=%s", queryStatisticsComponentName());
  319. Owned<IStringIterator> debugValues = &workunit->getDebugValues();
  320. ForEach (*debugValues)
  321. {
  322. SCMStringBuffer debugStr, valueStr;
  323. debugValues->str(debugStr);
  324. workunit->getDebugValue(debugStr.str(), valueStr);
  325. processOption(debugStr.str(), valueStr.str(), eclccCmd, eclccProgName, *pipe, true);
  326. }
  327. if (workunit->getResultLimit())
  328. {
  329. eclccCmd.appendf(" -fapplyInstantEclTransformations=1 -fapplyInstantEclTransformationsLimit=%u", workunit->getResultLimit());
  330. }
  331. try
  332. {
  333. cycle_t startCycles = get_cycles_now();
  334. Owned<ErrorReader> errorReader = new ErrorReader(pipe, this);
  335. Owned<AbortWaiter> abortWaiter = new AbortWaiter(pipe, workunit);
  336. eclccCmd.insert(0, eclccProgName);
  337. if (!pipe->run(eclccProgName, eclccCmd, ".", true, false, true, 0, true))
  338. throw makeStringExceptionV(999, "Failed to run eclcc command %s", eclccCmd.str());
  339. errorReader->start();
  340. abortWaiter->start();
  341. try
  342. {
  343. pipe->write(eclQuery.s.length(), eclQuery.s.str());
  344. pipe->closeInput();
  345. }
  346. catch (IException *e)
  347. {
  348. reportError(e);
  349. e->Release();
  350. }
  351. unsigned retcode = pipe->wait();
  352. errorReader->join();
  353. abortWaiter->stop();
  354. if (retcode == 0)
  355. {
  356. StringBuffer realdllname, dllurl;
  357. realdllname.append(SharedObjectPrefix).append(wuid).append(SharedObjectExtension);
  358. StringBuffer realdllfilename(dllPath);
  359. realdllfilename.append(SharedObjectPrefix).append(wuid).append(SharedObjectExtension);
  360. StringBuffer wuXML;
  361. if (!getWorkunitXMLFromFile(realdllfilename, wuXML))
  362. throw makeStringException(999, "Failed to extract workunit from query dll");
  363. Owned<ILocalWorkUnit> embeddedWU = createLocalWorkUnit(wuXML);
  364. queryExtendedWU(workunit)->copyWorkUnit(embeddedWU, false, true);
  365. workunit->setIsClone(false);
  366. const char *jobname = embeddedWU->queryJobName();
  367. if (jobname && *jobname) //let ECL win naming job during initial compile
  368. workunit->setJobName(jobname);
  369. if (!workunit->getDebugValueBool("obfuscateOutput", false))
  370. {
  371. Owned<IWUQuery> query = workunit->updateQuery();
  372. query->setQueryText(eclQuery.s.str());
  373. }
  374. createUNCFilename(realdllfilename.str(), dllurl);
  375. unsigned crc = crc_file(realdllfilename.str());
  376. Owned<IWUQuery> query = workunit->updateQuery();
  377. associateLocalFile(query, FileTypeDll, realdllfilename, "Workunit DLL", crc);
  378. queryDllServer().registerDll(realdllname.str(), "Workunit DLL", dllurl.str());
  379. cycle_t elapsedCycles = get_cycles_now() - startCycles;
  380. updateWorkunitTimeStat(workunit, SSTcompilestage, "compile", StTimeElapsed, NULL, cycle_to_nanosec(elapsedCycles));
  381. workunit->commit();
  382. return true;
  383. }
  384. }
  385. catch (IException * e)
  386. {
  387. reportError(e);
  388. e->Release();
  389. }
  390. return false;
  391. }
  392. void failCompilation(const char *error)
  393. {
  394. reportError(error, 2);
  395. workunit->setState(WUStateFailed);
  396. workunit->commit();
  397. workunit.clear();
  398. }
  399. public:
  400. IMPLEMENT_IINTERFACE;
  401. EclccCompileThread(unsigned _idx)
  402. {
  403. idxStr.append(_idx);
  404. }
  405. virtual void init(void *param) override
  406. {
  407. wuid.set((const char *) param);
  408. }
  409. virtual void threadmain() override
  410. {
  411. DBGLOG("Compile request processing for workunit %s", wuid.get());
  412. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  413. workunit.setown(factory->updateWorkUnit(wuid.get()));
  414. if (!workunit)
  415. {
  416. DBGLOG("Workunit %s no longer exists", wuid.get());
  417. return;
  418. }
  419. if (workunit->aborting() || workunit->getState()==WUStateAborted)
  420. {
  421. workunit->setState(WUStateAborted);
  422. DBGLOG("Workunit %s aborted", wuid.get());
  423. workunit->commit();
  424. workunit.clear();
  425. return;
  426. }
  427. CSDSServerStatus serverstatus("ECLCCserver");
  428. serverstatus.queryProperties()->setProp("WorkUnit",wuid.get());
  429. serverstatus.commitProperties();
  430. workunit->setAgentSession(myProcessSession());
  431. StringAttr clusterName(workunit->queryClusterName());
  432. Owned<IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(clusterName.str());
  433. if (!clusterInfo)
  434. {
  435. VStringBuffer errStr("Cluster %s not recognized", clusterName.str());
  436. failCompilation(errStr);
  437. return;
  438. }
  439. ClusterType platform = clusterInfo->getPlatform();
  440. clusterInfo.clear();
  441. workunit->setState(WUStateCompiling);
  442. workunit->commit();
  443. bool ok = false;
  444. try
  445. {
  446. ok = compile(wuid, clusterTypeString(platform, true), clusterName.str());
  447. }
  448. catch (IException * e)
  449. {
  450. StringBuffer msg;
  451. e->errorMessage(msg);
  452. addExceptionToWorkunit(workunit, SeverityError, "eclccserver", e->errorCode(), msg.str(), NULL, 0, 0, 0);
  453. e->Release();
  454. }
  455. if (ok)
  456. {
  457. workunit->setState(WUStateCompiled);
  458. const char *newClusterName = workunit->queryClusterName(); // Workunit can change the cluster name via #workunit, so reload it
  459. if (strcmp(newClusterName, clusterName.str()) != 0)
  460. {
  461. clusterInfo.setown(getTargetClusterInfo(newClusterName));
  462. if (!clusterInfo)
  463. {
  464. VStringBuffer errStr("Cluster %s by #workunit not recognized", newClusterName);
  465. failCompilation(errStr);
  466. return;
  467. }
  468. if (platform != clusterInfo->getPlatform())
  469. {
  470. VStringBuffer errStr("Cluster %s specified by #workunit is wrong type for this queue", newClusterName);
  471. failCompilation(errStr);
  472. return;
  473. }
  474. clusterInfo.clear();
  475. }
  476. if (workunit->getAction()==WUActionRun || workunit->getAction()==WUActionUnknown) // Assume they meant run....
  477. {
  478. if (isLibrary(workunit))
  479. {
  480. workunit->setState(WUStateCompleted);
  481. }
  482. else
  483. {
  484. workunit->schedule();
  485. SCMStringBuffer dllBuff;
  486. Owned<IConstWUQuery> wuQuery = workunit->getQuery();
  487. wuQuery->getQueryDllName(dllBuff);
  488. wuQuery.clear();
  489. if (dllBuff.length() > 0)
  490. {
  491. workunit.clear();
  492. if (!runWorkUnit(wuid, clusterName.str()))
  493. {
  494. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  495. workunit.setown(factory->updateWorkUnit(wuid));
  496. reportError("Failed to execute workunit", 2);
  497. if (workunit->getState() != WUStateAborted)
  498. workunit->setState(WUStateFailed);
  499. }
  500. }
  501. else
  502. {
  503. reportError("Failed to execute workunit (unknown DLL name)", 2);
  504. workunit->setState(WUStateFailed);
  505. }
  506. }
  507. }
  508. }
  509. else if (workunit->getState() != WUStateAborted)
  510. workunit->setState(WUStateFailed);
  511. if (workunit)
  512. workunit->commit();
  513. workunit.clear();
  514. }
  515. virtual bool stop() override
  516. {
  517. return false; // should I try to abort?
  518. }
  519. virtual bool canReuse() const override
  520. {
  521. return true;
  522. }
  523. };
  524. #ifndef _WIN32
  525. static void generatePrecompiledHeader()
  526. {
  527. try
  528. {
  529. Owned<IPipeProcess> pipe = createPipeProcess();
  530. Owned<ErrorReader> errorReader = new ErrorReader(pipe, NULL);
  531. StringBuffer cmd;
  532. splitDirTail(queryCurrentProcessPath(), cmd);
  533. cmd.append("eclcc -pch");
  534. if (pipe->run("eclcc", cmd, ".", false, false, true, 0))
  535. {
  536. errorReader->start();
  537. unsigned retcode = pipe->wait();
  538. errorReader->join();
  539. if (retcode != 0 || errorReader->errCount() != 0)
  540. throw MakeStringException(0, "eclcc -pch failed");
  541. DBGLOG("Created precompiled header");
  542. }
  543. }
  544. catch (IException * e)
  545. {
  546. EXCLOG(e, "Creating precompiled header");
  547. e->Release();
  548. }
  549. }
  550. static void removePrecompiledHeader()
  551. {
  552. removeFileTraceIfFail("eclinclude4.hpp.gch");
  553. }
  554. #endif
  555. //------------------------------------------------------------------------------------------------------------------
  556. // Class EclccServer manages a pool of compile threads
  557. //------------------------------------------------------------------------------------------------------------------
  558. class EclccServer : public CInterface, implements IThreadFactory, implements IAbortHandler
  559. {
  560. StringAttr queueName;
  561. unsigned poolSize;
  562. Owned<IThreadPool> pool;
  563. unsigned threadsActive;
  564. CriticalSection threadActiveCrit;
  565. bool running;
  566. CSDSServerStatus serverstatus;
  567. Owned<IJobQueue> queue;
  568. public:
  569. IMPLEMENT_IINTERFACE;
  570. EclccServer(const char *_queueName, unsigned _poolSize)
  571. : queueName(_queueName), poolSize(_poolSize), serverstatus("ECLCCserver")
  572. {
  573. threadsActive = 0;
  574. running = false;
  575. pool.setown(createThreadPool("eclccServerPool", this, NULL, poolSize, INFINITE));
  576. serverstatus.queryProperties()->setProp("@queue",queueName.get());
  577. serverstatus.commitProperties();
  578. }
  579. ~EclccServer()
  580. {
  581. pool->joinAll(false, INFINITE);
  582. }
  583. void run()
  584. {
  585. DBGLOG("eclccServer (%d threads) waiting for requests on queue(s) %s", poolSize, queueName.get());
  586. queue.setown(createJobQueue(queueName.get()));
  587. queue->connect(false);
  588. running = true;
  589. LocalIAbortHandler abortHandler(*this);
  590. while (running)
  591. {
  592. try
  593. {
  594. Owned<IJobQueueItem> item = queue->dequeue();
  595. if (item.get())
  596. {
  597. try
  598. {
  599. pool->start((void *) item->queryWUID());
  600. }
  601. catch(IException *e)
  602. {
  603. StringBuffer m;
  604. EXCLOG(e, "eclccServer::run exception");
  605. e->Release();
  606. }
  607. catch(...)
  608. {
  609. ERRLOG("Unexpected exception in eclccServer::run caught");
  610. }
  611. }
  612. }
  613. catch (IException *E)
  614. {
  615. EXCLOG(E);
  616. releaseAtoms();
  617. ExitModuleObjects();
  618. _exit(2);
  619. }
  620. catch (...)
  621. {
  622. DBGLOG("Unknown exception caught in eclccServer::run - restarting");
  623. releaseAtoms();
  624. ExitModuleObjects();
  625. _exit(2);
  626. }
  627. }
  628. DBGLOG("eclccServer closing");
  629. }
  630. virtual IPooledThread *createNew()
  631. {
  632. CriticalBlock b(threadActiveCrit);
  633. return new EclccCompileThread(threadsActive++);
  634. }
  635. virtual bool onAbort()
  636. {
  637. running = false;
  638. if (queue)
  639. queue->cancelAcceptConversation();
  640. return false;
  641. }
  642. };
  643. void openLogFile()
  644. {
  645. StringBuffer logname;
  646. envGetConfigurationDirectory("log","eclccserver",globals->queryProp("@name"),logname);
  647. Owned<IComponentLogFileCreator> lf = createComponentLogFileCreator(logname.str(), "eclccserver");
  648. lf->beginLogging();
  649. }
  650. //=========================================================================================
  651. //////////////////////////////////////////////////////////////////////////////////////////////
  652. extern "C" void caughtSIGPIPE(int sig)
  653. {
  654. DBGLOG("Caught sigpipe %d", sig);
  655. }
  656. extern "C" void caughtSIGHUP(int sig)
  657. {
  658. DBGLOG("Caught sighup %d", sig);
  659. }
  660. extern "C" void caughtSIGALRM(int sig)
  661. {
  662. DBGLOG("Caught sigalrm %d", sig);
  663. }
  664. extern "C" void caughtSIGTERM(int sig)
  665. {
  666. DBGLOG("Caught sigterm %d", sig);
  667. }
  668. void initSignals()
  669. {
  670. #ifndef _WIN32
  671. // signal(SIGTERM, caughtSIGTERM);
  672. signal(SIGPIPE, caughtSIGPIPE);
  673. signal(SIGHUP, caughtSIGHUP);
  674. signal(SIGALRM, caughtSIGALRM);
  675. #endif
  676. }
  677. int main(int argc, const char *argv[])
  678. {
  679. InitModuleObjects();
  680. initSignals();
  681. NoQuickEditSection x;
  682. Owned<IFile> sentinelFile = createSentinelTarget();
  683. // We remove any existing sentinel until we have validated that we can successfully start (i.e. all options are valid...)
  684. removeSentinelFile(sentinelFile);
  685. try
  686. {
  687. globals.setown(createPTreeFromXMLFile("eclccserver.xml", ipt_caseInsensitive));
  688. }
  689. catch (IException * e)
  690. {
  691. EXCLOG(e, "Failed to load eclccserver.xml");
  692. e->Release();
  693. return 1;
  694. }
  695. catch(...)
  696. {
  697. ERRLOG("Failed to load eclccserver.xml");
  698. return 1;
  699. }
  700. const char * processName = globals->queryProp("@name");
  701. setStatisticsComponentName(SCTeclcc, processName, true);
  702. if (globals->getPropBool("@enableSysLog",true))
  703. UseSysLogForOperatorMessages();
  704. #ifndef _WIN32
  705. if (globals->getPropBool("@generatePrecompiledHeader",true))
  706. generatePrecompiledHeader();
  707. else
  708. removePrecompiledHeader();
  709. #endif
  710. const char *daliServers = globals->queryProp("@daliServers");
  711. if (!daliServers)
  712. {
  713. WARNLOG("No Dali server list specified - assuming local");
  714. daliServers = ".";
  715. }
  716. Owned<IGroup> serverGroup = createIGroup(daliServers, DALI_SERVER_PORT);
  717. try
  718. {
  719. initClientProcess(serverGroup, DCR_EclCCServer);
  720. openLogFile();
  721. SCMStringBuffer queueNames;
  722. getEclCCServerQueueNames(queueNames, processName);
  723. if (!queueNames.length())
  724. throw MakeStringException(0, "No clusters found to listen on");
  725. // The option has been renamed to avoid confusion with the similarly-named eclcc option, but
  726. // still accept the old name if the new one is not present.
  727. unsigned maxThreads = globals->getPropInt("@maxEclccProcesses", globals->getPropInt("@maxCompileThreads", 4));
  728. EclccServer server(queueNames.str(), maxThreads);
  729. // if we got here, eclserver is successfully started and all options are good, so create the "sentinel file" for re-runs from the script
  730. // put in its own "scope" to force the flush
  731. writeSentinelFile(sentinelFile);
  732. server.run();
  733. }
  734. catch (IException * e)
  735. {
  736. EXCLOG(e, "Terminating unexpectedly");
  737. e->Release();
  738. }
  739. catch(...)
  740. {
  741. ERRLOG("Terminating unexpectedly");
  742. }
  743. globals.clear();
  744. UseSysLogForOperatorMessages(false);
  745. ::closedownClientProcess(); // dali client closedown
  746. releaseAtoms();
  747. ExitModuleObjects();
  748. return 0;
  749. }