/*##############################################################################
Copyright (C) 2011 HPCC Systems.
All rights reserved. This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
############################################################################## */
#define da_decl __declspec(dllexport)
#include "platform.h"
#include "jlib.hpp"
#include "jfile.hpp"
#include "jsuperhash.hpp"
#include "jmisc.hpp"
#include "jencrypt.hpp"
#include "dacoven.hpp"
#include "mpbuff.hpp"
#include "mpcomm.hpp"
#include "mputil.hpp"
#include "daserver.hpp"
#include "dasubs.ipp"
#include "dasds.hpp"
#include "daclient.hpp"
#include "daldap.hpp"
#include "dasess.hpp"
#ifdef _MSC_VER
#pragma warning (disable : 4355)
#endif
// Translates a DaliClientRole into the fixed label used in log output.
// Roles without an explicit entry below are reported as "Unknown".
const char *queryRoleName(DaliClientRole role)
{
    const char *label = "Unknown";
    switch (role) {
    case DCR_Private:       label = "Private";      break;
    case DCR_Diagnostic:    label = "Diagnostic";   break;
    case DCR_ThorSlave:     label = "ThorSlave";    break;
    case DCR_ThorMaster:    label = "ThorMaster";   break;
    case DCR_EclServer:     label = "EclServer";    break;
    case DCR_EclScheduler:  label = "EclScheduler"; break;
    case DCR_EclAgent:      label = "EclAgent";     break;
    case DCR_AgentExec:     label = "AgentExec";    break;
    case DCR_DaliServer:    label = "DaliServer";   break;
    case DCR_SashaServer:   label = "SashaServer";  break;
    case DCR_Util:          label = "Util";         break;
    case DCR_Dfu:           label = "Dfu";          break;
    case DCR_DfuServer:     label = "DfuServer";    break;
    case DCR_EspServer:     label = "EspServer";    break;
    case DCR_WuClient:      label = "WuClient";     break;
    case DCR_Config:        label = "Config";       break;
    case DCR_Scheduler:     label = "Scheduler";    break;
    case DCR_RoxyMaster:    label = "RoxieMaster";  break; // enum spelt "Roxy", label spelt "Roxie"
    case DCR_RoxySlave:     label = "RoxieSlave";   break;
    case DCR_BackupGen:     label = "BackupGen";    break;
    case DCR_Other:         label = "Other";        break;
    }
    return label;
}
// Server-side session manager operations. In this file the interface is
// implemented by CCovenSessionManager and driven by CSessionRequestServer,
// CDaliSessionServer and the module-level setLDAPconnection()/setClientAuth().
// It also acts as an MP connection monitor (IConnectionMonitor).
interface ISessionManagerServer: implements IConnectionMonitor
{
// Allocates a new session id and registers it (the implementation in this
// file does not currently check the security token).
virtual SessionId registerSession(SecurityToken tok,SessionId parentid) = 0;
// Allocates a session id for a client process and returns the coven group via grp.
virtual SessionId registerClientProcess(INode *node,IGroup *&grp, DaliClientRole role) = 0;
// Records an already-allocated process session id for the given node and role.
virtual void addProcessSession(SessionId id, INode *node, DaliClientRole role) = 0;
// Records an already-allocated (non-process) session id.
virtual void addSession(SessionId id) = 0;
// Returns the session id registered for node, or 0 when none is known.
virtual SessionId lookupProcessSession(INode *node) = 0;
// Returns a node for the process session, or NULL when the id is unknown.
virtual INode *getProcessSessionNode(SessionId id) =0;
// Looks up LDAP permissions; *err (if err is non-NULL) receives a CLDAPE_* code or 0.
virtual int getPermissionsLDAP(const char *key,const char *obj,IUserDescriptor *udesc,unsigned flags, int *err)=0;
// Ends a session; failed distinguishes an abort from a clean close.
virtual void stopSession(SessionId sessid,bool failed) = 0;
virtual void setClientAuth(IDaliClientAuthConnection *authconn) = 0;
// The implementation in this file takes ownership of _ldapconn.
virtual void setLDAPconnection(IDaliLdapConnection *_ldapconn) = 0;
// Called with revoke=false when a client registers and revoke=true when its
// process session is removed.
virtual bool authorizeConnection(int role,bool revoke) = 0;
// Lifecycle hooks invoked by CDaliSessionServer.
virtual void start() = 0;
virtual void ready() = 0;
virtual void stop() = 0;
};
// Session id of this process: set by CCovenSessionManager's constructor on a
// server and by registerClientProcess() on a client; reset by stopClientProcess().
static SessionId mySessionId=0;
// Process-wide session manager; created lazily by querySessionManager() or
// installed by CDaliSessionServer::start().
static ISessionManager *SessionManager=NULL;
// Non-NULL only between CDaliSessionServer::start() and stop().
static ISessionManagerServer *SessionManagerServer=NULL;
// Held while the two pointers above are created, replaced or cleared.
static CriticalSection sessionCrit;
// Timeout passed to sendRecv for session requests (3*60*1000, presumably milliseconds).
#define SESSIONREPLYTIMEOUT (3*60*1000)
// Error codes reported through getPermissionsLDAP / CDaliLDAP_Exception.
#define CLDAPE_getpermtimeout (-1)
#define CLDAPE_ldapfailure (-2)
class CDaliLDAP_Exception: public CInterface, implements IException
{
int errcode;
public:
CDaliLDAP_Exception(int _errcode)
{
errcode = _errcode;
}
int errorCode() const { return errcode; }
StringBuffer & errorMessage(StringBuffer &str) const
{
if (errcode==0)
return str;
str.appendf("LDAP Exception(%d): ",errcode);
if (errcode==CLDAPE_getpermtimeout)
return str.append("getPermissionsLDAP - timeout to LDAP server");
if (errcode==CLDAPE_ldapfailure)
return str.append("getPermissionsLDAP - LDAP server failure");
return str.append("Unknown Exception");
}
MessageAudience errorAudience() const { return MSGAUD_user; }
IMPLEMENT_IINTERFACE;
};
// Thread that terminates the process after a short, randomised delay.
// Constructing an instance is enough: the constructor starts the thread.
// Not referenced anywhere else in this file.
class CdelayedTerminate: public Thread // slightly obfuscated stop code
{
byte err;
// Spins on getRandom() for a random number of iterations, logs the stored
// code as "Server fault", sleeps in 1ms steps for a random count, then calls
// exit(0). Never returns normally.
int run()
{
while (getRandom()%711!=0) getRandom(); // look busy
ERRLOG("Server fault %d",(int)err);
while (getRandom()%7!=0) Sleep(1);
exit(0);
}
public:
// Stores the code, starts the thread, drops the creator's reference and
// pauses the calling thread for 100ms.
// NOTE(review): Release() here presumably relies on the running thread
// keeping its own reference to the object - confirm against Thread::start.
CdelayedTerminate(byte _err)
{
err = _err;
start();
Release();
Sleep(100);
}
};
// Base record for a registered session; it only carries the session id.
// CSessionStateTable is a friend so it can hash and match on `id` directly.
class CSessionState: public CInterface
{
protected:
    friend class CSessionStateTable;
    SessionId id;
public:
    CSessionState(SessionId _id) : id(_id) {}

    SessionId getId() const { return id; }
};
class CSessionStateTable: private SuperHashTableOf
{
CheckedCriticalSection sessstatesect;
void onAdd(void *) {}
void onRemove(void *e)
{
CSessionState &elem=*(CSessionState *)e;
elem.Release();
}
unsigned getHashFromElement(const void *e) const
{
const CSessionState &elem=*(const CSessionState *)e;
SessionId id=elem.id;
return low(id)^(unsigned)high(id);
}
unsigned getHashFromFindParam(const void *fp) const
{
SessionId id = *(const SessionId *)fp;
return low(id)^(unsigned)high(id);
}
const void * getFindParam(const void *p) const
{
const CSessionState &elem=*(const CSessionState *)p;
return (void *)&elem.id;
}
bool matchesFindParam(const void * et, const void *fp, unsigned) const
{
return ((CSessionState *)et)->id==*(SessionId *)fp;
}
IMPLEMENT_SUPERHASHTABLEOF_REF_FIND(CSessionState,SessionId);
public:
CSessionStateTable()
{
}
~CSessionStateTable() {
CHECKEDCRITICALBLOCK(sessstatesect,60000);
releaseAll();
}
bool add(CSessionState *e) // takes ownership
{
CHECKEDCRITICALBLOCK(sessstatesect,60000);
if (SuperHashTableOf::find(&e->id))
return false;
SuperHashTableOf::add(*e);
return true;
}
const CSessionState *query(SessionId id)
{
CHECKEDCRITICALBLOCK(sessstatesect,60000);
return SuperHashTableOf::find(&id);
}
void remove(SessionId id)
{
CHECKEDCRITICALBLOCK(sessstatesect,60000);
SuperHashTableOf::remove(&id);
}
};
// Session record for a client process: adds the client's node (linked for the
// lifetime of the record) and its role to the base session id.
class CProcessSessionState: public CSessionState
{
    INode *node;
    DaliClientRole role;
public:
    CProcessSessionState(SessionId id,INode *_node,DaliClientRole _role)
        : CSessionState(id), node(_node), role(_role)
    {
        node->Link();   // released again in the destructor
    }

    ~CProcessSessionState()
    {
        node->Release();
    }

    INode &queryNode() const { return *node; }

    DaliClientRole queryRole() const { return role; }

    // Appends "<id in hex>: <endpoint url>, role=<role name>" to buf.
    StringBuffer &getDetails(StringBuffer &buf)
    {
        StringBuffer url;
        node->endpoint().getUrlStr(url);
        return buf.appendf("%16"I64F"X: %s, role=%s",CSessionState::id,url.str(),queryRoleName(role));
    }
};
class CMapProcessToSession: private SuperHashTableOf
{
CheckedCriticalSection mapprocesssect;
void onAdd(void *) {}
void onRemove(void *e) {} // do nothing
unsigned getHashFromElement(const void *e) const
{
const CProcessSessionState &elem=*(const CProcessSessionState *)e;
return elem.queryNode().getHash();
}
unsigned getHashFromFindParam(const void *fp) const
{
return ((INode *)fp)->getHash();
}
const void * getFindParam(const void *p) const
{
const CProcessSessionState &elem=*(const CProcessSessionState *)p;
return (void *)&elem.queryNode();
}
bool matchesFindParam(const void * et, const void *fp, unsigned) const
{
return ((CProcessSessionState *)et)->queryNode().equals((INode *)fp);
}
IMPLEMENT_SUPERHASHTABLEOF_REF_FIND(CProcessSessionState,INode);
public:
CMapProcessToSession()
{
}
~CMapProcessToSession()
{
CHECKEDCRITICALBLOCK(mapprocesssect,60000);
releaseAll();
}
bool add(CProcessSessionState *e)
{
CHECKEDCRITICALBLOCK(mapprocesssect,60000);
if (SuperHashTableOf::find(&e->queryNode()))
return false;
SuperHashTableOf::add(*e);
return true;
}
CProcessSessionState *query(INode *n)
{
CHECKEDCRITICALBLOCK(mapprocesssect,60000);
return SuperHashTableOf::find(n);
}
void remove(INode *n,ISessionManagerServer *manager)
{
CHECKEDCRITICALBLOCK(mapprocesssect,60000);
CProcessSessionState *sstate = SuperHashTableOf::find(n);
if (sstate) {
if (manager)
manager->authorizeConnection(sstate->queryRole(),true);
SuperHashTableOf::removeExact(sstate);
}
}
unsigned count()
{
CHECKEDCRITICALBLOCK(mapprocesssect,60000);
return SuperHashTableOf::count();
}
CProcessSessionState *next(const CProcessSessionState *s)
{
CHECKEDCRITICALBLOCK(mapprocesssect,60000);
return (CProcessSessionState *)SuperHashTableOf::next(s);
}
};
// Request codes sent as the leading int of an MPTAG_DALI_SESSION_REQUEST
// message. The values travel on the wire, so the order must not change.
enum MSessionRequestKind {
// Client process registration; reply carries the new session id and coven group.
MSR_REGISTER_PROCESS_SESSION,
// Coven-internal: record a process session allocated by another server.
MSR_SECONDARY_REGISTER_PROCESS_SESSION,
// Allocate a plain session; reply carries the new session id.
MSR_REGISTER_SESSION,
// Coven-internal: record a plain session allocated by another server.
MSR_SECONDARY_REGISTER_SESSION,
// Look up a session id from a node, or a node from a session id.
MSR_LOOKUP_PROCESS_SESSION,
MSR_STOP_SESSION,
// Sent by CClientSessionManager::importCapabilities; not handled by
// CSessionRequestServer::processMessage in this file.
MSR_IMPORT_CAPABILITIES,
MSR_LOOKUP_LDAP_PERMISSIONS,
MSR_EXIT // TBD
};
class CSessionRequestServer: public Thread
{
bool stopped;
ISessionManagerServer &manager;
Semaphore acceptConnections;
public:
CSessionRequestServer(ISessionManagerServer &_manager)
: Thread("Session Manager, CSessionRequestServer"), manager(_manager)
{
stopped = true;
}
int run()
{
ICoven &coven=queryCoven();
CMessageHandler handler("CSessionRequestServer",this,&CSessionRequestServer::processMessage);
stopped = false;
CMessageBuffer mb;
while (!stopped) {
try {
mb.clear();
if (coven.recv(mb,RANK_ALL,MPTAG_DALI_SESSION_REQUEST,NULL))
handler.handleMessage(mb);
else
stopped = true;
}
catch (IException *e)
{
EXCLOG(e, "CDaliPublisherServer");
e->Release();
}
}
return 0;
}
void processMessage(CMessageBuffer &mb)
{
ICoven &coven=queryCoven();
SessionId id;
int fn;
mb.read(fn);
switch (fn) {
case MSR_REGISTER_PROCESS_SESSION: {
acceptConnections.wait();
acceptConnections.signal();
Owned node(deserializeINode(mb));
Owned servernode(deserializeINode(mb)); // hopefully me, but not if forwarded
int role=0;
if (mb.length()-mb.getPos()>=sizeof(role)) { // a capability block present
mb.read(role);
if (!manager.authorizeConnection(role,false)) {
SocketEndpoint sender = mb.getSender();
mb.clear();
coven.reply(mb);
MilliSleep(100+getRandom()%1000); // Causes client to 'work' for a short time.
Owned node = createINode(sender);
coven.disconnect(node);
break;
}
#ifdef _DEBUG
StringBuffer eps;
PROGLOG("Connection to %s authorized",mb.getSender().getUrlStr(eps).str());
#endif
}
IGroup *covengrp;
id = manager.registerClientProcess(node.get(),covengrp,(DaliClientRole)role);
mb.clear().append(id);
if (covengrp->rank(servernode)==RANK_NULL) { // must have been redirected
covengrp->Release(); // no good, so just use one we know about (may use something more sophisticated later)
INode *na = servernode.get();
covengrp = createIGroup(1, &na);
}
covengrp->serialize(mb);
covengrp->Release();
coven.reply(mb);
}
break;
case MSR_SECONDARY_REGISTER_PROCESS_SESSION: {
mb.read(id);
Owned node (deserializeINode(mb));
int role;
mb.read(role);
manager.addProcessSession(id,node.get(),(DaliClientRole)role);
mb.clear();
coven.reply(mb);
}
break;
case MSR_REGISTER_SESSION: {
SecurityToken tok;
SessionId parentid;
mb.read(tok).read(parentid);
SessionId id = manager.registerSession(tok,parentid);
mb.clear().append(id);
coven.reply(mb);
}
break;
case MSR_SECONDARY_REGISTER_SESSION: {
mb.read(id);
manager.addSession(id);
mb.clear();
coven.reply(mb);
}
break;
case MSR_LOOKUP_PROCESS_SESSION: {
// looks up from node or from id
Owned node (deserializeINode(mb));
if (node->endpoint().isNull()&&(mb.length()-mb.getPos()>=sizeof(id))) {
mb.read(id);
INode *n = manager.getProcessSessionNode(id);
if (n)
node.setown(n);
node->serialize(mb.clear());
}
else {
id = manager.lookupProcessSession(node.get());
mb.clear().append(id);
}
coven.reply(mb);
}
break;
case MSR_STOP_SESSION: {
SessionId sessid;
bool failed;
mb.read(sessid).read(failed);
manager.stopSession(sessid,failed);
mb.clear();
coven.reply(mb);
}
break;
case MSR_LOOKUP_LDAP_PERMISSIONS: {
StringAttr key;
StringAttr obj;
Owned udesc=createUserDescriptor();
StringAttr username;
StringAttr passwordenc;
mb.read(key).read(obj);
udesc->deserialize(mb);
#ifdef _DALIUSER_STACKTRACE
//following debug code to be removed
StringBuffer sb;
udesc->getUserName(sb);
if (0==sb.length() || !strcmpi(sb.str(), "daliuser"))
{
DBGLOG("UNEXPECTED USER '%s' in %s line %ld",username,__FILE__, __LINE__);
PrintStackReport();
}
#endif
unsigned auditflags = 0;
if (mb.length()-mb.getPos()>=sizeof(auditflags))
mb.read(auditflags);
int err = 0;
int ret=manager.getPermissionsLDAP(key,obj,udesc,auditflags,&err);
mb.clear().append(ret);
if (err)
mb.append(err);
coven.reply(mb);
}
break;
}
}
void ready()
{
acceptConnections.signal();
}
void stop()
{
if (!stopped) {
stopped = true;
queryCoven().cancel(RANK_ALL, MPTAG_DALI_SESSION_REQUEST);
}
join();
}
};
class CSessionManagerBase: public CInterface, implements ISessionManager
{
protected:
CheckedCriticalSection sessmanagersect;
public:
IMPLEMENT_IINTERFACE;
CSessionManagerBase()
{
servernotifys.kill();
}
virtual ~CSessionManagerBase()
{
}
class CSessionSubscriptionProxy: public CInterface, implements ISubscription
{
ISessionNotify *sub;
SubscriptionId id;
MemoryAttr ma;
SessionId sessid;
public:
IMPLEMENT_IINTERFACE;
CSessionSubscriptionProxy(ISessionNotify *_sub,SessionId _sessid)
{
sub = LINK(_sub);
sessid = _sessid;
MemoryBuffer mb;
mb.append(sessid);
ma.set(mb.length(),mb.toByteArray());
id = queryCoven().getUniqueId();
}
~CSessionSubscriptionProxy()
{
sub->Release();
}
ISessionNotify *queryNotify()
{
return sub;
}
const MemoryAttr &queryData()
{
return ma;
}
SubscriptionId getId()
{
return id;
}
void doNotify(bool aborted)
{
if (aborted)
sub->aborted(sessid);
else
sub->closed(sessid);
}
void notify(MemoryBuffer &mb)
{
bool aborted;
mb.read(aborted);
doNotify(aborted);
}
void abort()
{
//NH: TBD
}
bool aborted()
{
return false;
}
};
CIArrayOfservernotifys;
SubscriptionId subscribeSession(SessionId sessid, ISessionNotify *inotify)
{
CSessionSubscriptionProxy *proxy;
SubscriptionId id;
{
CHECKEDCRITICALBLOCK(sessmanagersect,60000);
proxy = new CSessionSubscriptionProxy(inotify,sessid);
id = proxy->getId();
if (sessid==SESSID_DALI_SERVER) {
servernotifys.append(*proxy);
return id;
}
}
querySubscriptionManager(SESSION_PUBLISHER)->add(proxy,id);
return id;
}
void unsubscribeSession(SubscriptionId id)
{
{
CHECKEDCRITICALBLOCK(sessmanagersect,60000);
// check not a server subscription
ForEachItemIn(i,servernotifys) {
if (servernotifys.item(i).getId()==id) {
servernotifys.remove(i);
return;
}
}
}
querySubscriptionManager(SESSION_PUBLISHER)->remove(id);
}
class cNotify: public CInterface, implements ISessionNotify
{
public:
IMPLEMENT_IINTERFACE;
Semaphore sem;
void closed(SessionId id)
{
//PROGLOG("Session closed %"I64F"x",id);
sem.signal();
}
void aborted(SessionId id)
{
//PROGLOG("Session aborted %"I64F"x",id);
sem.signal();
}
};
bool sessionStopped(SessionId id, unsigned timeout)
{
Owned node = getProcessSessionNode(id);
if (node.get()==NULL)
return true;
if (timeout==0)
return false;
Owned cnotify = new cNotify;
querySessionManager().subscribeSession(id,cnotify);
if (cnotify->sem.wait(timeout))
return false;
node.setown(getProcessSessionNode(id));
return node.get()==NULL;
}
void notifyServerStopped(bool aborted)
{
CHECKEDCRITICALBLOCK(sessmanagersect,60000);
// check not a server subscription
ForEachItemIn(i,servernotifys) {
servernotifys.item(i).doNotify(aborted);
}
}
};
// Client-side session manager: each operation is a request/reply exchange
// with a Dali server over MPTAG_DALI_SESSION_REQUEST.
//
// Fixes in this revision:
//  - startSession(): a misplaced ')' turned the sendRecv timeout into the
//    right-hand side of a comma expression, so the condition was always true
//    and the function always returned 0 without reading the reply;
//  - getPermissionsLDAP(): the error code appended by the server was detected
//    but never read, so *err was always 0 and the exception never thrown;
//  - debug log used "%ld" for __LINE__;
//  - Owned<> template arguments missing from the source are restored.
class CClientSessionManager: public CSessionManagerBase, implements IConnectionMonitor
{
    bool securitydisabled;  // latched once the server reports (or implies) no LDAP security
public:
    IMPLEMENT_IINTERFACE;

    CClientSessionManager()
    {
        securitydisabled = false;
    }

    virtual ~CClientSessionManager()
    {
        stop();
    }

    // Returns the session id of node's process (this process when node is
    // NULL), or 0 if the server could not be reached.
    SessionId lookupProcessSession(INode *node)
    {
        if (!node)
            return mySessionId;
        CMessageBuffer mb;
        mb.append((int)MSR_LOOKUP_PROCESS_SESSION);
        node->serialize(mb);
        if (!queryCoven().sendRecv(mb,RANK_RANDOM,MPTAG_DALI_SESSION_REQUEST,SESSIONREPLYTIMEOUT))
            return 0;
        SessionId ret;
        mb.read(ret);
        return ret;
    }

    // Returns the (linked) node owning session id, or NULL when id is 0, the
    // request fails or the server replies with a null endpoint.
    virtual INode *getProcessSessionNode(SessionId id)
    {
        if (!id)
            return NULL;
        CMessageBuffer mb;
        mb.append((int)MSR_LOOKUP_PROCESS_SESSION);
        queryNullNode()->serialize(mb);     // null node + id selects lookup-by-id
        mb.append(id);
        if (!queryCoven().sendRecv(mb,RANK_RANDOM,MPTAG_DALI_SESSION_REQUEST,SESSIONREPLYTIMEOUT))
            return NULL;
        Owned<INode> node = deserializeINode(mb);
        if (node->endpoint().isNull())
            return NULL;
        return node.getClear();
    }

    // Asks the server for the LDAP permissions of udesc on key/obj.
    // Returns -1 when security is disabled, 0 when the request fails,
    // otherwise the server's answer. A non-zero server error code is stored
    // in *err when err is supplied, or thrown as CDaliLDAP_Exception when not.
    int getPermissionsLDAP(const char *key,const char *obj,IUserDescriptor *udesc,unsigned auditflags,int *err)
    {
        if (err)
            *err = 0;
        if (securitydisabled)
            return -1;
        if (queryDaliServerVersion().compare("1.8") < 0) {
            securitydisabled = true;
            return -1;
        }
        CMessageBuffer mb;
        mb.append((int)MSR_LOOKUP_LDAP_PERMISSIONS);
        mb.append(key).append(obj);
#ifdef _DALIUSER_STACKTRACE
        //following debug code to be removed
        StringBuffer sb;
        udesc->getUserName(sb);
        if (0==sb.length() || !strcmpi(sb.str(), "daliuser"))
        {
            DBGLOG("UNEXPECTED USER '%s' in %s line %d",sb.str(),__FILE__, __LINE__);
            PrintStackReport();
        }
#endif
        udesc->serialize(mb);
        mb.append(auditflags);
        if (!queryCoven().sendRecv(mb,RANK_RANDOM,MPTAG_DALI_SESSION_REQUEST,SESSIONREPLYTIMEOUT))
            return 0;
        int ret=-1;
        if (mb.remaining()>=sizeof(ret)) {
            mb.read(ret);
            int e = 0;
            if (mb.remaining()>=sizeof(e)) {
                mb.read(e);     // optional error code, present only when non-zero on the server
                if (err)
                    *err = e;
                else if (e)
                    throw new CDaliLDAP_Exception(e);
            }
        }
        if (ret==-1)
            securitydisabled = true;
        return ret;
    }

    bool checkScopeScansLDAP()
    {
        assertex(!"checkScopeScansLDAP called on client");
        return true; // actually only used server size
    }

    unsigned getLDAPflags()
    {
        assertex(!"getLdapFlags called on client");
        return 0;
    }

    void setLDAPflags(unsigned)
    {
        assertex(!"setLdapFlags called on client");
    }

    bool authorizeConnection(DaliClientRole,bool)
    {
        return true;
    }

    // Requests a new session id from the server; returns 0 if the request fails.
    SessionId startSession(SecurityToken tok, SessionId parentid)
    {
        CMessageBuffer mb;
        mb.append((int)MSR_REGISTER_SESSION).append(tok).append(parentid);
        if (!queryCoven().sendRecv(mb,RANK_RANDOM,MPTAG_DALI_SESSION_REQUEST,SESSIONREPLYTIMEOUT))
            return 0;
        SessionId ret;
        mb.read(ret);
        return ret;
    }

    // Stops a session on the server. SESSID_DALI_SERVER is handled locally by
    // notifying the server-stopped subscribers.
    void stopSession(SessionId sessid, bool failed)
    {
        if (sessid==SESSID_DALI_SERVER) {
            notifyServerStopped(failed);
            return;
        }
        CMessageBuffer mb;
        mb.append((int)MSR_STOP_SESSION).append(sessid).append(failed);
        queryCoven().sendRecv(mb,RANK_RANDOM,MPTAG_DALI_SESSION_REQUEST,SESSIONREPLYTIMEOUT);
    }

    // Connection-monitor callback: if the closed endpoint is a coven member
    // and the coven has a single server, report the server as aborted.
    void onClose(SocketEndpoint &ep)
    {
        CHECKEDCRITICALBLOCK(sessmanagersect,60000);
        Owned<INode> node = createINode(ep);
        if (queryCoven().inCoven(node)) {
            StringBuffer str;
            PROGLOG("Coven Session Stopping (%s)",ep.getUrlStr(str).str());
            if (queryCoven().size()==1)
                notifyServerStopped(true);
        }
    }

    void start()
    {
        addMPConnectionMonitor(this);
    }

    void stop()
    {
    }

    void ready()
    {
        removeMPConnectionMonitor(this);
    }

    StringBuffer &getClientProcessList(StringBuffer &buf)
    {
        // dummy
        return buf;
    }

    StringBuffer &getClientProcessEndpoint(SessionId,StringBuffer &buf)
    {
        // dummy
        return buf;
    }

    unsigned queryClientCount()
    {
        // dummy
        return 0;
    }

    // Forwards a capabilities blob to the server. (No handler for this
    // request kind is visible in this file.)
    void importCapabilities(MemoryBuffer &mb)
    {
        CMessageBuffer msg;
        msg.append((int)MSR_IMPORT_CAPABILITIES);
        msg.append(mb.length(), mb.toByteArray());
        queryCoven().sendRecv(msg,RANK_RANDOM,MPTAG_DALI_SESSION_REQUEST,SESSIONREPLYTIMEOUT);
    }
};
// Worker thread that performs one LDAP permission lookup at a time so the
// caller can wait with a timeout and abandon the thread if LDAP stalls.
// threaddone is signalled when run() exits; get() waits on it before creating
// another worker, which limits how many stalled threads can accumulate.
//
// Changes from the previous revision: Linked<> template arguments missing
// from the source are restored, and flags/ret are initialised in the
// constructor instead of being left indeterminate until start().
class CLdapWorkItem : public Thread
{
    StringAttr key;
    StringAttr obj;
    Linked<IUserDescriptor> udesc;
    Linked<IDaliLdapConnection> ldapconn;
    unsigned flags;
    bool running;
    Semaphore contsem;      // signalled for each new request and by stop()
    Semaphore ready;        // signalled when a request has completed
    Semaphore &threaddone;
    int ret;
public:
    IMPLEMENT_IINTERFACE;

    CLdapWorkItem(IDaliLdapConnection *_ldapconn,Semaphore &_threaddone)
        : ldapconn(_ldapconn), threaddone(_threaddone)
    {
        flags = 0;
        running = false;
        ret = CLDAPE_ldapfailure;
    }

    // Queues one lookup, starting the thread on first use. The result
    // defaults to CLDAPE_ldapfailure and is overwritten on success.
    void start(const char *_key,const char *_obj,IUserDescriptor *_udesc,unsigned _flags)
    {
        key.set(_key);
        obj.set(_obj);
        udesc.set(_udesc);
        flags = _flags;
        ret = CLDAPE_ldapfailure;
        if (!running) {
            running = true;
            Thread::start();
        }
        contsem.signal();
    }

    int run()
    {
        loop {
            contsem.wait();
            if (!running)   // stop() was called
                break;
            try {
                ret = ldapconn->getPermissions(key,obj,udesc,flags);
            }
            catch(IException *e) {
                // ret keeps the failure value set by start()
                LOG(MCoperatorError, unknownJob, e, "CLdapWorkItem");
                e->Release();
            }
            ready.signal();
        }
        threaddone.signal();
        return 0;
    }

    // Waits up to timeout for the current lookup. On success stores the
    // result in _ret and returns true; on timeout sets _ret to 0 and returns false.
    bool wait(unsigned timeout, int &_ret)
    {
        if (ready.wait(timeout)) {
            _ret = ret;
            return true;
        }
        _ret = 0;
        return false;
    }

    // Asks the thread to exit after its current lookup (if any) returns.
    void stop()
    {
        running = false;
        contsem.signal();
    }

    // Creates a worker once a slot is available on _threaddone; returns NULL
    // if none frees up within 5 minutes.
    static CLdapWorkItem *get(IDaliLdapConnection *_ldapconn,Semaphore &_threaddone)
    {
        if (!_threaddone.wait(1000*60*5)) {
            ERRLOG("Too many stalled LDAP threads");
            return NULL;
        }
        return new CLdapWorkItem(_ldapconn,_threaddone);
    }
};
// Server-side (coven) session manager. Owns the table of known sessions, the
// node->session index, the session subscription stubs and the optional LDAP
// connection, and runs the CSessionRequestServer thread that serves clients.
//
// Fixes in this revision:
//  - getPermissionsLDAP(): when the worker was replaced at retry 5 and no new
//    worker could be obtained, the next iteration dereferenced a NULL
//    ldapworker; the retry loop now stops instead;
//  - getProcessSessionNode() used the global SessionManager pointer (which may
//    be NULL) to reach a method of this same object; it now calls it directly;
//  - remoteAddProcessSession() serialises the role explicitly as int, which is
//    what the receiver reads, and the unused locals `r`/`retries` are removed;
//  - the garbled loop in getClientProcessList() and the template arguments
//    missing from the source are restored.
class CCovenSessionManager: public CSessionManagerBase, implements ISessionManagerServer, implements ISubscriptionManager
{
    CSessionRequestServer sessionrequestserver;
    CSessionStateTable sessionstates;       // owns the session state objects
    CMapProcessToSession processlookup;     // non-owning index by client node
    Owned<IDaliLdapConnection> ldapconn;
    Owned<CLdapWorkItem> ldapworker;
    Semaphore ldapsig;                      // one token: serialises use of ldapworker
    atomic_t ldapwaiting;                   // number of callers waiting for ldapsig
    Semaphore workthreadsem;                // caps the number of live LDAP worker threads (10)
    bool stopping;

    void remoteAddProcessSession(rank_t dst,SessionId id,INode *node, DaliClientRole role)
    {
        CMessageBuffer mb;
        mb.append((int)MSR_SECONDARY_REGISTER_PROCESS_SESSION).append(id);
        node->serialize(mb);
        mb.append((int)role);   // receiver reads an int
        queryCoven().sendRecv(mb,dst,MPTAG_DALI_SESSION_REQUEST);
        // no fail currently
    }

    void remoteAddSession(rank_t dst,SessionId id)
    {
        CMessageBuffer mb;
        mb.append((int)MSR_SECONDARY_REGISTER_SESSION).append(id);
        queryCoven().sendRecv(mb,dst,MPTAG_DALI_SESSION_REQUEST);
        // no fail currently
    }

public:
    IMPLEMENT_IINTERFACE;

    CCovenSessionManager()
        : sessionrequestserver(*this)
    {
        mySessionId = queryCoven().getUniqueId(); // tell others in coven TBD
        registerSubscriptionManager(SESSION_PUBLISHER,this);
        atomic_set(&ldapwaiting,0);
        workthreadsem.signal(10);
        stopping = false;
        ldapsig.signal();
    }

    ~CCovenSessionManager()
    {
        stubs.kill();
    }

    void start()
    {
        sessionrequestserver.start();
    }

    // Shuts down LDAP handling (waiting a bounded time for activity to
    // finish), then stops monitoring connections and serving requests.
    void stop()
    {
        stopping = true;
        if (!ldapsig.wait(60*1000))
            WARNLOG("LDAP stalled(1)");
        if (ldapworker) {
            ldapworker->stop();
            if (!ldapworker->join(1000))
                WARNLOG("LDAP stalled(2)");
            ldapworker.clear();
        }
        ldapconn.clear();
        removeMPConnectionMonitor(this);
        sessionrequestserver.stop();
    }

    void ready()
    {
        addMPConnectionMonitor(this);
        sessionrequestserver.ready();
    }

    // Takes ownership of _ldapconn. It is kept only if DLF_ENABLED is set;
    // otherwise it is released and LDAP is disabled.
    void setLDAPconnection(IDaliLdapConnection *_ldapconn)
    {
        if (_ldapconn&&(_ldapconn->getLDAPflags()&DLF_ENABLED))
            ldapconn.setown(_ldapconn);
        else {
            ldapconn.clear();
            ::Release(_ldapconn);
        }
    }

    void setClientAuth(IDaliClientAuthConnection *_authconn)
    {
    }

    // Records a process session. An existing entry with the same id or the
    // same client node is evicted and replaced.
    void addProcessSession(SessionId id,INode *client,DaliClientRole role)
    {
        StringBuffer str;
        PROGLOG("Session starting %"I64F"x (%s) : role=%s",id,client->endpoint().getUrlStr(str).str(),queryRoleName(role));
        CHECKEDCRITICALBLOCK(sessmanagersect,60000);
        CProcessSessionState *s = new CProcessSessionState(id,client,role);
        while (!sessionstates.add(s)) { // takes ownership
            WARNLOG("Dali session manager: session already registered");
            sessionstates.remove(id);
        }
        while (!processlookup.add(s)) {
            ERRLOG("Dali session manager: registerClient process session already registered");
            processlookup.remove(client,this);
        }
    }

    // Records a plain session, replacing any existing entry with the same id.
    void addSession(SessionId id)
    {
        CHECKEDCRITICALBLOCK(sessmanagersect,60000);
        CSessionState *s = new CSessionState(id);
        while (!sessionstates.add(s)) { // takes ownership
            WARNLOG("Dali session manager: session already registered (2)");
            sessionstates.remove(id);
        }
    }

    // Allocates a session id and registers it on the owning server first,
    // then on the other coven members.
    SessionId registerSession(SecurityToken, SessionId parent)
    {
        // this is where security token should be checked

        // not in critical block (otherwise possibility of deadlock)
        ICoven &coven=queryCoven();
        SessionId id = coven.getUniqueId();
        rank_t myrank = coven.getServerRank();
        rank_t ownerrank = coven.chooseServer(id);
        // first do local
        if (myrank==ownerrank)
            addSession(id);
        else
            remoteAddSession(ownerrank,id);
        ForEachOtherNodeInGroup(r,coven.queryGroup()) {
            if (r!=ownerrank)
                remoteAddSession(r,id);
        }
        return id;
    }

    // As registerSession(), for a client process. retcoven receives a linked
    // coven group which the caller must release.
    SessionId registerClientProcess(INode *client, IGroup *& retcoven, DaliClientRole role)
    {
        // not in critical block (otherwise possibility of deadlock)
        retcoven = NULL;
        ICoven &coven=queryCoven();
        SessionId id = coven.getUniqueId();
        rank_t myrank = coven.getServerRank();
        rank_t ownerrank = coven.chooseServer(id);
        // first do local
        if (myrank==ownerrank)
            addProcessSession(id,client,role);
        else
            remoteAddProcessSession(ownerrank,id,client,role);
        ForEachOtherNodeInGroup(r,coven.queryGroup()) {
            if (r!=ownerrank)
                remoteAddProcessSession(r,id,client,role);
        }
        retcoven = coven.getGroup();
        return id;
    }

    SessionId lookupProcessSession(INode *node)
    {
        if (!node)
            return mySessionId;
        CHECKEDCRITICALBLOCK(sessmanagersect,60000);
        CProcessSessionState *s= processlookup.query(node);
        return s?s->getId():0;
    }

    // Returns a new node for the endpoint of process session id, or NULL when
    // the id is unknown or is not a process session.
    INode *getProcessSessionNode(SessionId id)
    {
        StringBuffer eps;
        if (getClientProcessEndpoint(id,eps).length()!=0)
            return createINode(eps.str());
        return NULL;
    }

    // Looks up LDAP permissions for udesc on key/obj.
    // Returns -1 when LDAP is not configured, the permission value on
    // success, and 0 (no access) on failure or stall, in which case *err (if
    // supplied) receives a CLDAPE_* code. Unless both DLF_SAFE and
    // DLF_ENABLED are set the connection is called directly; otherwise the
    // call goes through a CLdapWorkItem thread, one caller at a time, with
    // timeouts and a single worker replacement.
    virtual int getPermissionsLDAP(const char *key,const char *obj,IUserDescriptor *udesc,unsigned flags, int *err)
    {
        if (err)
            *err = 0;
        if (!ldapconn)
            return -1;
#ifdef _NO_LDAP
        return -1;
#else
        if ((ldapconn->getLDAPflags()&(DLF_SAFE|DLF_ENABLED))!=(DLF_SAFE|DLF_ENABLED))
            return ldapconn->getPermissions(key,obj,udesc,flags);
        atomic_inc(&ldapwaiting);
        while (!stopping) {
            if (ldapsig.wait(1000)) {
                // We now hold the single LDAP token; every exit below re-signals it.
                atomic_dec(&ldapwaiting);
                if (!ldapworker)
                    ldapworker.setown(CLdapWorkItem::get(ldapconn,workthreadsem));
                if (ldapworker) {
                    ldapworker->start(key,obj,udesc,flags);
                    for (unsigned i=0;i<10;i++) {
                        if (i)
                            WARNLOG("LDAP stalled(%d) - retrying",i);
                        int ret;
                        if (ldapworker->wait(1000*20,ret)) {
                            if (ret==CLDAPE_ldapfailure) {
                                LOG(MCoperatorError, unknownJob, "LDAP - failure (returning no access for %s)",obj);
                                ldapsig.signal();
                                if (err)
                                    *err = CLDAPE_ldapfailure;
                                return 0;
                            }
                            else {
                                ldapsig.signal();
                                return ret;
                            }
                        }
                        if (atomic_read(&ldapwaiting)>10)   // give up quicker if piling up
                            break;
                        if (i==5) { // one retry
                            ldapworker->stop(); // abandon thread
                            ldapworker.clear();
                            ldapworker.setown(CLdapWorkItem::get(ldapconn,workthreadsem));
                            if (!ldapworker)
                                break;          // no replacement available - nothing left to wait on
                            ldapworker->start(key,obj,udesc,flags);
                        }
                    }
                    if (ldapworker)
                        ldapworker->stop();
                    ldapworker.clear(); // abandon thread
                }
                LOG(MCoperatorError, unknownJob, "LDAP stalled - aborting (returning no access for %s)",obj);
                ldapsig.signal();
                if (err)
                    *err = CLDAPE_getpermtimeout;
                return 0;
            }
            else {
                // Still queued behind another caller: log the backlog (at most
                // about once a second) and give up after a sustained backlog of >50.
                unsigned waiting = atomic_read(&ldapwaiting);
                static unsigned last=0;
                static unsigned lasttick=0;
                static unsigned first50=0;
                if ((waiting!=last)&&(msTick()-lasttick>1000)) {
                    WARNLOG("%d threads waiting for ldap",waiting);
                    last = waiting;
                    lasttick = msTick();
                }
                if (waiting>50) {
                    if (first50==0)
                        first50 = msTick();
                    else if (msTick()-first50>60*1000) {
                        LOG(MCoperatorError, unknownJob, "LDAP stalled - aborting (returning 0 for %s)", obj);
                        if (err)
                            *err = CLDAPE_getpermtimeout;
                        break;
                    }
                }
                else
                    first50 = 0;
            }
        }
        atomic_dec(&ldapwaiting);
        return 0;
#endif
    }

    virtual bool checkScopeScansLDAP()
    {
#ifdef _NO_LDAP
        return false;
#else
        return ldapconn?ldapconn->checkScopeScans():false;
#endif
    }

    virtual unsigned getLDAPflags()
    {
#ifdef _NO_LDAP
        return 0;
#else
        return ldapconn?ldapconn->getLDAPflags():0;
#endif
    }

    void setLDAPflags(unsigned flags)
    {
#ifndef _NO_LDAP
        if (ldapconn)
            ldapconn->setLDAPflags(flags);
#endif
    }

    bool authorizeConnection(int role,bool revoke)
    {
        return true;
    }

    SessionId startSession(SecurityToken tok, SessionId parentid)
    {
        return registerSession(tok,parentid);
    }

    void setSessionDependancy(SessionId id , SessionId dependsonid )
    {
        UNIMPLEMENTED; // TBD
    }

    void clearSessionDependancy(SessionId id , SessionId dependsonid )
    {
        UNIMPLEMENTED; // TBD
    }

protected:
    // Server-side record of one session subscription. Owns the ISubscription
    // and caches the session id decoded from its subscription data.
    class CSessionSubscriptionStub: public CInterface
    {
        ISubscription *subs;
        SubscriptionId id;
        SessionId sessid;
    public:
        IMPLEMENT_IINTERFACE;

        CSessionSubscriptionStub(ISubscription *_subs,SubscriptionId _id) // takes ownership
        {
            subs = _subs;
            id = _id;
            const MemoryAttr &ma = subs->queryData();
            MemoryBuffer mb(ma.length(),ma.get());
            mb.read(sessid);
        }

        ~CSessionSubscriptionStub()
        {
            subs->Release();
        }

        SubscriptionId getId() { return id; }
        SessionId getSessionId() { return sessid; }

        void notify(bool abort)
        {
            MemoryBuffer mb;
            mb.append(abort);
            subs->notify(mb);
        }
    };

    CIArrayOf<CSessionSubscriptionStub> stubs;

    // Returns the index in stubs of subscription id, or NotFound.
    unsigned findsub(SubscriptionId id)
    {
        ForEachItemIn(i,stubs) {
            CSessionSubscriptionStub &stub = stubs.item(i);
            if (stub.getId()==id)
                return i;
        }
        return NotFound;
    }

    // ISubscriptionManager: registers subs (ownership taken). A subscription
    // to an unknown session is notified as aborted straight away and dropped.
    void add(ISubscription *subs,SubscriptionId id)
    {
        CSessionSubscriptionStub *nstub;
        {
            CHECKEDCRITICALBLOCK(sessmanagersect,60000);
            nstub = new CSessionSubscriptionStub(subs,id);
            if (sessionstates.query(nstub->getSessionId())||(nstub->getSessionId()==mySessionId)) {
                stubs.append(*nstub);
                return;
            }
        }
        // session not known
        MemoryBuffer mb;
        bool abort=true;
        mb.append(abort);
        ERRLOG("Session Manager - adding unknown session ID %"I64F"x", nstub->getSessionId());
        subs->notify(mb);
        delete nstub;   // releases subs
        return;
    }

    void remove(SubscriptionId id)
    {
        CHECKEDCRITICALBLOCK(sessmanagersect,60000);
        unsigned i=findsub(id);
        if (i!=NotFound)
            stubs.remove(i);
    }

    // Notifies and removes every subscription on session id, then forgets the
    // session. Notifications are sent with the manager lock released.
    void stopSession(SessionId id, bool abort)
    {
        PROGLOG("Session stopping %"I64F"x %s",id,abort?"aborted":"ok");
        CHECKEDCRITICALBLOCK(sessmanagersect,60000);
        // do in multiple stages as may remove one or more sub sessions
        loop {
            CIArrayOf<CSessionSubscriptionStub> tonotify;
            for (unsigned i=stubs.ordinality();i;) {
                CSessionSubscriptionStub &stub = stubs.item(--i);
                if (stub.getSessionId()==id) {
                    stubs.remove(i, true);  // detach without destroying; tonotify now owns it
                    tonotify.append(stub);
                }
            }
            if (tonotify.ordinality()==0)
                break;
            CHECKEDCRITICALUNBLOCK(sessmanagersect,60000);
            ForEachItemIn(j,tonotify) {
                CSessionSubscriptionStub &stub = tonotify.item(j);
                try { stub.notify(abort); }
                catch (IException *e) { e->Release(); } // subscriber session may abort during stopSession
            }
        }
        const CSessionState *state = sessionstates.query(id);
        if (state) {
            const CProcessSessionState *pstate = QUERYINTERFACE(state,const CProcessSessionState);
            if (pstate)
                processlookup.remove(&pstate->queryNode(),this);
            sessionstates.remove(id);
        }
    }

    // Connection-monitor callback: a closed client connection aborts that
    // client's process session. Closed coven connections are only logged.
    void onClose(SocketEndpoint &ep)
    {
        StringBuffer str;
        PROGLOG("Client closed (%s)",ep.getUrlStr(str).str());
        SessionId idtostop;
        {
            CHECKEDCRITICALBLOCK(sessmanagersect,60000);
            Owned<INode> node = createINode(ep);
            if (queryCoven().inCoven(node)) {
                StringBuffer covenstr;
                PROGLOG("Coven Session Stopping (%s)",ep.getUrlStr(covenstr).str());
                // more TBD here
                return;
            }
            CProcessSessionState *s= processlookup.query(node);
            if (!s)
                return;
            idtostop = s->getId();
        }
        stopSession(idtostop,true);
    }

    // Appends one getDetails() line per registered client process to buf.
    StringBuffer &getClientProcessList(StringBuffer &buf)
    {
        CHECKEDCRITICALBLOCK(sessmanagersect,60000);
        unsigned n = processlookup.count();
        CProcessSessionState *s=NULL;
        for (unsigned i=0;i<n;i++) {
            s=processlookup.next(s);
            if (!s)
                break;
            s->getDetails(buf).append('\n');
        }
        return buf;
    }

    // Appends the endpoint URL of process session id to buf; buf is left
    // untouched when the id is unknown or not a process session.
    StringBuffer &getClientProcessEndpoint(SessionId id,StringBuffer &buf)
    {
        CHECKEDCRITICALBLOCK(sessmanagersect,60000);
        const CSessionState *state = sessionstates.query(id);
        if (state) {
            const CProcessSessionState *pstate = QUERYINTERFACE(state,const CProcessSessionState);
            if (pstate)
                return pstate->queryNode().endpoint().getUrlStr(buf);
        }
        return buf;
    }

    unsigned queryClientCount()
    {
        CHECKEDCRITICALBLOCK(sessmanagersect,60000);
        return processlookup.count();
    }
};
// Returns the process-wide session manager, creating a client-side one on
// first use. On a coven server the manager must already have been installed
// (by CDaliSessionServer::start), which the assertion checks.
ISessionManager &querySessionManager()
{
    CriticalBlock block(sessionCrit);
    if (SessionManager)
        return *SessionManager;
    // Check not Coven server (if occurs - not initialized correctly);
    // if !coven someone is checking for dali so allow
    assertex(!isCovenActive()||!queryCoven().inCoven());
    SessionManager = new CClientSessionManager();
    return *SessionManager;
}
// IDaliServer adaptor that creates the coven session manager on start() and
// publishes it through the SessionManager / SessionManagerServer globals.
//
// Fix in this revision: stop() dereferenced SessionManagerServer
// unconditionally, so calling it twice, or after a start() that failed its
// assertion, was a NULL dereference. It is now a no-op when nothing is running.
class CDaliSessionServer: public CInterface, public IDaliServer
{
public:
    IMPLEMENT_IINTERFACE;

    CDaliSessionServer()
    {
    }

    void start()
    {
        CriticalBlock block(sessionCrit);
        assertex(queryCoven().inCoven()); // must be member of coven
        CCovenSessionManager *serv = new CCovenSessionManager();
        // Both globals refer to the same object, which carries a single reference.
        SessionManagerServer = serv;
        SessionManager = serv;
        SessionManagerServer->start();
    }

    void suspend()
    {
    }

    void stop()
    {
        CriticalBlock block(sessionCrit);
        if (!SessionManagerServer)
            return;     // never started, or already stopped
        SessionManagerServer->stop();
        SessionManagerServer->Release();
        SessionManagerServer = NULL;
        SessionManager = NULL;
    }

    void ready()
    {
        SessionManagerServer->ready();
    }

    void nodeDown(rank_t rank)
    {
        assertex(!"TBD");
    }
};
// Factory for the session server; the caller owns the returned reference.
IDaliServer *createDaliSessionServer()
{
    IDaliServer *server = new CDaliSessionServer();
    return server;
}
// Hands ldapconn to the running session server, which takes ownership of it
// (CCovenSessionManager::setLDAPconnection either keeps or releases it).
// When no session server is running the connection is released here so that
// ownership is consumed on every path; previously it was silently leaked.
// NOTE(review): assumes callers always transfer ownership, as the server
// implementation in this file does - confirm against the call sites.
void setLDAPconnection(IDaliLdapConnection *ldapconn)
{
    if (SessionManagerServer)
        SessionManagerServer->setLDAPconnection(ldapconn);
    else
        ::Release(ldapconn);
}
// Forwards the client-auth connection to the running session server;
// does nothing when no session server has been started.
void setClientAuth(IDaliClientAuthConnection *authconn)
{
    if (!SessionManagerServer)
        return;
    SessionManagerServer->setClientAuth(authconn);
}
// Pause between connection attempts, and upper bound on each attempt's timeout.
#define REG_SLEEP 5000

// Registers this process with a Dali server chosen at random from comm's
// group, retrying until `timeout` expires.
//   comm     - communicator whose group lists the candidate servers
//   retcoven - receives a linked coven group on success (caller releases)
//   timeout  - overall time allowed, passed to CTimeMon
//   role     - role reported to the server
// Returns true once a server has replied (mySessionId is set unless the reply
// was empty), false if the group is empty or the timeout expires. Rethrows any
// exception raised while verifying the connection.
//
// Changes from the previous revision: the Linked<>/Owned<> template arguments
// missing from the source are restored, and the helper-thread pointer no
// longer shadows the `t` timeout variable.
bool registerClientProcess(ICommunicator *comm, IGroup *& retcoven,unsigned timeout,DaliClientRole role)
{
    // NB doesn't use coven as not yet set up
    if (comm->queryGroup().ordinality()==0)
        return false;
    CTimeMon tm(timeout);
    retcoven = NULL;
    unsigned nextLog = 0, lastNextLog = 0;
    unsigned t=REG_SLEEP;
    unsigned remaining;
    rank_t r;
    while (!tm.timedout(&remaining)) {
        if (remaining>t)
            remaining = t;
        r = getRandom()%comm->queryGroup().ordinality();
        bool ok = false;
        if (remaining>10000) // MP protocol has a minimum time of 10s so if remaining < 10s use a detachable thread
            ok = comm->verifyConnection(r,remaining);
        else {
            // Verify on a helper thread so this caller waits no longer than `remaining`.
            struct cThread: public Thread
            {
                IMPLEMENT_IINTERFACE;
                Semaphore sem;
                Linked<ICommunicator> comm;
                bool ok;
                Owned<IException> exc;
                unsigned remaining;
                rank_t r;
                cThread(ICommunicator *_comm,rank_t _r,unsigned _remaining)
                    : Thread ("dasess.registerClientProcess"), comm(_comm)
                {
                    r = _r;
                    remaining = _remaining;
                    ok = false;
                }
                int run()
                {
                    try {
                        if (comm->verifyConnection(r,remaining))
                            ok = true;
                    }
                    catch (IException *e)
                    {
                        exc.setown(e);
                    }
                    sem.signal();
                    return 0;
                }
            } *verifier = new cThread(comm,r,remaining);
            verifier->start();
            verifier->sem.wait(remaining);
            ok = verifier->ok;
            if (verifier->exc.get()) {
                IException *e = verifier->exc.getClear();
                verifier->Release();
                throw e;
            }
            verifier->Release();
        }
        if (ok) {
            CMessageBuffer mb;
            mb.append((int)MSR_REGISTER_PROCESS_SESSION);
            queryMyNode()->serialize(mb);
            comm->queryGroup().queryNode(r).serialize(mb);
            mb.append((int)role);
            if (comm->sendRecv(mb,r,MPTAG_DALI_SESSION_REQUEST,SESSIONREPLYTIMEOUT)) {
                if (!mb.length())
                {
                    // failed system capability match,
                    retcoven = comm->getGroup(); // continue as if - will fail later more obscurely.
                    return true;
                }
                mb.read(mySessionId);
                retcoven = deserializeIGroup(mb);
                return true;
            }
        }
        StringBuffer str;
        PROGLOG("Failed to connect to Dali Server %s.", comm->queryGroup().queryNode(r).endpoint().getUrlStr(str).str());
        if (tm.timedout())
        {
            PROGLOG("%s", str.append(" Timed out.").str());
            break;
        }
        else if (0 == nextLog)
        {
            PROGLOG("%s", str.append(" Retrying...").str());
            if ((lastNextLog * REG_SLEEP) >= 60000) // limit to a minute between logging
                nextLog = 60000 / REG_SLEEP;
            else
                nextLog = lastNextLog + 2; // wait +2 REG_SLEEP pauses before next logging
            lastNextLog = nextLog;
        }
        else
            --nextLog;
        Sleep(REG_SLEEP);
    }
    return false;
}
// Cleanly ends this process's session with the server, if it has one.
// A DCERR_server_closed failure is swallowed; any other IDaliClient_Exception
// propagates (leaving mySessionId unchanged).
extern void stopClientProcess()
{
    CriticalBlock block(sessionCrit);
    if (!mySessionId || !SessionManager)
        return;
    try {
        // querySessionManager() takes sessionCrit again on this thread
        querySessionManager().stopSession(mySessionId,false);
    }
    catch (IDaliClient_Exception *e) {
        if (e->errorCode()!=DCERR_server_closed)
            throw;
        e->Release();
    }
    mySessionId = 0;
}
// Empty placeholder: no members, and not referenced elsewhere in this file.
class CProcessSessionWatchdog
{
};
class CUserDescriptor: public CInterface, implements IUserDescriptor
{
StringAttr username;
StringAttr passwordenc;
public:
IMPLEMENT_IINTERFACE;
StringBuffer &getUserName(StringBuffer &buf)
{
return buf.append(username);
}
StringBuffer &getPassword(StringBuffer &buf)
{
decrypt(buf,passwordenc);
return buf;
}
virtual void set(const char *name,const char *password)
{
username.set(name);
StringBuffer buf;
encrypt(buf,password);
passwordenc.set(buf.str());
}
virtual void clear()
{
username.clear();
passwordenc.clear();
}
void serialize(MemoryBuffer &mb)
{
mb.append(username).append(passwordenc);
}
void deserialize(MemoryBuffer &mb)
{
mb.read(username).read(passwordenc);
}
};
// Factory for an empty user descriptor; the caller owns the returned reference.
IUserDescriptor *createUserDescriptor()
{
    IUserDescriptor *descriptor = new CUserDescriptor;
    return descriptor;
}
// Module start-up hook: nothing to initialise, just reports success.
MODULE_INIT(INIT_PRIORITY_DALI_DASESS)
{
return true;
}
// Module shutdown hook: drops the reference held in SessionManager (a
// client-side manager created by querySessionManager(), if any) and clears
// the pointer. CDaliSessionServer::stop() has already set it to NULL on a server.
// NOTE(review): ::Release is presumably NULL-safe, as it is also called with
// a possibly-NULL pointer in CCovenSessionManager::setLDAPconnection - confirm.
MODULE_EXIT()
{
::Release(SessionManager);
SessionManager = NULL;
}