/*############################################################################## Copyright (C) 2011 HPCC Systems. All rights reserved. This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with this program. If not, see . ############################################################################## */ #define da_decl __declspec(dllexport) #include "platform.h" #include "jlib.hpp" #include "jfile.hpp" #include "jsuperhash.hpp" #include "jmisc.hpp" #include "jencrypt.hpp" #include "dacoven.hpp" #include "mpbuff.hpp" #include "mpcomm.hpp" #include "mputil.hpp" #include "daserver.hpp" #include "dasubs.ipp" #include "dasds.hpp" #include "daclient.hpp" #include "daldap.hpp" #include "dasess.hpp" #ifdef _MSC_VER #pragma warning (disable : 4355) #endif const char *queryRoleName(DaliClientRole role) { switch (role) { case DCR_Private: return "Private"; case DCR_Diagnostic: return "Diagnostic"; case DCR_ThorSlave: return "ThorSlave"; case DCR_ThorMaster: return "ThorMaster"; case DCR_EclServer: return "EclServer"; case DCR_EclScheduler: return "EclScheduler"; case DCR_EclAgent: return "EclAgent"; case DCR_AgentExec: return "AgentExec"; case DCR_DaliServer:return "DaliServer"; case DCR_SashaServer: return "SashaServer"; case DCR_Util: return "Util"; case DCR_Dfu: return "Dfu"; case DCR_DfuServer: return "DfuServer"; case DCR_EspServer: return "EspServer"; case DCR_WuClient: return "WuClient"; case DCR_Config: return "Config"; case DCR_Scheduler: return "Scheduler"; case DCR_RoxyMaster: return "RoxieMaster"; case DCR_RoxySlave: return "RoxieSlave"; case DCR_BackupGen: return "BackupGen"; case DCR_Other: return "Other"; } return "Unknown"; } interface ISessionManagerServer: implements IConnectionMonitor { virtual SessionId registerSession(SecurityToken tok,SessionId parentid) = 0; virtual SessionId registerClientProcess(INode *node,IGroup *&grp, DaliClientRole role) = 0; virtual void addProcessSession(SessionId id, INode *node, DaliClientRole role) = 0; virtual void addSession(SessionId id) = 0; virtual SessionId lookupProcessSession(INode *node) = 0; virtual INode *getProcessSessionNode(SessionId id) =0; virtual int getPermissionsLDAP(const char *key,const char *obj,IUserDescriptor *udesc,unsigned flags, int *err)=0; virtual void stopSession(SessionId sessid,bool failed) = 0; virtual void setClientAuth(IDaliClientAuthConnection *authconn) = 0; virtual void setLDAPconnection(IDaliLdapConnection *_ldapconn) = 0; virtual bool authorizeConnection(int role,bool revoke) = 0; virtual void start() = 0; virtual void ready() = 0; virtual void stop() = 0; }; static SessionId mySessionId=0; static ISessionManager *SessionManager=NULL; static ISessionManagerServer *SessionManagerServer=NULL; static CriticalSection sessionCrit; #define SESSIONREPLYTIMEOUT (3*60*1000) #define CLDAPE_getpermtimeout (-1) #define CLDAPE_ldapfailure (-2) class CDaliLDAP_Exception: public CInterface, implements IException { int errcode; public: CDaliLDAP_Exception(int _errcode) { errcode = _errcode; } int errorCode() const { return errcode; } StringBuffer & errorMessage(StringBuffer &str) const { if (errcode==0) return str; str.appendf("LDAP Exception(%d): ",errcode); if (errcode==CLDAPE_getpermtimeout) return str.append("getPermissionsLDAP - timeout to LDAP server"); if (errcode==CLDAPE_ldapfailure) return str.append("getPermissionsLDAP - LDAP server failure"); return str.append("Unknown Exception"); } MessageAudience errorAudience() const { return MSGAUD_user; } IMPLEMENT_IINTERFACE; }; class CdelayedTerminate: public Thread // slightly obfuscated stop code { byte err; int run() { while (getRandom()%711!=0) getRandom(); // look busy ERRLOG("Server fault %d",(int)err); while (getRandom()%7!=0) Sleep(1); exit(0); } public: CdelayedTerminate(byte _err) { err = _err; start(); Release(); Sleep(100); } }; class CSessionState: public CInterface { protected: friend class CSessionStateTable; SessionId id; public: CSessionState(SessionId _id) { id = _id; } SessionId getId() const { return id; } }; class CSessionStateTable: private SuperHashTableOf { CheckedCriticalSection sessstatesect; void onAdd(void *) {} void onRemove(void *e) { CSessionState &elem=*(CSessionState *)e; elem.Release(); } unsigned getHashFromElement(const void *e) const { const CSessionState &elem=*(const CSessionState *)e; SessionId id=elem.id; return low(id)^(unsigned)high(id); } unsigned getHashFromFindParam(const void *fp) const { SessionId id = *(const SessionId *)fp; return low(id)^(unsigned)high(id); } const void * getFindParam(const void *p) const { const CSessionState &elem=*(const CSessionState *)p; return (void *)&elem.id; } bool matchesFindParam(const void * et, const void *fp, unsigned) const { return ((CSessionState *)et)->id==*(SessionId *)fp; } IMPLEMENT_SUPERHASHTABLEOF_REF_FIND(CSessionState,SessionId); public: CSessionStateTable() { } ~CSessionStateTable() { CHECKEDCRITICALBLOCK(sessstatesect,60000); releaseAll(); } bool add(CSessionState *e) // takes ownership { CHECKEDCRITICALBLOCK(sessstatesect,60000); if (SuperHashTableOf::find(&e->id)) return false; SuperHashTableOf::add(*e); return true; } const CSessionState *query(SessionId id) { CHECKEDCRITICALBLOCK(sessstatesect,60000); return SuperHashTableOf::find(&id); } void remove(SessionId id) { CHECKEDCRITICALBLOCK(sessstatesect,60000); SuperHashTableOf::remove(&id); } }; class CProcessSessionState: public CSessionState { INode *node; DaliClientRole role; public: CProcessSessionState(SessionId id,INode *_node,DaliClientRole _role) : CSessionState(id) { node = _node; node->Link(); role = _role; } ~CProcessSessionState() { node->Release(); } INode &queryNode() const { return *node; } DaliClientRole queryRole() const { return role; } StringBuffer &getDetails(StringBuffer &buf) { StringBuffer ep; return buf.appendf("%16"I64F"X: %s, role=%s",CSessionState::id,node->endpoint().getUrlStr(ep).str(),queryRoleName(role)); } }; class CMapProcessToSession: private SuperHashTableOf { CheckedCriticalSection mapprocesssect; void onAdd(void *) {} void onRemove(void *e) {} // do nothing unsigned getHashFromElement(const void *e) const { const CProcessSessionState &elem=*(const CProcessSessionState *)e; return elem.queryNode().getHash(); } unsigned getHashFromFindParam(const void *fp) const { return ((INode *)fp)->getHash(); } const void * getFindParam(const void *p) const { const CProcessSessionState &elem=*(const CProcessSessionState *)p; return (void *)&elem.queryNode(); } bool matchesFindParam(const void * et, const void *fp, unsigned) const { return ((CProcessSessionState *)et)->queryNode().equals((INode *)fp); } IMPLEMENT_SUPERHASHTABLEOF_REF_FIND(CProcessSessionState,INode); public: CMapProcessToSession() { } ~CMapProcessToSession() { CHECKEDCRITICALBLOCK(mapprocesssect,60000); releaseAll(); } bool add(CProcessSessionState *e) { CHECKEDCRITICALBLOCK(mapprocesssect,60000); if (SuperHashTableOf::find(&e->queryNode())) return false; SuperHashTableOf::add(*e); return true; } CProcessSessionState *query(INode *n) { CHECKEDCRITICALBLOCK(mapprocesssect,60000); return SuperHashTableOf::find(n); } void remove(INode *n,ISessionManagerServer *manager) { CHECKEDCRITICALBLOCK(mapprocesssect,60000); CProcessSessionState *sstate = SuperHashTableOf::find(n); if (sstate) { if (manager) manager->authorizeConnection(sstate->queryRole(),true); SuperHashTableOf::removeExact(sstate); } } unsigned count() { CHECKEDCRITICALBLOCK(mapprocesssect,60000); return SuperHashTableOf::count(); } CProcessSessionState *next(const CProcessSessionState *s) { CHECKEDCRITICALBLOCK(mapprocesssect,60000); return (CProcessSessionState *)SuperHashTableOf::next(s); } }; enum MSessionRequestKind { MSR_REGISTER_PROCESS_SESSION, MSR_SECONDARY_REGISTER_PROCESS_SESSION, MSR_REGISTER_SESSION, MSR_SECONDARY_REGISTER_SESSION, MSR_LOOKUP_PROCESS_SESSION, MSR_STOP_SESSION, MSR_IMPORT_CAPABILITIES, MSR_LOOKUP_LDAP_PERMISSIONS, MSR_EXIT // TBD }; class CSessionRequestServer: public Thread { bool stopped; ISessionManagerServer &manager; Semaphore acceptConnections; public: CSessionRequestServer(ISessionManagerServer &_manager) : Thread("Session Manager, CSessionRequestServer"), manager(_manager) { stopped = true; } int run() { ICoven &coven=queryCoven(); CMessageHandler handler("CSessionRequestServer",this,&CSessionRequestServer::processMessage); stopped = false; CMessageBuffer mb; while (!stopped) { try { mb.clear(); if (coven.recv(mb,RANK_ALL,MPTAG_DALI_SESSION_REQUEST,NULL)) handler.handleMessage(mb); else stopped = true; } catch (IException *e) { EXCLOG(e, "CDaliPublisherServer"); e->Release(); } } return 0; } void processMessage(CMessageBuffer &mb) { ICoven &coven=queryCoven(); SessionId id; int fn; mb.read(fn); switch (fn) { case MSR_REGISTER_PROCESS_SESSION: { acceptConnections.wait(); acceptConnections.signal(); Owned node(deserializeINode(mb)); Owned servernode(deserializeINode(mb)); // hopefully me, but not if forwarded int role=0; if (mb.length()-mb.getPos()>=sizeof(role)) { // a capability block present mb.read(role); if (!manager.authorizeConnection(role,false)) { SocketEndpoint sender = mb.getSender(); mb.clear(); coven.reply(mb); MilliSleep(100+getRandom()%1000); // Causes client to 'work' for a short time. Owned node = createINode(sender); coven.disconnect(node); break; } #ifdef _DEBUG StringBuffer eps; PROGLOG("Connection to %s authorized",mb.getSender().getUrlStr(eps).str()); #endif } IGroup *covengrp; id = manager.registerClientProcess(node.get(),covengrp,(DaliClientRole)role); mb.clear().append(id); if (covengrp->rank(servernode)==RANK_NULL) { // must have been redirected covengrp->Release(); // no good, so just use one we know about (may use something more sophisticated later) INode *na = servernode.get(); covengrp = createIGroup(1, &na); } covengrp->serialize(mb); covengrp->Release(); coven.reply(mb); } break; case MSR_SECONDARY_REGISTER_PROCESS_SESSION: { mb.read(id); Owned node (deserializeINode(mb)); int role; mb.read(role); manager.addProcessSession(id,node.get(),(DaliClientRole)role); mb.clear(); coven.reply(mb); } break; case MSR_REGISTER_SESSION: { SecurityToken tok; SessionId parentid; mb.read(tok).read(parentid); SessionId id = manager.registerSession(tok,parentid); mb.clear().append(id); coven.reply(mb); } break; case MSR_SECONDARY_REGISTER_SESSION: { mb.read(id); manager.addSession(id); mb.clear(); coven.reply(mb); } break; case MSR_LOOKUP_PROCESS_SESSION: { // looks up from node or from id Owned node (deserializeINode(mb)); if (node->endpoint().isNull()&&(mb.length()-mb.getPos()>=sizeof(id))) { mb.read(id); INode *n = manager.getProcessSessionNode(id); if (n) node.setown(n); node->serialize(mb.clear()); } else { id = manager.lookupProcessSession(node.get()); mb.clear().append(id); } coven.reply(mb); } break; case MSR_STOP_SESSION: { SessionId sessid; bool failed; mb.read(sessid).read(failed); manager.stopSession(sessid,failed); mb.clear(); coven.reply(mb); } break; case MSR_LOOKUP_LDAP_PERMISSIONS: { StringAttr key; StringAttr obj; Owned udesc=createUserDescriptor(); StringAttr username; StringAttr passwordenc; mb.read(key).read(obj); udesc->deserialize(mb); #ifdef _DALIUSER_STACKTRACE //following debug code to be removed StringBuffer sb; udesc->getUserName(sb); if (0==sb.length() || !strcmpi(sb.str(), "daliuser")) { DBGLOG("UNEXPECTED USER '%s' in %s line %ld",username,__FILE__, __LINE__); PrintStackReport(); } #endif unsigned auditflags = 0; if (mb.length()-mb.getPos()>=sizeof(auditflags)) mb.read(auditflags); int err = 0; int ret=manager.getPermissionsLDAP(key,obj,udesc,auditflags,&err); mb.clear().append(ret); if (err) mb.append(err); coven.reply(mb); } break; } } void ready() { acceptConnections.signal(); } void stop() { if (!stopped) { stopped = true; queryCoven().cancel(RANK_ALL, MPTAG_DALI_SESSION_REQUEST); } join(); } }; class CSessionManagerBase: public CInterface, implements ISessionManager { protected: CheckedCriticalSection sessmanagersect; public: IMPLEMENT_IINTERFACE; CSessionManagerBase() { servernotifys.kill(); } virtual ~CSessionManagerBase() { } class CSessionSubscriptionProxy: public CInterface, implements ISubscription { ISessionNotify *sub; SubscriptionId id; MemoryAttr ma; SessionId sessid; public: IMPLEMENT_IINTERFACE; CSessionSubscriptionProxy(ISessionNotify *_sub,SessionId _sessid) { sub = LINK(_sub); sessid = _sessid; MemoryBuffer mb; mb.append(sessid); ma.set(mb.length(),mb.toByteArray()); id = queryCoven().getUniqueId(); } ~CSessionSubscriptionProxy() { sub->Release(); } ISessionNotify *queryNotify() { return sub; } const MemoryAttr &queryData() { return ma; } SubscriptionId getId() { return id; } void doNotify(bool aborted) { if (aborted) sub->aborted(sessid); else sub->closed(sessid); } void notify(MemoryBuffer &mb) { bool aborted; mb.read(aborted); doNotify(aborted); } void abort() { //NH: TBD } bool aborted() { return false; } }; CIArrayOfservernotifys; SubscriptionId subscribeSession(SessionId sessid, ISessionNotify *inotify) { CSessionSubscriptionProxy *proxy; SubscriptionId id; { CHECKEDCRITICALBLOCK(sessmanagersect,60000); proxy = new CSessionSubscriptionProxy(inotify,sessid); id = proxy->getId(); if (sessid==SESSID_DALI_SERVER) { servernotifys.append(*proxy); return id; } } querySubscriptionManager(SESSION_PUBLISHER)->add(proxy,id); return id; } void unsubscribeSession(SubscriptionId id) { { CHECKEDCRITICALBLOCK(sessmanagersect,60000); // check not a server subscription ForEachItemIn(i,servernotifys) { if (servernotifys.item(i).getId()==id) { servernotifys.remove(i); return; } } } querySubscriptionManager(SESSION_PUBLISHER)->remove(id); } class cNotify: public CInterface, implements ISessionNotify { public: IMPLEMENT_IINTERFACE; Semaphore sem; void closed(SessionId id) { //PROGLOG("Session closed %"I64F"x",id); sem.signal(); } void aborted(SessionId id) { //PROGLOG("Session aborted %"I64F"x",id); sem.signal(); } }; bool sessionStopped(SessionId id, unsigned timeout) { Owned node = getProcessSessionNode(id); if (node.get()==NULL) return true; if (timeout==0) return false; Owned cnotify = new cNotify; querySessionManager().subscribeSession(id,cnotify); if (cnotify->sem.wait(timeout)) return false; node.setown(getProcessSessionNode(id)); return node.get()==NULL; } void notifyServerStopped(bool aborted) { CHECKEDCRITICALBLOCK(sessmanagersect,60000); // check not a server subscription ForEachItemIn(i,servernotifys) { servernotifys.item(i).doNotify(aborted); } } }; class CClientSessionManager: public CSessionManagerBase, implements IConnectionMonitor { bool securitydisabled; public: IMPLEMENT_IINTERFACE; CClientSessionManager() { securitydisabled = false; } virtual ~CClientSessionManager() { stop(); } SessionId lookupProcessSession(INode *node) { if (!node) return mySessionId; CMessageBuffer mb; mb.append((int)MSR_LOOKUP_PROCESS_SESSION); node->serialize(mb); if (!queryCoven().sendRecv(mb,RANK_RANDOM,MPTAG_DALI_SESSION_REQUEST,SESSIONREPLYTIMEOUT)) return 0; SessionId ret; mb.read(ret); return ret; } virtual INode *getProcessSessionNode(SessionId id) { if (!id) return NULL; CMessageBuffer mb; mb.append((int)MSR_LOOKUP_PROCESS_SESSION); queryNullNode()->serialize(mb); mb.append(id); if (!queryCoven().sendRecv(mb,RANK_RANDOM,MPTAG_DALI_SESSION_REQUEST,SESSIONREPLYTIMEOUT)) return NULL; Owned node = deserializeINode(mb); if (node->endpoint().isNull()) return NULL; return node.getClear(); } int getPermissionsLDAP(const char *key,const char *obj,IUserDescriptor *udesc,unsigned auditflags,int *err) { if (err) *err = 0; if (securitydisabled) return -1; if (queryDaliServerVersion().compare("1.8") < 0) { securitydisabled = true; return -1; } CMessageBuffer mb; mb.append((int)MSR_LOOKUP_LDAP_PERMISSIONS); mb.append(key).append(obj); #ifdef _DALIUSER_STACKTRACE //following debug code to be removed StringBuffer sb; udesc->getUserName(sb); if (0==sb.length() || !strcmpi(sb.str(), "daliuser")) { DBGLOG("UNEXPECTED USER '%s' in %s line %ld",sb.str(),__FILE__, __LINE__); PrintStackReport(); } #endif udesc->serialize(mb); mb.append(auditflags); if (!queryCoven().sendRecv(mb,RANK_RANDOM,MPTAG_DALI_SESSION_REQUEST,SESSIONREPLYTIMEOUT)) return 0; int ret=-1; if (mb.remaining()>=sizeof(ret)) { mb.read(ret); int e = 0; if (mb.remaining()>=sizeof(e)) { if (err) *err = e; else if (e) throw new CDaliLDAP_Exception(e); } } if (ret==-1) securitydisabled = true; return ret; } bool checkScopeScansLDAP() { assertex(!"checkScopeScansLDAP called on client"); return true; // actually only used server size } unsigned getLDAPflags() { assertex(!"getLdapFlags called on client"); return 0; } void setLDAPflags(unsigned) { assertex(!"setLdapFlags called on client"); } bool authorizeConnection(DaliClientRole,bool) { return true; } SessionId startSession(SecurityToken tok, SessionId parentid) { CMessageBuffer mb; mb.append((int)MSR_REGISTER_SESSION).append(tok).append(parentid); if (!queryCoven().sendRecv(mb,RANK_RANDOM,MPTAG_DALI_SESSION_REQUEST),SESSIONREPLYTIMEOUT) return 0; SessionId ret; mb.read(ret); return ret; } void stopSession(SessionId sessid, bool failed) { if (sessid==SESSID_DALI_SERVER) { notifyServerStopped(failed); return; } CMessageBuffer mb; mb.append((int)MSR_STOP_SESSION).append(sessid).append(failed); queryCoven().sendRecv(mb,RANK_RANDOM,MPTAG_DALI_SESSION_REQUEST,SESSIONREPLYTIMEOUT); } void onClose(SocketEndpoint &ep) { CHECKEDCRITICALBLOCK(sessmanagersect,60000); Owned node = createINode(ep); if (queryCoven().inCoven(node)) { StringBuffer str; PROGLOG("Coven Session Stopping (%s)",ep.getUrlStr(str).str()); if (queryCoven().size()==1) notifyServerStopped(true); } } void start() { addMPConnectionMonitor(this); } void stop() { } void ready() { removeMPConnectionMonitor(this); } StringBuffer &getClientProcessList(StringBuffer &buf) { // dummy return buf; } StringBuffer &getClientProcessEndpoint(SessionId,StringBuffer &buf) { // dummy return buf; } unsigned queryClientCount() { // dummy return 0; } void importCapabilities(MemoryBuffer &mb) { CMessageBuffer msg; msg.append((int)MSR_IMPORT_CAPABILITIES); msg.append(mb.length(), mb.toByteArray()); queryCoven().sendRecv(msg,RANK_RANDOM,MPTAG_DALI_SESSION_REQUEST,SESSIONREPLYTIMEOUT); } }; class CLdapWorkItem : public Thread { StringAttr key; StringAttr obj; Linked udesc; Linked ldapconn; unsigned flags; bool running; Semaphore contsem; Semaphore ready; Semaphore &threaddone; int ret; public: IMPLEMENT_IINTERFACE; CLdapWorkItem(IDaliLdapConnection *_ldapconn,Semaphore &_threaddone) : ldapconn(_ldapconn), threaddone(_threaddone) { running = false; } void start(const char *_key,const char *_obj,IUserDescriptor *_udesc,unsigned _flags) { key.set(_key); obj.set(_obj); udesc.set(_udesc); flags = _flags; ret = CLDAPE_ldapfailure; if (!running) { running = true; Thread::start(); } contsem.signal(); } int run() { loop { contsem.wait(); if (!running) break; try { ret = ldapconn->getPermissions(key,obj,udesc,flags); } catch(IException *e) { LOG(MCoperatorError, unknownJob, e, "CLdapWorkItem"); e->Release(); } ready.signal(); } threaddone.signal(); return 0; } bool wait(unsigned timeout, int &_ret) { if (ready.wait(timeout)) { _ret = ret; return true; } _ret = 0; return false; } void stop() { running = false; contsem.signal(); } static CLdapWorkItem *get(IDaliLdapConnection *_ldapconn,Semaphore &_threaddone) { if (!_threaddone.wait(1000*60*5)) { ERRLOG("Too many stalled LDAP threads"); return NULL; } return new CLdapWorkItem(_ldapconn,_threaddone); } }; class CCovenSessionManager: public CSessionManagerBase, implements ISessionManagerServer, implements ISubscriptionManager { CSessionRequestServer sessionrequestserver; CSessionStateTable sessionstates; CMapProcessToSession processlookup; Owned ldapconn; Owned ldapworker; Semaphore ldapsig; atomic_t ldapwaiting; Semaphore workthreadsem; bool stopping; void remoteAddProcessSession(rank_t dst,SessionId id,INode *node, DaliClientRole role) { CMessageBuffer mb; mb.append((int)MSR_SECONDARY_REGISTER_PROCESS_SESSION).append(id); node->serialize(mb); int r = (int)role; mb.append(role); queryCoven().sendRecv(mb,dst,MPTAG_DALI_SESSION_REQUEST); // no fail currently } void remoteAddSession(rank_t dst,SessionId id) { CMessageBuffer mb; mb.append((int)MSR_SECONDARY_REGISTER_SESSION).append(id); queryCoven().sendRecv(mb,dst,MPTAG_DALI_SESSION_REQUEST); // no fail currently } public: IMPLEMENT_IINTERFACE; CCovenSessionManager() : sessionrequestserver(*this) { mySessionId = queryCoven().getUniqueId(); // tell others in coven TBD registerSubscriptionManager(SESSION_PUBLISHER,this); atomic_set(&ldapwaiting,0); workthreadsem.signal(10); stopping = false; ldapsig.signal(); } ~CCovenSessionManager() { stubs.kill(); } void start() { sessionrequestserver.start(); } void stop() { stopping = true; if (!ldapsig.wait(60*1000)) WARNLOG("LDAP stalled(1)"); if (ldapworker) { ldapworker->stop(); if (!ldapworker->join(1000)) WARNLOG("LDAP stalled(2)"); ldapworker.clear(); } ldapconn.clear(); removeMPConnectionMonitor(this); sessionrequestserver.stop(); } void ready() { addMPConnectionMonitor(this); sessionrequestserver.ready(); } void setLDAPconnection(IDaliLdapConnection *_ldapconn) { if (_ldapconn&&(_ldapconn->getLDAPflags()&DLF_ENABLED)) ldapconn.setown(_ldapconn); else { ldapconn.clear(); ::Release(_ldapconn); } } void setClientAuth(IDaliClientAuthConnection *_authconn) { } void addProcessSession(SessionId id,INode *client,DaliClientRole role) { StringBuffer str; PROGLOG("Session starting %"I64F"x (%s) : role=%s",id,client->endpoint().getUrlStr(str).str(),queryRoleName(role)); CHECKEDCRITICALBLOCK(sessmanagersect,60000); CProcessSessionState *s = new CProcessSessionState(id,client,role); while (!sessionstates.add(s)) { // takes ownership WARNLOG("Dali session manager: session already registered"); sessionstates.remove(id); } while (!processlookup.add(s)) { ERRLOG("Dali session manager: registerClient process session already registered"); processlookup.remove(client,this); } } void addSession(SessionId id) { CHECKEDCRITICALBLOCK(sessmanagersect,60000); CSessionState *s = new CSessionState(id); while (!sessionstates.add(s)) { // takes ownership WARNLOG("Dali session manager: session already registered (2)"); sessionstates.remove(id); } } SessionId registerSession(SecurityToken, SessionId parent) { // this is where security token should be checked // not in critical block (otherwise possibility of deadlock) ICoven &coven=queryCoven(); SessionId id = coven.getUniqueId(); rank_t myrank = coven.getServerRank(); rank_t ownerrank = coven.chooseServer(id); // first do local if (myrank==ownerrank) addSession(id); else remoteAddSession(ownerrank,id); ForEachOtherNodeInGroup(r,coven.queryGroup()) { if (r!=ownerrank) remoteAddSession(r,id); } return id; } SessionId registerClientProcess(INode *client, IGroup *& retcoven, DaliClientRole role) { // not in critical block (otherwise possibility of deadlock) retcoven = NULL; ICoven &coven=queryCoven(); SessionId id = coven.getUniqueId(); rank_t myrank = coven.getServerRank(); rank_t ownerrank = coven.chooseServer(id); // first do local if (myrank==ownerrank) addProcessSession(id,client,role); else remoteAddProcessSession(ownerrank,id,client,role); ForEachOtherNodeInGroup(r,coven.queryGroup()) { if (r!=ownerrank) remoteAddProcessSession(r,id,client,role); } retcoven = coven.getGroup(); return id; } SessionId lookupProcessSession(INode *node) { if (!node) return mySessionId; CHECKEDCRITICALBLOCK(sessmanagersect,60000); CProcessSessionState *s= processlookup.query(node); return s?s->getId():0; } INode *getProcessSessionNode(SessionId id) { StringBuffer eps; if (SessionManager->getClientProcessEndpoint(id,eps).length()!=0) return createINode(eps.str()); return NULL; } virtual int getPermissionsLDAP(const char *key,const char *obj,IUserDescriptor *udesc,unsigned flags, int *err) { if (err) *err = 0; if (!ldapconn) return -1; #ifdef _NO_LDAP return -1; #else if ((ldapconn->getLDAPflags()&(DLF_SAFE|DLF_ENABLED))!=(DLF_SAFE|DLF_ENABLED)) return ldapconn->getPermissions(key,obj,udesc,flags); atomic_inc(&ldapwaiting); unsigned retries = 0; while (!stopping) { if (ldapsig.wait(1000)) { atomic_dec(&ldapwaiting); if (!ldapworker) ldapworker.setown(CLdapWorkItem::get(ldapconn,workthreadsem)); if (ldapworker) { ldapworker->start(key,obj,udesc,flags); for (unsigned i=0;i<10;i++) { if (i) WARNLOG("LDAP stalled(%d) - retrying",i); int ret; if (ldapworker->wait(1000*20,ret)) { if (ret==CLDAPE_ldapfailure) { LOG(MCoperatorError, unknownJob, "LDAP - failure (returning no access for %s)",obj); ldapsig.signal(); if (err) *err = CLDAPE_ldapfailure; return 0; } else { ldapsig.signal(); return ret; } } if (atomic_read(&ldapwaiting)>10) // give up quicker if piling up break; if (i==5) { // one retry ldapworker->stop(); // abandon thread ldapworker.clear(); ldapworker.setown(CLdapWorkItem::get(ldapconn,workthreadsem)); if (ldapworker) ldapworker->start(key,obj,udesc,flags); } } if (ldapworker) ldapworker->stop(); ldapworker.clear(); // abandon thread } LOG(MCoperatorError, unknownJob, "LDAP stalled - aborting (returning no access for %s)",obj); ldapsig.signal(); if (err) *err = CLDAPE_getpermtimeout; return 0; } else { unsigned waiting = atomic_read(&ldapwaiting); static unsigned last=0; static unsigned lasttick=0; static unsigned first50=0; if ((waiting!=last)&&(msTick()-lasttick>1000)) { WARNLOG("%d threads waiting for ldap",waiting); last = waiting; lasttick = msTick(); } if (waiting>50) { if (first50==0) first50 = msTick(); else if (msTick()-first50>60*1000) { LOG(MCoperatorError, unknownJob, "LDAP stalled - aborting (returning 0 for %s)", obj); if (err) *err = CLDAPE_getpermtimeout; break; } } else first50 = 0; } } atomic_dec(&ldapwaiting); return 0; #endif } virtual bool checkScopeScansLDAP() { #ifdef _NO_LDAP return false; #else return ldapconn?ldapconn->checkScopeScans():false; #endif } virtual unsigned getLDAPflags() { #ifdef _NO_LDAP return 0; #else return ldapconn?ldapconn->getLDAPflags():0; #endif } void setLDAPflags(unsigned flags) { #ifndef _NO_LDAP if (ldapconn) ldapconn->setLDAPflags(flags); #endif } bool authorizeConnection(int role,bool revoke) { return true; } SessionId startSession(SecurityToken tok, SessionId parentid) { return registerSession(tok,parentid); } void setSessionDependancy(SessionId id , SessionId dependsonid ) { UNIMPLEMENTED; // TBD } void clearSessionDependancy(SessionId id , SessionId dependsonid ) { UNIMPLEMENTED; // TBD } protected: class CSessionSubscriptionStub: public CInterface { ISubscription *subs; SubscriptionId id; SessionId sessid; public: IMPLEMENT_IINTERFACE; CSessionSubscriptionStub(ISubscription *_subs,SubscriptionId _id) // takes ownership { subs = _subs; id = _id; const MemoryAttr &ma = subs->queryData(); MemoryBuffer mb(ma.length(),ma.get()); mb.read(sessid); } ~CSessionSubscriptionStub() { subs->Release(); } SubscriptionId getId() { return id; } SessionId getSessionId() { return sessid; } void notify(bool abort) { MemoryBuffer mb; mb.append(abort); subs->notify(mb); } }; CIArrayOf stubs; unsigned findsub(SubscriptionId id) { ForEachItemIn(i,stubs) { CSessionSubscriptionStub &stub = stubs.item(i); if (stub.getId()==id) return i; } return NotFound; } void add(ISubscription *subs,SubscriptionId id) { CSessionSubscriptionStub *nstub; { CHECKEDCRITICALBLOCK(sessmanagersect,60000); nstub = new CSessionSubscriptionStub(subs,id); if (sessionstates.query(nstub->getSessionId())||(nstub->getSessionId()==mySessionId)) { stubs.append(*nstub); return; } } // see if session known MemoryBuffer mb; bool abort=true; mb.append(abort); ERRLOG("Session Manager - adding unknown session ID %"I64F"x", nstub->getSessionId()); subs->notify(mb); delete nstub; return; } void remove(SubscriptionId id) { CHECKEDCRITICALBLOCK(sessmanagersect,60000); unsigned i=findsub(id); if (i!=NotFound) stubs.remove(i); } void stopSession(SessionId id, bool abort) { PROGLOG("Session stopping %"I64F"x %s",id,abort?"aborted":"ok"); CHECKEDCRITICALBLOCK(sessmanagersect,60000); // do in multiple stages as may remove one or more sub sussions loop { CIArrayOf tonotify; for (unsigned i=stubs.ordinality();i;) { CSessionSubscriptionStub &stub = stubs.item(--i); if (stub.getSessionId()==id) { stubs.remove(i, true); tonotify.append(stub); } } if (tonotify.ordinality()==0) break; CHECKEDCRITICALUNBLOCK(sessmanagersect,60000); ForEachItemIn(j,tonotify) { CSessionSubscriptionStub &stub = tonotify.item(j); try { stub.notify(abort); } catch (IException *e) { e->Release(); } // subscriber session may abort during stopSession } } const CSessionState *state = sessionstates.query(id); if (state) { const CProcessSessionState *pstate = QUERYINTERFACE(state,const CProcessSessionState); if (pstate) processlookup.remove(&pstate->queryNode(),this); sessionstates.remove(id); } } void onClose(SocketEndpoint &ep) { StringBuffer str; PROGLOG("Client closed (%s)",ep.getUrlStr(str).str()); SessionId idtostop; { CHECKEDCRITICALBLOCK(sessmanagersect,60000); Owned node = createINode(ep); if (queryCoven().inCoven(node)) { StringBuffer str; PROGLOG("Coven Session Stopping (%s)",ep.getUrlStr(str).str()); // more TBD here return; } CProcessSessionState *s= processlookup.query(node); if (!s) return; idtostop = s->getId(); } stopSession(idtostop,true); } StringBuffer &getClientProcessList(StringBuffer &buf) { CHECKEDCRITICALBLOCK(sessmanagersect,60000); unsigned n = processlookup.count(); CProcessSessionState *s=NULL; for (unsigned i=0;igetDetails(buf).append('\n'); } return buf; } StringBuffer &getClientProcessEndpoint(SessionId id,StringBuffer &buf) { CHECKEDCRITICALBLOCK(sessmanagersect,60000); const CSessionState *state = sessionstates.query(id); if (state) { const CProcessSessionState *pstate = QUERYINTERFACE(state,const CProcessSessionState); if (pstate) return pstate->queryNode().endpoint().getUrlStr(buf); } return buf; } unsigned queryClientCount() { CHECKEDCRITICALBLOCK(sessmanagersect,60000); return processlookup.count(); } }; ISessionManager &querySessionManager() { CriticalBlock block(sessionCrit); if (!SessionManager) { assertex(!isCovenActive()||!queryCoven().inCoven()); // Check not Coven server (if occurs - not initialized correctly; // If !coven someone is checking for dali so allow SessionManager = new CClientSessionManager(); } return *SessionManager; } class CDaliSessionServer: public CInterface, public IDaliServer { public: IMPLEMENT_IINTERFACE; CDaliSessionServer() { } void start() { CriticalBlock block(sessionCrit); assertex(queryCoven().inCoven()); // must be member of coven CCovenSessionManager *serv = new CCovenSessionManager(); SessionManagerServer = serv; SessionManager = serv; SessionManagerServer->start(); } void suspend() { } void stop() { CriticalBlock block(sessionCrit); SessionManagerServer->stop(); SessionManagerServer->Release(); SessionManagerServer = NULL; SessionManager = NULL; } void ready() { SessionManagerServer->ready(); } void nodeDown(rank_t rank) { assertex(!"TBD"); } }; IDaliServer *createDaliSessionServer() { return new CDaliSessionServer(); } void setLDAPconnection(IDaliLdapConnection *ldapconn) { if (SessionManagerServer) SessionManagerServer->setLDAPconnection(ldapconn); } void setClientAuth(IDaliClientAuthConnection *authconn) { if (SessionManagerServer) SessionManagerServer->setClientAuth(authconn); } #define REG_SLEEP 5000 bool registerClientProcess(ICommunicator *comm, IGroup *& retcoven,unsigned timeout,DaliClientRole role) { // NB doesn't use coven as not yet set up if (comm->queryGroup().ordinality()==0) return false; CTimeMon tm(timeout); retcoven = NULL; unsigned nextLog = 0, lastNextLog = 0; unsigned t=REG_SLEEP; unsigned remaining; rank_t r; while (!tm.timedout(&remaining)) { if (remaining>t) remaining = t; r = getRandom()%comm->queryGroup().ordinality(); bool ok = false; if (remaining>10000) // MP protocol has a minimum time of 10s so if remaining < 10s use a detachable thread ok = comm->verifyConnection(r,remaining); else { struct cThread: public Thread { IMPLEMENT_IINTERFACE; Semaphore sem; Linked comm; bool ok; Owned exc; unsigned remaining; rank_t r; cThread(ICommunicator *_comm,rank_t _r,unsigned _remaining) : Thread ("dasess.registerClientProcess"), comm(_comm) { r = _r; remaining = _remaining; ok = false; } int run() { try { if (comm->verifyConnection(r,remaining)) ok = true; } catch (IException *e) { exc.setown(e); } sem.signal(); return 0; } } *t = new cThread(comm,r,remaining); t->start(); t->sem.wait(remaining); ok = t->ok; if (t->exc.get()) { IException *e = t->exc.getClear(); t->Release(); throw e; } t->Release(); } if (ok) { CMessageBuffer mb; mb.append((int)MSR_REGISTER_PROCESS_SESSION); queryMyNode()->serialize(mb); comm->queryGroup().queryNode(r).serialize(mb); mb.append((int)role); if (comm->sendRecv(mb,r,MPTAG_DALI_SESSION_REQUEST,SESSIONREPLYTIMEOUT)) { if (!mb.length()) { // failed system capability match, retcoven = comm->getGroup(); // continue as if - will fail later more obscurely. return true; } mb.read(mySessionId); retcoven = deserializeIGroup(mb); return true; } } StringBuffer str; PROGLOG("Failed to connect to Dali Server %s.", comm->queryGroup().queryNode(r).endpoint().getUrlStr(str).str()); if (tm.timedout()) { PROGLOG("%s", str.append(" Timed out.").str()); break; } else if (0 == nextLog) { PROGLOG("%s", str.append(" Retrying...").str()); if ((lastNextLog * REG_SLEEP) >= 60000) // limit to a minute between logging nextLog = 60000 / REG_SLEEP; else nextLog = lastNextLog + 2; // wait +2 REG_SLEEP pauses before next logging lastNextLog = nextLog; } else --nextLog; Sleep(REG_SLEEP); } return false; } extern void stopClientProcess() { CriticalBlock block(sessionCrit); if (mySessionId&&SessionManager) { try { querySessionManager().stopSession(mySessionId,false); } catch (IDaliClient_Exception *e) { if (e->errorCode()!=DCERR_server_closed) throw; e->Release(); } mySessionId = 0; } } class CProcessSessionWatchdog { }; class CUserDescriptor: public CInterface, implements IUserDescriptor { StringAttr username; StringAttr passwordenc; public: IMPLEMENT_IINTERFACE; StringBuffer &getUserName(StringBuffer &buf) { return buf.append(username); } StringBuffer &getPassword(StringBuffer &buf) { decrypt(buf,passwordenc); return buf; } virtual void set(const char *name,const char *password) { username.set(name); StringBuffer buf; encrypt(buf,password); passwordenc.set(buf.str()); } virtual void clear() { username.clear(); passwordenc.clear(); } void serialize(MemoryBuffer &mb) { mb.append(username).append(passwordenc); } void deserialize(MemoryBuffer &mb) { mb.read(username).read(passwordenc); } }; IUserDescriptor *createUserDescriptor() { return new CUserDescriptor; } MODULE_INIT(INIT_PRIORITY_DALI_DASESS) { return true; } MODULE_EXIT() { ::Release(SessionManager); SessionManager = NULL; }