jmutex.cpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include "jmutex.hpp"
  15. #include "jsuperhash.hpp"
  16. #include "jmisc.hpp"
  17. #include "jfile.hpp"
  18. #include <stdio.h>
  19. #include <assert.h>
  20. //===========================================================================
  21. #ifndef _WIN32
  22. Mutex::Mutex()
  23. {
  24. pthread_mutex_init(&mutex, NULL);
  25. pthread_cond_init(&lock_free, NULL);
  26. owner = 0;
  27. lockcount = 0;
  28. }
  29. Mutex::~Mutex()
  30. {
  31. pthread_cond_destroy(&lock_free);
  32. pthread_mutex_destroy(&mutex);
  33. }
  34. void Mutex::lock()
  35. {
  36. pthread_mutex_lock(&mutex);
  37. while ((owner!=0) && !pthread_equal(owner, pthread_self()))
  38. pthread_cond_wait(&lock_free, &mutex);
  39. if (lockcount++==0)
  40. owner = pthread_self();
  41. pthread_mutex_unlock(&mutex);
  42. }
  43. bool Mutex::lockWait(unsigned timeout)
  44. {
  45. if (timeout==(unsigned)-1) {
  46. lock();
  47. return true;
  48. }
  49. pthread_mutex_lock(&mutex);
  50. bool first=true;
  51. while ((owner!=0) && !pthread_equal(owner, pthread_self())) {
  52. timespec abs;
  53. if (first) {
  54. timeval cur;
  55. gettimeofday(&cur, NULL);
  56. unsigned tsec=(timeout/1000);
  57. abs.tv_sec = cur.tv_sec + tsec;
  58. abs.tv_nsec = (cur.tv_usec + timeout%1000*1000)*1000;
  59. if (abs.tv_nsec>=1000000000) {
  60. abs.tv_nsec-=1000000000;
  61. abs.tv_sec++;
  62. }
  63. first = false;
  64. }
  65. if (pthread_cond_timedwait(&lock_free, &mutex, &abs)==ETIMEDOUT) {
  66. pthread_mutex_unlock(&mutex);
  67. return false;
  68. }
  69. }
  70. if (lockcount++==0)
  71. owner = pthread_self();
  72. pthread_mutex_unlock(&mutex);
  73. return true;
  74. }
  75. void Mutex::unlock()
  76. {
  77. pthread_mutex_lock(&mutex);
  78. #ifdef _DEBUG
  79. assertex(pthread_equal(owner, pthread_self()));
  80. #endif
  81. if (--lockcount==0)
  82. {
  83. owner = 0;
  84. pthread_cond_signal(&lock_free);
  85. }
  86. pthread_mutex_unlock(&mutex);
  87. }
  88. void Mutex::lockAll(int count)
  89. {
  90. if (count) {
  91. pthread_mutex_lock(&mutex);
  92. while ((owner!=0) && !pthread_equal(owner, pthread_self()))
  93. pthread_cond_wait(&lock_free, &mutex);
  94. lockcount = count;
  95. owner = pthread_self();
  96. pthread_mutex_unlock(&mutex);
  97. }
  98. }
  99. int Mutex::unlockAll()
  100. {
  101. pthread_mutex_lock(&mutex);
  102. int ret = lockcount;
  103. if (lockcount!=0) {
  104. #ifdef _DEBUG
  105. assertex(pthread_equal(owner, pthread_self()));
  106. #endif
  107. lockcount = 0;
  108. owner = 0;
  109. pthread_cond_signal(&lock_free);
  110. }
  111. pthread_mutex_unlock(&mutex);
  112. return ret;
  113. }
  // Reads exactly 'nbytes' bytes from 'fd' into 'buf', retrying on EINTR.
  //
  // fd      - open file descriptor to read from.
  // buf     - destination buffer of at least 'nbytes' bytes.
  // nbytes  - number of bytes required.
  // returns - true once all bytes have been read; false on a read error
  //           other than EINTR, or if end-of-file is reached first.
  inline bool read_data(int fd, void *buf, size_t nbytes)
  {
      char *dst = static_cast<char *>(buf);
      size_t remaining = nbytes;
      while (remaining > 0)
      {
          // Keep the result as ssize_t: narrowing it loses the sign / range.
          const ssize_t rd = read(fd, dst, remaining);
          if (rd > 0)
          {
              dst += rd;
              remaining -= static_cast<size_t>(rd);
          }
          else if (rd == 0)
          {
              // End of file: no further progress is possible, so fail rather
              // than spin forever on a short file.
              return false;
          }
          else if (errno != EINTR)
          {
              return false;
          }
      }
      return true;
  }
  // Writes exactly 'nbytes' bytes from 'buf' to 'fd', retrying on EINTR.
  //
  // fd      - open file descriptor to write to.
  // buf     - source buffer holding at least 'nbytes' bytes.
  // nbytes  - number of bytes to write.
  // returns - true once all bytes have been written; false on a write error
  //           other than EINTR, or if write() reports no progress.
  inline bool write_data(int fd, const void *buf, size_t nbytes)
  {
      const char *src = static_cast<const char *>(buf);
      size_t remaining = nbytes;
      while (remaining > 0)
      {
          // Keep the result as ssize_t: narrowing it loses the sign / range.
          const ssize_t wr = write(fd, src, remaining);
          if (wr > 0)
          {
              src += wr;
              remaining -= static_cast<size_t>(wr);
          }
          else if (wr == 0)
          {
              // Nothing was accepted; bail out instead of looping forever.
              return false;
          }
          else if (errno != EINTR)
          {
              return false;
          }
      }
      return true;
  }
  // Interval, in milliseconds (15 seconds), between attempts to take the
  // inter-process lock file while a NamedMutex is waiting.
  #define POLLTIME (15*1000)
  139. static bool lock_file(const char *lfpath)
  140. {
  141. unsigned attempt = 0;
  142. while (attempt < 3) {
  143. char lckcontents[12];
  144. int fd = open(lfpath, O_RDWR | O_CREAT | O_EXCL, S_IRWXU);
  145. if (fd==-1) {
  146. if (errno != EEXIST)
  147. break;
  148. fd = open(lfpath, O_RDONLY);
  149. if (fd==-1)
  150. break;
  151. bool ok = read_data(fd, lckcontents, sizeof(lckcontents)-1);
  152. close(fd);
  153. if (ok) {
  154. lckcontents[sizeof(lckcontents)-1] = 0;
  155. int pid = atoi(lckcontents);
  156. if (pid==getpid())
  157. return true;
  158. if (kill(pid, 0) == -1) {
  159. if (errno != ESRCH)
  160. return false;
  161. unlink(lfpath);
  162. continue;
  163. }
  164. }
  165. Sleep(1000);
  166. attempt++;
  167. }
  168. else {
  169. sprintf(lckcontents,"%10d\n",(int)getpid());
  170. bool ok = write_data(fd, lckcontents, sizeof(lckcontents)-1);
  171. close(fd);
  172. if (!ok)
  173. break;
  174. }
  175. }
  176. return false;
  177. }
  178. static void unlock_file(const char *lfpath)
  179. {
  180. for (unsigned attempt=0;attempt<10;attempt++) {
  181. if (unlink(lfpath)>=0)
  182. return;
  183. attempt++;
  184. Sleep(500);
  185. }
  186. ERRLOG("NamedMutex cannot unlock file (%d)",errno);
  187. }
  188. static CriticalSection lockPrefixCS;
  189. static StringBuffer lockPrefix;
  190. NamedMutex::NamedMutex(const char *name)
  191. {
  192. {
  193. CriticalBlock b(lockPrefixCS);
  194. if (0 == lockPrefix.length())
  195. {
  196. if (!getConfigurationDirectory(NULL, "lock", NULL, NULL, lockPrefix))
  197. throw MakeStringException(0, "Failed to get lock directory from environment");
  198. }
  199. addPathSepChar(lockPrefix);
  200. lockPrefix.append("JLIBMUTEX_");
  201. }
  202. StringBuffer tmp(lockPrefix);
  203. tmp.append("JLIBMUTEX_").append(name);
  204. mutexfname = tmp.detach();
  205. }
  206. NamedMutex::~NamedMutex()
  207. {
  208. free(mutexfname);
  209. }
  210. void NamedMutex::lock()
  211. {
  212. // first lock locally
  213. threadmutex.lock();
  214. // then lock globally
  215. loop {
  216. if (lock_file(mutexfname))
  217. return;
  218. Sleep(POLLTIME);
  219. }
  220. }
  221. bool NamedMutex::lockWait(unsigned timeout)
  222. {
  223. unsigned t = msTick();
  224. // first lock locally
  225. if (!threadmutex.lockWait(timeout))
  226. return false;
  227. // then lock globally
  228. loop {
  229. if (lock_file(mutexfname))
  230. return true;
  231. unsigned elapsed = msTick()-t;
  232. if (elapsed>=timeout) {
  233. threadmutex.unlock();
  234. break;
  235. }
  236. Sleep((timeout-elapsed)>POLLTIME?POLLTIME:(timeout-elapsed));
  237. }
  238. return false;
  239. }
  240. void NamedMutex::unlock()
  241. {
  242. // assumed held
  243. unlock_file(mutexfname);
  244. threadmutex.unlock();
  245. }
  246. #endif
  247. void synchronized::throwLockException(unsigned timeout)
  248. {
  249. throw MakeStringException(0,"Can not lock - %d",timeout);
  250. }
  251. //===========================================================================
  252. void Monitor::wait()
  253. {
  254. assertex(owner==GetCurrentThreadId());
  255. waiting++;
  256. void *cur = last;
  257. last = &cur;
  258. while (1) {
  259. int locked = unlockAll();
  260. sem->wait();
  261. lockAll(locked);
  262. if (cur==NULL) { // i.e. first in
  263. void **p=(void **)&last;
  264. while (*p!=&cur)
  265. p = (void **)*p;
  266. *p = NULL; // reset so next picks up
  267. break;
  268. }
  269. sem->signal();
  270. }
  271. }
  272. void Monitor::notify()
  273. { // should always be locked
  274. assertex(owner==GetCurrentThreadId());
  275. if (waiting)
  276. {
  277. waiting--;
  278. sem->signal();
  279. }
  280. }
  281. void Monitor::notifyAll()
  282. { // should always be locked
  283. assertex(owner==GetCurrentThreadId());
  284. if (waiting)
  285. {
  286. sem->signal(waiting);
  287. waiting = 0;
  288. }
  289. }
  290. //==================================================================================
  291. #ifdef USECHECKEDCRITICALSECTIONS
  292. CheckedReadLockBlock::CheckedReadLockBlock(ReadWriteLock &l, unsigned timeout, const char *fname,unsigned lnum) : lock(l)
  293. {
  294. loop
  295. {
  296. if (lock.lockRead(timeout))
  297. break;
  298. PROGLOG("CheckedReadLockBlock timeout %s(%d)",fname,lnum);
  299. PrintStackReport();
  300. }
  301. }
  302. CheckedWriteLockBlock::CheckedWriteLockBlock(ReadWriteLock &l, unsigned timeout, const char *fname,unsigned lnum) : lock(l)
  303. {
  304. loop
  305. {
  306. if (lock.lockWrite(timeout))
  307. break;
  308. PROGLOG("CheckedWriteLockBlock timeout %s(%d)",fname,lnum);
  309. PrintStackReport();
  310. }
  311. }
  312. void checkedReadLockEnter(ReadWriteLock &lock, unsigned timeout, const char *fname, unsigned lnum)
  313. {
  314. loop
  315. {
  316. if (lock.lockRead(timeout))
  317. break;
  318. PROGLOG("checkedReadLockEnter timeout %s(%d)",fname,lnum);
  319. PrintStackReport();
  320. }
  321. }
  322. void checkedWriteLockEnter(ReadWriteLock &lock, unsigned timeout, const char *fname, unsigned lnum)
  323. {
  324. loop
  325. {
  326. if (lock.lockWrite(timeout))
  327. break;
  328. PROGLOG("checkedWriteLockEnter timeout %s(%d)",fname,lnum);
  329. PrintStackReport();
  330. }
  331. }
  332. //==================================================================================
  333. void checkedCritEnter(CheckedCriticalSection &crit, unsigned timeout, const char *fname, unsigned lnum)
  334. {
  335. loop
  336. {
  337. if (crit.lockWait(timeout))
  338. break;
  339. PROGLOG("checkedCritEnter timeout %s(%d)",fname,lnum);
  340. PrintStackReport();
  341. }
  342. }
  343. void checkedCritLeave(CheckedCriticalSection &crit)
  344. {
  345. crit.unlock();
  346. }
  347. CheckedCriticalBlock::CheckedCriticalBlock(CheckedCriticalSection &c, unsigned timeout, const char *fname,unsigned lnum)
  348. : crit(c)
  349. {
  350. loop
  351. {
  352. if (crit.lockWait(timeout))
  353. break;
  354. PROGLOG("CheckedCriticalBlock timeout %s(%d)",fname,lnum);
  355. PrintStackReport();
  356. }
  357. }
  358. CheckedCriticalUnblock::~CheckedCriticalUnblock()
  359. {
  360. loop
  361. {
  362. if (crit.lockWait(timeout))
  363. break;
  364. PROGLOG("CheckedCriticalUnblock timeout %s(%d)",fname,lnum);
  365. PrintStackReport();
  366. }
  367. }
  368. #endif
  // Yields the processor to other runnable threads.
  //
  // On Windows this is Sleep(0). Elsewhere it is sched_yield(), with special
  // handling for SCHED_RR threads: the thread is temporarily switched to
  // SCHED_OTHER at priority 0 for the duration of the yield and then restored
  // to its original policy and priority.
  // NOTE(review): presumably because sched_yield() under SCHED_RR only gives
  // way to threads of the same priority - confirm.
  void ThreadYield()
  {
  #ifdef _WIN32
      Sleep(0);
  #else
      pthread_t self = pthread_self();
      int policy = SCHED_OTHER;
      sched_param param;
      // If the scheduling parameters cannot be read, 'policy' and 'param'
      // hold nothing meaningful: fall back to a plain yield.
      if (pthread_getschedparam(self, &policy, &param) != 0 || policy != SCHED_RR) {
          sched_yield();
          return;
      }
      int saveprio = param.sched_priority;
      param.sched_priority = 0;
      pthread_setschedparam(self, SCHED_OTHER, &param);
      param.sched_priority = saveprio;
      sched_yield();
      // Restore the original round-robin policy and priority.
      pthread_setschedparam(self, policy, &param);
  #endif
  }
  390. void spinUntilReady(atomic_t &value)
  391. {
  392. unsigned i = 0;
  393. const unsigned maxSpins = 10;
  394. while (atomic_read(&value))
  395. {
  396. if (i++ == maxSpins)
  397. {
  398. i = 0;
  399. ThreadYield();
  400. }
  401. }
  402. }