espp.hpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #ifndef __ESPP_HPP__
  14. #define __ESPP_HPP__
  15. #include "espthread.hpp"
  16. #include "espcfg.ipp"
  17. typedef ISocket * isockp;
  18. typedef MapBetween<int, int, isockp, isockp> SocketPortMap;
  19. typedef CopyReferenceArrayOf<ISocket> SocketPortArray;
  20. class CEspTerminator : public Thread
  21. {
  22. public:
  23. IMPLEMENT_IINTERFACE;
  24. virtual int run()
  25. {
  26. sleep(15);
  27. _exit(0);
  28. }
  29. };
  30. class CEspServer : public CInterface,
  31. implements ISocketSelectHandler,
  32. implements IEspServer,
  33. implements IEspContainer,
  34. implements IRestartHandler
  35. {
  36. private:
  37. SocketEndpoint m_address;
  38. Owned<ISocketSelectHandler> m_selectHndlr;
  39. SocketPortMap m_srvSockets;
  40. SocketPortArray m_socketCleanup;
  41. Semaphore m_waitForExit;
  42. bool m_exiting;
  43. bool m_useDali;
  44. LogLevel m_logLevel;
  45. bool m_logReq;
  46. bool m_logResp;
  47. LogLevel txSummaryLevel;
  48. bool txSummaryResourceReq;
  49. unsigned m_slowProcessingTime;
  50. StringAttr m_frameTitle;
  51. Mutex abortMutex;
  52. bool m_SEHMappingEnabled;
  53. CEspConfig* m_config;
  54. CriticalSection m_BindingCritSect;
  55. unsigned countCacheClients = 0;
  56. MapStringToMyClass<IEspCache> cacheClientMap;
  57. public:
  58. IMPLEMENT_IINTERFACE;
  59. //CEspServer(SocketEndpoint address)
  60. CEspServer(CEspConfig* config) : m_config(config)
  61. {
  62. //m_address = address;
  63. m_address = config->getLocalEndpoint();
  64. m_selectHndlr.setown(createSocketSelectHandler());
  65. m_exiting = false;
  66. m_useDali = false;
  67. m_logLevel = config->m_options.logLevel;
  68. m_logReq = config->m_options.logReq;
  69. m_logResp = config->m_options.logResp;
  70. txSummaryLevel = config->m_options.txSummaryLevel;
  71. txSummaryResourceReq = config->m_options.txSummaryResourceReq;
  72. m_slowProcessingTime = config->m_options.slowProcessingTime;
  73. m_frameTitle.set(config->m_options.frameTitle);
  74. m_SEHMappingEnabled = false;
  75. }
  76. ~CEspServer()
  77. {
  78. ForEachItemIn(sindex, m_socketCleanup)
  79. {
  80. m_socketCleanup.item(sindex).Release();
  81. }
  82. m_socketCleanup.kill();
  83. }
  84. void waitForExit(CEspConfig &config)
  85. {
  86. if (config.usesDali())
  87. {
  88. m_useDali = true;
  89. while (!m_exiting)
  90. {
  91. bool daliOk;
  92. {
  93. synchronized sync(abortMutex);
  94. if (!config.isDetachedFromDali())
  95. daliOk=config.checkDali();
  96. //else
  97. // daliOk=true;
  98. }
  99. if (config.isDetachedFromDali() || daliOk)
  100. m_waitForExit.wait(1000); //if detached, should we wait longer?
  101. else
  102. {
  103. DBGLOG("Exiting ESP -- Lost DALI connection!");
  104. break;
  105. }
  106. }
  107. }
  108. else
  109. {
  110. m_useDali = false;
  111. m_waitForExit.wait();
  112. sleep(1);
  113. }
  114. }
  115. //IRestartHandler
  116. void Restart()
  117. {
  118. exitESP();
  119. }
  120. //IEspContainer
  121. void exitESP()
  122. {
  123. if(m_SEHMappingEnabled)
  124. {
  125. DisableSEHtoExceptionMapping();
  126. m_SEHMappingEnabled = false;
  127. }
  128. // YMA: there'll be a leak here, but it's ok.
  129. CEspTerminator* terminator = new CEspTerminator;
  130. terminator->start();
  131. m_exiting=true;
  132. if(!m_useDali)
  133. m_waitForExit.signal();
  134. }
  135. void setLogLevel(LogLevel level) { m_logLevel = level; }
  136. void setLogRequests(bool logReq) { m_logReq = logReq; }
  137. void setLogResponses(bool logResp) { m_logResp = logResp; }
  138. void setTxSummaryLevel(LogLevel level) { txSummaryLevel = level; }
  139. void setTxSummaryResourceReq(bool logReq) { txSummaryResourceReq = logReq; }
  140. LogLevel getLogLevel() { return m_logLevel; }
  141. bool getLogRequests() { return m_logReq; }
  142. bool getLogResponses() { return m_logResp; }
  143. LogLevel getTxSummaryLevel() { return txSummaryLevel; }
  144. bool getTxSummaryResourceReq() { return txSummaryResourceReq; }
  145. void setFrameTitle(const char* title) { m_frameTitle.set(title); }
  146. const char* getFrameTitle() { return m_frameTitle.get(); }
  147. unsigned getSlowProcessingTime() { return m_slowProcessingTime; }
  148. void log(LogLevel level, const char* fmt, ...) __attribute__((format(printf, 3, 4)))
  149. {
  150. if (getLogLevel()>=level)
  151. {
  152. va_list args;
  153. va_start(args, fmt);
  154. VALOG(MCdebugInfo, unknownJob, fmt, args);
  155. va_end(args);
  156. }
  157. }
  158. //IEspServer
  159. void addProtocol(IEspProtocol &protocol)
  160. {
  161. }
  162. void addBinding(const char * name, const char * host, unsigned short port, IEspProtocol &protocol, IEspRpcBinding &binding, bool isdefault, IPropertyTree* cfgtree)
  163. {
  164. StringBuffer strIP;
  165. if (host != NULL)
  166. strIP.append(host);
  167. else
  168. m_address.getIpText(strIP);
  169. LOG(MCprogress, "binding %s, on %s:%d", name, strIP.str(), port);
  170. CriticalBlock cb(m_BindingCritSect);
  171. ISocket **socketp = m_srvSockets.getValue(port);
  172. ISocket *socket=(socketp!=NULL) ? *socketp : NULL;
  173. if (socket==NULL)
  174. {
  175. int backlogsize = 0;
  176. if(cfgtree)
  177. {
  178. const char* blstr = cfgtree->queryProp("@maxBacklogQueueSize");
  179. if(blstr && *blstr)
  180. backlogsize = atoi(blstr);
  181. }
  182. if(backlogsize > 0)
  183. {
  184. socket = ISocket::create_ip(port, strIP.str(), backlogsize);
  185. }
  186. else
  187. {
  188. socket = ISocket::create_ip(port, strIP.str());
  189. }
  190. m_socketCleanup.append(*socket);
  191. LOG(MCprogress, " created server socket(%d)", socket->OShandle());
  192. m_srvSockets.setValue(port, socket);
  193. add(socket, SELECTMODE_READ | SELECTMODE_WRITE, dynamic_cast<ISocketSelectNotify*>(&protocol));
  194. LOG(MCprogress, " Socket(%d) listening.", socket->OShandle());
  195. }
  196. if (socket)
  197. {
  198. protocol.addBindingMap(socket, &binding, isdefault);
  199. socket->Release();
  200. }
  201. else
  202. {
  203. IERRLOG("Can't create socket on %s:%d", strIP.str(), port);
  204. throw MakeStringException(-1, "Can't create socket on %s:%d", strIP.str(), port);
  205. }
  206. }
  207. virtual void removeBinding(unsigned short port, IEspRpcBinding & bind)
  208. {
  209. IEspProtocol* prot = dynamic_cast<IEspProtocol*>(bind.queryListener());
  210. if (prot)
  211. {
  212. CriticalBlock cb(m_BindingCritSect);
  213. int left = prot->removeBindingMap(port, &bind);
  214. if (left == 0)
  215. {
  216. DBGLOG("No more bindings on port %d, so freeing up the port.",port);
  217. ISocket **socketp = m_srvSockets.getValue(port);
  218. ISocket *socket=(socketp!=nullptr) ? *socketp : nullptr;
  219. if (socket != nullptr)
  220. {
  221. remove(socket);
  222. m_srvSockets.remove(port);
  223. socket->close();
  224. }
  225. }
  226. }
  227. }
  228. virtual IPropertyTree* queryProcConfig()
  229. {
  230. return m_config->queryProcConfig();
  231. }
  232. virtual IEspProtocol* queryProtocol(const char* name)
  233. {
  234. return m_config->queryProtocol(name);
  235. }
  236. virtual IEspRpcBinding* queryBinding(const char* name)
  237. {
  238. return m_config->queryBinding(name);
  239. }
  240. virtual const char* getProcName()
  241. {
  242. return m_config->getProcName();
  243. }
  244. //ISocketHandler
  245. void start()
  246. {
  247. m_selectHndlr->start();
  248. }
  249. void add(ISocket *sock,unsigned mode,ISocketSelectNotify *nfy)
  250. {
  251. m_selectHndlr->add(sock, mode, nfy);
  252. }
  253. void remove(ISocket *sock)
  254. {
  255. m_selectHndlr->remove(sock);
  256. }
  257. void stop(bool wait)
  258. {
  259. if(m_selectHndlr)
  260. {
  261. m_selectHndlr->stop(wait);
  262. DBGLOG("select handler stopped.");
  263. }
  264. }
  265. void setSavedSEHHandler(bool mappingEnabled)
  266. {
  267. m_SEHMappingEnabled = mappingEnabled;
  268. }
  269. virtual void sendSnmpMessage(const char* msg) { throwUnexpected(); }
  270. virtual bool addCacheClient(const char *id, const char *cacheInitString)
  271. {
  272. Owned<IEspCache> cacheClient = createESPCache(cacheInitString);
  273. if (!cacheClient)
  274. return false;
  275. cacheClientMap.setValue(id, cacheClient);
  276. countCacheClients++;
  277. return true;
  278. }
  279. virtual bool hasCacheClient()
  280. {
  281. return countCacheClients > 0;
  282. }
  283. virtual const void *queryCacheClient(const char* id)
  284. {
  285. return countCacheClients > 1 ? cacheClientMap.getValue(id) : nullptr;
  286. }
  287. virtual void clearCacheByGroupID(const char *ids, StringArray& errorMsgs)
  288. {
  289. StringArray idList;
  290. idList.appendListUniq(ids, ",");
  291. ForEachItemIn(i, idList)
  292. {
  293. const char *id = idList.item(i);
  294. IEspCache* cacheClient = (IEspCache*) queryCacheClient(id);
  295. if (cacheClient)
  296. cacheClient->flush(0);
  297. else
  298. {
  299. VStringBuffer msg("Failed to get ESPCache client %s.", id);
  300. errorMsgs.append(msg);
  301. }
  302. }
  303. }
  304. virtual bool reSubscribeESPToDali()
  305. {
  306. return m_config->reSubscribeESPToDali();
  307. }
  308. virtual bool unsubscribeESPFromDali()
  309. {
  310. return m_config->unsubscribeESPFromDali();
  311. }
  312. virtual bool detachESPFromDali(bool force)
  313. {
  314. return m_config->detachESPFromDali(force);
  315. }
  316. virtual bool attachESPToDali()
  317. {
  318. return m_config->attachESPToDali();
  319. }
  320. virtual bool isAttachedToDali()
  321. {
  322. return !m_config->isDetachedFromDali();
  323. }
  324. virtual bool isSubscribedToDali()
  325. {
  326. return m_config->isSubscribedToDali();
  327. }
  328. };
  329. class CEspAbortHandler : public CInterface,
  330. implements IAbortHandler
  331. {
  332. CEspConfig* m_config;
  333. CEspServer* m_srv;
  334. public:
  335. IMPLEMENT_IINTERFACE;
  336. CEspAbortHandler()
  337. {
  338. m_config=NULL;
  339. m_srv=NULL;
  340. addAbortHandler(*this);
  341. }
  342. ~CEspAbortHandler()
  343. {
  344. removeAbortHandler(*this);
  345. }
  346. void setConfig(CEspConfig* config)
  347. {
  348. m_config = config;
  349. }
  350. void setServer(CEspServer* srv)
  351. {
  352. m_srv = srv;
  353. }
  354. //IAbortHandler
  355. bool onAbort()
  356. {
  357. LOG(MCprogress, "ESP Abort Handler...");
  358. m_srv->exitESP();
  359. return false;
  360. }
  361. };
  // NOTE(review): MAX_CHILDREN is not referenced anywhere in this header; it is
  // presumably consumed by code that includes espp.hpp — confirm its meaning at
  // the use sites before changing the value.
  362. #define MAX_CHILDREN 1
  363. #endif //__ESPP_HPP__