Browse source code

Merge pull request #13165 from jakesmith/hpcc-23140-verifyAll

HPCC-23140 Refactor/speed up verifyAll and add per connect timeout

Reviewed-By: Mark Kelly <mark.kelly@lexisnexis.com>
Reviewed-By: Gavin Halliday <gavin.halliday@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman, 5 years ago
parent
commit
71b8e04f72

+ 2 - 2
dali/base/dacoven.cpp

@@ -291,10 +291,10 @@ public:
     }
 
 
-    virtual bool verifyAll(bool duplex=false, unsigned timeout=1000*60*30)
+    virtual bool verifyAll(bool duplex=false, unsigned timeout=1000*60*30, unsigned perConnectionTimeout=0) // new trailing parameter defaults to 0, so existing one/two-argument calls are unaffected
     {
         assertex(comm); // the call is forwarded to comm, so it must be set
-        return comm->verifyAll(duplex,timeout);
+        return comm->verifyAll(duplex, timeout, perConnectionTimeout); // thin wrapper: all three arguments are passed straight through
     }
     
     // receive, returns senders rank or false if no message available in time given or cancel called

+ 88 - 32
system/mp/mpcomm.cpp

@@ -24,6 +24,9 @@
     look at all timeouts
 */
 
+#include <future>
+#include <vector>
+
 #include "platform.h"
 #include "portlist.h"
 #include "jlib.hpp"
@@ -82,7 +85,6 @@
 
 #define _TRACING
 
-static  CriticalSection verifysect;
 static  CriticalSection childprocesssect;
 static  UnsignedArray childprocesslist;
 
@@ -2604,6 +2606,7 @@ void CMPServer::notifyClosed(SocketEndpoint &ep, bool trace)
 class CInterCommunicator: public IInterCommunicator, public CInterface
 {
     CMPServer *parent;
+    CriticalSection verifysect;
 
 public:
     IMPLEMENT_IINTERFACE;
@@ -2817,6 +2820,7 @@ class CCommunicator: public ICommunicator, public CInterface
     CMPServer *parent;
     bool outer;
     rank_t myrank;
+    CriticalSection verifysect;
 
     const SocketEndpoint &queryEndpoint(rank_t rank)
     {
@@ -2945,46 +2949,98 @@ public:
         return channel->verifyConnection(tm,true);
     }
 
-    bool verifyAll(bool duplex, unsigned timeout)
+    bool verifyAll(bool duplex, unsigned totalTimeout, unsigned perConnectionTimeout) // totalTimeout bounds the whole call; perConnectionTimeout, when non-zero, additionally caps each individual connection check
     {
         CriticalBlock block(verifysect); // NOTE(review): the removed code released verifysect (CriticalUnblock) while sleeping between polls; the added code keeps it for the whole call - confirm intended
-        CTimeMon tm(timeout);
+        CTimeMon totalTM(totalTimeout); // overall deadline shared by the launcher loop and every task
+
+        Semaphore sem; // limits how many verify tasks run at once
+        sem.signal(getAffinityCpus()); // pre-load one slot per value returned by getAffinityCpus() (presumably the CPUs available to this process - confirm)
+        std::atomic<bool> abort{false}; // set on the first failure or timeout; stops further launches and ends the polling loops
+        
+        auto verifyConnWithConnect = [&](unsigned rank, unsigned timeout) // single verify attempt for one rank, bounded by 'timeout'
         {
-            ForEachNodeInGroup(rank,*group) {
-                bool doverify;
-                if (duplex)
-                    doverify = (rank!=myrank);
-                else if ((rank&1)==(myrank&1))
-                    doverify = (myrank>rank);
-                else
-                    doverify = (myrank<rank);
-                if (doverify) {
-                    Owned<CMPChannel> channel = getChannel(rank);
+            CTimeMon tm(timeout);
+            Owned<CMPChannel> channel = getChannel(rank);
+            return channel->verifyConnection(tm, true);    // second argument true: presumably permits initiating the connection - confirm against CMPChannel::verifyConnection
+        };
+
+        auto verifyConnWithoutConnect = [&](unsigned rank, unsigned timeout) // retries, sleeping between attempts, until verified, aborted or timed out
+        {
+            CTimeMon tm(timeout);
+            while (true)
+            {
+                Owned<CMPChannel> channel = getChannel(rank); // channel is re-fetched on every attempt
+                if (channel->verifyConnection(tm, false))
+                    return true;
+                if (abort || tm.timedout()) // give up if another task has failed or this rank's time is used up
+                    return false;
+                Sleep(100);
+            }
+        };
+
+        auto threadedVerifyConnectFunc = [&](rank_t rank, std::function<bool (unsigned rank, unsigned timeout)> connectFunc)
+        {
+            // NB: this task is running because the launcher took a semaphore slot (via sem.wait()); hand the slot back when this scope exits
+            struct RestoreSlot
+            {
+                Semaphore &sem;
+                RestoreSlot(Semaphore &_sem) : sem(_sem) { }
+                ~RestoreSlot() { sem.signal(); }
+            } restoreSlot(sem);
+
+            unsigned timeoutMs;
+            if (totalTM.timedout(&timeoutMs) || abort) // timeoutMs receives the time still left on the overall deadline
+                return false;
+            if (perConnectionTimeout && (perConnectionTimeout < timeoutMs)) // a perConnectionTimeout of 0 means no per-connection cap
+                timeoutMs = perConnectionTimeout;
+
+            if (!connectFunc(rank, timeoutMs))
+            {
+                abort = true; // set before the slot is released (by restoreSlot's destructor) so verifyFunc sees it and launches no further task
+                return false;
+            }
+            return true;
+        };
+
+        auto verifyFunc = [&](std::function<bool (unsigned rank)> isRankToVerifyFunc, std::function<bool (unsigned rank, unsigned timeout)> connectFunc)
+        {
+            std::vector<std::future<bool>> results;
+            for (rank_t rank=0; rank<group->ordinality(); rank++)
+            {
+                if (isRankToVerifyFunc(rank))
+                {
+                    // check the total timeout both before and after sem.wait, because the wait itself may use up the remaining time
+                    // NB: a successful sem.wait takes a slot, which the launched task restores when it finishes
                     unsigned remaining;
-                    if (tm.timedout(&remaining)) {
-                        return false;
+                    if (totalTM.timedout(&remaining) || !sem.wait(remaining) || totalTM.timedout(&remaining))
+                    {
+                        abort = true; // out of time: tell the tasks already running to stop as well
+                        break;
                     }
-                    if (!channel->verifyConnection(tm,true))
-                        return false;
+                    else if (abort) // a task failed while this loop was waiting for a slot
+                        break;
+                    results.push_back(std::async(std::launch::async, threadedVerifyConnectFunc, rank, connectFunc)); // std::launch::async: run the check asynchronously rather than deferred
                 }
             }
-        }
-        if (!duplex) {
-            ForEachNodeInGroup(rank,*group) {
-                bool doverify = ((rank&1)==(myrank&1))?(myrank<rank):(myrank>rank);
-                if (doverify) {
-                    Owned<CMPChannel> channel = getChannel(rank);
-                    while (!channel->verifyConnection(tm,false)) {
-                        unsigned remaining;
-                        if (tm.timedout(&remaining))
-                            return false;
-                        CriticalUnblock unblock(verifysect);
-                        Sleep(100);
-                    }
-                }
+            bool res = true;
+            for (auto &f: results)
+            {
+                if (!f.get()) // waits for that task; every launched task is collected before returning
+                    res = false;
             }
+            return res && !abort; // abort also covers the case where the loop stopped before launching every rank
+        };
+
+        if (duplex) // duplex: verify (with connect) every rank except this one
+            return verifyFunc([this](rank_t rank) { return rank != myrank; }, verifyConnWithConnect);
+        else
+        {
+            if (!verifyFunc([this](rank_t rank) { return ((rank&1)==(myrank&1)) ? (myrank > rank) : (myrank < rank); }, verifyConnWithConnect)) // phase 1: the ranks this node verifies with connect, chosen by rank parity and ordering
+                return false;
+
+            return verifyFunc([this](rank_t rank) { return ((rank&1)==(myrank&1)) ? (myrank < rank) : (myrank > rank); }, verifyConnWithoutConnect); // phase 2: the complementary ranks (self excluded), polled without connect
         }
-        return true;
     }
 
     unsigned probe(rank_t srcrank, mptag_t tag, rank_t *sender, unsigned timeout=0)

+ 1 - 1
system/mp/mpcomm.hpp

@@ -54,7 +54,7 @@ interface ICommunicator: extends IInterface
     virtual void flush  (mptag_t tag) = 0;    // flushes pending buffers
 
     virtual bool verifyConnection(rank_t rank, unsigned timeout=1000*60*5) = 0; // verifies connected to rank
-    virtual bool verifyAll(bool duplex=false, unsigned timeout=1000*60*30) = 0;
+    virtual bool verifyAll(bool duplex=false, unsigned timeout=1000*60*30, unsigned perConnectionTimeout=0) = 0;
     virtual void disconnect(INode *node) = 0;
     virtual void barrier() = 0;
     virtual const SocketEndpoint &queryChannelPeerEndpoint(const SocketEndpoint &sender) const = 0;

+ 2 - 2
thorlcr/master/thgraphmanager.cpp

@@ -450,8 +450,8 @@ void CJobManager::run()
     PROGLOG("verifying mp connection to all slaves");
     Owned<IMPServer> mpServer = getMPServer();
     Owned<ICommunicator> comm = mpServer->createCommunicator(&queryClusterGroup());
-    if (!comm->verifyAll())
-        OERRLOG("Failed to connect to all slaves");
+    if (!comm->verifyAll(false, 1000*60*30, 1000*60))
+        throwStringExceptionV(0, "Failed to connect to all slaves");    
     else
         PROGLOG("verified mp connection to all slaves");
 

+ 3 - 4
thorlcr/master/thmastermain.cpp

@@ -862,10 +862,9 @@ int main( int argc, char *argv[]  )
             }
 
             PROGLOG("verifying mp connection to rest of cluster");
-            if (!queryNodeComm().verifyAll())
-                OERRLOG("Failed to connect to all nodes");
-            else
-                PROGLOG("verified mp connection to rest of cluster");
+            if (!queryNodeComm().verifyAll(false, 1000*60*30, 1000*60))
+                throwStringExceptionV(0, "Failed to connect to all nodes");
+            PROGLOG("verified mp connection to rest of cluster");
 
             LOG(MCauditInfo, ",Progress,Thor,Startup,%s,%s,%s,%s",nodeGroup.str(),thorname,queueName.str(),logUrl.str());
             auditStartLogged = true;