Browse Source

HPCC-14432 Minor rationalization of new global MPServer code.

Signed-off-by: Jake Smith <jake.smith@lexisnexis.com>
Jake Smith 9 years ago
parent
commit
ff80298bff
1 changed files with 104 additions and 87 deletions
  1. 104 87
      system/mp/mpcomm.cpp

+ 104 - 87
system/mp/mpcomm.cpp

@@ -456,7 +456,7 @@ class UserPacketHandler;
 class CMPNotifyClosedThread;
 
 typedef SuperHashTableOf<CMPChannel,SocketEndpoint> CMPChannelHT;
-static class CMPServer: private CMPChannelHT, implements IMPServer
+class CMPServer: private CMPChannelHT, implements IMPServer
 {
     byte RTsalt;
     ISocketSelectHandler        *selecthandler;
@@ -467,9 +467,6 @@ static class CMPServer: private CMPChannelHT, implements IMPServer
 protected:
     unsigned short              port;
 public:
-    static CriticalSection  serversect;
-    static int                      servernest;
-    static bool                     serverpaused;
     bool checkclosed;
 
 // packet handlers
@@ -544,16 +541,7 @@ public:
     {
         return myNode;
     }
-} *MPserver=NULL;
-int CMPServer::servernest=0;
-bool CMPServer::serverpaused=false;
-CriticalSection CMPServer::serversect;
-
-mptag_t createReplyTag()
-{
-    assertex(MPserver);
-    return MPserver->createReplyTag();
-}
+};
 
 //===========================================================================
 
@@ -2950,126 +2938,155 @@ ICommunicator *CMPServer::createCommunicator(IGroup *group, bool outer)
 
 ///////////////////////////////////
 
+IMPServer *startNewMPServer(unsigned port)
+{
+    assertex(sizeof(PacketHeader)==32);
+    CMPServer *mpServer = new CMPServer(port);
+    mpServer->start();
+    return mpServer;
+}
+
 
 class CGlobalMPServer : public CMPServer
 {
+    int nestLevel;
+    bool paused;
+    IInterCommunicator *worldcomm;
 public:
+    static CriticalSection sect;
+
     CGlobalMPServer(unsigned _port) : CMPServer(_port)
     {
-        initMyNode(port); // NB port set by connectthread constructor in base
+        worldcomm = NULL;
+        nestLevel = 0;
+    }
+    ~CGlobalMPServer()
+    {
+        ::Release(worldcomm);
+    }
+    IInterCommunicator &queryWorldCommunicator()
+    {
+        if (!worldcomm)
+            worldcomm = new CInterCommunicator(this);
+        return *worldcomm;
     }
+    unsigned incNest() { return ++nestLevel; }
+    unsigned decNest() { return --nestLevel; }
+    unsigned queryNest() { return nestLevel; }
+    bool isPaused() const { return paused; }
+    void setPaused(bool onOff) { paused = onOff; }
 };
+CriticalSection CGlobalMPServer::sect;
+static CGlobalMPServer *globalMPServer;
 
-
-ICommunicator *createCommunicator(IGroup *group,bool outer)
-{
-    assertex(MPserver!=NULL);
-    return MPserver->createCommunicator(group, outer);
-}
-
-
-static IInterCommunicator *worldcomm=NULL;
 MODULE_INIT(INIT_PRIORITY_STANDARD)
 {
+    globalMPServer = NULL;
     return true;
 }
 MODULE_EXIT()
 {
-    ::Release(worldcomm);
-}
-
-IInterCommunicator &queryWorldCommunicator()
-{
-    CriticalBlock block(CMPServer::serversect);
-    assertex(MPserver!=NULL);
-    if (!worldcomm)
-        worldcomm = new CInterCommunicator(MPserver);
-    return *worldcomm;
-}
-
-
-IMPServer *startNewMPServer(unsigned port)
-{
-    assertex(sizeof(PacketHeader)==32);
-    CMPServer *mpServer = new CMPServer(port);
-    mpServer->start();
-    return mpServer;
+    ::Release(globalMPServer);
 }
 
 void startMPServer(unsigned port, bool paused)
 {
     assertex(sizeof(PacketHeader)==32);
-    CriticalBlock block(CMPServer::serversect); 
-    if (CMPServer::servernest==0)
+    CriticalBlock block(CGlobalMPServer::sect);
+    if (NULL == globalMPServer)
+    {
+        globalMPServer = new CGlobalMPServer(port);
+        initMyNode(globalMPServer->getPort());
+    }
+    if (0 == globalMPServer->queryNest())
     {
-        if (!CMPServer::serverpaused)
-        {
-            delete MPserver;
-            MPserver = new CGlobalMPServer(port);
-        }
         if (paused)
         {
-            CMPServer::serverpaused = true;
+            globalMPServer->setPaused(paused);
             return;
         }
-        queryLogMsgManager()->setPort(MPserver->getPort());
-        MPserver->start();
-        CMPServer::serverpaused = false;
+        queryLogMsgManager()->setPort(globalMPServer->getPort());
+        globalMPServer->start();
+        globalMPServer->setPaused(false);
     }
-    CMPServer::servernest++;
-}
-
-IMPServer *getMPServer()
-{
-    CriticalBlock block(CMPServer::serversect);
-    assertex(MPserver);
-    return LINK(MPserver);
+    globalMPServer->incNest();
 }
 
 void stopMPServer()
 {
-    CriticalBlock block(CMPServer::serversect);
-    if (--CMPServer::servernest==0) {
-        stopLogMsgReceivers();
+    CGlobalMPServer *_globalMPServer = NULL;
+    {
+        CriticalBlock block(CGlobalMPServer::sect);
+        if (NULL == globalMPServer)
+            return;
+        if (0 == globalMPServer->decNest())
+        {
+            stopLogMsgReceivers();
 #ifdef _TRACE
-        LOG(MCdebugInfo(100), unknownJob, "MP: Stopping MP Server");
+            LOG(MCdebugInfo(100), unknownJob, "MP: Stopping MP Server");
 #endif
-        CriticalUnblock unblock(CMPServer::serversect);
-        assertex(MPserver!=NULL);
-        MPserver->stop();
-        delete MPserver;
-        MPserver = NULL;
-        ::Release(worldcomm);
-        worldcomm = NULL;
-        initMyNode(0);
+            _globalMPServer = globalMPServer;
+            globalMPServer = NULL;
+        }
+    }
+    if (NULL == _globalMPServer)
+        return;
+    _globalMPServer->stop();
+    _globalMPServer->Release();
 #ifdef _TRACE
-        LOG(MCdebugInfo(100), unknownJob, "MP: Stopped MP Server");
+    LOG(MCdebugInfo(100), unknownJob, "MP: Stopped MP Server");
 #endif
-    }
+    CriticalBlock block(CGlobalMPServer::sect);
+    initMyNode(0);
 }
 
-extern mp_decl void addMPConnectionMonitor(IConnectionMonitor *monitor)
+IInterCommunicator &queryWorldCommunicator()
 {
-    CriticalBlock block(CMPServer::serversect);
-    assertex(MPserver);
-    MPserver->addConnectionMonitor(monitor);
+    CriticalBlock block(CGlobalMPServer::sect);
+    assertex(globalMPServer);
+    return globalMPServer->queryWorldCommunicator();
 }
 
-extern mp_decl void removeMPConnectionMonitor(IConnectionMonitor *monitor)
+mptag_t createReplyTag()
 {
-    CriticalBlock block(CMPServer::serversect);
-    if (MPserver)
-        MPserver->removeConnectionMonitor(monitor);
+    assertex(globalMPServer);
+    return globalMPServer->createReplyTag();
+}
+
+ICommunicator *createCommunicator(IGroup *group, bool outer)
+{
+    assertex(globalMPServer);
+    return globalMPServer->createCommunicator(group, outer);
 }
 
 StringBuffer &getReceiveQueueDetails(StringBuffer &buf)
 {
-    CriticalBlock block(CMPServer::serversect);
-    if (MPserver)
-        MPserver->getReceiveQueueDetails(buf);
+    CriticalBlock block(CGlobalMPServer::sect);
+    if (globalMPServer)
+        globalMPServer->getReceiveQueueDetails(buf);
     return buf;
 }
 
+void addMPConnectionMonitor(IConnectionMonitor *monitor)
+{
+    CriticalBlock block(CGlobalMPServer::sect);
+    assertex(globalMPServer);
+    globalMPServer->addConnectionMonitor(monitor);
+}
+
+void removeMPConnectionMonitor(IConnectionMonitor *monitor)
+{
+    CriticalBlock block(CGlobalMPServer::sect);
+    if (globalMPServer)
+        globalMPServer->removeConnectionMonitor(monitor);
+}
+
+IMPServer *getMPServer()
+{
+    CriticalBlock block(CGlobalMPServer::sect);
+    assertex(globalMPServer);
+    return LINK(globalMPServer);
+}
 
 
 void registerSelfDestructChildProcess(HANDLE handle)