uttest.cpp 45 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. ///* simple test
  14. #include "udplib.hpp"
  15. #include "roxiemem.hpp"
  16. //#include "udptrr.hpp"
  17. //#include "udptrs.hpp"
  18. #include "jthread.hpp"
  19. #include "jsocket.hpp"
  20. #include "jsem.hpp"
  21. #include "jdebug.hpp"
  22. #include <time.h>
  23. /*=============================================================================================
  24. Findings:
  25. - Detect gaps in incoming sequence
  26. - Implement then test lost packet resending
  27. - Probably worth special casing self->self comms. (later)
  28. */
  29. #if 1
  30. void usage()
  31. {
  32. printf("USAGE: uttest [options] iprange\n");
  33. printf("Options are:\n");
  34. printf(
  35. "--jumboFrames\n"
  36. "--useAeron\n"
  37. "--udpLocalWriteSocketSize nn\n"
  38. "--udpRetryBusySenders nn\n"
  39. "--maxPacketsPerSender nn\n"
  40. "--udpQueueSize nn\n"
  41. "--udpRTSTimeout nn\n"
  42. "--udpSnifferEnabled 0|1\n"
  43. "--udpTraceCategories nn\n"
  44. "--udpTraceLevel nn\n"
  45. "--dontSendToSelf\n"
  46. "--sendSize nnMB\n"
  47. "--rawSpeedTest\n"
  48. "--rawBufferSize nn\n"
  49. );
  50. ExitModuleObjects();
  51. releaseAtoms();
  52. exit(1);
  53. }
  54. const char *multicastIPStr = "239.1.1.1";
  55. IpAddress multicastIP(multicastIPStr);
  56. unsigned udpNumQs = 1;
  57. unsigned numNodes;
  58. unsigned myIndex;
  59. unsigned udpQueueSize = 100;
  60. unsigned maxPacketsPerSender = 0x7fffffff;
  61. bool sending = true;
  62. bool receiving = true;
  63. bool dontSendToSelf = false;
  64. offset_t sendSize = 0;
  65. bool doSortSimulator = false;
  66. bool simpleSequential = true;
  67. float slowNodeSkew = 1.0;
  68. unsigned numSortSlaves = 50;
  69. bool useAeron = false;
  70. bool doRawTest = false;
  71. unsigned rawBufferSize = 1024;
  72. unsigned rowSize = 100; // MORE - take params
  73. bool variableRows = true;
  74. unsigned maxMessageSize=10000;
  75. bool incrementRowSize = variableRows;
  76. unsigned maxRowSize=5000;
  77. unsigned minRowSize=1;
  78. bool readRows = true;
  79. IpAddressArray allNodes;
  80. struct TestHeader
  81. {
  82. unsigned sequence;
  83. unsigned nodeIndex;
  84. };
  85. class SendAsFastAsPossible : public Thread
  86. {
  87. ISocket *flowSocket;
  88. unsigned size;
  89. static SpinLock ratelock;
  90. static unsigned lastReport;
  91. static unsigned totalSent;
  92. public:
  93. SendAsFastAsPossible(unsigned port, unsigned sendSize)
  94. {
  95. SocketEndpoint ep(port, allNodes.item(0));
  96. flowSocket = ISocket::udp_connect(ep);
  97. size = sendSize;
  98. }
  99. virtual int run()
  100. {
  101. byte *buffer = new byte[65535];
  102. if (!lastReport)
  103. lastReport = msTick();
  104. for (;;)
  105. {
  106. unsigned lim = (1024 * 1024) / size;
  107. for (unsigned i = 0; i < lim; i++)
  108. flowSocket->write(buffer, size);
  109. SpinBlock b(ratelock);
  110. totalSent += lim * size;
  111. unsigned now = msTick();
  112. unsigned elapsed = now - lastReport;
  113. if (elapsed >= 1000)
  114. {
  115. unsigned rate = (((__int64) totalSent) * 1000) / elapsed;
  116. DBGLOG("%.2f Mbytes/sec", ((float) rate) / (1024*1024));
  117. totalSent = 0;
  118. lastReport = now;
  119. }
  120. }
  121. throwUnexpected(); // loop never terminates, but some compilers complain about missing return without this line
  122. }
  123. };
  124. SpinLock SendAsFastAsPossible::ratelock;
  125. unsigned SendAsFastAsPossible::lastReport = 0;
  126. unsigned SendAsFastAsPossible::totalSent = 0;
  127. class Receiver : public Thread
  128. {
  129. std::atomic<bool> running;
  130. Semaphore started;
  131. offset_t allReceived;
  132. CriticalSection arsect;
  133. public:
  134. Receiver() : Thread("Receiver")
  135. {
  136. running = false;
  137. allReceived = 0;
  138. }
  139. virtual void start()
  140. {
  141. Thread::start();
  142. started.wait();
  143. }
  144. void stop(offset_t torecv)
  145. {
  146. CriticalBlock block(arsect);
  147. while (allReceived<torecv) {
  148. PROGLOG("Waiting for Receiver (%" I64F "d bytes remaining)",torecv-allReceived);
  149. CriticalUnblock unblock(arsect);
  150. Sleep(1000);
  151. }
  152. running = false;
  153. }
  154. virtual int run()
  155. {
  156. Owned<IReceiveManager> rcvMgr;
  157. if (useAeron)
  158. {
  159. SocketEndpoint myEP(7000, myNode.getIpAddress());
  160. rcvMgr.setown(createAeronReceiveManager(myEP, false));
  161. }
  162. else
  163. rcvMgr.setown(createReceiveManager(7000, 7001, 7002, udpQueueSize, false));
  164. Owned<roxiemem::IRowManager> rowMgr = roxiemem::createRowManager(0, NULL, queryDummyContextLogger(), NULL, false);
  165. Owned<IMessageCollator> collator = rcvMgr->createMessageCollator(rowMgr, 1);
  166. unsigned lastReport = 0;
  167. offset_t receivedTotal = 0;
  168. offset_t lastTotal = 0;
  169. unsigned *received = new unsigned[numNodes];
  170. unsigned *lastSequence = new unsigned[numNodes];
  171. for (unsigned i = 0; i < numNodes; i++)
  172. {
  173. received[i] = 0;
  174. lastSequence[i] = 0;
  175. }
  176. running = true;
  177. started.signal();
  178. unsigned start = msTick();
  179. unsigned lastReceived = start;
  180. while (running)
  181. {
  182. bool dummy;
  183. Owned <IMessageResult> result = collator->getNextResult(2000, dummy);
  184. if (result)
  185. {
  186. if (!lastReport)
  187. {
  188. start = msTick(); // get first message free....
  189. lastReport = msTick();
  190. }
  191. // process data here....
  192. unsigned headerLength;
  193. const TestHeader *header = (const TestHeader *) result->getMessageHeader(headerLength);
  194. assertex (headerLength == sizeof(TestHeader) && header->nodeIndex < numNodes);
  195. if (header->sequence > lastSequence[header->nodeIndex])
  196. {
  197. if (header->sequence != lastSequence[header->nodeIndex]+1)
  198. {
  199. DBGLOG("Missing messages %u-%u from node %u", lastSequence[header->nodeIndex]+1, header->sequence-1, header->nodeIndex);
  200. }
  201. lastSequence[header->nodeIndex] = header->sequence;
  202. }
  203. else
  204. {
  205. DBGLOG("Out-of-sequence message %u from node %u", header->sequence, header->nodeIndex);
  206. if (header->sequence+256 < lastSequence[header->nodeIndex])
  207. {
  208. DBGLOG("Assuming receiver restart");
  209. lastSequence[header->nodeIndex] = header->sequence;
  210. }
  211. }
  212. if (readRows)
  213. {
  214. Owned<IMessageUnpackCursor> cursor = result->getCursor(rowMgr);
  215. for (;;)
  216. {
  217. if (variableRows)
  218. {
  219. RecordLengthType *rowlen = (RecordLengthType *) cursor->getNext(sizeof(RecordLengthType));
  220. if (rowlen)
  221. {
  222. const void *data = cursor->getNext(*rowlen);
  223. // MORE - check contents
  224. received[header->nodeIndex] += *rowlen;
  225. receivedTotal += *rowlen;
  226. allReceived += *rowlen;
  227. ReleaseRoxieRow(rowlen);
  228. ReleaseRoxieRow(data);
  229. }
  230. else
  231. break;
  232. }
  233. else
  234. UNIMPLEMENTED;
  235. }
  236. }
  237. }
  238. lastReceived = msTick();
  239. if (lastReport && (lastReceived - lastReport > 10000))
  240. {
  241. lastReport = lastReceived;
  242. offset_t receivedRecent = receivedTotal - lastTotal;
  243. DBGLOG("Received %" I64F "u bytes, rate = %.2f MB/s", receivedRecent, ((double)receivedRecent)/10485760.0);
  244. for (unsigned i = 0; i < numNodes; i++)
  245. {
  246. DBGLOG(" %u bytes from node %u", received[i], i);
  247. received[i] = 0;
  248. }
  249. DBGLOG("Received %" I64F "u bytes total", receivedTotal);
  250. lastTotal = receivedTotal;
  251. }
  252. }
  253. {
  254. CriticalBlock block(arsect);
  255. double totalRate = (((double)allReceived)/1048576.0)/((lastReceived-start)/1000.0);
  256. DBGLOG("Node %d All Received %" I64F "d bytes, rate = %.2f MB/s", myIndex, allReceived, totalRate);
  257. }
  258. rcvMgr->detachCollator(collator);
  259. delete [] received;
  260. delete [] lastSequence;
  261. return 0;
  262. }
  263. };
  264. void testNxN()
  265. {
  266. if (maxPacketsPerSender > udpQueueSize)
  267. maxPacketsPerSender = udpQueueSize;
  268. Owned <ISendManager> sendMgr;
  269. if (useAeron)
  270. sendMgr.setown(createAeronSendManager(7000, udpNumQs, myNode.getIpAddress(), false));
  271. else
  272. sendMgr.setown(createSendManager(7000, 7001, 7002, 100, udpNumQs, myNode.getIpAddress(), nullptr, false));
  273. Receiver receiver;
  274. IMessagePacker **packers = new IMessagePacker *[numNodes];
  275. unsigned *sequences = new unsigned[numNodes];
  276. for (unsigned i = 0; i < numNodes; i++)
  277. {
  278. sequences[i] = 1;
  279. packers[i] = NULL;
  280. }
  281. DBGLOG("Ready to start");
  282. if (receiving)
  283. {
  284. receiver.start();
  285. if (numNodes > 1)
  286. Sleep(5000);
  287. }
  288. offset_t sentTotal = 0;
  289. offset_t lastTotal = 0;
  290. if (sending)
  291. {
  292. Sleep(5000); // Give receivers a fighting chance
  293. unsigned dest = 0;
  294. unsigned start = msTick();
  295. unsigned last = start;
  296. if (sendSize)
  297. {
  298. unsigned n = dontSendToSelf ? numNodes -1 : numNodes;
  299. sendSize /= 100*n;
  300. sendSize *= 100*n;
  301. }
  302. for (;;)
  303. {
  304. do {
  305. dest++;
  306. if (dest == numNodes)
  307. dest = 0;
  308. }
  309. while (dontSendToSelf&&(dest==myIndex));
  310. if (!packers[dest])
  311. {
  312. TestHeader t = {sequences[dest], myIndex};
  313. ServerIdentifier destServer;
  314. destServer.setIp(allNodes.item(dest));
  315. packers[dest] = sendMgr->createMessagePacker(1, sequences[dest], &t, sizeof(t), destServer, 0);
  316. }
  317. void *row = packers[dest]->getBuffer(rowSize, variableRows);
  318. memset(row, 0xaa, rowSize);
  319. packers[dest]->putBuffer(row, rowSize, variableRows);
  320. if (packers[dest]->size() > maxMessageSize)
  321. {
  322. unsigned now = msTick();
  323. if (now-last>10000) {
  324. offset_t sentRecent = sentTotal - lastTotal;
  325. DBGLOG("Sent %" I64F "d bytes total, rate = %.2f MB/s", sentTotal, (((double)sentTotal)/1048576.0)/((now-start)/1000.0));
  326. DBGLOG("Sent %" I64F "d bytes this period, rate = %.2f MB/s", sentRecent, (((double)sentRecent)/1048576.0)/((now-last)/1000.0));
  327. last = now;
  328. lastTotal = sentTotal;
  329. }
  330. packers[dest]->flush();
  331. packers[dest]->Release();
  332. packers[dest] = NULL;
  333. sequences[dest]++;
  334. }
  335. sentTotal += rowSize;
  336. if (incrementRowSize)
  337. {
  338. rowSize++;
  339. if (rowSize==maxRowSize)
  340. rowSize = minRowSize;
  341. }
  342. if (sendSize && sentTotal>=sendSize)
  343. break;
  344. }
  345. for (unsigned i = 0; i < numNodes; i++)
  346. {
  347. if (packers[i])
  348. packers[i]->flush();
  349. }
  350. DBGLOG("Node %d All Sent %" I64F "d bytes total, rate = %.2f MB/s", myIndex, sentTotal, (((double)sentTotal)/1048576.0)/((msTick()-start)/1000.0));
  351. while (!sendMgr->allDone())
  352. {
  353. DBGLOG("Node %d waiting for queued data to be flushed", myIndex);
  354. Sleep(1000);
  355. }
  356. DBGLOG("All data sent");
  357. }
  358. else if (receiving)
  359. Sleep(3000000);
  360. receiver.stop(sentTotal);
  361. receiver.join();
  362. Sleep(10*1000); // possible receivers may request retries so should leave senders alive for a bit
  363. for (unsigned ii = 0; ii < numNodes; ii++)
  364. {
  365. if (packers[ii])
  366. packers[ii]->Release();
  367. }
  368. delete [] packers;
  369. delete [] sequences;
  370. }
  371. void rawSendTest()
  372. {
  373. unsigned startPort = 7002;
  374. for (unsigned senders = 0; senders < 10; senders++)
  375. {
  376. DBGLOG("Starting sender %d on port %d", senders+1, startPort);
  377. SendAsFastAsPossible *newSender = new SendAsFastAsPossible(startPort++, rawBufferSize);
  378. newSender->start();
  379. Sleep(10000);
  380. }
  381. }
  382. class SortMaster
  383. {
  384. unsigned __int64 receivingMask;
  385. unsigned __int64 sendingMask;
  386. unsigned __int64 *nodeData;
  387. int numSlaves;
  388. CriticalSection masterCrit;
  389. int *nextNode;
  390. Semaphore *receiveSem;
  391. int *receivesCompleted;
  392. public:
  393. SortMaster(int _numSlaves)
  394. {
  395. receivingMask = 0;
  396. sendingMask = 0;
  397. numSlaves = _numSlaves;
  398. nodeData = new unsigned __int64[numSlaves];
  399. for (int i = 0; i < numSlaves; i++)
  400. nodeData[i] = ((unsigned __int64) 1) << i;
  401. nextNode = NULL;
  402. receiveSem = NULL;
  403. receivesCompleted = NULL;
  404. if (simpleSequential)
  405. {
  406. nextNode = new int[numSlaves];
  407. receiveSem = new Semaphore[numSlaves];
  408. for (int i = 0; i < numSlaves; i++)
  409. {
  410. nextNode[i] = i+1;
  411. receiveSem[i].signal();
  412. }
  413. nextNode[numSlaves-1] = 0;
  414. }
  415. else
  416. {
  417. receivesCompleted = new int[numSlaves];
  418. for (int i = 0; i < numSlaves; i++)
  419. receivesCompleted[i] = 0;
  420. }
  421. }
  422. ~SortMaster()
  423. {
  424. delete [] nodeData;
  425. delete [] nextNode;
  426. delete [] receiveSem;
  427. delete [] receivesCompleted;
  428. }
  429. int requestToSend(int sendingNode)
  430. {
  431. int receivingNode = 0;
  432. if (simpleSequential)
  433. {
  434. {
  435. CriticalBlock b(masterCrit);
  436. receivingNode = nextNode[sendingNode];
  437. nextNode[sendingNode]++;
  438. if (nextNode[sendingNode] >= numSlaves)
  439. nextNode[sendingNode] = 0;
  440. sendingMask |= (((unsigned __int64) 1)<<sendingNode);
  441. receivingMask |= (((unsigned __int64) 1)<<receivingNode);
  442. }
  443. receiveSem[receivingNode].wait();
  444. }
  445. else
  446. {
  447. // Nigel's algorithm - find a node that this slave hasn't yet sent to, which is idle, which is furthest behind, and (if poss) which is not currently sending
  448. int bestScore = -1;
  449. while (bestScore == -1)
  450. {
  451. CriticalBlock b(masterCrit);
  452. for (int i = 0; i < numSlaves; i++)
  453. {
  454. if ((nodeData[sendingNode] & (((unsigned __int64) 1) << i)) == 0)
  455. {
  456. // I still need to send to this node
  457. if ((receivingMask & (((unsigned __int64) 1) << i)) == 0)
  458. {
  459. // and it is idle...
  460. int score = 2*receivesCompleted[i];
  461. if ((sendingMask & (((unsigned __int64) 1) << i)) != 0)
  462. score++;
  463. if (score < bestScore || bestScore==-1)
  464. {
  465. bestScore = score;
  466. receivingNode = i;
  467. }
  468. }
  469. }
  470. }
  471. if (bestScore == -1)
  472. {
  473. CriticalUnblock b(masterCrit);
  474. Sleep(10); // MORE - should wait until something changes then retry
  475. }
  476. else
  477. {
  478. sendingMask |= (((unsigned __int64) 1)<<sendingNode);
  479. receivingMask |= (((unsigned __int64) 1)<<receivingNode);
  480. nodeData[sendingNode] |= (((unsigned __int64) 1) << receivingNode);
  481. break;
  482. }
  483. }
  484. }
  485. return receivingNode;
  486. };
  487. void noteTransferStart(int sendingNode, int receivingNode)
  488. {
  489. // Nothing here at the moment - we set the masks in requestToSend to ensure atomicity
  490. };
  491. void noteTransferEnd(int sendingNode, int receivingNode)
  492. {
  493. CriticalBlock b(masterCrit);
  494. sendingMask &= ~(((unsigned __int64) 1)<<sendingNode);
  495. receivingMask &= ~(((unsigned __int64) 1)<<receivingNode);
  496. if (simpleSequential)
  497. receiveSem[receivingNode].signal();
  498. else
  499. receivesCompleted[receivingNode]++;
  500. };
  501. inline int queryNumSlaves() const
  502. {
  503. return numSlaves;
  504. }
  505. };
  506. class SortSlave : public Thread
  507. {
  508. SortMaster *master;
  509. int myIdx;
  510. int slavesDone;
  511. int dataSize(int targetIndex)
  512. {
  513. if (targetIndex==0 && slowNodeSkew)
  514. return (int)(1000 * slowNodeSkew);
  515. else
  516. return 1000;
  517. }
  518. public:
  519. SortSlave()
  520. {
  521. master = NULL;
  522. myIdx = -1;
  523. slavesDone = 0;
  524. }
  525. void init(SortMaster *_master, unsigned _myIdx)
  526. {
  527. master = _master;
  528. myIdx = _myIdx;
  529. slavesDone = 0;
  530. }
  531. void sendTo(unsigned datasize, unsigned slaveIdx)
  532. {
  533. assert(slaveIdx != myIdx);
  534. DBGLOG("Node %d sending %d bytes to node %d", myIdx, datasize, slaveIdx);
  535. master->noteTransferStart(myIdx, slaveIdx);
  536. Sleep(datasize);
  537. master->noteTransferEnd(myIdx, slaveIdx);
  538. }
  539. virtual int run()
  540. {
  541. while (slavesDone < (master->queryNumSlaves() - 1))
  542. {
  543. unsigned nextDest = master->requestToSend(myIdx);
  544. sendTo(dataSize(nextDest), nextDest);
  545. slavesDone++;
  546. }
  547. return 0;
  548. }
  549. };
  550. void sortSimulator()
  551. {
  552. // test out various ideas for determining the order in which n nodes should exchange data....
  553. SortMaster master(numSortSlaves);
  554. SortSlave *slaves = new SortSlave[numSortSlaves];
  555. unsigned start = msTick();
  556. for (unsigned i = 0; i < numSortSlaves; i++)
  557. {
  558. slaves[i].init(&master, i);
  559. slaves[i].start();
  560. }
  561. for (unsigned j = 0; j < numSortSlaves; j++)
  562. {
  563. slaves[j].join();
  564. }
  565. unsigned elapsed = msTick() - start;
  566. DBGLOG("Complete in %d.%03d seconds", elapsed / 1000, elapsed % 1000);
  567. DBGLOG("sequential=%d, skewFactor %f", (int) simpleSequential, slowNodeSkew);
  568. delete[] slaves;
  569. }
  570. int main(int argc, char * argv[] )
  571. {
  572. InitModuleObjects();
  573. if (argc < 2)
  574. usage();
  575. strdup("Make sure leak checking is working");
  576. queryStderrLogMsgHandler()->setMessageFields(MSGFIELD_time | MSGFIELD_thread | MSGFIELD_prefix);
  577. {
  578. Owned<IComponentLogFileCreator> lf = createComponentLogFileCreator("UDPTRANSPORT");
  579. lf->setCreateAliasFile(false);
  580. lf->setRolling(false);
  581. lf->setAppend(false);
  582. lf->setMaxDetail(TopDetail);
  583. lf->setMsgFields(MSGFIELD_STANDARD);
  584. lf->beginLogging();
  585. }
  586. StringBuffer cmdline;
  587. int c;
  588. for (c = 0; c < argc; c++) {
  589. if (c)
  590. cmdline.append(' ');
  591. cmdline.append(argv[c]);
  592. }
  593. DBGLOG("%s",cmdline.str());
  594. // queryLogMsgManager()->enterQueueingMode();
  595. // queryLogMsgManager()->setQueueDroppingLimit(512, 32);
  596. for (c = 1; c < argc; c++)
  597. {
  598. const char *ip = argv[c];
  599. const char *dash = strchr(ip, '-');
  600. if (dash==ip)
  601. {
  602. if (strcmp(ip, "--udpQueueSize")==0)
  603. {
  604. c++;
  605. if (c==argc || !isdigit(*argv[c]))
  606. usage();
  607. udpQueueSize = atoi(argv[c]);
  608. }
  609. if (strcmp(ip, "--udpRTSTimeout")==0)
  610. {
  611. c++;
  612. if (c==argc || !isdigit(*argv[c]))
  613. usage();
  614. udpRequestTimeout = atoi(argv[c]);
  615. }
  616. else if (strcmp(ip, "--jumboFrames")==0)
  617. {
  618. roxiemem::setDataAlignmentSize(0x2000);
  619. }
  620. else if (strcmp(ip, "--useAeron")==0)
  621. {
  622. useAeron = true;
  623. }
  624. else if (strcmp(ip, "--rawSpeedTest")==0)
  625. {
  626. doRawTest = true;
  627. }
  628. else if (strcmp(ip, "--udpLocalWriteSocketSize")==0)
  629. {
  630. c++;
  631. if (c==argc)
  632. usage();
  633. udpLocalWriteSocketSize = atoi(argv[c]);
  634. }
  635. else if (strcmp(ip, "--maxPacketsPerSender")==0)
  636. {
  637. c++;
  638. if (c==argc)
  639. usage();
  640. maxPacketsPerSender = atoi(argv[c]);
  641. }
  642. else if (strcmp(ip, "--udpTraceLevel")==0)
  643. {
  644. c++;
  645. if (c==argc)
  646. usage();
  647. udpTraceLevel = atoi(argv[c]);
  648. }
  649. else if (strcmp(ip, "--dontSendToSelf")==0)
  650. {
  651. dontSendToSelf = true;
  652. }
  653. else if (strcmp(ip, "--sortSimulator")==0)
  654. {
  655. doSortSimulator = true;
  656. }
  657. else if (strcmp(ip, "--sendSize")==0)
  658. {
  659. c++;
  660. if (c==argc)
  661. usage();
  662. sendSize = (offset_t)atoi(argv[c])*(offset_t)0x100000;
  663. }
  664. else if (strcmp(ip, "--rawBufferSize")==0)
  665. {
  666. c++;
  667. if (c==argc)
  668. usage();
  669. rawBufferSize = atoi(argv[c]);
  670. }
  671. else
  672. usage();
  673. }
  674. else if (dash && isdigit(dash[1]) && dash>ip && isdigit(dash[-1]))
  675. {
  676. const char *startrange = dash-1;
  677. while (isdigit(startrange[-1]))
  678. startrange--;
  679. char *endptr;
  680. unsigned firstnum = atoi(startrange);
  681. unsigned lastnum = strtol(dash+1, &endptr, 10);
  682. while (firstnum <= lastnum)
  683. {
  684. StringBuffer ipstr;
  685. ipstr.append(startrange - ip, ip).append(firstnum).append(endptr);
  686. const IpAddress nodeIP(ipstr);
  687. allNodes.append(nodeIP);
  688. nodeIP.getIpText(ipstr.clear());
  689. printf("Added node %s\n", ipstr.str());
  690. firstnum++;
  691. }
  692. }
  693. else
  694. {
  695. const IpAddress nodeIP(ip);
  696. allNodes.append(nodeIP);
  697. printf("Added node %s\n", ip);
  698. }
  699. }
  700. if (doRawTest)
  701. rawSendTest();
  702. else if (doSortSimulator)
  703. sortSimulator();
  704. else
  705. {
  706. numNodes = allNodes.ordinality();
  707. myNode.setIp(IpAddress("."));
  708. myIndex = numNodes;
  709. ForEachItemIn(idx, allNodes)
  710. {
  711. if (allNodes.item(idx).ipequals(myNode.getIpAddress()))
  712. {
  713. myIndex = idx;
  714. break;
  715. }
  716. }
  717. if (myIndex >= numNodes)
  718. {
  719. printf("ERROR: my ip does not appear to be in range\n");
  720. usage();
  721. }
  722. roxiemem::setTotalMemoryLimit(false, true, false, 1048576000, 0, NULL, NULL);
  723. testNxN();
  724. roxiemem::releaseRoxieHeap();
  725. }
  726. ExitModuleObjects();
  727. releaseAtoms();
  728. return 0;
  729. }
  730. #else
  731. // Ole's old test - look at sometime!
  732. #define MAX_PACKERS 10
  733. #define MAX_PACKETS 20
  734. struct PackerInfo {
  735. unsigned numPackets;
  736. unsigned packetsSizes[MAX_PACKETS];
  737. };
  738. char *progName;
  739. bool noendwait = false;
  740. unsigned thisTrace = 1;
  741. unsigned modeType = 0;
  742. unsigned myIndex = 0;
  743. unsigned destA = 0;
  744. unsigned destB = 0;
  745. char *multiCast = "239.1.1.2";
  746. unsigned udpNumQs = 3;
  747. unsigned numPackers = 2;
  748. unsigned numSizes = 4;
  749. unsigned numSends = 10;
  750. unsigned initSize = 100;
  751. unsigned sizeMulti = 2;
  752. unsigned delayPackers = 0;
  753. unsigned getUnpackerTimeout = 10000;
  754. unsigned packerHdrSize = 32;
  755. struct PackerInfo packersInfo[MAX_PACKERS]; // list of packers info, if used. each is alist of sizes (msgs).
  756. unsigned numPackersInfo = 0;
  757. void usage(char *err = NULL)
  758. {
  759. if (err) fprintf(stderr, "Usage Error: %s\n", err);
  760. fprintf(stderr, "Usage: %s [ -send [-destA IP] [-destB IP] ] [-receive]\n", progName);
  761. fprintf(stderr, " [-multiCast IP] [-udpTimeout sec] [-udpMaxTimeouts val]\n");
  762. fprintf(stderr, " [-udpNumQs val] [-udpQsPriority val] [-packerHdrSize val]\n");
  763. fprintf(stderr, " [-numPackers val] [-numSizes val] [-numSends val]\n");
  764. fprintf(stderr, " [-initSize val] [-sizeMulti val] [-delayPackers msec]\n");
  765. fprintf(stderr, " [-udpTrace val] [-thisTrace val] [-noendwait]\n");
  766. fprintf(stderr, " [-send] : Sets the mode to sender mode (i.e roxie slave like) <default dual mode>\n");
  767. fprintf(stderr, " [-receive] : Sets the mode to receiver mode (i.e roxie server like) <default dual mode>\n");
  768. fprintf(stderr, " [-destA IP] : Sets the sender destination ip address to IP (i.e roxie server IP) <default to local host>\n");
  769. fprintf(stderr, " [-destB IP] : Sets the sender second destination ip address to IP <default no sec dest>\n");
  770. fprintf(stderr, " [-multiCast IP] : Sets the sniffer multicast ip address to IP <default %s>\n", multiCast);
  771. fprintf(stderr, " [-udpTimeout msec] : Sets the sender udpRequestToSendTimeout value <default %i>\n", udpRequestToSendTimeout);
  772. fprintf(stderr, " [-udpNumQs val] : Sets the sender's number of output queues <default %i>\n", udpNumQs);
  773. fprintf(stderr, " [-udpQsPriority val] : Sets the sender's output queues priority udpQsPriority <default %i>\n", udpOutQsPriority);
  774. fprintf(stderr, " [-packerHdrSize val] : Sets the packers header size (like RoxieHeader) <default %i>\n", packerHdrSize);
  775. fprintf(stderr, " [-numPackers val] : Sets the number of packers/unpackers to create/expect <default %i>\n", numPackers);
  776. fprintf(stderr, " [-packers val vale .]: Sets a packer specific packet sizes, this option can be repeated as many packers as needed\n");
  777. fprintf(stderr, " [-numSizes val] : Sets the number of packet data sizes to try sending/receiving <default %i>\n", numSizes);
  778. fprintf(stderr, " [-numSends val]] : Sets the number of msgs per size per packer to send <default %i>\n", numSends);
  779. fprintf(stderr, " [-initSize val] : Sets the size of the first msg(s) per packer to send <default %i>\n", initSize);
  780. fprintf(stderr, " [-sizeMulti val] : Sets the multiplier value of the size of subsequent msgs per packer <default %i>\n", sizeMulti);
  781. fprintf(stderr, " [-delayPackers msec] : Sets the delay value between sent packers (simulate roxie server/slave) <default %i>\n", delayPackers);
  782. fprintf(stderr, " [-getUnpackerTimeout msec] : Sets the timeout value used when calling getNextUnpacker <default %i>\n", getUnpackerTimeout);
  783. fprintf(stderr, " [-thisTrace val] : Sets the trace level of this program <default %i>\n", thisTrace);
  784. fprintf(stderr, " [-udpTrace val] : Sets the udpTraveLevel value <default %i>\n", udpTraceLevel);
  785. fprintf(stderr,"\n\nEnter q to terminate program : ");
  786. fflush(stdout);
  787. char tmpBuf[10]; scanf("%s", tmpBuf);
  788. exit(1);
  789. }
  790. #define SND_MODE_BIT 0x01
  791. #define RCV_MODE_BIT 0x02
  792. int main(int argc, char * argv[] )
  793. {
  794. InitModuleObjects();
  795. progName = argv[0];
  796. destA = myIndex = addRoxieNode(GetCachedHostName());
  797. udpOutQsPriority = 5;
  798. udpTraceLevel = 1;
  799. setTotalMemoryLimit(104857600);
  800. char errBuff[100];
  801. for (int i = 1; i < argc; i++)
  802. {
  803. if (*argv[i] == '-')
  804. {
  805. if(stricmp(argv[i]+1,"send")==0)
  806. modeType |= SND_MODE_BIT;
  807. else if(stricmp(argv[i]+1,"receive")==0)
  808. modeType |= RCV_MODE_BIT;
  809. else if(stricmp(argv[i]+1,"noendwait")==0)
  810. noendwait = true;
  811. else if(stricmp(argv[i]+1,"destA")==0)
  812. {
  813. if (i+1 < argc)
  814. {
  815. destA = addRoxieNode(argv[++i]);
  816. }
  817. else
  818. {
  819. sprintf(errBuff,"Missing IP address after \"%s\"", argv[i]);
  820. usage(errBuff);
  821. }
  822. }
  823. else if(stricmp(argv[i]+1,"destB")==0)
  824. {
  825. if (i+1 < argc)
  826. {
  827. destB = addRoxieNode(argv[++i]);
  828. }
  829. else
  830. {
  831. sprintf(errBuff,"Missing IP address after \"%s\"", argv[i]);
  832. usage(errBuff);
  833. }
  834. }
  835. else if(stricmp(argv[i]+1,"multiCast")==0)
  836. {
  837. if (++i < argc)
  838. {
  839. multiCast = argv[i];
  840. }
  841. else
  842. {
  843. sprintf(errBuff,"Missing IP address after \"%s\"", argv[i-1]);
  844. usage(errBuff);
  845. }
  846. }
  847. else if(stricmp(argv[i]+1,"udpTimeout")==0)
  848. {
  849. if (++i < argc)
  850. {
  851. udpRequestTimeout = atoi(argv[i]);
  852. }
  853. else
  854. {
  855. sprintf(errBuff,"Missing value after \"%s\"", argv[i-1]);
  856. usage(errBuff);
  857. }
  858. }
  859. else if(stricmp(argv[i]+1,"udpNumQs")==0)
  860. {
  861. if (++i < argc)
  862. {
  863. udpNumQs = atoi(argv[i]);
  864. }
  865. else
  866. {
  867. sprintf(errBuff,"Missing value after \"%s\"", argv[i-1]);
  868. usage(errBuff);
  869. }
  870. }
  871. else if(stricmp(argv[i]+1,"udpQsPriority")==0)
  872. {
  873. if (++i < argc)
  874. {
  875. udpOutQsPriority = atoi(argv[i]);
  876. }
  877. else
  878. {
  879. sprintf(errBuff,"Missing value after \"%s\"", argv[i-1]);
  880. usage(errBuff);
  881. }
  882. }
  883. else if(stricmp(argv[i]+1,"packerHdrSize")==0)
  884. {
  885. if (++i < argc)
  886. {
  887. packerHdrSize = atoi(argv[i]);
  888. }
  889. else
  890. {
  891. sprintf(errBuff,"Missing value after \"%s\"", argv[i-1]);
  892. usage(errBuff);
  893. }
  894. }
  895. else if(stricmp(argv[i]+1,"numPackers")==0)
  896. {
  897. if (++i < argc)
  898. {
  899. numPackers = atoi(argv[i]);
  900. }
  901. else
  902. {
  903. sprintf(errBuff,"Missing value after \"%s\"", argv[i-1]);
  904. usage(errBuff);
  905. }
  906. }
  907. else if(stricmp(argv[i]+1,"packer")==0)
  908. {
  909. if (numPackersInfo >= MAX_PACKERS)
  910. {
  911. sprintf(errBuff,"Too many packers are listed - max=%i", MAX_PACKERS);
  912. usage(errBuff);
  913. }
  914. struct PackerInfo &packerInfo = packersInfo[numPackersInfo];
  915. packerInfo.numPackets = 0;
  916. while ((++i < argc) && (*argv[i] != '-'))
  917. {
  918. if (packerInfo.numPackets >= MAX_PACKETS)
  919. {
  920. sprintf(errBuff,"Too many packets in packer - max=%i", MAX_PACKETS);
  921. usage(errBuff);
  922. }
  923. packerInfo.packetsSizes[packerInfo.numPackets] = atoi(argv[i]);
  924. packerInfo.numPackets++;
  925. }
  926. if (packerInfo.numPackets == 0)
  927. {
  928. sprintf(errBuff,"Missing packer packets info");
  929. usage(errBuff);
  930. }
  931. --i;
  932. numPackersInfo++;
  933. }
  934. else if(stricmp(argv[i]+1,"numSizes")==0)
  935. {
  936. if (++i < argc)
  937. {
  938. numSizes = atoi(argv[i]);
  939. }
  940. else
  941. {
  942. sprintf(errBuff,"Missing value after \"%s\"", argv[i-1]);
  943. usage(errBuff);
  944. }
  945. }
  946. else if(stricmp(argv[i]+1,"numSends")==0)
  947. {
  948. if (++i < argc)
  949. {
  950. numSends = atoi(argv[i]);
  951. }
  952. else
  953. {
  954. sprintf(errBuff,"Missing value after \"%s\"", argv[i-1]);
  955. usage(errBuff);
  956. }
  957. }
  958. else if(stricmp(argv[i]+1,"initSize")==0)
  959. {
  960. if (++i < argc)
  961. {
  962. initSize = atoi(argv[i]);
  963. }
  964. else
  965. {
  966. sprintf(errBuff,"Missing value after \"%s\"", argv[i-1]);
  967. usage(errBuff);
  968. }
  969. }
  970. else if(stricmp(argv[i]+1,"sizeMulti")==0)
  971. {
  972. if (++i < argc)
  973. {
  974. sizeMulti = atoi(argv[i]);
  975. }
  976. else
  977. {
  978. sprintf(errBuff,"Missing value after \"%s\"", argv[i-1]);
  979. usage(errBuff);
  980. }
  981. }
  982. else if(stricmp(argv[i]+1,"delayPackers")==0)
  983. {
  984. if (++i < argc)
  985. {
  986. delayPackers = atoi(argv[i]);
  987. }
  988. else
  989. {
  990. sprintf(errBuff,"Missing value after \"%s\"", argv[i-1]);
  991. usage(errBuff);
  992. }
  993. }
  994. else if(stricmp(argv[i]+1,"getUnpackerTimeout")==0)
  995. {
  996. if (++i < argc)
  997. {
  998. getUnpackerTimeout = atoi(argv[i]);
  999. }
  1000. else
  1001. {
  1002. sprintf(errBuff,"Missing value after \"%s\"", argv[i-1]);
  1003. usage(errBuff);
  1004. }
  1005. }
  1006. else if(stricmp(argv[i]+1,"thisTrace")==0)
  1007. {
  1008. if (++i < argc)
  1009. {
  1010. thisTrace = atoi(argv[i]);
  1011. }
  1012. else
  1013. {
  1014. sprintf(errBuff,"Missing value after \"%s\"", argv[i-1]);
  1015. usage(errBuff);
  1016. }
  1017. }
  1018. else if(stricmp(argv[i]+1,"udpTrace")==0)
  1019. {
  1020. if (++i < argc)
  1021. {
  1022. udpTraceLevel = atoi(argv[i]);
  1023. }
  1024. else
  1025. {
  1026. sprintf(errBuff,"Missing value after \"%s\"", argv[i-1]);
  1027. usage(errBuff);
  1028. }
  1029. }
  1030. else
  1031. {
  1032. sprintf(errBuff,"Invalid argument option \"%s\"", argv[i]);
  1033. usage(errBuff);
  1034. }
  1035. }
  1036. else
  1037. {
  1038. sprintf(errBuff,"Argument option \"%s\" missing \"-\" ", argv[i]);
  1039. usage(errBuff);
  1040. }
  1041. }
  1042. // default is daul mode (send and receive)
  1043. if (!modeType) modeType = SND_MODE_BIT | RCV_MODE_BIT;
  1044. IReceiveManager *rcvMgr = NULL;
  1045. IRowManager *rowMgr = NULL;
  1046. IMessageCollator *msgCollA = NULL;
  1047. IMessageCollator *msgCollB = NULL;
  1048. ISendManager *sendMgr = NULL;
  1049. if (modeType & RCV_MODE_BIT)
  1050. {
  1051. rcvMgr = createReceiveManager(7000, 7001, 7002, 100, false);
  1052. rowMgr = createRowManager(0, NULL, queryDummyContextLogger(), NULL, false);
  1053. msgCollA = rcvMgr->createMessageCollator(rowMgr, 100);
  1054. if (destB)
  1055. {
  1056. msgCollB = rcvMgr->createMessageCollator(rowMgr, 200);
  1057. }
  1058. Sleep(1000);
  1059. }
  1060. if (modeType & SND_MODE_BIT)
  1061. {
  1062. sendMgr = createSendManager(7000, 7001, 7002, 7003, multiCast, 100, udpNumQs, 100, NULL, myIndex);
  1063. Sleep(5000);
  1064. char locBuff[100000];
  1065. for (unsigned packerNum=0; packerNum < numPackers; packerNum++)
  1066. {
  1067. unsigned totalSize = 0;
  1068. char packAHdr[100];
  1069. char packBHdr[100];
  1070. sprintf(packAHdr,"helloA%i", packerNum);
  1071. if (thisTrace)
  1072. printf("Creating packer - hdrLen=%i header %s\n", packerHdrSize, packAHdr);
  1073. IMessagePacker *msgPackA = sendMgr->createMessagePacker(100, 0, packAHdr, packerHdrSize, destA, 1);
  1074. IMessagePacker *msgPackB = NULL;
  1075. if (destB)
  1076. {
  1077. sprintf(packBHdr,"helloB%i", packerNum);
  1078. if (thisTrace)
  1079. printf("Creating packer - hdrLen=%i header %s\n", packerHdrSize, packBHdr);
  1080. msgPackB = sendMgr->createMessagePacker(200, 0, packBHdr, packerHdrSize, destB, 0);
  1081. }
  1082. unsigned buffSize = initSize;
  1083. int pkIx = packerNum;
  1084. int nmSizes = numSizes;
  1085. if (numPackersInfo)
  1086. {
  1087. if (pkIx >= numPackersInfo) pkIx %= numPackersInfo;
  1088. nmSizes = packersInfo[pkIx].numPackets;
  1089. }
  1090. for (unsigned sizeNum=0; sizeNum < nmSizes; sizeNum++, buffSize *= sizeMulti)
  1091. {
  1092. unsigned nmSends = numSends;
  1093. if (numPackersInfo)
  1094. {
  1095. nmSends = 1;
  1096. buffSize = packersInfo[pkIx].packetsSizes[sizeNum];
  1097. }
  1098. for (unsigned sendNum=0; sendNum < nmSends; sendNum++)
  1099. {
  1100. sprintf(locBuff,"size=%i num=%i multi=%i packer=%i hello world",
  1101. buffSize, sendNum, sizeNum, packerNum);
  1102. if (thisTrace > 1)
  1103. printf("Sending data : %s\n", locBuff);
  1104. char *transBuff = (char*) msgPackA->getBuffer(buffSize, false);
  1105. strncpy(transBuff, locBuff, buffSize);
  1106. msgPackA->putBuffer(transBuff, buffSize, false);
  1107. if (msgPackB)
  1108. {
  1109. transBuff = (char*) msgPackB->getBuffer(buffSize, false);
  1110. strncpy(transBuff, locBuff, buffSize);
  1111. msgPackB->putBuffer(transBuff, buffSize, false);
  1112. }
  1113. totalSize += buffSize;
  1114. }
  1115. }
  1116. msgPackA->flush(true);
  1117. msgPackA->Release();
  1118. if (thisTrace)
  1119. printf("Packer %s total data size = %i\n", packAHdr, totalSize);
  1120. if (msgPackB)
  1121. {
  1122. msgPackB->flush(true);
  1123. msgPackB->Release();
  1124. if (thisTrace)
  1125. printf("Packer %s total data size = \n", packBHdr, totalSize);
  1126. }
  1127. if (delayPackers) Sleep(delayPackers);
  1128. }
  1129. while(!sendMgr->allDone()) Sleep(50);
  1130. }
  1131. if (modeType & RCV_MODE_BIT)
  1132. {
  1133. for (unsigned unpackerNum=0; unpackerNum < numPackers; unpackerNum++)
  1134. {
  1135. bool anyActivity_a;
  1136. bool anyActivity_b;
  1137. IMessageResult *resultA = msgCollA->getNextResult(getUnpackerTimeout, anyActivity_a);
  1138. if (!resultA)
  1139. {
  1140. printf("timeout waiting on msgCollA->getNextResult(%i,..)\n", getUnpackerTimeout);
  1141. }
  1142. IMessageResult *resultB = NULL;
  1143. if (msgCollB)
  1144. {
  1145. resultB = msgCollB->getNextResult(getUnpackerTimeout, anyActivity_b);
  1146. if (!resultB)
  1147. {
  1148. printf("timeout waiting on msgCollB->getNextResult(%i,..)\n", getUnpackerTimeout);
  1149. }
  1150. }
  1151. unsigned len;
  1152. const void *hdr;
  1153. char locBuff[100000];
  1154. if (resultA)
  1155. {
  1156. hdr = resultA->getMessageHeader(len);
  1157. if (thisTrace)
  1158. printf("Got unpacker - hdrLen=%i header %s\n", len, hdr);
  1159. }
  1160. if (resultB)
  1161. {
  1162. hdr = resultB->getMessageHeader(len);
  1163. if (thisTrace)
  1164. printf("Got unpacker - hdrLen=%i header \"%s\"\n", len, hdr);
  1165. }
  1166. if (!resultA && resultB)
  1167. {
  1168. resultA = resultB;
  1169. resultB = NULL;
  1170. }
  1171. if (!resultA) continue;
  1172. Owned<IMessageUnpackCursor> unpackA = resultA->getCursor(rowMgr);
  1173. Owned<IMessageUnpackCursor> unpackB = resultB ? resultB->getCursor(rowMgr) : NULL;
  1174. unsigned totalSize = 0;
  1175. unsigned buffSize = initSize;
  1176. if (unpackerNum)
  1177. {
  1178. int size;
  1179. if (thisTrace)
  1180. printf("Calling getNext() for all data available in packer \"%s\"\n", hdr);
  1181. void * p= unpackA->getNext(0x0ffffffff,&size);
  1182. totalSize += size;
  1183. }
  1184. else
  1185. {
  1186. if (thisTrace)
  1187. printf("Calling getNext() with diff sizes for packer \"%s\"\n", hdr);
  1188. buffSize = initSize;
  1189. int pkIx = unpackerNum;
  1190. int nmSizes = numSizes;
  1191. if (numPackersInfo)
  1192. {
  1193. if (pkIx >= numPackersInfo) pkIx %= numPackersInfo;
  1194. nmSizes = packersInfo[pkIx].numPackets;
  1195. }
  1196. for (unsigned sizeNum=0; sizeNum < nmSizes; sizeNum++, buffSize *= sizeMulti)
  1197. {
  1198. unsigned nmSends = numSends;
  1199. if (numPackersInfo)
  1200. {
  1201. nmSends = 1;
  1202. buffSize = packersInfo[pkIx].packetsSizes[sizeNum];
  1203. }
  1204. for (unsigned sendNum=0; sendNum < nmSends; sendNum++)
  1205. {
  1206. int size;
  1207. void *transBuff= unpackA->getNext(buffSize, &size);
  1208. if (!transBuff)
  1209. {
  1210. if (thisTrace > 1)
  1211. printf("end of data\n");
  1212. }
  1213. else {
  1214. totalSize += size;
  1215. memcpy(locBuff, transBuff, size);
  1216. locBuff[size]=0;
  1217. if (thisTrace > 1)
  1218. printf("Received (for size=%i num=%i multi=%i unpacker=%i) data : %s\n",
  1219. buffSize, sendNum, sizeNum, unpackerNum, locBuff);
  1220. }
  1221. }
  1222. }
  1223. }
  1224. if (thisTrace)
  1225. printf("Unpacker %s total data size = %i\n", hdr, totalSize);
  1226. buffSize=initSize;
  1227. if (thisTrace > 1)
  1228. printf("Trying to read more than written\n");
  1229. void *transBuff = unpackA->getNext(buffSize);
  1230. if (!transBuff)
  1231. {
  1232. if (thisTrace > 1)
  1233. printf("OK: Could not read more than written\n");
  1234. }
  1235. else
  1236. {
  1237. memcpy(locBuff, transBuff, buffSize);
  1238. locBuff[buffSize]=0;
  1239. printf("WARNING: read more than written: (%s)\n", locBuff);
  1240. }
  1241. printf("\n\n\n");
  1242. unpackA->Release();
  1243. if (unpackB) unpackB->Release();
  1244. }
  1245. }
  1246. if (msgCollA)
  1247. {
  1248. rcvMgr->detachCollator(msgCollA);
  1249. msgCollA->Release();
  1250. }
  1251. if (msgCollB)
  1252. {
  1253. rcvMgr->detachCollator(msgCollB);
  1254. msgCollB->Release();
  1255. }
  1256. if (sendMgr) sendMgr->Release();
  1257. if (rcvMgr) rcvMgr->Release();
  1258. if (!noendwait)
  1259. {
  1260. printf("\n\nEnter q to terminate program : ");
  1261. scanf("%s", errBuff);
  1262. }
  1263. return 0;
  1264. }
  1265. #endif