Ver código fonte

HPCC-14212 Add node subscription could cause deadlock

Add node subscription grabbed the subscriber crit 1st, then
the SDS data lock, this could deadlock with others who already
have the SDS data lock, then interact with the subscriber
manager.

Fix by grabbing the SDS data lock 1st, similar to fix HPCC-14190

Signed-off-by: Jake Smith <jake.smith@lexisnexis.com>
Jake Smith 9 anos atrás
pai
commit
f41d97edc5
2 arquivos alterados com 114 adições e 3 exclusões
  1. 2 2
      dali/base/dasds.cpp
  2. 112 1
      dali/datest/datest.cpp

+ 2 - 2
dali/base/dasds.cpp

@@ -2809,6 +2809,8 @@ public:
     // ISubscriptionManager impl.
     virtual void add(ISubscription *sub, SubscriptionId id)
     {
+        CHECKEDDALIREADLOCKBLOCK(owner.dataRWLock, readWriteTimeout);
+        CHECKEDCRITICALBLOCK(owner.treeRegCrit, fakeCritTimeout);
         CriticalBlock b(lock);
         /* calls back out to owner to scan for match, so that SDSManager can protect root/treereg.
          * It calls back (associateSubscriber) in this class to add subscribers based on matches.
@@ -8636,8 +8638,6 @@ void CCovenSDSManager::addNodeSubscriber(ISubscription *sub, SubscriptionId id)
     mb.read(xpath);
     mb.read(sendValue);
 
-    CHECKEDDALIREADLOCKBLOCK(dataRWLock, readWriteTimeout);
-    CHECKEDCRITICALBLOCK(treeRegCrit, fakeCritTimeout);
     Owned<IPropertyTreeIterator> iter = root->getElements(xpath+1);
     if (!iter->first())
         throw MakeSDSException(SDSExcpt_SubscriptionNoMatch, "Failed to match any nodes: %s", xpath.get());

+ 112 - 1
dali/datest/datest.cpp

@@ -2365,7 +2365,7 @@ public:
 };
 
 void TestSDS3(IGroup *group)
-{   
+{
     class TSDS1 : public CInterface, implements IThreadFactory
     {
     public:
@@ -2389,6 +2389,114 @@ void TestSDS3(IGroup *group)
     pool.clear();
 }
 
+void TestNodeSubs()
+{
+    class CNodeSubPool : public CSimpleInterfaceOf<IThreadFactory>
+    {
+        class CNodeSubscriber : public CSimpleInterfaceOf<ISDSNodeSubscription>
+        {
+        public:
+            virtual void notify(SubscriptionId id, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
+            {
+                PROGLOG("CNodeSubscriber notified");
+            }
+        };
+        SubscriptionId sid;
+        CriticalSection sidCrit;
+        Owned<ISDSNodeSubscription> subscriber;
+
+        void test()
+        {
+            try
+            {
+                unsigned t = getRandom()%4;
+                switch (t)
+                {
+                    case 0:
+                    {
+                        // connect thread
+                        Owned<IRemoteConnection> conn = querySDS().connect("/nodesubtest", myProcessSession(), RTM_CREATE|RTM_LOCK_WRITE, INFINITE);
+                        MilliSleep(5+getRandom()%50);
+                        conn.clear();
+                        break;
+                    }
+                    case 1:
+                    {
+                        // node sub test
+                        CriticalBlock b(sidCrit);
+                        sid = querySDS().subscribeExact("/nodesubtest", *subscriber, false);
+                        break;
+                    }
+                    case 2:
+                    {
+                        // node sub test
+                        CriticalBlock b(sidCrit);
+                        if (sid)
+                        {
+                            querySDS().unsubscribeExact(sid);
+                            sid = 0;
+                        }
+                        break;
+                    }
+                    case 3:
+                    {
+                        Owned<IRemoteConnection> conn = querySDS().connect("/nodesubtest", myProcessSession(), RTM_LOCK_WRITE, INFINITE);
+                        if (conn)
+                            conn->close(true);
+                        break;
+                    }
+                }
+            }
+            catch (IException *e)
+            {
+                PrintExceptionLog(e, NULL);
+                e->Release();
+            }
+        }
+        class CNodeSubThread : public CInterface, implements IPooledThread
+        {
+            CNodeSubPool &owner;
+        public:
+            IMPLEMENT_IINTERFACE;
+
+            CNodeSubThread(CNodeSubPool &_owner) : owner(_owner) { }
+            virtual void init(void *param)
+            {
+            }
+            virtual void main()
+            {
+                owner.test();
+            }
+            virtual bool stop() { return true; }
+            virtual bool canReuse() { return true; }
+        };
+    public:
+        CNodeSubPool()
+        {
+            sid = 0;
+            subscriber.setown(new CNodeSubscriber());
+        }
+        virtual IPooledThread *createNew()
+        {
+            return new CNodeSubThread(*this);
+        }
+    } poolFactory;
+
+    Owned<IThreadPool> pool = createThreadPool("TSDSTest", &poolFactory, NULL, 100, 100000);
+
+    unsigned tests = testParams.ordinality() ? atoi(testParams.item(0)) : 10;
+    for (unsigned t=0; t<tests; t++)
+    {
+        PROGLOG("Created test thread %d", t);
+        pool->start(NULL);
+    }
+
+    PrintLog("Joining all TSDSThread running threads");
+    pool->joinAll();
+    pool.clear();
+
+}
+
 void TestSDSXPaths()
 {
     const char *testXML =
@@ -3048,6 +3156,7 @@ int main(int argc, char* argv[])
                 case 9: TestExternal(); break;
                 case 10: TestSubLocks(); break;
                 case 11: TestSDS3(group); break;
+                case 12: TestNodeSubs(); break;
                 }
             }
             else if (TEST("DFS"))
@@ -3074,6 +3183,8 @@ int main(int argc, char* argv[])
                 TestSDS2();
             else if (TEST("SDS3"))
                 TestSDS3(group);
+            else if (TEST("NODESUBS"))
+                TestNodeSubs();
             else if (TEST("XPATHS"))
                 TestSDSXPaths();
             else if (TEST("STRESS"))