123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070 |
- /*##############################################################################
- HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ############################################################################## */
- #define da_decl __declspec(dllexport)
- #include "platform.h"
- #include "jlib.hpp"
- #include "jfile.hpp"
- #include "jsuperhash.hpp"
- #include "jmisc.hpp"
- #include "jencrypt.hpp"
- #include "dacoven.hpp"
- #include "mpbuff.hpp"
- #include "mpcomm.hpp"
- #include "mputil.hpp"
- #include "daserver.hpp"
- #include "dasubs.ipp"
- #include "dasds.hpp"
- #include "daclient.hpp"
- #include "daldap.hpp"
- #include "dasess.hpp"
- #ifdef _MSC_VER
- #pragma warning (disable : 4355)
- #endif
- const char *queryRoleName(DaliClientRole role)
- {
- switch (role) {
- case DCR_Private: return "Private";
- case DCR_Diagnostic: return "Diagnostic";
- case DCR_ThorSlave: return "ThorSlave";
- case DCR_ThorMaster: return "ThorMaster";
- case DCR_EclServer: return "EclServer";
- case DCR_EclScheduler: return "EclScheduler";
- case DCR_EclAgent: return "EclAgent";
- case DCR_AgentExec: return "AgentExec";
- case DCR_DaliServer:return "DaliServer";
- case DCR_SashaServer: return "SashaServer";
- case DCR_Util: return "Util";
- case DCR_Dfu: return "Dfu";
- case DCR_DfuServer: return "DfuServer";
- case DCR_EspServer: return "EspServer";
- case DCR_WuClient: return "WuClient";
- case DCR_Config: return "Config";
- case DCR_Scheduler: return "Scheduler";
- case DCR_RoxyMaster: return "RoxieMaster";
- case DCR_RoxySlave: return "RoxieSlave";
- case DCR_BackupGen: return "BackupGen";
- case DCR_Other: return "Other";
- }
- return "Unknown";
- }
- interface ISessionManagerServer: implements IConnectionMonitor
- {
- virtual SessionId registerSession(SecurityToken tok,SessionId parentid) = 0;
- virtual SessionId registerClientProcess(INode *node,IGroup *&grp, DaliClientRole role) = 0;
- virtual void addProcessSession(SessionId id, INode *node, DaliClientRole role) = 0;
- virtual void addSession(SessionId id) = 0;
- virtual SessionId lookupProcessSession(INode *node) = 0;
- virtual INode *getProcessSessionNode(SessionId id) =0;
- virtual int getPermissionsLDAP(const char *key,const char *obj,IUserDescriptor *udesc,unsigned flags, int *err)=0;
- virtual bool clearPermissionsCache(IUserDescriptor *udesc) = 0;
- virtual void stopSession(SessionId sessid,bool failed) = 0;
- virtual void setClientAuth(IDaliClientAuthConnection *authconn) = 0;
- virtual void setLDAPconnection(IDaliLdapConnection *_ldapconn) = 0;
- virtual bool authorizeConnection(int role,bool revoke) = 0;
- virtual void start() = 0;
- virtual void ready() = 0;
- virtual void stop() = 0;
- virtual bool queryScopeScansEnabled(IUserDescriptor *udesc, int * err, StringBuffer &retMsg) = 0;
- virtual bool enableScopeScans(IUserDescriptor *udesc, bool enable, int * err, StringBuffer &retMsg) = 0;
- };
- static SessionId mySessionId=0;
- static ISessionManager *SessionManager=NULL;
- static ISessionManagerServer *SessionManagerServer=NULL;
- static CriticalSection sessionCrit;
- #define SESSIONREPLYTIMEOUT (3*60*1000)
- #define CLDAPE_getpermtimeout (-1)
- #define CLDAPE_ldapfailure (-2)
- class CDaliLDAP_Exception: public CInterface, implements IException
- {
- int errcode;
- public:
- CDaliLDAP_Exception(int _errcode)
- {
- errcode = _errcode;
- }
- int errorCode() const { return errcode; }
- StringBuffer & errorMessage(StringBuffer &str) const
- {
- if (errcode==0)
- return str;
- str.appendf("LDAP Exception(%d): ",errcode);
- if (errcode==CLDAPE_getpermtimeout)
- return str.append("getPermissionsLDAP - timeout to LDAP server");
- if (errcode==CLDAPE_ldapfailure)
- return str.append("getPermissionsLDAP - LDAP server failure");
- return str.append("Unknown Exception");
- }
- MessageAudience errorAudience() const { return MSGAUD_user; }
- IMPLEMENT_IINTERFACE;
- };
- class CdelayedTerminate: public Thread // slightly obfuscated stop code
- {
- byte err;
- int run()
- {
- while (getRandom()%711!=0) getRandom(); // look busy
- ERRLOG("Server fault %d",(int)err);
- while (getRandom()%7!=0) Sleep(1);
- exit(0);
- }
- public:
- CdelayedTerminate(byte _err)
- {
- err = _err;
- start();
- Release();
- Sleep(100);
- }
- };
- class CSessionState: public CInterface
- {
- protected: friend class CSessionStateTable;
- SessionId id;
- public:
- CSessionState(SessionId _id)
- {
- id = _id;
- }
- SessionId getId() const
- {
- return id;
- }
- };
- class CSessionStateTable: private SuperHashTableOf<CSessionState,SessionId>
- {
- CheckedCriticalSection sessstatesect;
- void onAdd(void *) {}
- void onRemove(void *e)
- {
- CSessionState &elem=*(CSessionState *)e;
- elem.Release();
- }
- unsigned getHashFromElement(const void *e) const
- {
- const CSessionState &elem=*(const CSessionState *)e;
- SessionId id=elem.id;
- return low(id)^(unsigned)high(id);
- }
- unsigned getHashFromFindParam(const void *fp) const
- {
- SessionId id = *(const SessionId *)fp;
- return low(id)^(unsigned)high(id);
- }
- const void * getFindParam(const void *p) const
- {
- const CSessionState &elem=*(const CSessionState *)p;
- return (void *)&elem.id;
- }
- bool matchesFindParam(const void * et, const void *fp, unsigned) const
- {
- return ((CSessionState *)et)->id==*(SessionId *)fp;
- }
- IMPLEMENT_SUPERHASHTABLEOF_REF_FIND(CSessionState,SessionId);
- public:
- CSessionStateTable()
- {
- }
- ~CSessionStateTable() {
- CHECKEDCRITICALBLOCK(sessstatesect,60000);
- releaseAll();
- }
- bool add(CSessionState *e) // takes ownership
- {
- CHECKEDCRITICALBLOCK(sessstatesect,60000);
- if (SuperHashTableOf<CSessionState,SessionId>::find(&e->id))
- return false;
- SuperHashTableOf<CSessionState,SessionId>::add(*e);
- return true;
- }
- CSessionState *query(SessionId id)
- {
- CHECKEDCRITICALBLOCK(sessstatesect,60000);
- return SuperHashTableOf<CSessionState,SessionId>::find(&id);
- }
-
- void remove(SessionId id)
- {
- CHECKEDCRITICALBLOCK(sessstatesect,60000);
- SuperHashTableOf<CSessionState,SessionId>::remove(&id);
- }
- };
- class CProcessSessionState: public CSessionState
- {
- INode *node;
- DaliClientRole role;
- UInt64Array previousSessionIds;
- public:
- CProcessSessionState(SessionId id,INode *_node,DaliClientRole _role)
- : CSessionState(id)
- {
- node = _node;
- node->Link();
- role = _role;
- }
- ~CProcessSessionState()
- {
- node->Release();
- }
- INode &queryNode() const
- {
- return *node;
- }
- DaliClientRole queryRole() const
- {
- return role;
- }
- StringBuffer &getDetails(StringBuffer &buf)
- {
- StringBuffer ep;
- return buf.appendf("%16"I64F"X: %s, role=%s",CSessionState::id,node->endpoint().getUrlStr(ep).str(),queryRoleName(role));
- }
- void addSessionIds(CProcessSessionState &other, bool prevOnly)
- {
- loop
- {
- SessionId id = other.dequeuePreviousSessionId();
- if (!id)
- break;
- previousSessionIds.append(id);
- }
- if (!prevOnly)
- previousSessionIds.append(other.getId());
- }
- SessionId dequeuePreviousSessionId()
- {
- if (!previousSessionIds.ordinality())
- return 0;
- return previousSessionIds.popGet();
- }
- unsigned previousSessionIdCount() const
- {
- return previousSessionIds.ordinality();
- }
- void removeOldSessionId(SessionId id)
- {
- if (previousSessionIds.zap(id))
- PROGLOG("Removed old sessionId (%"I64F"x) from current process state", id);
- }
- };
- class CMapProcessToSession: private SuperHashTableOf<CProcessSessionState,INode>
- {
- CheckedCriticalSection mapprocesssect;
- void onAdd(void *) {}
- void onRemove(void *e) {} // do nothing
- unsigned getHashFromElement(const void *e) const
- {
- const CProcessSessionState &elem=*(const CProcessSessionState *)e;
- return elem.queryNode().getHash();
- }
- unsigned getHashFromFindParam(const void *fp) const
- {
- return ((INode *)fp)->getHash();
- }
- const void * getFindParam(const void *p) const
- {
- const CProcessSessionState &elem=*(const CProcessSessionState *)p;
- return (void *)&elem.queryNode();
- }
- bool matchesFindParam(const void * et, const void *fp, unsigned) const
- {
- return ((CProcessSessionState *)et)->queryNode().equals((INode *)fp);
- }
- IMPLEMENT_SUPERHASHTABLEOF_REF_FIND(CProcessSessionState,INode);
- public:
- CMapProcessToSession()
- {
- }
- ~CMapProcessToSession()
- {
- CHECKEDCRITICALBLOCK(mapprocesssect,60000);
- releaseAll();
- }
- bool add(CProcessSessionState *e)
- {
- CHECKEDCRITICALBLOCK(mapprocesssect,60000);
- if (SuperHashTableOf<CProcessSessionState,INode>::find(&e->queryNode()))
- return false;
- SuperHashTableOf<CProcessSessionState,INode>::add(*e);
- return true;
- }
- void replace(CProcessSessionState *e)
- {
- CHECKEDCRITICALBLOCK(mapprocesssect,60000);
- SuperHashTableOf<CProcessSessionState,INode>::replace(*e);
- }
- CProcessSessionState *query(INode *n)
- {
- CHECKEDCRITICALBLOCK(mapprocesssect,60000);
- return SuperHashTableOf<CProcessSessionState,INode>::find(n);
- }
-
- bool remove(const CProcessSessionState *state, ISessionManagerServer *manager)
- {
- CHECKEDCRITICALBLOCK(mapprocesssect,60000);
- if (SuperHashTableOf<CProcessSessionState,INode>::removeExact((CProcessSessionState *)state))
- {
- if (manager)
- manager->authorizeConnection(state->queryRole(), true);
- return true;
- }
- return false;
- }
- unsigned count()
- {
- CHECKEDCRITICALBLOCK(mapprocesssect,60000);
- return SuperHashTableOf<CProcessSessionState,INode>::count();
- }
- CProcessSessionState *next(const CProcessSessionState *s)
- {
- CHECKEDCRITICALBLOCK(mapprocesssect,60000);
- return (CProcessSessionState *)SuperHashTableOf<CProcessSessionState,INode>::next(s);
- }
- };
- enum MSessionRequestKind {
- MSR_REGISTER_PROCESS_SESSION,
- MSR_SECONDARY_REGISTER_PROCESS_SESSION,
- MSR_REGISTER_SESSION,
- MSR_SECONDARY_REGISTER_SESSION,
- MSR_LOOKUP_PROCESS_SESSION,
- MSR_STOP_SESSION,
- MSR_IMPORT_CAPABILITIES,
- MSR_LOOKUP_LDAP_PERMISSIONS,
- MSR_CLEAR_PERMISSIONS_CACHE,
- MSR_EXIT, // TBD
- MSR_QUERY_SCOPE_SCANS_ENABLED,
- MSR_ENABLE_SCOPE_SCANS
- };
- class CQueryScopeScansEnabledReq : implements IMessageWrapper
- {
- public:
- bool enabled;
- Linked<IUserDescriptor> udesc;
- CQueryScopeScansEnabledReq(IUserDescriptor *_udesc) : udesc(_udesc) {}
- CQueryScopeScansEnabledReq() {}
- void serializeReq(CMessageBuffer &mb)
- {
- mb.append(MSR_QUERY_SCOPE_SCANS_ENABLED);
- udesc->serialize(mb);
- }
- void deserializeReq(CMessageBuffer &mb)
- {
- udesc.setown(createUserDescriptor(mb));
- }
- };
- class CEnableScopeScansReq : implements IMessageWrapper
- {
- public:
- bool doEnable;
- Linked<IUserDescriptor> udesc;
- CEnableScopeScansReq(IUserDescriptor *_udesc, bool _doEnable) : udesc(_udesc), doEnable(_doEnable) {}
- CEnableScopeScansReq() {}
- void serializeReq(CMessageBuffer &mb)
- {
- mb.append(MSR_ENABLE_SCOPE_SCANS);
- udesc->serialize(mb);
- mb.append(doEnable);
- }
- void deserializeReq(CMessageBuffer &mb)
- {
- udesc.setown(createUserDescriptor(mb));
- mb.read(doEnable);
- }
- };
- class CSessionRequestServer: public Thread
- {
- bool stopped;
- ISessionManagerServer &manager;
- Semaphore acceptConnections;
- public:
- CSessionRequestServer(ISessionManagerServer &_manager)
- : Thread("Session Manager, CSessionRequestServer"), manager(_manager)
- {
- stopped = true;
- }
- int run()
- {
- ICoven &coven=queryCoven();
- CMessageHandler<CSessionRequestServer> handler("CSessionRequestServer",this,&CSessionRequestServer::processMessage);
- stopped = false;
- CMessageBuffer mb;
- while (!stopped) {
- try {
- mb.clear();
- if (coven.recv(mb,RANK_ALL,MPTAG_DALI_SESSION_REQUEST,NULL))
- handler.handleMessage(mb);
- else
- stopped = true;
- }
- catch (IException *e)
- {
- EXCLOG(e, "CDaliPublisherServer");
- e->Release();
- }
- }
- return 0;
- }
- void processMessage(CMessageBuffer &mb)
- {
- ICoven &coven=queryCoven();
- SessionId id;
- int fn;
- mb.read(fn);
- switch (fn) {
- case MSR_REGISTER_PROCESS_SESSION: {
- acceptConnections.wait();
- acceptConnections.signal();
- Owned<INode> node(deserializeINode(mb));
- Owned<INode> servernode(deserializeINode(mb)); // hopefully me, but not if forwarded
- int role=0;
- if (mb.length()-mb.getPos()>=sizeof(role)) { // a capability block present
- mb.read(role);
- if (!manager.authorizeConnection(role,false)) {
- SocketEndpoint sender = mb.getSender();
- mb.clear();
- coven.reply(mb);
- MilliSleep(100+getRandom()%1000); // Causes client to 'work' for a short time.
- Owned<INode> node = createINode(sender);
- coven.disconnect(node);
- break;
- }
- #ifdef _DEBUG
- StringBuffer eps;
- PROGLOG("Connection to %s authorized",mb.getSender().getUrlStr(eps).str());
- #endif
- }
-
- IGroup *covengrp;
- id = manager.registerClientProcess(node.get(),covengrp,(DaliClientRole)role);
- mb.clear().append(id);
- if (covengrp->rank(servernode)==RANK_NULL) { // must have been redirected
- covengrp->Release(); // no good, so just use one we know about (may use something more sophisticated later)
- INode *na = servernode.get();
- covengrp = createIGroup(1, &na);
- }
- covengrp->serialize(mb);
- covengrp->Release();
- coven.reply(mb);
- }
- break;
- case MSR_SECONDARY_REGISTER_PROCESS_SESSION: {
- mb.read(id);
- Owned<INode> node (deserializeINode(mb));
- int role;
- mb.read(role);
- manager.addProcessSession(id,node.get(),(DaliClientRole)role);
- mb.clear();
- coven.reply(mb);
- }
- break;
- case MSR_REGISTER_SESSION: {
- SecurityToken tok;
- SessionId parentid;
- mb.read(tok).read(parentid);
- SessionId id = manager.registerSession(tok,parentid);
- mb.clear().append(id);
- coven.reply(mb);
- }
- break;
- case MSR_SECONDARY_REGISTER_SESSION: {
- mb.read(id);
- manager.addSession(id);
- mb.clear();
- coven.reply(mb);
- }
- break;
- case MSR_LOOKUP_PROCESS_SESSION: {
- // looks up from node or from id
- Owned<INode> node (deserializeINode(mb));
- if (node->endpoint().isNull()&&(mb.length()-mb.getPos()>=sizeof(id))) {
- mb.read(id);
- INode *n = manager.getProcessSessionNode(id);
- if (n)
- node.setown(n);
- node->serialize(mb.clear());
- }
- else {
- id = manager.lookupProcessSession(node.get());
- mb.clear().append(id);
- }
- coven.reply(mb);
- }
- break;
- case MSR_STOP_SESSION: {
- SessionId sessid;
- bool failed;
- mb.read(sessid).read(failed);
- manager.stopSession(sessid,failed);
- mb.clear();
- coven.reply(mb);
- }
- break;
- case MSR_LOOKUP_LDAP_PERMISSIONS: {
- StringAttr key;
- StringAttr obj;
- Owned<IUserDescriptor> udesc=createUserDescriptor();
- StringAttr username;
- StringAttr passwordenc;
- mb.read(key).read(obj);
- udesc->deserialize(mb);
- #ifdef NULL_DALIUSER_STACKTRACE
- //following debug code to be removed
- StringBuffer sb;
- udesc->getUserName(sb);
- if (0==sb.length())
- {
- DBGLOG("UNEXPECTED USER (NULL) in dasess.cpp CSessionRequestServer::processMessage() line %d", __LINE__);
- }
- #endif
- unsigned auditflags = 0;
- if (mb.length()-mb.getPos()>=sizeof(auditflags))
- mb.read(auditflags);
- int err = 0;
- int ret=manager.getPermissionsLDAP(key,obj,udesc,auditflags,&err);
- mb.clear().append(ret);
- if (err)
- mb.append(err);
- coven.reply(mb);
- }
- break;
- case MSR_CLEAR_PERMISSIONS_CACHE: {
- Owned<IUserDescriptor> udesc=createUserDescriptor();
- udesc->deserialize(mb);
- bool ok = manager.clearPermissionsCache(udesc);
- mb.append(ok);
- coven.reply(mb);
- }
- break;
- case MSR_QUERY_SCOPE_SCANS_ENABLED:{
- CQueryScopeScansEnabledReq req;
- req.deserializeReq(mb);
- int err;
- StringBuffer retMsg;
- bool enabled = manager.queryScopeScansEnabled(req.udesc, &err, retMsg);
- mb.clear().append(err);
- mb.append(enabled);
- mb.append(retMsg.str());
- if (err != 0 || retMsg.length())
- {
- StringBuffer user;
- req.udesc->getUserName(user);
- DBGLOG("Error %d querying scope scan status for %s : %s", err, user.str(), retMsg.str());
- }
- coven.reply(mb);
- }
- break;
- case MSR_ENABLE_SCOPE_SCANS:{
- CEnableScopeScansReq req;
- req.deserializeReq(mb);
- int err;
- StringBuffer retMsg;
- bool ok = manager.enableScopeScans(req.udesc, req.doEnable, &err, retMsg);
- mb.clear().append(err);
- mb.append(retMsg.str());
- if (err != 0 || retMsg.length())
- {
- StringBuffer user;
- req.udesc->getUserName(user);
- DBGLOG("Error %d %sing Scope Scan Status for %s: %s", err, req.doEnable?"Enabl":"Disabl", user.str(), retMsg.str());
- }
- coven.reply(mb);
- }
- break;
- }
- }
- void ready()
- {
- acceptConnections.signal();
- }
- void stop()
- {
- if (!stopped) {
- stopped = true;
- queryCoven().cancel(RANK_ALL, MPTAG_DALI_SESSION_REQUEST);
- }
- join();
- }
- };
- class CSessionManagerBase: public CInterface, implements ISessionManager
- {
- protected:
- CheckedCriticalSection sessmanagersect;
- public:
- IMPLEMENT_IINTERFACE;
- CSessionManagerBase()
- {
- servernotifys.kill();
- }
- virtual ~CSessionManagerBase()
- {
- }
- class CSessionSubscriptionProxy: public CInterface, implements ISubscription
- {
- ISessionNotify *sub;
- SubscriptionId id;
- MemoryAttr ma;
- SessionId sessid;
- public:
- IMPLEMENT_IINTERFACE;
- CSessionSubscriptionProxy(ISessionNotify *_sub,SessionId _sessid)
- {
- sub = LINK(_sub);
- sessid = _sessid;
- MemoryBuffer mb;
- mb.append(sessid);
- ma.set(mb.length(),mb.toByteArray());
- id = queryCoven().getUniqueId();
- }
- ~CSessionSubscriptionProxy()
- {
- sub->Release();
- }
- ISessionNotify *queryNotify()
- {
- return sub;
- }
- const MemoryAttr &queryData()
- {
- return ma;
- }
- SubscriptionId getId()
- {
- return id;
- }
- void doNotify(bool aborted)
- {
- if (aborted)
- sub->aborted(sessid);
- else
- sub->closed(sessid);
- }
- void notify(MemoryBuffer &mb)
- {
- bool aborted;
- mb.read(aborted);
- doNotify(aborted);
- }
- void abort()
- {
- //NH: TBD
- }
- bool aborted()
- {
- return false;
- }
- };
- CIArrayOf<CSessionSubscriptionProxy>servernotifys;
- SubscriptionId subscribeSession(SessionId sessid, ISessionNotify *inotify)
- {
- CSessionSubscriptionProxy *proxy;
- SubscriptionId id;
- {
- CHECKEDCRITICALBLOCK(sessmanagersect,60000);
- proxy = new CSessionSubscriptionProxy(inotify,sessid);
- id = proxy->getId();
- if (sessid==SESSID_DALI_SERVER) {
- servernotifys.append(*proxy);
- return id;
- }
- }
- querySubscriptionManager(SESSION_PUBLISHER)->add(proxy,id);
- return id;
- }
- void unsubscribeSession(SubscriptionId id)
- {
- {
- CHECKEDCRITICALBLOCK(sessmanagersect,60000);
- // check not a server subscription
- ForEachItemIn(i,servernotifys) {
- if (servernotifys.item(i).getId()==id) {
- servernotifys.remove(i);
- return;
- }
- }
- }
- querySubscriptionManager(SESSION_PUBLISHER)->remove(id);
- }
- class cNotify: public CInterface, implements ISessionNotify
- {
- public:
- IMPLEMENT_IINTERFACE;
- Semaphore sem;
- void closed(SessionId id)
- {
- //PROGLOG("Session closed %"I64F"x",id);
- sem.signal();
- }
- void aborted(SessionId id)
- {
- //PROGLOG("Session aborted %"I64F"x",id);
- sem.signal();
- }
- };
- bool sessionStopped(SessionId id, unsigned timeout)
- {
- Owned<INode> node = getProcessSessionNode(id);
- if (node.get()==NULL)
- return true;
- if (timeout==0)
- return false;
- Owned<cNotify> cnotify = new cNotify;
- querySessionManager().subscribeSession(id,cnotify);
- if (cnotify->sem.wait(timeout))
- return false;
- node.setown(getProcessSessionNode(id));
- return node.get()==NULL;
- }
- void notifyServerStopped(bool aborted)
- {
- CHECKEDCRITICALBLOCK(sessmanagersect,60000);
- // check not a server subscription
- ForEachItemIn(i,servernotifys) {
- servernotifys.item(i).doNotify(aborted);
- }
- }
- };
- class CClientSessionManager: public CSessionManagerBase, implements IConnectionMonitor
- {
- bool securitydisabled;
- public:
- IMPLEMENT_IINTERFACE;
- CClientSessionManager()
- {
- securitydisabled = false;
- }
- virtual ~CClientSessionManager()
- {
- stop();
- }
- SessionId lookupProcessSession(INode *node)
- {
- if (!node)
- return mySessionId;
- CMessageBuffer mb;
- mb.append((int)MSR_LOOKUP_PROCESS_SESSION);
- node->serialize(mb);
- if (!queryCoven().sendRecv(mb,RANK_RANDOM,MPTAG_DALI_SESSION_REQUEST,SESSIONREPLYTIMEOUT))
- return 0;
- SessionId ret;
- mb.read(ret);
- return ret;
- }
- virtual INode *getProcessSessionNode(SessionId id)
- {
- if (!id)
- return NULL;
- CMessageBuffer mb;
- mb.append((int)MSR_LOOKUP_PROCESS_SESSION);
- queryNullNode()->serialize(mb);
- mb.append(id);
- if (!queryCoven().sendRecv(mb,RANK_RANDOM,MPTAG_DALI_SESSION_REQUEST,SESSIONREPLYTIMEOUT))
- return NULL;
- Owned<INode> node = deserializeINode(mb);
- if (node->endpoint().isNull())
- return NULL;
- return node.getClear();
- }
- int getPermissionsLDAP(const char *key,const char *obj,IUserDescriptor *udesc,unsigned auditflags,int *err)
- {
- if (err)
- *err = 0;
- if (securitydisabled)
- return -1;
- if (queryDaliServerVersion().compare("1.8") < 0) {
- securitydisabled = true;
- return -1;
- }
- CMessageBuffer mb;
- mb.append((int)MSR_LOOKUP_LDAP_PERMISSIONS);
- mb.append(key).append(obj);
- #ifdef NULL_DALIUSER_STACKTRACE
- //following debug code to be removed
- StringBuffer sb;
- if (udesc)
- udesc->getUserName(sb);
- if (0==sb.length())
- {
- DBGLOG("UNEXPECTED USER (NULL) in dasess.cpp getPermissionsLDAP() line %d",__LINE__);
- PrintStackReport();
- }
- #endif
- udesc->serialize(mb);
- mb.append(auditflags);
- if (!queryCoven().sendRecv(mb,RANK_RANDOM,MPTAG_DALI_SESSION_REQUEST,SESSIONREPLYTIMEOUT))
- return 0;
- int ret=-1;
- if (mb.remaining()>=sizeof(ret)) {
- mb.read(ret);
- if (mb.remaining()>=sizeof(int)) {
- int e = 0;
- mb.read(e);
- if (err)
- *err = e;
- else if (e)
- throw new CDaliLDAP_Exception(e);
- }
- }
- if (ret==-1)
- securitydisabled = true;
- return ret;
- }
- bool clearPermissionsCache(IUserDescriptor *udesc)
- {
- if (securitydisabled)
- return true;
- if (queryDaliServerVersion().compare("1.8") < 0) {
- securitydisabled = true;
- return true;
- }
- CMessageBuffer mb;
- mb.append((int)MSR_CLEAR_PERMISSIONS_CACHE);
- udesc->serialize(mb);
- return queryCoven().sendRecv(mb,RANK_RANDOM,MPTAG_DALI_SESSION_REQUEST,SESSIONREPLYTIMEOUT);
- }
- bool queryScopeScansEnabled(IUserDescriptor *udesc, int * err, StringBuffer &retMsg)
- {
- if (queryDaliServerVersion().compare("3.10") < 0)
- {
- *err = -1;
- StringBuffer ver;
- queryDaliServerVersion().toString(ver);
- retMsg.appendf("Scope Scan status feature requires Dali V3.10 or newer, current Dali version %s",ver.str());
- return false;
- }
- if (securitydisabled)
- {
- *err = -1;
- retMsg.append("Security not enabled");
- return false;
- }
- if (queryDaliServerVersion().compare("1.8") < 0) {
- *err = -1;
- retMsg.append("Security not enabled");
- securitydisabled = true;
- return false;
- }
- CMessageBuffer mb;
- CQueryScopeScansEnabledReq req(udesc);
- req.serializeReq(mb);
- if (!queryCoven().sendRecv(mb,RANK_RANDOM,MPTAG_DALI_SESSION_REQUEST,SESSIONREPLYTIMEOUT))
- {
- *err = -1;
- retMsg.append("DALI Send/Recv error");
- return false;
- }
- int rc;
- bool enabled;
- mb.read(rc).read(enabled).read(retMsg);
- *err = rc;
- return enabled;
- }
- bool enableScopeScans(IUserDescriptor *udesc, bool enable, int * err, StringBuffer &retMsg)
- {
- if (queryDaliServerVersion().compare("3.10") < 0)
- {
- *err = -1;
- StringBuffer ver;
- queryDaliServerVersion().toString(ver);
- retMsg.appendf("Scope Scan enable/disable feature requires Dali V3.10 or newer, current Dali version %s",ver.str());
- return false;
- }
- if (securitydisabled)
- {
- *err = -1;
- retMsg.append("Security not enabled");
- return false;
- }
- if (queryDaliServerVersion().compare("1.8") < 0) {
- *err = -1;
- retMsg.append("Security not enabled");
- securitydisabled = true;
- return false;
- }
- CMessageBuffer mb;
- CEnableScopeScansReq req(udesc,enable);
- req.serializeReq(mb);
- if (!queryCoven().sendRecv(mb,RANK_RANDOM,MPTAG_DALI_SESSION_REQUEST,SESSIONREPLYTIMEOUT))
- {
- *err = -1;
- retMsg.append("DALI Send/Recv error");
- return false;
- }
- int rc;
- mb.read(rc).read(retMsg);
- *err = rc;
- if (rc == 0)
- {
- StringBuffer user;
- udesc->getUserName(user);
- DBGLOG("Scope Scans %sabled by %s",enable ? "En" : "Dis", user.str());
- }
- return rc == 0;
- }
- bool checkScopeScansLDAP()
- {
- assertex(!"checkScopeScansLDAP called on client");
- return true; // actually only used server size
- }
- unsigned getLDAPflags()
- {
- assertex(!"getLdapFlags called on client");
- return 0;
- }
- void setLDAPflags(unsigned)
- {
- assertex(!"setLdapFlags called on client");
- }
- bool authorizeConnection(DaliClientRole,bool)
- {
- return true;
- }
- SessionId startSession(SecurityToken tok, SessionId parentid)
- {
- CMessageBuffer mb;
- mb.append((int)MSR_REGISTER_SESSION).append(tok).append(parentid);
- if (!queryCoven().sendRecv(mb,RANK_RANDOM,MPTAG_DALI_SESSION_REQUEST,SESSIONREPLYTIMEOUT))
- return 0;
- SessionId ret;
- mb.read(ret);
- return ret;
- }
- void stopSession(SessionId sessid, bool failed)
- {
- if (sessid==SESSID_DALI_SERVER) {
- notifyServerStopped(failed);
- return;
- }
- CMessageBuffer mb;
- mb.append((int)MSR_STOP_SESSION).append(sessid).append(failed);
- queryCoven().sendRecv(mb,RANK_RANDOM,MPTAG_DALI_SESSION_REQUEST,SESSIONREPLYTIMEOUT);
- }
- void onClose(SocketEndpoint &ep)
- {
- CHECKEDCRITICALBLOCK(sessmanagersect,60000);
- Owned<INode> node = createINode(ep);
- if (queryCoven().inCoven(node)) {
- StringBuffer str;
- PROGLOG("Coven Session Stopping (%s)",ep.getUrlStr(str).str());
- if (queryCoven().size()==1)
- notifyServerStopped(true);
- }
- }
- void start()
- {
- addMPConnectionMonitor(this);
- }
- void stop()
- {
- }
- void ready()
- {
- removeMPConnectionMonitor(this);
- }
- StringBuffer &getClientProcessList(StringBuffer &buf)
- {
- // dummy
- return buf;
- }
- StringBuffer &getClientProcessEndpoint(SessionId,StringBuffer &buf)
- {
- // dummy
- return buf;
- }
- unsigned queryClientCount()
- {
- // dummy
- return 0;
- }
- void importCapabilities(MemoryBuffer &mb)
- {
- CMessageBuffer msg;
- msg.append((int)MSR_IMPORT_CAPABILITIES);
- msg.append(mb.length(), mb.toByteArray());
- queryCoven().sendRecv(msg,RANK_RANDOM,MPTAG_DALI_SESSION_REQUEST,SESSIONREPLYTIMEOUT);
- }
- };
- class CLdapWorkItem : public Thread
- {
- StringAttr key;
- StringAttr obj;
- Linked<IUserDescriptor> udesc;
- Linked<IDaliLdapConnection> ldapconn;
- unsigned flags;
- bool running;
- Semaphore contsem;
- Semaphore ready;
- Semaphore &threaddone;
- int ret;
- public:
- IMPLEMENT_IINTERFACE;
- CLdapWorkItem(IDaliLdapConnection *_ldapconn,Semaphore &_threaddone)
- : ldapconn(_ldapconn), threaddone(_threaddone)
- {
- running = false;
- }
- void start(const char *_key,const char *_obj,IUserDescriptor *_udesc,unsigned _flags)
- {
- key.set(_key);
- obj.set(_obj);
- #ifdef NULL_DALIUSER_STACKTRACE
- StringBuffer sb;
- if (_udesc)
- _udesc->getUserName(sb);
- if (sb.length()==0)
- {
- DBGLOG("UNEXPECTED USER (NULL) in dasess.cpp CLdapWorkItem::start() line %d",__LINE__);
- PrintStackReport();
- }
- #endif
- udesc.set(_udesc);
- flags = _flags;
- ret = CLDAPE_ldapfailure;
- if (!running) {
- running = true;
- Thread::start();
- }
- contsem.signal();
- }
- int run()
- {
- loop {
- contsem.wait();
- if (!running)
- break;
- try {
- ret = ldapconn->getPermissions(key,obj,udesc,flags);
- }
- catch(IException *e) {
- LOG(MCoperatorError, unknownJob, e, "CLdapWorkItem");
- e->Release();
- }
- ready.signal();
- }
- threaddone.signal();
- return 0;
- }
- bool wait(unsigned timeout, int &_ret)
- {
- if (ready.wait(timeout)) {
- _ret = ret;
- return true;
- }
- _ret = 0;
- return false;
- }
- void stop()
- {
- running = false;
- contsem.signal();
- }
- static CLdapWorkItem *get(IDaliLdapConnection *_ldapconn,Semaphore &_threaddone)
- {
- if (!_threaddone.wait(1000*60*5)) {
- ERRLOG("Too many stalled LDAP threads");
- return NULL;
- }
- return new CLdapWorkItem(_ldapconn,_threaddone);
- }
- };
- class CCovenSessionManager: public CSessionManagerBase, implements ISessionManagerServer, implements ISubscriptionManager
- {
- CSessionRequestServer sessionrequestserver;
- CSessionStateTable sessionstates;
- CMapProcessToSession processlookup;
- Owned<IDaliLdapConnection> ldapconn;
- Owned<CLdapWorkItem> ldapworker;
- Semaphore ldapsig;
- atomic_t ldapwaiting;
- Semaphore workthreadsem;
- bool stopping;
- void remoteAddProcessSession(rank_t dst,SessionId id,INode *node, DaliClientRole role)
- {
- CMessageBuffer mb;
- mb.append((int)MSR_SECONDARY_REGISTER_PROCESS_SESSION).append(id);
- node->serialize(mb);
- int r = (int)role;
- mb.append(role);
- queryCoven().sendRecv(mb,dst,MPTAG_DALI_SESSION_REQUEST);
- // no fail currently
- }
- void remoteAddSession(rank_t dst,SessionId id)
- {
- CMessageBuffer mb;
- mb.append((int)MSR_SECONDARY_REGISTER_SESSION).append(id);
- queryCoven().sendRecv(mb,dst,MPTAG_DALI_SESSION_REQUEST);
- // no fail currently
- }
- public:
- IMPLEMENT_IINTERFACE;
- CCovenSessionManager()
- : sessionrequestserver(*this)
- {
- mySessionId = queryCoven().getUniqueId(); // tell others in coven TBD
- registerSubscriptionManager(SESSION_PUBLISHER,this);
- atomic_set(&ldapwaiting,0);
- workthreadsem.signal(10);
- stopping = false;
- ldapsig.signal();
- }
- ~CCovenSessionManager()
- {
- stubTable.kill();
- }
- void start()
- {
- sessionrequestserver.start();
- }
- void stop()
- {
- stopping = true;
- if (!ldapsig.wait(60*1000))
- WARNLOG("LDAP stalled(1)");
- if (ldapworker) {
- ldapworker->stop();
- if (!ldapworker->join(1000))
- WARNLOG("LDAP stalled(2)");
- ldapworker.clear();
- }
- ldapconn.clear();
- removeMPConnectionMonitor(this);
- sessionrequestserver.stop();
- }
- void ready()
- {
- addMPConnectionMonitor(this);
- sessionrequestserver.ready();
- }
- void setLDAPconnection(IDaliLdapConnection *_ldapconn)
- {
- if (_ldapconn&&(_ldapconn->getLDAPflags()&DLF_ENABLED))
- ldapconn.setown(_ldapconn);
- else {
- ldapconn.clear();
- ::Release(_ldapconn);
- }
- }
- void setClientAuth(IDaliClientAuthConnection *_authconn)
- {
- }
- void addProcessSession(SessionId id,INode *client,DaliClientRole role)
- {
- StringBuffer str;
- PROGLOG("Session starting %"I64F"x (%s) : role=%s",id,client->endpoint().getUrlStr(str).str(),queryRoleName(role));
- CHECKEDCRITICALBLOCK(sessmanagersect,60000);
- CProcessSessionState *s = new CProcessSessionState(id,client,role);
- while (!sessionstates.add(s)) // takes ownership
- {
- WARNLOG("Dali session manager: session already registered");
- sessionstates.remove(id);
- }
- while (!processlookup.add(s))
- {
- /* There's existing ip:port match (client) in process table..
- * Old may be in process of closing or about to, but new has beaten the onClose() to it..
- * Track old sessions in new CProcessSessionState instance, so that in-process or upcoming onClose()/stopSession() can find them
- */
- CProcessSessionState *previousState = processlookup.query(client);
- dbgassertex(previousState); // Must be there, it's reason add() failed
- SessionId oldSessionId = previousState->getId();
- s->addSessionIds(*previousState, false); // merges sessions from previous process state into new one that replaces it
- WARNLOG("Dali session manager: registerClient process session already registered, old (%"I64F"x) replaced", oldSessionId);
- processlookup.remove(previousState, this);
- }
- }
- void addSession(SessionId id)
- {
- CHECKEDCRITICALBLOCK(sessmanagersect,60000);
- CSessionState *s = new CSessionState(id);
- while (!sessionstates.add(s)) { // takes ownership
- WARNLOG("Dali session manager: session already registered (2)");
- sessionstates.remove(id);
- }
- }
- SessionId registerSession(SecurityToken, SessionId parent)
- {
- // this is where security token should be checked
- // not in critical block (otherwise possibility of deadlock)
- ICoven &coven=queryCoven();
- SessionId id = coven.getUniqueId();
- rank_t myrank = coven.getServerRank();
- rank_t ownerrank = coven.chooseServer(id);
- // first do local
- if (myrank==ownerrank)
- addSession(id);
- else
- remoteAddSession(ownerrank,id);
- ForEachOtherNodeInGroup(r,coven.queryGroup()) {
- if (r!=ownerrank)
- remoteAddSession(r,id);
- }
- return id;
- }
- SessionId registerClientProcess(INode *client, IGroup *& retcoven, DaliClientRole role)
- {
- // not in critical block (otherwise possibility of deadlock)
- retcoven = NULL;
- ICoven &coven=queryCoven();
- SessionId id = coven.getUniqueId();
- rank_t myrank = coven.getServerRank();
- rank_t ownerrank = coven.chooseServer(id);
- // first do local
- if (myrank==ownerrank)
- addProcessSession(id,client,role);
- else
- remoteAddProcessSession(ownerrank,id,client,role);
- ForEachOtherNodeInGroup(r,coven.queryGroup()) {
- if (r!=ownerrank)
- remoteAddProcessSession(r,id,client,role);
- }
- retcoven = coven.getGroup();
- return id;
- }
- SessionId lookupProcessSession(INode *node)
- {
- if (!node)
- return mySessionId;
- CHECKEDCRITICALBLOCK(sessmanagersect,60000);
- CProcessSessionState *s= processlookup.query(node);
- return s?s->getId():0;
- }
- INode *getProcessSessionNode(SessionId id)
- {
- StringBuffer eps;
- if (SessionManager->getClientProcessEndpoint(id,eps).length()!=0)
- return createINode(eps.str());
- return NULL;
- }
- virtual int getPermissionsLDAP(const char *key,const char *obj,IUserDescriptor *udesc,unsigned flags, int *err)
- {
- if (err)
- *err = 0;
- if (!ldapconn)
- return -1;
- #ifdef _NO_LDAP
- return -1;
- #else
- #ifdef NULL_DALIUSER_STACKTRACE
- StringBuffer sb;
- if (udesc)
- udesc->getUserName(sb);
- if (sb.length()==0)
- {
- DBGLOG("UNEXPECTED USER (NULL) in dasess.cpp CCovenSessionManager::getPermissionsLDAP() line %d",__LINE__);
- PrintStackReport();
- }
- #endif
- if ((ldapconn->getLDAPflags()&(DLF_SAFE|DLF_ENABLED))!=(DLF_SAFE|DLF_ENABLED))
- return ldapconn->getPermissions(key,obj,udesc,flags);
- atomic_inc(&ldapwaiting);
- unsigned retries = 0;
- while (!stopping) {
- if (ldapsig.wait(1000)) {
- atomic_dec(&ldapwaiting);
- if (!ldapworker)
- ldapworker.setown(CLdapWorkItem::get(ldapconn,workthreadsem));
- if (ldapworker) {
- ldapworker->start(key,obj,udesc,flags);
- for (unsigned i=0;i<10;i++) {
- if (i)
- WARNLOG("LDAP stalled(%d) - retrying",i);
- int ret;
- if (ldapworker->wait(1000*20,ret)) {
- if (ret==CLDAPE_ldapfailure) {
- LOG(MCoperatorError, unknownJob, "LDAP - failure (returning no access for %s)",obj);
- ldapsig.signal();
- if (err)
- *err = CLDAPE_ldapfailure;
- return 0;
- }
- else {
- ldapsig.signal();
- return ret;
- }
- }
- if (atomic_read(&ldapwaiting)>10) // give up quicker if piling up
- break;
- if (i==5) { // one retry
- ldapworker->stop(); // abandon thread
- ldapworker.clear();
- ldapworker.setown(CLdapWorkItem::get(ldapconn,workthreadsem));
- if (ldapworker)
- ldapworker->start(key,obj,udesc,flags);
- }
- }
- if (ldapworker)
- ldapworker->stop();
- ldapworker.clear(); // abandon thread
- }
- LOG(MCoperatorError, unknownJob, "LDAP stalled - aborting (returning no access for %s)",obj);
- ldapsig.signal();
- if (err)
- *err = CLDAPE_getpermtimeout;
- return 0;
- }
- else {
- unsigned waiting = atomic_read(&ldapwaiting);
- static unsigned last=0;
- static unsigned lasttick=0;
- static unsigned first50=0;
- if ((waiting!=last)&&(msTick()-lasttick>1000)) {
- WARNLOG("%d threads waiting for ldap",waiting);
- last = waiting;
- lasttick = msTick();
- }
- if (waiting>50) {
- if (first50==0)
- first50 = msTick();
- else if (msTick()-first50>60*1000) {
- LOG(MCoperatorError, unknownJob, "LDAP stalled - aborting (returning 0 for %s)", obj);
- if (err)
- *err = CLDAPE_getpermtimeout;
- break;
- }
- }
- else
- first50 = 0;
- }
- }
- atomic_dec(&ldapwaiting);
- return 0;
- #endif
- }
- virtual bool clearPermissionsCache(IUserDescriptor *udesc)
- {
- #ifdef _NO_LDAP
- bool ok = true;
- #else
- bool ok = true;
- if (ldapconn->getLDAPflags() & DLF_ENABLED)
- ok = ldapconn->clearPermissionsCache(udesc);
- #endif
- return ok;
- }
- virtual bool queryScopeScansEnabled(IUserDescriptor *udesc, int * err, StringBuffer &retMsg)
- {
- #ifdef _NO_LDAP
- *err = -1;
- retMsg.append("LDAP not enabled");
- return false;
- #else
- *err = 0;
- return checkScopeScansLDAP();
- #endif
- }
- virtual bool enableScopeScans(IUserDescriptor *udesc, bool enable, int * err, StringBuffer &retMsg)
- {
- #ifdef _NO_LDAP
- *err = -1;
- retMsg.append("LDAP not supporteded");
- return false;
- #else
- if (!ldapconn)
- {
- *err = -1;
- retMsg.append("LDAP not connected");
- return false;
- }
- return ldapconn->enableScopeScans(udesc, enable, err);
- #endif
- }
- virtual bool checkScopeScansLDAP()
- {
- #ifdef _NO_LDAP
- return false;
- #else
- return ldapconn?ldapconn->checkScopeScans():false;
- #endif
- }
-
- virtual unsigned getLDAPflags()
- {
- #ifdef _NO_LDAP
- return 0;
- #else
- return ldapconn?ldapconn->getLDAPflags():0;
- #endif
- }
- void setLDAPflags(unsigned flags)
- {
- #ifndef _NO_LDAP
- if (ldapconn)
- ldapconn->setLDAPflags(flags);
- #endif
- }
- bool authorizeConnection(int role,bool revoke)
- {
- return true;
- }
- SessionId startSession(SecurityToken tok, SessionId parentid)
- {
- return registerSession(tok,parentid);
- }
- void setSessionDependancy(SessionId id , SessionId dependsonid )
- {
- UNIMPLEMENTED; // TBD
- }
- void clearSessionDependancy(SessionId id , SessionId dependsonid )
- {
- UNIMPLEMENTED; // TBD
- }
- protected:
- class CSessionSubscriptionStub: public CInterface
- {
- ISubscription *subs;
- SubscriptionId id;
- SessionId sessid;
- public:
- IMPLEMENT_IINTERFACE;
- CSessionSubscriptionStub(ISubscription *_subs,SubscriptionId _id) // takes ownership
- {
- subs = _subs;
- id = _id;
- const MemoryAttr &ma = subs->queryData();
- MemoryBuffer mb(ma.length(),ma.get());
- mb.read(sessid);
- }
- ~CSessionSubscriptionStub()
- {
- subs->Release();
- }
- SubscriptionId getId() { return id; }
- SessionId getSessionId() { return sessid; }
- void notify(bool abort)
- {
- MemoryBuffer mb;
- mb.append(abort);
- subs->notify(mb);
- }
- const void *queryFindParam() const { return &id; }
- };
- OwningSimpleHashTableOf<CSessionSubscriptionStub, SubscriptionId> stubTable;
- void add(ISubscription *subs,SubscriptionId id)
- {
- CSessionSubscriptionStub *nstub;
- {
- CHECKEDCRITICALBLOCK(sessmanagersect,60000);
- nstub = new CSessionSubscriptionStub(subs,id);
- if (sessionstates.query(nstub->getSessionId())||(nstub->getSessionId()==mySessionId))
- {
- stubTable.replace(*nstub);
- return;
- }
- }
- // see if session known
- MemoryBuffer mb;
- bool abort=true;
- mb.append(abort);
- ERRLOG("Session Manager - adding unknown session ID %"I64F"x", nstub->getSessionId());
- subs->notify(mb);
- delete nstub;
- return;
- }
- void remove(SubscriptionId id)
- {
- CHECKEDCRITICALBLOCK(sessmanagersect,60000);
- stubTable.remove(&id);
- }
- void stopSession(SessionId id, bool abort)
- {
- PROGLOG("Session stopping %"I64F"x %s",id,abort?"aborted":"ok");
- CHECKEDCRITICALBLOCK(sessmanagersect,60000);
- // do in multiple stages as may remove one or more sub sessions
- loop
- {
- CIArrayOf<CSessionSubscriptionStub> tonotify;
- SuperHashIteratorOf<CSessionSubscriptionStub> iter(stubTable);
- ForEach(iter)
- {
- CSessionSubscriptionStub &stub = iter.query();
- if (stub.getSessionId()==id)
- tonotify.append(*LINK(&stub));
- }
- if (tonotify.ordinality()==0)
- break;
- ForEachItemIn(j,tonotify)
- {
- CSessionSubscriptionStub &stub = tonotify.item(j);
- stubTable.removeExact(&stub);
- }
- CHECKEDCRITICALUNBLOCK(sessmanagersect,60000);
- ForEachItemIn(j2,tonotify)
- {
- CSessionSubscriptionStub &stub = tonotify.item(j2);
- try { stub.notify(abort); }
- catch (IException *e) { e->Release(); } // subscriber session may abort during stopSession
- }
- tonotify.kill(); // clear whilst sessmanagersect unblocked, as subs may query session manager.
- }
- const CSessionState *state = sessionstates.query(id);
- if (state)
- {
- const CProcessSessionState *pState = QUERYINTERFACE(state, const CProcessSessionState);
- if (pState)
- {
- CProcessSessionState *cState = processlookup.query(&pState->queryNode()); // get current
- if (pState == cState) // so is current one.
- {
- /* This is reinstating a previous CProcessSessionState for this node (if there is one),
- * that has not yet stopped, and adding any other pending process states to the CProcessSessionState
- * being reinstated.
- */
- SessionId prevId = cState->dequeuePreviousSessionId();
- if (prevId)
- {
- PROGLOG("Session (%"I64F"x) in stopSession, detected %d pending previous states, reinstating session (%"I64F"x) as current", id, cState->previousSessionIdCount(), prevId);
- CSessionState *prevSessionState = sessionstates.query(prevId);
- dbgassertex(prevSessionState); // must be there
- CProcessSessionState *prevProcessState = QUERYINTERFACE(prevSessionState, CProcessSessionState);
- dbgassertex(prevProcessState);
- /* NB: prevProcessState's have 0 entries in their previousSessionIds, since they were merged at replacement time
- * in addProcessSession()
- */
- /* add in any remaining to-be-stopped process sessions from current that's stopping into this previous state
- * that's being reinstated, so will be picked up on next onClose()/stopSession()
- */
- prevProcessState->addSessionIds(*cState, true);
- processlookup.replace(prevProcessState);
- }
- else
- processlookup.remove(pState, this);
- }
- else // Here because in stopSession for an previous process state, that has been replaced (in addProcessSession)
- {
- if (processlookup.remove(pState, this))
- {
- // Don't think possible to be here, if not current must have replaced afaics
- PROGLOG("Session (%"I64F"x) in stopSession, old process session removed", id);
- }
- else
- PROGLOG("Session (%"I64F"x) in stopSession, old process session was already removed", id); // because replaced
- if (cState)
- {
- PROGLOG("Session (%"I64F"x) was replaced, ensuring removed from new process state", id);
- cState->removeOldSessionId(id); // If already replaced, then must ensure no longer tracked by new
- }
- }
- }
- sessionstates.remove(id);
- }
- }
- void onClose(SocketEndpoint &ep)
- {
- StringBuffer clientStr;
- PROGLOG("Client closed (%s)", ep.getUrlStr(clientStr).str());
- SessionId idtostop;
- {
- CHECKEDCRITICALBLOCK(sessmanagersect,60000);
- Owned<INode> node = createINode(ep);
- if (queryCoven().inCoven(node))
- {
- PROGLOG("Coven Session Stopping (%s)", clientStr.str());
- // more TBD here
- return;
- }
- CProcessSessionState *s = processlookup.query(node);
- if (!s)
- return;
- idtostop = s->dequeuePreviousSessionId();
- if (idtostop)
- {
- PROGLOG("Previous sessionId (%"I64F"x) for %s was replaced by (%"I64F"x), stopping old session now", idtostop, clientStr.str(), s->getId());
- unsigned c = s->previousSessionIdCount();
- if (c) // very unlikely, but could be >1, trace for info.
- PROGLOG("%d old sessions pending closure", c);
- }
- else
- idtostop = s->getId();
- }
- stopSession(idtostop,true);
- }
- StringBuffer &getClientProcessList(StringBuffer &buf)
- {
- CHECKEDCRITICALBLOCK(sessmanagersect,60000);
- unsigned n = processlookup.count();
- CProcessSessionState *s=NULL;
- for (unsigned i=0;i<n;i++) {
- s=processlookup.next(s);
- if (!s)
- break;
- s->getDetails(buf).append('\n');
- }
- return buf;
- }
- StringBuffer &getClientProcessEndpoint(SessionId id,StringBuffer &buf)
- {
- CHECKEDCRITICALBLOCK(sessmanagersect,60000);
- const CSessionState *state = sessionstates.query(id);
- if (state) {
- const CProcessSessionState *pstate = QUERYINTERFACE(state,const CProcessSessionState);
- if (pstate)
- return pstate->queryNode().endpoint().getUrlStr(buf);
- }
- return buf;
- }
- unsigned queryClientCount()
- {
- CHECKEDCRITICALBLOCK(sessmanagersect,60000);
- return processlookup.count();
- }
- };
- ISessionManager &querySessionManager()
- {
- CriticalBlock block(sessionCrit);
- if (!SessionManager) {
- assertex(!isCovenActive()||!queryCoven().inCoven()); // Check not Coven server (if occurs - not initialized correctly;
- // If !coven someone is checking for dali so allow
- SessionManager = new CClientSessionManager();
-
- }
- return *SessionManager;
- }
- class CDaliSessionServer: public CInterface, public IDaliServer
- {
- public:
- IMPLEMENT_IINTERFACE;
- CDaliSessionServer()
- {
- }
- void start()
- {
- CriticalBlock block(sessionCrit);
- assertex(queryCoven().inCoven()); // must be member of coven
- CCovenSessionManager *serv = new CCovenSessionManager();
- SessionManagerServer = serv;
- SessionManager = serv;
- SessionManagerServer->start();
- }
- void suspend()
- {
- }
- void stop()
- {
- CriticalBlock block(sessionCrit);
- SessionManagerServer->stop();
- SessionManagerServer->Release();
- SessionManagerServer = NULL;
- SessionManager = NULL;
- }
- void ready()
- {
- SessionManagerServer->ready();
- }
- void nodeDown(rank_t rank)
- {
- assertex(!"TBD");
- }
- };
- IDaliServer *createDaliSessionServer()
- {
- return new CDaliSessionServer();
- }
- void setLDAPconnection(IDaliLdapConnection *ldapconn)
- {
- if (SessionManagerServer)
- SessionManagerServer->setLDAPconnection(ldapconn);
- }
- void setClientAuth(IDaliClientAuthConnection *authconn)
- {
- if (SessionManagerServer)
- SessionManagerServer->setClientAuth(authconn);
- }
- #define REG_SLEEP 5000
- bool registerClientProcess(ICommunicator *comm, IGroup *& retcoven,unsigned timeout,DaliClientRole role)
- {
- // NB doesn't use coven as not yet set up
- if (comm->queryGroup().ordinality()==0)
- return false;
- CTimeMon tm(timeout);
- retcoven = NULL;
- unsigned nextLog = 0, lastNextLog = 0;
- unsigned t=REG_SLEEP;
- unsigned remaining;
- rank_t r;
- while (!tm.timedout(&remaining)) {
- if (remaining>t)
- remaining = t;
- r = getRandom()%comm->queryGroup().ordinality();
-
- bool ok = false;
- if (remaining>10000) // MP protocol has a minimum time of 10s so if remaining < 10s use a detachable thread
- ok = comm->verifyConnection(r,remaining);
- else {
- struct cThread: public Thread
- {
- IMPLEMENT_IINTERFACE;
- Semaphore sem;
- Linked<ICommunicator> comm;
- bool ok;
- Owned<IException> exc;
- unsigned remaining;
- rank_t r;
- cThread(ICommunicator *_comm,rank_t _r,unsigned _remaining)
- : Thread ("dasess.registerClientProcess"), comm(_comm)
- {
- r = _r;
- remaining = _remaining;
- ok = false;
- }
- int run()
- {
- try {
- if (comm->verifyConnection(r,remaining))
- ok = true;
- }
- catch (IException *e)
- {
- exc.setown(e);
- }
- sem.signal();
- return 0;
- }
- } *t = new cThread(comm,r,remaining);
- t->start();
- t->sem.wait(remaining);
- ok = t->ok;
- if (t->exc.get()) {
- IException *e = t->exc.getClear();
- t->Release();
- throw e;
- }
- t->Release();
- }
- if (ok) {
- CMessageBuffer mb;
- mb.append((int)MSR_REGISTER_PROCESS_SESSION);
- queryMyNode()->serialize(mb);
- comm->queryGroup().queryNode(r).serialize(mb);
- mb.append((int)role);
- if (comm->sendRecv(mb,r,MPTAG_DALI_SESSION_REQUEST,SESSIONREPLYTIMEOUT)) {
- if (!mb.length())
- {
- // failed system capability match,
- retcoven = comm->getGroup(); // continue as if - will fail later more obscurely.
- return true;
- }
- mb.read(mySessionId);
- retcoven = deserializeIGroup(mb);
- return true;
- }
- }
- StringBuffer str;
- PROGLOG("Failed to connect to Dali Server %s.", comm->queryGroup().queryNode(r).endpoint().getUrlStr(str).str());
- if (tm.timedout())
- {
- PROGLOG("%s", str.append(" Timed out.").str());
- break;
- }
- else if (0 == nextLog)
- {
- PROGLOG("%s", str.append(" Retrying...").str());
- if ((lastNextLog * REG_SLEEP) >= 60000) // limit to a minute between logging
- nextLog = 60000 / REG_SLEEP;
- else
- nextLog = lastNextLog + 2; // wait +2 REG_SLEEP pauses before next logging
- lastNextLog = nextLog;
- }
- else
- --nextLog;
- Sleep(REG_SLEEP);
- }
- return false;
- }
- extern void stopClientProcess()
- {
- CriticalBlock block(sessionCrit);
- if (mySessionId&&SessionManager) {
- try {
- querySessionManager().stopSession(mySessionId,false);
- }
- catch (IDaliClient_Exception *e) {
- if (e->errorCode()!=DCERR_server_closed)
- throw;
- e->Release();
- }
- mySessionId = 0;
- }
- }
-
- class CProcessSessionWatchdog
- {
- };
- class CUserDescriptor: public CInterface, implements IUserDescriptor
- {
- StringAttr username;
- StringAttr passwordenc;
- public:
- IMPLEMENT_IINTERFACE;
- StringBuffer &getUserName(StringBuffer &buf)
- {
- return buf.append(username);
- }
- StringBuffer &getPassword(StringBuffer &buf)
- {
- decrypt(buf,passwordenc);
- return buf;
- }
- virtual void set(const char *name,const char *password)
- {
- username.set(name);
- StringBuffer buf;
- encrypt(buf,password);
- passwordenc.set(buf.str());
- }
- virtual void clear()
- {
- username.clear();
- passwordenc.clear();
- }
- void serialize(MemoryBuffer &mb)
- {
- mb.append(username).append(passwordenc);
- }
- void deserialize(MemoryBuffer &mb)
- {
- mb.read(username).read(passwordenc);
- }
- };
- IUserDescriptor *createUserDescriptor()
- {
- return new CUserDescriptor;
- }
- IUserDescriptor *createUserDescriptor(MemoryBuffer &mb)
- {
- IUserDescriptor * udesc = createUserDescriptor();
- udesc->deserialize(mb);
- return udesc;
- }
- MODULE_INIT(INIT_PRIORITY_DALI_DASESS)
- {
- return true;
- }
- MODULE_EXIT()
- {
- ::Release(SessionManager);
- SessionManager = NULL;
- }
|