Bladeren bron

Merge pull request #11399 from jakesmith/hpcc-19973

HPCC-19973 Add NxN test.

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 7 jaren geleden
bovenliggende
commit
8f1d0dbb45
2 gewijzigde bestanden met toevoegingen van 463 en 2 verwijderingen
  1. 438 2
      system/mp/test/mptest.cpp
  2. 25 0
      system/mp/test/startn-mptests

+ 438 - 2
system/mp/test/mptest.cpp

@@ -10,6 +10,9 @@
 #include <mpbase.hpp>
 #include <mpcomm.hpp>
 
+#include <algorithm>
+#include <queue>
+
 using namespace std;
 
 #define MPPORT 8888
@@ -701,6 +704,411 @@ void MPAlltoAll(IGroup *group, ICommunicator *mpicomm, size32_t buffsize=0, unsi
     return;
 }
 
+void MPNxN(ICommunicator *comm, unsigned numStreams, size32_t perStreamMBSize, size32_t msgSize, bool async)
+{
+    // defaults
+    if (0 == numStreams)
+        numStreams = 8;
+    if (0 == perStreamMBSize)
+        perStreamMBSize = 100;
+    if (0 == msgSize)
+        msgSize = 1024*1024;
+
+    unsigned grpSize = comm->queryGroup().ordinality();
+    rank_t myRank = comm->queryGroup().rank();
+
+    PROGLOG("MPNxN: myrank=%u, numStreams=%u, perStreamMBSize=%u, msgSize(bytes)=%u", myRank=(unsigned)myRank, numStreams, perStreamMBSize, msgSize);
+
+    class CSendStream : public CInterfaceOf<IInterface>, implements IThreaded
+    {
+        CThreaded threaded;
+        ICommunicator *comm = nullptr;
+        rank_t myRank;
+        unsigned grpSize;
+        mptag_t mpTag, replyTag;
+        unsigned __int64 totalSendSize;
+        size32_t msgSize;
+        StringBuffer resultMsg;
+        bool passed = false;
+        std::vector<rank_t> tgtRanks;
+        StringBuffer tgtRanksStr;
+        bool async = false;
+
+        // used if async
+        class CAckThread : implements IThreaded
+        {
+            CThreaded threaded;
+            CSendStream &owner;
+            mptag_t mpTag;
+            CriticalSection cs;
+            Owned<IException> exception;
+            std::vector<std::queue<unsigned>> expectedHashes;
+
+            unsigned getNextExpectedHash(unsigned sender)
+            {
+                std::queue<unsigned> &queue = expectedHashes[sender];
+
+                CriticalBlock b(cs);
+                assertex(queue.size());
+                unsigned hash = queue.front();
+                expectedHashes[sender].pop();
+                return hash;
+            }
+        public:
+            CAckThread(CSendStream &_owner, mptag_t _mpTag) : owner(_owner), mpTag(_mpTag), threaded("CAckThread", this)
+            {
+                expectedHashes.resize(owner.grpSize);
+            }
+            void addHash(unsigned tgt, unsigned hash)
+            {
+                CriticalBlock b(cs);
+                expectedHashes[tgt].push(hash);
+            }
+            void start() { threaded.start(); }
+            void join()
+            {
+                threaded.join();
+                if (exception)
+                    throw exception.getClear();
+            }
+            // IThredaed
+            virtual void threadmain() override
+            {
+                try
+                {
+                    CMessageBuffer msg;
+                    rank_t sender;
+                    unsigned finalAcks = 0;
+                    while (true)
+                    {
+                        unsigned receivedHash;
+                        if (!owner.receiveAck(msg, sender, receivedHash)) // empty message indicates end by client
+                        {
+                            finalAcks++;
+                            if (finalAcks == owner.grpSize)
+                                break; // all done
+                        }
+                        else
+                        {
+                            unsigned expectedHash = getNextExpectedHash(sender);
+                            owner.verifyAck(expectedHash, receivedHash);
+                        }
+                    }
+                }
+                catch (IException *e)
+                {
+                    exception.setown(e);
+                }
+            }
+        } ackThread;
+
+        unsigned fillData(void *data, size32_t sz, unsigned hash)
+        {
+            byte *pData = (byte *)data;
+            do
+            {
+                if (sz<sizeof(hash))
+                {
+                    memset(pData, 0, sz);
+                    break;
+                }
+                hash = hashc((unsigned char *)&hash, sizeof(hash), hash);
+                memcpy(pData, &hash, sizeof(hash));
+                sz -= sizeof(hash);
+                pData += sizeof(hash);
+            }
+            while (true);
+            return hash;
+        }
+        bool receiveAck(CMessageBuffer &msg, rank_t &sender, unsigned &ack)
+        {
+            while (!comm->recv(msg, RANK_ALL, replyTag, &sender, 60000))
+                WARNLOG("Waiting for receive ack");
+            if (0 == msg.length())
+                return false; // final empty message indicated complete
+            if (std::find(tgtRanks.begin(), tgtRanks.end(), sender) == tgtRanks.end())
+                throwStringExceptionV(0, "Received reply from rank(%u) this stream did not send to", (unsigned)sender);
+            msg.read(ack);
+            msg.clear();
+            return true;
+        }
+        void verifyAck(unsigned receivedHash, unsigned expectedHash)
+        {
+            if (receivedHash != expectedHash)
+                throwStringExceptionV(0, "Checksums mismatch: %u sent vs %u received", expectedHash, receivedHash);
+        }
+
+    public:
+        CSendStream(ICommunicator *_comm, rank_t _myRank, unsigned _grpSize, mptag_t _mpTag, unsigned __int64 _totalSendSize, size32_t _msgSize, bool _async)
+            : threaded("CSendStream", this), comm(_comm), myRank(_myRank), grpSize(_grpSize), mpTag(_mpTag), totalSendSize(_totalSendSize), msgSize(_msgSize), ackThread(*this, _mpTag), async(_async)
+        {
+            if (1 == grpSize) // group only contains self, so target self
+                tgtRanks.push_back(0);
+            else
+            {
+                unsigned pc=25; // %
+
+                // add 'pc'% targets, starting from myRank+1
+                unsigned num = grpSize*pc/100;
+                if (0 == num)
+                    num = 1;
+                unsigned step = grpSize / num;
+                unsigned t = myRank+1;
+                if (t == grpSize)
+                    t = 0;
+                while (true)
+                {
+                    tgtRanks.push_back(t);
+                    --num;
+                    if (0 == num)
+                        break;
+                    t += step;
+                    if (t >= grpSize)
+                        t -= grpSize;
+                }
+            }
+
+            auto iter = tgtRanks.begin();
+            while (true)
+            {
+                tgtRanksStr.append(*iter);
+                iter++;
+                if (iter == tgtRanks.end())
+                    break;
+                tgtRanksStr.append(",");
+            }
+            replyTag = createReplyTag();
+            if (async)
+                ackThread.start();
+            threaded.start();
+        }
+        ~CSendStream()
+        {
+            threaded.join();
+        }
+        bool waitResult(StringBuffer &resultMessage)
+        {
+            threaded.join();
+            return passed;
+        }
+        const char *queryTgtRanks() { return tgtRanksStr; }
+        virtual void threadmain() override
+        {
+            passed = false;
+            try
+            {
+                CMessageBuffer msg, recvMsg;
+                msg.setReplyTag(replyTag);
+                void *data = msg.reserveTruncate(msgSize);
+                unsigned hash = (unsigned)mpTag;
+
+                VStringBuffer logMsg("NxN: mpTag=%u, dstRank(s) [%s]", (unsigned)mpTag, tgtRanksStr.str());
+                PROGLOG("%s", logMsg.str());
+
+                unsigned __int64 remaining = totalSendSize;
+                CCycleTimer timer;
+                while (true)
+                {
+                    size32_t sz;
+                    if (remaining >= msgSize)
+                    {
+                        sz = msgSize;
+                        remaining -= msgSize;
+                    }
+                    else
+                    {
+                        sz = remaining;
+                        msg.setLength(sz);
+                        remaining = 0;
+                    }
+                    hash = fillData(data, sz, hash);
+                    for (rank_t t: tgtRanks)
+                    {
+                        if (async)
+                            ackThread.addHash(t, hash);
+                        if (!comm->send(msg, t, mpTag))
+                            throwUnexpected();
+                    }
+                    if (!async)
+                    {
+                        rank_t sender;
+                        for (int t: tgtRanks)
+                        {
+                            rank_t sender;
+                            unsigned receivedHash;
+                            assertex(receiveAck(recvMsg, sender, receivedHash));
+                            verifyAck(hash, receivedHash);
+                        }
+                    }
+                    if (!remaining)
+                        break;
+                }
+                msg.clear();
+                // send blank msg to all to signal end to receivers.
+                if (!comm->send(msg, RANK_ALL, mpTag))
+                    throwUnexpected();
+                if (async)
+                    ackThread.join();
+                else
+                {
+                    rank_t sender;
+                    for (unsigned r=0; r<grpSize; r++)
+                    {
+                        while (!comm->recv(msg, r, replyTag, &sender, 60000))
+                            WARNLOG("Waiting for final ack");
+                        assertex(sender == r);
+                        assertex(0 == msg.length());
+                    }
+                }
+                float ms = timer.elapsedMs();
+                float mbPerSec = (totalSendSize/ms*1000)/0x100000;
+                PROGLOG("Stream stats: time taken = %.2f seconds, total sent=%u MB, throughput = %.2f MB/s", ms/1000, (unsigned)(totalSendSize/0x100000), mbPerSec);
+            }
+            catch (IException *e)
+            {
+                e->errorMessage(resultMsg);
+                EXCLOG(e, "FAIL");
+                e->Release();
+                return;
+            }
+            passed = true;
+        }
+    };
+    class CRecvServer : public CInterfaceOf<IInterface>, implements IThreaded
+    {
+        CThreaded threaded;
+        ICommunicator *comm;
+        rank_t myRank;
+        unsigned grpSize;
+        mptag_t mpTag;
+        unsigned numStreams;
+
+        unsigned checkData(MemoryBuffer &mb, unsigned hash)
+        {
+            size32_t len = mb.remaining();
+            const byte *p = (const byte *)mb.readDirect(len);
+            while (len >= sizeof(hash))
+            {
+                hash = hashc((unsigned char *)&hash, sizeof(hash), hash);
+                if (0 != memcmp(p, &hash, sizeof(hash)))
+                    return 0;
+                p += sizeof(hash);
+                len -= sizeof(hash);
+            }
+            return hash;
+        }
+    public:
+        CRecvServer(ICommunicator *_comm, rank_t _myRank, unsigned _grpSize, mptag_t _mpTag, unsigned _numStreams)
+            : threaded("CSendStream", this), comm(_comm), myRank(_myRank), grpSize(_grpSize), mpTag(_mpTag), numStreams(_numStreams)
+        {
+            threaded.start();
+        }
+        ~CRecvServer()
+        {
+            threaded.join();
+        }
+        void join()
+        {
+            threaded.join();
+        }
+        void stop()
+        {
+            comm->cancel(RANK_ALL, mpTag);
+            join();
+        }
+        virtual void threadmain() override
+        {
+            unsigned __int64 szRecvd = 0;
+            std::vector<rank_t> endReplyTags;
+            try
+            {
+                unsigned hash = (unsigned)mpTag;
+                unsigned clients = grpSize;
+                endReplyTags.resize(clients);
+                CMessageBuffer msg;
+                do
+                {
+                    rank_t sender;
+                    while (!comm->recv(msg, RANK_ALL, mpTag, &sender, 60000))
+                        PROGLOG("Waiting for data on %u", (unsigned)mpTag);
+                    if (!msg.length())
+                    {
+                        /* each client sends a zero length buffer when done.
+                         * When all received, receiver can stop, and replies to indicate finished.
+                         *
+                         */
+                        endReplyTags[(unsigned)sender] = msg.getReplyTag();
+                        --clients;
+                        if (0 == clients)
+                        {
+                            for (unsigned r=0; r<endReplyTags.size(); r++)
+                            {
+                                if (!comm->send(msg, r, (mptag_t)endReplyTags[r]))
+                                    throwUnexpected();
+                            }
+                            break;
+                        }
+                    }
+                    else
+                    {
+                        szRecvd += msg.length();
+
+                        // read 1st hash, then use to calculate and check incoming data.
+                        msg.read(hash);
+                        hash = checkData(msg, hash);
+
+                        msg.clear();
+                        msg.append(hash); // this should match what client calculated presend.
+                        if (!comm->reply(msg))
+                            throwUnexpected();
+                    }
+                }
+                while (true);
+            }
+            catch (IException *e)
+            {
+                EXCLOG(e, "CRecvServer");
+                e->Release();
+            }
+            PROGLOG("NxN:Receiver[tag=%u] szRecvd=%" I64F "u finished", (unsigned)mpTag, szRecvd);
+        }
+    };
+
+    mptag_t mpTag = (mptag_t)0x20000;
+
+    std::vector<Owned<CRecvServer>> receivers;
+    std::vector<Owned<CSendStream>> senders;
+    for (unsigned s=0; s<numStreams; s++)
+    {
+        receivers.push_back(new CRecvServer(comm, myRank, grpSize, mpTag, numStreams));
+        senders.push_back(new CSendStream(comm, myRank, grpSize, mpTag, ((unsigned __int64)perStreamMBSize)*0x100000, msgSize, async));
+        mpTag = (mptag_t)((unsigned)mpTag+1);
+    }
+    bool allSuccess = true;
+    for (unsigned senderN=0; senderN<senders.size(); senderN++)
+    {
+        const auto &sender = senders[senderN];
+        StringBuffer resultMsg;
+        bool res = sender->waitResult(resultMsg);
+        VStringBuffer logMsg("Stream[%u] from rank %u -> rank(s) [%s] result: ", senderN, (unsigned)myRank, sender->queryTgtRanks());
+        if (res)
+            logMsg.append("PASSED");
+        else
+        {
+            logMsg.append("FAILED - ").append(resultMsg.str());
+            allSuccess = false;
+        }
+        PROGLOG("%s", logMsg.str());
+    }
+    for (const auto &receiver: receivers)
+    {
+        if (allSuccess)
+            receiver->join();
+        else
+            receiver->stop();
+    }
+}
+
 void MPTest2(IGroup *group, ICommunicator *mpicomm)
 {
     rank_t myrank = group->rank();
@@ -746,6 +1154,10 @@ int main(int argc, char* argv[])
     size32_t buffsize = 0;
     unsigned numiters = 0;
     rank_t max_ranks = 0;
+    unsigned numStreams = 0;
+    unsigned perStreamMBSize = 0;
+    bool async = false;
+
     InitModuleObjects();
     EnableSEHtoExceptionMapping();
 
@@ -777,13 +1189,17 @@ int main(int argc, char* argv[])
  *  MultiTest
  *  MPAlltoAll
  *  MPTest2
+ *  MPNxN [-b <msgSiz>] [-s <numStreams>] [-m <perStreamMBSize>] [-a]
  *
  *  Options: (available with -f hostfile arg)
  *  --------
- *  -b buffsize (bytes) for MPAlltoAll test
+ *  -b buffsize (bytes) for MPAlltoAll and MPNxN tests
  *  -i iterations for MPRing and MPAlltoAll tests
  *  -n numprocs for when wanting to test a subset of ranks from hostfile/script
  *  -d for some additional debug output
+ *  -s number of streams for MPNxN test
+ *  -m total MB's to send per stream for MPNxN test
+ *  -a async for NxN test
  */
 
 #ifndef MYMACHINES
@@ -803,7 +1219,7 @@ int main(int argc, char* argv[])
         int my_port = atoi(argv[1]);
         char logfile[256] = { "" };
         sprintf(logfile,"mptest-%d.log",my_port);
-        // openLogFile(lf, logfile);
+        openLogFile(lf, logfile);
 
         IArrayOf<INode> nodes;
 
@@ -857,6 +1273,24 @@ int main(int argc, char* argv[])
                         j++;
                     }
                 }
+                else if (streq(argv[j], "-s"))
+                {
+                    if ((j+1) < argc)
+                    {
+                        numStreams = atoi(argv[j+1]);
+                        j++;
+                    }
+                }
+                else if (streq(argv[j], "-m"))
+                {
+                    if ((j+1) < argc)
+                    {
+                        perStreamMBSize = atoi(argv[j+1]);
+                        j++;
+                    }
+                }
+                else if (streq(argv[j], "-a"))
+                    async = true;
                 j++;
             }
 
@@ -958,6 +1392,8 @@ int main(int argc, char* argv[])
             MPRing(group, mpicomm, numiters);
         else if ( strieq(testname, "MPAlltoAll") || strieq(testname, "AlltoAll") )
             MPAlltoAll(group, mpicomm, buffsize, numiters);
+        else if ( strieq(testname, "MPNxN") || strieq(testname, "NxN") )
+            MPNxN(mpicomm, numStreams, perStreamMBSize, buffsize, async);
         else if ( strieq(testname, "MPTest2") || strieq(testname, "Test2") )
             MPTest2(group, mpicomm);
         else if ((int)strlen(testname) > 0)

+ 25 - 0
system/mp/test/startn-mptests

@@ -0,0 +1,25 @@
+#!/bin/bash
+
+if [ $# -lt 2 ]; then
+    echo "$0: <processes> <testname>"
+    exit 1
+fi
+
+processes=$1
+thetest=$2
+let "startn = 9000"
+let "endn = 9000 + ${processes} - 1"
+
+rm -f hosts
+for port in `seq ${startn} ${endn}`;
+do
+ echo ".:${port}" >> hosts
+done
+
+for port in `seq ${startn} ${endn}`;
+do
+ ./Debug/bin/mptest ${port} -f hosts -t ${thetest} ${@:3} &
+done
+
+wait
+