mptest.cpp 49 KB


  1. // CSocketSelectThread error 10038
  2. #include <platform.h>
  3. #include <jlib.hpp>
  4. #include <jthread.hpp>
  5. #include <jmisc.hpp>
  6. #include <jcrc.hpp>
  7. #include <mpbase.hpp>
  8. #include <mpcomm.hpp>
  9. #include <algorithm>
  10. #include <queue>
  11. #include <string>
  12. using namespace std;
  // Port number constant; not referenced in the visible part of this file
  // (presumably the default MP listening port - TODO confirm).
  #define MPPORT 8888
  // Uncomment to make rank 1 in Test2 deliberately write through a null
  // pointer (after sleeping for aWhile) instead of completing normally.
  //#define GPF
  // Names of the individual tests. TEST_SELFSEND is used in a log message by
  // MPSelfSend; the others are presumably matched against a command-line
  // argument outside this excerpt - TODO confirm.
  #define TEST_AlltoAll "AlltoAll"
  #define TEST_STREAM "Stream"
  #define TEST_MULTI "Multi"
  #define TEST_RING "Ring"
  #define TEST_RANK "PrintRank"
  #define TEST_SELFSEND "SelfSend"
  #define TEST_SINGLE_SEND "SingleSend"
  #define TEST_RIGHT_SHIFT "RightShift"
  #define TEST_RECV_FROM_ANY "RecvFromAny"
  #define TEST_SEND_TO_ALL "SendToAll"
  #define TEST_MULTI_MT "MTMultiSendRecv"
  #define TEST_NXN "NxN"
  // Argument passed to Sleep() before the deliberate fault when GPF is defined.
  // #define aWhile 100000
  #define aWhile 10
  29. class CSectionTimer
  30. {
  31. HiresTimer hrt[1000];
  32. unsigned tids[1000];
  33. const char *name;
  34. static CriticalSection findsect;
  35. double total;
  36. double max;
  37. unsigned count;
  38. unsigned idx()
  39. {
  40. CriticalBlock block(findsect);
  41. unsigned tid = (unsigned)(memsize_t)GetCurrentThreadId();
  42. unsigned i;
  43. for (i=0;i<999;i++)
  44. {
  45. if (tids[i]==tid)
  46. break;
  47. if (tids[i]==0)
  48. {
  49. tids[i] = tid;
  50. break;
  51. }
  52. }
  53. return i;
  54. }
  55. public:
  56. CSectionTimer(const char *_name)
  57. {
  58. name = (const char *)strdup(_name);
  59. total = 0;
  60. max = 0;
  61. memset(tids,0,sizeof(tids));
  62. count = 0;
  63. }
  64. ~CSectionTimer()
  65. {
  66. free((void *)name);
  67. }
  68. void begin()
  69. {
  70. hrt[idx()].reset();
  71. }
  72. void end()
  73. {
  74. double v = hrt[idx()].get();
  75. total += v;
  76. if (max<v)
  77. max = v;
  78. count++;
  79. }
  80. void print()
  81. {
  82. if (count)
  83. PROGLOG("TIME: %s(%u): max=%.6f, avg=%.6f, tot=%.6f",name,count,max,(double)total/count,total);
  84. }
  85. };
  86. CriticalSection CSectionTimer::findsect;
  87. class TimedBlock
  88. {
  89. CSectionTimer &stim;
  90. public:
  91. TimedBlock(CSectionTimer &_stim) : stim(_stim) { stim.begin(); }
  92. ~TimedBlock() { stim.end(); }
  93. };
  94. class TimedCriticalBlock
  95. {
  96. CriticalSection &crit;
  97. public:
  98. TimedCriticalBlock(CriticalSection &c,CSectionTimer &stim)
  99. : crit(c)
  100. {
  101. TimedBlock block(stim); crit.enter();
  102. }
  103. ~TimedCriticalBlock() { crit.leave(); }
  104. };
  // Section timers used by StreamTest around its send and its message read.
  static CSectionTimer STsend("send");
  static CSectionTimer STrecv("recv");
  // Number of blocks StreamTest transfers.
  //#define NITER 100
  #define NITER 40
  // Size in bytes that StreamTest pads each message to.
  #define BLOCKSIZE (0x100000*10)
  //#define BLOCKSIZE (0x1000*10)
  // WRITEDELAY is only referenced from a commented-out Sleep() in StreamTest;
  // READDELAY is not referenced in the visible code (both presumably in
  // milliseconds - TODO confirm).
  #define WRITEDELAY 100
  #define READDELAY 5000
  113. void StreamTest(IGroup *group,ICommunicator *comm)
  114. {
  115. CMessageBuffer mb;
  116. for (unsigned i=0;i<NITER;i++)
  117. {
  118. if (group->rank() == 1)
  119. {
  120. mb.clear();
  121. StringBuffer header;
  122. header.append("Test Block #").append(i);
  123. mb.append(header.str()).reserve(BLOCKSIZE-mb.length());
  124. PROGLOG("MPTEST: StreamTest sending '%s' length %u",header.str(),mb.length());
  125. {
  126. TimedBlock block(STsend);
  127. comm->send(mb,0,MPTAG_TEST,MP_ASYNC_SEND);
  128. }
  129. PROGLOG("MPTEST: StreamTest sent");
  130. //Sleep(WRITEDELAY);
  131. }
  132. else if (group->rank() == 0)
  133. {
  134. rank_t r;
  135. comm->recv(mb,RANK_ALL,MPTAG_TEST,&r);
  136. StringAttr str;
  137. PROGLOG("MPTEST: StreamTest receiving");
  138. {
  139. TimedBlock block(STrecv);
  140. mb.read(str);
  141. }
  142. PROGLOG("MPTEST: StreamTest received(%u) '%s' length %u",r,str.get(),mb.length());
  143. }
  144. else
  145. {
  146. PROGLOG("MPTEST: StreamTest skipping extra rank %u", group->rank());
  147. break;
  148. }
  149. }
  150. comm->barrier();
  151. STsend.print();
  152. STrecv.print();
  153. }
  154. void Test1(IGroup *group,ICommunicator *comm)
  155. {
  156. PROGLOG("test1");
  157. CMessageBuffer mb;
  158. if (group->rank()==0)
  159. {
  160. mb.append("Hello - Test1");
  161. comm->send(mb,1,MPTAG_TEST);
  162. }
  163. else if (group->rank()==1)
  164. {
  165. rank_t r;
  166. comm->recv(mb,0,MPTAG_TEST,&r);
  167. StringAttr str;
  168. mb.read(str);
  169. PROGLOG("(1) Received '%s' from rank %u",str.get(),r);
  170. }
  171. comm->barrier();
  172. }
  173. void Test2(IGroup *group,ICommunicator *comm)
  174. {
  175. PROGLOG("test2");
  176. CMessageBuffer mb;
  177. if (group->rank()==0)
  178. {
  179. mb.append("Hello - Test2");
  180. comm->send(mb,RANK_ALL,MPTAG_TEST);
  181. }
  182. else if (group->rank()==1)
  183. {
  184. #ifdef GPF
  185. PROGLOG("GPFING");
  186. Sleep(aWhile);
  187. byte *p = NULL; *p = 1;
  188. #endif
  189. rank_t r;
  190. comm->recv(mb,RANK_ALL,MPTAG_TEST,&r);
  191. StringAttr str;
  192. mb.read(str);
  193. PROGLOG("(2) Received '%s' from rank %u",str.get(),r);
  194. }
  195. comm->barrier();
  196. }
  197. void Test3(IGroup *group,ICommunicator *comm)
  198. {
  199. PROGLOG("test3");
  200. CMessageBuffer mb;
  201. if (group->rank()==0)
  202. {
  203. mb.append("Hello - Test3");
  204. comm->send(mb,1,MPTAG_TEST);
  205. }
  206. else if (group->rank()==1)
  207. {
  208. rank_t r;
  209. comm->recv(mb,RANK_ALL,MPTAG_TEST,&r);
  210. StringAttr str;
  211. mb.read(str);
  212. PROGLOG("(3) Received '%s' from rank %u",str.get(),r);
  213. }
  214. comm->barrier();
  215. }
  216. void Test4(IGroup *group,ICommunicator *comm)
  217. {
  218. PROGLOG("test4");
  219. CMessageBuffer mb;
  220. if (group->rank()==0)
  221. {
  222. INode *singlenode=&group->queryNode(1);
  223. IGroup *singlegroup = createIGroup(1,&singlenode);
  224. ICommunicator * singlecomm = createCommunicator(singlegroup);
  225. mb.append("Hello - Test4");
  226. singlecomm->send(mb,0,MPTAG_TEST);
  227. singlecomm->Release();
  228. singlegroup->Release();
  229. }
  230. else if (group->rank()==1)
  231. {
  232. rank_t r;
  233. comm->recv(mb,RANK_ALL,MPTAG_TEST,&r);
  234. StringAttr str;
  235. mb.read(str);
  236. PROGLOG("(4) Received '%s' from rank %u",str.get(),r);
  237. }
  238. comm->barrier();
  239. }
  240. void Test5(IGroup *group,ICommunicator *comm)
  241. {
  242. PROGLOG("test5");
  243. rank_t rank = group->rank();
  244. INode *singlenode=&group->queryNode(1);
  245. IGroup *singlegroup = createIGroup(1,&singlenode);
  246. ICommunicator * singlecomm = createCommunicator(singlegroup);
  247. CMessageBuffer mb;
  248. if (rank==0) {
  249. mb.append("Hello - Test5");
  250. singlecomm->send(mb,0,MPTAG_TEST);
  251. }
  252. else if (rank==1)
  253. {
  254. rank_t r;
  255. singlecomm->recv(mb,RANK_ALL,MPTAG_TEST,&r);
  256. StringAttr str;
  257. mb.read(str);
  258. PROGLOG("(5) Received '%s' from rank %u (unknown)",str.get(),r);
  259. }
  260. comm->barrier();
  261. singlecomm->Release();
  262. singlegroup->Release();
  263. }
  264. void Test6(IGroup *group,ICommunicator *comm)
  265. {
  266. PROGLOG("test6");
  267. //DebugBreak();
  268. CMessageBuffer mb;
  269. StringAttr str;
  270. if (group->rank()==1)
  271. {
  272. mb.append("Test");
  273. bool cancelled = comm->sendRecv(mb,0,MPTAG_TEST);
  274. StringAttr str;
  275. mb.read(str);
  276. StringBuffer url;
  277. PROGLOG("(6) Received '%s' from %s",str.get(),mb.getSender().getUrlStr(url).str());
  278. }
  279. else if (group->rank()==0)
  280. {
  281. rank_t r;
  282. comm->recv(mb,RANK_ALL,MPTAG_TEST,&r);
  283. mb.read(str);
  284. PROGLOG("(6) - str = <%s>", str.get());
  285. assertex(strcmp(str.get(),"Test")==0);
  286. mb.clear();
  287. mb.append("Hello - Test6");
  288. printf("crash now!");
  289. Sleep(1);
  290. comm->reply(mb);
  291. }
  292. comm->barrier();
  293. }
  294. void Test7(IGroup *group,ICommunicator *comm)
  295. {
  296. PROGLOG("test7");
  297. CMessageBuffer mb;
  298. if (group->rank()==0)
  299. {
  300. mb.append("Hello - Test7");
  301. mb.reserve(150*1024);
  302. comm->send(mb,1,MPTAG_TEST);
  303. }
  304. else if (group->rank()==1)
  305. {
  306. rank_t r;
  307. comm->recv(mb,(mptag_t) TAG_ALL,MPTAG_TEST,&r);
  308. StringAttr str;
  309. mb.read(str);
  310. PROGLOG("Received '%s' from rank %u",str.get(),r);
  311. }
  312. comm->barrier();
  313. }
  314. // #define MAXBUFFERSIZE 0x100000
  315. #define MAXBUFFERSIZE 0x10000
  316. struct CRandomBuffer
  317. {
  318. size32_t size;
  319. char buffer[MAXBUFFERSIZE];
  320. unsigned crc;
  321. void fill()
  322. {
  323. size = getRandom()%MAXBUFFERSIZE;
  324. // size = 100000;
  325. if (size)
  326. {
  327. char c = (char)getRandom();
  328. #if 0
  329. for (unsigned i=0;i<size;i++)
  330. {
  331. buffer[i] = c;
  332. c += (c*16);
  333. c += 113;
  334. }
  335. #endif
  336. for (unsigned i=0;i<size;i++)
  337. {
  338. buffer[i] = 'a' + i%26;
  339. }
  340. }
  341. crc = crc32(&buffer[0],size,0);
  342. }
  343. bool check()
  344. {
  345. int errs = 50;
  346. if (crc!=crc32(buffer,size,0))
  347. {
  348. PROGLOG("**** Error: CRC check failed");
  349. PROGLOG("size = %u",size);
  350. char c = buffer[0];
  351. for (unsigned i=1;i<size;i++)
  352. {
  353. c += (c*16);
  354. c += 113;
  355. if (buffer[i] != c)
  356. {
  357. PROGLOG("Failed at %u, expected %02x found %02x %02x %02x %02x %02x %02x %02x %02x",i,(int)(byte)c,(int)(byte)buffer[i],(int)(byte)buffer[i+1],(int)(byte)buffer[i+2],(int)(byte)buffer[i+3],(int)(byte)buffer[i+4],(int)(byte)buffer[i+5],(int)(byte)buffer[i+6],(int)(byte)buffer[i+7]);
  358. if (errs--==0)
  359. break;
  360. }
  361. }
  362. return false;
  363. }
  364. return true;
  365. }
  366. void serialize(MemoryBuffer &mb)
  367. {
  368. // PROGLOG("1serialize: size = %u, length = %u", size, mb.length());
  369. mb.append(size).append(size,buffer).append(crc);
  370. // PROGLOG("2serialize: size = %u, length = %u", size, mb.length());
  371. }
  372. void deserialize(MemoryBuffer &mb)
  373. {
  374. // PROGLOG("1de-serialize: size = %u, length = %u", size, mb.length());
  375. mb.read(size);
  376. // PROGLOG("2de-serialize: size = %u, length = %u", size, mb.length());
  377. mb.read(size,buffer).read(crc);
  378. }
  379. };
  380. void printtrc(char c)
  381. {
  382. static CriticalSection crit;
  383. CriticalBlock block(crit);
  384. printf("%c",c);
  385. }
  386. // #define N 100
  387. #define N 20
  388. void MultiTest(ICommunicator *_comm)
  389. {
  390. class Server: public Thread
  391. {
  392. public:
  393. Owned<ICommunicator> comm;
  394. Server(ICommunicator *_comm) { comm.set(_comm); }
  395. int run()
  396. {
  397. unsigned n=(comm->queryGroup().ordinality()-1)*N;
  398. CMessageBuffer mb;
  399. CRandomBuffer *buff = new CRandomBuffer();
  400. PROGLOG("MPTEST: MultiTest server started, myrank = %u", comm->queryGroup().rank());
  401. try
  402. {
  403. while(n--)
  404. {
  405. mb.clear();
  406. rank_t rr;
  407. if (!comm->recv(mb,RANK_ALL,MPTAG_TEST,&rr))
  408. break;
  409. PROGLOG("MPTEST: MultiTest server Received from %u, len = %u",rr, mb.length());
  410. StringBuffer str;
  411. comm->queryGroup().queryNode(rr).endpoint().getUrlStr(str);
  412. // PROGLOG("MPTEST: MultiTest server Received from %s",str.str());
  413. buff->deserialize(mb);
  414. #ifdef DO_CRC_CHECK
  415. if (!buff->check())
  416. PROGLOG("MPTEST: MultiTest server Received from %s",str.str());
  417. #endif
  418. mb.clear().append(buff->crc);
  419. int delay = getRandom() % 20;
  420. Sleep(delay);
  421. comm->reply(mb);
  422. }
  423. }
  424. catch (IException *e)
  425. {
  426. pexception("Server Exception",e);
  427. e->Release();
  428. }
  429. comm->barrier();
  430. PROGLOG("MPTEST: MultiTest server stopped");
  431. delete buff;
  432. return 0;
  433. }
  434. } server(_comm);
  435. Owned<ICommunicator> comm;
  436. comm.set(_comm);
  437. server.start();
  438. CMessageBuffer mb;
  439. CRandomBuffer *buff = new CRandomBuffer();
  440. unsigned nr = comm->queryGroup().ordinality();
  441. unsigned n=(nr-1)*N;
  442. rank_t r = comm->queryGroup().rank();
  443. rank_t *targets = new rank_t[n];
  444. rank_t *t = targets;
  445. rank_t i;
  446. for (i=0;i<nr;i++)
  447. if (i!=r)
  448. for (unsigned j=0;j<N;j++)
  449. *(t++) = i;
  450. unsigned k=n;
  451. while (k>1)
  452. {
  453. i = getRandom()%k; // NB n is correct here
  454. k--;
  455. unsigned t = targets[i];
  456. targets[i] = targets[k];
  457. targets[k] = t;
  458. }
  459. PROGLOG("MPTEST: Multitest client started, myrank = %u", comm->queryGroup().rank());
  460. try {
  461. while (n--)
  462. {
  463. buff->fill();
  464. buff->serialize(mb.clear());
  465. #if 0
  466. StringBuffer str;
  467. comm->queryGroup().queryNode(targets[n]).endpoint().getUrlStr(str);
  468. PROGLOG("MPTEST: Multitest client Sending to %s, length=%u",str.str(), mb.length());
  469. #endif
  470. PROGLOG("MPTEST: Multitest client Sending to %u, length=%u", targets[n], mb.length());
  471. if (!comm->sendRecv(mb,targets[n],MPTAG_TEST))
  472. break;
  473. // Sleep((n+1)*2000);
  474. // PROGLOG("MPTEST: Multitest client Sent to %s",str.str());
  475. unsigned crc;
  476. mb.read(crc);
  477. assertex(crc==buff->crc);
  478. }
  479. }
  480. catch (IException *e)
  481. {
  482. pexception("Client Exception",e);
  483. e->Release();
  484. }
  485. PROGLOG("MPTEST: MultiTest client finished");
  486. server.join();
  487. delete [] targets;
  488. delete buff;
  489. }
  490. void MPRing(IGroup *group, ICommunicator *mpicomm, unsigned iters=0)
  491. {
  492. CMessageBuffer smb;
  493. CMessageBuffer rmb;
  494. rank_t myrank = group->rank();
  495. rank_t numranks = group->ordinality();
  496. if (numranks < 2)
  497. throw MakeStringException(-1, "MPTEST: MPRing Error, numranks (%u) must be > 1", numranks);
  498. if (iters == 0)
  499. iters = 1000;
  500. unsigned pintvl = iters/10;
  501. if (pintvl < 1)
  502. pintvl = 1;
  503. PROGLOG("MPTEST: MPRing myrank=%u numranks=%u iters=%u", myrank, numranks, iters);
  504. unsigned next = myrank;
  505. unsigned prev = myrank;
  506. unsigned k = 0;
  507. do
  508. {
  509. next = (next+1) % numranks;
  510. prev = prev > 0 ? prev-1 : numranks-1;
  511. // skip self
  512. if ( (next == prev) && (next == myrank) )
  513. continue;
  514. smb.clear();
  515. smb.append(k);
  516. if ((k%pintvl) == 0)
  517. PROGLOG("MPTEST: MPRing %u send to rank %u", myrank, next);
  518. bool oksend = mpicomm->send(smb, next, MPTAG_TEST);
  519. if (!oksend)
  520. throw MakeStringException(-1, "MPTEST: MPRing %u send() to rank %u failed", myrank, next);
  521. rmb.clear();
  522. if ((k%pintvl) == 0)
  523. PROGLOG("MPTEST: MPRing %u recv from rank %u", myrank, prev);
  524. bool okrecv = mpicomm->recv(rmb, prev, MPTAG_TEST);
  525. if (!okrecv)
  526. throw MakeStringException(-1, "MPTEST: MPRing %u recv() from rank %u failed", myrank, prev);
  527. rmb.read(k);
  528. k++;
  529. if ((k%pintvl) == 0)
  530. PROGLOG("MPTEST: MPRing %u iteration %u complete", myrank, k);
  531. if (k == iters)
  532. break;
  533. }
  534. while (true);
  535. PROGLOG("MPTEST: MPRing complete");
  536. mpicomm->barrier();
  537. }
  538. #define MSGLEN 1048576
  539. void MPAlltoAll(IGroup *group, ICommunicator *mpicomm, size32_t buffsize=0, unsigned iters=0)
  540. {
  541. rank_t myrank = group->rank();
  542. rank_t numranks = group->ordinality();
  543. if (numranks < 2)
  544. throw MakeStringException(-1, "MPAlltoAll: MPRing Error, numranks (%u) must be > 1", numranks);
  545. if (buffsize == 0)
  546. buffsize = MSGLEN;
  547. if (iters == 0)
  548. iters = 1000;
  549. PROGLOG("MPTEST: MPAlltoAll myrank=%u numranks=%u buffsize=%u iters=%u", myrank, numranks, buffsize, iters);
  550. // ---------
  551. class Sender : public Thread
  552. {
  553. public:
  554. Linked<ICommunicator> mpicomm;
  555. rank_t numranks;
  556. rank_t myrank;
  557. size32_t buffsize;
  558. unsigned iters;
  559. Sender(ICommunicator *_mpicomm, rank_t _numranks, rank_t _myrank, size32_t _buffsize, unsigned _iters) : mpicomm(_mpicomm), numranks(_numranks), myrank(_myrank), buffsize(_buffsize), iters(_iters)
  560. {
  561. }
  562. int run()
  563. {
  564. PROGLOG("MPTEST: MPAlltoAll sender started, myrank = %u", myrank);
  565. int pintvl = iters/10;
  566. if (pintvl < 1)
  567. pintvl = 1;
  568. CMessageBuffer smb;
  569. smb.appendBytes('a', buffsize);
  570. for (unsigned k=1;k<=iters;k++)
  571. {
  572. bool oksend = mpicomm->send(smb, RANK_ALL_OTHER, MPTAG_TEST);
  573. if (!oksend)
  574. throw MakeStringException(-1, "MPTEST: MPAlltoAll %u send() failed", myrank);
  575. if ((k%pintvl) == 0)
  576. PROGLOG("MPTEST: MPAlltoAll sender %u iteration %u complete", myrank, k);
  577. }
  578. mpicomm->barrier();
  579. PROGLOG("MPTEST: MPAlltoAll sender stopped");
  580. return 0;
  581. }
  582. } sender(mpicomm, numranks, myrank, buffsize, iters);
  583. unsigned startTime = msTick();
  584. sender.start();
  585. // ---------
  586. PROGLOG("MPTEST: MPAlltoAll receiver started, myrank = %u", myrank);
  587. int pintvl = iters/10;
  588. if (pintvl < 1)
  589. pintvl = 1;
  590. CMessageBuffer rmb(buffsize);
  591. for (unsigned k=1;k<=iters;k++)
  592. {
  593. for (rank_t i=1;i<numranks;i++)
  594. {
  595. // rmb.clear();
  596. bool okrecv = mpicomm->recv(rmb, RANK_ALL, MPTAG_TEST);
  597. if (!okrecv)
  598. throw MakeStringException(-1, "MPTEST: MPAlltoAll %u recv() failed", myrank);
  599. if (i==1 && (k%pintvl) == 0)
  600. PROGLOG("MPTEST: MPAlltoAll receiver rank %u iteration %u complete", myrank, k);
  601. }
  602. }
  603. mpicomm->barrier();
  604. PROGLOG("MPTEST: MPAlltoAll receiver finished");
  605. // ---------
  606. sender.join();
  607. unsigned endTime = msTick();
  608. double msgRateMB = (2.0*(double)buffsize*(double)iters*(double)(numranks-1)) / ((endTime-startTime)*1000.0);
  609. PROGLOG("MPTEST: MPAlltoAll complete %g MB/s", msgRateMB);
  610. return;
  611. }
  612. void MPTest2(IGroup *group, ICommunicator *mpicomm)
  613. {
  614. rank_t myrank = group->rank();
  615. rank_t numranks = group->ordinality();
  616. PROGLOG("MPTEST: MPTest2: myrank=%u numranks=%u", myrank, numranks);
  617. mpicomm->barrier();
  618. return;
  619. }
  620. void testIPnodeHash()
  621. {
  622. setNodeCaching(true);
  623. class casyncfor: public CAsyncFor
  624. {
  625. public:
  626. casyncfor()
  627. {
  628. }
  629. void Do(unsigned i)
  630. {
  631. StringBuffer ips;
  632. ips.appendf("%d.%d.%d.%d",i/256,1,2,getRandom()%10);
  633. SocketEndpoint ep(ips.str());
  634. try {
  635. Owned<INode> node = createINode(ep);
  636. }
  637. catch (IException *e)
  638. {
  639. EXCLOG(e,"failed");
  640. }
  641. }
  642. } afor;
  643. afor.For(100000,10);
  644. }
  //-----------Utility classes and global variables---------------//
  // Locks passed to getNextCount() by the send and receive worker threads of
  // MPMultiMTSendRecv respectively.
  CriticalSection sendCriticalSec;
  CriticalSection recvCriticalSec;
  // Guards the validate array inside setValidate().
  CriticalSection validateCriticalSec;
  // One flag per expected message value; allocated and freed by MPMultiMTSendRecv.
  bool* validate;
  650. int getNextCount(CriticalSection &sect, int &count)
  651. {
  652. CriticalBlock block(sect);
  653. if (count)
  654. return count--;
  655. else
  656. return 0;
  657. }
  // Records that message value i has been received. Asserts that i lies in
  // 1..maxCounter and that it has not been recorded before, i.e. that each
  // value from 1 to maxCounter is received only once. Access to the global
  // validate array is serialised with validateCriticalSec.
  void setValidate(int i, int maxCounter)
  {
      CriticalBlock block(validateCriticalSec);
      assertex(i>0);
      assertex(i<=maxCounter);
      // value i is tracked at index i-1
      assertex(validate[i-1] == false);
      validate[i-1] = true;
  }
  667. //-------------------------------------------------------------//
  668. /**
  669. * Test sending a message to its self
  670. */
  671. void MPSelfSend(ICommunicator *mpcomm)
  672. {
  673. CMessageBuffer mb;
  674. int sendMessage = 1234;
  675. int receivedMessage;
  676. rank_t myrank = mpcomm->getGroup()->rank();
  677. mb.append(sendMessage);
  678. mpcomm->send(mb, myrank, MPTAG_TEST);
  679. mb.clear();
  680. mpcomm->recv(mb, myrank, MPTAG_TEST);
  681. mb.read(receivedMessage);
  682. assertex(sendMessage == receivedMessage);
  683. PROGLOG("MPTEST: %s: Message sent from %d to %d", TEST_SELFSEND, myrank, myrank);
  684. }
  685. /**
  686. * Test sending message to next (wrap-around) processor
  687. */
  688. void MPRightShift(ICommunicator* comm)
  689. {
  690. IGroup* group = comm->getGroup();
  691. rank_t p = group->ordinality();
  692. rank_t rank = group->rank();
  693. rank_t source_rank = (rank - 1 + p) % p;
  694. rank_t destination_rank = (rank + 1) % p;
  695. CMessageBuffer sendMsg;
  696. sendMsg.append(rank);
  697. comm->send(sendMsg, destination_rank, MPTAG_TEST);
  698. CMessageBuffer recvMsg;
  699. int received_msg;
  700. comm->recv(recvMsg, source_rank, MPTAG_TEST);
  701. recvMsg.read(received_msg);
  702. assertex(source_rank == received_msg);
  703. PROGLOG("Message received from node %d to node %d.", source_rank, rank);
  704. }
  705. /**
  706. * Test receiving message from an unknown node
  707. */
  708. void MPReceiveFromAny(ICommunicator* comm)
  709. {
  710. IGroup* group = comm->getGroup();
  711. rank_t nodeRank = group->rank();
  712. rank_t p = group->ordinality();
  713. rank_t rank = group->rank();
  714. rank_t destinationRank = (p-1);
  715. double expectedValue = 1234.0;
  716. PROGLOG("nodeRank=%u", nodeRank);
  717. if (rank == nodeRank)
  718. {
  719. CMessageBuffer sendMsg;
  720. sendMsg.append(expectedValue);
  721. comm->send(sendMsg, destinationRank, MPTAG_TEST);
  722. PROGLOG("Message sent by node %d to node %d.", rank, destinationRank);
  723. }
  724. if (rank == destinationRank)
  725. {
  726. CMessageBuffer recvMsg;
  727. bool success = comm->recv(recvMsg, RANK_ALL, MPTAG_TEST);
  728. assertex(success);
  729. double receivedValue;
  730. recvMsg.read(receivedValue);
  731. PROGLOG("rank=%u, nodeRank=%u", comm->getGroup()->rank(recvMsg.getSender()), nodeRank);
  732. assertex(nodeRank == comm->getGroup()->rank(recvMsg.getSender()));
  733. assertex(expectedValue == receivedValue);
  734. PROGLOG("Message successfully received from node %d to node %d.", comm->getGroup()->rank(recvMsg.getSender()), rank);
  735. }
  736. }
  737. /**
  738. * Test one node sending a message to all nodes
  739. */
  740. void MPSendToAll(ICommunicator* comm)
  741. {
  742. IGroup* group = comm->getGroup();
  743. rank_t nodeRank = group->rank();
  744. rank_t p = group->ordinality();
  745. rank_t rank = group->rank();
  746. double expectedValue = 1234.0;
  747. double receivedValue;
  748. if (rank == nodeRank)
  749. {
  750. CMessageBuffer sendMsg;
  751. sendMsg.append(expectedValue);
  752. comm->send(sendMsg, RANK_ALL, MPTAG_TEST);
  753. }
  754. CMessageBuffer recvMsg;
  755. comm->recv(recvMsg, nodeRank, MPTAG_TEST, NULL);
  756. recvMsg.read(receivedValue);
  757. assertex(expectedValue == receivedValue);
  758. PROGLOG("Message received from node %d to node %d.", nodeRank, rank);
  759. }
  760. /**
  761. * Test multiple threads calling send and recv functions
  762. */
  763. void MPMultiMTSendRecv(ICommunicator* comm, int counter)
  764. {
  765. assertex(comm->getGroup()->ordinality()>1);
  766. counter = (counter? counter: 100);
  767. int SEND_THREADS, RECV_THREADS;
  768. SEND_THREADS = RECV_THREADS = 8;
  769. rank_t rank = comm->getGroup()->rank();
  770. // nodes ranked 0 and 1 will be conducting this test
  771. if (rank<2)
  772. {
  773. validate = new bool[counter];
  774. for(int i=0; i<counter; i++) validate[i] = false;
  775. class SWorker: public Thread
  776. {
  777. private:
  778. ICommunicator* comm;
  779. int* counter;
  780. public:
  781. SWorker(ICommunicator* _comm, int* _counter):comm(_comm), counter(_counter){}
  782. int run()
  783. {
  784. IGroup *group = comm->getGroup();
  785. rank_t p = group->ordinality();
  786. rank_t rank = group->rank();
  787. rank_t destination_rank = 1 - rank;
  788. CMessageBuffer sendMsg;
  789. int served = 0;
  790. while(true)
  791. {
  792. sendMsg.clear();
  793. int v = getNextCount(sendCriticalSec, *counter);
  794. if (v > 0)
  795. {
  796. sendMsg.append(v);
  797. comm->send(sendMsg, destination_rank, MPTAG_TEST);
  798. served++;
  799. }
  800. else
  801. {
  802. break;
  803. }
  804. }
  805. PROGLOG("This thread sent %d", served);
  806. return 0;
  807. }
  808. };
  809. class RWorker: public Thread
  810. {
  811. private:
  812. ICommunicator* comm;
  813. int* counter;
  814. int maxCounter;
  815. public:
  816. RWorker(ICommunicator* _comm, int* _counter):comm(_comm), counter(_counter), maxCounter(*_counter){}
  817. int run()
  818. {
  819. IGroup *group = comm->getGroup();
  820. rank_t p = group->ordinality();
  821. rank_t rank = group->rank();
  822. rank_t source_rank = 1 - rank;
  823. int served = 0;
  824. CMessageBuffer recvMsg;
  825. int received_msg;
  826. while (*counter)
  827. {
  828. recvMsg.clear();
  829. if (comm->recv(recvMsg, source_rank, MPTAG_TEST, NULL, 100))
  830. {
  831. recvMsg.read(received_msg);
  832. setValidate(received_msg, maxCounter);
  833. getNextCount(recvCriticalSec, *counter);
  834. served++;
  835. }
  836. }
  837. PROGLOG("This thread received %d", served);
  838. return 0;
  839. }
  840. };
  841. std::vector<Thread*> workers;
  842. int s_counter, r_counter;
  843. s_counter = r_counter = counter;
  844. PROGLOG("counter=%d", counter);
  845. for(int i=0;i<SEND_THREADS; i++)
  846. {
  847. workers.push_back(new SWorker(comm, &s_counter));
  848. }
  849. for(int i=0;i<RECV_THREADS; i++)
  850. {
  851. workers.push_back(new RWorker(comm, &r_counter));
  852. }
  853. for(int i=0;i<workers.size(); i++)
  854. {
  855. workers[i]->start();
  856. }
  857. for(int i=0;i<workers.size(); i++)
  858. {
  859. workers[i]->join();
  860. }
  861. for(int i=0;i<workers.size(); i++)
  862. {
  863. delete workers[i];
  864. }
  865. assertex(s_counter == 0);
  866. assertex(r_counter == 0);
  867. PROGLOG("Rank %d sent %d messages", rank, (counter-s_counter));
  868. PROGLOG("Rank %d received %d messages", rank, (counter-r_counter));
  869. delete [] validate;
  870. }
  871. comm->barrier();
  872. }
  873. void MPNxN(ICommunicator *comm, unsigned numStreams, size32_t perStreamMBSize, size32_t msgSize, bool async)
  874. {
  875. // defaults
  876. if (0 == numStreams)
  877. numStreams = 8;
  878. if (0 == perStreamMBSize)
  879. perStreamMBSize = 100;
  880. if (0 == msgSize)
  881. msgSize = 1024*1024;
  882. unsigned grpSize = comm->queryGroup().ordinality();
  883. rank_t myRank = comm->queryGroup().rank();
  884. PROGLOG("MPNxN: myrank=%u, numStreams=%u, perStreamMBSize=%u, msgSize(bytes)=%u", myRank=(unsigned)myRank, numStreams, perStreamMBSize, msgSize);
  885. class CSendStream : public CInterfaceOf<IInterface>, implements IThreaded
  886. {
  887. CThreaded threaded;
  888. ICommunicator *comm = nullptr;
  889. rank_t myRank;
  890. unsigned grpSize;
  891. mptag_t mpTag, replyTag;
  892. unsigned __int64 totalSendSize;
  893. size32_t msgSize;
  894. StringBuffer resultMsg;
  895. bool passed = false;
  896. std::vector<rank_t> tgtRanks;
  897. StringBuffer tgtRanksStr;
  898. bool async = false;
  899. // used if async
  900. class CAckThread : implements IThreaded
  901. {
  902. CThreaded threaded;
  903. CSendStream &owner;
  904. mptag_t mpTag;
  905. CriticalSection cs;
  906. Owned<IException> exception;
  907. std::vector<std::queue<unsigned>> expectedHashes;
  908. unsigned getNextExpectedHash(unsigned sender)
  909. {
  910. std::queue<unsigned> &queue = expectedHashes[sender];
  911. CriticalBlock b(cs);
  912. assertex(queue.size());
  913. unsigned hash = queue.front();
  914. expectedHashes[sender].pop();
  915. return hash;
  916. }
  917. public:
  918. CAckThread(CSendStream &_owner, mptag_t _mpTag) : owner(_owner), mpTag(_mpTag), threaded("CAckThread", this)
  919. {
  920. expectedHashes.resize(owner.grpSize);
  921. }
  922. void addHash(unsigned tgt, unsigned hash)
  923. {
  924. CriticalBlock b(cs);
  925. expectedHashes[tgt].push(hash);
  926. }
  927. void start() { threaded.start(); }
  928. void join()
  929. {
  930. threaded.join();
  931. if (exception)
  932. throw exception.getClear();
  933. }
  934. // IThredaed
  935. virtual void threadmain() override
  936. {
  937. try
  938. {
  939. CMessageBuffer msg;
  940. rank_t sender;
  941. unsigned finalAcks = 0;
  942. while (true)
  943. {
  944. unsigned receivedHash;
  945. if (!owner.receiveAck(msg, sender, receivedHash)) // empty message indicates end by client
  946. {
  947. finalAcks++;
  948. if (finalAcks == owner.grpSize)
  949. break; // all done
  950. }
  951. else
  952. {
  953. unsigned expectedHash = getNextExpectedHash(sender);
  954. owner.verifyAck(receivedHash, expectedHash);
  955. }
  956. }
  957. }
  958. catch (IException *e)
  959. {
  960. exception.setown(e);
  961. }
  962. }
  963. } ackThread;
  964. unsigned fillData(void *data, size32_t sz, unsigned hash)
  965. {
  966. byte *pData = (byte *)data;
  967. do
  968. {
  969. if (sz<sizeof(hash))
  970. {
  971. memset(pData, 0, sz);
  972. break;
  973. }
  974. hash = hashc((unsigned char *)&hash, sizeof(hash), hash);
  975. memcpy(pData, &hash, sizeof(hash));
  976. sz -= sizeof(hash);
  977. pData += sizeof(hash);
  978. }
  979. while (true);
  980. return hash;
  981. }
  982. bool receiveAck(CMessageBuffer &msg, rank_t &sender, unsigned &ack)
  983. {
  984. while (!comm->recv(msg, RANK_ALL, replyTag, &sender, 60000))
  985. WARNLOG("Waiting for receive ack");
  986. if (0 == msg.length())
  987. return false; // final empty message indicated complete
  988. if (std::find(tgtRanks.begin(), tgtRanks.end(), sender) == tgtRanks.end())
  989. throwStringExceptionV(0, "Received reply from rank(%u) this stream did not send to", (unsigned)sender);
  990. msg.read(ack);
  991. msg.clear();
  992. return true;
  993. }
  994. void verifyAck(unsigned receivedHash, unsigned expectedHash)
  995. {
  996. if (receivedHash != expectedHash)
  997. throwStringExceptionV(0, "Checksums mismatch: %u sent vs %u received", expectedHash, receivedHash);
  998. }
  999. public:
  1000. CSendStream(ICommunicator *_comm, rank_t _myRank, unsigned _grpSize, mptag_t _mpTag, unsigned __int64 _totalSendSize, size32_t _msgSize, bool _async)
  1001. : threaded("CSendStream", this), comm(_comm), myRank(_myRank), grpSize(_grpSize), mpTag(_mpTag), totalSendSize(_totalSendSize), msgSize(_msgSize), ackThread(*this, _mpTag), async(_async)
  1002. {
  1003. if (1 == grpSize) // group only contains self, so target self
  1004. tgtRanks.push_back(0);
  1005. else
  1006. {
  1007. unsigned pc=25; // %
  1008. // add 'pc'% targets, starting from myRank+1
  1009. unsigned num = grpSize*pc/100;
  1010. if (0 == num)
  1011. num = 1;
  1012. unsigned step = grpSize / num;
  1013. unsigned t = myRank+1;
  1014. if (t == grpSize)
  1015. t = 0;
  1016. while (true)
  1017. {
  1018. tgtRanks.push_back(t);
  1019. --num;
  1020. if (0 == num)
  1021. break;
  1022. t += step;
  1023. if (t >= grpSize)
  1024. t -= grpSize;
  1025. }
  1026. }
  1027. auto iter = tgtRanks.begin();
  1028. while (true)
  1029. {
  1030. tgtRanksStr.append(*iter);
  1031. iter++;
  1032. if (iter == tgtRanks.end())
  1033. break;
  1034. tgtRanksStr.append(",");
  1035. }
  1036. replyTag = createReplyTag();
  1037. if (async)
  1038. ackThread.start();
  1039. threaded.start();
  1040. }
  1041. ~CSendStream()
  1042. {
  1043. threaded.join();
  1044. }
  1045. bool waitResult(StringBuffer &resultMessage)
  1046. {
  1047. threaded.join();
  1048. return passed;
  1049. }
  1050. const char *queryTgtRanks() { return tgtRanksStr; }
  1051. virtual void threadmain() override
  1052. {
  1053. passed = false;
  1054. try
  1055. {
  1056. CMessageBuffer msg, recvMsg;
  1057. msg.setReplyTag(replyTag);
  1058. void *data = msg.reserveTruncate(msgSize);
  1059. unsigned hash = (unsigned)mpTag;
  1060. VStringBuffer logMsg("NxN: mpTag=%u, dstRank(s) [%s]", (unsigned)mpTag, tgtRanksStr.str());
  1061. PROGLOG("%s", logMsg.str());
  1062. unsigned __int64 remaining = totalSendSize;
  1063. CCycleTimer timer;
  1064. while (true)
  1065. {
  1066. size32_t sz;
  1067. if (remaining >= msgSize)
  1068. {
  1069. sz = msgSize;
  1070. remaining -= msgSize;
  1071. }
  1072. else
  1073. {
  1074. sz = remaining;
  1075. msg.setLength(sz);
  1076. remaining = 0;
  1077. }
  1078. hash = fillData(data, sz, hash);
  1079. for (rank_t t: tgtRanks)
  1080. {
  1081. if (async)
  1082. ackThread.addHash(t, hash);
  1083. if (!comm->send(msg, t, mpTag))
  1084. throwUnexpected();
  1085. }
  1086. if (!async)
  1087. {
  1088. rank_t sender;
  1089. for (int t: tgtRanks)
  1090. {
  1091. rank_t sender;
  1092. unsigned receivedHash;
  1093. assertex(receiveAck(recvMsg, sender, receivedHash));
  1094. verifyAck(hash, receivedHash);
  1095. }
  1096. }
  1097. if (!remaining)
  1098. break;
  1099. }
  1100. msg.clear();
  1101. // send blank msg to all to signal end to receivers.
  1102. if (!comm->send(msg, RANK_ALL, mpTag))
  1103. throwUnexpected();
  1104. if (async)
  1105. ackThread.join();
  1106. else
  1107. {
  1108. rank_t sender;
  1109. for (unsigned r=0; r<grpSize; r++)
  1110. {
  1111. while (!comm->recv(msg, r, replyTag, &sender, 60000))
  1112. WARNLOG("Waiting for final ack");
  1113. assertex(sender == r);
  1114. assertex(0 == msg.length());
  1115. }
  1116. }
  1117. float ms = timer.elapsedMs();
  1118. float mbPerSec = (totalSendSize/ms*1000)/0x100000;
  1119. PROGLOG("Stream stats: time taken = %.2f seconds, total sent=%u MB, throughput = %.2f MB/s", ms/1000, (unsigned)(totalSendSize/0x100000), mbPerSec);
  1120. }
  1121. catch (IException *e)
  1122. {
  1123. e->errorMessage(resultMsg);
  1124. EXCLOG(e, "FAIL");
  1125. e->Release();
  1126. return;
  1127. }
  1128. passed = true;
  1129. }
  1130. };
  1131. class CRecvServer : public CInterfaceOf<IInterface>, implements IThreaded
  1132. {
  1133. CThreaded threaded;
  1134. ICommunicator *comm;
  1135. rank_t myRank;
  1136. unsigned grpSize;
  1137. mptag_t mpTag;
  1138. unsigned numStreams;
  1139. unsigned checkData(MemoryBuffer &mb, unsigned hash)
  1140. {
  1141. size32_t len = mb.remaining();
  1142. const byte *p = (const byte *)mb.readDirect(len);
  1143. while (len >= sizeof(hash))
  1144. {
  1145. hash = hashc((unsigned char *)&hash, sizeof(hash), hash);
  1146. if (0 != memcmp(p, &hash, sizeof(hash)))
  1147. return 0;
  1148. p += sizeof(hash);
  1149. len -= sizeof(hash);
  1150. }
  1151. return hash;
  1152. }
  1153. public:
  1154. CRecvServer(ICommunicator *_comm, rank_t _myRank, unsigned _grpSize, mptag_t _mpTag, unsigned _numStreams)
  1155. : threaded("CSendStream", this), comm(_comm), myRank(_myRank), grpSize(_grpSize), mpTag(_mpTag), numStreams(_numStreams)
  1156. {
  1157. threaded.start();
  1158. }
  1159. ~CRecvServer()
  1160. {
  1161. threaded.join();
  1162. }
  1163. void join()
  1164. {
  1165. threaded.join();
  1166. }
  1167. void stop()
  1168. {
  1169. comm->cancel(RANK_ALL, mpTag);
  1170. join();
  1171. }
  1172. virtual void threadmain() override
  1173. {
  1174. unsigned __int64 szRecvd = 0;
  1175. std::vector<rank_t> endReplyTags;
  1176. try
  1177. {
  1178. unsigned hash = (unsigned)mpTag;
  1179. unsigned clients = grpSize;
  1180. endReplyTags.resize(clients);
  1181. CMessageBuffer msg;
  1182. do
  1183. {
  1184. rank_t sender;
  1185. while (!comm->recv(msg, RANK_ALL, mpTag, &sender, 60000))
  1186. PROGLOG("Waiting for data on %u", (unsigned)mpTag);
  1187. if (!msg.length())
  1188. {
  1189. /* each client sends a zero length buffer when done.
  1190. * When all received, receiver can stop, and replies to indicate finished.
  1191. *
  1192. */
  1193. endReplyTags[(unsigned)sender] = msg.getReplyTag();
  1194. --clients;
  1195. if (0 == clients)
  1196. {
  1197. for (unsigned r=0; r<endReplyTags.size(); r++)
  1198. {
  1199. if (!comm->send(msg, r, (mptag_t)endReplyTags[r]))
  1200. throwUnexpected();
  1201. }
  1202. break;
  1203. }
  1204. }
  1205. else
  1206. {
  1207. szRecvd += msg.length();
  1208. // read 1st hash, then use to calculate and check incoming data.
  1209. msg.read(hash);
  1210. hash = checkData(msg, hash);
  1211. msg.clear();
  1212. msg.append(hash); // this should match what client calculated presend.
  1213. if (!comm->reply(msg))
  1214. throwUnexpected();
  1215. }
  1216. }
  1217. while (true);
  1218. }
  1219. catch (IException *e)
  1220. {
  1221. EXCLOG(e, "CRecvServer");
  1222. e->Release();
  1223. }
  1224. PROGLOG("NxN:Receiver[tag=%u] szRecvd=%" I64F "u finished", (unsigned)mpTag, szRecvd);
  1225. }
  1226. };
  1227. mptag_t mpTag = (mptag_t)0x20000;
  1228. std::vector<Owned<CRecvServer>> receivers;
  1229. std::vector<Owned<CSendStream>> senders;
  1230. for (unsigned s=0; s<numStreams; s++)
  1231. {
  1232. receivers.push_back(new CRecvServer(comm, myRank, grpSize, mpTag, numStreams));
  1233. senders.push_back(new CSendStream(comm, myRank, grpSize, mpTag, ((unsigned __int64)perStreamMBSize)*0x100000, msgSize, async));
  1234. mpTag = (mptag_t)((unsigned)mpTag+1);
  1235. }
  1236. bool allSuccess = true;
  1237. for (unsigned senderN=0; senderN<senders.size(); senderN++)
  1238. {
  1239. const auto &sender = senders[senderN];
  1240. StringBuffer resultMsg;
  1241. bool res = sender->waitResult(resultMsg);
  1242. VStringBuffer logMsg("Stream[%u] from rank %u -> rank(s) [%s] result: ", senderN, (unsigned)myRank, sender->queryTgtRanks());
  1243. if (res)
  1244. logMsg.append("PASSED");
  1245. else
  1246. {
  1247. logMsg.append("FAILED - ").append(resultMsg.str());
  1248. allSuccess = false;
  1249. }
  1250. PROGLOG("%s", logMsg.str());
  1251. }
  1252. for (const auto &receiver: receivers)
  1253. {
  1254. if (allSuccess)
  1255. receiver->join();
  1256. else
  1257. receiver->stop();
  1258. }
  1259. }
  1260. // various test use some of these configurable parameters
  1261. static size32_t buffsize = 0;
  1262. static unsigned numiters = 0;
  1263. static unsigned numStreams = 0;
  1264. static unsigned perStreamMBSize = 0;
  1265. static bool async = false;
  1266. void runTest(const char *caption, const char *testname, IGroup* group, ICommunicator* comm)
  1267. {
  1268. if (group->rank()==0)
  1269. {
  1270. printf("\n\n");
  1271. PROGLOG("%s %s", caption, testname);
  1272. PROGLOG("========================");
  1273. }
  1274. comm->barrier();
  1275. if (strieq(testname, TEST_STREAM))
  1276. StreamTest(group, comm);
  1277. else if (strieq(testname, TEST_MULTI))
  1278. MultiTest(comm);
  1279. else if (strieq(testname, TEST_RING))
  1280. MPRing(group, comm, numiters);
  1281. else if (strieq(testname, TEST_AlltoAll))
  1282. MPAlltoAll(group, comm, buffsize, numiters);
  1283. else if (strieq(testname, TEST_RANK))
  1284. MPTest2(group, comm);
  1285. else if (strieq(testname, TEST_SELFSEND))
  1286. MPSelfSend(comm);
  1287. else if (strieq(testname, TEST_SINGLE_SEND))
  1288. Test1(group, comm);
  1289. else if (strieq(testname, TEST_RIGHT_SHIFT))
  1290. MPRightShift(comm);
  1291. else if (strieq(testname, TEST_RECV_FROM_ANY))
  1292. MPReceiveFromAny(comm);
  1293. else if (strieq(testname, TEST_SEND_TO_ALL))
  1294. MPSendToAll(comm);
  1295. else if (strieq(testname, TEST_MULTI_MT))
  1296. MPMultiMTSendRecv(comm, numiters);
  1297. else if (strieq(testname, TEST_NXN))
  1298. MPNxN(comm, numStreams, perStreamMBSize, buffsize, async);
  1299. else
  1300. PROGLOG("MPTEST: Error, invalid testname specified (-t %s)", testname);
  1301. comm->barrier();
  1302. }
  1303. bool createNodeList(IArrayOf<INode> &nodes, const char* hostfile, int my_port, rank_t max_ranks) {
  1304. unsigned i = 1;
  1305. char hoststr[256] = { "" };
  1306. FILE* fp = fopen(hostfile, "r");
  1307. if (fp == NULL)
  1308. {
  1309. PROGLOG("MPTest: Error, cannot open hostfile <%s>", hostfile);
  1310. return false;
  1311. }
  1312. char line[256] = { "" };
  1313. while (fgets(line, 255, fp) != NULL)
  1314. {
  1315. if ((max_ranks > 0) && ((i - 1) >= max_ranks))
  1316. break;
  1317. int srtn = sscanf(line, "%s", hoststr);
  1318. if (srtn == 1 && line[0] != '#') {
  1319. INode* newNode = createINode(hoststr, my_port);
  1320. nodes.append(*newNode);
  1321. i++;
  1322. }
  1323. }
  1324. fclose(fp);
  1325. return true;
  1326. }
  1327. void printHelp(char* executableName)
  1328. {
  1329. printf("\nMPTEST: Usage: %s <myport> [-f <hostfile> [-t <testname> -b <buffsize> -i <iters> -r <rank> -n <numprocs> -d] | <ip:port> <ip:port>] [-mpi] [-mp]\n\n",executableName);
  1330. std::vector<std::string> tests = { TEST_RANK, TEST_SELFSEND, TEST_MULTI,
  1331. TEST_STREAM, TEST_RING, TEST_AlltoAll, TEST_SINGLE_SEND,
  1332. TEST_RIGHT_SHIFT, TEST_RECV_FROM_ANY, TEST_SEND_TO_ALL,
  1333. TEST_MULTI_MT, TEST_NXN };
  1334. printf("\t <testname>");
  1335. for (auto &testName: tests)
  1336. printf("\t%s\n\t\t", testName.c_str());
  1337. printf("\n");
  1338. }
  1339. int main(int argc, char* argv[])
  1340. {
  1341. int mpi_debug = 0;
  1342. char testname[256] = { "" };
  1343. rank_t max_ranks = 0;
  1344. unsigned startupDelay = 0;
  1345. InitModuleObjects();
  1346. EnableSEHtoExceptionMapping();
  1347. // startMPServer(9123);
  1348. // testIPnodeHash();
  1349. // stopMPServer();
  1350. // return 0;
  1351. /* mp hostfile format:
  1352. * -------------------
  1353. * <IP0>:port0
  1354. * <IP1>:port1
  1355. * <IP2>:port2
  1356. * ...
  1357. *
  1358. * run script:
  1359. * -----------
  1360. * # NOTE: because mptest will stop if its cmdline port and native IP do not match
  1361. * # corresponding entry in hostfile - the same cmdline can be repeated on all hosts ...
  1362. * mptest port0 -f hostfile [-t testname] [-b buffsize] [-i iters] [-n numprocs] [-d] &
  1363. * mptest port1 -f hostfile ... &
  1364. * ...
  1365. * [wait]
  1366. *
  1367. * Test names (-t):
  1368. * -----------
  1369. * MPRing (default)
  1370. * StreamTest
  1371. * MultiTest
  1372. * MPAlltoAll
  1373. * MPTest2
  1374. * MPNxN [-b <msgSiz>] [-s <numStreams>] [-m <perStreamMBSize>] [-a]
  1375. *
  1376. * Options: (available with -f hostfile arg)
  1377. * --------
  1378. * -b buffsize (bytes) for MPAlltoAll and MPNxN tests
  1379. * -i iterations for MPRing and MPAlltoAll tests
  1380. * -n numprocs for when wanting to test a subset of ranks from hostfile/script
  1381. * -d for some additional debug output
  1382. * -s number of streams for MPNxN test
  1383. * -m total MB's to send per stream for MPNxN test
  1384. * -a async for NxN test
  1385. */
  1386. int argSize = argc;
  1387. char** argL = argv;
  1388. if (argSize<3)
  1389. {
  1390. printHelp(argv[0]);
  1391. return 0;
  1392. }
  1393. try
  1394. {
  1395. EnableSEHtoExceptionMapping();
  1396. StringBuffer lf;
  1397. rank_t tot_ranks = 0;
  1398. int my_port = atoi(argv[1]);
  1399. char logfile[256] = { "" };
  1400. sprintf(logfile,"mptest-%d.log",my_port);
  1401. openLogFile(lf, logfile);
  1402. IArrayOf<INode> nodes;
  1403. const char * hostfile = nullptr;
  1404. if (argSize > 3)
  1405. {
  1406. if (strcmp(argL[2], "-f") == 0)
  1407. hostfile = argL[3];
  1408. }
  1409. if (hostfile)
  1410. {
  1411. int j = 4;
  1412. while (j < argSize)
  1413. {
  1414. if (streq(argL[j], "-t"))
  1415. {
  1416. if ((j+1) < argSize)
  1417. {
  1418. strcpy(testname, argL[j+1]);
  1419. j++;
  1420. }
  1421. }
  1422. else if (streq(argL[j], "-d"))
  1423. {
  1424. mpi_debug++;
  1425. }
  1426. else if (streq(argL[j], "-b"))
  1427. {
  1428. if ((j+1) < argSize)
  1429. {
  1430. buffsize = atoi(argL[j+1]);
  1431. j++;
  1432. }
  1433. }
  1434. else if (streq(argL[j], "-i"))
  1435. {
  1436. if ((j+1) < argSize)
  1437. {
  1438. numiters = atoi(argL[j+1]);
  1439. j++;
  1440. }
  1441. }
  1442. else if ( streq(argL[j], "-n") || streq(argL[j], "-np") )
  1443. {
  1444. if ((j+1) < argSize)
  1445. {
  1446. max_ranks = atoi(argL[j+1]);
  1447. j++;
  1448. }
  1449. }
  1450. else if (streq(argv[j], "-s"))
  1451. {
  1452. if ((j+1) < argc)
  1453. {
  1454. numStreams = atoi(argv[j+1]);
  1455. j++;
  1456. }
  1457. }
  1458. else if (streq(argv[j], "-m"))
  1459. {
  1460. if ((j+1) < argc)
  1461. {
  1462. perStreamMBSize = atoi(argv[j+1]);
  1463. j++;
  1464. }
  1465. }
  1466. else if (streq(argv[j], "-a"))
  1467. async = true;
  1468. else if (streq(argv[j], "-delay"))
  1469. {
  1470. if ((j+1) < argc)
  1471. {
  1472. startupDelay = atoi(argv[j+1]);
  1473. j++;
  1474. }
  1475. }
  1476. j++;
  1477. }
  1478. if (!createNodeList(nodes, hostfile, my_port, max_ranks))
  1479. return 1;
  1480. }
  1481. else
  1482. {
  1483. unsigned i = 1;
  1484. while (i+1 < argSize)
  1485. {
  1486. PROGLOG("MPTEST: adding node %u, port = <%s>", i-1, argL[i+1]);
  1487. INode *newNode = createINode(argL[i+1], my_port);
  1488. nodes.append(*newNode);
  1489. i++;
  1490. }
  1491. }
  1492. tot_ranks = nodes.length();
  1493. if (startupDelay)
  1494. {
  1495. PROGLOG("Pausing for startupDelay = %u second(s)", startupDelay);
  1496. MilliSleep(startupDelay * 1000);
  1497. PROGLOG("Resuming");
  1498. }
  1499. Owned<IGroup> group = createIGroup(tot_ranks, nodes.getArray());
  1500. // stop if not meant for this host ...
  1501. IpAddress myIp;
  1502. GetHostIp(myIp);
  1503. SocketEndpoint myEp(my_port, myIp);
  1504. bool die = true;
  1505. for (rank_t k=0;k<tot_ranks;k++)
  1506. {
  1507. if (nodes.item(k).endpoint().equals(myEp))
  1508. die = false;
  1509. }
  1510. if (die)
  1511. return 0;
  1512. PROGLOG("MPTEST: Starting, port = %d tot ranks = %u", my_port, tot_ranks);
  1513. startMPServer(my_port);
  1514. if (mpi_debug)
  1515. {
  1516. for (rank_t k=0;k<tot_ranks;k++)
  1517. {
  1518. StringBuffer urlStr;
  1519. nodes.item(k).endpoint().getUrlStr(urlStr);
  1520. PROGLOG("MPTEST: adding node %u, %s", k, urlStr.str());
  1521. }
  1522. }
  1523. Owned<ICommunicator> comm = createCommunicator(group);
  1524. runTest("MPTEST: Running MP Test:", testname, group, comm);
  1525. stopMPServer();
  1526. }
  1527. catch (IException *e)
  1528. {
  1529. pexception("Exception",e);
  1530. e->Release();
  1531. }
  1532. PROGLOG("MPTEST: bye");
  1533. return 0;
  1534. }