Selaa lähdekoodia

Merge pull request #7947 from mckellyln/hpcc-14155

HPCC-14155 Skip slave unregister when asked to shutdown

Reviewed-By: Jake Smith <jake.smith@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 9 vuotta sitten
vanhempi
commit
c2f16eb1a5

+ 44 - 3
thorlcr/master/thmastermain.cpp

@@ -338,16 +338,15 @@ public:
     {
         CriticalBlock block(crit);
         unsigned i=0;
+        mptag_t shutdownTag = createReplyTag();
         for (; i<queryNodeClusterWidth(); i++)
         {
             if (status->test(i))
             {
-                status->set(i, false);
                 SocketEndpoint ep = queryNodeGroup().queryNode(i+1).endpoint();
-                if (watchdog)
-                    watchdog->removeSlave(ep);
                 CMessageBuffer msg;
                 msg.append((unsigned)Shutdown);
+                serializeMPtag(msg, shutdownTag);
                 try
                 {
                     queryNodeComm().send(msg, i+1, masterSlaveMpTag, MP_ASYNC_SEND);
@@ -358,6 +357,48 @@ public:
                     EXCLOG(e, "Shutting down slave");
                     e->Release();
                 }
+                if (watchdog)
+                    watchdog->removeSlave(ep);
+            }
+        }
+
+        CTimeMon tm(20000);
+        unsigned numReplied = 0;
+        while (numReplied < slavesRegistered)
+        {
+            unsigned remaining;
+            if (tm.timedout(&remaining))
+            {
+                PROGLOG("Timeout waiting for Shutdown reply from slave(s) (%u replied out of %u total)", numReplied, slavesRegistered);
+                StringBuffer slaveList;
+                for (i=0;i<slavesRegistered;i++)
+                {
+                    if (status->test(i))
+                    {
+                        if (slaveList.length())
+                            slaveList.append(",");
+                        slaveList.append(i+1);
+                    }
+                }
+                if (slaveList.length())
+                    PROGLOG("Slaves that have not replied: %s", slaveList.str());
+                break;
+            }
+            try
+            {
+                rank_t sender;
+                CMessageBuffer msg;
+                if (queryNodeComm().recv(msg, RANK_ALL, shutdownTag, &sender, remaining))
+                {
+                    if (sender) // paranoid, sender should always be > 0
+                        status->set(sender-1, false);
+                    numReplied++;
+                }
+            }
+            catch (IException *e)
+            {
+                // do not log MP link closed exceptions from ending slaves
+                e->Release();
             }
         }
     }

+ 12 - 5
thorlcr/slave/slavmain.cpp

@@ -95,7 +95,7 @@ void disableThorSlaveAsDaliClient()
 
 class CJobListener : public CSimpleInterface
 {
-    bool stopped;
+    bool &stopped;
     CriticalSection crit;
     OwningStringSuperHashTableOf<CJobSlave> jobs;
     CFifoFileCache querySoCache; // used to mirror master cache
@@ -158,7 +158,7 @@ class CJobListener : public CSimpleInterface
     } excptHandler;
 
 public:
-    CJobListener() : excptHandler(*this)
+    CJobListener(bool &_stopped) : stopped(_stopped), excptHandler(*this)
     {
         stopped = true;
         channelsPerSlave = globals->getPropInt("@channelsPerSlave", 1);
@@ -473,8 +473,15 @@ public:
                     }
                     case Shutdown:
                     {
-                        doReply = false;
                         stopped = true;
+                        PROGLOG("Shutdown received");
+                        if (watchdog)
+                            watchdog->stop();
+                        mptag_t sdreplyTag;
+                        deserializeMPtag(msg, sdreplyTag);
+                        msg.setReplyTag(sdreplyTag);
+                        msg.clear();
+                        msg.append(false);
                         break;
                     }
                     case GraphGetResult:
@@ -724,7 +731,7 @@ public:
     virtual IFileInProgressHandler &queryFileInProgressHandler() { return *fipHandler.get(); }
 };
 
-void slaveMain()
+void slaveMain(bool &jobListenerStopped)
 {
     unsigned masterMemMB = globals->getPropInt("@masterTotalMem");
     HardwareInfo hdwInfo;
@@ -742,7 +749,7 @@ void slaveMain()
     }
     roxiemem::setTotalMemoryLimit(gmemAllowHugePages, gmemAllowTransparentHugePages, gmemRetainMemory, ((memsize_t)gmemSize) * 0x100000, 0, thorAllocSizes, NULL);
 
-    CJobListener jobListener;
+    CJobListener jobListener(jobListenerStopped);
     CThorResourceSlave slaveResource;
     setIThorResource(slaveResource);
 

+ 1 - 1
thorlcr/slave/slavmain.hpp

@@ -19,7 +19,7 @@
 #define SLAVMAIN_HPP
 
 void abortSlave();
-void slaveMain();
+void slaveMain(bool &jobListenerStopped);
 void enableThorSlaveAsDaliClient();
 void disableThorSlaveAsDaliClient();
 

+ 7 - 4
thorlcr/slave/slwatchdog.cpp

@@ -76,12 +76,15 @@ public:
     }
     virtual void stop()
     {
+        if (!stopped)
+        {
 #ifdef _WIN32
-        threaded.adjustPriority(0); // restore to normal before stopping
+            threaded.adjustPriority(0); // restore to normal before stopping
 #endif
-        stopped = true;
-        threaded.join();
-        LOG(MCdebugProgress, thorJob, "Stopped watchdog");
+            stopped = true;
+            threaded.join();
+            LOG(MCdebugProgress, thorJob, "Stopped watchdog");
+        }
     }
 
     size32_t gatherData(MemoryBuffer &mb)

+ 13 - 6
thorlcr/slave/thslavemain.cpp

@@ -162,6 +162,8 @@ static bool RegisterSelf(SocketEndpoint &masterEp)
     return true;
 }
 
+static bool jobListenerStopped = true;
+
 void UnregisterSelf(IException *e)
 {
     StringBuffer slfStr;
@@ -180,7 +182,8 @@ void UnregisterSelf(IException *e)
         LOG(MCdebugProgress, thorJob, "Unregistered slave : %s", slfStr.str());
     }
     catch (IException *e) {
-        FLLOG(MCexception(e), thorJob, e,"slave unregistration error");
+        if (!jobListenerStopped)
+            FLLOG(MCexception(e), thorJob, e,"slave unregistration error");
         e->Release();
     }
 }
@@ -189,9 +192,12 @@ bool ControlHandler(ahType type)
 {
     if (ahInterrupt == type)
         LOG(MCdebugProgress, thorJob, "CTRL-C pressed");
-    if (masterNode)
-        UnregisterSelf(NULL);
-    abortSlave();
+    if (!jobListenerStopped)
+    {
+        if (masterNode)
+            UnregisterSelf(NULL);
+        abortSlave();
+    }
     return false;
 }
 
@@ -403,14 +409,15 @@ int main( int argc, char *argv[]  )
                 else
                     multiThorMemoryThreshold = 0;
             }
-            slaveMain();
+            slaveMain(jobListenerStopped);
         }
 
         LOG(MCdebugProgress, thorJob, "ThorSlave terminated OK");
     }
     catch (IException *e) 
     {
-        FLLOG(MCexception(e), thorJob, e,"ThorSlave");
+        if (!jobListenerStopped)
+            FLLOG(MCexception(e), thorJob, e,"ThorSlave");
         unregisterException.setown(e);
     }
     ClearTempDirs();