12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720 |
- /*##############################################################################
- Copyright (C) 2011 HPCC Systems.
- All rights reserved. This program is free software: you can redistribute it and/or modify
- it under the terms of the GNU Affero General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Affero General Public License for more details.
- You should have received a copy of the GNU Affero General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
- ############################################################################## */
- #define da_decl __declspec(dllexport)
- #include "platform.h"
- #include "jlib.hpp"
- #include "jfile.hpp"
- #include "jsuperhash.hpp"
- #include "jmisc.hpp"
- #include "jencrypt.hpp"
- #include "dacoven.hpp"
- #include "mpbuff.hpp"
- #include "mpcomm.hpp"
- #include "mputil.hpp"
- #include "daserver.hpp"
- #include "dasubs.ipp"
- #include "dasds.hpp"
- #include "daclient.hpp"
- #include "daldap.hpp"
- #include "dasess.hpp"
- #ifdef _MSC_VER
- #pragma warning (disable : 4355)
- #endif
- const char *queryRoleName(DaliClientRole role)
- {
- switch (role) {
- case DCR_Private: return "Private";
- case DCR_Diagnostic: return "Diagnostic";
- case DCR_ThorSlave: return "ThorSlave";
- case DCR_ThorMaster: return "ThorMaster";
- case DCR_EclServer: return "EclServer";
- case DCR_EclScheduler: return "EclScheduler";
- case DCR_EclAgent: return "EclAgent";
- case DCR_AgentExec: return "AgentExec";
- case DCR_DaliServer:return "DaliServer";
- case DCR_SashaServer: return "SashaServer";
- case DCR_Util: return "Util";
- case DCR_Dfu: return "Dfu";
- case DCR_DfuServer: return "DfuServer";
- case DCR_EspServer: return "EspServer";
- case DCR_WuClient: return "WuClient";
- case DCR_Config: return "Config";
- case DCR_Scheduler: return "Scheduler";
- case DCR_RoxyMaster: return "RoxieMaster";
- case DCR_RoxySlave: return "RoxieSlave";
- case DCR_BackupGen: return "BackupGen";
- case DCR_Other: return "Other";
- }
- return "Unknown";
- }
- interface ISessionManagerServer: implements IConnectionMonitor
- {
- virtual SessionId registerSession(SecurityToken tok,SessionId parentid) = 0;
- virtual SessionId registerClientProcess(INode *node,IGroup *&grp, DaliClientRole role) = 0;
- virtual void addProcessSession(SessionId id, INode *node, DaliClientRole role) = 0;
- virtual void addSession(SessionId id) = 0;
- virtual SessionId lookupProcessSession(INode *node) = 0;
- virtual INode *getProcessSessionNode(SessionId id) =0;
- virtual int getPermissionsLDAP(const char *key,const char *obj,IUserDescriptor *udesc,unsigned flags, int *err)=0;
- virtual void stopSession(SessionId sessid,bool failed) = 0;
- virtual void setClientAuth(IDaliClientAuthConnection *authconn) = 0;
- virtual void setLDAPconnection(IDaliLdapConnection *_ldapconn) = 0;
- virtual bool authorizeConnection(int role,bool revoke) = 0;
- virtual void start() = 0;
- virtual void ready() = 0;
- virtual void stop() = 0;
- };
- static SessionId mySessionId=0;
- static ISessionManager *SessionManager=NULL;
- static ISessionManagerServer *SessionManagerServer=NULL;
- static CriticalSection sessionCrit;
- #define SESSIONREPLYTIMEOUT (3*60*1000)
- #define CLDAPE_getpermtimeout (-1)
- #define CLDAPE_ldapfailure (-2)
- class CDaliLDAP_Exception: public CInterface, implements IException
- {
- int errcode;
- public:
- CDaliLDAP_Exception(int _errcode)
- {
- errcode = _errcode;
- }
- int errorCode() const { return errcode; }
- StringBuffer & errorMessage(StringBuffer &str) const
- {
- if (errcode==0)
- return str;
- str.appendf("LDAP Exception(%d): ",errcode);
- if (errcode==CLDAPE_getpermtimeout)
- return str.append("getPermissionsLDAP - timeout to LDAP server");
- if (errcode==CLDAPE_ldapfailure)
- return str.append("getPermissionsLDAP - LDAP server failure");
- return str.append("Unknown Exception");
- }
- MessageAudience errorAudience() const { return MSGAUD_user; }
- IMPLEMENT_IINTERFACE;
- };
- class CdelayedTerminate: public Thread // slightly obfuscated stop code
- {
- byte err;
- int run()
- {
- while (getRandom()%711!=0) getRandom(); // look busy
- ERRLOG("Server fault %d",(int)err);
- while (getRandom()%7!=0) Sleep(1);
- exit(0);
- }
- public:
- CdelayedTerminate(byte _err)
- {
- err = _err;
- start();
- Release();
- Sleep(100);
- }
- };
- class CSessionState: public CInterface
- {
- protected: friend class CSessionStateTable;
- SessionId id;
- public:
- CSessionState(SessionId _id)
- {
- id = _id;
- }
- SessionId getId() const
- {
- return id;
- }
- };
- class CSessionStateTable: private SuperHashTableOf<CSessionState,SessionId>
- {
- CheckedCriticalSection sessstatesect;
- void onAdd(void *) {}
- void onRemove(void *e)
- {
- CSessionState &elem=*(CSessionState *)e;
- elem.Release();
- }
- unsigned getHashFromElement(const void *e) const
- {
- const CSessionState &elem=*(const CSessionState *)e;
- SessionId id=elem.id;
- return low(id)^(unsigned)high(id);
- }
- unsigned getHashFromFindParam(const void *fp) const
- {
- SessionId id = *(const SessionId *)fp;
- return low(id)^(unsigned)high(id);
- }
- const void * getFindParam(const void *p) const
- {
- const CSessionState &elem=*(const CSessionState *)p;
- return (void *)&elem.id;
- }
- bool matchesFindParam(const void * et, const void *fp, unsigned) const
- {
- return ((CSessionState *)et)->id==*(SessionId *)fp;
- }
- IMPLEMENT_SUPERHASHTABLEOF_REF_FIND(CSessionState,SessionId);
- public:
- CSessionStateTable()
- {
- }
- ~CSessionStateTable() {
- CHECKEDCRITICALBLOCK(sessstatesect,60000);
- releaseAll();
- }
- bool add(CSessionState *e) // takes ownership
- {
- CHECKEDCRITICALBLOCK(sessstatesect,60000);
- if (SuperHashTableOf<CSessionState,SessionId>::find(&e->id))
- return false;
- SuperHashTableOf<CSessionState,SessionId>::add(*e);
- return true;
- }
- const CSessionState *query(SessionId id)
- {
- CHECKEDCRITICALBLOCK(sessstatesect,60000);
- return SuperHashTableOf<CSessionState,SessionId>::find(&id);
- }
-
- void remove(SessionId id)
- {
- CHECKEDCRITICALBLOCK(sessstatesect,60000);
- SuperHashTableOf<CSessionState,SessionId>::remove(&id);
- }
- };
- class CProcessSessionState: public CSessionState
- {
- INode *node;
- DaliClientRole role;
- public:
- CProcessSessionState(SessionId id,INode *_node,DaliClientRole _role)
- : CSessionState(id)
- {
- node = _node;
- node->Link();
- role = _role;
- }
- ~CProcessSessionState()
- {
- node->Release();
- }
- INode &queryNode() const
- {
- return *node;
- }
- DaliClientRole queryRole() const
- {
- return role;
- }
- StringBuffer &getDetails(StringBuffer &buf)
- {
- StringBuffer ep;
- return buf.appendf("%16"I64F"X: %s, role=%s",CSessionState::id,node->endpoint().getUrlStr(ep).str(),queryRoleName(role));
- }
- };
- class CMapProcessToSession: private SuperHashTableOf<CProcessSessionState,INode>
- {
- CheckedCriticalSection mapprocesssect;
- void onAdd(void *) {}
- void onRemove(void *e) {} // do nothing
- unsigned getHashFromElement(const void *e) const
- {
- const CProcessSessionState &elem=*(const CProcessSessionState *)e;
- return elem.queryNode().getHash();
- }
- unsigned getHashFromFindParam(const void *fp) const
- {
- return ((INode *)fp)->getHash();
- }
- const void * getFindParam(const void *p) const
- {
- const CProcessSessionState &elem=*(const CProcessSessionState *)p;
- return (void *)&elem.queryNode();
- }
- bool matchesFindParam(const void * et, const void *fp, unsigned) const
- {
- return ((CProcessSessionState *)et)->queryNode().equals((INode *)fp);
- }
- IMPLEMENT_SUPERHASHTABLEOF_REF_FIND(CProcessSessionState,INode);
- public:
- CMapProcessToSession()
- {
- }
- ~CMapProcessToSession()
- {
- CHECKEDCRITICALBLOCK(mapprocesssect,60000);
- releaseAll();
- }
- bool add(CProcessSessionState *e)
- {
- CHECKEDCRITICALBLOCK(mapprocesssect,60000);
- if (SuperHashTableOf<CProcessSessionState,INode>::find(&e->queryNode()))
- return false;
- SuperHashTableOf<CProcessSessionState,INode>::add(*e);
- return true;
- }
- CProcessSessionState *query(INode *n)
- {
- CHECKEDCRITICALBLOCK(mapprocesssect,60000);
- return SuperHashTableOf<CProcessSessionState,INode>::find(n);
- }
-
- void remove(INode *n,ISessionManagerServer *manager)
- {
- CHECKEDCRITICALBLOCK(mapprocesssect,60000);
- CProcessSessionState *sstate = SuperHashTableOf<CProcessSessionState,INode>::find(n);
- if (sstate) {
- if (manager)
- manager->authorizeConnection(sstate->queryRole(),true);
- SuperHashTableOf<CProcessSessionState,INode>::removeExact(sstate);
- }
- }
- unsigned count()
- {
- CHECKEDCRITICALBLOCK(mapprocesssect,60000);
- return SuperHashTableOf<CProcessSessionState,INode>::count();
- }
- CProcessSessionState *next(const CProcessSessionState *s)
- {
- CHECKEDCRITICALBLOCK(mapprocesssect,60000);
- return (CProcessSessionState *)SuperHashTableOf<CProcessSessionState,INode>::next(s);
- }
- };
// Request codes sent as the leading int of an MPTAG_DALI_SESSION_REQUEST message.
// The numeric values are part of the message format, so they are spelled out.
enum MSessionRequestKind
{
    MSR_REGISTER_PROCESS_SESSION            = 0,
    MSR_SECONDARY_REGISTER_PROCESS_SESSION  = 1,
    MSR_REGISTER_SESSION                    = 2,
    MSR_SECONDARY_REGISTER_SESSION          = 3,
    MSR_LOOKUP_PROCESS_SESSION              = 4,
    MSR_STOP_SESSION                        = 5,
    MSR_IMPORT_CAPABILITIES                 = 6,
    MSR_LOOKUP_LDAP_PERMISSIONS             = 7,
    MSR_EXIT                                = 8   // TBD
};
- class CSessionRequestServer: public Thread
- {
- bool stopped;
- ISessionManagerServer &manager;
- Semaphore acceptConnections;
- public:
- CSessionRequestServer(ISessionManagerServer &_manager)
- : Thread("Session Manager, CSessionRequestServer"), manager(_manager)
- {
- stopped = true;
- }
- int run()
- {
- ICoven &coven=queryCoven();
- CMessageHandler<CSessionRequestServer> handler("CSessionRequestServer",this,&CSessionRequestServer::processMessage);
- stopped = false;
- CMessageBuffer mb;
- while (!stopped) {
- try {
- mb.clear();
- if (coven.recv(mb,RANK_ALL,MPTAG_DALI_SESSION_REQUEST,NULL))
- handler.handleMessage(mb);
- else
- stopped = true;
- }
- catch (IException *e)
- {
- EXCLOG(e, "CDaliPublisherServer");
- e->Release();
- }
- }
- return 0;
- }
- void processMessage(CMessageBuffer &mb)
- {
- ICoven &coven=queryCoven();
- SessionId id;
- int fn;
- mb.read(fn);
- switch (fn) {
- case MSR_REGISTER_PROCESS_SESSION: {
- acceptConnections.wait();
- acceptConnections.signal();
- Owned<INode> node(deserializeINode(mb));
- Owned<INode> servernode(deserializeINode(mb)); // hopefully me, but not if forwarded
- int role=0;
- if (mb.length()-mb.getPos()>=sizeof(role)) { // a capability block present
- mb.read(role);
- if (!manager.authorizeConnection(role,false)) {
- SocketEndpoint sender = mb.getSender();
- mb.clear();
- coven.reply(mb);
- MilliSleep(100+getRandom()%1000); // Causes client to 'work' for a short time.
- Owned<INode> node = createINode(sender);
- coven.disconnect(node);
- break;
- }
- #ifdef _DEBUG
- StringBuffer eps;
- PROGLOG("Connection to %s authorized",mb.getSender().getUrlStr(eps).str());
- #endif
- }
-
- IGroup *covengrp;
- id = manager.registerClientProcess(node.get(),covengrp,(DaliClientRole)role);
- mb.clear().append(id);
- if (covengrp->rank(servernode)==RANK_NULL) { // must have been redirected
- covengrp->Release(); // no good, so just use one we know about (may use something more sophisticated later)
- INode *na = servernode.get();
- covengrp = createIGroup(1, &na);
- }
- covengrp->serialize(mb);
- covengrp->Release();
- coven.reply(mb);
- }
- break;
- case MSR_SECONDARY_REGISTER_PROCESS_SESSION: {
- mb.read(id);
- Owned<INode> node (deserializeINode(mb));
- int role;
- mb.read(role);
- manager.addProcessSession(id,node.get(),(DaliClientRole)role);
- mb.clear();
- coven.reply(mb);
- }
- break;
- case MSR_REGISTER_SESSION: {
- SecurityToken tok;
- SessionId parentid;
- mb.read(tok).read(parentid);
- SessionId id = manager.registerSession(tok,parentid);
- mb.clear().append(id);
- coven.reply(mb);
- }
- break;
- case MSR_SECONDARY_REGISTER_SESSION: {
- mb.read(id);
- manager.addSession(id);
- mb.clear();
- coven.reply(mb);
- }
- break;
- case MSR_LOOKUP_PROCESS_SESSION: {
- // looks up from node or from id
- Owned<INode> node (deserializeINode(mb));
- if (node->endpoint().isNull()&&(mb.length()-mb.getPos()>=sizeof(id))) {
- mb.read(id);
- INode *n = manager.getProcessSessionNode(id);
- if (n)
- node.setown(n);
- node->serialize(mb.clear());
- }
- else {
- id = manager.lookupProcessSession(node.get());
- mb.clear().append(id);
- }
- coven.reply(mb);
- }
- break;
- case MSR_STOP_SESSION: {
- SessionId sessid;
- bool failed;
- mb.read(sessid).read(failed);
- manager.stopSession(sessid,failed);
- mb.clear();
- coven.reply(mb);
- }
- break;
- case MSR_LOOKUP_LDAP_PERMISSIONS: {
- StringAttr key;
- StringAttr obj;
- Owned<IUserDescriptor> udesc=createUserDescriptor();
- StringAttr username;
- StringAttr passwordenc;
- mb.read(key).read(obj);
- udesc->deserialize(mb);
- #ifdef _DALIUSER_STACKTRACE
- //following debug code to be removed
- StringBuffer sb;
- udesc->getUserName(sb);
- if (0==sb.length() || !strcmpi(sb.str(), "daliuser"))
- {
- DBGLOG("UNEXPECTED USER '%s' in %s line %ld",username,__FILE__, __LINE__);
- PrintStackReport();
- }
- #endif
- unsigned auditflags = 0;
- if (mb.length()-mb.getPos()>=sizeof(auditflags))
- mb.read(auditflags);
- int err = 0;
- int ret=manager.getPermissionsLDAP(key,obj,udesc,auditflags,&err);
- mb.clear().append(ret);
- if (err)
- mb.append(err);
- coven.reply(mb);
- }
- break;
- }
- }
- void ready()
- {
- acceptConnections.signal();
- }
- void stop()
- {
- if (!stopped) {
- stopped = true;
- queryCoven().cancel(RANK_ALL, MPTAG_DALI_SESSION_REQUEST);
- }
- join();
- }
- };
- class CSessionManagerBase: public CInterface, implements ISessionManager
- {
- protected:
- CheckedCriticalSection sessmanagersect;
- public:
- IMPLEMENT_IINTERFACE;
- CSessionManagerBase()
- {
- servernotifys.kill();
- }
- virtual ~CSessionManagerBase()
- {
- }
- class CSessionSubscriptionProxy: public CInterface, implements ISubscription
- {
- ISessionNotify *sub;
- SubscriptionId id;
- MemoryAttr ma;
- SessionId sessid;
- public:
- IMPLEMENT_IINTERFACE;
- CSessionSubscriptionProxy(ISessionNotify *_sub,SessionId _sessid)
- {
- sub = LINK(_sub);
- sessid = _sessid;
- MemoryBuffer mb;
- mb.append(sessid);
- ma.set(mb.length(),mb.toByteArray());
- id = queryCoven().getUniqueId();
- }
- ~CSessionSubscriptionProxy()
- {
- sub->Release();
- }
- ISessionNotify *queryNotify()
- {
- return sub;
- }
- const MemoryAttr &queryData()
- {
- return ma;
- }
- SubscriptionId getId()
- {
- return id;
- }
- void doNotify(bool aborted)
- {
- if (aborted)
- sub->aborted(sessid);
- else
- sub->closed(sessid);
- }
- void notify(MemoryBuffer &mb)
- {
- bool aborted;
- mb.read(aborted);
- doNotify(aborted);
- }
- void abort()
- {
- //NH: TBD
- }
- bool aborted()
- {
- return false;
- }
- };
- CIArrayOf<CSessionSubscriptionProxy>servernotifys;
- SubscriptionId subscribeSession(SessionId sessid, ISessionNotify *inotify)
- {
- CSessionSubscriptionProxy *proxy;
- SubscriptionId id;
- {
- CHECKEDCRITICALBLOCK(sessmanagersect,60000);
- proxy = new CSessionSubscriptionProxy(inotify,sessid);
- id = proxy->getId();
- if (sessid==SESSID_DALI_SERVER) {
- servernotifys.append(*proxy);
- return id;
- }
- }
- querySubscriptionManager(SESSION_PUBLISHER)->add(proxy,id);
- return id;
- }
- void unsubscribeSession(SubscriptionId id)
- {
- {
- CHECKEDCRITICALBLOCK(sessmanagersect,60000);
- // check not a server subscription
- ForEachItemIn(i,servernotifys) {
- if (servernotifys.item(i).getId()==id) {
- servernotifys.remove(i);
- return;
- }
- }
- }
- querySubscriptionManager(SESSION_PUBLISHER)->remove(id);
- }
- class cNotify: public CInterface, implements ISessionNotify
- {
- public:
- IMPLEMENT_IINTERFACE;
- Semaphore sem;
- void closed(SessionId id)
- {
- //PROGLOG("Session closed %"I64F"x",id);
- sem.signal();
- }
- void aborted(SessionId id)
- {
- //PROGLOG("Session aborted %"I64F"x",id);
- sem.signal();
- }
- };
- bool sessionStopped(SessionId id, unsigned timeout)
- {
- Owned<INode> node = getProcessSessionNode(id);
- if (node.get()==NULL)
- return true;
- if (timeout==0)
- return false;
- Owned<cNotify> cnotify = new cNotify;
- querySessionManager().subscribeSession(id,cnotify);
- if (cnotify->sem.wait(timeout))
- return false;
- node.setown(getProcessSessionNode(id));
- return node.get()==NULL;
- }
- void notifyServerStopped(bool aborted)
- {
- CHECKEDCRITICALBLOCK(sessmanagersect,60000);
- // check not a server subscription
- ForEachItemIn(i,servernotifys) {
- servernotifys.item(i).doNotify(aborted);
- }
- }
- };
- class CClientSessionManager: public CSessionManagerBase, implements IConnectionMonitor
- {
- bool securitydisabled;
- public:
- IMPLEMENT_IINTERFACE;
- CClientSessionManager()
- {
- securitydisabled = false;
- }
- virtual ~CClientSessionManager()
- {
- stop();
- }
- SessionId lookupProcessSession(INode *node)
- {
- if (!node)
- return mySessionId;
- CMessageBuffer mb;
- mb.append((int)MSR_LOOKUP_PROCESS_SESSION);
- node->serialize(mb);
- if (!queryCoven().sendRecv(mb,RANK_RANDOM,MPTAG_DALI_SESSION_REQUEST,SESSIONREPLYTIMEOUT))
- return 0;
- SessionId ret;
- mb.read(ret);
- return ret;
- }
- virtual INode *getProcessSessionNode(SessionId id)
- {
- if (!id)
- return NULL;
- CMessageBuffer mb;
- mb.append((int)MSR_LOOKUP_PROCESS_SESSION);
- queryNullNode()->serialize(mb);
- mb.append(id);
- if (!queryCoven().sendRecv(mb,RANK_RANDOM,MPTAG_DALI_SESSION_REQUEST,SESSIONREPLYTIMEOUT))
- return NULL;
- Owned<INode> node = deserializeINode(mb);
- if (node->endpoint().isNull())
- return NULL;
- return node.getClear();
- }
- int getPermissionsLDAP(const char *key,const char *obj,IUserDescriptor *udesc,unsigned auditflags,int *err)
- {
- if (err)
- *err = 0;
- if (securitydisabled)
- return -1;
- if (queryDaliServerVersion().compare("1.8") < 0) {
- securitydisabled = true;
- return -1;
- }
- CMessageBuffer mb;
- mb.append((int)MSR_LOOKUP_LDAP_PERMISSIONS);
- mb.append(key).append(obj);
- #ifdef _DALIUSER_STACKTRACE
- //following debug code to be removed
- StringBuffer sb;
- udesc->getUserName(sb);
- if (0==sb.length() || !strcmpi(sb.str(), "daliuser"))
- {
- DBGLOG("UNEXPECTED USER '%s' in %s line %ld",sb.str(),__FILE__, __LINE__);
- PrintStackReport();
- }
- #endif
- udesc->serialize(mb);
- mb.append(auditflags);
- if (!queryCoven().sendRecv(mb,RANK_RANDOM,MPTAG_DALI_SESSION_REQUEST,SESSIONREPLYTIMEOUT))
- return 0;
- int ret=-1;
- if (mb.remaining()>=sizeof(ret)) {
- mb.read(ret);
- int e = 0;
- if (mb.remaining()>=sizeof(e)) {
- if (err)
- *err = e;
- else if (e)
- throw new CDaliLDAP_Exception(e);
- }
- }
- if (ret==-1)
- securitydisabled = true;
- return ret;
- }
- bool checkScopeScansLDAP()
- {
- assertex(!"checkScopeScansLDAP called on client");
- return true; // actually only used server size
- }
- unsigned getLDAPflags()
- {
- assertex(!"getLdapFlags called on client");
- return 0;
- }
- void setLDAPflags(unsigned)
- {
- assertex(!"setLdapFlags called on client");
- }
- bool authorizeConnection(DaliClientRole,bool)
- {
- return true;
- }
- SessionId startSession(SecurityToken tok, SessionId parentid)
- {
- CMessageBuffer mb;
- mb.append((int)MSR_REGISTER_SESSION).append(tok).append(parentid);
- if (!queryCoven().sendRecv(mb,RANK_RANDOM,MPTAG_DALI_SESSION_REQUEST),SESSIONREPLYTIMEOUT)
- return 0;
- SessionId ret;
- mb.read(ret);
- return ret;
- }
- void stopSession(SessionId sessid, bool failed)
- {
- if (sessid==SESSID_DALI_SERVER) {
- notifyServerStopped(failed);
- return;
- }
- CMessageBuffer mb;
- mb.append((int)MSR_STOP_SESSION).append(sessid).append(failed);
- queryCoven().sendRecv(mb,RANK_RANDOM,MPTAG_DALI_SESSION_REQUEST,SESSIONREPLYTIMEOUT);
- }
- void onClose(SocketEndpoint &ep)
- {
- CHECKEDCRITICALBLOCK(sessmanagersect,60000);
- Owned<INode> node = createINode(ep);
- if (queryCoven().inCoven(node)) {
- StringBuffer str;
- PROGLOG("Coven Session Stopping (%s)",ep.getUrlStr(str).str());
- if (queryCoven().size()==1)
- notifyServerStopped(true);
- }
- }
- void start()
- {
- addMPConnectionMonitor(this);
- }
- void stop()
- {
- }
- void ready()
- {
- removeMPConnectionMonitor(this);
- }
- StringBuffer &getClientProcessList(StringBuffer &buf)
- {
- // dummy
- return buf;
- }
- StringBuffer &getClientProcessEndpoint(SessionId,StringBuffer &buf)
- {
- // dummy
- return buf;
- }
- unsigned queryClientCount()
- {
- // dummy
- return 0;
- }
- void importCapabilities(MemoryBuffer &mb)
- {
- CMessageBuffer msg;
- msg.append((int)MSR_IMPORT_CAPABILITIES);
- msg.append(mb.length(), mb.toByteArray());
- queryCoven().sendRecv(msg,RANK_RANDOM,MPTAG_DALI_SESSION_REQUEST,SESSIONREPLYTIMEOUT);
- }
- };
- class CLdapWorkItem : public Thread
- {
- StringAttr key;
- StringAttr obj;
- Linked<IUserDescriptor> udesc;
- Linked<IDaliLdapConnection> ldapconn;
- unsigned flags;
- bool running;
- Semaphore contsem;
- Semaphore ready;
- Semaphore &threaddone;
- int ret;
- public:
- IMPLEMENT_IINTERFACE;
- CLdapWorkItem(IDaliLdapConnection *_ldapconn,Semaphore &_threaddone)
- : ldapconn(_ldapconn), threaddone(_threaddone)
- {
- running = false;
- }
- void start(const char *_key,const char *_obj,IUserDescriptor *_udesc,unsigned _flags)
- {
- key.set(_key);
- obj.set(_obj);
- udesc.set(_udesc);
- flags = _flags;
- ret = CLDAPE_ldapfailure;
- if (!running) {
- running = true;
- Thread::start();
- }
- contsem.signal();
- }
- int run()
- {
- loop {
- contsem.wait();
- if (!running)
- break;
- try {
- ret = ldapconn->getPermissions(key,obj,udesc,flags);
- }
- catch(IException *e) {
- LOG(MCoperatorError, unknownJob, e, "CLdapWorkItem");
- e->Release();
- }
- ready.signal();
- }
- threaddone.signal();
- return 0;
- }
- bool wait(unsigned timeout, int &_ret)
- {
- if (ready.wait(timeout)) {
- _ret = ret;
- return true;
- }
- _ret = 0;
- return false;
- }
- void stop()
- {
- running = false;
- contsem.signal();
- }
- static CLdapWorkItem *get(IDaliLdapConnection *_ldapconn,Semaphore &_threaddone)
- {
- if (!_threaddone.wait(1000*60*5)) {
- ERRLOG("Too many stalled LDAP threads");
- return NULL;
- }
- return new CLdapWorkItem(_ldapconn,_threaddone);
- }
- };
- class CCovenSessionManager: public CSessionManagerBase, implements ISessionManagerServer, implements ISubscriptionManager
- {
- CSessionRequestServer sessionrequestserver;
- CSessionStateTable sessionstates;
- CMapProcessToSession processlookup;
- Owned<IDaliLdapConnection> ldapconn;
- Owned<CLdapWorkItem> ldapworker;
- Semaphore ldapsig;
- atomic_t ldapwaiting;
- Semaphore workthreadsem;
- bool stopping;
- void remoteAddProcessSession(rank_t dst,SessionId id,INode *node, DaliClientRole role)
- {
- CMessageBuffer mb;
- mb.append((int)MSR_SECONDARY_REGISTER_PROCESS_SESSION).append(id);
- node->serialize(mb);
- int r = (int)role;
- mb.append(role);
- queryCoven().sendRecv(mb,dst,MPTAG_DALI_SESSION_REQUEST);
- // no fail currently
- }
- void remoteAddSession(rank_t dst,SessionId id)
- {
- CMessageBuffer mb;
- mb.append((int)MSR_SECONDARY_REGISTER_SESSION).append(id);
- queryCoven().sendRecv(mb,dst,MPTAG_DALI_SESSION_REQUEST);
- // no fail currently
- }
- public:
- IMPLEMENT_IINTERFACE;
- CCovenSessionManager()
- : sessionrequestserver(*this)
- {
- mySessionId = queryCoven().getUniqueId(); // tell others in coven TBD
- registerSubscriptionManager(SESSION_PUBLISHER,this);
- atomic_set(&ldapwaiting,0);
- workthreadsem.signal(10);
- stopping = false;
- ldapsig.signal();
- }
- ~CCovenSessionManager()
- {
- stubs.kill();
- }
- void start()
- {
- sessionrequestserver.start();
- }
- void stop()
- {
- stopping = true;
- if (!ldapsig.wait(60*1000))
- WARNLOG("LDAP stalled(1)");
- if (ldapworker) {
- ldapworker->stop();
- if (!ldapworker->join(1000))
- WARNLOG("LDAP stalled(2)");
- ldapworker.clear();
- }
- ldapconn.clear();
- removeMPConnectionMonitor(this);
- sessionrequestserver.stop();
- }
- void ready()
- {
- addMPConnectionMonitor(this);
- sessionrequestserver.ready();
- }
- void setLDAPconnection(IDaliLdapConnection *_ldapconn)
- {
- if (_ldapconn&&(_ldapconn->getLDAPflags()&DLF_ENABLED))
- ldapconn.setown(_ldapconn);
- else {
- ldapconn.clear();
- ::Release(_ldapconn);
- }
- }
- void setClientAuth(IDaliClientAuthConnection *_authconn)
- {
- }
- void addProcessSession(SessionId id,INode *client,DaliClientRole role)
- {
- StringBuffer str;
- PROGLOG("Session starting %"I64F"x (%s) : role=%s",id,client->endpoint().getUrlStr(str).str(),queryRoleName(role));
- CHECKEDCRITICALBLOCK(sessmanagersect,60000);
- CProcessSessionState *s = new CProcessSessionState(id,client,role);
- while (!sessionstates.add(s)) { // takes ownership
- WARNLOG("Dali session manager: session already registered");
- sessionstates.remove(id);
- }
- while (!processlookup.add(s)) {
- ERRLOG("Dali session manager: registerClient process session already registered");
- processlookup.remove(client,this);
- }
- }
- void addSession(SessionId id)
- {
- CHECKEDCRITICALBLOCK(sessmanagersect,60000);
- CSessionState *s = new CSessionState(id);
- while (!sessionstates.add(s)) { // takes ownership
- WARNLOG("Dali session manager: session already registered (2)");
- sessionstates.remove(id);
- }
- }
- SessionId registerSession(SecurityToken, SessionId parent)
- {
- // this is where security token should be checked
- // not in critical block (otherwise possibility of deadlock)
- ICoven &coven=queryCoven();
- SessionId id = coven.getUniqueId();
- rank_t myrank = coven.getServerRank();
- rank_t ownerrank = coven.chooseServer(id);
- // first do local
- if (myrank==ownerrank)
- addSession(id);
- else
- remoteAddSession(ownerrank,id);
- ForEachOtherNodeInGroup(r,coven.queryGroup()) {
- if (r!=ownerrank)
- remoteAddSession(r,id);
- }
- return id;
- }
- // Allocates a session id for the client process 'client' and registers it: locally when this
- // server is the chosen owner, remotely on the owner otherwise, and then on each node visited by
- // ForEachOtherNodeInGroup that is not the owner.
- // retcoven is cleared on entry and receives coven.getGroup() before the new id is returned.
- SessionId registerClientProcess(INode *client, IGroup *& retcoven, DaliClientRole role)
- {
-     // not in critical block (otherwise possibility of deadlock)
-     retcoven = NULL;
-     ICoven &coven = queryCoven();
-     SessionId newId = coven.getUniqueId();
-     rank_t self = coven.getServerRank();
-     rank_t owner = coven.chooseServer(newId);
-     // first do local
-     if (self!=owner)
-         remoteAddProcessSession(owner,newId,client,role);
-     else
-         addProcessSession(newId,client,role);
-     ForEachOtherNodeInGroup(r,coven.queryGroup()) {
-         if (r!=owner)
-             remoteAddProcessSession(r,newId,client,role);
-     }
-     retcoven = coven.getGroup();
-     return newId;
- }
- // Maps a client node to its session id. A NULL node yields mySessionId; a node with no entry
- // in processlookup yields 0.
- SessionId lookupProcessSession(INode *node)
- {
-     if (node==NULL)
-         return mySessionId;
-     CHECKEDCRITICALBLOCK(sessmanagersect,60000);
-     CProcessSessionState *found = processlookup.query(node);
-     if (!found)
-         return 0;
-     return found->getId();
- }
- // Builds an INode from the endpoint of the client process that owns session 'id'.
- // Returns NULL when getClientProcessEndpoint() yields an empty string, i.e. the session is
- // unknown or is not a process session. The returned node is presumably owned by the caller.
- // The lookup is made on this object directly rather than through the global SessionManager
- // pointer, which is NULL before the server is started and after it is stopped.
- INode *getProcessSessionNode(SessionId id)
- {
-     StringBuffer eps;
-     if (getClientProcessEndpoint(id,eps).length()!=0)
-         return createINode(eps.str());
-     return NULL;
- }
- // Returns the LDAP permission value for 'obj' on behalf of 'udesc'.
- //   key, obj, udesc, flags - forwarded unchanged to the LDAP connection / worker
- //   err - optional; reset to 0 on entry, set to CLDAPE_ldapfailure or CLDAPE_getpermtimeout
- //         when 0 is returned for that reason
- // Result: -1 when no LDAP connection is set (or the build defines _NO_LDAP); otherwise the
- // answer from the connection/worker; 0 ("no access") when LDAP fails, stalls or the manager
- // is stopping.
- // Unless the connection flags contain both DLF_SAFE and DLF_ENABLED the request goes straight
- // to ldapconn->getPermissions(). Otherwise callers take turns on ldapsig and the query runs on
- // a CLdapWorkItem worker that is waited on in 20s slices (at most 10 of them).
- virtual int getPermissionsLDAP(const char *key,const char *obj,IUserDescriptor *udesc,unsigned flags, int *err)
- {
-     if (err)
-         *err = 0;
-     if (!ldapconn)
-         return -1;
- #ifdef _NO_LDAP
-     return -1;
- #else
-     if ((ldapconn->getLDAPflags()&(DLF_SAFE|DLF_ENABLED))!=(DLF_SAFE|DLF_ENABLED))
-         return ldapconn->getPermissions(key,obj,udesc,flags);
-     atomic_inc(&ldapwaiting); // counted as waiting until ldapsig is obtained or we give up
-     while (!stopping) {
-         if (ldapsig.wait(1000)) {
-             atomic_dec(&ldapwaiting);
-             if (!ldapworker)
-                 ldapworker.setown(CLdapWorkItem::get(ldapconn,workthreadsem));
-             if (ldapworker) {
-                 ldapworker->start(key,obj,udesc,flags);
-                 for (unsigned i=0;i<10;i++) {
-                     if (i)
-                         WARNLOG("LDAP stalled(%d) - retrying",i);
-                     int ret;
-                     if (ldapworker->wait(1000*20,ret)) {
-                         if (ret==CLDAPE_ldapfailure) {
-                             LOG(MCoperatorError, unknownJob, "LDAP - failure (returning no access for %s)",obj);
-                             ldapsig.signal();
-                             if (err)
-                                 *err = CLDAPE_ldapfailure;
-                             return 0;
-                         }
-                         else {
-                             ldapsig.signal();
-                             return ret;
-                         }
-                     }
-                     if (atomic_read(&ldapwaiting)>10) // give up quicker if piling up
-                         break;
-                     if (i==5) { // one retry
-                         ldapworker->stop(); // abandon thread
-                         ldapworker.clear();
-                         ldapworker.setown(CLdapWorkItem::get(ldapconn,workthreadsem));
-                         // get() can hand back NULL (see the checks above); stop retrying
-                         // instead of calling wait() through a NULL worker on the next pass
-                         if (!ldapworker)
-                             break;
-                         ldapworker->start(key,obj,udesc,flags);
-                     }
-                 }
-                 if (ldapworker)
-                     ldapworker->stop();
-                 ldapworker.clear(); // abandon thread
-             }
-             LOG(MCoperatorError, unknownJob, "LDAP stalled - aborting (returning no access for %s)",obj);
-             ldapsig.signal();
-             if (err)
-                 *err = CLDAPE_getpermtimeout;
-             return 0;
-         }
-         else {
-             // ldapsig not obtained within the wait: report the queue length (at most once per
-             // second, and only when it changed) and abort once more than 50 callers have been
-             // queued for over 60s
-             unsigned waiting = atomic_read(&ldapwaiting);
-             static unsigned last=0;
-             static unsigned lasttick=0;
-             static unsigned first50=0;
-             if ((waiting!=last)&&(msTick()-lasttick>1000)) {
-                 WARNLOG("%d threads waiting for ldap",waiting);
-                 last = waiting;
-                 lasttick = msTick();
-             }
-             if (waiting>50) {
-                 if (first50==0)
-                     first50 = msTick();
-                 else if (msTick()-first50>60*1000) {
-                     LOG(MCoperatorError, unknownJob, "LDAP stalled - aborting (returning 0 for %s)", obj);
-                     if (err)
-                         *err = CLDAPE_getpermtimeout;
-                     break;
-                 }
-             }
-             else
-                 first50 = 0;
-         }
-     }
-     atomic_dec(&ldapwaiting); // reached only while still counted as waiting (stopping or aborted)
-     return 0;
- #endif
- }
- // Forwards to ldapconn->checkScopeScans(); false when there is no LDAP connection or the
- // build defines _NO_LDAP.
- virtual bool checkScopeScansLDAP()
- {
- #ifdef _NO_LDAP
-     return false;
- #else
-     if (!ldapconn)
-         return false;
-     return ldapconn->checkScopeScans();
- #endif
- }
-
- // Returns the flags reported by the LDAP connection; 0 when there is no connection or the
- // build defines _NO_LDAP.
- virtual unsigned getLDAPflags()
- {
- #ifdef _NO_LDAP
-     return 0;
- #else
-     if (!ldapconn)
-         return 0;
-     return ldapconn->getLDAPflags();
- #endif
- }
- // Passes 'flags' to the LDAP connection when one is set; does nothing otherwise or when the
- // build defines _NO_LDAP.
- void setLDAPflags(unsigned flags)
- {
- #ifndef _NO_LDAP
-     if (!ldapconn)
-         return;
-     ldapconn->setLDAPflags(flags);
- #endif
- }
- bool authorizeConnection(int role,bool revoke) // always grants: neither 'role' nor 'revoke' is examined
- {
- return true;
- }
- // Starts a new session by delegating to registerSession() and returns the id it produced.
- SessionId startSession(SecurityToken tok, SessionId parentid)
- {
-     SessionId started = registerSession(tok,parentid);
-     return started;
- }
- void setSessionDependancy(SessionId id , SessionId dependsonid ) // not implemented: both arguments are unused
- {
- UNIMPLEMENTED; // TBD
- }
- void clearSessionDependancy(SessionId id , SessionId dependsonid ) // not implemented: both arguments are unused
- {
- UNIMPLEMENTED; // TBD
- }
- protected:
- // Pairs a subscription with the session id carried in that subscription's data.
- // The stub holds the ISubscription handed to its constructor and releases it on destruction.
- class CSessionSubscriptionStub: public CInterface
- {
-     ISubscription *subs;
-     SubscriptionId id;
-     SessionId sessid;
- public:
-     IMPLEMENT_IINTERFACE;
-     // takes ownership of _subs; the watched session id is read from the start of its data
-     CSessionSubscriptionStub(ISubscription *_subs,SubscriptionId _id)
-         : subs(_subs), id(_id)
-     {
-         const MemoryAttr &payload = subs->queryData();
-         MemoryBuffer reader(payload.length(),payload.get());
-         reader.read(sessid);
-     }
-     ~CSessionSubscriptionStub()
-     {
-         subs->Release();
-     }
-     // id under which the subscription was registered
-     SubscriptionId getId()
-     {
-         return id;
-     }
-     // session this subscription refers to
-     SessionId getSessionId()
-     {
-         return sessid;
-     }
-     // sends the subscriber a buffer holding just the 'abort' flag
-     void notify(bool abort)
-     {
-         MemoryBuffer msg;
-         msg.append(abort);
-         subs->notify(msg);
-     }
- };
- // subscription stubs currently registered with this manager
- CIArrayOf<CSessionSubscriptionStub> stubs;
- // Returns the position in 'stubs' of the stub registered under subscription 'id', or NotFound.
- unsigned findsub(SubscriptionId id)
- {
-     const unsigned total = stubs.ordinality();
-     for (unsigned pos=0; pos<total; pos++) {
-         if (stubs.item(pos).getId()==id)
-             return pos;
-     }
-     return NotFound;
- }
- // Registers subscription 'subs' under 'id'. The stub is kept when the session it names is in
- // sessionstates or is mySessionId; otherwise the subscriber is immediately notified with
- // abort=true, an error is logged and the stub is destroyed.
- void add(ISubscription *subs,SubscriptionId id)
- {
-     CSessionSubscriptionStub *stub;
-     {
-         CHECKEDCRITICALBLOCK(sessmanagersect,60000);
-         stub = new CSessionSubscriptionStub(subs,id);
-         const SessionId wanted = stub->getSessionId();
-         const bool known = sessionstates.query(wanted)||(wanted==mySessionId);
-         if (known) {
-             stubs.append(*stub);
-             return;
-         }
-     }
-     // session is not known: tell the subscriber straight away (outside the critical block)
-     ERRLOG("Session Manager - adding unknown session ID %"I64F"x", stub->getSessionId());
-     MemoryBuffer reply;
-     bool abort=true;
-     reply.append(abort);
-     subs->notify(reply);
-     delete stub;
- }
- // Drops the stub registered under subscription 'id'; does nothing when there is none.
- void remove(SubscriptionId id)
- {
-     CHECKEDCRITICALBLOCK(sessmanagersect,60000);
-     const unsigned pos = findsub(id);
-     if (pos==NotFound)
-         return;
-     stubs.remove(pos);
- }
- // Ends session 'id': every subscription stub registered for it is taken off 'stubs' and
- // notified with 'abort', repeating until no stub for the session remains; the session is then
- // removed from sessionstates (and from processlookup when it is a process session).
- void stopSession(SessionId id, bool abort)
- {
-     PROGLOG("Session stopping %"I64F"x %s",id,abort?"aborted":"ok");
-     CHECKEDCRITICALBLOCK(sessmanagersect,60000);
-     // do in multiple stages as may remove one or more sub sessions
-     for (;;) {
-         CIArrayOf<CSessionSubscriptionStub> pending;
-         unsigned pos = stubs.ordinality();
-         while (pos) {
-             --pos;
-             CSessionSubscriptionStub &candidate = stubs.item(pos);
-             if (candidate.getSessionId()!=id)
-                 continue;
-             stubs.remove(pos, true); // presumably detaches without destroying - 'pending' holds it next
-             pending.append(candidate);
-         }
-         if (!pending.ordinality())
-             break;
-         // notifications are sent after this macro; presumably it releases sessmanagersect
-         // until the end of this loop pass
-         CHECKEDCRITICALUNBLOCK(sessmanagersect,60000);
-         const unsigned total = pending.ordinality();
-         for (unsigned n=0; n<total; n++) {
-             try { pending.item(n).notify(abort); }
-             catch (IException *e) { e->Release(); } // subscriber session may abort during stopSession
-         }
-     }
-     const CSessionState *entry = sessionstates.query(id);
-     if (!entry)
-         return;
-     const CProcessSessionState *proc = QUERYINTERFACE(entry,const CProcessSessionState);
-     if (proc)
-         processlookup.remove(&proc->queryNode(),this);
-     sessionstates.remove(id);
- }
- // Handles a closed connection from 'ep'. Coven members are only logged; for any other endpoint
- // with an entry in processlookup, that session is stopped with abort=true.
- void onClose(SocketEndpoint &ep)
- {
-     StringBuffer closedUrl;
-     PROGLOG("Client closed (%s)",ep.getUrlStr(closedUrl).str());
-     SessionId victim;
-     {
-         CHECKEDCRITICALBLOCK(sessmanagersect,60000);
-         Owned<INode> peer = createINode(ep);
-         if (queryCoven().inCoven(peer)) {
-             StringBuffer covenUrl;
-             PROGLOG("Coven Session Stopping (%s)",ep.getUrlStr(covenUrl).str());
-             // more TBD here
-             return;
-         }
-         CProcessSessionState *owner = processlookup.query(peer);
-         if (owner==NULL)
-             return;
-         victim = owner->getId();
-     }
-     // stopSession is called after the block above has been left
-     stopSession(victim,true);
- }
- // Appends the details of each entry in processlookup to 'buf', one per line, and returns 'buf'.
- StringBuffer &getClientProcessList(StringBuffer &buf)
- {
-     CHECKEDCRITICALBLOCK(sessmanagersect,60000);
-     unsigned left = processlookup.count();
-     CProcessSessionState *cur = NULL;
-     while (left) {
-         cur = processlookup.next(cur);
-         if (!cur)
-             break;
-         cur->getDetails(buf).append('\n');
-         --left;
-     }
-     return buf;
- }
- // Appends the endpoint URL of the client process owning session 'id' to 'buf'.
- // 'buf' is returned untouched when the session is unknown or is not a process session.
- StringBuffer &getClientProcessEndpoint(SessionId id,StringBuffer &buf)
- {
-     CHECKEDCRITICALBLOCK(sessmanagersect,60000);
-     const CSessionState *entry = sessionstates.query(id);
-     if (!entry)
-         return buf;
-     const CProcessSessionState *proc = QUERYINTERFACE(entry,const CProcessSessionState);
-     if (!proc)
-         return buf;
-     return proc->queryNode().endpoint().getUrlStr(buf);
- }
- // Number of entries currently held in processlookup.
- unsigned queryClientCount()
- {
-     CHECKEDCRITICALBLOCK(sessmanagersect,60000);
-     unsigned clients = processlookup.count();
-     return clients;
- }
- };
- // Returns the global session manager, creating a CClientSessionManager on first use when none
- // has been installed yet. Guarded by sessionCrit.
- ISessionManager &querySessionManager()
- {
-     CriticalBlock block(sessionCrit);
-     if (SessionManager)
-         return *SessionManager;
-     // Check not Coven server (if occurs - not initialized correctly);
-     // If !coven someone is checking for dali so allow
-     assertex(!isCovenActive()||!queryCoven().inCoven());
-     SessionManager = new CClientSessionManager();
-     return *SessionManager;
- }
- // IDaliServer implementation that installs a CCovenSessionManager as the global session
- // manager on start() and removes it again on stop().
- class CDaliSessionServer: public CInterface, public IDaliServer
- {
- public:
-     IMPLEMENT_IINTERFACE;
-     CDaliSessionServer()
-     {
-     }
-     // creates the coven session manager, publishes it through both globals and starts it
-     void start()
-     {
-         CriticalBlock block(sessionCrit);
-         assertex(queryCoven().inCoven()); // must be member of coven
-         CCovenSessionManager *manager = new CCovenSessionManager();
-         SessionManagerServer = manager;
-         SessionManager = manager;
-         SessionManagerServer->start();
-     }
-     // nothing to suspend
-     void suspend()
-     {
-     }
-     // stops and releases the manager, then clears both global pointers
-     void stop()
-     {
-         CriticalBlock block(sessionCrit);
-         SessionManagerServer->stop();
-         SessionManagerServer->Release();
-         SessionManagerServer = NULL;
-         SessionManager = NULL;
-     }
-     // forwards the ready notification to the manager
-     void ready()
-     {
-         SessionManagerServer->ready();
-     }
-     // not implemented: always asserts
-     void nodeDown(rank_t rank)
-     {
-         assertex(!"TBD");
-     }
- };
- // Factory: returns a newly allocated CDaliSessionServer.
- IDaliServer *createDaliSessionServer()
- {
-     IDaliServer *server = new CDaliSessionServer();
-     return server;
- }
- // Hands the LDAP connection to the server-side session manager; ignored when none is installed.
- void setLDAPconnection(IDaliLdapConnection *ldapconn)
- {
-     if (!SessionManagerServer)
-         return;
-     SessionManagerServer->setLDAPconnection(ldapconn);
- }
- // Hands the client-auth connection to the server-side session manager; ignored when none is installed.
- void setClientAuth(IDaliClientAuthConnection *authconn)
- {
-     if (!SessionManagerServer)
-         return;
-     SessionManagerServer->setClientAuth(authconn);
- }
- #define REG_SLEEP 5000 // pause between registration attempts; also caps the wait of a single connection probe
- // Registers this process with a Dali server picked at random from comm's group, retrying until
- // 'timeout' has elapsed.
- // Returns true once a server answered the registration request: retcoven is then set and, when
- // the reply carried data, mySessionId is read from it.
- // Returns false when the group is empty (retcoven is left untouched) or when time runs out
- // (retcoven is NULL). An exception caught by the background connection probe is rethrown here.
- bool registerClientProcess(ICommunicator *comm, IGroup *& retcoven,unsigned timeout,DaliClientRole role)
- {
-     // NB doesn't use coven as not yet set up
-     if (comm->queryGroup().ordinality()==0)
-         return false;
-     CTimeMon tm(timeout);
-     retcoven = NULL;
-     unsigned nextLog = 0;
-     unsigned lastNextLog = 0;
-     const unsigned maxProbe = REG_SLEEP;
-     unsigned remaining;
-     rank_t target;
-     while (!tm.timedout(&remaining)) {
-         if (remaining>maxProbe)
-             remaining = maxProbe;
-         target = getRandom()%comm->queryGroup().ordinality();
-         bool connected = false;
-         if (remaining>10000) // MP protocol has a minimum time of 10s so if remaining < 10s use a detachable thread
-             connected = comm->verifyConnection(target,remaining);
-         else {
-             // runs verifyConnection on its own thread so this caller waits no longer than 'remaining'
-             struct CProbeThread: public Thread
-             {
-                 IMPLEMENT_IINTERFACE;
-                 Semaphore sem;
-                 Linked<ICommunicator> comm;
-                 bool ok;
-                 Owned<IException> exc;
-                 unsigned remaining;
-                 rank_t r;
-                 CProbeThread(ICommunicator *_comm,rank_t _r,unsigned _remaining)
-                     : Thread ("dasess.registerClientProcess"), comm(_comm), ok(false), remaining(_remaining), r(_r)
-                 {
-                 }
-                 int run()
-                 {
-                     try {
-                         if (comm->verifyConnection(r,remaining))
-                             ok = true;
-                     }
-                     catch (IException *e)
-                     {
-                         exc.setown(e);
-                     }
-                     sem.signal();
-                     return 0;
-                 }
-             };
-             CProbeThread *probe = new CProbeThread(comm,target,remaining);
-             probe->start();
-             probe->sem.wait(remaining);
-             connected = probe->ok;
-             IException *failure = probe->exc.getClear();
-             probe->Release();
-             if (failure)
-                 throw failure;
-         }
-         if (connected) {
-             CMessageBuffer mb;
-             mb.append((int)MSR_REGISTER_PROCESS_SESSION);
-             queryMyNode()->serialize(mb);
-             comm->queryGroup().queryNode(target).serialize(mb);
-             mb.append((int)role);
-             if (comm->sendRecv(mb,target,MPTAG_DALI_SESSION_REQUEST,SESSIONREPLYTIMEOUT)) {
-                 if (mb.length()) {
-                     mb.read(mySessionId);
-                     retcoven = deserializeIGroup(mb);
-                 }
-                 else {
-                     // failed system capability match,
-                     retcoven = comm->getGroup(); // continue as if - will fail later more obscurely.
-                 }
-                 return true;
-             }
-         }
-         StringBuffer msg;
-         PROGLOG("Failed to connect to Dali Server %s.", comm->queryGroup().queryNode(target).endpoint().getUrlStr(msg).str());
-         if (tm.timedout())
-         {
-             PROGLOG("%s", msg.append(" Timed out.").str());
-             break;
-         }
-         if (nextLog != 0)
-             --nextLog;
-         else
-         {
-             PROGLOG("%s", msg.append(" Retrying...").str());
-             if ((lastNextLog * REG_SLEEP) >= 60000) // limit to a minute between logging
-                 nextLog = 60000 / REG_SLEEP;
-             else
-                 nextLog = lastNextLog + 2; // wait +2 REG_SLEEP pauses before next logging
-             lastNextLog = nextLog;
-         }
-         Sleep(REG_SLEEP);
-     }
-     return false;
- }
- // Stops this process's own session (abort=false) when one is registered and a session manager
- // exists, then resets mySessionId to 0. A DCERR_server_closed client exception is swallowed;
- // any other IDaliClient_Exception is rethrown, leaving mySessionId unchanged.
- extern void stopClientProcess()
- {
-     CriticalBlock block(sessionCrit);
-     if (!mySessionId||!SessionManager)
-         return;
-     try {
-         querySessionManager().stopSession(mySessionId,false);
-     }
-     catch (IDaliClient_Exception *e) {
-         if (e->errorCode()==DCERR_server_closed)
-             e->Release();
-         else
-             throw;
-     }
-     mySessionId = 0;
- }
-
- class CProcessSessionWatchdog // empty placeholder: declares no members
- {
- };
- // IUserDescriptor implementation holding a user name and a password; the password is stored
- // only in the form produced by encrypt() and is serialized in that same form.
- class CUserDescriptor: public CInterface, implements IUserDescriptor
- {
-     StringAttr username;
-     StringAttr passwordenc;
- public:
-     IMPLEMENT_IINTERFACE;
-     // appends the user name to 'buf'
-     StringBuffer &getUserName(StringBuffer &buf)
-     {
-         return buf.append(username);
-     }
-     // appends the decrypted password to 'buf'
-     StringBuffer &getPassword(StringBuffer &buf)
-     {
-         decrypt(buf,passwordenc);
-         return buf;
-     }
-     // stores 'name' as is and 'password' after passing it through encrypt()
-     virtual void set(const char *name,const char *password)
-     {
-         username.set(name);
-         StringBuffer encrypted;
-         encrypt(encrypted,password);
-         passwordenc.set(encrypted.str());
-     }
-     // forgets both the name and the password
-     virtual void clear()
-     {
-         username.clear();
-         passwordenc.clear();
-     }
-     // writes the name followed by the encrypted password
-     void serialize(MemoryBuffer &mb)
-     {
-         mb.append(username).append(passwordenc);
-     }
-     // reads back what serialize() wrote, in the same order
-     void deserialize(MemoryBuffer &mb)
-     {
-         mb.read(username).read(passwordenc);
-     }
- };
- // Factory: returns a newly allocated, empty CUserDescriptor.
- IUserDescriptor *createUserDescriptor()
- {
-     IUserDescriptor *descriptor = new CUserDescriptor;
-     return descriptor;
- }
- MODULE_INIT(INIT_PRIORITY_DALI_DASESS) // module start-up hook: nothing to set up
- {
- return true;
- }
- MODULE_EXIT() // module teardown hook: drops the global session manager
- {
- ::Release(SessionManager); // give up the reference held through the global pointer
- SessionManager = NULL; // leave no stale pointer behind
- }
|