dasess.cpp 74 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #define da_decl DECL_EXPORT
  14. #include <unordered_map>
  15. #include <unordered_set>
  16. #include <string>
  17. #include "platform.h"
  18. #include "jlib.hpp"
  19. #include "jfile.hpp"
  20. #include "jsuperhash.hpp"
  21. #include "jmisc.hpp"
  22. #include "jencrypt.hpp"
  23. #include "dacoven.hpp"
  24. #include "mpbuff.hpp"
  25. #include "mpcomm.hpp"
  26. #include "mputil.hpp"
  27. #include "daserver.hpp"
  28. #include "dasubs.ipp"
  29. #include "dasds.hpp"
  30. #include "daclient.hpp"
  31. #include "daldap.hpp"
  32. #include "seclib.hpp"
  33. #include "dasess.hpp"
  34. #include "digisign.hpp"
  35. using namespace cryptohelper;
  36. #ifdef _MSC_VER
  37. #pragma warning (disable : 4355)
  38. #endif
  39. static std::unordered_map<std::string, DaliClientRole> daliClientRoleMap = {
  40. { "Private", DCR_Private },
  41. { "ThorSlave", DCR_ThorSlave },
  42. { "ThorMaster", DCR_ThorMaster },
  43. { "EclCCServer", DCR_EclCCServer },
  44. { "EclCC", DCR_EclCC },
  45. { "EclServer", DCR_EclServer },
  46. { "EclScheduler", DCR_EclScheduler },
  47. { "EclAgent", DCR_EclAgent },
  48. { "AgentExec", DCR_AgentExec },
  49. { "DaliServer", DCR_DaliServer },
  50. { "SashaServer", DCR_SashaServer },
  51. { "DfuServer", DCR_DfuServer },
  52. { "EspServer", DCR_EspServer },
  53. { "Config", DCR_Config },
  54. { "SchedulerAdmin", DCR_ScheduleAdmin },
  55. { "RoxieMaster", DCR_RoxyMaster },
  56. { "BackupGen", DCR_BackupGen },
  57. { "DaFsControl", DCR_DaFsControl },
  58. { "SwapNode", DCR_SwapNode },
  59. { "DaliAdmin", DCR_DaliAdmin },
  60. { "UpdateEnv", DCR_UpdateEnv },
  61. { "TreeView", DCR_TreeView },
  62. { "DaliDiag", DCR_DaliDiag },
  63. { "Testing", DCR_Testing },
  64. { "XRef", DCR_XRef },
  65. { "EclMinus", DCR_EclMinus },
  66. { "Monitoring", DCR_Monitoring },
  67. };
  68. const char *queryRoleName(DaliClientRole role)
  69. {
  70. switch (role) {
  71. case DCR_Private: return "Private";
  72. case DCR_ThorSlave: return "ThorSlave";
  73. case DCR_ThorMaster: return "ThorMaster";
  74. case DCR_EclCCServer: return "EclCCServer";
  75. case DCR_EclCC: return "EclCC";
  76. case DCR_EclServer: return "EclServer";
  77. case DCR_EclScheduler: return "EclScheduler";
  78. case DCR_EclAgent: return "EclAgent";
  79. case DCR_AgentExec: return "AgentExec";
  80. case DCR_DaliServer:return "DaliServer";
  81. case DCR_SashaServer: return "SashaServer";
  82. case DCR_DfuServer: return "DfuServer";
  83. case DCR_EspServer: return "EspServer";
  84. case DCR_Config: return "Config";
  85. case DCR_ScheduleAdmin: return "SchedulerAdmin";
  86. case DCR_RoxyMaster: return "RoxieMaster";
  87. case DCR_BackupGen: return "BackupGen";
  88. case DCR_DaFsControl: return "DaFsControl";
  89. case DCR_SwapNode: return "SwapNode";
  90. case DCR_DaliAdmin: return "DaliAdmin";
  91. case DCR_UpdateEnv: return "UpdateEnv";
  92. case DCR_TreeView: return "TreeView";
  93. case DCR_DaliDiag: return "DaliDiag";
  94. case DCR_Testing: return "Testing";
  95. case DCR_XRef: return "XRef";
  96. case DCR_EclMinus: return "EclMinus";
  97. case DCR_Monitoring: return "Monitoring";
  98. }
  99. return "Unknown";
  100. }
  101. interface ISessionManagerServer: implements IConnectionMonitor
  102. {
  103. virtual SessionId registerSession(SecurityToken tok,SessionId parentid) = 0;
  104. virtual SessionId registerClientProcess(INode *node,IGroup *&grp, DaliClientRole role) = 0;
  105. virtual void addProcessSession(SessionId id, INode *node, DaliClientRole role) = 0;
  106. virtual void addSession(SessionId id) = 0;
  107. virtual SessionId lookupProcessSession(INode *node) = 0;
  108. virtual INode *getProcessSessionNode(SessionId id) =0;
  109. virtual SecAccessFlags getPermissionsLDAP(const char *key,const char *obj,IUserDescriptor *udesc,unsigned flags, int *err)=0;
  110. virtual bool clearPermissionsCache(IUserDescriptor *udesc) = 0;
  111. virtual void stopSession(SessionId sessid,bool failed) = 0;
  112. virtual void setClientAuth(IDaliClientAuthConnection *authconn) = 0;
  113. virtual void setLDAPconnection(IDaliLdapConnection *_ldapconn) = 0;
  114. virtual bool authorizeConnection(const INode *client, DaliClientRole role) = 0;
  115. virtual void start() = 0;
  116. virtual void ready() = 0;
  117. virtual void stop() = 0;
  118. virtual bool queryScopeScansEnabled(IUserDescriptor *udesc, int * err, StringBuffer &retMsg) = 0;
  119. virtual bool enableScopeScans(IUserDescriptor *udesc, bool enable, int * err, StringBuffer &retMsg) = 0;
  120. };
  121. static SessionId mySessionId=0;
  122. static ISessionManager *SessionManager=NULL;
  123. static ISessionManagerServer *SessionManagerServer=NULL;
  124. static CriticalSection sessionCrit;
  125. #define SESSIONREPLYTIMEOUT (3*60*1000)
  126. #define CLDAPE_getpermtimeout (-1)
  127. #define CLDAPE_ldapfailure (-2)
  128. class DECL_EXCEPTION CDaliLDAP_Exception: implements IException, public CInterface
  129. {
  130. int errcode;
  131. public:
  132. CDaliLDAP_Exception(int _errcode)
  133. {
  134. errcode = _errcode;
  135. }
  136. int errorCode() const { return errcode; }
  137. StringBuffer & errorMessage(StringBuffer &str) const
  138. {
  139. if (errcode==0)
  140. return str;
  141. str.appendf("LDAP Exception(%d): ",errcode);
  142. if (errcode==CLDAPE_getpermtimeout)
  143. return str.append("getPermissionsLDAP - timeout to LDAP server");
  144. if (errcode==CLDAPE_ldapfailure)
  145. return str.append("getPermissionsLDAP - LDAP server failure");
  146. return str.append("Unknown Exception");
  147. }
  148. MessageAudience errorAudience() const { return MSGAUD_user; }
  149. IMPLEMENT_IINTERFACE;
  150. };
  151. class CdelayedTerminate: public Thread // slightly obfuscated stop code
  152. {
  153. byte err;
  154. int run()
  155. {
  156. while (getRandom()%711!=0) getRandom(); // look busy
  157. IERRLOG("Server fault %d",(int)err);
  158. while (getRandom()%7!=0) Sleep(1);
  159. exit(0);
  160. }
  161. public:
  162. CdelayedTerminate(byte _err)
  163. {
  164. err = _err;
  165. start();
  166. Release();
  167. Sleep(100);
  168. }
  169. };
  170. class CSessionState: public CInterface
  171. {
  172. protected: friend class CSessionStateTable;
  173. SessionId id;
  174. public:
  175. CSessionState(SessionId _id)
  176. {
  177. id = _id;
  178. }
  179. SessionId getId() const
  180. {
  181. return id;
  182. }
  183. };
  184. class CSessionStateTable: private SuperHashTableOf<CSessionState,SessionId>
  185. {
  186. CheckedCriticalSection sessstatesect;
  187. void onAdd(void *) {}
  188. void onRemove(void *e)
  189. {
  190. CSessionState &elem=*(CSessionState *)e;
  191. elem.Release();
  192. }
  193. unsigned getHashFromElement(const void *e) const
  194. {
  195. const CSessionState &elem=*(const CSessionState *)e;
  196. SessionId id=elem.id;
  197. return low(id)^(unsigned)high(id);
  198. }
  199. unsigned getHashFromFindParam(const void *fp) const
  200. {
  201. SessionId id = *(const SessionId *)fp;
  202. return low(id)^(unsigned)high(id);
  203. }
  204. const void * getFindParam(const void *p) const
  205. {
  206. const CSessionState &elem=*(const CSessionState *)p;
  207. return (void *)&elem.id;
  208. }
  209. bool matchesFindParam(const void * et, const void *fp, unsigned) const
  210. {
  211. return ((CSessionState *)et)->id==*(SessionId *)fp;
  212. }
  213. IMPLEMENT_SUPERHASHTABLEOF_REF_FIND(CSessionState,SessionId);
  214. public:
  215. CSessionStateTable()
  216. {
  217. }
  218. ~CSessionStateTable() {
  219. CHECKEDCRITICALBLOCK(sessstatesect,60000);
  220. _releaseAll();
  221. }
  222. bool add(CSessionState *e) // takes ownership
  223. {
  224. CHECKEDCRITICALBLOCK(sessstatesect,60000);
  225. if (SuperHashTableOf<CSessionState,SessionId>::find(&e->id))
  226. return false;
  227. SuperHashTableOf<CSessionState,SessionId>::add(*e);
  228. return true;
  229. }
  230. CSessionState *query(SessionId id)
  231. {
  232. CHECKEDCRITICALBLOCK(sessstatesect,60000);
  233. return SuperHashTableOf<CSessionState,SessionId>::find(&id);
  234. }
  235. void remove(SessionId id)
  236. {
  237. CHECKEDCRITICALBLOCK(sessstatesect,60000);
  238. SuperHashTableOf<CSessionState,SessionId>::remove(&id);
  239. }
  240. };
  241. class CProcessSessionState: public CSessionState
  242. {
  243. INode *node;
  244. DaliClientRole role;
  245. UInt64Array previousSessionIds;
  246. public:
  247. CProcessSessionState(SessionId id,INode *_node,DaliClientRole _role)
  248. : CSessionState(id)
  249. {
  250. node = _node;
  251. node->Link();
  252. role = _role;
  253. }
  254. ~CProcessSessionState()
  255. {
  256. node->Release();
  257. }
  258. INode &queryNode() const
  259. {
  260. return *node;
  261. }
  262. DaliClientRole queryRole() const
  263. {
  264. return role;
  265. }
  266. StringBuffer &getDetails(StringBuffer &buf)
  267. {
  268. StringBuffer ep;
  269. return buf.appendf("%16" I64F "X: %s, role=%s",CSessionState::id,node->endpoint().getUrlStr(ep).str(),queryRoleName(role));
  270. }
  271. void addSessionIds(CProcessSessionState &other, bool prevOnly)
  272. {
  273. for (;;)
  274. {
  275. SessionId id = other.dequeuePreviousSessionId();
  276. if (!id)
  277. break;
  278. previousSessionIds.append(id);
  279. }
  280. if (!prevOnly)
  281. previousSessionIds.append(other.getId());
  282. }
  283. SessionId dequeuePreviousSessionId()
  284. {
  285. if (!previousSessionIds.ordinality())
  286. return 0;
  287. return previousSessionIds.popGet();
  288. }
  289. unsigned previousSessionIdCount() const
  290. {
  291. return previousSessionIds.ordinality();
  292. }
  293. void removeOldSessionId(SessionId id)
  294. {
  295. if (previousSessionIds.zap(id))
  296. PROGLOG("Removed old sessionId (%" I64F "x) from current process state", id);
  297. }
  298. };
  299. class CMapProcessToSession: private SuperHashTableOf<CProcessSessionState,INode>
  300. {
  301. CheckedCriticalSection mapprocesssect;
  302. void onAdd(void *) {}
  303. void onRemove(void *e) {} // do nothing
  304. unsigned getHashFromElement(const void *e) const
  305. {
  306. const CProcessSessionState &elem=*(const CProcessSessionState *)e;
  307. return elem.queryNode().getHash();
  308. }
  309. unsigned getHashFromFindParam(const void *fp) const
  310. {
  311. return ((INode *)fp)->getHash();
  312. }
  313. const void * getFindParam(const void *p) const
  314. {
  315. const CProcessSessionState &elem=*(const CProcessSessionState *)p;
  316. return (void *)&elem.queryNode();
  317. }
  318. bool matchesFindParam(const void * et, const void *fp, unsigned) const
  319. {
  320. return ((CProcessSessionState *)et)->queryNode().equals((INode *)fp);
  321. }
  322. IMPLEMENT_SUPERHASHTABLEOF_REF_FIND(CProcessSessionState,INode);
  323. public:
  324. CMapProcessToSession()
  325. {
  326. }
  327. ~CMapProcessToSession()
  328. {
  329. CHECKEDCRITICALBLOCK(mapprocesssect,60000);
  330. _releaseAll();
  331. }
  332. bool add(CProcessSessionState *e)
  333. {
  334. CHECKEDCRITICALBLOCK(mapprocesssect,60000);
  335. if (SuperHashTableOf<CProcessSessionState,INode>::find(&e->queryNode()))
  336. return false;
  337. SuperHashTableOf<CProcessSessionState,INode>::add(*e);
  338. return true;
  339. }
  340. void replace(CProcessSessionState *e)
  341. {
  342. CHECKEDCRITICALBLOCK(mapprocesssect,60000);
  343. SuperHashTableOf<CProcessSessionState,INode>::replace(*e);
  344. }
  345. CProcessSessionState *query(INode *n)
  346. {
  347. CHECKEDCRITICALBLOCK(mapprocesssect,60000);
  348. return SuperHashTableOf<CProcessSessionState,INode>::find(n);
  349. }
  350. bool remove(const CProcessSessionState *state, ISessionManagerServer *manager)
  351. {
  352. CHECKEDCRITICALBLOCK(mapprocesssect,60000);
  353. return SuperHashTableOf<CProcessSessionState,INode>::removeExact((CProcessSessionState *)state);
  354. }
  355. unsigned count()
  356. {
  357. CHECKEDCRITICALBLOCK(mapprocesssect,60000);
  358. return SuperHashTableOf<CProcessSessionState,INode>::count();
  359. }
  360. CProcessSessionState *next(const CProcessSessionState *s)
  361. {
  362. CHECKEDCRITICALBLOCK(mapprocesssect,60000);
  363. return (CProcessSessionState *)SuperHashTableOf<CProcessSessionState,INode>::next(s);
  364. }
  365. };
  366. enum MSessionRequestKind {
  367. MSR_REGISTER_PROCESS_SESSION,
  368. MSR_SECONDARY_REGISTER_PROCESS_SESSION,
  369. MSR_REGISTER_SESSION,
  370. MSR_SECONDARY_REGISTER_SESSION,
  371. MSR_LOOKUP_PROCESS_SESSION,
  372. MSR_STOP_SESSION,
  373. MSR_IMPORT_CAPABILITIES,
  374. MSR_LOOKUP_LDAP_PERMISSIONS,
  375. MSR_CLEAR_PERMISSIONS_CACHE,
  376. MSR_EXIT, // TBD
  377. MSR_QUERY_SCOPE_SCANS_ENABLED,
  378. MSR_ENABLE_SCOPE_SCANS
  379. };
  380. class CQueryScopeScansEnabledReq : implements IMessageWrapper
  381. {
  382. public:
  383. bool enabled;
  384. Linked<IUserDescriptor> udesc;
  385. CQueryScopeScansEnabledReq(IUserDescriptor *_udesc) : udesc(_udesc) {}
  386. CQueryScopeScansEnabledReq() {}
  387. void serializeReq(CMessageBuffer &mb)
  388. {
  389. mb.append(MSR_QUERY_SCOPE_SCANS_ENABLED);
  390. udesc->serialize(mb);
  391. }
  392. void deserializeReq(CMessageBuffer &mb)
  393. {
  394. udesc.setown(createUserDescriptor(mb));
  395. }
  396. };
  397. class CEnableScopeScansReq : implements IMessageWrapper
  398. {
  399. public:
  400. bool doEnable;
  401. Linked<IUserDescriptor> udesc;
  402. CEnableScopeScansReq(IUserDescriptor *_udesc, bool _doEnable) : udesc(_udesc), doEnable(_doEnable) {}
  403. CEnableScopeScansReq() {}
  404. void serializeReq(CMessageBuffer &mb)
  405. {
  406. mb.append(MSR_ENABLE_SCOPE_SCANS);
  407. udesc->serialize(mb);
  408. mb.append(doEnable);
  409. }
  410. void deserializeReq(CMessageBuffer &mb)
  411. {
  412. udesc.setown(createUserDescriptor(mb));
  413. mb.read(doEnable);
  414. }
  415. };
  416. class CSessionRequestServer: public Thread
  417. {
  418. bool stopped;
  419. ISessionManagerServer &manager;
  420. Semaphore acceptConnections;
  421. public:
  422. CSessionRequestServer(ISessionManagerServer &_manager)
  423. : Thread("Session Manager, CSessionRequestServer"), manager(_manager)
  424. {
  425. stopped = true;
  426. }
  427. int run()
  428. {
  429. ICoven &coven=queryCoven();
  430. CMessageHandler<CSessionRequestServer> handler("CSessionRequestServer",this,&CSessionRequestServer::processMessage);
  431. stopped = false;
  432. CMessageBuffer mb;
  433. while (!stopped) {
  434. try {
  435. mb.clear();
  436. if (coven.recv(mb,RANK_ALL,MPTAG_DALI_SESSION_REQUEST,NULL))
  437. handler.handleMessage(mb);
  438. else
  439. stopped = true;
  440. }
  441. catch (IException *e)
  442. {
  443. EXCLOG(e, "CDaliPublisherServer");
  444. e->Release();
  445. }
  446. }
  447. return 0;
  448. }
  449. void processMessage(CMessageBuffer &mb)
  450. {
  451. ICoven &coven=queryCoven();
  452. SessionId id;
  453. int fn;
  454. mb.read(fn);
  455. switch (fn) {
  456. case MSR_REGISTER_PROCESS_SESSION: {
  457. acceptConnections.wait();
  458. acceptConnections.signal();
  459. Owned<INode> node(deserializeINode(mb));
  460. Owned<INode> servernode(deserializeINode(mb)); // hopefully me, but not if forwarded
  461. int role=0;
  462. if (mb.length()-mb.getPos()>=sizeof(role)) { // a capability block present
  463. mb.read(role);
  464. if (!manager.authorizeConnection(node, (DaliClientRole) role))
  465. {
  466. MilliSleep(2000); // Delay makes rapid probing of all possible roles slightly more painful.
  467. SocketEndpoint sender = mb.getSender();
  468. mb.clear();
  469. mb.append((SessionId) 0);
  470. INode *na = queryNullNode();
  471. Owned<IGroup> dummyCoven = createIGroup(1, &na);
  472. dummyCoven->serialize(mb);
  473. const char *roleName = queryRoleName((DaliClientRole)role);
  474. Owned<IException> e = makeStringExceptionV(-1, "Access denied! [role=%s]", roleName);
  475. serializeException(e, mb);
  476. coven.reply(mb);
  477. MilliSleep(100+getRandom()%1000); // Causes client to 'work' for a short time.
  478. Owned<INode> node = createINode(sender);
  479. coven.disconnect(node);
  480. break;
  481. }
  482. #ifdef _DEBUG
  483. StringBuffer eps;
  484. PROGLOG("Connection to %s at %s authorized",queryRoleName((DaliClientRole)role),mb.getSender().getUrlStr(eps).str());
  485. #endif
  486. }
  487. IGroup *covengrp;
  488. id = manager.registerClientProcess(node.get(),covengrp,(DaliClientRole)role);
  489. mb.clear().append(id);
  490. if (covengrp->rank(servernode)==RANK_NULL) { // must have been redirected
  491. covengrp->Release(); // no good, so just use one we know about (may use something more sophisticated later)
  492. INode *na = servernode.get();
  493. covengrp = createIGroup(1, &na);
  494. }
  495. covengrp->serialize(mb);
  496. covengrp->Release();
  497. coven.reply(mb);
  498. }
  499. break;
  500. case MSR_SECONDARY_REGISTER_PROCESS_SESSION: {
  501. mb.read(id);
  502. Owned<INode> node (deserializeINode(mb));
  503. int role;
  504. mb.read(role);
  505. manager.addProcessSession(id,node.get(),(DaliClientRole)role);
  506. mb.clear();
  507. coven.reply(mb);
  508. }
  509. break;
  510. case MSR_REGISTER_SESSION: {
  511. SecurityToken tok;
  512. SessionId parentid;
  513. mb.read(tok).read(parentid);
  514. SessionId id = manager.registerSession(tok,parentid);
  515. mb.clear().append(id);
  516. coven.reply(mb);
  517. }
  518. break;
  519. case MSR_SECONDARY_REGISTER_SESSION: {
  520. mb.read(id);
  521. manager.addSession(id);
  522. mb.clear();
  523. coven.reply(mb);
  524. }
  525. break;
  526. case MSR_LOOKUP_PROCESS_SESSION: {
  527. // looks up from node or from id
  528. Owned<INode> node (deserializeINode(mb));
  529. if (node->endpoint().isNull()&&(mb.length()-mb.getPos()>=sizeof(id))) {
  530. mb.read(id);
  531. INode *n = manager.getProcessSessionNode(id);
  532. if (n)
  533. node.setown(n);
  534. node->serialize(mb.clear());
  535. }
  536. else {
  537. id = manager.lookupProcessSession(node.get());
  538. mb.clear().append(id);
  539. }
  540. coven.reply(mb);
  541. }
  542. break;
  543. case MSR_STOP_SESSION: {
  544. SessionId sessid;
  545. bool failed;
  546. mb.read(sessid).read(failed);
  547. manager.stopSession(sessid,failed);
  548. mb.clear();
  549. coven.reply(mb);
  550. }
  551. break;
  552. case MSR_LOOKUP_LDAP_PERMISSIONS: {
  553. StringAttr key;
  554. StringAttr obj;
  555. Owned<IUserDescriptor> udesc=createUserDescriptor();
  556. mb.read(key).read(obj);
  557. udesc->deserialize(mb);
  558. #ifdef NULL_DALIUSER_STACKTRACE
  559. //following debug code to be removed
  560. StringBuffer sb;
  561. udesc->getUserName(sb);
  562. if (0==sb.length())
  563. {
  564. DBGLOG("UNEXPECTED USER (NULL) in dasess.cpp CSessionRequestServer::processMessage() line %d", __LINE__);
  565. }
  566. #endif
  567. unsigned auditflags = 0;
  568. if (mb.length()-mb.getPos()>=sizeof(auditflags))
  569. mb.read(auditflags);
  570. if (mb.remaining() > 0)
  571. {
  572. StringBuffer reqSignature; // now ignored
  573. CDateTime reqUTCTimestamp; // also ignored
  574. mb.read(reqSignature);
  575. reqUTCTimestamp.deserialize(mb);
  576. }
  577. int err = 0;
  578. SecAccessFlags perms = manager.getPermissionsLDAP(key,obj,udesc,auditflags,&err);
  579. mb.clear().append((int)perms);
  580. if (err)
  581. mb.append(err);
  582. coven.reply(mb);
  583. }
  584. break;
  585. case MSR_CLEAR_PERMISSIONS_CACHE: {
  586. Owned<IUserDescriptor> udesc=createUserDescriptor();
  587. udesc->deserialize(mb);
  588. bool ok = manager.clearPermissionsCache(udesc);
  589. mb.append(ok);
  590. coven.reply(mb);
  591. }
  592. break;
  593. case MSR_QUERY_SCOPE_SCANS_ENABLED:{
  594. CQueryScopeScansEnabledReq req;
  595. req.deserializeReq(mb);
  596. int err;
  597. StringBuffer retMsg;
  598. bool enabled = manager.queryScopeScansEnabled(req.udesc, &err, retMsg);
  599. mb.clear().append(err);
  600. mb.append(enabled);
  601. mb.append(retMsg.str());
  602. if (err != 0 || retMsg.length())
  603. {
  604. StringBuffer user;
  605. req.udesc->getUserName(user);
  606. DBGLOG("Error %d querying scope scan status for %s : %s", err, user.str(), retMsg.str());
  607. }
  608. coven.reply(mb);
  609. }
  610. break;
  611. case MSR_ENABLE_SCOPE_SCANS:{
  612. CEnableScopeScansReq req;
  613. req.deserializeReq(mb);
  614. int err;
  615. StringBuffer retMsg;
  616. bool ok = manager.enableScopeScans(req.udesc, req.doEnable, &err, retMsg);
  617. mb.clear().append(err);
  618. mb.append(retMsg.str());
  619. if (err != 0 || retMsg.length())
  620. {
  621. StringBuffer user;
  622. req.udesc->getUserName(user);
  623. DBGLOG("Error %d %sing Scope Scan Status for %s: %s", err, req.doEnable?"Enabl":"Disabl", user.str(), retMsg.str());
  624. }
  625. coven.reply(mb);
  626. }
  627. break;
  628. }
  629. }
  630. void ready()
  631. {
  632. acceptConnections.signal();
  633. }
  634. void stop()
  635. {
  636. if (!stopped) {
  637. stopped = true;
  638. queryCoven().cancel(RANK_ALL, MPTAG_DALI_SESSION_REQUEST);
  639. }
  640. join();
  641. }
  642. };
  643. class CSessionManagerBase: implements ISessionManager, public CInterface
  644. {
  645. protected:
  646. CheckedCriticalSection sessmanagersect;
  647. public:
  648. IMPLEMENT_IINTERFACE;
  649. CSessionManagerBase()
  650. {
  651. servernotifys.kill();
  652. }
  653. virtual ~CSessionManagerBase()
  654. {
  655. }
  656. class CSessionSubscriptionProxy: implements ISubscription, public CInterface
  657. {
  658. ISessionNotify *sub;
  659. SubscriptionId id;
  660. MemoryAttr ma;
  661. SessionId sessid;
  662. public:
  663. IMPLEMENT_IINTERFACE;
  664. CSessionSubscriptionProxy(ISessionNotify *_sub,SessionId _sessid)
  665. {
  666. sub = LINK(_sub);
  667. sessid = _sessid;
  668. MemoryBuffer mb;
  669. mb.append(sessid);
  670. ma.set(mb.length(),mb.toByteArray());
  671. id = queryCoven().getUniqueId();
  672. }
  673. ~CSessionSubscriptionProxy()
  674. {
  675. sub->Release();
  676. }
  677. ISessionNotify *queryNotify()
  678. {
  679. return sub;
  680. }
  681. const MemoryAttr &queryData()
  682. {
  683. return ma;
  684. }
  685. SubscriptionId getId()
  686. {
  687. return id;
  688. }
  689. void doNotify(bool aborted)
  690. {
  691. if (aborted)
  692. sub->aborted(sessid);
  693. else
  694. sub->closed(sessid);
  695. }
  696. void notify(MemoryBuffer &mb)
  697. {
  698. bool aborted;
  699. mb.read(aborted);
  700. doNotify(aborted);
  701. }
  702. void abort()
  703. {
  704. //NH: TBD
  705. }
  706. bool aborted()
  707. {
  708. return false;
  709. }
  710. };
  711. CIArrayOf<CSessionSubscriptionProxy>servernotifys;
  712. SubscriptionId subscribeSession(SessionId sessid, ISessionNotify *inotify)
  713. {
  714. CSessionSubscriptionProxy *proxy;
  715. SubscriptionId id;
  716. {
  717. CHECKEDCRITICALBLOCK(sessmanagersect,60000);
  718. proxy = new CSessionSubscriptionProxy(inotify,sessid);
  719. id = proxy->getId();
  720. if (sessid==SESSID_DALI_SERVER) {
  721. servernotifys.append(*proxy);
  722. return id;
  723. }
  724. }
  725. querySubscriptionManager(SESSION_PUBLISHER)->add(proxy,id);
  726. return id;
  727. }
  728. void unsubscribeSession(SubscriptionId id)
  729. {
  730. {
  731. CHECKEDCRITICALBLOCK(sessmanagersect,60000);
  732. // check not a server subscription
  733. ForEachItemIn(i,servernotifys) {
  734. if (servernotifys.item(i).getId()==id) {
  735. servernotifys.remove(i);
  736. return;
  737. }
  738. }
  739. }
  740. querySubscriptionManager(SESSION_PUBLISHER)->remove(id);
  741. }
  742. class cNotify: implements ISessionNotify, public CInterface
  743. {
  744. public:
  745. IMPLEMENT_IINTERFACE;
  746. Semaphore sem;
  747. void closed(SessionId id)
  748. {
  749. //PROGLOG("Session closed %" I64F "x",id);
  750. sem.signal();
  751. }
  752. void aborted(SessionId id)
  753. {
  754. //PROGLOG("Session aborted %" I64F "x",id);
  755. sem.signal();
  756. }
  757. };
  758. bool sessionStopped(SessionId id, unsigned timeout)
  759. {
  760. Owned<INode> node = getProcessSessionNode(id);
  761. if (node.get()==NULL)
  762. return true;
  763. if (timeout==0)
  764. return false;
  765. Owned<cNotify> cnotify = new cNotify;
  766. querySessionManager().subscribeSession(id,cnotify);
  767. if (cnotify->sem.wait(timeout))
  768. return false;
  769. node.setown(getProcessSessionNode(id));
  770. return node.get()==NULL;
  771. }
  772. void notifyServerStopped(bool aborted)
  773. {
  774. CHECKEDCRITICALBLOCK(sessmanagersect,60000);
  775. // check not a server subscription
  776. ForEachItemIn(i,servernotifys) {
  777. servernotifys.item(i).doNotify(aborted);
  778. }
  779. }
  780. };
  781. class CClientSessionManager: public CSessionManagerBase, implements IConnectionMonitor
  782. {
  783. bool securitydisabled;
  784. public:
  785. IMPLEMENT_IINTERFACE;
  786. CClientSessionManager()
  787. {
  788. securitydisabled = false;
  789. }
  790. virtual ~CClientSessionManager()
  791. {
  792. stop();
  793. }
  794. SessionId lookupProcessSession(INode *node)
  795. {
  796. if (!node)
  797. return mySessionId;
  798. CMessageBuffer mb;
  799. mb.append((int)MSR_LOOKUP_PROCESS_SESSION);
  800. node->serialize(mb);
  801. if (!queryCoven().sendRecv(mb,RANK_RANDOM,MPTAG_DALI_SESSION_REQUEST,SESSIONREPLYTIMEOUT))
  802. return 0;
  803. SessionId ret;
  804. mb.read(ret);
  805. return ret;
  806. }
  807. virtual INode *getProcessSessionNode(SessionId id)
  808. {
  809. if (!id)
  810. return NULL;
  811. CMessageBuffer mb;
  812. mb.append((int)MSR_LOOKUP_PROCESS_SESSION);
  813. queryNullNode()->serialize(mb);
  814. mb.append(id);
  815. if (!queryCoven().sendRecv(mb,RANK_RANDOM,MPTAG_DALI_SESSION_REQUEST,SESSIONREPLYTIMEOUT))
  816. return NULL;
  817. Owned<INode> node = deserializeINode(mb);
  818. if (node->endpoint().isNull())
  819. return NULL;
  820. return node.getClear();
  821. }
  822. SecAccessFlags getPermissionsLDAP(const char *key,const char *obj,IUserDescriptor *udesc,unsigned auditflags, int *err)
  823. {
  824. if (err)
  825. *err = 0;
  826. if (securitydisabled)
  827. return SecAccess_Unavailable;
  828. if (queryDaliServerVersion().compare("1.8") < 0) {
  829. securitydisabled = true;
  830. return SecAccess_Unavailable;
  831. }
  832. if (!udesc)
  833. return SecAccess_Unknown;
  834. CMessageBuffer mb;
  835. mb.append((int)MSR_LOOKUP_LDAP_PERMISSIONS);
  836. mb.append(key).append(obj);
  837. #ifdef NULL_DALIUSER_STACKTRACE
  838. //following debug code to be removed
  839. StringBuffer sb;
  840. if (udesc)
  841. udesc->getUserName(sb);
  842. if (0==sb.length())
  843. {
  844. DBGLOG("UNEXPECTED USER (NULL) in dasess.cpp getPermissionsLDAP() line %d",__LINE__);
  845. PrintStackReport();
  846. }
  847. #endif
  848. udesc->serializeWithoutPassword(mb);//serialize user descriptor without password
  849. mb.append(auditflags);
  850. //Serialize signature. If not provided, compute it
  851. if (queryDaliServerVersion().compare("3.15") >= 0)
  852. {
  853. //Backwards compatibility in case another parameter is added later, otherwise could be removed
  854. CDateTime reqUTCTimestamp;
  855. mb.append(""); // previously a base64 encoded signature
  856. reqUTCTimestamp.serialize(mb);
  857. }
  858. if (!queryCoven().sendRecv(mb,RANK_RANDOM,MPTAG_DALI_SESSION_REQUEST,SESSIONREPLYTIMEOUT))
  859. return SecAccess_None;
  860. SecAccessFlags perms = SecAccess_Unavailable;
  861. if (mb.remaining()>=sizeof(perms)) {
  862. mb.read((int &)perms);
  863. if (mb.remaining()>=sizeof(int)) {
  864. int e = 0;
  865. mb.read(e);
  866. if (err)
  867. *err = e;
  868. else if (e)
  869. throw new CDaliLDAP_Exception(e);
  870. }
  871. }
  872. if (perms == SecAccess_Unavailable)
  873. securitydisabled = true;
  874. return perms;
  875. }
  876. bool clearPermissionsCache(IUserDescriptor *udesc)
  877. {
  878. if (securitydisabled)
  879. return true;
  880. if (queryDaliServerVersion().compare("1.8") < 0) {
  881. securitydisabled = true;
  882. return true;
  883. }
  884. CMessageBuffer mb;
  885. mb.append((int)MSR_CLEAR_PERMISSIONS_CACHE);
  886. udesc->serialize(mb);
  887. return queryCoven().sendRecv(mb,RANK_RANDOM,MPTAG_DALI_SESSION_REQUEST,SESSIONREPLYTIMEOUT);
  888. }
  889. bool queryScopeScansEnabled(IUserDescriptor *udesc, int * err, StringBuffer &retMsg)
  890. {
  891. if (queryDaliServerVersion().compare("3.10") < 0)
  892. {
  893. *err = -1;
  894. StringBuffer ver;
  895. queryDaliServerVersion().toString(ver);
  896. retMsg.appendf("Scope Scan status feature requires Dali V3.10 or newer, current Dali version %s",ver.str());
  897. return false;
  898. }
  899. if (securitydisabled)
  900. {
  901. *err = -1;
  902. retMsg.append("Security not enabled");
  903. return false;
  904. }
  905. if (queryDaliServerVersion().compare("1.8") < 0) {
  906. *err = -1;
  907. retMsg.append("Security not enabled");
  908. securitydisabled = true;
  909. return false;
  910. }
  911. CMessageBuffer mb;
  912. CQueryScopeScansEnabledReq req(udesc);
  913. req.serializeReq(mb);
  914. if (!queryCoven().sendRecv(mb,RANK_RANDOM,MPTAG_DALI_SESSION_REQUEST,SESSIONREPLYTIMEOUT))
  915. {
  916. *err = -1;
  917. retMsg.append("DALI Send/Recv error");
  918. return false;
  919. }
  920. int rc;
  921. bool enabled;
  922. mb.read(rc).read(enabled).read(retMsg);
  923. *err = rc;
  924. return enabled;
  925. }
  926. bool enableScopeScans(IUserDescriptor *udesc, bool enable, int * err, StringBuffer &retMsg)
  927. {
  928. if (queryDaliServerVersion().compare("3.10") < 0)
  929. {
  930. *err = -1;
  931. StringBuffer ver;
  932. queryDaliServerVersion().toString(ver);
  933. retMsg.appendf("Scope Scan enable/disable feature requires Dali V3.10 or newer, current Dali version %s",ver.str());
  934. return false;
  935. }
  936. if (securitydisabled)
  937. {
  938. *err = -1;
  939. retMsg.append("Security not enabled");
  940. return false;
  941. }
  942. if (queryDaliServerVersion().compare("1.8") < 0) {
  943. *err = -1;
  944. retMsg.append("Security not enabled");
  945. securitydisabled = true;
  946. return false;
  947. }
  948. CMessageBuffer mb;
  949. CEnableScopeScansReq req(udesc,enable);
  950. req.serializeReq(mb);
  951. if (!queryCoven().sendRecv(mb,RANK_RANDOM,MPTAG_DALI_SESSION_REQUEST,SESSIONREPLYTIMEOUT))
  952. {
  953. *err = -1;
  954. retMsg.append("DALI Send/Recv error");
  955. return false;
  956. }
  957. int rc;
  958. mb.read(rc).read(retMsg);
  959. *err = rc;
  960. if (rc == 0)
  961. {
  962. StringBuffer user;
  963. udesc->getUserName(user);
  964. DBGLOG("Scope Scans %sabled by %s",enable ? "En" : "Dis", user.str());
  965. }
  966. return rc == 0;
  967. }
  968. bool checkScopeScansLDAP()
  969. {
  970. throwUnexpectedX("checkScopeScansLDAP called on client");
  971. }
  972. unsigned getLDAPflags()
  973. {
  974. throwUnexpectedX("getLdapFlags called on client");
  975. }
  976. void setLDAPflags(unsigned)
  977. {
  978. throwUnexpectedX("setLdapFlags called on client");
  979. }
  980. SessionId startSession(SecurityToken tok, SessionId parentid)
  981. {
  982. CMessageBuffer mb;
  983. mb.append((int)MSR_REGISTER_SESSION).append(tok).append(parentid);
  984. if (!queryCoven().sendRecv(mb,RANK_RANDOM,MPTAG_DALI_SESSION_REQUEST,SESSIONREPLYTIMEOUT))
  985. return 0;
  986. SessionId ret;
  987. mb.read(ret);
  988. return ret;
  989. }
  990. void stopSession(SessionId sessid, bool failed)
  991. {
  992. if (sessid==SESSID_DALI_SERVER) {
  993. notifyServerStopped(failed);
  994. return;
  995. }
  996. CMessageBuffer mb;
  997. mb.append((int)MSR_STOP_SESSION).append(sessid).append(failed);
  998. queryCoven().sendRecv(mb,RANK_RANDOM,MPTAG_DALI_SESSION_REQUEST,SESSIONREPLYTIMEOUT);
  999. }
  1000. virtual void refreshWhiteList() override
  1001. {
  1002. throwUnexpectedX("refreshWhiteList called on client");
  1003. }
  1004. virtual StringBuffer &getWhiteList(StringBuffer &out) const override
  1005. {
  1006. throwUnexpectedX("getWhiteList called on client");
  1007. }
  1008. void onClose(SocketEndpoint &ep)
  1009. {
  1010. CHECKEDCRITICALBLOCK(sessmanagersect,60000);
  1011. Owned<INode> node = createINode(ep);
  1012. if (queryCoven().inCoven(node)) {
  1013. StringBuffer str;
  1014. PROGLOG("Coven Session Stopping (%s)",ep.getUrlStr(str).str());
  1015. if (queryCoven().size()==1)
  1016. notifyServerStopped(true);
  1017. }
  1018. }
  1019. void start()
  1020. {
  1021. addMPConnectionMonitor(this);
  1022. }
  1023. void stop()
  1024. {
  1025. }
  1026. void ready()
  1027. {
  1028. removeMPConnectionMonitor(this);
  1029. }
  1030. StringBuffer &getClientProcessList(StringBuffer &buf)
  1031. {
  1032. // dummy
  1033. return buf;
  1034. }
  1035. StringBuffer &getClientProcessEndpoint(SessionId,StringBuffer &buf)
  1036. {
  1037. // dummy
  1038. return buf;
  1039. }
  1040. unsigned queryClientCount()
  1041. {
  1042. // dummy
  1043. return 0;
  1044. }
  1045. void importCapabilities(MemoryBuffer &mb)
  1046. {
  1047. CMessageBuffer msg;
  1048. msg.append((int)MSR_IMPORT_CAPABILITIES);
  1049. msg.append(mb.length(), mb.toByteArray());
  1050. queryCoven().sendRecv(msg,RANK_RANDOM,MPTAG_DALI_SESSION_REQUEST,SESSIONREPLYTIMEOUT);
  1051. }
  1052. };
  1053. class CLdapWorkItem : public Thread
  1054. {
  1055. StringAttr key;
  1056. StringAttr obj;
  1057. Linked<IUserDescriptor> udesc;
  1058. Linked<IDaliLdapConnection> ldapconn;
  1059. unsigned flags;
  1060. bool running;
  1061. Semaphore contsem;
  1062. Semaphore ready;
  1063. Semaphore &threaddone;
  1064. int ret;
  1065. public:
  1066. CLdapWorkItem(IDaliLdapConnection *_ldapconn,Semaphore &_threaddone)
  1067. : ldapconn(_ldapconn), threaddone(_threaddone)
  1068. {
  1069. running = false;
  1070. }
  1071. void start(const char *_key,const char *_obj,IUserDescriptor *_udesc,unsigned _flags)
  1072. {
  1073. key.set(_key);
  1074. obj.set(_obj);
  1075. #ifdef NULL_DALIUSER_STACKTRACE
  1076. StringBuffer sb;
  1077. if (_udesc)
  1078. _udesc->getUserName(sb);
  1079. if (sb.length()==0)
  1080. {
  1081. DBGLOG("UNEXPECTED USER (NULL) in dasess.cpp CLdapWorkItem::start() line %d",__LINE__);
  1082. PrintStackReport();
  1083. }
  1084. #endif
  1085. udesc.set(_udesc);
  1086. flags = _flags;
  1087. ret = CLDAPE_ldapfailure;
  1088. if (!running) {
  1089. running = true;
  1090. Thread::start();
  1091. }
  1092. contsem.signal();
  1093. }
  1094. int run()
  1095. {
  1096. for (;;) {
  1097. contsem.wait();
  1098. if (!running)
  1099. break;
  1100. try {
  1101. ret = ldapconn->getPermissions(key,obj,udesc,flags);
  1102. }
  1103. catch(IException *e) {
  1104. LOG(MCoperatorError, unknownJob, e, "CLdapWorkItem");
  1105. e->Release();
  1106. }
  1107. ready.signal();
  1108. }
  1109. threaddone.signal();
  1110. return 0;
  1111. }
  1112. bool wait(unsigned timeout, int &_ret)
  1113. {
  1114. if (ready.wait(timeout)) {
  1115. _ret = ret;
  1116. return true;
  1117. }
  1118. _ret = 0;
  1119. return false;
  1120. }
  1121. void stop()
  1122. {
  1123. running = false;
  1124. contsem.signal();
  1125. }
  1126. static CLdapWorkItem *get(IDaliLdapConnection *_ldapconn,Semaphore &_threaddone)
  1127. {
  1128. if (!_threaddone.wait(1000*60*5)) {
  1129. OWARNLOG("Too many stalled LDAP threads");
  1130. return NULL;
  1131. }
  1132. return new CLdapWorkItem(_ldapconn,_threaddone);
  1133. }
  1134. };
  1135. // std::hash specialization for DaliClientRole, used in std::pair for whiteList
  1136. namespace std {
  1137. template <>
  1138. struct hash<DaliClientRole> {
  1139. size_t operator ()(DaliClientRole value) const {
  1140. return static_cast<size_t>(value);
  1141. }
  1142. };
  1143. }
  1144. /* NB: Ideally this belongs within common/environment,
  1145. * however, that would introduce a circular dependency.
  1146. */
  1147. class CWhiteListHandler
  1148. {
  1149. struct PairHasher
  1150. {
  1151. template <class T1, class T2>
  1152. std::size_t operator () (std::pair<T1, T2> const &pair) const
  1153. {
  1154. std::size_t h1 = std::hash<T1>()(pair.first);
  1155. std::size_t h2 = std::hash<T2>()(pair.second);
  1156. return h1 ^ h2;
  1157. }
  1158. };
  1159. std::unordered_set<std::pair<std::string, DaliClientRole>, PairHasher> whiteList;
  1160. std::unordered_map<std::string, std::string> machineMap;
  1161. mutable CriticalSection populatedCrit;
  1162. bool populated = false;
  1163. bool enabled = true;
  1164. void populateMachineMap(IPropertyTree &environment)
  1165. {
  1166. Owned<IPropertyTreeIterator> machineIter = environment.getElements("Hardware/Computer");
  1167. ForEach(*machineIter)
  1168. {
  1169. const IPropertyTree &machine = machineIter->query();
  1170. const char *name = machine.queryProp("@name");
  1171. const char *host = machine.queryProp("@netAddress");
  1172. machineMap.insert({name, host});
  1173. }
  1174. }
  1175. const char *resolveComputer(const char *compName, const char *defaultValue, StringBuffer &result) const
  1176. {
  1177. const auto &it = machineMap.find(compName);
  1178. if (it == machineMap.end())
  1179. return defaultValue;
  1180. IpAddress ip(it->second.c_str());
  1181. if (ip.isNull())
  1182. return defaultValue;
  1183. return ip.getIpText(result);
  1184. }
  1185. void addRoles(const IPropertyTree &component, const std::vector<DaliClientRole> &roles)
  1186. {
  1187. Owned<IPropertyTreeIterator> instanceIter = component.getElements("Instance");
  1188. ForEach(*instanceIter)
  1189. {
  1190. const char *compName = instanceIter->query().queryProp("@computer");
  1191. StringBuffer ipSB;
  1192. const char *ip = resolveComputer(compName, component.queryProp("@netAddress"), ipSB);
  1193. if (ip)
  1194. {
  1195. for (auto &role: roles)
  1196. whiteList.insert({ ip, role });
  1197. }
  1198. }
  1199. }
  1200. void populate()
  1201. {
  1202. Owned<IRemoteConnection> conn = querySDS().connect("/Environment", 0, 0, INFINITE);
  1203. assertex(conn);
  1204. populateMachineMap(*conn->queryRoot());
  1205. enum SoftwareComponentType
  1206. {
  1207. RoxieCluster,
  1208. ThorCluster,
  1209. EclAgentProcess,
  1210. DfuServerProcess,
  1211. EclCCServerProcess,
  1212. EspProcess,
  1213. SashaServerProcess,
  1214. EclSchedulerProcess,
  1215. DaliServerProcess,
  1216. BackupNodeProcess,
  1217. EclServerProcess,
  1218. };
  1219. std::unordered_map<std::string, SoftwareComponentType> softwareTypeRoleMap = {
  1220. { "RoxieCluster", RoxieCluster },
  1221. { "ThorCluster", ThorCluster },
  1222. { "EclAgentProcess", EclAgentProcess },
  1223. { "DfuServerProcess", DfuServerProcess },
  1224. { "EclCCServerProcess", EclCCServerProcess },
  1225. { "EspProcess", EspProcess },
  1226. { "SashaServerProcess", SashaServerProcess },
  1227. { "EclSchedulerProcess", EclSchedulerProcess },
  1228. { "DaliServerProcess", DaliServerProcess },
  1229. { "BackupNodeProcess", BackupNodeProcess },
  1230. { "EclServerProcess", EclServerProcess },
  1231. };
  1232. Owned<IPropertyTreeIterator> softwareIter = conn->queryRoot()->getElements("Software/*");
  1233. ForEach(*softwareIter)
  1234. {
  1235. const IPropertyTree &component = softwareIter->query();
  1236. const char *compProcess = component.queryName();
  1237. const auto &it = softwareTypeRoleMap.find(compProcess);
  1238. if (it != softwareTypeRoleMap.end())
  1239. {
  1240. switch (it->second)
  1241. {
  1242. case RoxieCluster:
  1243. {
  1244. Owned<IPropertyTreeIterator> serverIter = component.getElements("RoxieServerProcess");
  1245. ForEach(*serverIter)
  1246. {
  1247. const IPropertyTree &server = serverIter->query();
  1248. const char *serverCompName = server.queryProp("@computer");
  1249. StringBuffer ipSB;
  1250. const char *ip = resolveComputer(serverCompName, server.queryProp("@netAddress"), ipSB);
  1251. if (ip)
  1252. whiteList.insert({ ip, DCR_RoxyMaster });
  1253. }
  1254. break;
  1255. }
  1256. case ThorCluster:
  1257. {
  1258. const char *masterCompName = component.queryProp("ThorMasterProcess/@computer");
  1259. StringBuffer ipSB;
  1260. const char *ip = resolveComputer(masterCompName, component.queryProp("@netAddress"), ipSB);
  1261. if (ip)
  1262. {
  1263. whiteList.insert({ ip, DCR_ThorMaster });
  1264. whiteList.insert({ ip, DCR_DaliAdmin });
  1265. }
  1266. break;
  1267. }
  1268. case EclAgentProcess:
  1269. addRoles(component, { DCR_EclAgent, DCR_AgentExec });
  1270. break;
  1271. case DfuServerProcess:
  1272. addRoles(component, { DCR_DfuServer });
  1273. break;
  1274. case EclCCServerProcess:
  1275. addRoles(component, { DCR_EclCCServer, DCR_EclCC });
  1276. break;
  1277. case EclServerProcess:
  1278. addRoles(component, { DCR_EclServer, DCR_EclCC });
  1279. break;
  1280. case EspProcess:
  1281. addRoles(component, { DCR_EspServer });
  1282. break;
  1283. case SashaServerProcess:
  1284. addRoles(component, { DCR_SashaServer, DCR_XRef });
  1285. break;
  1286. case EclSchedulerProcess:
  1287. addRoles(component, { DCR_EclScheduler });
  1288. break;
  1289. case BackupNodeProcess:
  1290. addRoles(component, { DCR_BackupGen, DCR_DaliAdmin });
  1291. break;
  1292. case DaliServerProcess:
  1293. addRoles(component, { DCR_DaliServer, DCR_DaliDiag, DCR_SwapNode, DCR_UpdateEnv, DCR_DaliAdmin, DCR_TreeView, DCR_Testing, DCR_DaFsControl, DCR_XRef, DCR_Config, DCR_ScheduleAdmin, DCR_Monitoring });
  1294. break;
  1295. }
  1296. }
  1297. }
  1298. // only ever expecting 1 DaliServerProcess and 1 WhiteList
  1299. IPropertyTree *whiteListTree = conn->queryRoot()->queryPropTree("Software/DaliServerProcess[1]/WhiteList[1]");
  1300. if (whiteListTree)
  1301. {
  1302. enabled = whiteListTree->getPropBool("@enabled", true); // on by default
  1303. Owned<IPropertyTreeIterator> whiteListIter = whiteListTree->getElements("Entry");
  1304. ForEach(*whiteListIter)
  1305. {
  1306. const IPropertyTree &entry = whiteListIter->query();
  1307. StringArray hosts, roles;
  1308. hosts.appendListUniq(entry.queryProp("@hosts"), ",");
  1309. roles.appendListUniq(entry.queryProp("@roles"), ",");
  1310. ForEachItemIn(h, hosts)
  1311. {
  1312. ForEachItemIn(r, roles)
  1313. {
  1314. const char *roleStr = roles.item(r);
  1315. const auto &it = daliClientRoleMap.find(roleStr);
  1316. if (it != daliClientRoleMap.end())
  1317. {
  1318. IpAddress ip(hosts.item(h));
  1319. if (!ip.isNull())
  1320. {
  1321. StringBuffer ipStr;
  1322. whiteList.insert({ ip.getIpText(ipStr).str(), it->second });
  1323. }
  1324. }
  1325. }
  1326. }
  1327. }
  1328. }
  1329. populated = true;
  1330. }
  1331. void ensurePopulated() const
  1332. {
  1333. // should be called within CS
  1334. if (populated)
  1335. return;
  1336. (const_cast <CWhiteListHandler *> (this))->populate();
  1337. }
  1338. public:
  1339. bool isWhiteListed(const char *ip, DaliClientRole role) const
  1340. {
  1341. CriticalBlock block(populatedCrit);
  1342. ensurePopulated();
  1343. const auto &it = whiteList.find({ip, role});
  1344. if (it != whiteList.end())
  1345. return true;
  1346. else if (enabled)
  1347. return false;
  1348. else
  1349. {
  1350. WARNLOG("Whitelist mechanism is currently disabled");
  1351. return true;
  1352. }
  1353. }
  1354. StringBuffer &getWhiteList(StringBuffer &out) const
  1355. {
  1356. CriticalBlock block(populatedCrit);
  1357. ensurePopulated();
  1358. for (auto &it: whiteList)
  1359. out.append(it.first.c_str()).append(", ").append(queryRoleName(it.second)).append("\n");
  1360. return out;
  1361. }
  1362. void refresh()
  1363. {
  1364. /* NB: clear only, so that next usage will re-populated
  1365. * Do not want to repopulate now, because refresh() is likely called within a update write transaction
  1366. */
  1367. CriticalBlock block(populatedCrit);
  1368. enabled = true;
  1369. whiteList.clear();
  1370. machineMap.clear();
  1371. populated = false;
  1372. }
  1373. };
  1374. class CCovenSessionManager: public CSessionManagerBase, implements ISessionManagerServer, implements ISubscriptionManager
  1375. {
  1376. CSessionRequestServer sessionrequestserver;
  1377. CSessionStateTable sessionstates;
  1378. CMapProcessToSession processlookup;
  1379. Owned<IDaliLdapConnection> ldapconn;
  1380. Owned<CLdapWorkItem> ldapworker;
  1381. Semaphore ldapsig;
  1382. atomic_t ldapwaiting;
  1383. Semaphore workthreadsem;
  1384. bool stopping;
  1385. CWhiteListHandler whiteListHandler;
  1386. void remoteAddProcessSession(rank_t dst,SessionId id,INode *node, DaliClientRole role)
  1387. {
  1388. CMessageBuffer mb;
  1389. mb.append((int)MSR_SECONDARY_REGISTER_PROCESS_SESSION).append(id);
  1390. node->serialize(mb);
  1391. int r = (int)role;
  1392. mb.append(role);
  1393. queryCoven().sendRecv(mb,dst,MPTAG_DALI_SESSION_REQUEST);
  1394. // no fail currently
  1395. }
  1396. void remoteAddSession(rank_t dst,SessionId id)
  1397. {
  1398. CMessageBuffer mb;
  1399. mb.append((int)MSR_SECONDARY_REGISTER_SESSION).append(id);
  1400. queryCoven().sendRecv(mb,dst,MPTAG_DALI_SESSION_REQUEST);
  1401. // no fail currently
  1402. }
  1403. public:
  1404. IMPLEMENT_IINTERFACE;
  1405. CCovenSessionManager()
  1406. : sessionrequestserver(*this)
  1407. {
  1408. mySessionId = queryCoven().getUniqueId(); // tell others in coven TBD
  1409. registerSubscriptionManager(SESSION_PUBLISHER,this);
  1410. atomic_set(&ldapwaiting,0);
  1411. workthreadsem.signal(10);
  1412. stopping = false;
  1413. ldapsig.signal();
  1414. }
  1415. ~CCovenSessionManager()
  1416. {
  1417. stubTable.kill();
  1418. }
  1419. void start()
  1420. {
  1421. sessionrequestserver.start();
  1422. }
  1423. void stop()
  1424. {
  1425. stopping = true;
  1426. if (!ldapsig.wait(60*1000))
  1427. OWARNLOG("LDAP stalled(1)");
  1428. if (ldapworker) {
  1429. ldapworker->stop();
  1430. if (!ldapworker->join(1000))
  1431. OWARNLOG("LDAP stalled(2)");
  1432. ldapworker.clear();
  1433. }
  1434. ldapconn.clear();
  1435. removeMPConnectionMonitor(this);
  1436. sessionrequestserver.stop();
  1437. }
  1438. void ready()
  1439. {
  1440. addMPConnectionMonitor(this);
  1441. sessionrequestserver.ready();
  1442. }
  1443. void setLDAPconnection(IDaliLdapConnection *_ldapconn)
  1444. {
  1445. if (_ldapconn&&(_ldapconn->getLDAPflags()&DLF_ENABLED))
  1446. ldapconn.setown(_ldapconn);
  1447. else {
  1448. ldapconn.clear();
  1449. ::Release(_ldapconn);
  1450. }
  1451. }
  1452. void setClientAuth(IDaliClientAuthConnection *_authconn)
  1453. {
  1454. }
  1455. void addProcessSession(SessionId id,INode *client,DaliClientRole role)
  1456. {
  1457. StringBuffer str;
  1458. PROGLOG("Session starting %" I64F "x (%s) : role=%s",id,client->endpoint().getUrlStr(str).str(),queryRoleName(role));
  1459. CHECKEDCRITICALBLOCK(sessmanagersect,60000);
  1460. CProcessSessionState *s = new CProcessSessionState(id,client,role);
  1461. while (!sessionstates.add(s)) // takes ownership
  1462. {
  1463. OWARNLOG("Dali session manager: session already registered");
  1464. sessionstates.remove(id);
  1465. }
  1466. while (!processlookup.add(s))
  1467. {
  1468. /* There's existing ip:port match (client) in process table..
  1469. * Old may be in process of closing or about to, but new has beaten the onClose() to it..
  1470. * Track old sessions in new CProcessSessionState instance, so that in-process or upcoming onClose()/stopSession() can find them
  1471. */
  1472. CProcessSessionState *previousState = processlookup.query(client);
  1473. dbgassertex(previousState); // Must be there, it's reason add() failed
  1474. SessionId oldSessionId = previousState->getId();
  1475. s->addSessionIds(*previousState, false); // merges sessions from previous process state into new one that replaces it
  1476. OWARNLOG("Dali session manager: registerClient process session already registered, old (%" I64F "x) replaced", oldSessionId);
  1477. processlookup.remove(previousState, this);
  1478. }
  1479. }
  1480. void addSession(SessionId id)
  1481. {
  1482. CHECKEDCRITICALBLOCK(sessmanagersect,60000);
  1483. CSessionState *s = new CSessionState(id);
  1484. while (!sessionstates.add(s)) { // takes ownership
  1485. OWARNLOG("Dali session manager: session already registered (2)");
  1486. sessionstates.remove(id);
  1487. }
  1488. }
  1489. SessionId registerSession(SecurityToken, SessionId parent)
  1490. {
  1491. // this is where security token should be checked
  1492. // not in critical block (otherwise possibility of deadlock)
  1493. ICoven &coven=queryCoven();
  1494. SessionId id = coven.getUniqueId();
  1495. rank_t myrank = coven.getServerRank();
  1496. rank_t ownerrank = coven.chooseServer(id);
  1497. // first do local
  1498. if (myrank==ownerrank)
  1499. addSession(id);
  1500. else
  1501. remoteAddSession(ownerrank,id);
  1502. ForEachOtherNodeInGroup(r,coven.queryGroup()) {
  1503. if (r!=ownerrank)
  1504. remoteAddSession(r,id);
  1505. }
  1506. return id;
  1507. }
  1508. SessionId registerClientProcess(INode *client, IGroup *& retcoven, DaliClientRole role)
  1509. {
  1510. // not in critical block (otherwise possibility of deadlock)
  1511. retcoven = NULL;
  1512. ICoven &coven=queryCoven();
  1513. SessionId id = coven.getUniqueId();
  1514. rank_t myrank = coven.getServerRank();
  1515. rank_t ownerrank = coven.chooseServer(id);
  1516. // first do local
  1517. if (myrank==ownerrank)
  1518. addProcessSession(id,client,role);
  1519. else
  1520. remoteAddProcessSession(ownerrank,id,client,role);
  1521. ForEachOtherNodeInGroup(r,coven.queryGroup()) {
  1522. if (r!=ownerrank)
  1523. remoteAddProcessSession(r,id,client,role);
  1524. }
  1525. retcoven = coven.getGroup();
  1526. return id;
  1527. }
  1528. SessionId lookupProcessSession(INode *node)
  1529. {
  1530. if (!node)
  1531. return mySessionId;
  1532. CHECKEDCRITICALBLOCK(sessmanagersect,60000);
  1533. CProcessSessionState *s= processlookup.query(node);
  1534. return s?s->getId():0;
  1535. }
  1536. INode *getProcessSessionNode(SessionId id)
  1537. {
  1538. StringBuffer eps;
  1539. if (SessionManager->getClientProcessEndpoint(id,eps).length()!=0)
  1540. return createINode(eps.str());
  1541. return NULL;
  1542. }
  1543. //ISessionManagerServer
  1544. //Dali method to handle permission request
  1545. virtual SecAccessFlags getPermissionsLDAP(const char *key,const char *obj,IUserDescriptor *udesc,unsigned flags, int *err)
  1546. {
  1547. if (err)
  1548. *err = 0;
  1549. if (!ldapconn)
  1550. return SecAccess_Unavailable;
  1551. #ifdef _NO_LDAP
  1552. return SecAccess_Unavailable;
  1553. #else
  1554. #ifdef NULL_DALIUSER_STACKTRACE
  1555. StringBuffer sb;
  1556. if (udesc)
  1557. udesc->getUserName(sb);
  1558. if (sb.length()==0)
  1559. {
  1560. DBGLOG("UNEXPECTED USER (NULL) in dasess.cpp CCovenSessionManager::getPermissionsLDAP() line %d",__LINE__);
  1561. PrintStackReport();
  1562. }
  1563. #endif
  1564. if ((ldapconn->getLDAPflags()&(DLF_SAFE|DLF_ENABLED))!=(DLF_SAFE|DLF_ENABLED))
  1565. return ldapconn->getPermissions(key,obj,udesc,flags);
  1566. atomic_inc(&ldapwaiting);
  1567. unsigned retries = 0;
  1568. while (!stopping) {
  1569. if (ldapsig.wait(1000)) {
  1570. atomic_dec(&ldapwaiting);
  1571. if (!ldapworker)
  1572. ldapworker.setown(CLdapWorkItem::get(ldapconn,workthreadsem));
  1573. if (ldapworker) {
  1574. ldapworker->start(key,obj,udesc,flags);
  1575. for (unsigned i=0;i<10;i++) {
  1576. if (i)
  1577. OWARNLOG("LDAP stalled(%d) - retrying",i);
  1578. SecAccessFlags ret;
  1579. if (ldapworker->wait(1000*20,(int&)ret)) {
  1580. if (ret==CLDAPE_ldapfailure) {
  1581. OERRLOG("LDAP - failure (returning no access for %s)",obj);
  1582. ldapsig.signal();
  1583. if (err)
  1584. *err = CLDAPE_ldapfailure;
  1585. return SecAccess_None;
  1586. }
  1587. else {
  1588. ldapsig.signal();
  1589. return ret;
  1590. }
  1591. }
  1592. if (atomic_read(&ldapwaiting)>10) // give up quicker if piling up
  1593. break;
  1594. if (i==5) { // one retry
  1595. ldapworker->stop(); // abandon thread
  1596. ldapworker.clear();
  1597. ldapworker.setown(CLdapWorkItem::get(ldapconn,workthreadsem));
  1598. if (ldapworker)
  1599. ldapworker->start(key,obj,udesc,flags);
  1600. }
  1601. }
  1602. if (ldapworker)
  1603. ldapworker->stop();
  1604. ldapworker.clear(); // abandon thread
  1605. }
  1606. OERRLOG("LDAP stalled - aborting (returning no access for %s)",obj);
  1607. ldapsig.signal();
  1608. if (err)
  1609. *err = CLDAPE_getpermtimeout;
  1610. return SecAccess_None;
  1611. }
  1612. else {
  1613. unsigned waiting = atomic_read(&ldapwaiting);
  1614. static unsigned last=0;
  1615. static unsigned lasttick=0;
  1616. static unsigned first50=0;
  1617. if ((waiting!=last)&&(msTick()-lasttick>1000)) {
  1618. OWARNLOG("%d threads waiting for ldap",waiting);
  1619. last = waiting;
  1620. lasttick = msTick();
  1621. }
  1622. if (waiting>50) {
  1623. if (first50==0)
  1624. first50 = msTick();
  1625. else if (msTick()-first50>60*1000) {
  1626. OERRLOG("LDAP stalled - aborting (returning 0 for %s)", obj);
  1627. if (err)
  1628. *err = CLDAPE_getpermtimeout;
  1629. break;
  1630. }
  1631. }
  1632. else
  1633. first50 = 0;
  1634. }
  1635. }
  1636. atomic_dec(&ldapwaiting);
  1637. return SecAccess_None;
  1638. #endif
  1639. }
  1640. virtual bool clearPermissionsCache(IUserDescriptor *udesc)
  1641. {
  1642. #ifdef _NO_LDAP
  1643. bool ok = true;
  1644. #else
  1645. bool ok = true;
  1646. if (ldapconn && ldapconn->getLDAPflags() & DLF_ENABLED)
  1647. ok = ldapconn->clearPermissionsCache(udesc);
  1648. #endif
  1649. return ok;
  1650. }
  1651. virtual bool queryScopeScansEnabled(IUserDescriptor *udesc, int * err, StringBuffer &retMsg)
  1652. {
  1653. #ifdef _NO_LDAP
  1654. *err = -1;
  1655. retMsg.append("LDAP not enabled");
  1656. return false;
  1657. #else
  1658. *err = 0;
  1659. return checkScopeScansLDAP();
  1660. #endif
  1661. }
  1662. virtual bool enableScopeScans(IUserDescriptor *udesc, bool enable, int * err, StringBuffer &retMsg)
  1663. {
  1664. #ifdef _NO_LDAP
  1665. *err = -1;
  1666. retMsg.append("LDAP not supporteded");
  1667. return false;
  1668. #else
  1669. if (!ldapconn)
  1670. {
  1671. *err = -1;
  1672. retMsg.append("LDAP not connected");
  1673. return false;
  1674. }
  1675. return ldapconn->enableScopeScans(udesc, enable, err);
  1676. #endif
  1677. }
  1678. virtual bool checkScopeScansLDAP()
  1679. {
  1680. #ifdef _NO_LDAP
  1681. return false;
  1682. #else
  1683. return ldapconn?ldapconn->checkScopeScans():false;
  1684. #endif
  1685. }
  1686. virtual unsigned getLDAPflags()
  1687. {
  1688. #ifdef _NO_LDAP
  1689. return 0;
  1690. #else
  1691. return ldapconn?ldapconn->getLDAPflags():0;
  1692. #endif
  1693. }
  1694. void setLDAPflags(unsigned flags)
  1695. {
  1696. #ifndef _NO_LDAP
  1697. if (ldapconn)
  1698. ldapconn->setLDAPflags(flags);
  1699. #endif
  1700. }
  1701. bool authorizeConnection(const INode *client, DaliClientRole role)
  1702. {
  1703. StringBuffer ipStr;
  1704. client->endpoint().getIpText(ipStr);
  1705. return whiteListHandler.isWhiteListed(ipStr, role);
  1706. }
  1707. SessionId startSession(SecurityToken tok, SessionId parentid)
  1708. {
  1709. return registerSession(tok,parentid);
  1710. }
  1711. void setSessionDependancy(SessionId id , SessionId dependsonid )
  1712. {
  1713. UNIMPLEMENTED; // TBD
  1714. }
  1715. void clearSessionDependancy(SessionId id , SessionId dependsonid )
  1716. {
  1717. UNIMPLEMENTED; // TBD
  1718. }
  1719. protected:
  1720. class CSessionSubscriptionStub: public CInterface
  1721. {
  1722. ISubscription *subs;
  1723. SubscriptionId id;
  1724. SessionId sessid;
  1725. public:
  1726. CSessionSubscriptionStub(ISubscription *_subs,SubscriptionId _id) // takes ownership
  1727. {
  1728. subs = _subs;
  1729. id = _id;
  1730. const MemoryAttr &ma = subs->queryData();
  1731. MemoryBuffer mb(ma.length(),ma.get());
  1732. mb.read(sessid);
  1733. }
  1734. ~CSessionSubscriptionStub()
  1735. {
  1736. subs->Release();
  1737. }
  1738. SubscriptionId getId() { return id; }
  1739. SessionId getSessionId() { return sessid; }
  1740. void notify(bool abort)
  1741. {
  1742. MemoryBuffer mb;
  1743. mb.append(abort);
  1744. subs->notify(mb);
  1745. }
  1746. const void *queryFindParam() const { return &id; }
  1747. };
  1748. OwningSimpleHashTableOf<CSessionSubscriptionStub, SubscriptionId> stubTable;
  1749. void add(ISubscription *subs,SubscriptionId id)
  1750. {
  1751. CSessionSubscriptionStub *nstub;
  1752. {
  1753. CHECKEDCRITICALBLOCK(sessmanagersect,60000);
  1754. nstub = new CSessionSubscriptionStub(subs,id);
  1755. if (sessionstates.query(nstub->getSessionId())||(nstub->getSessionId()==mySessionId))
  1756. {
  1757. stubTable.replace(*nstub);
  1758. return;
  1759. }
  1760. }
  1761. // see if session known
  1762. MemoryBuffer mb;
  1763. bool abort=true;
  1764. mb.append(abort);
  1765. OWARNLOG("Session Manager - adding unknown session ID %" I64F "x", nstub->getSessionId());
  1766. subs->notify(mb);
  1767. delete nstub;
  1768. return;
  1769. }
  1770. void remove(SubscriptionId id)
  1771. {
  1772. CHECKEDCRITICALBLOCK(sessmanagersect,60000);
  1773. stubTable.remove(&id);
  1774. }
  1775. void stopSession(SessionId id, bool abort)
  1776. {
  1777. PROGLOG("Session stopping %" I64F "x %s",id,abort?"aborted":"ok");
  1778. CHECKEDCRITICALBLOCK(sessmanagersect,60000);
  1779. // do in multiple stages as may remove one or more sub sessions
  1780. for (;;)
  1781. {
  1782. CIArrayOf<CSessionSubscriptionStub> tonotify;
  1783. SuperHashIteratorOf<CSessionSubscriptionStub> iter(stubTable);
  1784. ForEach(iter)
  1785. {
  1786. CSessionSubscriptionStub &stub = iter.query();
  1787. if (stub.getSessionId()==id)
  1788. tonotify.append(*LINK(&stub));
  1789. }
  1790. if (tonotify.ordinality()==0)
  1791. break;
  1792. ForEachItemIn(j,tonotify)
  1793. {
  1794. CSessionSubscriptionStub &stub = tonotify.item(j);
  1795. stubTable.removeExact(&stub);
  1796. }
  1797. CHECKEDCRITICALUNBLOCK(sessmanagersect,60000);
  1798. ForEachItemIn(j2,tonotify)
  1799. {
  1800. CSessionSubscriptionStub &stub = tonotify.item(j2);
  1801. try { stub.notify(abort); }
  1802. catch (IException *e) { e->Release(); } // subscriber session may abort during stopSession
  1803. }
  1804. tonotify.kill(); // clear whilst sessmanagersect unblocked, as subs may query session manager.
  1805. }
  1806. const CSessionState *state = sessionstates.query(id);
  1807. if (state)
  1808. {
  1809. const CProcessSessionState *pState = QUERYINTERFACE(state, const CProcessSessionState);
  1810. if (pState)
  1811. {
  1812. CProcessSessionState *cState = processlookup.query(&pState->queryNode()); // get current
  1813. if (pState == cState) // so is current one.
  1814. {
  1815. /* This is reinstating a previous CProcessSessionState for this node (if there is one),
  1816. * that has not yet stopped, and adding any other pending process states to the CProcessSessionState
  1817. * being reinstated.
  1818. */
  1819. SessionId prevId = cState->dequeuePreviousSessionId();
  1820. if (prevId)
  1821. {
  1822. PROGLOG("Session (%" I64F "x) in stopSession, detected %d pending previous states, reinstating session (%" I64F "x) as current", id, cState->previousSessionIdCount(), prevId);
  1823. CSessionState *prevSessionState = sessionstates.query(prevId);
  1824. dbgassertex(prevSessionState); // must be there
  1825. CProcessSessionState *prevProcessState = QUERYINTERFACE(prevSessionState, CProcessSessionState);
  1826. dbgassertex(prevProcessState);
  1827. /* NB: prevProcessState's have 0 entries in their previousSessionIds, since they were merged at replacement time
  1828. * in addProcessSession()
  1829. */
  1830. /* add in any remaining to-be-stopped process sessions from current that's stopping into this previous state
  1831. * that's being reinstated, so will be picked up on next onClose()/stopSession()
  1832. */
  1833. prevProcessState->addSessionIds(*cState, true);
  1834. processlookup.replace(prevProcessState);
  1835. }
  1836. else
  1837. processlookup.remove(pState, this);
  1838. }
  1839. else // Here because in stopSession for an previous process state, that has been replaced (in addProcessSession)
  1840. {
  1841. if (processlookup.remove(pState, this))
  1842. {
  1843. // Don't think possible to be here, if not current must have replaced afaics
  1844. PROGLOG("Session (%" I64F "x) in stopSession, old process session removed", id);
  1845. }
  1846. else
  1847. PROGLOG("Session (%" I64F "x) in stopSession, old process session was already removed", id); // because replaced
  1848. if (cState)
  1849. {
  1850. PROGLOG("Session (%" I64F "x) was replaced, ensuring removed from new process state", id);
  1851. cState->removeOldSessionId(id); // If already replaced, then must ensure no longer tracked by new
  1852. }
  1853. }
  1854. }
  1855. sessionstates.remove(id);
  1856. }
  1857. }
  1858. virtual void refreshWhiteList() override
  1859. {
  1860. whiteListHandler.refresh();
  1861. }
  1862. virtual StringBuffer &getWhiteList(StringBuffer &out) const override
  1863. {
  1864. return whiteListHandler.getWhiteList(out);
  1865. }
  1866. void onClose(SocketEndpoint &ep)
  1867. {
  1868. StringBuffer clientStr;
  1869. PROGLOG("Client closed (%s)", ep.getUrlStr(clientStr).str());
  1870. SessionId idtostop;
  1871. {
  1872. CHECKEDCRITICALBLOCK(sessmanagersect,60000);
  1873. Owned<INode> node = createINode(ep);
  1874. if (queryCoven().inCoven(node))
  1875. {
  1876. PROGLOG("Coven Session Stopping (%s)", clientStr.str());
  1877. // more TBD here
  1878. return;
  1879. }
  1880. CProcessSessionState *s = processlookup.query(node);
  1881. if (!s)
  1882. return;
  1883. idtostop = s->dequeuePreviousSessionId();
  1884. if (idtostop)
  1885. {
  1886. PROGLOG("Previous sessionId (%" I64F "x) for %s was replaced by (%" I64F "x), stopping old session now", idtostop, clientStr.str(), s->getId());
  1887. unsigned c = s->previousSessionIdCount();
  1888. if (c) // very unlikely, but could be >1, trace for info.
  1889. PROGLOG("%d old sessions pending closure", c);
  1890. }
  1891. else
  1892. idtostop = s->getId();
  1893. }
  1894. stopSession(idtostop,true);
  1895. }
  1896. StringBuffer &getClientProcessList(StringBuffer &buf)
  1897. {
  1898. CHECKEDCRITICALBLOCK(sessmanagersect,60000);
  1899. unsigned n = processlookup.count();
  1900. CProcessSessionState *s=NULL;
  1901. for (unsigned i=0;i<n;i++) {
  1902. s=processlookup.next(s);
  1903. if (!s)
  1904. break;
  1905. s->getDetails(buf).append('\n');
  1906. }
  1907. return buf;
  1908. }
  1909. StringBuffer &getClientProcessEndpoint(SessionId id,StringBuffer &buf)
  1910. {
  1911. CHECKEDCRITICALBLOCK(sessmanagersect,60000);
  1912. const CSessionState *state = sessionstates.query(id);
  1913. if (state) {
  1914. const CProcessSessionState *pstate = QUERYINTERFACE(state,const CProcessSessionState);
  1915. if (pstate)
  1916. return pstate->queryNode().endpoint().getUrlStr(buf);
  1917. }
  1918. return buf;
  1919. }
  1920. unsigned queryClientCount()
  1921. {
  1922. CHECKEDCRITICALBLOCK(sessmanagersect,60000);
  1923. return processlookup.count();
  1924. }
  1925. };
  1926. ISessionManager &querySessionManager()
  1927. {
  1928. CriticalBlock block(sessionCrit);
  1929. if (!SessionManager) {
  1930. assertex(!isCovenActive()||!queryCoven().inCoven()); // Check not Coven server (if occurs - not initialized correctly;
  1931. // If !coven someone is checking for dali so allow
  1932. SessionManager = new CClientSessionManager();
  1933. }
  1934. return *SessionManager;
  1935. }
  1936. class CDaliSessionServer: public IDaliServer, public CInterface
  1937. {
  1938. public:
  1939. IMPLEMENT_IINTERFACE;
  1940. CDaliSessionServer()
  1941. {
  1942. }
  1943. void start()
  1944. {
  1945. CriticalBlock block(sessionCrit);
  1946. assertex(queryCoven().inCoven()); // must be member of coven
  1947. CCovenSessionManager *serv = new CCovenSessionManager();
  1948. SessionManagerServer = serv;
  1949. SessionManager = serv;
  1950. SessionManagerServer->start();
  1951. }
  1952. void suspend()
  1953. {
  1954. }
  1955. void stop()
  1956. {
  1957. CriticalBlock block(sessionCrit);
  1958. SessionManagerServer->stop();
  1959. SessionManagerServer->Release();
  1960. SessionManagerServer = NULL;
  1961. SessionManager = NULL;
  1962. }
  1963. void ready()
  1964. {
  1965. SessionManagerServer->ready();
  1966. }
  1967. void nodeDown(rank_t rank)
  1968. {
  1969. throwUnexpectedX("TBD");
  1970. }
  1971. };
  1972. IDaliServer *createDaliSessionServer()
  1973. {
  1974. return new CDaliSessionServer();
  1975. }
  1976. void setLDAPconnection(IDaliLdapConnection *ldapconn)
  1977. {
  1978. if (SessionManagerServer)
  1979. SessionManagerServer->setLDAPconnection(ldapconn);
  1980. }
  1981. void setClientAuth(IDaliClientAuthConnection *authconn)
  1982. {
  1983. if (SessionManagerServer)
  1984. SessionManagerServer->setClientAuth(authconn);
  1985. }
  1986. #define REG_SLEEP 5000
  1987. bool registerClientProcess(ICommunicator *comm, IGroup *& retcoven,unsigned timeout,DaliClientRole role)
  1988. {
  1989. // NB doesn't use coven as not yet set up
  1990. if (comm->queryGroup().ordinality()==0)
  1991. return false;
  1992. CTimeMon tm(timeout);
  1993. retcoven = NULL;
  1994. unsigned nextLog = 0, lastNextLog = 0;
  1995. unsigned t=REG_SLEEP;
  1996. unsigned remaining;
  1997. rank_t r;
  1998. while (!tm.timedout(&remaining)) {
  1999. if (remaining>t)
  2000. remaining = t;
  2001. r = getRandom()%comm->queryGroup().ordinality();
  2002. bool ok = false;
  2003. if (remaining>10000) // MP protocol has a minimum time of 10s so if remaining < 10s use a detachable thread
  2004. ok = comm->verifyConnection(r,remaining);
  2005. else {
  2006. struct cThread: public Thread
  2007. {
  2008. Semaphore sem;
  2009. Linked<ICommunicator> comm;
  2010. bool ok;
  2011. Owned<IException> exc;
  2012. unsigned remaining;
  2013. rank_t r;
  2014. cThread(ICommunicator *_comm,rank_t _r,unsigned _remaining)
  2015. : Thread ("dasess.registerClientProcess"), comm(_comm)
  2016. {
  2017. r = _r;
  2018. remaining = _remaining;
  2019. ok = false;
  2020. }
  2021. int run()
  2022. {
  2023. try {
  2024. if (comm->verifyConnection(r,remaining))
  2025. ok = true;
  2026. }
  2027. catch (IException *e)
  2028. {
  2029. exc.setown(e);
  2030. }
  2031. sem.signal();
  2032. return 0;
  2033. }
  2034. } *t = new cThread(comm,r,remaining);
  2035. t->start();
  2036. t->sem.wait(remaining);
  2037. ok = t->ok;
  2038. if (t->exc.get()) {
  2039. IException *e = t->exc.getClear();
  2040. t->Release();
  2041. throw e;
  2042. }
  2043. t->Release();
  2044. }
  2045. if (ok) {
  2046. CMessageBuffer mb;
  2047. mb.append((int)MSR_REGISTER_PROCESS_SESSION);
  2048. queryMyNode()->serialize(mb);
  2049. comm->queryGroup().queryNode(r).serialize(mb);
  2050. mb.append((int)role);
  2051. if (comm->sendRecv(mb,r,MPTAG_DALI_SESSION_REQUEST,SESSIONREPLYTIMEOUT)) {
  2052. if (!mb.length())
  2053. {
  2054. // failed system capability match,
  2055. retcoven = comm->getGroup(); // continue as if - will fail later more obscurely.
  2056. return true;
  2057. }
  2058. mb.read(mySessionId);
  2059. retcoven = deserializeIGroup(mb);
  2060. if (!mySessionId)
  2061. throw deserializeException(mb);
  2062. return true;
  2063. }
  2064. }
  2065. StringBuffer str;
  2066. OERRLOG("Failed to connect to Dali Server %s.", comm->queryGroup().queryNode(r).endpoint().getUrlStr(str).str());
  2067. if (tm.timedout())
  2068. {
  2069. PROGLOG("%s", str.append(" Timed out.").str());
  2070. break;
  2071. }
  2072. else if (0 == nextLog)
  2073. {
  2074. PROGLOG("%s", str.append(" Retrying...").str());
  2075. if ((lastNextLog * REG_SLEEP) >= 60000) // limit to a minute between logging
  2076. nextLog = 60000 / REG_SLEEP;
  2077. else
  2078. nextLog = lastNextLog + 2; // wait +2 REG_SLEEP pauses before next logging
  2079. lastNextLog = nextLog;
  2080. }
  2081. else
  2082. --nextLog;
  2083. Sleep(REG_SLEEP);
  2084. }
  2085. return false;
  2086. }
  2087. extern void stopClientProcess()
  2088. {
  2089. CriticalBlock block(sessionCrit);
  2090. if (mySessionId&&SessionManager) {
  2091. try {
  2092. querySessionManager().stopSession(mySessionId,false);
  2093. }
  2094. catch (IDaliClient_Exception *e) {
  2095. if (e->errorCode()!=DCERR_server_closed)
  2096. throw;
  2097. e->Release();
  2098. }
  2099. mySessionId = 0;
  2100. }
  2101. }
  // Empty placeholder class: it declares no members.
  class CProcessSessionWatchdog
  {
  };
  2105. class CUserDescriptor: implements IUserDescriptor, public CInterface
  2106. {
  2107. StringAttr username;
  2108. StringAttr passwordenc;
  2109. StringBuffer signature;//user's digital Signature
  2110. public:
  2111. IMPLEMENT_IINTERFACE;
  2112. CUserDescriptor()
  2113. {
  2114. }
  2115. StringBuffer &getUserName(StringBuffer &buf)
  2116. {
  2117. return buf.append(username);
  2118. }
  2119. StringBuffer &getPassword(StringBuffer &buf)
  2120. {
  2121. decrypt(buf,passwordenc);
  2122. return buf;
  2123. }
  2124. const char *querySignature()
  2125. {
  2126. return signature.str();
  2127. }
  2128. virtual void set(const char *name,const char *password)
  2129. {
  2130. username.set(name);
  2131. StringBuffer buf;
  2132. encrypt(buf,password);
  2133. passwordenc.set(buf.str());
  2134. }
  2135. void set(const char *_name, const char *_password, const char *_signature)
  2136. {
  2137. set(_name, _password);
  2138. signature.clear().append(_signature);
  2139. }
  2140. virtual void clear()
  2141. {
  2142. username.clear();
  2143. passwordenc.clear();
  2144. signature.clear();
  2145. }
  2146. void serializeWithoutPassword(MemoryBuffer &mb)
  2147. {
  2148. StringBuffer emptyPwd;
  2149. mb.append(username).append(emptyPwd);
  2150. }
  2151. void serialize(MemoryBuffer &mb)
  2152. {
  2153. mb.append(username).append(passwordenc);
  2154. }
  2155. void deserialize(MemoryBuffer &mb)
  2156. {
  2157. mb.read(username).read(passwordenc);
  2158. }
  2159. };
  2160. IUserDescriptor *createUserDescriptor()
  2161. {
  2162. return new CUserDescriptor;
  2163. }
  2164. IUserDescriptor *createUserDescriptor(MemoryBuffer &mb)
  2165. {
  2166. IUserDescriptor * udesc = createUserDescriptor();
  2167. udesc->deserialize(mb);
  2168. return udesc;
  2169. }
  2170. MODULE_INIT(INIT_PRIORITY_DALI_DASESS)
  2171. {
  2172. return true;
  2173. }
  2174. MODULE_EXIT()
  2175. {
  2176. ::Release(SessionManager);
  2177. SessionManager = NULL;
  2178. }