/*############################################################################## Copyright (C) 2011 HPCC Systems. All rights reserved. This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with this program. If not, see . ############################################################################## */ #ifndef __ESPP_HPP__ #define __ESPP_HPP__ #include "espthread.hpp" #include "espcfg.ipp" typedef ISocket * isockp; typedef MapBetween SocketPortMap; MAKEPointerArray(ISocket, SocketPortArray); class CEspTerminator : public Thread { public: IMPLEMENT_IINTERFACE; virtual int run() { sleep(15); _exit(0); } }; class CEspServer : public CInterface, implements ISocketSelectHandler, implements IEspServer, implements IEspContainer, implements IRestartHandler { private: SocketEndpoint m_address; Owned m_selectHndlr; SocketPortMap m_srvSockets; SocketPortArray m_socketCleanup; Semaphore m_waitForExit; bool m_exiting; bool m_useDali; LogLevel m_logLevel; bool m_logReq; bool m_logResp; unsigned m_slowProcessingTime; StringAttr m_frameTitle; Mutex abortMutex; bool m_SEHMappingEnabled; CEspConfig* m_config; public: IMPLEMENT_IINTERFACE; //CEspServer(SocketEndpoint address) CEspServer(CEspConfig* config) : m_config(config) { //m_address = address; m_address = config->getLocalEndpoint(); m_selectHndlr.setown(createSocketSelectHandler()); m_exiting = false; m_useDali = false; m_logLevel = config->m_options.logLevel; m_logReq = config->m_options.logReq; m_logResp = config->m_options.logResp; m_slowProcessingTime = config->m_options.slowProcessingTime; m_frameTitle.set(config->m_options.frameTitle); m_SEHMappingEnabled = false; } ~CEspServer() { ForEachItemIn(sindex, m_socketCleanup) { m_socketCleanup.item(sindex).Release(); } m_socketCleanup.kill(); } void waitForExit(CEspConfig &config) { if (config.usesDali()) { m_useDali = true; while (!m_exiting) { bool daliOk; { synchronized sync(abortMutex); daliOk=config.checkDali(); } if (daliOk) m_waitForExit.wait(1000); else { DBGLOG("Exiting ESP -- Lost DALI connection!"); break; } } } else { m_useDali = false; m_waitForExit.wait(); sleep(1); } } //IRestartHandler void Restart() { exitESP(); } //IEspContainer void exitESP() { if(m_SEHMappingEnabled) { DisableSEHtoExceptionMapping(); m_SEHMappingEnabled = false; } // YMA: there'll be a leak here, but it's ok. CEspTerminator* terminator = new CEspTerminator; terminator->start(); m_exiting=true; if(!m_useDali) m_waitForExit.signal(); } void setLogLevel(LogLevel level) { m_logLevel = level; } LogLevel getLogLevel() { return m_logLevel; } bool getLogRequests() { return m_logReq; } bool getLogResponses() { return m_logResp; } void setFrameTitle(const char* title) { m_frameTitle.set(title); } const char* getFrameTitle() { return m_frameTitle.get(); } unsigned getSlowProcessingTime() { return m_slowProcessingTime; } void log(LogLevel level, const char* fmt, ...) __attribute__((format(printf, 3, 4))) { if (getLogLevel()>=level) { va_list args; va_start(args, fmt); VALOG(MCdebugInfo, unknownJob, fmt, args); va_end(args); } } //IEspServer void addProtocol(IEspProtocol &protocol) { } void addBinding(const char * name, const char * host, unsigned short port, IEspProtocol &protocol, IEspRpcBinding &binding, bool isdefault, IPropertyTree* cfgtree) { StringBuffer strIP; if (host != NULL) strIP.append(host); else m_address.getIpText(strIP); LOG(MCprogress, "binding %s, on %s:%d", name, strIP.str(), port); ISocket **socketp = m_srvSockets.getValue(port); ISocket *socket=(socketp!=NULL) ? *socketp : NULL; if (socket==NULL) { int backlogsize = 0; if(cfgtree) { const char* blstr = cfgtree->queryProp("@maxBacklogQueueSize"); if(blstr && *blstr) backlogsize = atoi(blstr); } if(backlogsize > 0) { socket = ISocket::create_ip(port, strIP.str(), backlogsize); } else { socket = ISocket::create_ip(port, strIP.str()); } m_socketCleanup.append(*socket); LOG(MCprogress, " created server socket(%d)", socket->OShandle()); m_srvSockets.setValue(port, socket); add(socket, SELECTMODE_READ | SELECTMODE_WRITE, dynamic_cast(&protocol)); LOG(MCprogress, " Socket(%d) listening.", socket->OShandle()); } if (socket) { protocol.addBindingMap(socket, &binding, isdefault); socket->Release(); } else { IERRLOG("Can't create socket on %s:%d", strIP.str(), port); throw MakeStringException(-1, "Can't create socket on %s:%d", strIP.str(), port); } } //ISocketHandler void start() { m_selectHndlr->start(); } void add(ISocket *sock,unsigned mode,ISocketSelectNotify *nfy) { m_selectHndlr->add(sock, mode, nfy); } void remove(ISocket *sock) { m_selectHndlr->remove(sock); } void stop(bool wait) { if(m_selectHndlr) { m_selectHndlr->stop(wait); DBGLOG("select handler stopped."); } } void setSavedSEHHandler(bool mappingEnabled) { m_SEHMappingEnabled = mappingEnabled; } virtual void sendSnmpMessage(const char* msg) { throwUnexpected(); } }; class CEspAbortHandler : public CInterface, implements IAbortHandler { CEspConfig* m_config; CEspServer* m_srv; public: IMPLEMENT_IINTERFACE; CEspAbortHandler() { m_config=NULL; m_srv=NULL; addAbortHandler(*this); } ~CEspAbortHandler() { removeAbortHandler(*this); } void setConfig(CEspConfig* config) { m_config = config; } void setServer(CEspServer* srv) { m_srv = srv; } //IAbortHandler bool onAbort() { LOG(MCprogress, "ESP Abort Handler..."); m_srv->exitESP(); return false; } }; #define MAX_CHILDREN 1 #endif //__ESPP_HPP__