mptest.cpp 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829
  1. // CSocketSelectThread error 10038
  2. #include <platform.h>
  3. #include <jlib.hpp>
  4. #include <jthread.hpp>
  5. #include <jmisc.hpp>
  6. #include <jcrc.hpp>
  7. #include <mpbase.hpp>
  8. #include <mpcomm.hpp>
  9. using namespace std;
  10. #define MPPORT 8888
  11. #define MULTITEST
  12. //#define STREAMTEST
  13. //#define MPITEST
  14. //#define MPITEST2
  15. //#define GPF
  16. #ifdef MULTITEST
  17. //#define MYMACHINES "10.150.10.16,10.150.10.17,10.150.10.18,10.150.10.19,10.150.10.20,10.150.10.21,10.150.10.22,10.150.10.23,10.150.10.47,10.150.10.48,10.150.10.49,10.150.10.50,10.150.10.51,10.150.10.52,10.150.10.53,10.150.10.54,10.150.10.55,10.150.10.73,10.150.10.75,10.150.10.79"
  18. //#define MYMACHINES "192.168.16.124,10.150.10.17,10.150.10.18,10.150.10.19,10.150.10.20,10.150.10.21,10.150.10.22,10.150.10.23,10.150.10.47,10.150.10.48,10.150.10.49,10.150.10.50,10.150.10.51,10.150.10.52,10.150.10.53,10.150.10.54,10.150.10.55,10.150.10.73,10.150.10.75,10.150.10.79"
  19. #endif
  20. // #define aWhile 100000
  21. #define aWhile 10
  22. class CSectionTimer
  23. {
  24. HiresTimer hrt[1000];
  25. unsigned tids[1000];
  26. const char *name;
  27. static CriticalSection findsect;
  28. double total;
  29. double max;
  30. unsigned count;
  31. unsigned idx()
  32. {
  33. CriticalBlock block(findsect);
  34. unsigned tid = (unsigned)(memsize_t)GetCurrentThreadId();
  35. unsigned i;
  36. for (i=0;i<999;i++) {
  37. if (tids[i]==tid)
  38. break;
  39. if (tids[i]==0) {
  40. tids[i] = tid;
  41. break;
  42. }
  43. }
  44. return i;
  45. }
  46. public:
  47. CSectionTimer(const char *_name)
  48. {
  49. name = (const char *)strdup(_name);
  50. total = 0;
  51. max = 0;
  52. memset(tids,0,sizeof(tids));
  53. count = 0;
  54. }
  55. ~CSectionTimer()
  56. {
  57. free((void *)name);
  58. }
  59. void begin()
  60. {
  61. hrt[idx()].reset();
  62. }
  63. void end()
  64. {
  65. double v = hrt[idx()].get();
  66. total += v;
  67. if (max<v)
  68. max = v;
  69. count++;
  70. }
  71. void print()
  72. {
  73. if (count)
  74. PrintLog("TIME: %s(%d): max=%.6f, avg=%.6f, tot=%.6f",name,count,max,(double)total/count,total);
  75. }
  76. };
  77. CriticalSection CSectionTimer::findsect;
  78. class TimedBlock
  79. {
  80. CSectionTimer &stim;
  81. public:
  82. TimedBlock(CSectionTimer &_stim) : stim(_stim) { stim.begin(); }
  83. ~TimedBlock() { stim.end(); }
  84. };
  85. class TimedCriticalBlock
  86. {
  87. CriticalSection &crit;
  88. public:
  89. TimedCriticalBlock(CriticalSection &c,CSectionTimer &stim)
  90. : crit(c)
  91. {
  92. TimedBlock block(stim); crit.enter();
  93. }
  94. ~TimedCriticalBlock() { crit.leave(); }
  95. };
  96. static CSectionTimer STsend("send");
  97. static CSectionTimer STrecv("recv");
  98. //#define NITER 100
  99. #define NITER 40
  100. #define BLOCKSIZE (0x100000*10)
  101. //#define BLOCKSIZE (0x1000*10)
  102. #define WRITEDELAY 100
  103. #define READDELAY 5000
  104. void StreamTest(IGroup *group,ICommunicator *comm)
  105. {
  106. void *bufs[18];
  107. unsigned bi;
  108. for (bi=0;bi<16;bi++) {
  109. bufs[bi] = malloc(1024*1024*100);
  110. assertex(bufs[bi]);
  111. memset(bufs[bi],bi,1024*1024*100);
  112. }
  113. CMessageBuffer mb;
  114. for (unsigned i=0;i<NITER;i++) {
  115. if (group->rank() == 1) {
  116. mb.clear();
  117. StringBuffer header;
  118. header.append("Test Block #").append(i);
  119. mb.append(header.str()).reserve(BLOCKSIZE-mb.length());
  120. PrintLog("Sending '%s' length %d",header.str(),mb.length());
  121. {
  122. TimedBlock block(STsend);
  123. comm->send(mb,0,MPTAG_TEST,MP_ASYNC_SEND);
  124. }
  125. PrintLog("Sent");
  126. //Sleep(WRITEDELAY);
  127. }
  128. else if (group->rank() == 0) {
  129. rank_t r;
  130. comm->recv(mb,RANK_ALL,MPTAG_TEST,&r);
  131. StringAttr str;
  132. PrintLog("Receiving");
  133. {
  134. TimedBlock block(STrecv);
  135. mb.read(str);
  136. }
  137. PrintLog("Received(%d) '%s' length %d",r,str.get(),mb.length());
  138. //if (i==0)
  139. // Sleep(1000*1000); // 15 mins or so
  140. //Sleep(READDELAY);
  141. }
  142. else
  143. PrintLog("Skipping extra rank %d", group->rank());
  144. }
  145. comm->barrier();
  146. for (bi=0;bi<16;bi++)
  147. free(bufs[bi]);
  148. STsend.print();
  149. STrecv.print();
  150. }
  151. void Test1(IGroup *group,ICommunicator *comm)
  152. {
  153. PrintLog("test1");
  154. CMessageBuffer mb;
  155. if (group->rank()==0) {
  156. mb.append("Hello - Test1");
  157. comm->send(mb,1,MPTAG_TEST);
  158. }
  159. else if (group->rank()==1) {
  160. rank_t r;
  161. comm->recv(mb,0,MPTAG_TEST,&r);
  162. StringAttr str;
  163. mb.read(str);
  164. PrintLog("(1) Received '%s' from rank %d",str.get(),r);
  165. }
  166. comm->barrier();
  167. }
  168. void Test2(IGroup *group,ICommunicator *comm)
  169. {
  170. PrintLog("test2");
  171. CMessageBuffer mb;
  172. if (group->rank()==0) {
  173. mb.append("Hello - Test2");
  174. comm->send(mb,RANK_ALL,MPTAG_TEST);
  175. }
  176. else if (group->rank()==1) {
  177. #ifdef GPF
  178. PrintLog("GPFING");
  179. Sleep(aWhile);
  180. byte *p = NULL; *p = 1;
  181. #endif
  182. rank_t r;
  183. comm->recv(mb,RANK_ALL,MPTAG_TEST,&r);
  184. StringAttr str;
  185. mb.read(str);
  186. PrintLog("(2) Received '%s' from rank %d",str.get(),r);
  187. }
  188. comm->barrier();
  189. }
  190. void Test3(IGroup *group,ICommunicator *comm)
  191. {
  192. PrintLog("test3");
  193. CMessageBuffer mb;
  194. if (group->rank()==0) {
  195. mb.append("Hello - Test3");
  196. comm->send(mb,1,MPTAG_TEST);
  197. }
  198. else if (group->rank()==1) {
  199. rank_t r;
  200. comm->recv(mb,RANK_ALL,MPTAG_TEST,&r);
  201. StringAttr str;
  202. mb.read(str);
  203. PrintLog("(3) Received '%s' from rank %d",str.get(),r);
  204. }
  205. comm->barrier();
  206. }
  207. void Test4(IGroup *group,ICommunicator *comm)
  208. {
  209. PrintLog("test4");
  210. CMessageBuffer mb;
  211. if (group->rank()==0) {
  212. INode *singlenode=&group->queryNode(1);
  213. IGroup *singlegroup = createIGroup(1,&singlenode);
  214. ICommunicator * singlecomm = createCommunicator(singlegroup);
  215. mb.append("Hello - Test4");
  216. singlecomm->send(mb,0,MPTAG_TEST);
  217. singlecomm->Release();
  218. singlegroup->Release();
  219. }
  220. else if (group->rank()==1) {
  221. rank_t r;
  222. comm->recv(mb,RANK_ALL,MPTAG_TEST,&r);
  223. StringAttr str;
  224. mb.read(str);
  225. PrintLog("(4) Received '%s' from rank %d",str.get(),r);
  226. }
  227. comm->barrier();
  228. }
  229. void Test5(IGroup *group,ICommunicator *comm)
  230. {
  231. PrintLog("test5");
  232. rank_t rank = group->rank();
  233. INode *singlenode=&group->queryNode(1);
  234. IGroup *singlegroup = createIGroup(1,&singlenode);
  235. ICommunicator * singlecomm = createCommunicator(singlegroup);
  236. CMessageBuffer mb;
  237. if (rank==0) {
  238. mb.append("Hello - Test5");
  239. singlecomm->send(mb,0,MPTAG_TEST);
  240. }
  241. else if (rank==1) {
  242. rank_t r;
  243. singlecomm->recv(mb,RANK_ALL,MPTAG_TEST,&r);
  244. StringAttr str;
  245. mb.read(str);
  246. PrintLog("(5) Received '%s' from rank %d (unknown)",str.get(),r);
  247. }
  248. comm->barrier();
  249. singlecomm->Release();
  250. singlegroup->Release();
  251. }
  252. void Test6(IGroup *group,ICommunicator *comm)
  253. {
  254. PrintLog("test6");
  255. //DebugBreak();
  256. CMessageBuffer mb;
  257. StringAttr str;
  258. if (group->rank()==1) {
  259. mb.append("Test");
  260. bool cancelled = comm->sendRecv(mb,0,MPTAG_TEST);
  261. StringAttr str;
  262. mb.read(str);
  263. StringBuffer url;
  264. PrintLog("(6) Received '%s' from %s",str.get(),mb.getSender().getUrlStr(url).str());
  265. }
  266. else if (group->rank()==0) {
  267. rank_t r;
  268. comm->recv(mb,RANK_ALL,MPTAG_TEST,&r);
  269. mb.read(str);
  270. PrintLog("(6) - str = <%s>", str.get());
  271. assertex(strcmp(str.get(),"Test")==0);
  272. mb.clear();
  273. mb.append("Hello - Test6");
  274. printf("crash now!");
  275. Sleep(1);
  276. comm->reply(mb);
  277. }
  278. comm->barrier();
  279. }
  280. void Test7(IGroup *group,ICommunicator *comm)
  281. {
  282. PrintLog("test7");
  283. CMessageBuffer mb;
  284. if (group->rank()==0) {
  285. mb.append("Hello - Test7");
  286. mb.reserve(150*1024);
  287. comm->send(mb,1,MPTAG_TEST);
  288. }
  289. else if (group->rank()==1) {
  290. rank_t r;
  291. comm->recv(mb,(mptag_t) TAG_ALL,MPTAG_TEST,&r);
  292. StringAttr str;
  293. mb.read(str);
  294. PrintLog("Received '%s' from rank %d",str.get(),r);
  295. }
  296. comm->barrier();
  297. }
  298. // #define MAXBUFFERSIZE 0x100000
  299. #define MAXBUFFERSIZE 0x10000
  300. struct CRandomBuffer
  301. {
  302. size32_t size;
  303. char buffer[MAXBUFFERSIZE];
  304. unsigned crc;
  305. void fill() {
  306. size = getRandom()%MAXBUFFERSIZE;
  307. // size = 100000;
  308. if (size) {
  309. char c = (char)getRandom();
  310. #if 0
  311. for (unsigned i=0;i<size;i++) {
  312. buffer[i] = c;
  313. c += (c*16);
  314. c += 113;
  315. }
  316. #endif
  317. for (unsigned i=0;i<size;i++) {
  318. buffer[i] = 'a' + i%26;
  319. }
  320. }
  321. crc = crc32(&buffer[0],size,0);
  322. }
  323. bool check()
  324. {
  325. int errs = 50;
  326. if (crc!=crc32(buffer,size,0)) {
  327. PrintLog("**** Error: CRC check failed");
  328. PrintLog("size = %d",size);
  329. char c = buffer[0];
  330. for (unsigned i=1;i<size;i++) {
  331. c += (c*16);
  332. c += 113;
  333. if (buffer[i] != c) {
  334. PrintLog("Failed at %d, expected %02x found %02x %02x %02x %02x %02x %02x %02x %02x",i,(int)(byte)c,(int)(byte)buffer[i],(int)(byte)buffer[i+1],(int)(byte)buffer[i+2],(int)(byte)buffer[i+3],(int)(byte)buffer[i+4],(int)(byte)buffer[i+5],(int)(byte)buffer[i+6],(int)(byte)buffer[i+7]);
  335. if (errs--==0)
  336. break;
  337. }
  338. }
  339. return false;
  340. }
  341. return true;
  342. }
  343. void serialize(MemoryBuffer &mb)
  344. {
  345. // PROGLOG("1serialize: size = %u, length = %u", size, mb.length());
  346. mb.append(size).append(size,buffer).append(crc);
  347. // PROGLOG("2serialize: size = %u, length = %u", size, mb.length());
  348. }
  349. void deserialize(MemoryBuffer &mb)
  350. {
  351. // PROGLOG("1de-serialize: size = %u, length = %u", size, mb.length());
  352. mb.read(size);
  353. // PROGLOG("2de-serialize: size = %u, length = %u", size, mb.length());
  354. mb.read(size,buffer).read(crc);
  355. }
  356. };
  357. void printtrc(char c)
  358. {
  359. static CriticalSection crit;
  360. CriticalBlock block(crit);
  361. printf("%c",c);
  362. }
  363. // #define N 100
  364. #define N 20
  365. void MultiTest(ICommunicator *_comm)
  366. {
  367. class Server: public Thread
  368. {
  369. public:
  370. Owned<ICommunicator> comm;
  371. Server(ICommunicator *_comm) { comm.set(_comm); }
  372. int run()
  373. {
  374. unsigned n=(comm->queryGroup().ordinality()-1)*N;
  375. CMessageBuffer mb;
  376. CRandomBuffer *buff = new CRandomBuffer();
  377. PrintLog("MPTEST: started server, myrank = %d", comm->queryGroup().rank());
  378. try {
  379. while(n--) {
  380. mb.clear();
  381. rank_t rr;
  382. if (!comm->recv(mb,RANK_ALL,MPTAG_TEST,&rr))
  383. break;
  384. PrintLog("MPTEST: Received from %d, len = %d",rr, mb.length());
  385. StringBuffer str;
  386. comm->queryGroup().queryNode(rr).endpoint().getUrlStr(str);
  387. // PrintLog("MPTEST: Received from %s",str.str());
  388. buff->deserialize(mb);
  389. #ifdef DO_CRC_CHECK
  390. if (!buff->check())
  391. PrintLog("MPTEST: Received from %s",str.str());
  392. #endif
  393. mb.clear().append(buff->crc);
  394. int delay = getRandom() % 20;
  395. Sleep(delay);
  396. comm->reply(mb);
  397. }
  398. }
  399. catch (IException *e) {
  400. pexception("Server Exception",e);
  401. }
  402. comm->barrier(); // MCK
  403. PrintLog("MPTEST: stopped server");
  404. delete buff;
  405. return 0;
  406. }
  407. } server(_comm);
  408. Owned<ICommunicator> comm;
  409. comm.set(_comm);
  410. server.start();
  411. CMessageBuffer mb;
  412. CRandomBuffer *buff = new CRandomBuffer();
  413. unsigned nr = comm->queryGroup().ordinality();
  414. unsigned n=(nr-1)*N;
  415. rank_t r = comm->queryGroup().rank();
  416. rank_t *targets = new rank_t[n];
  417. rank_t *t = targets;
  418. rank_t i;
  419. for (i=0;i<nr;i++)
  420. if (i!=r)
  421. for (unsigned j=0;j<N;j++)
  422. *(t++) = i;
  423. unsigned k=n;
  424. while (k>1) {
  425. i = getRandom()%k; // NB n is correct here
  426. k--;
  427. unsigned t = targets[i];
  428. targets[i] = targets[k];
  429. targets[k] = t;
  430. }
  431. PrintLog("MPTEST: client started, myrank = %d", comm->queryGroup().rank());
  432. try {
  433. while (n--) {
  434. buff->fill();
  435. buff->serialize(mb.clear());
  436. #if 0
  437. StringBuffer str;
  438. comm->queryGroup().queryNode(targets[n]).endpoint().getUrlStr(str);
  439. PrintLog("MPTEST: Sending to %s, length=%u",str.str(), mb.length());
  440. #endif
  441. PrintLog("MPTEST: Sending to %d, length=%u", targets[n], mb.length());
  442. if (!comm->sendRecv(mb,targets[n],MPTAG_TEST))
  443. break;
  444. // Sleep((n+1)*2000);
  445. // PrintLog("MPTEST: Sent to %s",str.str());
  446. unsigned crc;
  447. mb.read(crc);
  448. assertex(crc==buff->crc);
  449. }
  450. }
  451. catch (IException *e) {
  452. pexception("Client Exception",e);
  453. }
  454. PrintLog("MPTEST: client finished");
  455. server.join();
  456. delete [] targets;
  457. delete buff;
  458. }
  459. void MPITest(IGroup *group, ICommunicator *mpicomm)
  460. {
  461. CMessageBuffer mb;
  462. CMessageBuffer mb2;
  463. int myrank = group->rank();
  464. int numranks = group->ordinality();
  465. int rnksumtotal = 0;
  466. for(int i=0;i<numranks;i++)
  467. rnksumtotal += (i+1);
  468. PrintLog("MPTEST: MPITest myrank=%d numranks=%d rnksumtotal=%d", myrank, numranks, rnksumtotal);
  469. // send and recv to/from all others without a send/recv deadlock ...
  470. mb.clear();
  471. mb.append(myrank+1);
  472. rank_t r;
  473. int rankval;
  474. int ranksum = myrank+1;
  475. int left, right;
  476. if (numranks == 2)
  477. {
  478. if (myrank == 0)
  479. {
  480. left = 1;
  481. right = 1;
  482. PrintLog("MPTEST: MPITest: %d send to rank %d", myrank, right);
  483. mpicomm->send(mb,right,MPTAG_TEST);
  484. mb2.clear();
  485. PrintLog("MPTEST: MPITest: %d recv from rank %d", myrank, left);
  486. mpicomm->recv(mb2,left,MPTAG_TEST,&r);
  487. mb2.read(rankval);
  488. ranksum += rankval;
  489. }
  490. else
  491. {
  492. left = 0;
  493. right = 0;
  494. mb2.clear();
  495. PrintLog("MPTEST: MPITest: %d recv from rank %d", myrank, left);
  496. mpicomm->recv(mb2,left,MPTAG_TEST,&r);
  497. mb2.read(rankval);
  498. ranksum += rankval;
  499. PrintLog("MPTEST: MPITest: %d send to rank %d", myrank, right);
  500. mpicomm->send(mb,right,MPTAG_TEST);
  501. }
  502. }
  503. else if (numranks > 2)
  504. {
  505. int m = 0;
  506. while (m < (numranks - 1))
  507. {
  508. int rankid = 0;
  509. while (rankid < numranks)
  510. {
  511. left = rankid - 1 - m;
  512. if (left < 0)
  513. left = numranks + left;
  514. right = rankid + 1 + m;
  515. if (right >= numranks)
  516. right = right % numranks;
  517. if (rankid == myrank)
  518. {
  519. if (rankid == 0)
  520. {
  521. PrintLog("MPTEST: MPITest: %d send to rank %d", myrank, right);
  522. mpicomm->send(mb,right,MPTAG_TEST);
  523. mb2.clear();
  524. PrintLog("MPTEST: MPITest: %d recv from rank %d", myrank, left);
  525. mpicomm->recv(mb2,left,MPTAG_TEST,&r);
  526. mb2.read(rankval);
  527. ranksum += rankval;
  528. }
  529. else
  530. {
  531. mb2.clear();
  532. PrintLog("MPTEST: MPITest: %d recv from rank %d", myrank, left);
  533. mpicomm->recv(mb2,left,MPTAG_TEST,&r);
  534. mb2.read(rankval);
  535. ranksum += rankval;
  536. PrintLog("MPTEST: MPITest: %d send to rank %d", myrank, right);
  537. mpicomm->send(mb,right,MPTAG_TEST);
  538. }
  539. }
  540. rankid++;
  541. }
  542. m++;
  543. }
  544. }
  545. PrintLog("MPTEST: MPITest: ranksum = %d", ranksum);
  546. assertex(rnksumtotal==ranksum);
  547. mpicomm->barrier();
  548. return;
  549. }
  550. void MPITest2(IGroup *group, ICommunicator *mpicomm)
  551. {
  552. int myrank = group->rank();
  553. int numranks = group->ordinality();
  554. PrintLog("MPTEST: MPITest2: myrank=%d numranks=%d", myrank, numranks);
  555. mpicomm->barrier();
  556. return;
  557. }
  558. void testIPnodeHash()
  559. {
  560. setNodeCaching(true);
  561. class casyncfor: public CAsyncFor
  562. {
  563. public:
  564. casyncfor()
  565. {
  566. }
  567. void Do(unsigned i)
  568. {
  569. StringBuffer ips;
  570. ips.appendf("%d.%d.%d.%d",i/256,1,2,getRandom()%10);
  571. SocketEndpoint ep(ips.str());
  572. try {
  573. Owned<INode> node = createINode(ep);
  574. }
  575. catch (IException *e)
  576. {
  577. EXCLOG(e,"failed");
  578. }
  579. }
  580. } afor;
  581. afor.For(100000,10);
  582. }
  583. int main(int argc, char* argv[])
  584. {
  585. InitModuleObjects();
  586. EnableSEHtoExceptionMapping();
  587. // startMPServer(9123);
  588. // testIPnodeHash();
  589. // stopMPServer();
  590. // return 0;
  591. #ifndef MYMACHINES
  592. if (argc<3) {
  593. printf("\nMPTEST: Usage: %s <myport> [-f <file> | <ip:port> <ip:port> ...]\n\n", argv[0]);
  594. return 0;
  595. }
  596. #endif
  597. try {
  598. EnableSEHtoExceptionMapping();
  599. StringBuffer lf;
  600. // PrintLog("MPTEST Starting");
  601. #ifndef MYMACHINES
  602. int num_nodes = 0;
  603. int my_port = atoi(argv[1]);
  604. char logfile[256] = { "" };
  605. sprintf(logfile,"mptest-%d.log",my_port);
  606. // openLogFile(lf, logfile);
  607. PrintLog("MPTEST: Starting %d", my_port);
  608. startMPServer(my_port);
  609. INode *nodes[1000];
  610. const char * argfile = nullptr;
  611. if (argc > 3)
  612. {
  613. if (strcmp(argv[2], "-f") == 0)
  614. argfile = argv[3];
  615. }
  616. int i = 1;
  617. if (argfile)
  618. {
  619. char hoststr[256] = { "" };
  620. FILE *fp = fopen(argfile, "r");
  621. if (fp == NULL)
  622. {
  623. PrintLog("MPTest: Error cannot open file <%s>", argfile);
  624. return 1;
  625. }
  626. char line[256] = { "" };
  627. while(fgets(line, 255, fp) != NULL)
  628. {
  629. int srtn = sscanf(line,"%s",hoststr);
  630. if (srtn == 1 && line[0] != '#')
  631. {
  632. PrintLog("MPTEST: adding node %d, port = <%s>", i-1, hoststr);
  633. nodes[i-1] = createINode(hoststr, my_port);
  634. i++;
  635. }
  636. }
  637. fclose(fp);
  638. }
  639. else
  640. {
  641. while (i+1 < argc && i-1 < 1000) {
  642. PrintLog("MPTEST: adding node %d, port = <%s>", i-1, argv[i+1]);
  643. nodes[i-1] = createINode(argv[i+1], my_port);
  644. i++;
  645. }
  646. }
  647. PrintLog("MPTEST: num_nodes = %d", i-1);
  648. IGroup *group = createIGroup(i-1,nodes);
  649. #else
  650. openLogFile(lf, "mptest.log");
  651. startMPServer(MPPORT);
  652. IGroup *group = createIGroup(MYMACHINES,MPPORT);
  653. #endif
  654. #ifdef STREAMTEST
  655. ICommunicator * mpicomm = createCommunicator(group);
  656. StreamTest(group,mpicomm);
  657. mpicomm->Release();
  658. #else
  659. # ifdef MULTITEST
  660. ICommunicator * mpicomm = createCommunicator(group);
  661. MultiTest(mpicomm);
  662. mpicomm->Release();
  663. # else
  664. # ifdef MPITEST
  665. ICommunicator * mpicomm = createCommunicator(group);
  666. MPITest(group, mpicomm);
  667. mpicomm->Release();
  668. # else
  669. # ifdef MPITEST2
  670. ICommunicator * mpicomm = createCommunicator(group);
  671. MPITest2(group, mpicomm);
  672. mpicomm->Release();
  673. # else
  674. ICommunicator * comm = createCommunicator(group);
  675. for (unsigned i = 0;i<1;i++) {
  676. Test1(group,comm);
  677. PrintLog("MPTEST: test1 done, waiting"); Sleep(aWhile);
  678. Test2(group,comm);
  679. PrintLog("MPTEST: test2 done, waiting"); Sleep(aWhile);
  680. Test3(group,comm);
  681. PrintLog("MPTEST: test3 done, waiting"); Sleep(aWhile);
  682. Test4(group,comm);
  683. PrintLog("MPTEST: test4 done, waiting"); Sleep(aWhile);
  684. Test5(group,comm);
  685. PrintLog("MPTEST: test5 done, waiting"); Sleep(aWhile);
  686. Test6(group,comm);
  687. PrintLog("MPTEST: test6 done, waiting"); Sleep(aWhile);
  688. Test7(group,comm);
  689. PrintLog("MPTEST: test7 done, waiting"); Sleep(aWhile);
  690. }
  691. comm->Release();
  692. # endif
  693. # endif
  694. # endif
  695. #endif
  696. group->Release();
  697. #ifndef MYMACHINES
  698. for (int i=0;i<num_nodes;i++)
  699. nodes[i]->Release();
  700. #endif
  701. stopMPServer();
  702. }
  703. catch (IException *e) {
  704. pexception("Exception",e);
  705. }
  706. PrintLog("MPTEST: bye");
  707. return 0;
  708. }