Procházet zdrojové kódy

Merge pull request #5115 from ghalliday/issue10379

HPCC-10379 New semaphore implementation based on sem_xxx

Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman před 11 roky
rodič
revize
b2ee7ad4fb

+ 67 - 7
system/jlib/jsem.cpp

@@ -24,22 +24,80 @@
 #ifndef _WIN32
 
 #include <sys/time.h>
+#include <semaphore.h>
 
+#ifndef USE_OLD_SEMAPHORE_CODE
 
 Semaphore::Semaphore(unsigned initialCount)
 {
-    init();
-    count = initialCount;
+    sem_init(&sem, 0, initialCount);
+}
+
+Semaphore::~Semaphore()
+{
+    sem_destroy(&sem);
+}
+
+void Semaphore::reinit(unsigned initialCount)
+{
+    sem_destroy(&sem);
+    sem_init(&sem, 0, initialCount);
+}
+
+void Semaphore::wait()
+{
+    sem_wait(&sem);
+}
+
+bool Semaphore::wait(unsigned timeout)
+{
+    if (timeout==(unsigned)-1) {
+        sem_wait(&sem);
+        return true;
+    }
+
+    //Ensure uncontended case is handled without calling gettimeofday
+    if (sem_trywait(&sem) == 0)
+        return true;
+
+    timeval cur;
+    gettimeofday(&cur, NULL);
+
+    timespec abs;
+    abs.tv_sec = cur.tv_sec + timeout/1000;
+    abs.tv_nsec = (cur.tv_usec + timeout%1000*1000)*1000;
+    if (abs.tv_nsec>=1000000000) {
+        abs.tv_nsec-=1000000000;
+        abs.tv_sec++;
+    }
+    int ret = sem_timedwait(&sem, &abs);
+    if (ret < 0)
+        return false;
+    return true;
+}
+
+
+
+void Semaphore::signal()
+{
+    sem_post(&sem);
 }
 
-#if 0 // not supported
-Semaphore::Semaphore(const char *name)
+void Semaphore::signal(unsigned n)
+{
+    for (unsigned i=0; i < n; i++)
+        sem_post(&sem);
+}
+
+#else
+
+//Old semaphore code based on condition variables.
+
+Semaphore::Semaphore(unsigned initialCount)
 {
-    //MORE - ignores the name...
     init();
-    count = 0;
+    count = initialCount;
 }
-#endif
 
 Semaphore::~Semaphore()
 {
@@ -118,3 +176,5 @@ void Semaphore::signal(unsigned n)
 
 #endif
 
+#endif
+

+ 9 - 1
system/jlib/jsem.hpp

@@ -119,6 +119,10 @@ protected:
 };
 #else
 
+#include <semaphore.h>
+
+//#define USE_OLD_SEMAPHORE_CODE
+
 class jlib_decl Semaphore
 {
 public:
@@ -129,15 +133,19 @@ public:
     void signal();
     void signal(unsigned count);
     void reinit(unsigned initialCount=0U);
+#ifndef USE_OLD_SEMAPHORE_CODE
+protected:
+    sem_t sem;
+#else
 protected:
     void init();
 protected:
     MutexId mx;
     pthread_cond_t cond;
     int count;
+#endif
 };
 
-
 #endif
 
 #endif

+ 1 - 0
testing/unittests/CMakeLists.txt

@@ -30,6 +30,7 @@ set (    SRCS
          unittests.cpp
          remotetests.cpp
          dalitests.cpp
+         jlibtests.cpp
     )
 
 include_directories (

+ 84 - 0
testing/unittests/jlibtests.cpp

@@ -0,0 +1,84 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+/*
+ * Jlib regression tests
+ *
+ */
+
+#ifdef _USE_CPPUNIT
+#include "jsem.hpp"
+
+#include "unittests.hpp"
+
+class JlibSemTest : public CppUnit::TestFixture
+{
+public:
+    CPPUNIT_TEST_SUITE(JlibSemTest);
+        CPPUNIT_TEST(testSetup);
+        CPPUNIT_TEST(testSimple);
+        CPPUNIT_TEST(testCleanup);
+    CPPUNIT_TEST_SUITE_END();
+
+protected:
+
+    void testSetup()
+    {
+    }
+
+    void testCleanup()
+    {
+    }
+
+    void testTimedAvailable(Semaphore & sem)
+    {
+        unsigned now = msTick();
+        sem.wait(100);
+        unsigned taken = msTick() - now;
+        //Shouldn't cause a reschedule, definitely shouldn't wait for 100s
+        ASSERT(taken < 5);
+    }
+    void testTimedElapsed(Semaphore & sem, unsigned time)
+    {
+        unsigned now = msTick();
+        sem.wait(time);
+        unsigned taken = msTick() - now;
+        ASSERT(taken >= time && taken < 2*time);
+    }
+
+    void testSimple()
+    {
+        //Some very basic semaphore tests.
+        Semaphore sem;
+        sem.signal();
+        sem.wait();
+        testTimedElapsed(sem, 100);
+        sem.signal();
+        testTimedAvailable(sem);
+
+        sem.reinit(2);
+        sem.wait();
+        testTimedAvailable(sem);
+        testTimedElapsed(sem, 5);
+    }
+};
+
+CPPUNIT_TEST_SUITE_REGISTRATION( JlibSemTest );
+CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( JlibSemTest, "JlibSemTest" );
+
+
+#endif // _USE_CPPUNIT