dalock.cpp 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #define da_decl DECL_EXPORT
  14. #include "platform.h"
  15. #include "jlib.hpp"
  16. #include "jsuperhash.hpp"
  17. #include "jmisc.hpp"
  18. static CBuildVersion _bv("$Name$ $Id: dalock.cpp 62376 2011-02-04 21:59:58Z sort $");
  19. #include "mpbuff.hpp"
  20. #include "mpcomm.hpp"
  21. #include "mputil.hpp"
  22. #include "dacoven.hpp"
  23. #include "daserver.hpp"
  24. #include "dalock.hpp"
  25. interface IDistributedLockManager
  26. {
  27. virtual ~IDistributedLockManager() { }
  28. virtual DistributedLockId createDistributedLockId()=0;
  29. virtual void releaseDistributedLockId(DistributedLockId id)=0;
  30. virtual bool localLock(DistributedLockId id,SessionId owner,bool exclusive=true,long timeout=-1) { return false; }
  31. virtual void localUnlock(DistributedLockId id,SessionId owner) {}
  32. virtual bool lock(DistributedLockId id,SessionId owner,bool exclusive=true,long timeout=-1) = 0;
  33. virtual void unlock(DistributedLockId id,SessionId owner) = 0;
  34. virtual void start() {} ;
  35. virtual void stop() {} ;
  36. };
  37. static IDistributedLockManager *DistributedLockManager=NULL;
  38. #define LOCKREPLYTIMEOUT (3*60*1000)
  39. class CLockState: public CInterface
  40. {
  41. public:
  42. Int64Array owners;
  43. Semaphore sem;
  44. CriticalSection sect;
  45. unsigned short waiting;
  46. unsigned exclusivenest;
  47. DistributedLockId id;
  48. CLockState(DistributedLockId _id)
  49. {
  50. id = _id;
  51. waiting = 0;
  52. exclusivenest = 0;
  53. }
  54. bool lock(SessionId owner,bool exclusive, unsigned timeout)
  55. {
  56. CTimeMon tm(timeout);
  57. sect.enter();
  58. loop {
  59. unsigned num = owners.ordinality();
  60. if (exclusive) {
  61. if (num==0) {
  62. owners.append(owner);
  63. exclusivenest = 1;
  64. break;
  65. }
  66. else if (exclusivenest && (owners.item(0)==owner)) {
  67. exclusivenest++;
  68. break;
  69. }
  70. }
  71. else if (!exclusivenest) {
  72. owners.append(owner);
  73. break;
  74. }
  75. waiting++;
  76. sect.leave();
  77. unsigned remaining;
  78. if (tm.timedout(&remaining)||!sem.wait(remaining)) {
  79. sect.enter();
  80. if (!sem.wait(0)) {
  81. waiting--;
  82. sect.leave();
  83. return false;
  84. }
  85. }
  86. else
  87. sect.enter();
  88. }
  89. sect.leave();
  90. return true;
  91. }
  92. void unlock(SessionId owner)
  93. {
  94. sect.enter();
  95. if (exclusivenest) {
  96. exclusivenest--;
  97. if (exclusivenest) { // still locked
  98. assertex(owners.item(0)==owner);
  99. sect.leave();
  100. return;
  101. }
  102. }
  103. verifyex(owners.zap(owner));
  104. if (owners.ordinality()==0) {
  105. exclusivenest = 0;
  106. if (waiting) {
  107. sem.signal(waiting);
  108. waiting = 0;
  109. }
  110. }
  111. else {
  112. assertex(!exclusivenest);
  113. }
  114. sect.leave();
  115. }
  116. };
  117. class CLockStateTable: private SuperHashTableOf<CLockState,DistributedLockId>
  118. {
  119. CriticalSection sect;
  120. void onAdd(void *)
  121. {
  122. // not used
  123. }
  124. void onRemove(void *e)
  125. {
  126. CLockState &elem=*(CLockState *)e;
  127. elem.Release();
  128. }
  129. unsigned getHashFromElement(const void *e) const
  130. {
  131. const CLockState &elem=*(const CLockState *)e;
  132. DistributedLockId id=elem.id;
  133. return low(id)^(unsigned)high(id);
  134. }
  135. unsigned getHashFromFindParam(const void *fp) const
  136. {
  137. DistributedLockId id = *(const DistributedLockId *)fp;
  138. return low(id)^(unsigned)high(id);
  139. }
  140. const void * getFindParam(const void *p) const
  141. {
  142. const CLockState &elem=*(const CLockState *)p;
  143. return &elem.id;
  144. }
  145. bool matchesFindParam(const void * et, const void *fp, unsigned) const
  146. {
  147. return ((CLockState *)et)->id==*(DistributedLockId *)fp;
  148. }
  149. IMPLEMENT_SUPERHASHTABLEOF_REF_FIND(CLockState,DistributedLockId);
  150. public:
  151. CLockStateTable()
  152. {
  153. }
  154. ~CLockStateTable()
  155. {
  156. _releaseAll();
  157. }
  158. bool lock(DistributedLockId id,SessionId owner,bool excl,unsigned timeout)
  159. {
  160. CLockState * s;
  161. {
  162. CriticalBlock block(sect);
  163. s = find(id);
  164. if (!s) {
  165. s = new CLockState(id);
  166. add(*s);
  167. }
  168. }
  169. return s->lock(owner,excl,timeout);
  170. }
  171. void unlock(DistributedLockId id,SessionId owner)
  172. {
  173. CLockState * s;
  174. {
  175. CriticalBlock block(sect);
  176. s = find(id);
  177. assertex(s);
  178. }
  179. s->unlock(owner);
  180. }
  181. };
  // Request code sent as the leading int of every MPTAG_DALI_LOCK_REQUEST message.
  // These numbers are exchanged between processes: do not renumber or reorder them.
  enum MLockRequestKind {
    MLR_ALLOC_LOCK_ID = 0,            // reply: new DistributedLockId
    MLR_FREE_LOCK_ID = 1,             // args: id; no reply
    MLR_PRIMARY_LOCK_REQUEST = 2,     // args: id, session, exclusive, timeout; reply: bool
    MLR_SECONDARY_LOCK_REQUEST = 3,   // args: id, session, exclusive, timeout; reply: bool
    MLR_PRIMARY_UNLOCK_REQUEST = 4,   // args: id, session; reply: empty
    MLR_SECONDARY_UNLOCK_REQUEST = 5, // args: id, session; reply: empty
    MLR_EXIT = 6                      // TBD
  };
  191. class CLockRequestServer: public Thread
  192. {
  193. bool stopped;
  194. IDistributedLockManager &manager;
  195. public:
  196. CLockRequestServer(IDistributedLockManager &_manager)
  197. : Thread("Lock Manager, CLockRequestServer"), manager(_manager)
  198. {
  199. stopped = true;
  200. }
  201. int run()
  202. {
  203. ICoven &coven=queryCoven();
  204. ICommunicator &comm=coven.queryComm();
  205. CMessageHandler<CLockRequestServer> handler("CLockRequestServer",this,&CLockRequestServer::processMessage);
  206. stopped = false;
  207. CMessageBuffer mb;
  208. while (!stopped) {
  209. try {
  210. mb.clear();
  211. if (comm.recv(mb,RANK_ALL,MPTAG_DALI_LOCK_REQUEST,NULL)) {
  212. handler.handleMessage(mb);
  213. }
  214. else
  215. stopped = true;
  216. }
  217. catch (IException *e)
  218. {
  219. EXCLOG(e, "CLockRequestServer");
  220. e->Release();
  221. }
  222. }
  223. return 0;
  224. }
  225. void processMessage(CMessageBuffer &mb)
  226. {
  227. ICoven &coven=queryCoven();
  228. ICommunicator &comm=coven.queryComm();
  229. DistributedLockId id;
  230. SessionId session;
  231. bool exclusive;
  232. long timeout;
  233. int fn;
  234. mb.read(fn);
  235. switch (fn) {
  236. case MLR_ALLOC_LOCK_ID: {
  237. id = manager.createDistributedLockId();
  238. mb.clear().append(id);
  239. comm.reply(mb);
  240. }
  241. break;
  242. case MLR_FREE_LOCK_ID: {
  243. mb.read(id);
  244. manager.releaseDistributedLockId(id);
  245. }
  246. break;
  247. case MLR_PRIMARY_LOCK_REQUEST: {
  248. mb.read(id).read(session).read(exclusive).read(timeout);
  249. bool ret = manager.lock(id,session,exclusive,timeout);
  250. mb.clear().append(ret);
  251. comm.reply(mb);
  252. }
  253. break;
  254. case MLR_PRIMARY_UNLOCK_REQUEST: {
  255. mb.read(id).read(session);
  256. manager.unlock(id,session);
  257. mb.clear();
  258. comm.reply(mb);
  259. }
  260. break;
  261. case MLR_SECONDARY_LOCK_REQUEST: {
  262. mb.read(id).read(session).read(exclusive).read(timeout);
  263. bool ret = manager.localLock(id,session,exclusive,timeout);
  264. mb.clear().append(ret);
  265. comm.reply(mb);
  266. }
  267. break;
  268. case MLR_SECONDARY_UNLOCK_REQUEST: {
  269. mb.read(id).read(session);
  270. manager.localUnlock(id,session);
  271. mb.clear();
  272. comm.reply(mb);
  273. }
  274. break;
  275. }
  276. }
  277. void stop()
  278. {
  279. if (!stopped) {
  280. stopped = true;
  281. queryCoven().queryComm().cancel(RANK_ALL, MPTAG_DALI_LOCK_REQUEST);
  282. }
  283. join();
  284. }
  285. };
  286. class CClientDistributedLockManager: implements IDistributedLockManager
  287. {
  288. public:
  289. CClientDistributedLockManager()
  290. {
  291. }
  292. ~CClientDistributedLockManager()
  293. {
  294. }
  295. DistributedLockId createDistributedLockId()
  296. {
  297. CMessageBuffer mb;
  298. mb.append((int)MLR_ALLOC_LOCK_ID);
  299. queryCoven().sendRecv(mb,RANK_RANDOM,MPTAG_DALI_LOCK_REQUEST);
  300. DistributedLockId ret;
  301. mb.read(ret);
  302. return ret;
  303. }
  304. void releaseDistributedLockId(DistributedLockId id)
  305. {
  306. // maybe some checking here?
  307. CMessageBuffer mb;
  308. mb.append((int)MLR_FREE_LOCK_ID).append(id);
  309. try {
  310. ICoven &coven=queryCoven();
  311. coven.queryComm().send(mb,coven.chooseServer(id),MPTAG_DALI_LOCK_REQUEST,MP_ASYNC_SEND);
  312. }
  313. catch (IMP_Exception *e) // ignore if fails
  314. {
  315. if (e->errorCode()!=MPERR_link_closed)
  316. throw;
  317. EXCLOG(e,"releaseDistributedLockId");
  318. e->Release();
  319. }
  320. }
  321. bool lock(DistributedLockId id,SessionId owner,bool exclusive=true,long timeout=-1)
  322. {
  323. CMessageBuffer mb;
  324. mb.append((int)MLR_PRIMARY_LOCK_REQUEST).append(id).append(owner).append(exclusive).append(timeout);
  325. ICoven &coven=queryCoven();
  326. coven.sendRecv(mb,coven.chooseServer(id),MPTAG_DALI_LOCK_REQUEST);
  327. bool ret;
  328. mb.read(ret);
  329. return ret;
  330. }
  331. void unlock(DistributedLockId id,SessionId owner)
  332. {
  333. CMessageBuffer mb;
  334. mb.append((int)MLR_PRIMARY_UNLOCK_REQUEST).append(id).append(owner);
  335. ICoven &coven=queryCoven();
  336. coven.sendRecv(mb,coven.chooseServer(id),MPTAG_DALI_LOCK_REQUEST);
  337. }
  338. };
  339. #ifdef _MSC_VER
  340. #pragma warning (push)
  341. #pragma warning (disable : 4355) // 'this' : used in base member initializer list
  342. #endif
  343. class CCovenDistributedLockManager: implements IDistributedLockManager
  344. {
  345. DistributedLockId nextLockId;
  346. CLockRequestServer lockrequestserver;
  347. CLockStateTable lockstates;
  348. ICoven &coven;
  349. public:
  350. CCovenDistributedLockManager(ICoven &_coven)
  351. : coven(_coven), lockrequestserver(*this)
  352. {
  353. nextLockId = 0;
  354. }
  355. ~CCovenDistributedLockManager()
  356. {
  357. }
  358. DistributedLockId createDistributedLockId()
  359. {
  360. return coven.getUniqueId();
  361. }
  362. void releaseDistributedLockId(DistributedLockId id)
  363. {
  364. // should remove lock info etc TBD
  365. #if 0 // TBD
  366. CovenServerId dst = coven.originUniqueId(id);
  367. if (dst==myid) { // my lock
  368. lockallocator.freeid(id);
  369. }
  370. else {
  371. CMessageBuffer mb;
  372. mb.append((int)MLR_FREE_LOCK_ID).append(id);
  373. queryCoven().send(mb,coven.getServerRank(dst),MPTAG_DALI_LOCK_REQUEST,MP_ASYNC_SEND);
  374. }
  375. #endif
  376. }
  377. bool remoteLock(rank_t dst,DistributedLockId id,SessionId owner,bool exclusive=true,long timeout=-1)
  378. {
  379. CMessageBuffer mb;
  380. mb.append((int)MLR_SECONDARY_LOCK_REQUEST).append(id).append(owner).append(exclusive).append(timeout);
  381. queryCoven().sendRecv(mb,dst,MPTAG_DALI_LOCK_REQUEST);
  382. bool ret;
  383. mb.read(ret);
  384. return ret;
  385. }
  386. void remoteUnlock(rank_t dst,DistributedLockId id,SessionId owner)
  387. {
  388. CMessageBuffer mb;
  389. mb.append((int)MLR_SECONDARY_UNLOCK_REQUEST).append(id).append(owner);
  390. queryCoven().sendRecv(mb,dst,MPTAG_DALI_LOCK_REQUEST);
  391. }
  392. bool localLock(DistributedLockId id,SessionId owner,bool exclusive=true,long timeout=-1)
  393. {
  394. return lockstates.lock(id,owner,exclusive,timeout);
  395. }
  396. void localUnlock(DistributedLockId id,SessionId owner)
  397. {
  398. lockstates.unlock(id,owner);
  399. }
  400. bool lock(DistributedLockId id,SessionId owner,bool exclusive=true,long timeout=-1)
  401. {
  402. rank_t myrank = coven.getServerRank();
  403. rank_t ownerrank = coven.chooseServer(id);
  404. // first do owner
  405. if (myrank==ownerrank) {
  406. if (!localLock(id,owner,exclusive,timeout))
  407. return false;
  408. }
  409. else if (!remoteLock(ownerrank,id,owner,exclusive,timeout))
  410. return false;
  411. // all others should succeed quickly
  412. IGroup &grp = queryCoven().queryComm().queryGroup();
  413. ForEachOtherNodeInGroup(r,grp) {
  414. if (r!=ownerrank)
  415. remoteLock(r,id,owner,exclusive);
  416. }
  417. return true;
  418. }
  419. void unlock(DistributedLockId id,SessionId owner)
  420. {
  421. rank_t myrank = coven.getServerRank();
  422. rank_t ownerrank = coven.chooseServer(id);
  423. // first do owner
  424. if (myrank==ownerrank) {
  425. localUnlock(id,owner);
  426. }
  427. else
  428. remoteUnlock(ownerrank,id,owner);
  429. // all others should succeed quickly
  430. ForEachOtherNodeInGroup(r,coven.queryComm().queryGroup()) {
  431. if (r!=ownerrank)
  432. remoteUnlock(r,id,owner);
  433. }
  434. }
  435. void start()
  436. {
  437. lockrequestserver.start();
  438. }
  439. void stop()
  440. {
  441. lockrequestserver.stop();
  442. }
  443. };
  444. #ifdef _MSC_VER
  445. #pragma warning (pop) // warning 4355
  446. #endif
  447. IDistributedLockManager &queryDistributedLockManager()
  448. {
  449. if (!DistributedLockManager) {
  450. assertex(!queryCoven().inCoven()); // Check not Coven server (if occurs - not initialized correctly;
  451. DistributedLockManager = new CClientDistributedLockManager();
  452. }
  453. return *DistributedLockManager;
  454. }
  455. DistributedLockId createDistributedLockId()
  456. {
  457. return queryDistributedLockManager().createDistributedLockId();
  458. }
  459. void releaseDistributedLockId(DistributedLockId id)
  460. {
  461. queryDistributedLockManager().releaseDistributedLockId(id);
  462. }
  463. DistributedLockId lookupDistributedLockId(const char *name)
  464. {
  465. assertex(!"TBD");
  466. return 0;
  467. }
  468. class CDistributedLock: implements IDistributedLock, public CInterface
  469. {
  470. DistributedLockId id;
  471. SessionId session;
  472. public:
  473. IMPLEMENT_IINTERFACE;
  474. CDistributedLock(DistributedLockId _id, SessionId _session)
  475. {
  476. id = _id;
  477. session = _session;
  478. }
  479. bool lock(bool exclusive=true,long timeout=-1)
  480. {
  481. return queryDistributedLockManager().lock(id,session,exclusive,timeout);
  482. }
  483. void unlock()
  484. {
  485. queryDistributedLockManager().unlock(id,session);
  486. }
  487. bool relock(bool exclusive=true,long timeout=-1)
  488. {
  489. assertex(!"TBD");
  490. return false; // TBD
  491. }
  492. DistributedLockId getID()
  493. {
  494. return id;
  495. }
  496. SessionId getSession()
  497. {
  498. return session;
  499. }
  500. };
  501. IDistributedLock *createDistributedLock(DistributedLockId id, SessionId session)
  502. {
  503. return new CDistributedLock(id,(session==0)?myProcessSession():session);
  504. }
  505. class CDaliLockServer: public IDaliServer, public CInterface
  506. {
  507. public:
  508. IMPLEMENT_IINTERFACE;
  509. void start()
  510. {
  511. ICoven &coven=queryCoven();
  512. assertex(coven.inCoven()); // must be member of coven
  513. DistributedLockManager = new CCovenDistributedLockManager(coven);
  514. DistributedLockManager->start();
  515. }
  516. void ready()
  517. {
  518. }
  519. void suspend()
  520. {
  521. }
  522. void stop()
  523. {
  524. DistributedLockManager->stop();
  525. delete DistributedLockManager;
  526. DistributedLockManager = NULL;
  527. }
  528. void nodeDown(rank_t rank)
  529. {
  530. assertex("TBD");
  531. }
  532. };
  533. IDaliServer *createDaliLockServer()
  534. {
  535. return new CDaliLockServer();
  536. }