Browse Source

HPCC-14155 Skip slave unregister when asked to shutdown

Master now waits for replies from all slaves before ending, so the init
script does not send a kill signal to the slaves, which previously caused
them to try to unregister with a master that was no longer running.

Signed-off-by: Mark Kelly <mark.kelly@lexisnexis.com>
Mark Kelly 9 years ago
parent
commit
8420ffb2e1

+ 44 - 3
thorlcr/master/thmastermain.cpp

@@ -338,16 +338,15 @@ public:
     {
     {
         CriticalBlock block(crit);
         CriticalBlock block(crit);
         unsigned i=0;
         unsigned i=0;
+        mptag_t shutdownTag = createReplyTag();
         for (; i<queryNodeClusterWidth(); i++)
         for (; i<queryNodeClusterWidth(); i++)
         {
         {
             if (status->test(i))
             if (status->test(i))
             {
             {
-                status->set(i, false);
                 SocketEndpoint ep = queryNodeGroup().queryNode(i+1).endpoint();
                 SocketEndpoint ep = queryNodeGroup().queryNode(i+1).endpoint();
-                if (watchdog)
-                    watchdog->removeSlave(ep);
                 CMessageBuffer msg;
                 CMessageBuffer msg;
                 msg.append((unsigned)Shutdown);
                 msg.append((unsigned)Shutdown);
+                serializeMPtag(msg, shutdownTag);
                 try
                 try
                 {
                 {
                     queryNodeComm().send(msg, i+1, masterSlaveMpTag, MP_ASYNC_SEND);
                     queryNodeComm().send(msg, i+1, masterSlaveMpTag, MP_ASYNC_SEND);
@@ -358,6 +357,48 @@ public:
                     EXCLOG(e, "Shutting down slave");
                     EXCLOG(e, "Shutting down slave");
                     e->Release();
                     e->Release();
                 }
                 }
+                if (watchdog)
+                    watchdog->removeSlave(ep);
+            }
+        }
+
+        CTimeMon tm(20000);
+        unsigned numReplied = 0;
+        while (numReplied < slavesRegistered)
+        {
+            unsigned remaining;
+            if (tm.timedout(&remaining))
+            {
+                PROGLOG("Timeout waiting for Shutdown reply from slave(s) (%u replied out of %u total)", numReplied, slavesRegistered);
+                StringBuffer slaveList;
+                for (i=0;i<slavesRegistered;i++)
+                {
+                    if (status->test(i))
+                    {
+                        if (slaveList.length())
+                            slaveList.append(",");
+                        slaveList.append(i+1);
+                    }
+                }
+                if (slaveList.length())
+                    PROGLOG("Slaves that have not replied: %s", slaveList.str());
+                break;
+            }
+            try
+            {
+                rank_t sender;
+                CMessageBuffer msg;
+                if (queryNodeComm().recv(msg, RANK_ALL, shutdownTag, &sender, remaining))
+                {
+                    if (sender) // paranoid, sender should always be > 0
+                        status->set(sender-1, false);
+                    numReplied++;
+                }
+            }
+            catch (IException *e)
+            {
+                // do not log MP link closed exceptions from ending slaves
+                e->Release();
             }
             }
         }
         }
     }
     }

+ 12 - 5
thorlcr/slave/slavmain.cpp

@@ -95,7 +95,7 @@ void disableThorSlaveAsDaliClient()
 
 
 class CJobListener : public CSimpleInterface
 class CJobListener : public CSimpleInterface
 {
 {
-    bool stopped;
+    bool &stopped;
     CriticalSection crit;
     CriticalSection crit;
     OwningStringSuperHashTableOf<CJobSlave> jobs;
     OwningStringSuperHashTableOf<CJobSlave> jobs;
     CFifoFileCache querySoCache; // used to mirror master cache
     CFifoFileCache querySoCache; // used to mirror master cache
@@ -158,7 +158,7 @@ class CJobListener : public CSimpleInterface
     } excptHandler;
     } excptHandler;
 
 
 public:
 public:
-    CJobListener() : excptHandler(*this)
+    CJobListener(bool &_stopped) : stopped(_stopped), excptHandler(*this)
     {
     {
         stopped = true;
         stopped = true;
         channelsPerSlave = globals->getPropInt("@channelsPerSlave", 1);
         channelsPerSlave = globals->getPropInt("@channelsPerSlave", 1);
@@ -473,8 +473,15 @@ public:
                     }
                     }
                     case Shutdown:
                     case Shutdown:
                     {
                     {
-                        doReply = false;
                         stopped = true;
                         stopped = true;
+                        PROGLOG("Shutdown received");
+                        if (watchdog)
+                            watchdog->stop();
+                        mptag_t sdreplyTag;
+                        deserializeMPtag(msg, sdreplyTag);
+                        msg.setReplyTag(sdreplyTag);
+                        msg.clear();
+                        msg.append(false);
                         break;
                         break;
                     }
                     }
                     case GraphGetResult:
                     case GraphGetResult:
@@ -724,7 +731,7 @@ public:
     virtual IFileInProgressHandler &queryFileInProgressHandler() { return *fipHandler.get(); }
     virtual IFileInProgressHandler &queryFileInProgressHandler() { return *fipHandler.get(); }
 };
 };
 
 
-void slaveMain()
+void slaveMain(bool &jobListenerStopped)
 {
 {
     unsigned masterMemMB = globals->getPropInt("@masterTotalMem");
     unsigned masterMemMB = globals->getPropInt("@masterTotalMem");
     HardwareInfo hdwInfo;
     HardwareInfo hdwInfo;
@@ -742,7 +749,7 @@ void slaveMain()
     }
     }
     roxiemem::setTotalMemoryLimit(gmemAllowHugePages, gmemAllowTransparentHugePages, gmemRetainMemory, ((memsize_t)gmemSize) * 0x100000, 0, thorAllocSizes, NULL);
     roxiemem::setTotalMemoryLimit(gmemAllowHugePages, gmemAllowTransparentHugePages, gmemRetainMemory, ((memsize_t)gmemSize) * 0x100000, 0, thorAllocSizes, NULL);
 
 
-    CJobListener jobListener;
+    CJobListener jobListener(jobListenerStopped);
     CThorResourceSlave slaveResource;
     CThorResourceSlave slaveResource;
     setIThorResource(slaveResource);
     setIThorResource(slaveResource);
 
 

+ 1 - 1
thorlcr/slave/slavmain.hpp

@@ -19,7 +19,7 @@
 #define SLAVMAIN_HPP
 #define SLAVMAIN_HPP
 
 
 void abortSlave();
 void abortSlave();
-void slaveMain();
+void slaveMain(bool &jobListenerStopped);
 void enableThorSlaveAsDaliClient();
 void enableThorSlaveAsDaliClient();
 void disableThorSlaveAsDaliClient();
 void disableThorSlaveAsDaliClient();
 
 

+ 7 - 4
thorlcr/slave/slwatchdog.cpp

@@ -76,12 +76,15 @@ public:
     }
     }
     virtual void stop()
     virtual void stop()
     {
     {
+        if (!stopped)
+        {
 #ifdef _WIN32
 #ifdef _WIN32
-        threaded.adjustPriority(0); // restore to normal before stopping
+            threaded.adjustPriority(0); // restore to normal before stopping
 #endif
 #endif
-        stopped = true;
-        threaded.join();
-        LOG(MCdebugProgress, thorJob, "Stopped watchdog");
+            stopped = true;
+            threaded.join();
+            LOG(MCdebugProgress, thorJob, "Stopped watchdog");
+        }
     }
     }
 
 
     size32_t gatherData(MemoryBuffer &mb)
     size32_t gatherData(MemoryBuffer &mb)

+ 13 - 6
thorlcr/slave/thslavemain.cpp

@@ -162,6 +162,8 @@ static bool RegisterSelf(SocketEndpoint &masterEp)
     return true;
     return true;
 }
 }
 
 
+static bool jobListenerStopped = true;
+
 void UnregisterSelf(IException *e)
 void UnregisterSelf(IException *e)
 {
 {
     StringBuffer slfStr;
     StringBuffer slfStr;
@@ -180,7 +182,8 @@ void UnregisterSelf(IException *e)
         LOG(MCdebugProgress, thorJob, "Unregistered slave : %s", slfStr.str());
         LOG(MCdebugProgress, thorJob, "Unregistered slave : %s", slfStr.str());
     }
     }
     catch (IException *e) {
     catch (IException *e) {
-        FLLOG(MCexception(e), thorJob, e,"slave unregistration error");
+        if (!jobListenerStopped)
+            FLLOG(MCexception(e), thorJob, e,"slave unregistration error");
         e->Release();
         e->Release();
     }
     }
 }
 }
@@ -189,9 +192,12 @@ bool ControlHandler(ahType type)
 {
 {
     if (ahInterrupt == type)
     if (ahInterrupt == type)
         LOG(MCdebugProgress, thorJob, "CTRL-C pressed");
         LOG(MCdebugProgress, thorJob, "CTRL-C pressed");
-    if (masterNode)
-        UnregisterSelf(NULL);
-    abortSlave();
+    if (!jobListenerStopped)
+    {
+        if (masterNode)
+            UnregisterSelf(NULL);
+        abortSlave();
+    }
     return false;
     return false;
 }
 }
 
 
@@ -403,14 +409,15 @@ int main( int argc, char *argv[]  )
                 else
                 else
                     multiThorMemoryThreshold = 0;
                     multiThorMemoryThreshold = 0;
             }
             }
-            slaveMain();
+            slaveMain(jobListenerStopped);
         }
         }
 
 
         LOG(MCdebugProgress, thorJob, "ThorSlave terminated OK");
         LOG(MCdebugProgress, thorJob, "ThorSlave terminated OK");
     }
     }
     catch (IException *e) 
     catch (IException *e) 
     {
     {
-        FLLOG(MCexception(e), thorJob, e,"ThorSlave");
+        if (!jobListenerStopped)
+            FLLOG(MCexception(e), thorJob, e,"ThorSlave");
         unregisterException.setown(e);
         unregisterException.setown(e);
     }
     }
     ClearTempDirs();
     ClearTempDirs();