浏览代码

Merge pull request #11330 from mckellyln/better_mp_tests

HPCC-19950 Added MP tests - Ring and AlltoAll

Reviewed-By: Jake Smith <jake.smith@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 7 年之前
父节点
当前提交
5a937bf9d6
共有 1 个文件被更改,包括 347 次插入 和 175 次删除
  1. 347 175
      system/mp/test/mptest.cpp

+ 347 - 175
system/mp/test/mptest.cpp

@@ -14,11 +14,13 @@ using namespace std;
 
 #define MPPORT 8888
 
-#define MULTITEST
+//#define MULTITEST
 //#define STREAMTEST
-//#define MPITEST
-//#define MPITEST2
+//#define MPRING
+//#define MPALLTOALL
+//#define MPTEST2
 //#define GPF
+#define DYNAMIC_TEST
 
 #ifdef MULTITEST
 //#define MYMACHINES "10.150.10.16,10.150.10.17,10.150.10.18,10.150.10.19,10.150.10.20,10.150.10.21,10.150.10.22,10.150.10.23,10.150.10.47,10.150.10.48,10.150.10.49,10.150.10.50,10.150.10.51,10.150.10.52,10.150.10.53,10.150.10.54,10.150.10.55,10.150.10.73,10.150.10.75,10.150.10.79"
@@ -87,7 +89,7 @@ public:
     void print()
     {
         if (count)
-            PrintLog("TIME: %s(%d): max=%.6f, avg=%.6f, tot=%.6f",name,count,max,(double)total/count,total);
+            PrintLog("TIME: %s(%u): max=%.6f, avg=%.6f, tot=%.6f",name,count,max,(double)total/count,total);
     }
 };
 
@@ -143,30 +145,33 @@ void StreamTest(IGroup *group,ICommunicator *comm)
             StringBuffer header;
             header.append("Test Block #").append(i);
             mb.append(header.str()).reserve(BLOCKSIZE-mb.length());
-            PrintLog("Sending '%s' length %d",header.str(),mb.length());
+            PrintLog("MPTEST: StreamTest sending '%s' length %u",header.str(),mb.length());
             {
                 TimedBlock block(STsend);
                 comm->send(mb,0,MPTAG_TEST,MP_ASYNC_SEND);
             }
-            PrintLog("Sent");
+            PrintLog("MPTEST: StreamTest sent");
             //Sleep(WRITEDELAY);
         }
         else if (group->rank() == 0) {
             rank_t r;
             comm->recv(mb,RANK_ALL,MPTAG_TEST,&r);
             StringAttr str;
-            PrintLog("Receiving");
+            PrintLog("MPTEST: StreamTest receiving");
             {
                 TimedBlock block(STrecv);
                 mb.read(str);
             }
-            PrintLog("Received(%d) '%s' length %d",r,str.get(),mb.length());
+            PrintLog("MPTEST: StreamTest received(%u) '%s' length %u",r,str.get(),mb.length());
             //if (i==0)
             //  Sleep(1000*1000); // 15 mins or so
             //Sleep(READDELAY);
         }
         else
-            PrintLog("Skipping extra rank %d", group->rank());
+        {
+            PrintLog("MPTEST: StreamTest skipping extra rank %u", group->rank());
+            break;
+        }
     }
 
     comm->barrier();
@@ -192,7 +197,7 @@ void Test1(IGroup *group,ICommunicator *comm)
         comm->recv(mb,0,MPTAG_TEST,&r);
         StringAttr str;
         mb.read(str);
-        PrintLog("(1) Received '%s' from rank %d",str.get(),r);
+        PrintLog("(1) Received '%s' from rank %u",str.get(),r);
     }
     comm->barrier();
 }
@@ -216,7 +221,7 @@ void Test2(IGroup *group,ICommunicator *comm)
         comm->recv(mb,RANK_ALL,MPTAG_TEST,&r);
         StringAttr str;
         mb.read(str);
-        PrintLog("(2) Received '%s' from rank %d",str.get(),r);
+        PrintLog("(2) Received '%s' from rank %u",str.get(),r);
     }
     comm->barrier();
 }
@@ -235,7 +240,7 @@ void Test3(IGroup *group,ICommunicator *comm)
         comm->recv(mb,RANK_ALL,MPTAG_TEST,&r);
         StringAttr str;
         mb.read(str);
-        PrintLog("(3) Received '%s' from rank %d",str.get(),r);
+        PrintLog("(3) Received '%s' from rank %u",str.get(),r);
     }
     comm->barrier();
 }
@@ -259,7 +264,7 @@ void Test4(IGroup *group,ICommunicator *comm)
         comm->recv(mb,RANK_ALL,MPTAG_TEST,&r);
         StringAttr str;
         mb.read(str);
-        PrintLog("(4) Received '%s' from rank %d",str.get(),r);
+        PrintLog("(4) Received '%s' from rank %u",str.get(),r);
     }
     comm->barrier();
 }
@@ -282,7 +287,7 @@ void Test5(IGroup *group,ICommunicator *comm)
         singlecomm->recv(mb,RANK_ALL,MPTAG_TEST,&r);
         StringAttr str;
         mb.read(str);
-        PrintLog("(5) Received '%s' from rank %d (unknown)",str.get(),r);
+        PrintLog("(5) Received '%s' from rank %u (unknown)",str.get(),r);
     }
     comm->barrier();
     singlecomm->Release();
@@ -335,7 +340,7 @@ void Test7(IGroup *group,ICommunicator *comm)
         comm->recv(mb,(mptag_t) TAG_ALL,MPTAG_TEST,&r);
         StringAttr str;
         mb.read(str);
-        PrintLog("Received '%s' from rank %d",str.get(),r);
+        PrintLog("Received '%s' from rank %u",str.get(),r);
     }
     comm->barrier();
 }
@@ -371,13 +376,13 @@ struct CRandomBuffer
         int errs = 50;
         if (crc!=crc32(buffer,size,0)) {
             PrintLog("**** Error: CRC check failed");
-            PrintLog("size = %d",size);
+            PrintLog("size = %u",size);
             char c = buffer[0];
             for (unsigned i=1;i<size;i++) {
                 c += (c*16);
                 c += 113;
                 if (buffer[i] != c) {
-                    PrintLog("Failed at %d, expected %02x found %02x %02x %02x %02x %02x %02x %02x %02x",i,(int)(byte)c,(int)(byte)buffer[i],(int)(byte)buffer[i+1],(int)(byte)buffer[i+2],(int)(byte)buffer[i+3],(int)(byte)buffer[i+4],(int)(byte)buffer[i+5],(int)(byte)buffer[i+6],(int)(byte)buffer[i+7]);
+                    PrintLog("Failed at %u, expected %02x found %02x %02x %02x %02x %02x %02x %02x %02x",i,(int)(byte)c,(int)(byte)buffer[i],(int)(byte)buffer[i+1],(int)(byte)buffer[i+2],(int)(byte)buffer[i+3],(int)(byte)buffer[i+4],(int)(byte)buffer[i+5],(int)(byte)buffer[i+6],(int)(byte)buffer[i+7]);
                     if (errs--==0)
                         break;
                 }
@@ -424,23 +429,23 @@ void MultiTest(ICommunicator *_comm)
             unsigned n=(comm->queryGroup().ordinality()-1)*N;
             CMessageBuffer mb;
             CRandomBuffer *buff = new CRandomBuffer();
-            PrintLog("MPTEST: started server, myrank = %d", comm->queryGroup().rank());
+            PrintLog("MPTEST: MultiTest server started, myrank = %u", comm->queryGroup().rank());
             try {
                 while(n--) {
                     mb.clear();
                     rank_t rr;
                     if (!comm->recv(mb,RANK_ALL,MPTAG_TEST,&rr)) 
                         break;
-                    PrintLog("MPTEST: Received from %d, len = %d",rr, mb.length());
+                    PrintLog("MPTEST: MultiTest server Received from %u, len = %u",rr, mb.length());
                     StringBuffer str;
                     comm->queryGroup().queryNode(rr).endpoint().getUrlStr(str);
-                    // PrintLog("MPTEST: Received from %s",str.str());
+                    // PrintLog("MPTEST: MultiTest server Received from %s",str.str());
 
                     buff->deserialize(mb);
 
 #ifdef DO_CRC_CHECK
                     if (!buff->check())
-                        PrintLog("MPTEST: Received from %s",str.str());
+                        PrintLog("MPTEST: MultiTest server Received from %s",str.str());
 #endif
 
                     mb.clear().append(buff->crc);
@@ -455,9 +460,9 @@ void MultiTest(ICommunicator *_comm)
                 pexception("Server Exception",e);
             }
 
-            comm->barrier();  // MCK
+            comm->barrier();
 
-            PrintLog("MPTEST: stopped server");
+            PrintLog("MPTEST: MultiTest server stopped");
             delete buff;
             return 0;
         }
@@ -492,7 +497,7 @@ void MultiTest(ICommunicator *_comm)
         targets[k] = t;
     }
 
-    PrintLog("MPTEST: client started, myrank = %d", comm->queryGroup().rank());
+    PrintLog("MPTEST: Multitest client started, myrank = %u", comm->queryGroup().rank());
 
     try {
         while (n--) {
@@ -502,16 +507,16 @@ void MultiTest(ICommunicator *_comm)
 #if 0
             StringBuffer str;
             comm->queryGroup().queryNode(targets[n]).endpoint().getUrlStr(str);
-            PrintLog("MPTEST: Sending to %s, length=%u",str.str(), mb.length());
+            PrintLog("MPTEST: Multitest client Sending to %s, length=%u",str.str(), mb.length());
 #endif
 
-            PrintLog("MPTEST: Sending to %d, length=%u", targets[n], mb.length());
+            PrintLog("MPTEST: Multitest client Sending to %u, length=%u", targets[n], mb.length());
 
             if (!comm->sendRecv(mb,targets[n],MPTAG_TEST)) 
                 break;
 
             // Sleep((n+1)*2000);
-            // PrintLog("MPTEST: Sent to %s",str.str());
+            // PrintLog("MPTEST: Multitest client Sent to %s",str.str());
             unsigned crc;
             mb.read(crc);
             assertex(crc==buff->crc);
@@ -521,7 +526,7 @@ void MultiTest(ICommunicator *_comm)
         pexception("Client Exception",e);
     }
 
-    PrintLog("MPTEST: client finished");
+    PrintLog("MPTEST: MultiTest client finished");
 
     server.join();
 
@@ -529,119 +534,179 @@ void MultiTest(ICommunicator *_comm)
     delete buff;
 }
 
-void MPITest(IGroup *group, ICommunicator *mpicomm)
+void MPRing(IGroup *group, ICommunicator *mpicomm, unsigned iters=0)
 {
-    CMessageBuffer mb;
-    CMessageBuffer mb2;
-    int myrank = group->rank();
-    int numranks = group->ordinality();
+    CMessageBuffer smb;
+    CMessageBuffer rmb;
+    rank_t myrank = group->rank();
+    rank_t numranks = group->ordinality();
+
+    if (numranks < 2)
+        throw MakeStringException(-1, "MPTEST: MPRing Error, numranks (%u) must be > 1", numranks);
 
-    int rnksumtotal = 0;
-    for(int i=0;i<numranks;i++)
-        rnksumtotal += (i+1);
+    if (iters == 0)
+        iters = 1000;
+
+    unsigned pintvl = iters/10;
+    if (pintvl < 1)
+        pintvl = 1;
+
+    PrintLog("MPTEST: MPRing myrank=%u numranks=%u iters=%u", myrank, numranks, iters);
+
+    unsigned next = myrank;
+    unsigned prev = myrank;
+    unsigned k = 0;
+    do
+    {
+        next = (next+1) % numranks;
+        prev = prev > 0 ? prev-1 : numranks-1;
 
-    PrintLog("MPTEST: MPITest myrank=%d numranks=%d rnksumtotal=%d", myrank, numranks, rnksumtotal);
+        // skip self
+        if ( (next == prev) && (next == myrank) )
+            continue;
 
-    // send and recv to/from all others without a send/recv deadlock ...
+        smb.clear();
+        smb.append(k);
+        if ((k%pintvl) == 0)
+            PrintLog("MPTEST: MPRing %u send to rank %u", myrank, next);
+        bool oksend = mpicomm->send(smb, next, MPTAG_TEST);
+        if (!oksend)
+            throw MakeStringException(-1, "MPTEST: MPRing %u send() to rank %u failed", myrank, next);
 
-    mb.clear();
-    mb.append(myrank+1);
+        rmb.clear();
+        if ((k%pintvl) == 0)
+            PrintLog("MPTEST: MPRing %u recv from rank %u", myrank, prev);
+        bool okrecv = mpicomm->recv(rmb, prev, MPTAG_TEST);
+        if (!okrecv)
+            throw MakeStringException(-1, "MPTEST: MPRing %u recv() from rank %u failed", myrank, prev);
+        rmb.read(k);
 
-    rank_t r;
-    int rankval;
-    int ranksum = myrank+1;
+        k++;
 
-    int left, right;
+        if ((k%pintvl) == 0)
+            PrintLog("MPTEST: MPRing %u iteration %u complete", myrank, k);
 
-    if (numranks == 2)
+        if (k == iters)
+            break;
+    }
+    while (true);
+
+    PrintLog("MPTEST: MPRing complete");
+
+    mpicomm->barrier();
+
+    return;
+}
+
+#define MSGLEN 1048576
+
+void MPAlltoAll(IGroup *group, ICommunicator *mpicomm, size32_t buffsize=0, unsigned iters=0)
+{
+    rank_t myrank = group->rank();
+    rank_t numranks = group->ordinality();
+
+    if (numranks < 2)
+        throw MakeStringException(-1, "MPAlltoAll: MPRing Error, numranks (%u) must be > 1", numranks);
+
+    if (buffsize == 0)
+        buffsize = MSGLEN;
+    if (iters == 0)
+        iters = 1000;
+    if (iters < 1)
+        iters = 1;
+
+    PrintLog("MPTEST: MPAlltoAll myrank=%u numranks=%u buffsize=%u iters=%u", myrank, numranks, buffsize, iters);
+
+    // ---------
+
+    class Sender : public Thread
     {
-        if (myrank == 0)
+    public:
+        Linked<ICommunicator> mpicomm;
+        rank_t numranks;
+        rank_t myrank;
+        size32_t buffsize;
+        unsigned iters;
+        Sender(ICommunicator *_mpicomm, rank_t _numranks, rank_t _myrank, size32_t _buffsize, unsigned _iters) : mpicomm(_mpicomm), numranks(_numranks), myrank(_myrank), buffsize(_buffsize), iters(_iters)
         {
-            left = 1;
-            right = 1;
-            PrintLog("MPTEST: MPITest: %d send to rank %d", myrank, right);
-            mpicomm->send(mb,right,MPTAG_TEST);
-
-            mb2.clear();
-            PrintLog("MPTEST: MPITest: %d recv from rank %d", myrank, left);
-            mpicomm->recv(mb2,left,MPTAG_TEST,&r);
-            mb2.read(rankval);
-            ranksum += rankval;
         }
-        else
+
+        int run()
         {
-            left = 0;
-            right = 0;
-            mb2.clear();
-            PrintLog("MPTEST: MPITest: %d recv from rank %d", myrank, left);
-            mpicomm->recv(mb2,left,MPTAG_TEST,&r);
-            mb2.read(rankval);
-            ranksum += rankval;
-
-            PrintLog("MPTEST: MPITest: %d send to rank %d", myrank, right);
-            mpicomm->send(mb,right,MPTAG_TEST);
+            PrintLog("MPTEST: MPAlltoAll sender started, myrank = %u", myrank);
+
+            int pintvl = iters/10;
+            if (pintvl < 1)
+                pintvl = 1;
+
+            CMessageBuffer smb;
+            smb.appendBytes('a', buffsize);
+
+            for (unsigned k=1;k<=iters;k++)
+            {
+                bool oksend = mpicomm->send(smb, RANK_ALL_OTHER, MPTAG_TEST);
+                if (!oksend)
+                    throw MakeStringException(-1, "MPTEST: MPAlltoAll %u send() failed", myrank);
+                if ((k%pintvl) == 0)
+                    PrintLog("MPTEST: MPAlltoAll sender %u iteration %u complete", myrank, k);
+            }
+
+            mpicomm->barrier();
+            PrintLog("MPTEST: MPAlltoAll sender stopped");
+            return 0;
         }
-    }
-    else if (numranks > 2)
+    } sender(mpicomm, numranks, myrank, buffsize, iters);
+
+    unsigned startTime = msTick();
+
+    sender.start();
+
+    // ---------
+
+    PrintLog("MPTEST: MPAlltoAll receiver started, myrank = %u", myrank);
+
+    int pintvl = iters/10;
+    if (pintvl < 1)
+        pintvl = 1;
+
+    CMessageBuffer rmb(buffsize);
+
+    for (unsigned k=1;k<=iters;k++)
     {
-        int m = 0;
-        while (m < (numranks - 1))
+        for (rank_t i=1;i<numranks;i++)
         {
-            int rankid = 0;
-            while (rankid < numranks)
-            {
-                left = rankid - 1 - m;
-                if (left < 0)
-                    left = numranks + left;
-                right = rankid + 1 + m;
-                if (right >= numranks)
-                    right = right % numranks;
-
-                if (rankid == myrank)
-                {
-                    if (rankid == 0)
-                    {
-                        PrintLog("MPTEST: MPITest: %d send to rank %d", myrank, right);
-                        mpicomm->send(mb,right,MPTAG_TEST);
-                        mb2.clear();
-                        PrintLog("MPTEST: MPITest: %d recv from rank %d", myrank, left);
-                        mpicomm->recv(mb2,left,MPTAG_TEST,&r);
-                        mb2.read(rankval);
-                        ranksum += rankval;
-                    }
-                    else
-                    {
-                        mb2.clear();
-                        PrintLog("MPTEST: MPITest: %d recv from rank %d", myrank, left);
-                        mpicomm->recv(mb2,left,MPTAG_TEST,&r);
-                        mb2.read(rankval);
-                        ranksum += rankval;
-                        PrintLog("MPTEST: MPITest: %d send to rank %d", myrank, right);
-                        mpicomm->send(mb,right,MPTAG_TEST);
-                    }
-                }
-                rankid++;
-
-            }
-            m++;
+            // rmb.clear();
+            bool okrecv = mpicomm->recv(rmb, RANK_ALL, MPTAG_TEST);
+            if (!okrecv)
+                throw MakeStringException(-1, "MPTEST: MPAlltoAll %u recv() failed", myrank);
+            if (i==1 && (k%pintvl) == 0)
+                PrintLog("MPTEST: MPAlltoAll receiver rank %u iteration %u complete", myrank, k);
         }
     }
 
-    PrintLog("MPTEST: MPITest: ranksum = %d", ranksum);
+    mpicomm->barrier();
 
-    assertex(rnksumtotal==ranksum);
+    PrintLog("MPTEST: MPAlltoAll receiver finished");
 
-    mpicomm->barrier();
+    // ---------
+
+    sender.join();
+
+    unsigned endTime = msTick();
+
+    double msgRateMB = (2.0*(double)buffsize*(double)iters*(double)(numranks-1)) / ((endTime-startTime)*1000.0);
+
+    PrintLog("MPTEST: MPAlltoAll complete %g MB/s", msgRateMB);
 
     return;
 }
 
-void MPITest2(IGroup *group, ICommunicator *mpicomm)
+void MPTest2(IGroup *group, ICommunicator *mpicomm)
 {
-    int myrank = group->rank();
-    int numranks = group->ordinality();
+    rank_t myrank = group->rank();
+    rank_t numranks = group->ordinality();
 
-    PrintLog("MPTEST: MPITest2: myrank=%d numranks=%d", myrank, numranks);
+    PrintLog("MPTEST: MPTest2: myrank=%u numranks=%u", myrank, numranks);
 
     mpicomm->barrier();
 
@@ -676,6 +741,11 @@ void testIPnodeHash()
 
 int main(int argc, char* argv[])
 {
+    int mpi_debug = 0;
+    char testname[256] = { "" };
+    size32_t buffsize = 0;
+    unsigned numiters = 0;
+    rank_t max_ranks = 0;
     InitModuleObjects();
     EnableSEHtoExceptionMapping();
 
@@ -684,9 +754,41 @@ int main(int argc, char* argv[])
 //  stopMPServer();
 //  return 0;
 
+/*  mp hostfile format:
+ *  -------------------
+ *  <IP0>:port0
+ *  <IP1>:port1
+ *  <IP2>:port2
+ *  ...
+ *
+ *  run script:
+ *  -----------
+ *  # NOTE: mptest stops if its cmdline port and native IP do not match
+ *  # any entry in hostfile, so the same cmdlines can be repeated on all hosts ...
+ *  mptest port0 -f hostfile [-t testname] [-b buffsize] [-i iters] [-n numprocs] [-d] &
+ *  mptest port1 -f hostfile ... &
+ *  ...
+ *  [wait]
+ *
+ *  Test names (-t):
+ *  -----------
+ *  MPRing (default)
+ *  StreamTest
+ *  MultiTest
+ *  MPAlltoAll
+ *  MPTest2
+ *
+ *  Options: (available with -f hostfile arg)
+ *  --------
+ *  -b buffsize (bytes) for MPAlltoAll test
+ *  -i iterations for MPRing and MPAlltoAll tests
+ *  -n numprocs to use only the first numprocs ranks listed in hostfile
+ *  -d for some additional debug output
+ */
+
 #ifndef MYMACHINES
     if (argc<3) {
-        printf("\nMPTEST: Usage: %s <myport> [-f <file> | <ip:port> <ip:port> ...]\n\n", argv[0]);
+        printf("\nMPTEST: Usage: %s <myport> [-f <hostfile> [-t <testname> -b <buffsize> -i <iters> -n <numprocs> -d] | <ip:port> <ip:port>]\n\n", argv[0]);
         return 0;
     }
 #endif
@@ -697,43 +799,84 @@ int main(int argc, char* argv[])
         // PrintLog("MPTEST Starting");
 
 #ifndef MYMACHINES
-        int num_nodes = 0;
+        rank_t tot_ranks = 0;
         int my_port = atoi(argv[1]);
         char logfile[256] = { "" };
         sprintf(logfile,"mptest-%d.log",my_port);
         // openLogFile(lf, logfile);
 
-        PrintLog("MPTEST: Starting %d", my_port);
+        IArrayOf<INode> nodes;
 
-        startMPServer(my_port);
-
-        INode *nodes[1000];
-
-        const char * argfile = nullptr;
+        const char * hostfile = nullptr;
         if (argc > 3)
         {
             if (strcmp(argv[2], "-f") == 0)
-                argfile = argv[3];
+                hostfile = argv[3];
         }
 
-        int i = 1;
-        if (argfile)
+        unsigned i = 1;
+        if (hostfile)
         {
+
+            int j = 4;
+            while (j < argc)
+            {
+                if (streq(argv[j], "-t"))
+                {
+                    if ((j+1) < argc)
+                    {
+                        strcpy(testname, argv[j+1]);
+                        j++;
+                    }
+                }
+                else if (streq(argv[j], "-d"))
+                {
+                    mpi_debug++;
+                }
+                else if (streq(argv[j], "-b"))
+                {
+                    if ((j+1) < argc)
+                    {
+                        buffsize = atoi(argv[j+1]);
+                        j++;
+                    }
+                }
+                else if (streq(argv[j], "-i"))
+                {
+                    if ((j+1) < argc)
+                    {
+                        numiters = atoi(argv[j+1]);
+                        j++;
+                    }
+                }
+                else if ( streq(argv[j], "-n") || streq(argv[j], "-np") )
+                {
+                    if ((j+1) < argc)
+                    {
+                        max_ranks = atoi(argv[j+1]);
+                        j++;
+                    }
+                }
+                j++;
+            }
+
             char hoststr[256] = { "" };
-            FILE *fp = fopen(argfile, "r");
+            FILE *fp = fopen(hostfile, "r");
             if (fp == NULL)
             {
-                PrintLog("MPTest: Error cannot open file <%s>", argfile);
+                PrintLog("MPTest: Error, cannot open hostfile <%s>", hostfile);
                 return 1;
             }
             char line[256] = { "" };
             while(fgets(line, 255, fp) != NULL)
             {
+                if ( (max_ranks > 0) && ((i-1) >= max_ranks) )
+                    break;
                 int srtn = sscanf(line,"%s",hoststr);
                 if (srtn == 1 && line[0] != '#')
                 {
-                    PrintLog("MPTEST: adding node %d, port = <%s>", i-1, hoststr);
-                    nodes[i-1] = createINode(hoststr, my_port);
+                    INode *newNode = createINode(hoststr, my_port);
+                    nodes.append(*newNode);
                     i++;
                 }
             }
@@ -741,82 +884,111 @@ int main(int argc, char* argv[])
         }
         else
         {
-            while (i+1 < argc && i-1 < 1000) {
-                PrintLog("MPTEST: adding node %d, port = <%s>", i-1, argv[i+1]);
-                nodes[i-1] = createINode(argv[i+1], my_port);
+            while (i+1 < argc)
+            {
+                PrintLog("MPTEST: adding node %u, port = <%s>", i-1, argv[i+1]);
+                INode *newNode = createINode(argv[i+1], my_port);
+                nodes.append(*newNode);
                 i++;
             }
         }
 
-        PrintLog("MPTEST: num_nodes = %d", i-1);
+        tot_ranks = i-1;
+
+        Owned<IGroup> group = createIGroup(tot_ranks, nodes.getArray());
+
+        // stop if not meant for this host ...
+
+        IpAddress myIp;
+        GetHostIp(myIp);
+        SocketEndpoint myEp(my_port, myIp);
 
-        IGroup *group = createIGroup(i-1,nodes);
+        bool die = true;
+        for (rank_t k=0;k<tot_ranks;k++)
+        {
+            if (nodes.item(k).endpoint().equals(myEp))
+                die = false;
+        }
+
+        if (die)
+            return 0;
+
+        PrintLog("MPTEST: Starting, port = %d tot ranks = %u", my_port, tot_ranks);
+
+        startMPServer(my_port);
+
+        if (mpi_debug)
+        {
+            for (rank_t k=0;k<tot_ranks;k++)
+            {
+                StringBuffer urlStr;
+                nodes.item(k).endpoint().getUrlStr(urlStr);
+                PrintLog("MPTEST: adding node %u, %s", k, urlStr.str());
+            }
+        }
 #else
         openLogFile(lf, "mptest.log");
         startMPServer(MPPORT);
-        IGroup *group = createIGroup(MYMACHINES,MPPORT); 
+        Owned<IGroup> group = createIGroup(MYMACHINES,MPPORT);
 #endif
 
-#ifdef STREAMTEST
+        Owned<ICommunicator> mpicomm = createCommunicator(group);
 
-        ICommunicator * mpicomm = createCommunicator(group);
+#ifdef STREAMTEST
         StreamTest(group,mpicomm);
-        mpicomm->Release();
-
 #else
 # ifdef MULTITEST
-
-        ICommunicator * mpicomm = createCommunicator(group);
         MultiTest(mpicomm);
-        mpicomm->Release();
-
 # else
-#  ifdef MPITEST
-
-        ICommunicator * mpicomm = createCommunicator(group);
-        MPITest(group, mpicomm);
-        mpicomm->Release();
-
+#  ifdef MPRING
+        MPRing(group, mpicomm, numiters);
 #  else
-#   ifdef MPITEST2
-
-        ICommunicator * mpicomm = createCommunicator(group);
-        MPITest2(group, mpicomm);
-        mpicomm->Release();
-
+#   ifdef MPALLTOALL
+        MPAlltoAll(group, mpicomm, buffsize, numiters);
 #   else
-
-        ICommunicator * comm = createCommunicator(group);
-        for (unsigned i = 0;i<1;i++) {
-            Test1(group,comm);
+#    ifdef MPTEST2
+        MPTest2(group, mpicomm);
+#    else
+#     ifdef DYNAMIC_TEST
+        if (strnicmp(testname, "Stream", 6)==0)
+            StreamTest(group, mpicomm);
+        else if (strnicmp(testname, "Multi", 5)==0)
+            MultiTest(mpicomm);
+        else if ( strieq(testname, "MPRing") || strieq(testname, "Ring") )
+            MPRing(group, mpicomm, numiters);
+        else if ( strieq(testname, "MPAlltoAll") || strieq(testname, "AlltoAll") )
+            MPAlltoAll(group, mpicomm, buffsize, numiters);
+        else if ( strieq(testname, "MPTest2") || strieq(testname, "Test2") )
+            MPTest2(group, mpicomm);
+        else if ((int)strlen(testname) > 0)
+            PrintLog("MPTEST: Error, invalid testname specified (-t %s)", testname);
+        else  // default is MPRing ...
+            MPRing(group, mpicomm, numiters);
+#     else
+        for (unsigned i = 0;i<1;i++)
+        {
+            Test1(group,mpicomm);
             PrintLog("MPTEST: test1 done, waiting"); Sleep(aWhile);
-            Test2(group,comm);
+            Test2(group,mpicomm);
             PrintLog("MPTEST: test2 done, waiting"); Sleep(aWhile);
-            Test3(group,comm);
+            Test3(group,mpicomm);
             PrintLog("MPTEST: test3 done, waiting"); Sleep(aWhile);
-            Test4(group,comm);
+            Test4(group,mpicomm);
             PrintLog("MPTEST: test4 done, waiting"); Sleep(aWhile);
-            Test5(group,comm);
+            Test5(group,mpicomm);
             PrintLog("MPTEST: test5 done, waiting"); Sleep(aWhile);
-            Test6(group,comm);
+            Test6(group,mpicomm);
             PrintLog("MPTEST: test6 done, waiting"); Sleep(aWhile);
-            Test7(group,comm);
+            Test7(group,mpicomm);
             PrintLog("MPTEST: test7 done, waiting"); Sleep(aWhile);
         }
-        comm->Release();
-
+#     endif
+#    endif
 #   endif
 #  endif
 # endif
 #endif
 
-        group->Release();
-
-#ifndef MYMACHINES
-        for (int i=0;i<num_nodes;i++)
-            nodes[i]->Release();
-#endif
-
         stopMPServer();
     }
     catch (IException *e) {