Browse Source

HPCC-14244 Enable ability to create/use multiple MP servers

Add ability to create multiple MP servers (each on different port).

Signed-off-by: Jake Smith <jake.smith@lexisnexis.com>
Jake Smith 9 năm trước cách đây
mục cha
commit
a4011d03b3
2 tập tin đã thay đổi với 113 bổ sung49 xóa
  1. 103 49
      system/mp/mpcomm.cpp
  2. 10 0
      system/mp/mpcomm.hpp

+ 103 - 49
system/mp/mpcomm.cpp

@@ -455,13 +455,17 @@ class ForwardPacketHandler;
 class UserPacketHandler;
 class CMPNotifyClosedThread;
 
-static class CMPServer: private SuperHashTableOf<CMPChannel,SocketEndpoint>
+typedef SuperHashTableOf<CMPChannel,SocketEndpoint> CMPChannelHT;
+static class CMPServer: private CMPChannelHT, implements IMPServer
 {
-    unsigned short              port;
+    byte RTsalt;
     ISocketSelectHandler        *selecthandler;
     CMPConnectThread            *connectthread;
     CBufferQueue                receiveq;
     CMPNotifyClosedThread       *notifyclosedthread;
+    CriticalSection sect;
+protected:
+    unsigned short              port;
 public:
     static CriticalSection  serversect;
     static int                      servernest;
@@ -476,16 +480,18 @@ public:
     BroadcastPacketHandler      *broadcastpackethandler;    // TAG_SYS_BCAST
     UserPacketHandler           *userpackethandler;         // default
 
+    IMPLEMENT_IINTERFACE_USING(CMPChannelHT);
 
     CMPServer(unsigned _port);
     ~CMPServer();
     void start();
-    void stop();
+    virtual void stop();
     unsigned short getPort() { return port; }
     void setPort(unsigned short _port) { port = _port; }
     CMPChannel *lookup(const SocketEndpoint &remoteep);
     ISocketSelectHandler &querySelectHandler() { return *selecthandler; };
     CBufferQueue &getReceiveQ() { return receiveq; }
+    void checkTagOK(mptag_t tag);
     bool recv(CMessageBuffer &mbuf, const SocketEndpoint *ep, mptag_t tag, CTimeMon &tm);
     void flush(mptag_t tag);
     unsigned probe(const SocketEndpoint *ep, mptag_t tag, CTimeMon &tm, SocketEndpoint &sender);
@@ -509,46 +515,44 @@ protected:
 
     IMPLEMENT_SUPERHASHTABLEOF_REF_FIND(CMPChannel,SocketEndpoint);
 
+    CriticalSection replyTagSect;
+    int rettag;
+    INode *myNode;
 
+public:
+    virtual mptag_t createReplyTag()
+    {
+        // these are short-lived so a simple increment will do (I think this is OK!)
+        mptag_t ret;
+        {
+            CriticalBlock block(replyTagSect);
+            if (RTsalt==0xff) {
+                RTsalt = (byte)(getRandom()%16);
+                rettag = (int)TAG_REPLY_BASE-RTsalt;
+            }
+            if (rettag>(int)TAG_REPLY_BASE) {           // wrapped
+                rettag = (int)TAG_REPLY_BASE-RTsalt;
+            }
+            ret = (mptag_t)rettag;
+            rettag -= 16;
+        }
+        flush(ret);
+        return ret;
+    }
+    virtual ICommunicator *createCommunicator(IGroup *group, bool outer);
+    virtual INode *queryMyNode()
+    {
+        return myNode;
+    }
 } *MPserver=NULL;
 int CMPServer::servernest=0;
 bool CMPServer::serverpaused=false;
 CriticalSection CMPServer::serversect;
 
-byte RTsalt=0xff;
-
 mptag_t createReplyTag()
 {
-    // these are short-lived so a simple increment will do (I think this is OK!)
-    mptag_t ret;
-    {
-        static CriticalSection sect;
-        CriticalBlock block(sect);
-        static int rettag=(int)TAG_REPLY_BASE;  // NB negative
-        if (RTsalt==0xff) {
-            RTsalt = (byte)(getRandom()%16);
-            rettag = (int)TAG_REPLY_BASE-RTsalt;
-        }
-        if (rettag>(int)TAG_REPLY_BASE) {           // wrapped
-            rettag = (int)TAG_REPLY_BASE-RTsalt;
-        }
-        ret = (mptag_t)rettag;
-        rettag -= 16;
-    }
-    if (MPserver)
-        MPserver->flush(ret);
-    return ret;
-}
-
-void checkTagOK(mptag_t tag)
-{
-    if ((int)tag<=(int)TAG_REPLY_BASE) {
-        int dif = (int)TAG_REPLY_BASE-(int)tag;
-        if (dif%16!=RTsalt) {
-            ERRLOG("**Invalid MP tag used");
-            PrintStackReport();
-        }
-    }
+    assertex(MPserver);
+    return MPserver->createReplyTag();
 }
 
 //===========================================================================
@@ -2093,7 +2097,7 @@ public:
 CMPChannel *CMPServer::lookup(const SocketEndpoint &endpoint)
 {
     // there is an assumption here that no removes will be done within this loop
-    CriticalBlock block(serversect);
+    CriticalBlock block(sect);
     SocketEndpoint ep = endpoint;
     CMPChannel *e=find(ep);
     // Check for freed channels
@@ -2124,6 +2128,7 @@ CMPChannel *CMPServer::lookup(const SocketEndpoint &endpoint)
 
 CMPServer::CMPServer(unsigned _port)
 {
+    RTsalt=0xff;
     port = 0;   // connectthread tells me what port it actually connected on
     checkclosed = false;
     connectthread = new CMPConnectThread(this, _port);
@@ -2136,8 +2141,11 @@ CMPServer::CMPServer(unsigned _port)
     userpackethandler = new UserPacketHandler(this);        // default
     notifyclosedthread = new CMPNotifyClosedThread(this);
     notifyclosedthread->start();
-    initMyNode(port); // NB port set by connectthread constructor
     selecthandler->start();
+    rettag = (int)TAG_REPLY_BASE; // NB negative
+
+    SocketEndpoint ep(port); // NB port set by connectthread constructor
+    myNode = createINode(ep);
 }
 
 CMPServer::~CMPServer()
@@ -2160,6 +2168,18 @@ CMPServer::~CMPServer()
     delete multipackethandler;
     delete broadcastpackethandler;
     delete userpackethandler;
+    ::Release(myNode);
+}
+
+void CMPServer::checkTagOK(mptag_t tag)
+{
+    if ((int)tag<=(int)TAG_REPLY_BASE) {
+        int dif = (int)TAG_REPLY_BASE-(int)tag;
+        if (dif%16!=RTsalt) {
+            ERRLOG("**Invalid MP tag used");
+            PrintStackReport();
+        }
+    }
 }
 
 
@@ -2374,7 +2394,7 @@ bool CMPServer::matchesFindParam(const void * et, const void *fp, unsigned) cons
 
 bool CMPServer::nextChannel(CMPChannel *&cur)
 {
-    CriticalBlock block(serversect);
+    CriticalBlock block(sect);
     cur = (CMPChannel *)SuperHashTableOf<CMPChannel,SocketEndpoint>::next(cur);
     return cur!=NULL;
 }
@@ -2460,7 +2480,7 @@ public:
     {
         CriticalBlock block(verifysect);
         CTimeMon tm(timeout);
-        rank_t myrank = group->rank();
+        rank_t myrank = group->rank(parent->queryMyNode());
         {
             ForEachNodeInGroup(rank,*group) {
                 bool doverify;
@@ -2557,7 +2577,7 @@ public:
     bool sendRecv(CMessageBuffer &mbuff, INode *dst, mptag_t dsttag,  unsigned timeout=MP_WAIT_FOREVER)
     {
         assertex(dst);
-        mptag_t replytag = createReplyTag();
+        mptag_t replytag = parent->createReplyTag();
         CTimeMon tm(timeout);
         mbuff.setReplyTag(replytag);
         unsigned remaining;
@@ -2607,6 +2627,7 @@ class CCommunicator: public CInterface, public ICommunicator
     IGroup *group;
     CMPServer *parent;
     bool outer;
+    rank_t myrank;
 
     const SocketEndpoint &queryEndpoint(rank_t rank)
     {
@@ -2627,11 +2648,10 @@ public:
         // send does not corrupt mbuf
         if (dstrank==RANK_NULL)
             return false;
-        rank_t myrank = group->rank();
         if (dstrank==myrank) {
             CMessageBuffer *msg = mbuf.clone();
             // change sender
-            msg->init(queryMyNode()->endpoint(),tag,mbuf.getReplyTag());
+            msg->init(parent->queryMyNode()->endpoint(),tag,mbuf.getReplyTag());
             parent->getReceiveQ().enqueue(msg);
         }
         else {
@@ -2687,7 +2707,6 @@ public:
          * process i sends to process (i + 2^k) % p and receives from process (i - 2^k + p) % p.
          */
 
-        int myrank = group->rank();
         int numranks = group->ordinality();
         CMessageBuffer mb;
         rank_t r;
@@ -2741,7 +2760,6 @@ public:
     {
         CriticalBlock block(verifysect);
         CTimeMon tm(timeout);
-        rank_t myrank = group->rank();
         {
             ForEachNodeInGroup(rank,*group) {
                 bool doverify;
@@ -2850,14 +2868,14 @@ public:
             if (group->ordinality()>1) {
                 do {
                     sendrank = getRandom()%group->ordinality();
-                } while (sendrank==group->rank());
+                } while (sendrank==myrank);
             }
             else {
-                assertex(group->rank()!=0);
+                assertex(myrank!=0);
                 sendrank = 0;
             }
         }
-        mptag_t replytag = createReplyTag();
+        mptag_t replytag = parent->createReplyTag();
         CTimeMon tm(timeout);
         mbuff.setReplyTag(replytag);
         unsigned remaining;
@@ -2913,6 +2931,7 @@ public:
         outer = _outer;
         parent = _parent;
         group = LINK(_group); 
+        myrank = group->rank(parent->queryMyNode());
     }
     ~CCommunicator()
     {
@@ -2922,10 +2941,30 @@ public:
 };
 
 
+// Additional CMPServer methods
+
+ICommunicator *CMPServer::createCommunicator(IGroup *group, bool outer)
+{
+    return new CCommunicator(this,group,outer);
+}
+
+///////////////////////////////////
+
+
+class CGlobalMPServer : public CMPServer
+{
+public:
+    CGlobalMPServer(unsigned _port) : CMPServer(_port)
+    {
+        initMyNode(port); // NB port set by connectthread constructor in base
+    }
+};
+
+
 ICommunicator *createCommunicator(IGroup *group,bool outer)
 {
     assertex(MPserver!=NULL);
-    return new CCommunicator(MPserver,group,outer);
+    return MPserver->createCommunicator(group, outer);
 }
 
 static IInterCommunicator *worldcomm=NULL;
@@ -2939,6 +2978,15 @@ IInterCommunicator &queryWorldCommunicator()
     return *worldcomm;
 }
 
+
+IMPServer *startNewMPServer(unsigned port)
+{
+    assertex(sizeof(PacketHeader)==32);
+    CMPServer *mpServer = new CMPServer(port);
+    mpServer->start();
+    return mpServer;
+}
+
 void startMPServer(unsigned port, bool paused)
 {
     assertex(sizeof(PacketHeader)==32);
@@ -2948,7 +2996,7 @@ void startMPServer(unsigned port, bool paused)
         if (!CMPServer::serverpaused)
         {
             delete MPserver;
-            MPserver = new CMPServer(port);
+            MPserver = new CGlobalMPServer(port);
         }
         if (paused)
         {
@@ -2962,6 +3010,12 @@ void startMPServer(unsigned port, bool paused)
     CMPServer::servernest++;
 }
 
+IMPServer *getMPServer()
+{
+    CriticalBlock block(CMPServer::serversect);
+    assertex(MPserver);
+    return LINK(MPserver);
+}
 
 void stopMPServer()
 {

+ 10 - 0
system/mp/mpcomm.hpp

@@ -90,8 +90,18 @@ extern mp_decl mptag_t createReplyTag(); // creates (short-lived) reply-tag;
 extern mp_decl ICommunicator *createCommunicator(IGroup *group,bool outer=false); // outer allows nodes outside group to send
 extern mp_decl IInterCommunicator &queryWorldCommunicator();
 
+interface IMPServer : extends IInterface
+{
+    virtual mptag_t createReplyTag() = 0;
+    virtual ICommunicator *createCommunicator(IGroup *group, bool outer=false) = 0;
+    virtual void stop() = 0;
+    virtual INode *queryMyNode() = 0;
+};
+
 extern mp_decl void startMPServer(unsigned port,bool paused=false);
 extern mp_decl void stopMPServer();
+extern mp_decl IMPServer *getMPServer();
+extern mp_decl IMPServer *startNewMPServer(unsigned port);
 
 interface IConnectionMonitor: extends IInterface
 {