Prechádzať zdrojové kódy

HPCC-14606 ESP deadlock with fcl and SuperOwnerLock

Signed-off-by: Mark Kelly <mark.kelly@lexisnexis.com>
Mark Kelly 9 rokov pred
rodič
commit
d148c6a4f6
1 zmenil súbory, kde vykonal 61 pridanie a 2 odobranie
  1. 61 2
      dali/base/dadfs.cpp

+ 61 - 2
dali/base/dadfs.cpp

@@ -577,6 +577,7 @@ public:
 
 class CFileSuperOwnerLock : protected CFileLockCompound
 {
+    CriticalSection superOwnerLockSect;
 public:
     bool init(const CDfsLogicalFileName &logicalName, IRemoteConnection *conn, unsigned timeout, const char *msg)
     {
@@ -586,6 +587,57 @@ public:
     {
         return CFileLockCompound::detach();
     }
+    bool initWithFileLock(const CDfsLogicalFileName &logicalName, IRemoteConnection *conn, unsigned timeout, const char *msg, CFileLock &fcl, unsigned fclmode)
+    {
+        // SuperOwnerLock while holding fcl
+        IRemoteConnection *fclConn = fcl.queryConnection();
+        CTimeMon tm(timeout);
+        unsigned remaining = timeout;
+        unsigned interval = 5000;
+        loop
+        {
+            if (tm.timedout(&remaining))
+                return false;
+            try
+            {
+                if (init(logicalName, conn, 0, msg))
+                    return true;
+            }
+            catch (ISDSException *e)
+            {
+                if (SDSExcpt_LockTimeout != e->errorCode())
+                    throw;
+                e->Release();
+            }
+            if (tm.timedout(&remaining))
+                return false;
+            unsigned delta = remaining;
+            if (delta < interval)
+                delta = interval;
+            // release lock
+            {
+                CriticalBlock block(superOwnerLockSect);
+                fclConn->changeMode(RTM_NONE, delta);
+                fclConn->reload();
+            }
+            tm.timedout(&remaining);
+            unsigned stime = 1000 * (2+getRandom()%15); // 2-15 sec
+            if (stime > remaining)
+                stime = remaining;
+            // let another get excl lock
+            Sleep(stime);
+            tm.timedout(&remaining);
+            delta = remaining;
+            if (delta < interval)
+                delta = interval;
+            // get lock again (waiting for other to release excl)
+            {
+                CriticalBlock block(superOwnerLockSect);
+                fclConn->changeMode(fclmode, delta);
+                fclConn->reload();
+            }
+        }
+    }
 };
 
 class CScopeConnectLock
@@ -7363,11 +7415,18 @@ IDistributedFile *CDistributedFileDirectory::dolookup(CDfsLogicalFileName &_logi
                 CFileLock fcl;
                 unsigned mode = RTM_LOCK_READ | RTM_SUB;
                 if (hold) mode |= RTM_LOCK_HOLD;
+                // initWithFileLock() will loop (releasing and relocking fcl) if it cannot get SuperOwner lock
+                // another way to solve this could be to:
+                // if (lockSuperOwner)
+                // {
+                //     mode &= ~RTM_LOCK_READ;
+                //     mode |= RTM_LOCK_WRITE;
+                // }
                 if (!fcl.init(*logicalname, mode, timeout, "CDistributedFileDirectory::lookup"))
                     break;
                 CFileSuperOwnerLock superOwnerLock;
                 if (lockSuperOwner)
-                    verifyex(superOwnerLock.init(*logicalname, NULL, defaultTimeout, "CDistributedFileDirectory::dolookup(SuperOwnerLock)"));
+                    verifyex(superOwnerLock.initWithFileLock(*logicalname, NULL, defaultTimeout, "CDistributedFileDirectory::dolookup(SuperOwnerLock)", fcl, mode));
                 if (fcl.getKind() == DXB_File)
                 {
                     StringBuffer cname;
@@ -11411,7 +11470,7 @@ bool CDistributedFileDirectory::getFileSuperOwners(const char *logicalname, Stri
     if (!lock.init(lfn, RTM_LOCK_READ, defaultTimeout, "CDistributedFileDirectory::getFileSuperOwners"))
         return false;
     CFileSuperOwnerLock superOwnerLock;
-    verifyex(superOwnerLock.init(lfn, NULL, defaultTimeout, "CDistributedFileDirectory::getFileSuperOwners(SuperOwnerLock)"));
+    verifyex(superOwnerLock.initWithFileLock(lfn, NULL, defaultTimeout, "CDistributedFileDirectory::getFileSuperOwners(SuperOwnerLock)", lock, RTM_LOCK_READ));
     Owned<IPropertyTreeIterator> iter = lock.queryRoot()->getElements("SuperOwner");
     StringBuffer pname;
     ForEach(*iter) {