瀏覽代碼

HPCC-12128 - MP connections take too long

Resolve deadlock when A->B and B->A with higher address+port value

Updated from review and comments, phase 4

Signed-off-by: Mark Kelly <mark.kelly@lexisnexis.com>
Mark Kelly 10 年之前
父節點
當前提交
8350e0b98d
共有 5 個文件被更改,包括 563 次插入96 次删除
  1. 6 0
      dali/base/dacoven.cpp
  2. 272 34
      system/mp/mpcomm.cpp
  3. 1 0
      system/mp/mpcomm.hpp
  4. 1 0
      system/mp/mptag.hpp
  5. 283 62
      system/mp/test/mptest.cpp

+ 6 - 0
dali/base/dacoven.cpp

@@ -290,6 +290,12 @@ public:
         return (coven.queryGroup().rank(ep)!=RANK_NULL);
     }
 
+    virtual void barrier(void)
+    {
+        assertex(comm);
+        return comm->barrier();
+    }
+
     virtual bool verifyConnection(rank_t rank, unsigned timeout=1000*60*5)
     {
         assertex(comm);

+ 272 - 34
system/mp/mpcomm.cpp

@@ -60,15 +60,21 @@
 #define CANCELTIMEOUT       1000        // 1 sec
 
 #define CONNECT_TIMEOUT         (5*60*1000) // 5 mins
+
 #define CONNECT_READ_TIMEOUT    (3*60*1000) // 3 mins
-#define CONNECT_TIMEOUT_INTERVAL 5000 // 5 secs
+#define CONNECT_TIMEOUT_INTERVAL 1000 // 1 sec
 #define CONFIRM_TIMEOUT         (CONNECT_READ_TIMEOUT/2) // 1.5 mins
 #define CONFIRM_TIMEOUT_INTERVAL 5000 // 5 secs
-#define CONFIRM_TRACESLOW_THRESHOLD 1000 // 1 sec
+#define TRACESLOW_THRESHOLD      1000 // 1 sec
 
 #define VERIFY_DELAY            (1*60*1000)  // 1 Minute
 #define VERIFY_TIMEOUT          (1*60*1000)  // 1 Minute
 
+#define DIGIT1 (256UL*256UL*256UL*65536UL)
+#define DIGIT2 (256UL*256UL*65536UL)
+#define DIGIT3 (256UL*65536UL)
+#define DIGIT4 (65536UL)
+
 #define _TRACING
 
 static  CriticalSection verifysect;
@@ -642,7 +648,7 @@ void traceSlowReadTms(const char *msg, ISocket *sock, void *dst, size32_t minSiz
             WARNLOG("%s %s, stalled for %d ms so far", msg, epStr.str(), elapsedMs);
         }
     }
-    if (readTmsTimer.elapsedMs() >= CONFIRM_TRACESLOW_THRESHOLD)
+    if (readTmsTimer.elapsedMs() >= TRACESLOW_THRESHOLD)
     {
         if (0 == epStr.length())
         {
@@ -669,6 +675,10 @@ class CMPChannel: public CInterface
     mptag_t multitag;                   // current multi send in progress
     bool closed;
     IArrayOf<ISocket> keptsockets;
+    CriticalSection attachsect;
+    unsigned long attachaddrval;
+    SocketEndpoint attachep;
+    atomic_t attachchk;
 
 protected: friend class CMPServer;
     SocketEndpoint remoteep;
@@ -687,8 +697,9 @@ protected: friend class CMPPacketReader;
         // also in sendmutex
 
         ISocket *newsock=NULL;
-        unsigned retrycount = 10;
+        unsigned retrycount = 20;
         unsigned remaining;
+
         while (!channelsock) {
             try {
                 StringBuffer str;
@@ -708,7 +719,7 @@ protected: friend class CMPPacketReader;
                 newsock = ISocket::connect_timeout(remoteep,remaining);
                 newsock->set_keep_alive(true);
 #ifdef _FULLTRACE
-                LOG(MCdebugInfo(100), unknownJob, "MP: connect after socket connect");
+                LOG(MCdebugInfo(100), unknownJob, "MP: connect after socket connect, retrycount = %d", retrycount);
 #endif
 
                 SocketEndpointV4 id[2];
@@ -716,7 +727,14 @@ protected: friend class CMPPacketReader;
                 hostep.setLocalHost(parent->getPort());
                 id[0].set(hostep);
                 id[1].set(remoteep);
+
+                unsigned __int64 addrval = DIGIT1*id[0].ip[0] + DIGIT2*id[0].ip[1] + DIGIT3*id[0].ip[2] + DIGIT4*id[0].ip[3] + id[0].port;
+#ifdef _TRACE
+                PROGLOG("MP: connect addrval = %lu", addrval);
+#endif
+
                 newsock->write(&id[0],sizeof(id)); 
+
 #ifdef _FULLTRACE
                 StringBuffer tmp1;
                 id[0].getUrlStr(tmp1);
@@ -724,15 +742,121 @@ protected: friend class CMPPacketReader;
                 id[1].getUrlStr(tmp1);
                 LOG(MCdebugInfo(100), unknownJob, "MP: connect after socket write %s",tmp1.str());
 #endif
-                size32_t reply;
-                size32_t rd;
-                traceSlowReadTms("MP: connect to", newsock, &reply, sizeof(reply), sizeof(reply), rd, CONNECT_READ_TIMEOUT, CONNECT_TIMEOUT_INTERVAL);
+
+                size32_t reply = 0;
+                size32_t rd = 0;
+
+#ifdef _TRACE
+                LOG(MCdebugInfo(100), unknownJob, "MP: connect after socket write, waiting for read");
+#endif
+
+                // Wait for connection reply but also check for A<->B deadlock (where both processes are here
+                // waiting for other side to send confirm) and decide who stops waiting based on address.
+                // To be compatible with older versions of mplib which will not do this,
+                // loop with short wait time and release CS to allow other side to proceed
+                StringBuffer epStr;
+                unsigned startMs = msTick();
+
+                unsigned loopCnt = ((CONNECT_READ_TIMEOUT / retrycount) / CONNECT_TIMEOUT_INTERVAL) + 1;
+#ifdef _TRACE
+                PROGLOG("MP: loopCnt start = %u", loopCnt);
+#endif
+                while (loopCnt-- > 0)
+                {
+                    {
+                        CriticalBlock block(attachsect);
+#ifdef _TRACE
+                        PROGLOG("MP: connect got attachsect, attachchk = %d, loopCnt = %u", atomic_read(&attachchk), loopCnt);
+#endif
+                        if (atomic_read(&attachchk) > 0)
+                        {
+                            if (remoteep.equals(attachep))
+                            {
+#ifdef _TRACE
+                                PROGLOG("MP: deadlock situation [] attachaddrval = %lu addrval = %lu", attachaddrval, addrval);
+#endif
+                                if (attachaddrval < addrval)
+                                    break;
+                            }
+                        }
+                    }
+
+                    rd = 0;
+
+                    try
+                    {
+                        newsock->readtms(&reply,sizeof(reply),sizeof(reply),rd,CONNECT_TIMEOUT_INTERVAL);
+                    }
+                    catch (IException *e)
+                    {
+#ifdef _TRACE
+                        PROGLOG("MP: loop exception code = %d, loopCnt = %u", e->errorCode(), loopCnt);
+#endif
+                        if ( (e->errorCode() != JSOCKERR_timeout_expired) ||
+                             ((e->errorCode() == JSOCKERR_timeout_expired) && (loopCnt == 0)) )
+                        {
+                                if (tm.timedout(&remaining))
+                                {
 #ifdef _FULLTRACE
-                LOG(MCdebugInfo(100), unknownJob, "MP: connect after socket read %d",reply);
+                                    EXCLOG(e,"MP: connect timed out 3");
+#endif
+                                    e->Release();
+                                    newsock->Release();
+                                    return false;
+                                }
+#ifdef _TRACE
+                                EXCLOG(e, "MP: Failed to connect");
+#endif
+                                e->Release();
+                                if ((retrycount--==0)||(tm.timeout==MP_ASYNC_SEND))
+                                {   // don't bother retrying on async send
+                                    IMP_Exception *e=new CMPException(MPERR_connection_failed,remoteep);
+                                    throw e;
+                                }
+#ifdef _TRACE
+                                LOG(MCdebugInfo(100), unknownJob, "MP: Retrying connection to %s, %d attempts left",remoteep.getUrlStr(str).toCharArray(),retrycount+1);
+#endif
+                        }
+                        else
+                        {
+                            if (0 == epStr.length())
+                            {
+                                SocketEndpoint ep;
+                                newsock->getPeerEndpoint(ep);
+                                ep.getUrlStr(epStr);
+                            }
+                            WARNLOG("MP: connect to: %s, stalled for %d ms so far", epStr.str(), msTick()-startMs);
+                            e->Release();
+                        }
+                    }
+#ifdef _FULLTRACE
+                    PROGLOG("MP: rd = %d", rd);
+#endif
+                    if (rd != 0)
+                        break;
+                }
+
+#ifdef _TRACE
+                LOG(MCdebugInfo(100), unknownJob, "MP: connect after socket read rd=%u, reply=%u, sizeof(id)=%lu", rd, reply, sizeof(id));
 #endif
-                if (reply!=0) {
+
+                if (reply!=0)
+                {
+                    unsigned elapsedMs = msTick() - startMs;
+                    if (elapsedMs >= TRACESLOW_THRESHOLD)
+                    {
+                        if (0 == epStr.length())
+                        {
+                            SocketEndpoint ep;
+                            newsock->getPeerEndpoint(ep);
+                            ep.getUrlStr(epStr);
+                        }
+                        WARNLOG("MP: connect to: %s, took: %d ms", epStr.str(), elapsedMs);
+                    }
+
                     assertex(reply==sizeof(id));    // how can this fail?
-                    if (attachSocket(newsock,remoteep,hostep,true, NULL)) {
+                    if (attachSocket(newsock,remoteep,hostep,true,NULL,addrval))
+                    {
                         newsock->Release();
 #ifdef _TRACE
                         LOG(MCdebugInfo(100), unknownJob, "MP: connected to %s",str.toCharArray());
@@ -742,6 +866,7 @@ protected: friend class CMPPacketReader;
                         break;
                     }
                 }
+
             }
             catch (IException *e)
             {
@@ -752,7 +877,6 @@ protected: friend class CMPPacketReader;
                     e->Release();
                     return false;
                 }
-                StringBuffer str;
 #ifdef _TRACE
                 EXCLOG(e, "MP: Failed to connect");
 #endif
@@ -762,22 +886,40 @@ protected: friend class CMPPacketReader;
                     throw e;
                 }
 #ifdef _TRACE
+                StringBuffer str;
                 str.clear();
                 LOG(MCdebugInfo(100), unknownJob, "MP: Retrying connection to %s, %d attempts left",remoteep.getUrlStr(str).toCharArray(),retrycount+1);
 #endif
             }
+
             ::Release(newsock);
             newsock = NULL;
+
             {
                 CriticalUnblock unblock(connectsect); // to avoid connecting philosopher problem
 #ifdef _FULLTRACE
-                    PROGLOG("MP: before sleep");
+                PROGLOG("MP: before sleep");
 #endif
-                Sleep(2000+getRandom()%3000);
+                // check often if channelsock was created from accept thread
+                Sleep(50);
+                unsigned totalt = 2000 + getRandom() % 3000;
+                unsigned startt = msTick();
+                unsigned deltat = 0;
+                while (deltat < totalt)
+                {
+                    {
+                        CriticalBlock block(connectsect);
+                        if (channelsock)
+                            break;
+                    }
+                    deltat = msTick() - startt;
+                    Sleep(50);
+                }
 #ifdef _FULLTRACE
-                    PROGLOG("MP: after sleep");
+                PROGLOG("MP: after sleep");
 #endif
             }
+
         }
         return true;
     }
@@ -792,8 +934,7 @@ public:
 
     void reset();
 
-    bool attachSocket(ISocket *newsock,const SocketEndpoint &_remoteep,const SocketEndpoint &_localep,bool ismaster, size32_t *confirm);
-
+    bool attachSocket(ISocket *newsock,const SocketEndpoint &_remoteep,const SocketEndpoint &_localep,bool ismaster,size32_t *confirm, unsigned long addrval=0);
 
     bool writepacket(const void *hdr,size32_t hdrsize,const void *hdr2,size32_t hdr2size,const void *body,size32_t bodysize,CTimeMon &tm)
     {
@@ -939,6 +1080,12 @@ public:
                 parent->checkclosed = true;
             s=channelsock;
             channelsock = NULL;
+            {
+                CriticalBlock block(attachsect);
+                attachaddrval = 0;
+                attachep.set(NULL);
+                atomic_set(&attachchk, 0);
+            }
             if (!keepsocket) {
                 try {
                     s->shutdown();
@@ -1382,6 +1529,9 @@ CMPChannel::CMPChannel(CMPServer *_parent,SocketEndpoint &_remoteep)
     closed = false;
     master = false;
     sendwaiting = 0;
+    attachaddrval = 0;
+    attachep.set(NULL);
+    atomic_set(&attachchk, 0);
 }
 
 void CMPChannel::reset()
@@ -1395,6 +1545,9 @@ void CMPChannel::reset()
     closed = false;
     master = false;
     sendwaiting = 0;
+    attachaddrval = 0;
+    attachep.set(NULL);
+    atomic_set(&attachchk, 0);
 }
 
 
@@ -1405,24 +1558,45 @@ CMPChannel::~CMPChannel()
     reader->Release();
 }
 
-bool CMPChannel::attachSocket(ISocket *newsock,const SocketEndpoint &remoteep,const SocketEndpoint &_localep,bool ismaster, size32_t *confirm) // takes ownership if succeeds
+bool CMPChannel::attachSocket(ISocket *newsock,const SocketEndpoint &_remoteep,const SocketEndpoint &_localep,bool ismaster,size32_t *confirm, unsigned long addrval) // takes ownership if succeeds
 {
+    struct attachdTor
+    {
+        atomic_t &attchk;
+        attachdTor(atomic_t &_attchk) : attchk(_attchk) { }
+        ~attachdTor() { atomic_dec(&attchk); }
+    } attachChk (attachchk);
+
 #ifdef _FULLTRACE       
-    PROGLOG("MP: attachSocket on entry");
+    PROGLOG("MP: attachSocket on entry, ismaster = %d, confirm = %p, channelsock = %p, addrval = %u", ismaster, confirm, channelsock, addrval);
 #endif
-    CriticalBlock block(connectsect); 
+
+    {
+        CriticalBlock block(attachsect);
+        attachaddrval = addrval;
+        attachep = _remoteep;
+        atomic_inc(&attachchk);
+    }
+
+    CriticalBlock block(connectsect);
+
 #ifdef _FULLTRACE       
-    PROGLOG("MP: attachSocket got connectsect");
+    PROGLOG("MP: attachSocket got connectsect, channelsock = %p", channelsock);
 #endif
+
     // resolution to stop clash i.e. A sends to B at exactly same time B sends to A
+
     if (channelsock) {
-        if (remoteep.port==0)
+
+        if (_remoteep.port==0)
             return false;
+
         StringBuffer ep1;
         StringBuffer ep2;
         _localep.getUrlStr(ep1);
-        remoteep.getUrlStr(ep2);
+        _remoteep.getUrlStr(ep2);
         LOG(MCdebugInfo(100), unknownJob, "MP: Possible clash between %s->%s %d(%d)",ep1.str(),ep2.str(),(int)ismaster,(int)master);
+
         try {
             if (ismaster!=master) {
                 if (ismaster) {
@@ -1459,20 +1633,27 @@ bool CMPChannel::attachSocket(ISocket *newsock,const SocketEndpoint &remoteep,co
         }
 
     }
+
     if (confirm)
         newsock->write(confirm,sizeof(*confirm)); // confirm while still in connectsect
+
     closed = false;
     reader->init(this);
     channelsock = LINK(newsock);
+
 #ifdef _FULLTRACE       
     PROGLOG("MP: attachSocket before select add");
 #endif
+
     parent->querySelectHandler().add(channelsock,SELECTMODE_READ,reader);
+
 #ifdef _FULLTRACE       
     PROGLOG("MP: attachSocket after select add");
 #endif
+
     localep = _localep;
     master = ismaster;
+
     return true;
 }
 
@@ -1686,7 +1867,7 @@ void CMPConnectThread::start(unsigned short port)
 int CMPConnectThread::run()
 {
 #ifdef _TRACE
-    LOG(MCdebugInfo(100), unknownJob, "MP: Connect Thread Starting");
+    LOG(MCdebugInfo(100), unknownJob, "MP: Connect Thread Starting - accept loop");
 #endif
     while (running) {
         ISocket *sock=NULL;
@@ -1708,7 +1889,7 @@ int CMPConnectThread::run()
             try {
                 sock->set_keep_alive(true);
                 size32_t rd;
-                SocketEndpoint remoteep;
+                SocketEndpoint _remoteep;
                 SocketEndpoint hostep;
                 SocketEndpointV4 id[2];
                 traceSlowReadTms("MP: initial accept packet from", sock, &id[0], sizeof(id), sizeof(id), rd, CONFIRM_TIMEOUT, CONFIRM_TIMEOUT_INTERVAL);
@@ -1722,11 +1903,17 @@ int CMPConnectThread::run()
                     sock->close();
                     continue;
                 }
-                id[0].get(remoteep);
+                id[0].get(_remoteep);
                 id[1].get(hostep);
-                if (remoteep.isNull() || hostep.isNull())
+
+                unsigned __int64 addrval = DIGIT1*id[0].ip[0] + DIGIT2*id[0].ip[1] + DIGIT3*id[0].ip[2] + DIGIT4*id[0].ip[3] + id[0].port;
+#ifdef _TRACE
+                PROGLOG("MP: Connect Thread: addrval = %lu", addrval);
+#endif
+
+                if (_remoteep.isNull() || hostep.isNull())
                 {
-                    // JCSMORE, I think remoteep really must/should match a IP of this local host
+                    // JCSMORE, I think _remoteep really must/should match a IP of this local host
                     StringBuffer errMsg("MP Connect Thread: invalid remote and/or host ep serialized from ");
                     SocketEndpoint ep;
                     sock->getPeerEndpoint(ep);
@@ -1737,13 +1924,13 @@ int CMPConnectThread::run()
                 }
 #ifdef _FULLTRACE       
                 StringBuffer tmp1;
-                remoteep.getUrlStr(tmp1);
+                _remoteep.getUrlStr(tmp1);
                 tmp1.append(' ');
                 hostep.getUrlStr(tmp1);
                 PROGLOG("MP: Connect Thread: after read %s",tmp1.str());
 #endif
                 checkSelfDestruct(&id[0],sizeof(id));
-                if (!parent->lookup(remoteep).attachSocket(sock,remoteep,hostep,false, &rd)) {
+                if (!parent->lookup(_remoteep).attachSocket(sock,_remoteep,hostep,false,&rd,addrval)) {
 #ifdef _FULLTRACE       
                     PROGLOG("MP Connect Thread: lookup failed");
 #endif
@@ -1752,7 +1939,7 @@ int CMPConnectThread::run()
 #ifdef _TRACE
                     StringBuffer str1;
                     StringBuffer str2;
-                    LOG(MCdebugInfo(100), unknownJob, "MP Connect Thread: connected to %s",remoteep.getUrlStr(str1).toCharArray());
+                    LOG(MCdebugInfo(100), unknownJob, "MP Connect Thread: connected to %s",_remoteep.getUrlStr(str1).toCharArray());
 #endif
                 }
 #ifdef _FULLTRACE       
@@ -2118,7 +2305,7 @@ void CMPServer::notifyClosed(SocketEndpoint &ep)
 {
 #ifdef _TRACE
     StringBuffer url;
-    LOG(MCdebugInfo(100), unknownJob, "CMPServer::notifyClosed %s",ep.getUrlStr(url).str());
+    LOG(MCdebugInfo(100), unknownJob, "MP: CMPServer::notifyClosed %s",ep.getUrlStr(url).str());
 #endif
     notifyclosedthread->notify(ep);
 }
@@ -2408,6 +2595,57 @@ public:
         return true;
     }
 
+    void barrier(void)
+    {
+#ifdef _TRACE
+        PrintLog("MP: barrier enter");
+#endif
+
+        /*
+         * Use the dissemination algorithm described in:
+         * Debra Hensgen, Raphael Finkel, and Udi Manbet, "Two Algorithms for Barrier Synchronization,"
+         * International Journal of Parallel Programming, 17(1):1-17, 1988.
+         * It uses ceiling(lgp) steps. In step k, 0 <= k <= (ceiling(lgp)-1),
+         * process i sends to process (i + 2^k) % p and receives from process (i - 2^k + p) % p.
+         */
+
+        int myrank = group->rank();
+        int numranks = group->ordinality();
+        CMessageBuffer mb;
+        rank_t r;
+
+        int mask = 0x1;
+        while (mask < numranks)
+        {
+            int dst = (myrank + mask) % numranks;
+            int src = (myrank - mask + numranks) % numranks;
+
+#ifdef _TRACE
+            PrintLog("MP: barrier: send to %d, recv from %d", dst, src);
+#endif
+
+            // NOTE: MPI method MUST use sendrecv so as to not send/recv deadlock ...
+
+            mb.clear();
+            mb.append("MPTAG_BARRIER");
+            bool oks = send(mb,dst,MPTAG_BARRIER,120000);
+            mb.clear();
+            bool okr = recv(mb,src,MPTAG_BARRIER,&r);
+
+            if (!oks && !okr)
+            {
+                PrintLog("MP: barrier: Error sending or recving");
+                break;
+            }
+
+            mask <<= 1;
+        }
+
+#ifdef _TRACE
+        PrintLog("MP: barrier leave");
+#endif
+    }
+
     bool verifyConnection(rank_t rank,  unsigned timeout)
     {
         CriticalBlock block(verifysect);
@@ -2653,7 +2891,7 @@ void stopMPServer()
     if (--CMPServer::servernest==0) {
         stopLogMsgReceivers();
 #ifdef _TRACE
-        LOG(MCdebugInfo(100), unknownJob, "Stopping MP Server");
+        LOG(MCdebugInfo(100), unknownJob, "MP: Stopping MP Server");
 #endif
         CriticalUnblock unblock(CMPServer::serversect);
         assertex(MPserver!=NULL);
@@ -2664,7 +2902,7 @@ void stopMPServer()
         worldcomm = NULL;
         initMyNode(0);
 #ifdef _TRACE
-        LOG(MCdebugInfo(100), unknownJob, "Stopped MP Server");
+        LOG(MCdebugInfo(100), unknownJob, "MP: Stopped MP Server");
 #endif
     }
 }

+ 1 - 0
system/mp/mpcomm.hpp

@@ -56,6 +56,7 @@ interface ICommunicator: extends IInterface
     virtual bool verifyConnection(rank_t rank, unsigned timeout=1000*60*5) = 0; // verifies connected to rank
     virtual bool verifyAll(bool duplex=false, unsigned timeout=1000*60*30) = 0;
     virtual void disconnect(INode *node) = 0;
+    virtual void barrier() = 0;
 };
 
 interface IInterCommunicator: extends IInterface

+ 1 - 0
system/mp/mptag.hpp

@@ -55,6 +55,7 @@ TAGENUM
     DEFTAG ( MPTAG_THORRESOURCELOCK )
     DEFTAG ( MPTAG_MPTX )
     DEFTAG ( MPTAG_THORWATCHDOG )
+    DEFTAG ( MPTAG_BARRIER )
 
     // new static tags go above here
 

+ 283 - 62
system/mp/test/mptest.cpp

@@ -10,10 +10,14 @@
 #include <mpbase.hpp>
 #include <mpcomm.hpp>
 
+using namespace std;
+
 #define MPPORT 8888
 
-//#define MULTITEST
-#define STREAMTEST
+#define MULTITEST
+//#define STREAMTEST
+//#define MPITEST
+//#define MPITEST2
 //#define GPF
 
 #ifdef MULTITEST
@@ -21,7 +25,8 @@
 //#define MYMACHINES "192.168.16.124,10.150.10.17,10.150.10.18,10.150.10.19,10.150.10.20,10.150.10.21,10.150.10.22,10.150.10.23,10.150.10.47,10.150.10.48,10.150.10.49,10.150.10.50,10.150.10.51,10.150.10.52,10.150.10.53,10.150.10.54,10.150.10.55,10.150.10.73,10.150.10.75,10.150.10.79"
 #endif
 
-#define aWhile 100000
+// #define aWhile 100000
+#define aWhile 10
 
 
 
@@ -112,8 +117,10 @@ public:
 static CSectionTimer STsend("send");
 static CSectionTimer STrecv("recv");
 
-#define NITER 100
+//#define NITER 100
+#define NITER 40
 #define BLOCKSIZE (0x100000*10)
+//#define BLOCKSIZE (0x1000*10)
 
 #define WRITEDELAY 100
 #define READDELAY   5000
@@ -122,6 +129,7 @@ void StreamTest(IGroup *group,ICommunicator *comm)
 {
     void *bufs[18]; 
     unsigned bi;
+
     for (bi=0;bi<16;bi++) {
         bufs[bi] = malloc(1024*1024*100);
         assertex(bufs[bi]);
@@ -130,7 +138,7 @@ void StreamTest(IGroup *group,ICommunicator *comm)
 
     CMessageBuffer mb;
     for (unsigned i=0;i<NITER;i++) {
-        if (group->rank()!=0) {
+        if (group->rank() == 1) {
             mb.clear();
             StringBuffer header;
             header.append("Test Block #").append(i);
@@ -143,7 +151,7 @@ void StreamTest(IGroup *group,ICommunicator *comm)
             PrintLog("Sent");
             //Sleep(WRITEDELAY);
         }
-        else {
+        else if (group->rank() == 0) {
             rank_t r;
             comm->recv(mb,RANK_ALL,MPTAG_TEST,&r);
             StringAttr str;
@@ -157,9 +165,15 @@ void StreamTest(IGroup *group,ICommunicator *comm)
             //  Sleep(1000*1000); // 15 mins or so
             //Sleep(READDELAY);
         }
+        else
+            PrintLog("Skipping extra rank %d", group->rank());
     }
+
+    comm->barrier();
+
     for (bi=0;bi<16;bi++) 
         free(bufs[bi]);
+
     STsend.print();
     STrecv.print();
 }
@@ -167,29 +181,32 @@ void StreamTest(IGroup *group,ICommunicator *comm)
 
 void Test1(IGroup *group,ICommunicator *comm)
 { 
+    PrintLog("test1");
     CMessageBuffer mb;
     if (group->rank()==0) {
         mb.append("Hello - Test1");
         comm->send(mb,1,MPTAG_TEST);
     }
-    else {
+    else if (group->rank()==1) {
         rank_t r;
         comm->recv(mb,0,MPTAG_TEST,&r);
         StringAttr str;
         mb.read(str);
         PrintLog("(1) Received '%s' from rank %d",str.get(),r);
     }
+    comm->barrier();
 }
 
 
 void Test2(IGroup *group,ICommunicator *comm)
 {
+    PrintLog("test2");
     CMessageBuffer mb;
     if (group->rank()==0) {
         mb.append("Hello - Test2");
         comm->send(mb,RANK_ALL,MPTAG_TEST);
     }
-    else {
+    else if (group->rank()==1) {
 #ifdef GPF
         PrintLog("GPFING");
         Sleep(aWhile);
@@ -201,66 +218,73 @@ void Test2(IGroup *group,ICommunicator *comm)
         mb.read(str);
         PrintLog("(2) Received '%s' from rank %d",str.get(),r);
     }
+    comm->barrier();
 }
 
 
 void Test3(IGroup *group,ICommunicator *comm)
 {
+    PrintLog("test3");
     CMessageBuffer mb;
     if (group->rank()==0) {
         mb.append("Hello - Test3");
         comm->send(mb,1,MPTAG_TEST);
     }
-    else {
+    else if (group->rank()==1) {
         rank_t r;
         comm->recv(mb,RANK_ALL,MPTAG_TEST,&r);
         StringAttr str;
         mb.read(str);
         PrintLog("(3) Received '%s' from rank %d",str.get(),r);
     }
+    comm->barrier();
 }
 
 
 void Test4(IGroup *group,ICommunicator *comm)
 {
+    PrintLog("test4");
     CMessageBuffer mb;
     if (group->rank()==0) {
         INode *singlenode=&group->queryNode(1);
         IGroup *singlegroup = createIGroup(1,&singlenode);
-        ICommunicator * singlecomm = createCommunicator(singlegroup,true);
+        ICommunicator * singlecomm = createCommunicator(singlegroup);
         mb.append("Hello - Test4");
         singlecomm->send(mb,0,MPTAG_TEST);
         singlecomm->Release();
         singlegroup->Release();
     }
-    else {
+    else if (group->rank()==1) {
         rank_t r;
         comm->recv(mb,RANK_ALL,MPTAG_TEST,&r);
         StringAttr str;
         mb.read(str);
         PrintLog("(4) Received '%s' from rank %d",str.get(),r);
     }
+    comm->barrier();
 }
 
 
 void Test5(IGroup *group,ICommunicator *comm)
 {
+    PrintLog("test5");
     rank_t rank = group->rank();
     INode *singlenode=&group->queryNode(1);
     IGroup *singlegroup = createIGroup(1,&singlenode);
-    ICommunicator * singlecomm = createCommunicator(singlegroup,true);
+    ICommunicator * singlecomm = createCommunicator(singlegroup);
     CMessageBuffer mb;
     if (rank==0) {
         mb.append("Hello - Test5");
         singlecomm->send(mb,0,MPTAG_TEST);
     }
-    else {
+    else if (rank==1) {
         rank_t r;
         singlecomm->recv(mb,RANK_ALL,MPTAG_TEST,&r);
         StringAttr str;
         mb.read(str);
         PrintLog("(5) Received '%s' from rank %d (unknown)",str.get(),r);
     }
+    comm->barrier();
     singlecomm->Release();
     singlegroup->Release();
 }
@@ -268,6 +292,7 @@ void Test5(IGroup *group,ICommunicator *comm)
 
 void Test6(IGroup *group,ICommunicator *comm)
 {
+    PrintLog("test6");
     //DebugBreak();
     CMessageBuffer mb;
     StringAttr str;
@@ -279,40 +304,45 @@ void Test6(IGroup *group,ICommunicator *comm)
         StringBuffer url;
         PrintLog("(6) Received '%s' from %s",str.get(),mb.getSender().getUrlStr(url).str());
     }
-    else {
+    else if (group->rank()==0) {
         rank_t r;
         comm->recv(mb,RANK_ALL,MPTAG_TEST,&r);
         mb.read(str);
+        PrintLog("(6) - str = <%s>", str.get());
         assertex(strcmp(str.get(),"Test")==0);
         mb.clear();
         mb.append("Hello - Test6");
 
         printf("crash now!");
-        Sleep(10*1000);
+        Sleep(1);
 
         comm->reply(mb);
     }
+    comm->barrier();
 }
 
 void Test7(IGroup *group,ICommunicator *comm)
 { 
+    PrintLog("test7");
     CMessageBuffer mb;
     if (group->rank()==0) {
         mb.append("Hello - Test7");
         mb.reserve(150*1024);
         comm->send(mb,1,MPTAG_TEST);
     }
-    else {
+    else if (group->rank()==1) {
         rank_t r;
         comm->recv(mb,(mptag_t) TAG_ALL,MPTAG_TEST,&r);
         StringAttr str;
         mb.read(str);
         PrintLog("Received '%s' from rank %d",str.get(),r);
     }
+    comm->barrier();
 }
 
 
-#define MAXBUFFERSIZE 0x100000
+// #define MAXBUFFERSIZE 0x100000
+#define MAXBUFFERSIZE 0x10000
 struct CRandomBuffer
 {   
     size32_t size;
@@ -320,13 +350,19 @@ struct CRandomBuffer
     unsigned crc;
     void fill() {
         size = getRandom()%MAXBUFFERSIZE;
+        // size = 100000;
         if (size) {
             char c = (char)getRandom();
+#if 0
             for (unsigned i=0;i<size;i++) {
                 buffer[i] = c;
                 c += (c*16);
                 c += 113;
             }
+#endif
+            for (unsigned i=0;i<size;i++) {
+                buffer[i] = 'a' + i%26;
+            }
         }
         crc = crc32(&buffer[0],size,0);
     }
@@ -352,11 +388,15 @@ struct CRandomBuffer
     }
     void serialize(MemoryBuffer &mb)
     {
+        // PROGLOG("1serialize: size = %u, length = %u", size, mb.length());
         mb.append(size).append(size,buffer).append(crc);
+        // PROGLOG("2serialize: size = %u, length = %u", size, mb.length());
     }
     void deserialize(MemoryBuffer &mb)
     {
+        // PROGLOG("1de-serialize: size = %u, length = %u", size, mb.length());
         mb.read(size);
+        // PROGLOG("2de-serialize: size = %u, length = %u", size, mb.length());
         mb.read(size,buffer).read(crc);
     }
 };
@@ -368,7 +408,8 @@ void printtrc(char c)
     printf("%c",c);
 }
 
-#define N 100
+// #define N 100
+#define N 20
 
 void MultiTest(ICommunicator *_comm)
 {
@@ -383,19 +424,24 @@ void MultiTest(ICommunicator *_comm)
             unsigned n=(comm->queryGroup().ordinality()-1)*N;
             CMessageBuffer mb;
             CRandomBuffer *buff = new CRandomBuffer();
-            PrintLog("started server");
+            PrintLog("MPTEST: started server");
             try {
                 while(n--) {
                     mb.clear();
                     rank_t rr;
                     if (!comm->recv(mb,RANK_ALL,MPTAG_TEST,&rr)) 
                         break;
+                    PrintLog("MPTEST: Received from %d, len = %d",rr, mb.length());
                     StringBuffer str;
                     comm->queryGroup().queryNode(rr).endpoint().getUrlStr(str);
-//                  PrintLog("Received from %s",str.str());
+                    // PrintLog("MPTEST: Received from %s",str.str());
+
                     buff->deserialize(mb);
+
+#ifdef DO_CRC_CHECK
                     if (!buff->check())
-                        PrintLog("Received from %s",str.str());
+                        PrintLog("MPTEST: Received from %s",str.str());
+#endif
 
                     mb.clear().append(buff->crc);
                     comm->reply(mb);
@@ -404,15 +450,21 @@ void MultiTest(ICommunicator *_comm)
             catch (IException *e) {
                 pexception("Server Exception",e);
             }
-            PrintLog("stopped server");
+
+            comm->barrier();  // MCK
+
+            PrintLog("MPTEST: stopped server");
             delete buff;
             return 0;
         }
 
     } server(_comm);
+
     Owned<ICommunicator> comm;
     comm.set(_comm); 
+
     server.start();
+
     CMessageBuffer mb;
     CRandomBuffer *buff = new CRandomBuffer();
     unsigned nr = comm->queryGroup().ordinality();
@@ -421,10 +473,12 @@ void MultiTest(ICommunicator *_comm)
     rank_t *targets = new rank_t[n];
     rank_t *t = targets;
     rank_t i;
+
     for (i=0;i<nr;i++) 
         if (i!=r)
             for (unsigned j=0;j<N;j++) 
                 *(t++) = i;
+
     unsigned k=n;
     while (k>1) {
         i = getRandom()%k;  // NB n is correct here 
@@ -433,17 +487,27 @@ void MultiTest(ICommunicator *_comm)
         targets[i] = targets[k];
         targets[k] = t;
     }
-    PrintLog("client started");
+
+    PrintLog("MPTEST: client started");
+
     try {
         while (n--) {
             buff->fill();
             buff->serialize(mb.clear());
+
+#if 0
             StringBuffer str;
             comm->queryGroup().queryNode(targets[n]).endpoint().getUrlStr(str);
-//          PrintLog("Sending to %s",str.str());
+            PrintLog("MPTEST: Sending to %s, length=%u",str.str(), mb.length());
+#endif
+
+            PrintLog("MPTEST: Sending to %d, length=%u", targets[n], mb.length());
+
             if (!comm->sendRecv(mb,targets[n],MPTAG_TEST)) 
                 break;
-//          PrintLog("Sent to %s",str.str());
+
+            // Sleep((n+1)*2000);
+            // PrintLog("MPTEST: Sent to %s",str.str());
             unsigned crc;
             mb.read(crc);
             assertex(crc==buff->crc);
@@ -452,12 +516,134 @@ void MultiTest(ICommunicator *_comm)
     catch (IException *e) {
         pexception("Client Exception",e);
     }
-    PrintLog("client finished");
+
+    PrintLog("MPTEST: client finished");
+
     server.join();
+
     delete [] targets;
     delete buff;
 }
 
+void MPITest(IGroup *group, ICommunicator *mpicomm)
+{
+    CMessageBuffer mb;
+    CMessageBuffer mb2;
+    int myrank = group->rank();
+    int numranks = group->ordinality();
+
+    int rnksumtotal = 0;
+    for(int i=0;i<numranks;i++)
+        rnksumtotal += (i+1);
+
+    PrintLog("MPTEST: MPITest myrank=%d numranks=%d rnksumtotal=%d", myrank, numranks, rnksumtotal);
+
+    // send and recv to/from all others without a send/recv deadlock ...
+
+    mb.clear();
+    mb.append(myrank+1);
+
+    rank_t r;
+    int rankval;
+    int ranksum = myrank+1;
+
+    int left, right;
+
+    if (numranks == 2)
+    {
+        if (myrank == 0)
+        {
+            left = 1;
+            right = 1;
+            PrintLog("MPTEST: MPITest: %d send to rank %d", myrank, right);
+            mpicomm->send(mb,right,MPTAG_TEST);
+
+            mb2.clear();
+            PrintLog("MPTEST: MPITest: %d recv from rank %d", myrank, left);
+            mpicomm->recv(mb2,left,MPTAG_TEST,&r);
+            mb2.read(rankval);
+            ranksum += rankval;
+        }
+        else
+        {
+            left = 0;
+            right = 0;
+            mb2.clear();
+            PrintLog("MPTEST: MPITest: %d recv from rank %d", myrank, left);
+            mpicomm->recv(mb2,left,MPTAG_TEST,&r);
+            mb2.read(rankval);
+            ranksum += rankval;
+
+            PrintLog("MPTEST: MPITest: %d send to rank %d", myrank, right);
+            mpicomm->send(mb,right,MPTAG_TEST);
+        }
+    }
+    else if (numranks > 2)
+    {
+        int m = 0;
+        while (m < (numranks - 1))
+        {
+            int rankid = 0;
+            while (rankid < numranks)
+            {
+                left = rankid - 1 - m;
+                if (left < 0)
+                    left = numranks + left;
+                right = rankid + 1 + m;
+                if (right >= numranks)
+                    right = right % numranks;
+
+                if (rankid == myrank)
+                {
+                    if (rankid == 0)
+                    {
+                        PrintLog("MPTEST: MPITest: %d send to rank %d", myrank, right);
+                        mpicomm->send(mb,right,MPTAG_TEST);
+                        mb2.clear();
+                        PrintLog("MPTEST: MPITest: %d recv from rank %d", myrank, left);
+                        mpicomm->recv(mb2,left,MPTAG_TEST,&r);
+                        mb2.read(rankval);
+                        ranksum += rankval;
+                    }
+                    else
+                    {
+                        mb2.clear();
+                        PrintLog("MPTEST: MPITest: %d recv from rank %d", myrank, left);
+                        mpicomm->recv(mb2,left,MPTAG_TEST,&r);
+                        mb2.read(rankval);
+                        ranksum += rankval;
+                        PrintLog("MPTEST: MPITest: %d send to rank %d", myrank, right);
+                        mpicomm->send(mb,right,MPTAG_TEST);
+                    }
+                }
+                rankid++;
+
+            }
+            m++;
+        }
+    }
+
+    PrintLog("MPTEST: MPITest: ranksum = %d", ranksum);
+
+    assertex(rnksumtotal==ranksum);
+
+    mpicomm->barrier();
+
+    return;
+}
+
+void MPITest2(IGroup *group, ICommunicator *mpicomm)
+{
+    int myrank = group->rank();
+    int numranks = group->ordinality();
+
+    PrintLog("MPTEST: MPITest2: myrank=%d numranks=%d", myrank, numranks);
+
+    mpicomm->barrier();
+
+    return;
+}
+
 void testIPnodeHash()
 {
     setNodeCaching(true);
@@ -488,80 +674,115 @@ int main(int argc, char* argv[])
 {
     InitModuleObjects();
     EnableSEHtoExceptionMapping();
-//  startMPServer(9123);
-    testIPnodeHash();
-
 
+//  startMPServer(9123);
+//  testIPnodeHash();
 //  stopMPServer();
-    return 0;
-#if 0
+//  return 0;
 
 #ifndef MYMACHINES
     if (argc<3) {
-        printf("mptest <ip> <ip>\n");
-        printf("mptest <ip:port> <ip:port> <myport>\n");
+        printf("\nMPTEST: Usage: %s <myport> <ip:port> <ip:port> ...\n\n", argv[0]);
         return 0;
     }
 #endif
+
     try {
         EnableSEHtoExceptionMapping();
         StringBuffer lf;
-        openLogFile(lf, "c:\\mptest.log");
-        PrintLog("MPTEST Starting");
+        openLogFile(lf, "mptest.log");
+        // PrintLog("MPTEST Starting");
 
 #ifndef MYMACHINES
-        startMPServer((argc==3)?MPPORT:atoi(argv[3]));
-        INode *nodes[2];
-        nodes[0] = createINode(argv[1],MPPORT);
-        nodes[1] = createINode(argv[2],MPPORT);
+        int num_nodes = 0;
+        int my_port = atoi(argv[1]);
+
+        PrintLog("MPTEST: Starting %d", my_port);
+
+        startMPServer(my_port);
+
+        INode *nodes[1000];
+
+        int i = 1;
+        while (i+1 < argc && i-1 < 1000) {
+            PrintLog("MPTEST: adding node %d, port = <%s>", i-1, argv[i+1]);
+            nodes[i-1] = createINode(argv[i+1], my_port);
+            i++;
+        }
+
+        PrintLog("MPTEST: num_nodes = %d", i-1);
 
-        IGroup *group = createIGroup(2,nodes); 
+        IGroup *group = createIGroup(i-1,nodes);
 #else
         startMPServer(MPPORT);
         IGroup *group = createIGroup(MYMACHINES,MPPORT); 
 #endif
-#if 0 // --------
-
-        ICommunicator * comm = createCommunicator(group);
 
 #ifdef STREAMTEST
-        StreamTest(group,comm);
-#else
-#ifdef MULTITEST
-        MultiTest(comm);
+
+        ICommunicator * mpicomm = createCommunicator(group);
+        StreamTest(group,mpicomm);
+        mpicomm->Release();
+
 #else
-        for (unsigned i = 0;i<100;i++) {
+# ifdef MULTITEST
+
+        ICommunicator * mpicomm = createCommunicator(group);
+        MultiTest(mpicomm);
+        mpicomm->Release();
+
+# else
+#  ifdef MPITEST
+
+        ICommunicator * mpicomm = createCommunicator(group);
+        MPITest(group, mpicomm);
+        mpicomm->Release();
+
+#  else
+#   ifdef MPITEST2
+
+        ICommunicator * mpicomm = createCommunicator(group);
+        MPITest2(group, mpicomm);
+        mpicomm->Release();
+
+#   else
+
+        ICommunicator * comm = createCommunicator(group);
+        for (unsigned i = 0;i<1;i++) {
             Test1(group,comm);
-            PrintLog("test1 done, waiting"); Sleep(aWhile); 
+            PrintLog("MPTEST: test1 done, waiting"); Sleep(aWhile);
             Test2(group,comm);
-            PrintLog("test2 done, waiting"); Sleep(aWhile); 
+            PrintLog("MPTEST: test2 done, waiting"); Sleep(aWhile);
             Test3(group,comm);
-            PrintLog("test3 done, waiting"); Sleep(aWhile); 
+            PrintLog("MPTEST: test3 done, waiting"); Sleep(aWhile);
             Test4(group,comm);
-            PrintLog("test4 done, waiting"); Sleep(aWhile); 
+            PrintLog("MPTEST: test4 done, waiting"); Sleep(aWhile);
             Test5(group,comm);
-            PrintLog("test5 done, waiting"); Sleep(aWhile); 
+            PrintLog("MPTEST: test5 done, waiting"); Sleep(aWhile);
             Test6(group,comm);
-            PrintLog("test6 done, waiting"); Sleep(aWhile); 
+            PrintLog("MPTEST: test6 done, waiting"); Sleep(aWhile);
             Test7(group,comm);
-            PrintLog("test7 done, waiting"); Sleep(aWhile); 
+            PrintLog("MPTEST: test7 done, waiting"); Sleep(aWhile);
         }
-#endif
+        comm->Release();
+
+#   endif
+#  endif
+# endif
 #endif
 
-        comm->Release();
         group->Release();
+
 #ifndef MYMACHINES
-        nodes[0]->Release();
-        nodes[1]->Release();
+        for (int i=0;i<num_nodes;i++)
+            nodes[i]->Release();
 #endif
-#endif // --------------
+
         stopMPServer();
     }
     catch (IException *e) {
         pexception("Exception",e);
     }
-#endif
+
     return 0;
 }
-