123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482 |
- /*##############################################################################
- HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ############################################################################## */
- #include "platform.h"
- #include "jmutex.hpp"
- #include "jsuperhash.hpp"
- #include "jmisc.hpp"
- #include "jfile.hpp"
- #include <stdio.h>
- #include <assert.h>
- //===========================================================================
- #ifndef _WIN32
- Mutex::Mutex()
- {
- pthread_mutex_init(&mutex, NULL);
- pthread_cond_init(&lock_free, NULL);
- owner = 0;
- lockcount = 0;
- }
- Mutex::~Mutex()
- {
- pthread_cond_destroy(&lock_free);
- pthread_mutex_destroy(&mutex);
- }
- void Mutex::lock()
- {
- pthread_mutex_lock(&mutex);
- while ((owner!=0) && !pthread_equal(owner, pthread_self()))
- pthread_cond_wait(&lock_free, &mutex);
- if (lockcount++==0)
- owner = pthread_self();
- pthread_mutex_unlock(&mutex);
- }
- bool Mutex::lockWait(unsigned timeout)
- {
- if (timeout==(unsigned)-1) {
- lock();
- return true;
- }
- pthread_mutex_lock(&mutex);
- bool first=true;
- while ((owner!=0) && !pthread_equal(owner, pthread_self())) {
- timespec abs;
- if (first) {
- getEndTime(abs, timeout);
- first = false;
- }
- if (pthread_cond_timedwait(&lock_free, &mutex, &abs)==ETIMEDOUT) {
- pthread_mutex_unlock(&mutex);
- return false;
- }
- }
- if (lockcount++==0)
- owner = pthread_self();
- pthread_mutex_unlock(&mutex);
- return true;
- }
- void Mutex::unlock()
- {
- pthread_mutex_lock(&mutex);
- #ifdef _DEBUG
- assertex(pthread_equal(owner, pthread_self()));
- #endif
- if (--lockcount==0)
- {
- owner = 0;
- pthread_cond_signal(&lock_free);
- }
- pthread_mutex_unlock(&mutex);
- }
- void Mutex::lockAll(int count)
- {
- if (count) {
- pthread_mutex_lock(&mutex);
- while ((owner!=0) && !pthread_equal(owner, pthread_self()))
- pthread_cond_wait(&lock_free, &mutex);
- lockcount = count;
- owner = pthread_self();
- pthread_mutex_unlock(&mutex);
- }
- }
- int Mutex::unlockAll()
- {
- pthread_mutex_lock(&mutex);
- int ret = lockcount;
- if (lockcount!=0) {
- #ifdef _DEBUG
- assertex(pthread_equal(owner, pthread_self()));
- #endif
- lockcount = 0;
- owner = 0;
- pthread_cond_signal(&lock_free);
- }
- pthread_mutex_unlock(&mutex);
- return ret;
- }
// Reads exactly nbytes from fd into buf, retrying after EINTR and after
// short reads.
//
// fd     - descriptor to read from.
// buf    - destination buffer, at least nbytes long.
// nbytes - number of bytes required; 0 succeeds without reading.
// Returns true once nbytes have been read; false on a read error other than
// EINTR, or when end-of-file is reached before nbytes were available.
inline bool read_data(int fd, void *buf, size_t nbytes)
{
    char *dest = (char *)buf;
    size_t nread = 0;
    while (nread<nbytes) {
        ssize_t rd = read(fd, dest + nread, nbytes-nread);
        if (rd>0)
            nread += (size_t)rd;
        else if (rd==0)
            return false;   // EOF: no more data will arrive, so stop rather than spin
        else if (errno != EINTR)
            return false;
    }
    return true;
}
// Writes exactly nbytes from buf to fd, retrying after EINTR and after
// short writes.
//
// fd     - descriptor to write to.
// buf    - source buffer, at least nbytes long.
// nbytes - number of bytes to write; 0 succeeds without writing.
// Returns true once everything has been written; false on a write error
// other than EINTR.
inline bool write_data(int fd, const void *buf, size_t nbytes)
{
    const char *src = (const char *)buf;
    // size_t / ssize_t keep the running count and the write() result at full
    // width, matching the type of nbytes.
    size_t nwritten = 0;
    while (nwritten<nbytes) {
        ssize_t wr = write(fd, src + nwritten, nbytes-nwritten);
        if (wr>=0)
            nwritten += (size_t)wr;
        else if (errno != EINTR)
            return false;
    }
    return true;
}
- #define POLLTIME (1000*15)
- static bool lock_file(const char *lfpath)
- {
- unsigned attempt = 0;
- while (attempt < 3) {
- char lckcontents[12];
- int fd = open(lfpath, O_RDWR | O_CREAT | O_EXCL, S_IRWXU);
- if (fd==-1) {
- if (errno != EEXIST)
- break;
- fd = open(lfpath, O_RDONLY);
- if (fd==-1)
- break;
- bool ok = read_data(fd, lckcontents, sizeof(lckcontents)-1);
- close(fd);
- if (ok) {
- lckcontents[sizeof(lckcontents)-1] = 0;
- int pid = atoi(lckcontents);
- if (pid==getpid())
- return true;
- if (kill(pid, 0) == -1) {
- if (errno != ESRCH)
- return false;
- unlink(lfpath);
- continue;
- }
- }
- Sleep(1000);
- attempt++;
- }
- else {
- sprintf(lckcontents,"%10d\n",(int)getpid());
- bool ok = write_data(fd, lckcontents, sizeof(lckcontents)-1);
- close(fd);
- if (!ok)
- break;
- }
- }
- return false;
- }
- static void unlock_file(const char *lfpath)
- {
- for (unsigned attempt=0;attempt<10;attempt++) {
- if (unlink(lfpath)>=0)
- return;
- attempt++;
- Sleep(500);
- }
- IERRLOG("NamedMutex cannot unlock file (%d)",errno);
- }
- static CriticalSection lockPrefixCS;
- static StringBuffer lockPrefix;
- NamedMutex::NamedMutex(const char *name)
- {
- {
- CriticalBlock b(lockPrefixCS);
- if (0 == lockPrefix.length())
- {
- if (!getConfigurationDirectory(NULL, "lock", NULL, NULL, lockPrefix))
- WARNLOG("Failed to get lock directory from environment");
- }
- addPathSepChar(lockPrefix);
- lockPrefix.append("JLIBMUTEX_");
- }
- StringBuffer tmp(lockPrefix);
- tmp.append("JLIBMUTEX_").append(name);
- mutexfname = tmp.detach();
- }
- NamedMutex::~NamedMutex()
- {
- free(mutexfname);
- }
- void NamedMutex::lock()
- {
- // first lock locally
- threadmutex.lock();
- // then lock globally
- for (;;) {
- if (lock_file(mutexfname))
- return;
- Sleep(POLLTIME);
- }
- }
- bool NamedMutex::lockWait(unsigned timeout)
- {
- unsigned t = msTick();
- // first lock locally
- if (!threadmutex.lockWait(timeout))
- return false;
- // then lock globally
- for (;;) {
- if (lock_file(mutexfname))
- return true;
- unsigned elapsed = msTick()-t;
- if (elapsed>=timeout) {
- threadmutex.unlock();
- break;
- }
- Sleep((timeout-elapsed)>POLLTIME?POLLTIME:(timeout-elapsed));
- }
- return false;
- }
- void NamedMutex::unlock()
- {
- // assumed held
- unlock_file(mutexfname);
- threadmutex.unlock();
- }
- #endif
- void synchronized::throwLockException(unsigned timeout)
- {
- throw MakeStringException(0,"Can not lock - %d",timeout);
- }
- //===========================================================================
- void Monitor::wait()
- {
- assertex(owner==GetCurrentThreadId());
- waiting++;
- void *cur = last;
- last = &cur;
- while (1) {
- int locked = unlockAll();
- sem->wait();
- lockAll(locked);
- if (cur==NULL) { // i.e. first in
- void **p=(void **)&last;
- while (*p!=&cur)
- p = (void **)*p;
- *p = NULL; // reset so next picks up
- break;
- }
- sem->signal();
- }
- }
- void Monitor::notify()
- { // should always be locked
- assertex(owner==GetCurrentThreadId());
- if (waiting)
- {
- waiting--;
- sem->signal();
- }
- }
- void Monitor::notifyAll()
- { // should always be locked
- assertex(owner==GetCurrentThreadId());
- if (waiting)
- {
- sem->signal(waiting);
- waiting = 0;
- }
- }
- //==================================================================================
- #ifdef USE_PTHREAD_RWLOCK
- bool ReadWriteLock::lockRead(unsigned timeout)
- {
- if (timeout == (unsigned)-1)
- {
- lockRead();
- return true;
- }
- if (pthread_rwlock_tryrdlock(&rwlock) == 0)
- return true;
- timespec endtime;
- getEndTime(endtime, timeout);
- return (pthread_rwlock_timedrdlock(&rwlock, &endtime) == 0);
- }
- bool ReadWriteLock::lockWrite(unsigned timeout)
- {
- if (timeout == (unsigned)-1)
- {
- lockWrite();
- return true;
- }
- if (pthread_rwlock_trywrlock(&rwlock) == 0)
- return true;
- timespec endtime;
- getEndTime(endtime, timeout);
- return (pthread_rwlock_timedwrlock(&rwlock, &endtime) == 0);
- }
- #endif
- //==================================================================================
- #ifdef USECHECKEDCRITICALSECTIONS
- CheckedReadLockBlock::CheckedReadLockBlock(ReadWriteLock &l, unsigned timeout, const char *fname,unsigned lnum) : lock(l)
- {
- for (;;)
- {
- if (lock.lockRead(timeout))
- break;
- PROGLOG("CheckedReadLockBlock timeout %s(%d)",fname,lnum);
- PrintStackReport();
- }
- }
- CheckedWriteLockBlock::CheckedWriteLockBlock(ReadWriteLock &l, unsigned timeout, const char *fname,unsigned lnum) : lock(l)
- {
- for (;;)
- {
- if (lock.lockWrite(timeout))
- break;
- PROGLOG("CheckedWriteLockBlock timeout %s(%d)",fname,lnum);
- PrintStackReport();
- }
- }
- void checkedReadLockEnter(ReadWriteLock &lock, unsigned timeout, const char *fname, unsigned lnum)
- {
- for (;;)
- {
- if (lock.lockRead(timeout))
- break;
- PROGLOG("checkedReadLockEnter timeout %s(%d)",fname,lnum);
- PrintStackReport();
- }
- }
- void checkedWriteLockEnter(ReadWriteLock &lock, unsigned timeout, const char *fname, unsigned lnum)
- {
- for (;;)
- {
- if (lock.lockWrite(timeout))
- break;
- PROGLOG("checkedWriteLockEnter timeout %s(%d)",fname,lnum);
- PrintStackReport();
- }
- }
- //==================================================================================
- void checkedCritEnter(CheckedCriticalSection &crit, unsigned timeout, const char *fname, unsigned lnum)
- {
- for (;;)
- {
- if (crit.lockWait(timeout))
- break;
- PROGLOG("checkedCritEnter timeout %s(%d)",fname,lnum);
- PrintStackReport();
- }
- }
- void checkedCritLeave(CheckedCriticalSection &crit)
- {
- crit.unlock();
- }
- CheckedCriticalBlock::CheckedCriticalBlock(CheckedCriticalSection &c, unsigned timeout, const char *fname,unsigned lnum)
- : crit(c)
- {
- for (;;)
- {
- if (crit.lockWait(timeout))
- break;
- PROGLOG("CheckedCriticalBlock timeout %s(%d)",fname,lnum);
- PrintStackReport();
- }
- }
- CheckedCriticalUnblock::~CheckedCriticalUnblock()
- {
- for (;;)
- {
- if (crit.lockWait(timeout))
- break;
- PROGLOG("CheckedCriticalUnblock timeout %s(%d)",fname,lnum);
- PrintStackReport();
- }
- }
- #endif
// Yields the processor to other runnable threads.
//
// Windows: Sleep(0).
// Elsewhere: sched_yield(). A thread running under SCHED_RR is switched to
// SCHED_OTHER (priority 0) around the yield and then restored to its
// original policy and priority, so that the yield also works for SCHED_RR
// threads. If the current scheduling parameters cannot be queried, a plain
// sched_yield() is performed.
void ThreadYield()
{
#ifdef _WIN32
    Sleep(0);
#else
    pthread_t self = pthread_self();
    int policy = SCHED_OTHER;
    sched_param param;
    if ((pthread_getschedparam(self, &policy, &param)==0) && (policy==SCHED_RR)) {
        int saveprio = param.sched_priority;
        param.sched_priority = 0;
        pthread_setschedparam(self, SCHED_OTHER, &param);
        // Put the saved priority back before restoring the original policy.
        param.sched_priority = saveprio;
        sched_yield();
        pthread_setschedparam(self, policy, &param);
    }
    else
        sched_yield();
#endif
}
- void spinUntilReady(std::atomic_uint &value)
- {
- unsigned i = 0;
- const unsigned maxSpins = 10;
- while (value.load(std::memory_order_relaxed))
- {
- if (i++ == maxSpins)
- {
- i = 0;
- ThreadYield();
- }
- }
- }
|