jmutex.cpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include "jmutex.hpp"
  15. #include "jsuperhash.hpp"
  16. #include "jmisc.hpp"
  17. #include "jfile.hpp"
  18. #include <stdio.h>
  19. #include <assert.h>
  20. //===========================================================================
  21. #ifndef _WIN32
  22. Mutex::Mutex()
  23. {
  24. pthread_mutex_init(&mutex, NULL);
  25. pthread_cond_init(&lock_free, NULL);
  26. owner = 0;
  27. lockcount = 0;
  28. }
  // Destroys the condition variable and then the pthread mutex that were
  // initialised by the constructor.
  Mutex::~Mutex()
  {
      pthread_cond_destroy(&lock_free);
      pthread_mutex_destroy(&mutex);
  }
  34. void Mutex::lock()
  35. {
  36. pthread_mutex_lock(&mutex);
  37. while ((owner!=0) && !pthread_equal(owner, pthread_self()))
  38. pthread_cond_wait(&lock_free, &mutex);
  39. if (lockcount++==0)
  40. owner = pthread_self();
  41. pthread_mutex_unlock(&mutex);
  42. }
  43. bool Mutex::lockWait(unsigned timeout)
  44. {
  45. if (timeout==(unsigned)-1) {
  46. lock();
  47. return true;
  48. }
  49. pthread_mutex_lock(&mutex);
  50. bool first=true;
  51. while ((owner!=0) && !pthread_equal(owner, pthread_self())) {
  52. timespec abs;
  53. if (first) {
  54. timeval cur;
  55. gettimeofday(&cur, NULL);
  56. unsigned tsec=(timeout/1000);
  57. abs.tv_sec = cur.tv_sec + tsec;
  58. abs.tv_nsec = (cur.tv_usec + timeout%1000*1000)*1000;
  59. if (abs.tv_nsec>=1000000000) {
  60. abs.tv_nsec-=1000000000;
  61. abs.tv_sec++;
  62. }
  63. first = false;
  64. }
  65. if (pthread_cond_timedwait(&lock_free, &mutex, &abs)==ETIMEDOUT) {
  66. pthread_mutex_unlock(&mutex);
  67. return false;
  68. }
  69. }
  70. if (lockcount++==0)
  71. owner = pthread_self();
  72. pthread_mutex_unlock(&mutex);
  73. return true;
  74. }
  75. void Mutex::unlock()
  76. {
  77. pthread_mutex_lock(&mutex);
  78. #ifdef _DEBUG
  79. assertex(pthread_equal(owner, pthread_self()));
  80. #endif
  81. if (--lockcount==0)
  82. {
  83. owner = 0;
  84. pthread_cond_signal(&lock_free);
  85. }
  86. pthread_mutex_unlock(&mutex);
  87. }
  88. void Mutex::lockAll(int count)
  89. {
  90. if (count) {
  91. pthread_mutex_lock(&mutex);
  92. while ((owner!=0) && !pthread_equal(owner, pthread_self()))
  93. pthread_cond_wait(&lock_free, &mutex);
  94. lockcount = count;
  95. owner = pthread_self();
  96. pthread_mutex_unlock(&mutex);
  97. }
  98. }
  99. int Mutex::unlockAll()
  100. {
  101. pthread_mutex_lock(&mutex);
  102. int ret = lockcount;
  103. if (lockcount!=0) {
  104. #ifdef _DEBUG
  105. assertex(pthread_equal(owner, pthread_self()));
  106. #endif
  107. lockcount = 0;
  108. owner = 0;
  109. pthread_cond_signal(&lock_free);
  110. }
  111. pthread_mutex_unlock(&mutex);
  112. return ret;
  113. }
  // Reads exactly `nbytes` bytes from `fd` into `buf`, looping over short reads
  // and retrying when read() is interrupted (EINTR).
  //
  // Returns true only if all `nbytes` bytes were read. Returns false on any
  // other read error, or if end-of-file is reached before `nbytes` bytes have
  // been read (read() returning 0 must not be treated as progress, otherwise
  // a short file makes this loop spin forever).
  inline bool read_data(int fd, void *buf, size_t nbytes)
  {
      size_t nread = 0;
      while (nread < nbytes) {
          ssize_t rd = read(fd, (char *)buf + nread, nbytes - nread);
          if (rd > 0)
              nread += (size_t)rd;
          else if (rd == 0)
              return false;           // EOF before the requested amount
          else if (errno != EINTR)
              return false;
      }
      return true;
  }
  // Writes exactly `nbytes` bytes from `buf` to `fd`, looping over short writes
  // and retrying when write() is interrupted (EINTR).
  //
  // Returns true only if all `nbytes` bytes were written. Returns false on any
  // other write error, or if write() reports that it wrote nothing (treated as
  // failure so the loop cannot spin without making progress).
  inline bool write_data(int fd, const void *buf, size_t nbytes)
  {
      size_t nwritten = 0;
      while (nwritten < nbytes) {
          ssize_t wr = write(fd, (const char *)buf + nwritten, nbytes - nwritten);
          if (wr > 0)
              nwritten += (size_t)wr;
          else if (wr == 0)
              return false;           // no progress possible
          else if (errno != EINTR)
              return false;
      }
      return true;
  }
  // Delay handed to Sleep() between attempts to take the lock file in
  // NamedMutex::lock()/lockWait(): 15000, presumably milliseconds
  // (TODO confirm the unit of Sleep() on this platform layer).
  #define POLLTIME (1000*15)
  139. static bool lock_file(const char *lfpath)
  140. {
  141. unsigned attempt = 0;
  142. int err = 0;
  143. while (attempt < 3) {
  144. char lckcontents[12];
  145. int fd = open(lfpath, O_RDWR | O_CREAT | O_EXCL, S_IRWXU);
  146. if (fd==-1) {
  147. if (errno != EEXIST)
  148. break;
  149. fd = open(lfpath, O_RDONLY);
  150. if (fd==-1)
  151. break;
  152. bool ok = read_data(fd, lckcontents, sizeof(lckcontents)-1);
  153. close(fd);
  154. if (ok) {
  155. lckcontents[sizeof(lckcontents)-1] = 0;
  156. int pid = atoi(lckcontents);
  157. if (pid==getpid())
  158. return true;
  159. if (kill(pid, 0) == -1) {
  160. if (errno != ESRCH)
  161. return false;
  162. unlink(lfpath);
  163. continue;
  164. }
  165. }
  166. Sleep(1000);
  167. attempt++;
  168. }
  169. else {
  170. sprintf(lckcontents,"%10d\n",(int)getpid());
  171. bool ok = write_data(fd, lckcontents, sizeof(lckcontents)-1);
  172. close(fd);
  173. if (!ok)
  174. break;
  175. }
  176. }
  177. return false;
  178. }
  179. static void unlock_file(const char *lfpath)
  180. {
  181. unsigned attempt = 0;
  182. for (unsigned attempt=0;attempt<10;attempt++) {
  183. if (unlink(lfpath)>=0)
  184. return;
  185. attempt++;
  186. Sleep(500);
  187. }
  188. ERRLOG("NamedMutex cannot unlock file (%d)",errno);
  189. }
  190. static CriticalSection lockPrefixCS;
  191. static StringBuffer lockPrefix;
  192. NamedMutex::NamedMutex(const char *name)
  193. {
  194. {
  195. CriticalBlock b(lockPrefixCS);
  196. if (0 == lockPrefix.length())
  197. {
  198. if (!getConfigurationDirectory(NULL, "lock", NULL, NULL, lockPrefix))
  199. throw MakeStringException(0, "Failed to get lock directory from environment");
  200. }
  201. addPathSepChar(lockPrefix);
  202. lockPrefix.append("JLIBMUTEX_");
  203. }
  204. StringBuffer tmp(lockPrefix);
  205. tmp.append("JLIBMUTEX_").append(name);
  206. mutexfname = tmp.detach();
  207. }
  // Frees the lock-file path buffer that the constructor obtained from
  // StringBuffer::detach(). Does not touch the lock file itself.
  NamedMutex::~NamedMutex()
  {
      free(mutexfname);
  }
  212. void NamedMutex::lock()
  213. {
  214. // first lock locally
  215. threadmutex.lock();
  216. // then lock globally
  217. loop {
  218. if (lock_file(mutexfname))
  219. return;
  220. Sleep(POLLTIME);
  221. }
  222. }
  223. bool NamedMutex::lockWait(unsigned timeout)
  224. {
  225. unsigned t = msTick();
  226. // first lock locally
  227. if (!threadmutex.lockWait(timeout))
  228. return false;
  229. // then lock globally
  230. loop {
  231. if (lock_file(mutexfname))
  232. return true;
  233. unsigned elapsed = msTick()-t;
  234. if (elapsed>=timeout) {
  235. threadmutex.unlock();
  236. break;
  237. }
  238. Sleep((timeout-elapsed)>POLLTIME?POLLTIME:(timeout-elapsed));
  239. }
  240. return false;
  241. }
  // Releases the named mutex in the reverse order of acquisition: the lock
  // file is removed first, then the in-process thread mutex is released.
  // The caller is assumed to hold the mutex; this is not checked here.
  void NamedMutex::unlock()
  {
      // assumed held
      unlock_file(mutexfname);
      threadmutex.unlock();
  }
  248. #endif
  // Throws the exception built by MakeStringException (code 0) with the message
  // "Can not lock - <timeout>".
  // NOTE(review): `timeout` is unsigned but is formatted with %d, so a value of
  // (unsigned)-1 presumably prints as -1 - confirm whether that is intended.
  void synchronized::throwLockException(unsigned timeout)
  {
      throw MakeStringException(0,"Can not lock - %d",timeout);
  }
  253. //===========================================================================
  254. void Monitor::wait()
  255. {
  256. assertex(owner==GetCurrentThreadId());
  257. waiting++;
  258. void *cur = last;
  259. last = &cur;
  260. while (1) {
  261. int locked = unlockAll();
  262. sem->wait();
  263. lockAll(locked);
  264. if (cur==NULL) { // i.e. first in
  265. void **p=(void **)&last;
  266. while (*p!=&cur)
  267. p = (void **)*p;
  268. *p = NULL; // reset so next picks up
  269. break;
  270. }
  271. sem->signal();
  272. }
  273. }
  274. void Monitor::notify()
  275. { // should always be locked
  276. assertex(owner==GetCurrentThreadId());
  277. if (waiting)
  278. {
  279. waiting--;
  280. sem->signal();
  281. }
  282. }
  283. void Monitor::notifyAll()
  284. { // should always be locked
  285. assertex(owner==GetCurrentThreadId());
  286. if (waiting)
  287. {
  288. sem->signal(waiting);
  289. waiting = 0;
  290. }
  291. }
  292. //==================================================================================
  293. #ifdef USECHECKEDCRITICALSECTIONS
  294. CheckedReadLockBlock::CheckedReadLockBlock(ReadWriteLock &l, unsigned timeout, const char *fname,unsigned lnum) : lock(l)
  295. {
  296. loop
  297. {
  298. if (lock.lockRead(timeout))
  299. break;
  300. PROGLOG("CheckedReadLockBlock timeout %s(%d)",fname,lnum);
  301. PrintStackReport();
  302. }
  303. }
  304. CheckedWriteLockBlock::CheckedWriteLockBlock(ReadWriteLock &l, unsigned timeout, const char *fname,unsigned lnum) : lock(l)
  305. {
  306. loop
  307. {
  308. if (lock.lockWrite(timeout))
  309. break;
  310. PROGLOG("CheckedWriteLockBlock timeout %s(%d)",fname,lnum);
  311. PrintStackReport();
  312. }
  313. }
  314. void checkedReadLockEnter(ReadWriteLock &lock, unsigned timeout, const char *fname, unsigned lnum)
  315. {
  316. loop
  317. {
  318. if (lock.lockRead(timeout))
  319. break;
  320. PROGLOG("checkedReadLockEnter timeout %s(%d)",fname,lnum);
  321. PrintStackReport();
  322. }
  323. }
  324. void checkedWriteLockEnter(ReadWriteLock &lock, unsigned timeout, const char *fname, unsigned lnum)
  325. {
  326. loop
  327. {
  328. if (lock.lockWrite(timeout))
  329. break;
  330. PROGLOG("checkedWriteLockEnter timeout %s(%d)",fname,lnum);
  331. PrintStackReport();
  332. }
  333. }
  334. //==================================================================================
  335. void checkedCritEnter(CheckedCriticalSection &crit, unsigned timeout, const char *fname, unsigned lnum)
  336. {
  337. loop
  338. {
  339. if (crit.lockWait(timeout))
  340. break;
  341. PROGLOG("checkedCritEnter timeout %s(%d)",fname,lnum);
  342. PrintStackReport();
  343. }
  344. }
  // Leaves the checked critical section by calling unlock() on it.
  void checkedCritLeave(CheckedCriticalSection &crit)
  {
      crit.unlock();
  }
  349. CheckedCriticalBlock::CheckedCriticalBlock(CheckedCriticalSection &c, unsigned timeout, const char *fname,unsigned lnum)
  350. : crit(c)
  351. {
  352. loop
  353. {
  354. if (crit.lockWait(timeout))
  355. break;
  356. PROGLOG("CheckedCriticalBlock timeout %s(%d)",fname,lnum);
  357. PrintStackReport();
  358. }
  359. }
  360. CheckedCriticalUnblock::~CheckedCriticalUnblock()
  361. {
  362. loop
  363. {
  364. if (crit.lockWait(timeout))
  365. break;
  366. PROGLOG("CheckedCriticalUnblock timeout %s(%d)",fname,lnum);
  367. PrintStackReport();
  368. }
  369. }
  370. #endif
  // Yields the processor to other threads.
  //
  // Windows: Sleep(0). Elsewhere: sched_yield(); a thread running under
  // SCHED_RR is temporarily switched to SCHED_OTHER (priority 0) around the
  // yield and then restored to its original policy and priority.
  void ThreadYield()
  {
  #ifdef _WIN32
      Sleep(0);
  #else
      const pthread_t me = pthread_self();
      int policy;
      sched_param param;
      pthread_getschedparam(me, &policy, &param);
      if (policy != SCHED_RR) {
          sched_yield();
          return;
      }
      const int savedPriority = param.sched_priority;
      param.sched_priority = 0;
      pthread_setschedparam(me, SCHED_OTHER, &param);
      sched_yield();
      // put the original round-robin policy and priority back
      param.sched_priority = savedPriority;
      pthread_setschedparam(me, policy, &param);
  #endif
  }