Browse Source

Merge pull request #13300 from jakesmith/hpcc-23433-rtm-lock-race

HPCC-23433 Fix locking race condition with RTM_DELETE_ON_DISCONNECT

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 5 years ago
parent
commit
3a29c1068e
5 changed files with 355 additions and 185 deletions
  1. 41 54
      dali/base/dasds.cpp
  2. 185 2
      system/jlib/jmutex.cpp
  3. 16 90
      system/jlib/jmutex.hpp
  4. 20 0
      testing/unittests/dalitests.cpp
  5. 93 39
      testing/unittests/jlibtests.cpp

+ 41 - 54
dali/base/dasds.cpp

@@ -202,7 +202,6 @@ public:
 class CLCLockBlock : public CInterface
 {
     ReadWriteLock &lock;
-    bool readLocked; // false == writeLocked
     unsigned got, lnum;
 public:
     CLCLockBlock(ReadWriteLock &_lock, bool readLock, unsigned timeout, const char *fname, unsigned _lnum) : lock(_lock), lnum(_lnum)
@@ -224,24 +223,25 @@ public:
             PrintStackReport();
         }
         got = msTick();
-        readLocked = readLock; // false == writeLocked
     };
     ~CLCLockBlock()
     {
-        if (readLocked)
-            lock.unlockRead();
-        else
-            lock.unlockWrite();
+        bool writeLocked = lock.queryWriteLocked();
+        lock.unlock();
         unsigned e=msTick()-got;
         if (e>readWriteSlowTracing)
         {
             StringBuffer s("TIME: CLCLockBlock(write=");
-            s.append(!readLocked).append(",lnum=").append(lnum).append(") took ").append(e).append(" ms");
+            s.append(writeLocked).append(",lnum=").append(lnum).append(") took ").append(e).append(" ms");
             DBGLOG("%s", s.str());
             if (readWriteStackTracing)
                 PrintStackReport();
         }
     }
+    void changeToWrite(unsigned timeout)
+    {
+        lock.changeToWrite(timeout);
+    }
 };
 #else
 class LinkingCriticalBlock : public CriticalBlock, public CInterface, implements IInterface
@@ -253,7 +253,6 @@ public:
 class CLCLockBlock : public CInterface
 {
     ReadWriteLock &lock;
-    bool readLocked; // false == writeLocked
 public:
     CLCLockBlock(ReadWriteLock &_lock, bool readLock, unsigned timeout, const char *fname, unsigned lnum) : lock(_lock)
     {
@@ -261,30 +260,21 @@ public:
             lock.lockRead();
         else
             lock.lockWrite();
-        readLocked = readLock; // false == writeLocked
     };
     ~CLCLockBlock()
     {
-        if (readLocked)
-            lock.unlockRead();
-        else
-            lock.unlockWrite();
+        lock.unlock();
+    }
+    void changeToWrite(unsigned timeout)
+    {
+        lock.changeToWrite(timeout);
     }
 };
 #endif
-class CLCReadLockBlock : public CLCLockBlock
-{
-public:
-    CLCReadLockBlock(ReadWriteLock &lock, unsigned timeout, const char *fname, unsigned lnum) : CLCLockBlock(lock, true, timeout, fname, lnum) { }
-};
-class CLCWriteLockBlock : public CLCLockBlock
-{
-public:
-    CLCWriteLockBlock(ReadWriteLock &lock, unsigned timeout, const char *fname, unsigned lnum) : CLCLockBlock(lock, false, timeout, fname, lnum) { }
-};
+
 #ifdef USECHECKEDCRITICALSECTIONS
-#define CHECKEDDALIREADLOCKBLOCK(l,timeout)  Owned<CLCReadLockBlock> glue(block,__LINE__) = new CLCReadLockBlock(l,timeout,__FILE__,__LINE__)
-#define CHECKEDDALIWRITELOCKBLOCK(l,timeout)  Owned<CLCWriteLockBlock> glue(block,__LINE__) = new CLCWriteLockBlock(l,timeout,__FILE__,__LINE__)
+#define CHECKEDDALIREADLOCKBLOCK(l, timeout)  Owned<CLCLockBlock> glue(block,__LINE__) = new CLCLockBlock(l, true, timeout, __FILE__, __LINE__)
+#define CHECKEDDALIWRITELOCKBLOCK(l, timeout)  Owned<CLCLockBlock> glue(block,__LINE__) = new CLCLockBlock(l, false, timeout, __FILE__, __LINE__)
 #else
 #define CHECKEDDALIREADLOCKBLOCK(l,timeout)   ReadLockBlock glue(block,__LINE__)(l)
 #define CHECKEDDALIWRITELOCKBLOCK(l,timeout)  WriteLockBlock glue(block,__LINE__)(l)
@@ -1968,7 +1958,7 @@ public:
     unsigned __int64 getNextExternal() { return nextExternal++; }
     CServerConnection *createConnectionInstance(CRemoteTreeBase *root, SessionId sessionId, unsigned mode, unsigned timeout, const char *xpath, CRemoteTreeBase *&tree, ConnectionId connectionId, StringAttr *deltaPath, Owned<IPropertyTree> &deltaChange, Owned<CBranchChange> &branchChange, unsigned &additions);
     void createConnection(SessionId sessionId, unsigned mode, unsigned timeout, const char *xpath, CServerRemoteTree *&tree, ConnectionId &connectionId, bool primary, Owned<LinkingCriticalBlock> &connectCritBlock);
-    void disconnect(ConnectionId connectionId, bool deleteRoot=false, Owned<CLCLockBlock> *lockBlock=NULL);
+    void disconnect(ConnectionId connectionId, bool deleteRoot=false, CLCLockBlock *lockBlock=nullptr);
     void registerTree(__int64 serverId, CServerRemoteTree &tree);
     void unregisterTree(__int64 uniqId);
     CServerRemoteTree *queryRegisteredTree(__int64 uniqId);
@@ -2260,11 +2250,11 @@ void CServerConnection::aborted(SessionId id)
 {
     LOG(MCdebugInfo(100), unknownJob, "CServerConnection: connection aborted (%" I64F "x) sessId=%" I64F "x",connectionId, id);
 #if 0 // JCSMORE - think this is ok, but concerned about deadlock, change later.
-    Owned<CLCLockBlock> lockBlock = new CLCWriteLockBlock(((CCovenSDSManager &)manager).dataRWLock, readWriteTimeout, __FILE__, __LINE__);
+    Owned<CLCLockBlock> lockBlock = new CLCLockBlock(((CCovenSDSManager &)manager).dataRWLock, false, readWriteTimeout, __FILE__, __LINE__);
     SDSManager->disconnect(connectionId, false);
 #else
-    Owned<CLCLockBlock> lockBlock = new CLCReadLockBlock(((CCovenSDSManager &)manager).dataRWLock, readWriteTimeout, __FILE__, __LINE__);
-    SDSManager->disconnect(connectionId, false, &lockBlock);
+    Owned<CLCLockBlock> lockBlock = new CLCLockBlock(((CCovenSDSManager &)manager).dataRWLock, true, readWriteTimeout, __FILE__, __LINE__);
+    SDSManager->disconnect(connectionId, false, lockBlock);
 #endif
 }
 
@@ -4104,9 +4094,9 @@ void CSDSTransactionServer::processMessage(CMessageBuffer &mb)
                     transactionLog.log("xpath='%s' mode=%d", xpath.get(), (unsigned)mode);
                 Owned<LinkingCriticalBlock> connectCritBlock = new LinkingCriticalBlock(manager.connectCrit, __FILE__, __LINE__);
                 if (RTM_CREATE == (mode & RTM_CREATE_MASK) || RTM_CREATE_QUERY == (mode & RTM_CREATE_MASK))
-                    lockBlock.setown(new CLCWriteLockBlock(manager.dataRWLock, readWriteTimeout, __FILE__, __LINE__));
+                    lockBlock.setown(new CLCLockBlock(manager.dataRWLock, false, readWriteTimeout, __FILE__, __LINE__));
                 else
-                    lockBlock.setown(new CLCReadLockBlock(manager.dataRWLock, readWriteTimeout, __FILE__, __LINE__));
+                    lockBlock.setown(new CLCLockBlock(manager.dataRWLock, true, readWriteTimeout, __FILE__, __LINE__));
                 if (queryTransactionLogging())
                     transactionLog.markExtra();
                 connectionId = 0;
@@ -4150,7 +4140,7 @@ void CSDSTransactionServer::processMessage(CMessageBuffer &mb)
                 Owned<IMultipleConnector> mConnect = deserializeIMultipleConnector(mb);
                 mb.clear();
 
-                lockBlock.setown(new CLCReadLockBlock(manager.dataRWLock, readWriteTimeout, __FILE__, __LINE__));
+                lockBlock.setown(new CLCLockBlock(manager.dataRWLock, true, readWriteTimeout, __FILE__, __LINE__));
 
                 try
                 {
@@ -4371,9 +4361,9 @@ void CSDSTransactionServer::processMessage(CMessageBuffer &mb)
                 { 
                     CheckTime block1("DAMP_SDSCMD_DATA.1");
                     if (data || deleteRoot)
-                        lockBlock.setown(new CLCWriteLockBlock(manager.dataRWLock, readWriteTimeout, __FILE__, __LINE__));
+                        lockBlock.setown(new CLCLockBlock(manager.dataRWLock, false, readWriteTimeout, __FILE__, __LINE__));
                     else
-                        lockBlock.setown(new CLCReadLockBlock(manager.dataRWLock, readWriteTimeout, __FILE__, __LINE__));
+                        lockBlock.setown(new CLCLockBlock(manager.dataRWLock, true, readWriteTimeout, __FILE__, __LINE__));
                 }
                 unsigned dataStart = mb.getPos();
                 commitTimingBlock.recordSize(mb.length() - dataStart);
@@ -4419,11 +4409,11 @@ void CSDSTransactionServer::processMessage(CMessageBuffer &mb)
                 catch (IException *)
                 {
                     if (disconnect)
-                        manager.disconnect(connectionId, deleteRoot, (data || deleteRoot)?NULL:&lockBlock);
+                        manager.disconnect(connectionId, deleteRoot, (data || deleteRoot)?nullptr:lockBlock);
                     throw;
                 }
                 if (disconnect)
-                    manager.disconnect(connectionId, deleteRoot, (data || deleteRoot)?NULL:&lockBlock);
+                    manager.disconnect(connectionId, deleteRoot, (data || deleteRoot)?nullptr:lockBlock);
 
                 break;
             }
@@ -6681,7 +6671,7 @@ StringBuffer &getMConnectString(IMultipleConnector *mConnect, StringBuffer &s)
 IRemoteConnections *CCovenSDSManager::connect(IMultipleConnector *mConnect, SessionId id, unsigned timeout)
 {
     Owned<CLCLockBlock> lockBlock;
-    lockBlock.setown(new CLCReadLockBlock(dataRWLock, readWriteTimeout, __FILE__, __LINE__));
+    lockBlock.setown(new CLCLockBlock(dataRWLock, true, readWriteTimeout, __FILE__, __LINE__));
 
     Owned<CRemoteConnections> remoteConnections = new CRemoteConnections;
     unsigned c;
@@ -6705,9 +6695,9 @@ IRemoteConnection *CCovenSDSManager::connect(const char *xpath, SessionId id, un
     {
         connectCritBlock.setown(new LinkingCriticalBlock(connectCrit, __FILE__, __LINE__));
         if (RTM_CREATE == (mode & RTM_CREATE_MASK) || RTM_CREATE_QUERY == (mode & RTM_CREATE_MASK))
-            lockBlock.setown(new CLCWriteLockBlock(dataRWLock, readWriteTimeout, __FILE__, __LINE__));
+            lockBlock.setown(new CLCLockBlock(dataRWLock, false, readWriteTimeout, __FILE__, __LINE__));
         else
-            lockBlock.setown(new CLCReadLockBlock(dataRWLock, readWriteTimeout, __FILE__, __LINE__));
+            lockBlock.setown(new CLCLockBlock(dataRWLock, true, readWriteTimeout, __FILE__, __LINE__));
     }
 
     CServerRemoteTree *_tree;
@@ -6865,9 +6855,9 @@ void CCovenSDSManager::installNotifyHandler(const char *handlerKey, ISDSNotifyHa
 // ISDSConnectionManager impl.
 void CCovenSDSManager::commit(CRemoteConnection &connection, bool *disconnectDeleteRoot)
 {
-    Owned<CLCWriteLockBlock> lockBlock;
+    Owned<CLCLockBlock> lockBlock;
     if (!RTM_MODE(connection.queryMode(), RTM_INTERNAL))
-        lockBlock.setown(new CLCWriteLockBlock(dataRWLock, readWriteTimeout, __FILE__, __LINE__));
+        lockBlock.setown(new CLCLockBlock(dataRWLock, false, readWriteTimeout, __FILE__, __LINE__));
 
     CClientRemoteTree *tree = (CClientRemoteTree *) connection.queryRoot();
 
@@ -6914,9 +6904,9 @@ void CCovenSDSManager::commit(CRemoteConnection &connection, bool *disconnectDel
 
 CRemoteTreeBase *CCovenSDSManager::get(CRemoteConnection &connection, __int64 serverId)
 {
-    Owned<CLCReadLockBlock> lockBlock;
+    Owned<CLCLockBlock> lockBlock;
     if (!RTM_MODE(connection.queryMode(), RTM_INTERNAL))
-        lockBlock.setown(new CLCReadLockBlock(dataRWLock, readWriteTimeout, __FILE__, __LINE__));
+        lockBlock.setown(new CLCLockBlock(dataRWLock, true, readWriteTimeout, __FILE__, __LINE__));
     CDisableFetchChangeBlock block(connection);
     CRemoteTreeBase *connectionRoot = (CRemoteTreeBase *) connection.queryRoot();
     Owned<CServerRemoteTree> tree = getRegisteredTree(connectionRoot->queryServerId());
@@ -6929,9 +6919,9 @@ CRemoteTreeBase *CCovenSDSManager::get(CRemoteConnection &connection, __int64 se
 
 void CCovenSDSManager::getChildren(CRemoteTreeBase &parent, CRemoteConnection &connection, unsigned levels)
 {
-    Owned<CLCReadLockBlock> lockBlock;
+    Owned<CLCLockBlock> lockBlock;
     if (!RTM_MODE(connection.queryMode(), RTM_INTERNAL))
-        lockBlock.setown(new CLCReadLockBlock(dataRWLock, readWriteTimeout, __FILE__, __LINE__));
+        lockBlock.setown(new CLCLockBlock(dataRWLock, true, readWriteTimeout, __FILE__, __LINE__));
     CDisableFetchChangeBlock block(connection);
     Owned<CServerRemoteTree> serverParent = (CServerRemoteTree *)getRegisteredTree(parent.queryServerId());
     if (serverParent)
@@ -6940,9 +6930,9 @@ void CCovenSDSManager::getChildren(CRemoteTreeBase &parent, CRemoteConnection &c
 
 void CCovenSDSManager::getChildrenFor(CRTArray &childLessList, CRemoteConnection &connection, unsigned levels)
 {
-    Owned<CLCReadLockBlock> lockBlock;
+    Owned<CLCLockBlock> lockBlock;
     if (!RTM_MODE(connection.queryMode(), RTM_INTERNAL))
-        lockBlock.setown(new CLCReadLockBlock(dataRWLock, readWriteTimeout, __FILE__, __LINE__));
+        lockBlock.setown(new CLCLockBlock(dataRWLock, true, readWriteTimeout, __FILE__, __LINE__));
     CDisableFetchChangeBlock block(connection);
 
     ForEachItemIn(f, childLessList)
@@ -7032,7 +7022,7 @@ void CCovenSDSManager::_getChildren(CRemoteTreeBase &parent, CServerRemoteTree &
 
 IPropertyTreeIterator *CCovenSDSManager::getElements(CRemoteConnection &connection, const char *xpath)
 {
-    Owned<CLCReadLockBlock> lockBlock = new CLCReadLockBlock(dataRWLock, readWriteTimeout, __FILE__, __LINE__);
+    Owned<CLCLockBlock> lockBlock = new CLCLockBlock(dataRWLock, true, readWriteTimeout, __FILE__, __LINE__);
     CDisableFetchChangeBlock block(connection);
     Owned<CServerRemoteTree> serverConnRoot = (CServerRemoteTree *)getRegisteredTree(((CClientRemoteTree *)connection.queryRoot())->queryServerId());
     Owned<DaliPTArrayIterator> elements = new DaliPTArrayIterator();
@@ -7427,7 +7417,7 @@ bool CCovenSDSManager::unlock(__int64 connectionId, bool close, StringBuffer &co
     if (close)
     {
         PROGLOG("forcing unlock & disconnection of connection : %s", connectionInfo.str());
-        Owned<CLCLockBlock> lockBlock = new CLCWriteLockBlock(dataRWLock, readWriteTimeout, __FILE__, __LINE__);
+        Owned<CLCLockBlock> lockBlock = new CLCLockBlock(dataRWLock, false, readWriteTimeout, __FILE__, __LINE__);
         SDSManager->disconnect(connectionId, false);
     }
     else // leave connection open, just unlock
@@ -7818,7 +7808,7 @@ CServerConnection *CCovenSDSManager::getConnection(ConnectionId id)
     return conn;
 }
 
-void CCovenSDSManager::disconnect(ConnectionId id, bool deleteRoot, Owned<CLCLockBlock> *lockBlock)
+void CCovenSDSManager::disconnect(ConnectionId id, bool deleteRoot, CLCLockBlock *lockBlock)
 {
     Linked<CServerConnection> connection;
     { CHECKEDCRITICALBLOCK(cTableCrit, fakeCritTimeout);
@@ -7863,10 +7853,7 @@ void CCovenSDSManager::disconnect(ConnectionId id, bool deleteRoot, Owned<CLCLoc
     if (deleteRoot)
     {
         if (lockBlock)
-        {
-            lockBlock->clear();
-            lockBlock->setown(new CLCWriteLockBlock(dataRWLock, readWriteTimeout, __FILE__, __LINE__));
-        }
+            lockBlock->changeToWrite(readWriteTimeout);
         connection->queryParent()->removeTree(tree);
         writeTransactions++;
         if (!orphaned)

+ 185 - 2
system/jlib/jmutex.cpp

@@ -15,7 +15,6 @@
     limitations under the License.
 ############################################################################## */
 
-
 #include "platform.h"
 #include "jmutex.hpp"
 #include "jsuperhash.hpp"
@@ -321,7 +320,191 @@ void Monitor::notifyAll()
 
 //==================================================================================
 
-#ifdef USE_PTHREAD_RWLOCK
+#ifndef USE_PTHREAD_RWLOCK
+
+bool ReadWriteLock::lockRead(bool timed, unsigned timeout)
+{  
+    cs.enter(); 
+    if (writeLocks == 0) 
+    {
+        readLocks++;
+        cs.leave();
+    }
+    else
+    {
+        readWaiting++;
+        cs.leave();
+        if (timed)
+        {
+            if (!readSem.wait(timeout))
+            {
+                cs.enter(); 
+                if (!readSem.wait(0))
+                {
+                    readWaiting--;
+                    cs.leave();
+                    return false;
+                }
+                cs.leave();
+            }
+        }
+        else
+            readSem.wait();
+        //NB: waiting and locks adjusted before the signal occurs.
+    }
+    return true;
+}
+
+bool ReadWriteLock::lockWrite(bool timed, unsigned timeout)
+{
+    cs.enter();
+    if ((readLocks == 0) && (writeLocks == 0))
+    {
+        writeLocks++;
+        cs.leave();
+    }
+    else
+    {
+        writeWaiting++;
+        cs.leave();
+        if (timed)
+        {
+            if (!writeSem.wait(timeout))
+            {
+                cs.enter(); 
+                if (!writeSem.wait(0))
+                {
+                    writeWaiting--;
+                    cs.leave();
+                    return false;
+                }
+                cs.leave();
+            }
+        }
+        else
+            writeSem.wait();
+        //NB: waiting and locks adjusted before the signal occurs.
+    }
+#ifdef _DEBUG
+    exclWriteOwner = GetCurrentThreadId();
+#endif
+    return true;
+}
+
+bool ReadWriteLock::changeToWrite(bool timed, unsigned timeout)
+{
+    cs.enter();
+
+    // invalid use cases, changeToWrite must only be called when read locked.
+    if (writeLocks)
+    {
+        cs.leave();
+        throw makeStringException(9999, "Internal Error: ReadWriteLock::changeToWrite - already write locked");
+    }
+    else if (0 == readLocks)
+    {
+        cs.leave();
+        throw makeStringException(9999, "Internal Error: ReadWriteLock::changeToWrite - not read locked");
+    }
+
+    if (readLocks == 1)
+    {
+        readLocks--;
+        writeLocks++;
+        cs.leave();
+    }
+    else // readLocks>1
+    {
+        readToWriteWaiting++;
+        cs.leave();
+        if (timed)
+        {
+            if (!readToWriteSem.wait(timeout))
+            {
+                cs.enter(); 
+                if (!readToWriteSem.wait(0))
+                {
+                    readToWriteWaiting--;
+                    cs.leave();
+                    return false;
+                }
+                cs.leave();
+            }
+        }
+        else
+            readToWriteSem.wait();
+        //NB: waiting and locks adjusted before the signal occurs.
+    }
+#ifdef _DEBUG
+    exclWriteOwner = GetCurrentThreadId();
+#endif
+    return true;
+}
+
+bool ReadWriteLock::changeToRead()
+{
+    cs.enter();
+    if (!writeLocks)
+    {
+        cs.leave();
+        throw makeStringException(9999, "Internal Error: ReadWriteLock::changeToRead - not write locked");
+
+    }
+    --writeLocks;
+    dbgassertex(!readLocks);
+    readLocks++;
+    cs.leave();
+#ifdef _DEBUG
+    exclWriteOwner = 0;
+#endif
+    return true;
+}
+
+void ReadWriteLock::unlock()
+{
+    cs.enter(); 
+    if (readLocks) // implies this unlock() is paired with a previous read lock
+    {
+        readLocks--;
+        if (readToWriteWaiting && (1 == readLocks))
+        {
+            readToWriteWaiting--;
+            readLocks--;
+            writeLocks++;
+            readToWriteSem.signal();
+        }
+        else if (writeWaiting && (0 == readLocks))
+        {
+            writeWaiting--;
+            writeLocks++;
+            writeSem.signal();
+        }
+    }
+    else // implies this unlock() is paired with a previous write lock
+    {
+        writeLocks--;
+        dbgassertex(writeLocks == 0);
+        if (readWaiting)
+        {
+            unsigned numWaiting = readWaiting;
+            readWaiting = 0;
+            readLocks += numWaiting;
+            readSem.signal(numWaiting);
+        }
+        else if (writeWaiting)
+        {
+            writeWaiting--;
+            writeLocks++;
+            writeSem.signal();
+        }
+#ifdef _DEBUG
+        exclWriteOwner = 0;
+#endif
+    }
+    cs.leave();
+}
+
+#else
 
 bool ReadWriteLock::lockRead(unsigned timeout)
 {

+ 16 - 90
system/jlib/jmutex.hpp

@@ -586,71 +586,14 @@ public:
 
 class jlib_decl ReadWriteLock
 {
-    bool lockRead(bool timed, unsigned timeout) { 
-                                cs.enter(); 
-                                if (writeLocks == 0) 
-                                {
-                                    readLocks++;
-                                    cs.leave();
-                                }
-                                else
-                                {
-                                    readWaiting++;
-                                    cs.leave();
-                                    if (timed)
-                                    {
-                                        if (!readSem.wait(timeout)) {
-                                            cs.enter(); 
-                                            if (!readSem.wait(0)) {
-                                                readWaiting--;
-                                                cs.leave();
-                                                return false;
-                                            }
-                                            cs.leave();
-                                        }
-                                    }
-                                    else
-                                        readSem.wait();
-                                    //NB: waiting and locks adjusted before the signal occurs.
-                                }
-                                return true;
-                            }
-    bool lockWrite(bool timed, unsigned timeout) { 
-                                cs.enter(); 
-                                if ((readLocks == 0) && (writeLocks == 0))
-                                {
-                                    writeLocks++;
-                                    cs.leave();
-                                }
-                                else
-                                {
-                                    writeWaiting++;
-                                    cs.leave();
-                                    if (timed)
-                                    {
-                                        if (!writeSem.wait(timeout)) {
-                                            cs.enter(); 
-                                            if (!writeSem.wait(0)) {
-                                                writeWaiting--;
-                                                cs.leave();
-                                                return false;
-                                            }
-                                            cs.leave();
-                                        }
-                                    }
-                                    else
-                                        writeSem.wait();
-                                    //NB: waiting and locks adjusted before the signal occurs.
-                                }
-#ifdef _DEBUG
-                                exclWriteOwner = GetCurrentThreadId();
-#endif
-                                return true;
-                            }
+    bool lockRead(bool timed, unsigned timeout);
+    bool lockWrite(bool timed, unsigned timeout);
+    bool changeToWrite(bool timed, unsigned timeout);
 public:
     ReadWriteLock()
     {
         readLocks = 0; writeLocks = 0; readWaiting = 0; writeWaiting = 0;
+        readToWriteWaiting = 0;
 #ifdef _DEBUG
         exclWriteOwner = 0;
 #endif
@@ -661,35 +604,16 @@ public:
     void lockWrite()        { lockWrite(false, 0); }
     bool lockRead(unsigned timeout) { return lockRead(true, timeout); }
     bool lockWrite(unsigned timeout) { return lockWrite(true, timeout); }
-    void unlock()           { 
-                                cs.enter(); 
-                                if (readLocks) readLocks--;
-                                else
-                                {
-                                    writeLocks--;
-#ifdef _DEBUG
-                                    exclWriteOwner = 0;
-#endif
-                                }
-                                assertex(writeLocks == 0);
-                                if (readLocks == 0)
-                                {
-                                    if (readWaiting)
-                                    {
-                                        unsigned numWaiting = readWaiting;
-                                        readWaiting = 0;
-                                        readLocks += numWaiting;
-                                        readSem.signal(numWaiting);
-                                    }
-                                    else if (writeWaiting)
-                                    {
-                                        writeWaiting--;
-                                        writeLocks++;
-                                        writeSem.signal();
-                                    }
-                                }
-                                cs.leave();
-                            }
+    void changeToWrite()
+    {
+        changeToWrite(false, 0);
+    }
+    bool changeToWrite(unsigned timeout)
+    {
+        return changeToWrite(true, timeout);
+    }
+    bool changeToRead();
+    void unlock();
     bool queryWriteLocked() { return (writeLocks != 0); }
     void unlockRead()       { unlock(); }
     void unlockWrite()      { unlock(); }
@@ -699,10 +623,12 @@ protected:
     CriticalSection     cs;
     Semaphore           readSem;
     Semaphore           writeSem;
+    Semaphore           readToWriteSem;
     unsigned            readLocks;
     unsigned            writeLocks;
     unsigned            readWaiting;
     unsigned            writeWaiting;
+    unsigned            readToWriteWaiting;
 #ifdef _DEBUG
     ThreadId            exclWriteOwner;
 #endif

+ 20 - 0
testing/unittests/dalitests.cpp

@@ -33,6 +33,7 @@
 #include "dautils.hpp"
 
 #include <vector>
+#include <future>
 
 #include "unittests.hpp"
 
@@ -804,6 +805,7 @@ class CDaliSDSStressTests : public CppUnit::TestFixture
         CPPUNIT_TEST(testSDSSubs);
         CPPUNIT_TEST(testSDSSubs2);
         CPPUNIT_TEST(testSDSNodeSubs);
+        CPPUNIT_TEST(testEphemeralLocks);
     CPPUNIT_TEST_SUITE_END();
 
     const IContextLogger &logctx;
@@ -1438,6 +1440,24 @@ public:
 
         ASSERT(0xa68e2324 == results.getCRC() && "SDS Node notifcation differences");
     }
+    void testEphemeralLocks()
+    {
+        auto createEphemeralLock = [&](const char *xpath, unsigned timeout, unsigned type)
+        {
+            unsigned mode = RTM_LOCK_WRITE | RTM_CREATE_QUERY | RTM_DELETE_ON_DISCONNECT;
+            Owned<IRemoteConnection> conn = querySDS().connect(xpath, myProcessSession(), mode, timeout);
+            conn->commit();
+        };
+
+        unsigned numThreads = 100;
+        std::vector<std::future<void>> results;
+        const char *xpath = "/Locks/TestEphemeralLock";
+        unsigned timeout = 2000;
+        for (unsigned t=0; t<numThreads; t++)
+            results.push_back(std::async(std::launch::async, createEphemeralLock, xpath, timeout, t%2));
+        for (auto &f: results)
+            f.get();
+    }
 };
 
 CPPUNIT_TEST_SUITE_REGISTRATION( CDaliSDSStressTests );

+ 93 - 39
testing/unittests/jlibtests.cpp

@@ -1529,16 +1529,16 @@ public:
         COUNTER extraValues[NUMVALUES];
     };
 
-    #define DO_TEST(LOCK, CLOCK, COUNTER, NUMVALUES, NUMLOCKS)   \
+    #define DO_TEST(LOCK, CLOCK, COUNTER, NUMVALUES, NUMLOCKS, NUMITERATIONS)   \
     { \
-        const char * title = #LOCK "," #COUNTER;\
+        const char * title = #LOCK "," #CLOCK "," #COUNTER;\
         LockTester<LOCK, CLOCK, COUNTER, NUMVALUES, NUMLOCKS> tester;\
-        uncontendedTimes.append(tester.run(title, 1, numIterations));\
-        minorTimes.append(tester.run(title, 2, numIterations));\
-        typicalTimes.append(tester.run(title, numCores / 2, numIterations));\
-        tester.run(title, numCores, numIterations);\
-        tester.run(title, numCores + 1, numIterations);\
-        contendedTimes.append(tester.run(title, numCores * 2, numIterations));\
+        uncontendedTimes.append(tester.run(title, 1, NUMITERATIONS));\
+        minorTimes.append(tester.run(title, 2, NUMITERATIONS));\
+        typicalTimes.append(tester.run(title, numCores / 2, NUMITERATIONS));\
+        tester.run(title, numCores, NUMITERATIONS);\
+        tester.run(title, numCores + 1, NUMITERATIONS);\
+        contendedTimes.append(tester.run(title, numCores * 2, NUMITERATIONS));\
     }
 
     //Use to common out a test
@@ -1558,35 +1558,89 @@ public:
     const unsigned numCores = getAffinityCpus();
     void runAllTests()
     {
-        DO_TEST(CriticalSection, CriticalBlock, unsigned __int64, 1, 1);
-        DO_TEST(CriticalSection, CriticalBlock, unsigned __int64, 2, 1);
-        DO_TEST(CriticalSection, CriticalBlock, unsigned __int64, 5, 1);
-        DO_TEST(CriticalSection, CriticalBlock, unsigned __int64, 1, 2);
-        DO_TEST(SpinLock, SpinBlock, unsigned __int64, 1, 1);
-        DO_TEST(SpinLock, SpinBlock, unsigned __int64, 2, 1);
-        DO_TEST(SpinLock, SpinBlock, unsigned __int64, 5, 1);
-        DO_TEST(SpinLock, SpinBlock, unsigned __int64, 1, 2);
-        DO_TEST(Null, Null, std::atomic<unsigned __int64>, 1, 1);
-        DO_TEST(Null, Null, std::atomic<unsigned __int64>, 2, 1);
-        DO_TEST(Null, Null, std::atomic<unsigned __int64>, 5, 1);
-        DO_TEST(Null, Null, std::atomic<unsigned __int64>, 1, 2);
-        DO_TEST(Null, Null, RelaxedAtomic<unsigned __int64>, 1, 1);
-        DO_TEST(Null, Null, RelaxedAtomic<unsigned __int64>, 5, 1);
-        DO_TEST(Null, Null, CasCounter, 1, 1);
-        DO_TEST(Null, Null, CasCounter, 5, 1);
-        DO_TEST(Null, Null, unsigned __int64, 1, 1);
-        DO_TEST(Null, Null, unsigned __int64, 2, 1);
-        DO_TEST(Null, Null, unsigned __int64, 5, 1);
+        class WriteToReadLockBlock
+        {
+            ReadWriteLock *lock;
+        public:
+            WriteToReadLockBlock(ReadWriteLock &l) : lock(&l)
+            {
+                lock->lockWrite();
+            }
+            ~WriteToReadLockBlock()
+            {
+                if (lock)
+                {
+                    lock->changeToRead();
+                    lock->unlockRead();
+                }
+            }
+        };
+
+        class ReadToWriteLockBlock
+        {
+            ReadWriteLock *lock;
+        public:
+            ReadToWriteLockBlock(ReadWriteLock &l) : lock(&l)
+            {
+                lock->lockRead();
+            }
+            ~ReadToWriteLockBlock()
+            {
+                if (lock)
+                {
+                    // try to swap to write lock, a lot threads will fail to as highly contended.
+                    unsigned attempts = 5;
+                    while (true)
+                    {
+                        if (lock->changeToWrite(1))
+                            break;
+                        else if (0 == --attempts)
+                            break;
+                    }
+                    lock->unlock();
+                }
+            }
+        };
+
+        DO_TEST(CriticalSection, CriticalBlock, unsigned __int64, 1, 1, numIterations);
+        DO_TEST(CriticalSection, CriticalBlock, unsigned __int64, 2, 1, numIterations);
+        DO_TEST(CriticalSection, CriticalBlock, unsigned __int64, 5, 1, numIterations);
+        DO_TEST(CriticalSection, CriticalBlock, unsigned __int64, 1, 2, numIterations);
+        DO_TEST(SpinLock, SpinBlock, unsigned __int64, 1, 1, numIterations);
+        DO_TEST(SpinLock, SpinBlock, unsigned __int64, 2, 1, numIterations);
+        DO_TEST(SpinLock, SpinBlock, unsigned __int64, 5, 1, numIterations);
+        DO_TEST(SpinLock, SpinBlock, unsigned __int64, 1, 2, numIterations);
+        DO_TEST(Null, Null, std::atomic<unsigned __int64>, 1, 1, numIterations);
+        DO_TEST(Null, Null, std::atomic<unsigned __int64>, 2, 1, numIterations);
+        DO_TEST(Null, Null, std::atomic<unsigned __int64>, 5, 1, numIterations);
+        DO_TEST(Null, Null, std::atomic<unsigned __int64>, 1, 2, numIterations);
+        DO_TEST(Null, Null, RelaxedAtomic<unsigned __int64>, 1, 1, numIterations);
+        DO_TEST(Null, Null, RelaxedAtomic<unsigned __int64>, 5, 1, numIterations);
+        DO_TEST(Null, Null, CasCounter, 1, 1, numIterations);
+        DO_TEST(Null, Null, CasCounter, 5, 1, numIterations);
+        DO_TEST(Null, Null, unsigned __int64, 1, 1, numIterations);
+        DO_TEST(Null, Null, unsigned __int64, 2, 1, numIterations);
+        DO_TEST(Null, Null, unsigned __int64, 5, 1, numIterations);
 
         //Read locks will fail to prevent values being lost, but the timings are useful in comparison with CriticalSection
-        DO_TEST(ReadWriteLock, ReadLockBlock, unsigned __int64, 1, 1);
-        DO_TEST(ReadWriteLock, ReadLockBlock, unsigned __int64, 2, 1);
-        DO_TEST(ReadWriteLock, ReadLockBlock, unsigned __int64, 5, 1);
-        DO_TEST(ReadWriteLock, ReadLockBlock, unsigned __int64, 1, 2);
-        DO_TEST(ReadWriteLock, WriteLockBlock, unsigned __int64, 1, 1);
-        DO_TEST(ReadWriteLock, WriteLockBlock, unsigned __int64, 2, 1);
-        DO_TEST(ReadWriteLock, WriteLockBlock, unsigned __int64, 5, 1);
-        DO_TEST(ReadWriteLock, WriteLockBlock, unsigned __int64, 1, 2);
+        DO_TEST(ReadWriteLock, ReadLockBlock, unsigned __int64, 1, 1, numIterations);
+        DO_TEST(ReadWriteLock, ReadLockBlock, unsigned __int64, 2, 1, numIterations);
+        DO_TEST(ReadWriteLock, ReadLockBlock, unsigned __int64, 5, 1, numIterations);
+        DO_TEST(ReadWriteLock, ReadLockBlock, unsigned __int64, 1, 2, numIterations);
+        DO_TEST(ReadWriteLock, WriteLockBlock, unsigned __int64, 1, 1, numIterations);
+        DO_TEST(ReadWriteLock, WriteLockBlock, unsigned __int64, 2, 1, numIterations);
+        DO_TEST(ReadWriteLock, WriteLockBlock, unsigned __int64, 5, 1, numIterations);
+        DO_TEST(ReadWriteLock, WriteLockBlock, unsigned __int64, 1, 2, numIterations);
+
+        DO_TEST(ReadWriteLock, WriteToReadLockBlock, unsigned __int64, 1, 1, numIterations);
+        DO_TEST(ReadWriteLock, WriteToReadLockBlock, unsigned __int64, 2, 1, numIterations);
+        DO_TEST(ReadWriteLock, WriteToReadLockBlock, unsigned __int64, 5, 1, numIterations);
+        DO_TEST(ReadWriteLock, WriteToReadLockBlock, unsigned __int64, 1, 2, numIterations);
+
+        DO_TEST(ReadWriteLock, ReadToWriteLockBlock, unsigned __int64, 1, 1, 10000);
+        DO_TEST(ReadWriteLock, ReadToWriteLockBlock, unsigned __int64, 2, 1, 10000);
+        DO_TEST(ReadWriteLock, ReadToWriteLockBlock, unsigned __int64, 5, 1, 10000);
+        DO_TEST(ReadWriteLock, ReadToWriteLockBlock, unsigned __int64, 1, 2, 10000);
 
         printf("Summary\n");
         summariseTimings("Uncontended", uncontendedTimes);
@@ -1597,10 +1651,10 @@ public:
 
     void summariseTimings(const char * option, UInt64Array & times)
     {
-        printf("%11s 1x: cs(%3" I64F "u) spin(%3" I64F "u) atomic(%3" I64F "u) ratomic(%3" I64F "u) cas(%3" I64F "u) rd(%3" I64F "u) wr(%3" I64F "u)   "
-                    "5x: cs(%3" I64F "u) spin(%3" I64F "u) atomic(%3" I64F "u) ratomic(%3" I64F "u) cas(%3" I64F "u) rd(%3" I64F "u) wr(%3" I64F "u)\n", option,
-                    times.item(0), times.item(4), times.item(8), times.item(12), times.item(14), times.item(19), times.item(23),
-                    times.item(2), times.item(6), times.item(10), times.item(13), times.item(15), times.item(21), times.item(25));
+        printf("%11s 1x: cs(%3" I64F "u) spin(%3" I64F "u) atomic(%3" I64F "u) ratomic(%3" I64F "u) cas(%3" I64F "u) rd(%3" I64F "u) wr(%3" I64F "u) w2r(%3" I64F "u) r2w(%3" I64F "u) "
+                    "5x: cs(%3" I64F "u) spin(%3" I64F "u) atomic(%3" I64F "u) ratomic(%3" I64F "u) cas(%3" I64F "u) rd(%3" I64F "u) wr(%3" I64F "u) w2r(%3" I64F "u) r2w(%3" I64F "u)\n", option,
+                    times.item(0), times.item(4), times.item(8), times.item(12), times.item(14), times.item(19), times.item(23), times.item(27), times.item(31),
+                    times.item(2), times.item(6), times.item(10), times.item(13), times.item(15), times.item(21), times.item(25), times.item(29), times.item(33));
     }
 
 private: