Преглед изворни кода

HPCC-19955 Use the pthread read write lock implementation

Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>
Gavin Halliday пре 7 година
родитељ
комит
0503ba4bb9
5 измењених фајлова са 114 додато и 22 уклоњено
  1. 39 9
      system/jlib/jmutex.cpp
  2. 33 0
      system/jlib/jmutex.hpp
  3. 16 9
      system/jlib/jsem.cpp
  4. 2 0
      system/jlib/jsem.hpp
  5. 24 4
      testing/unittests/jlibtests.cpp

+ 39 - 9
system/jlib/jmutex.cpp

@@ -63,15 +63,7 @@ bool Mutex::lockWait(unsigned timeout)
     while ((owner!=0) && !pthread_equal(owner, pthread_self())) {
         timespec abs;
         if (first) {
-            timeval cur;
-            gettimeofday(&cur, NULL);
-            unsigned tsec=(timeout/1000);
-            abs.tv_sec = cur.tv_sec + tsec;
-            abs.tv_nsec = (cur.tv_usec + timeout%1000*1000)*1000;
-            if (abs.tv_nsec>=1000000000) {
-                abs.tv_nsec-=1000000000;
-                abs.tv_sec++;
-            }
+            getEndTime(abs, timeout);
             first = false;
         }
         if (pthread_cond_timedwait(&lock_free, &mutex, &abs)==ETIMEDOUT) {
@@ -329,6 +321,44 @@ void Monitor::notifyAll()
 
 //==================================================================================
 
+#ifdef USE_PTHREAD_RWLOCK
+
+bool ReadWriteLock::lockRead(unsigned timeout)
+{
+    if (timeout == (unsigned)-1)
+    {
+        lockRead();
+        return true;
+    }
+
+    if (pthread_rwlock_tryrdlock(&rwlock) == 0)
+        return true;
+
+    timespec endtime;
+    getEndTime(endtime, timeout);
+    return (pthread_rwlock_timedrdlock(&rwlock, &endtime) == 0);
+}
+
+bool ReadWriteLock::lockWrite(unsigned timeout)
+{
+    if (timeout == (unsigned)-1)
+    {
+        lockWrite();
+        return true;
+    }
+
+    if (pthread_rwlock_trywrlock(&rwlock) == 0)
+        return true;
+
+    timespec endtime;
+    getEndTime(endtime, timeout);
+    return (pthread_rwlock_timedwrlock(&rwlock, &endtime) == 0);
+}
+
+#endif
+
+//==================================================================================
+
 #ifdef USECHECKEDCRITICALSECTIONS
 CheckedReadLockBlock::CheckedReadLockBlock(ReadWriteLock &l, unsigned timeout, const char *fname,unsigned lnum) : lock(l)
 {

+ 33 - 0
system/jlib/jmutex.hpp

@@ -572,6 +572,13 @@ public:
     void notifyAll();   // only called when locked -- notifys for all waiting threads
 };
 
+//--------------------------------------------------------------------------------------------------------------------
+
+//Currently disabled since performance profile of own implementation is preferable, and queryWriteLocked() cannot be implemented
+//#define USE_PTHREAD_RWLOCK
+
+#ifndef USE_PTHREAD_RWLOCK
+
 class jlib_decl ReadWriteLock
 {
     bool lockRead(bool timed, unsigned timeout) { 
@@ -696,6 +703,29 @@ protected:
 #endif
 };
 
+#else
+
+class jlib_decl ReadWriteLock
+{
+public:
+    ReadWriteLock()         { pthread_rwlock_init(&rwlock, nullptr); }
+    ~ReadWriteLock()        { pthread_rwlock_destroy(&rwlock); }
+
+    void lockRead()         { pthread_rwlock_rdlock(&rwlock); }
+    void lockWrite()        { pthread_rwlock_wrlock(&rwlock); }
+    bool lockRead(unsigned timeout);
+    bool lockWrite(unsigned timeout);
+    void unlock()           { pthread_rwlock_unlock(&rwlock); }
+    void unlockRead()       { pthread_rwlock_unlock(&rwlock); }
+    void unlockWrite()      { pthread_rwlock_unlock(&rwlock); }
+    //  bool queryWriteLocked(); // I don't think this can be implemented on top of the pthread interface
+
+protected:
+    pthread_rwlock_t    rwlock;
+};
+
+#endif
+
 class ReadLockBlock
 {
     ReadWriteLock *lock;
@@ -728,6 +758,9 @@ public:
     }
 };
 
+
+//--------------------------------------------------------------------------------------------------------------------
+
 class Barrier
 {
     CriticalSection crit;

+ 16 - 9
system/jlib/jsem.cpp

@@ -26,6 +26,21 @@
 #include <sys/time.h>
 #include <semaphore.h>
 
+
+void getEndTime(timespec & abs, unsigned timeout)
+{
+    timeval cur;
+    gettimeofday(&cur, NULL);
+
+    abs.tv_sec = cur.tv_sec + timeout/1000;
+    abs.tv_nsec = (cur.tv_usec + timeout%1000*1000)*1000;
+    if (abs.tv_nsec>=1000000000) {
+        abs.tv_nsec-=1000000000;
+        abs.tv_sec++;
+    }
+}
+
+
 #ifndef USE_OLD_SEMAPHORE_CODE
 
 Semaphore::Semaphore(unsigned initialCount)
@@ -65,16 +80,8 @@ bool Semaphore::wait(unsigned timeout)
     if (sem_trywait(&sem) == 0)
         return true;
 
-    timeval cur;
-    gettimeofday(&cur, NULL);
-
     timespec abs;
-    abs.tv_sec = cur.tv_sec + timeout/1000;
-    abs.tv_nsec = (cur.tv_usec + timeout%1000*1000)*1000;
-    if (abs.tv_nsec>=1000000000) {
-        abs.tv_nsec-=1000000000;
-        abs.tv_sec++;
-    }
+    getEndTime(abs, timeout);
     int ret = sem_timedwait(&sem, &abs);
     if (ret < 0)
         return false;

+ 2 - 0
system/jlib/jsem.hpp

@@ -22,6 +22,8 @@
 
 #include "jiface.hpp"
 
+void jlib_decl getEndTime(timespec & abs, unsigned timeout);
+
 #ifdef _WIN32
 
 class jlib_decl Semaphore

+ 24 - 4
testing/unittests/jlibtests.cpp

@@ -1528,6 +1528,16 @@ public:
         contendedTimes.append(tester.run(title, numCores * 2, numIterations));\
     }
 
+    //Use to common out a test
+    #define XDO_TEST(LOCK, CLOCK, COUNTER, NUMVALUES, NUMLOCKS)   \
+    { \
+        uncontendedTimes.append(0);\
+        minorTimes.append(0);\
+        typicalTimes.append(0);\
+        contendedTimes.append(0);\
+    }
+
+
     class Null
     {};
 
@@ -1555,6 +1565,16 @@ public:
         DO_TEST(Null, Null, unsigned __int64, 2, 1);
         DO_TEST(Null, Null, unsigned __int64, 5, 1);
 
+        //Read locks will fail to prevent values being lost, but the timings are useful in comparison with CriticalSection
+        DO_TEST(ReadWriteLock, ReadLockBlock, unsigned __int64, 1, 1);
+        DO_TEST(ReadWriteLock, ReadLockBlock, unsigned __int64, 2, 1);
+        DO_TEST(ReadWriteLock, ReadLockBlock, unsigned __int64, 5, 1);
+        DO_TEST(ReadWriteLock, ReadLockBlock, unsigned __int64, 1, 2);
+        DO_TEST(ReadWriteLock, WriteLockBlock, unsigned __int64, 1, 1);
+        DO_TEST(ReadWriteLock, WriteLockBlock, unsigned __int64, 2, 1);
+        DO_TEST(ReadWriteLock, WriteLockBlock, unsigned __int64, 5, 1);
+        DO_TEST(ReadWriteLock, WriteLockBlock, unsigned __int64, 1, 2);
+
         printf("Summary\n");
         summariseTimings("Uncontended", uncontendedTimes);
         summariseTimings("Minor", minorTimes);
@@ -1564,10 +1584,10 @@ public:
 
     void summariseTimings(const char * option, UInt64Array & times)
     {
-        printf("%11s 1x: cs(%3" I64F "u) spin(%3" I64F "u) atomic(%3" I64F "u) ratomic(%3" I64F "u) cas(%3" I64F "u)   "
-                    "5x: cs(%3" I64F "u) spin(%3" I64F "u) atomic(%3" I64F "u) ratomic(%3" I64F "u) cas(%3" I64F "u)\n", option,
-                    times.item(0), times.item(4), times.item(8), times.item(12), times.item(14),
-                    times.item(2), times.item(6), times.item(10), times.item(13), times.item(15));
+        printf("%11s 1x: cs(%3" I64F "u) spin(%3" I64F "u) atomic(%3" I64F "u) ratomic(%3" I64F "u) cas(%3" I64F "u) rd(%3" I64F "u) wr(%3" I64F "u)   "
+                    "5x: cs(%3" I64F "u) spin(%3" I64F "u) atomic(%3" I64F "u) ratomic(%3" I64F "u) cas(%3" I64F "u) rd(%3" I64F "u) wr(%3" I64F "u)\n", option,
+                    times.item(0), times.item(4), times.item(8), times.item(12), times.item(14), times.item(19), times.item(23),
+                    times.item(2), times.item(6), times.item(10), times.item(13), times.item(15), times.item(21), times.item(25));
     }
 
 private: