jmutex.cpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include "jmutex.hpp"
  15. #include "jsuperhash.hpp"
  16. #include "jmisc.hpp"
  17. #include "jfile.hpp"
  18. #include <stdio.h>
  19. #include <assert.h>
  20. //===========================================================================
  21. #ifndef _WIN32
  22. Mutex::Mutex()
  23. {
  24. pthread_mutex_init(&mutex, NULL);
  25. pthread_cond_init(&lock_free, NULL);
  26. owner = 0;
  27. lockcount = 0;
  28. }
  29. Mutex::~Mutex()
  30. {
  31. pthread_cond_destroy(&lock_free);
  32. pthread_mutex_destroy(&mutex);
  33. }
  34. void Mutex::lock()
  35. {
  36. pthread_mutex_lock(&mutex);
  37. while ((owner!=0) && !pthread_equal(owner, pthread_self()))
  38. pthread_cond_wait(&lock_free, &mutex);
  39. if (lockcount++==0)
  40. owner = pthread_self();
  41. pthread_mutex_unlock(&mutex);
  42. }
  43. bool Mutex::lockWait(unsigned timeout)
  44. {
  45. if (timeout==(unsigned)-1) {
  46. lock();
  47. return true;
  48. }
  49. pthread_mutex_lock(&mutex);
  50. bool first=true;
  51. while ((owner!=0) && !pthread_equal(owner, pthread_self())) {
  52. timespec abs;
  53. if (first) {
  54. getEndTime(abs, timeout);
  55. first = false;
  56. }
  57. if (pthread_cond_timedwait(&lock_free, &mutex, &abs)==ETIMEDOUT) {
  58. pthread_mutex_unlock(&mutex);
  59. return false;
  60. }
  61. }
  62. if (lockcount++==0)
  63. owner = pthread_self();
  64. pthread_mutex_unlock(&mutex);
  65. return true;
  66. }
  67. void Mutex::unlock()
  68. {
  69. pthread_mutex_lock(&mutex);
  70. #ifdef _DEBUG
  71. assertex(pthread_equal(owner, pthread_self()));
  72. #endif
  73. if (--lockcount==0)
  74. {
  75. owner = 0;
  76. pthread_cond_signal(&lock_free);
  77. }
  78. pthread_mutex_unlock(&mutex);
  79. }
  80. void Mutex::lockAll(int count)
  81. {
  82. if (count) {
  83. pthread_mutex_lock(&mutex);
  84. while ((owner!=0) && !pthread_equal(owner, pthread_self()))
  85. pthread_cond_wait(&lock_free, &mutex);
  86. lockcount = count;
  87. owner = pthread_self();
  88. pthread_mutex_unlock(&mutex);
  89. }
  90. }
  91. int Mutex::unlockAll()
  92. {
  93. pthread_mutex_lock(&mutex);
  94. int ret = lockcount;
  95. if (lockcount!=0) {
  96. #ifdef _DEBUG
  97. assertex(pthread_equal(owner, pthread_self()));
  98. #endif
  99. lockcount = 0;
  100. owner = 0;
  101. pthread_cond_signal(&lock_free);
  102. }
  103. pthread_mutex_unlock(&mutex);
  104. return ret;
  105. }
  106. inline bool read_data(int fd, void *buf, size_t nbytes)
  107. {
  108. size32_t nread = 0;
  109. while (nread<nbytes) {
  110. size32_t rd = read(fd, (char *)buf + nread, nbytes-nread);
  111. if ((int)rd>=0)
  112. nread += rd;
  113. else if (errno != EINTR)
  114. return false;
  115. }
  116. return true;
  117. }
  118. inline bool write_data(int fd, const void *buf, size_t nbytes)
  119. {
  120. size32_t nwritten= 0;
  121. while (nwritten<nbytes) {
  122. size32_t wr = write(fd, (const char *)buf + nwritten, nbytes-nwritten);
  123. if ((int)wr>=0)
  124. nwritten += wr;
  125. else if (errno != EINTR)
  126. return false;
  127. }
  128. return true;
  129. }
  130. #define POLLTIME (1000*15)
  131. static bool lock_file(const char *lfpath)
  132. {
  133. unsigned attempt = 0;
  134. while (attempt < 3) {
  135. char lckcontents[12];
  136. int fd = open(lfpath, O_RDWR | O_CREAT | O_EXCL, S_IRWXU);
  137. if (fd==-1) {
  138. if (errno != EEXIST)
  139. break;
  140. fd = open(lfpath, O_RDONLY);
  141. if (fd==-1)
  142. break;
  143. bool ok = read_data(fd, lckcontents, sizeof(lckcontents)-1);
  144. close(fd);
  145. if (ok) {
  146. lckcontents[sizeof(lckcontents)-1] = 0;
  147. int pid = atoi(lckcontents);
  148. if (pid==getpid())
  149. return true;
  150. if (kill(pid, 0) == -1) {
  151. if (errno != ESRCH)
  152. return false;
  153. unlink(lfpath);
  154. continue;
  155. }
  156. }
  157. Sleep(1000);
  158. attempt++;
  159. }
  160. else {
  161. sprintf(lckcontents,"%10d\n",(int)getpid());
  162. bool ok = write_data(fd, lckcontents, sizeof(lckcontents)-1);
  163. close(fd);
  164. if (!ok)
  165. break;
  166. }
  167. }
  168. return false;
  169. }
  170. static void unlock_file(const char *lfpath)
  171. {
  172. for (unsigned attempt=0;attempt<10;attempt++) {
  173. if (unlink(lfpath)>=0)
  174. return;
  175. attempt++;
  176. Sleep(500);
  177. }
  178. IERRLOG("NamedMutex cannot unlock file (%d)",errno);
  179. }
  180. static CriticalSection lockPrefixCS;
  181. static StringBuffer lockPrefix;
  182. NamedMutex::NamedMutex(const char *name)
  183. {
  184. {
  185. CriticalBlock b(lockPrefixCS);
  186. if (0 == lockPrefix.length())
  187. {
  188. if (!getConfigurationDirectory(NULL, "lock", NULL, NULL, lockPrefix))
  189. WARNLOG("Failed to get lock directory from environment");
  190. }
  191. addPathSepChar(lockPrefix);
  192. lockPrefix.append("JLIBMUTEX_");
  193. }
  194. StringBuffer tmp(lockPrefix);
  195. tmp.append("JLIBMUTEX_").append(name);
  196. mutexfname = tmp.detach();
  197. }
  198. NamedMutex::~NamedMutex()
  199. {
  200. free(mutexfname);
  201. }
  202. void NamedMutex::lock()
  203. {
  204. // first lock locally
  205. threadmutex.lock();
  206. // then lock globally
  207. for (;;) {
  208. if (lock_file(mutexfname))
  209. return;
  210. Sleep(POLLTIME);
  211. }
  212. }
  213. bool NamedMutex::lockWait(unsigned timeout)
  214. {
  215. unsigned t = msTick();
  216. // first lock locally
  217. if (!threadmutex.lockWait(timeout))
  218. return false;
  219. // then lock globally
  220. for (;;) {
  221. if (lock_file(mutexfname))
  222. return true;
  223. unsigned elapsed = msTick()-t;
  224. if (elapsed>=timeout) {
  225. threadmutex.unlock();
  226. break;
  227. }
  228. Sleep((timeout-elapsed)>POLLTIME?POLLTIME:(timeout-elapsed));
  229. }
  230. return false;
  231. }
  232. void NamedMutex::unlock()
  233. {
  234. // assumed held
  235. unlock_file(mutexfname);
  236. threadmutex.unlock();
  237. }
  238. #endif
  239. void synchronized::throwLockException(unsigned timeout)
  240. {
  241. throw MakeStringException(0,"Can not lock - %d",timeout);
  242. }
  243. //===========================================================================
  244. void Monitor::wait()
  245. {
  246. assertex(owner==GetCurrentThreadId());
  247. waiting++;
  248. void *cur = last;
  249. last = &cur;
  250. while (1) {
  251. int locked = unlockAll();
  252. sem->wait();
  253. lockAll(locked);
  254. if (cur==NULL) { // i.e. first in
  255. void **p=(void **)&last;
  256. while (*p!=&cur)
  257. p = (void **)*p;
  258. *p = NULL; // reset so next picks up
  259. break;
  260. }
  261. sem->signal();
  262. }
  263. }
  264. void Monitor::notify()
  265. { // should always be locked
  266. assertex(owner==GetCurrentThreadId());
  267. if (waiting)
  268. {
  269. waiting--;
  270. sem->signal();
  271. }
  272. }
  273. void Monitor::notifyAll()
  274. { // should always be locked
  275. assertex(owner==GetCurrentThreadId());
  276. if (waiting)
  277. {
  278. sem->signal(waiting);
  279. waiting = 0;
  280. }
  281. }
  282. //==================================================================================
  283. #ifdef USE_PTHREAD_RWLOCK
  284. bool ReadWriteLock::lockRead(unsigned timeout)
  285. {
  286. if (timeout == (unsigned)-1)
  287. {
  288. lockRead();
  289. return true;
  290. }
  291. if (pthread_rwlock_tryrdlock(&rwlock) == 0)
  292. return true;
  293. timespec endtime;
  294. getEndTime(endtime, timeout);
  295. return (pthread_rwlock_timedrdlock(&rwlock, &endtime) == 0);
  296. }
  297. bool ReadWriteLock::lockWrite(unsigned timeout)
  298. {
  299. if (timeout == (unsigned)-1)
  300. {
  301. lockWrite();
  302. return true;
  303. }
  304. if (pthread_rwlock_trywrlock(&rwlock) == 0)
  305. return true;
  306. timespec endtime;
  307. getEndTime(endtime, timeout);
  308. return (pthread_rwlock_timedwrlock(&rwlock, &endtime) == 0);
  309. }
  310. #endif
  311. //==================================================================================
  312. #ifdef USECHECKEDCRITICALSECTIONS
  313. CheckedReadLockBlock::CheckedReadLockBlock(ReadWriteLock &l, unsigned timeout, const char *fname,unsigned lnum) : lock(l)
  314. {
  315. for (;;)
  316. {
  317. if (lock.lockRead(timeout))
  318. break;
  319. PROGLOG("CheckedReadLockBlock timeout %s(%d)",fname,lnum);
  320. PrintStackReport();
  321. }
  322. }
  323. CheckedWriteLockBlock::CheckedWriteLockBlock(ReadWriteLock &l, unsigned timeout, const char *fname,unsigned lnum) : lock(l)
  324. {
  325. for (;;)
  326. {
  327. if (lock.lockWrite(timeout))
  328. break;
  329. PROGLOG("CheckedWriteLockBlock timeout %s(%d)",fname,lnum);
  330. PrintStackReport();
  331. }
  332. }
  333. void checkedReadLockEnter(ReadWriteLock &lock, unsigned timeout, const char *fname, unsigned lnum)
  334. {
  335. for (;;)
  336. {
  337. if (lock.lockRead(timeout))
  338. break;
  339. PROGLOG("checkedReadLockEnter timeout %s(%d)",fname,lnum);
  340. PrintStackReport();
  341. }
  342. }
  343. void checkedWriteLockEnter(ReadWriteLock &lock, unsigned timeout, const char *fname, unsigned lnum)
  344. {
  345. for (;;)
  346. {
  347. if (lock.lockWrite(timeout))
  348. break;
  349. PROGLOG("checkedWriteLockEnter timeout %s(%d)",fname,lnum);
  350. PrintStackReport();
  351. }
  352. }
  353. //==================================================================================
  354. void checkedCritEnter(CheckedCriticalSection &crit, unsigned timeout, const char *fname, unsigned lnum)
  355. {
  356. for (;;)
  357. {
  358. if (crit.lockWait(timeout))
  359. break;
  360. PROGLOG("checkedCritEnter timeout %s(%d)",fname,lnum);
  361. PrintStackReport();
  362. }
  363. }
  364. void checkedCritLeave(CheckedCriticalSection &crit)
  365. {
  366. crit.unlock();
  367. }
  368. CheckedCriticalBlock::CheckedCriticalBlock(CheckedCriticalSection &c, unsigned timeout, const char *fname,unsigned lnum)
  369. : crit(c)
  370. {
  371. for (;;)
  372. {
  373. if (crit.lockWait(timeout))
  374. break;
  375. PROGLOG("CheckedCriticalBlock timeout %s(%d)",fname,lnum);
  376. PrintStackReport();
  377. }
  378. }
  379. CheckedCriticalUnblock::~CheckedCriticalUnblock()
  380. {
  381. for (;;)
  382. {
  383. if (crit.lockWait(timeout))
  384. break;
  385. PROGLOG("CheckedCriticalUnblock timeout %s(%d)",fname,lnum);
  386. PrintStackReport();
  387. }
  388. }
  389. #endif
  390. void ThreadYield()
  391. { // works for SCHED_RR threads (<spit>) also
  392. #ifdef _WIN32
  393. Sleep(0);
  394. #else
  395. pthread_t self = pthread_self();
  396. int policy;
  397. sched_param param;
  398. pthread_getschedparam(self, &policy, &param);
  399. if (policy==SCHED_RR) {
  400. int saveprio = param.sched_priority;
  401. param.sched_priority = 0;
  402. pthread_setschedparam(self, SCHED_OTHER, &param);
  403. param.sched_priority = saveprio;
  404. sched_yield();
  405. pthread_setschedparam(self, policy, &param);
  406. }
  407. else
  408. sched_yield();
  409. #endif
  410. }
  411. void spinUntilReady(std::atomic_uint &value)
  412. {
  413. unsigned i = 0;
  414. const unsigned maxSpins = 10;
  415. while (value.load(std::memory_order_relaxed))
  416. {
  417. if (i++ == maxSpins)
  418. {
  419. i = 0;
  420. ThreadYield();
  421. }
  422. }
  423. }