uttest.cpp 45 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. ///* simple test
  14. #include "udplib.hpp"
  15. #include "roxiemem.hpp"
  16. //#include "udptrr.hpp"
  17. //#include "udptrs.hpp"
  18. #include "jthread.hpp"
  19. #include "jsocket.hpp"
  20. #include "jsem.hpp"
  21. #include "jdebug.hpp"
  22. #include <time.h>
  23. #if defined(_DEBUG) && defined(_WIN32) && !defined(USING_MPATROL)
  24. #define new new(_NORMAL_BLOCK, __FILE__, __LINE__)
  25. #endif
  26. /*=============================================================================================
  27. Findings:
  28. - Detect gaps in incoming sequence
  29. - Implement then test lost packet resending
  30. - Probably worth special casing self->self comms. (later)
  31. */
  32. #if 1
  33. void usage()
  34. {
  35. printf("USAGE: uttest [options] iprange\n");
  36. printf("Options are:\n");
  37. printf(
  38. "--jumboFrames\n"
  39. "--udpLocalWriteSocketSize nn\n"
  40. "--udpRetryBusySenders nn\n"
  41. "--maxPacketsPerSender nn\n"
  42. "--udpQueueSize nn\n"
  43. "--udpRTSTimeout nn\n"
  44. "--udpSnifferEnabled 0|1\n"
  45. "--udpTraceCategories nn\n"
  46. "--udpTraceLevel nn\n"
  47. "--dontSendToSelf\n"
  48. "--sendSize nnMB\n"
  49. "--rawSpeedTest\n"
  50. "--rawBufferSize nn\n"
  51. );
  52. ExitModuleObjects();
  53. releaseAtoms();
  54. exit(1);
  55. }
  56. const char *multicastIP = "239.1.1.1";
  57. unsigned udpNumQs = 1;
  58. unsigned numNodes;
  59. unsigned myIndex;
  60. unsigned udpQueueSize = 100;
  61. unsigned maxPacketsPerSender = 0x7fffffff;
  62. bool sending = true;
  63. bool receiving = true;
  64. bool dontSendToSelf = false;
  65. offset_t sendSize = 0;
  66. bool doSortSimulator = false;
  67. bool simpleSequential = true;
  68. float slowNodeSkew = 1.0;
  69. unsigned numSortSlaves = 50;
  70. bool doRawTest = false;
  71. unsigned rawBufferSize = 1024;
  72. unsigned rowSize = 100; // MORE - take params
  73. bool variableRows = true;
  74. unsigned maxMessageSize=10000;
  75. bool incrementRowSize = variableRows;
  76. unsigned maxRowSize=5000;
  77. unsigned minRowSize=1;
  78. bool readRows = true;
  79. struct TestHeader
  80. {
  81. unsigned sequence;
  82. unsigned nodeIndex;
  83. };
  84. class SendAsFastAsPossible : public Thread
  85. {
  86. ISocket *flowSocket;
  87. unsigned size;
  88. static SpinLock ratelock;
  89. static unsigned lastReport;
  90. static unsigned totalSent;
  91. public:
  92. SendAsFastAsPossible(unsigned port, unsigned sendSize)
  93. {
  94. SocketEndpoint ep(port, getNodeAddress(0));
  95. flowSocket = ISocket::udp_connect(ep);
  96. size = sendSize;
  97. }
  98. virtual int run()
  99. {
  100. byte *buffer = new byte[65535];
  101. if (!lastReport)
  102. lastReport = msTick();
  103. for (;;)
  104. {
  105. unsigned lim = (1024 * 1024) / size;
  106. for (unsigned i = 0; i < lim; i++)
  107. flowSocket->write(buffer, size);
  108. SpinBlock b(ratelock);
  109. totalSent += lim * size;
  110. unsigned now = msTick();
  111. unsigned elapsed = now - lastReport;
  112. if (elapsed >= 1000)
  113. {
  114. unsigned rate = (((__int64) totalSent) * 1000) / elapsed;
  115. DBGLOG("%.2f Mbytes/sec", ((float) rate) / (1024*1024));
  116. totalSent = 0;
  117. lastReport = now;
  118. }
  119. }
  120. throwUnexpected(); // loop never terminates, but some compilers complain about missing return without this line
  121. }
  122. };
  123. SpinLock SendAsFastAsPossible::ratelock;
  124. unsigned SendAsFastAsPossible::lastReport = 0;
  125. unsigned SendAsFastAsPossible::totalSent = 0;
  126. class Receiver : public Thread
  127. {
  128. bool running;
  129. Semaphore started;
  130. offset_t allReceived;
  131. CriticalSection arsect;
  132. public:
  133. Receiver() : Thread("Receiver")
  134. {
  135. running = false;
  136. allReceived = 0;
  137. }
  138. virtual void start()
  139. {
  140. Thread::start();
  141. started.wait();
  142. }
  143. void stop(offset_t torecv)
  144. {
  145. CriticalBlock block(arsect);
  146. while (allReceived<torecv) {
  147. PROGLOG("Waiting for Receiver (%" I64F "d remaining)",torecv-allReceived);
  148. CriticalUnblock unblock(arsect);
  149. Sleep(1000);
  150. }
  151. running = false;
  152. }
  153. virtual int run()
  154. {
  155. Owned<IReceiveManager> rcvMgr = createReceiveManager(7000, 7001, 7002, 7003, multicastIP, udpQueueSize, maxPacketsPerSender, myIndex);
  156. Owned<roxiemem::IRowManager> rowMgr = roxiemem::createRowManager(0, NULL, queryDummyContextLogger(), NULL);
  157. Owned<IMessageCollator> collator = rcvMgr->createMessageCollator(rowMgr, 1);
  158. unsigned lastReport = 0;
  159. unsigned receivedTotal = 0;
  160. unsigned *received = new unsigned[numNodes];
  161. unsigned *lastSequence = new unsigned[numNodes];
  162. for (unsigned i = 0; i < numNodes; i++)
  163. {
  164. received[i] = 0;
  165. lastSequence[i] = 0;
  166. }
  167. running = true;
  168. started.signal();
  169. unsigned start = msTick();
  170. unsigned lastReceived = start;
  171. while (running)
  172. {
  173. bool dummy;
  174. Owned <IMessageResult> result = collator->getNextResult(2000, dummy);
  175. if (result)
  176. {
  177. if (!lastReport)
  178. {
  179. start = msTick(); // get first message free....
  180. lastReport = msTick();
  181. }
  182. // process data here....
  183. unsigned headerLength;
  184. const TestHeader *header = (const TestHeader *) result->getMessageHeader(headerLength);
  185. assertex (headerLength == sizeof(TestHeader) && header->nodeIndex < numNodes);
  186. if (header->sequence > lastSequence[header->nodeIndex])
  187. {
  188. if (header->sequence != lastSequence[header->nodeIndex]+1)
  189. DBGLOG("Missing messages %u-%u from node %u", lastSequence[header->nodeIndex]+1, header->sequence-1, header->nodeIndex);
  190. lastSequence[header->nodeIndex] = header->sequence;
  191. }
  192. else
  193. {
  194. DBGLOG("Out-of-sequence message %u from node %u", header->sequence, header->nodeIndex);
  195. if (header->sequence+256 < lastSequence[header->nodeIndex])
  196. {
  197. DBGLOG("Assuming receiver restart");
  198. lastSequence[header->nodeIndex] = header->sequence;
  199. }
  200. }
  201. if (readRows)
  202. {
  203. Owned<IMessageUnpackCursor> cursor = result->getCursor(rowMgr);
  204. for (;;)
  205. {
  206. if (variableRows)
  207. {
  208. RecordLengthType *rowlen = (RecordLengthType *) cursor->getNext(sizeof(RecordLengthType));
  209. if (rowlen)
  210. {
  211. const void *data = cursor->getNext(*rowlen);
  212. // MORE - check contents
  213. received[header->nodeIndex] += *rowlen;
  214. receivedTotal += *rowlen;
  215. allReceived += *rowlen;
  216. ReleaseRoxieRow(rowlen);
  217. ReleaseRoxieRow(data);
  218. }
  219. else
  220. break;
  221. }
  222. else
  223. UNIMPLEMENTED;
  224. }
  225. }
  226. }
  227. lastReceived = msTick();
  228. if (lastReport && (lastReceived - lastReport > 10000))
  229. {
  230. lastReport = lastReceived;
  231. DBGLOG("Received %u bytes total, rate = %.2f MB/s", receivedTotal, ((double)receivedTotal)/10485760.0);
  232. for (unsigned i = 0; i < numNodes; i++)
  233. {
  234. DBGLOG(" %u bytes from node %u", received[i], i);
  235. received[i] = 0;
  236. }
  237. receivedTotal = 0;
  238. }
  239. }
  240. {
  241. CriticalBlock block(arsect);
  242. double totalRate = (((double)allReceived)/1048576.0)/((lastReceived-start)/1000.0);
  243. DBGLOG("Node %d All Received %" I64F "d bytes, rate = %.2f MB/s", myIndex, allReceived, totalRate);
  244. }
  245. rcvMgr->detachCollator(collator);
  246. delete [] received;
  247. delete [] lastSequence;
  248. return 0;
  249. }
  250. };
  251. void testNxN()
  252. {
  253. if (maxPacketsPerSender > udpQueueSize)
  254. maxPacketsPerSender = udpQueueSize;
  255. Owned <ISendManager> sendMgr = createSendManager(7000, 7001, 7002, 7003, multicastIP, 100, udpNumQs, NULL, myIndex);
  256. Receiver receiver;
  257. IMessagePacker **packers = new IMessagePacker *[numNodes];
  258. unsigned *sequences = new unsigned[numNodes];
  259. for (unsigned i = 0; i < numNodes; i++)
  260. {
  261. sequences[i] = 1;
  262. packers[i] = NULL;
  263. }
  264. DBGLOG("Ready to start");
  265. if (receiving)
  266. {
  267. receiver.start();
  268. if (numNodes > 1)
  269. Sleep(5000);
  270. }
  271. offset_t sentTotal = 0;
  272. if (sending)
  273. {
  274. Sleep(5000); // Give receivers a fighting chance
  275. unsigned dest = 0;
  276. unsigned start = msTick();
  277. unsigned last = start;
  278. if (sendSize)
  279. {
  280. unsigned n = dontSendToSelf ? numNodes -1 : numNodes;
  281. sendSize /= 100*n;
  282. sendSize *= 100*n;
  283. }
  284. for (;;)
  285. {
  286. do {
  287. dest++;
  288. if (dest == numNodes)
  289. dest = 0;
  290. }
  291. while (dontSendToSelf&&(dest==myIndex));
  292. if (!packers[dest])
  293. {
  294. TestHeader t = {sequences[dest], myIndex};
  295. packers[dest] = sendMgr->createMessagePacker(1, sequences[dest], &t, sizeof(t), dest, 0);
  296. }
  297. void *row = packers[dest]->getBuffer(rowSize, variableRows);
  298. memset(row, 0xaa, rowSize);
  299. packers[dest]->putBuffer(row, rowSize, variableRows);
  300. if (packers[dest]->size() > maxMessageSize)
  301. {
  302. unsigned now = msTick();
  303. if (now-last>10000) {
  304. DBGLOG("Sent %" I64F "d bytes total, rate = %.2f MB/s", sentTotal, (((double)sentTotal)/1048576.0)/((now-start)/1000.0));
  305. last = now;
  306. }
  307. packers[dest]->flush(true);
  308. packers[dest]->Release();
  309. packers[dest] = NULL;
  310. sequences[dest]++;
  311. }
  312. sentTotal += rowSize;
  313. if (incrementRowSize)
  314. {
  315. rowSize++;
  316. if (rowSize==maxRowSize)
  317. rowSize = minRowSize;
  318. }
  319. if (sendSize && sentTotal>=sendSize)
  320. break;
  321. }
  322. for (unsigned i = 0; i < numNodes; i++)
  323. {
  324. if (packers[i])
  325. packers[i]->flush(true);
  326. }
  327. DBGLOG("Node %d All Sent %" I64F "d bytes total, rate = %.2f MB/s", myIndex, sentTotal, (((double)sentTotal)/1048576.0)/((msTick()-start)/1000.0));
  328. while (!sendMgr->allDone())
  329. {
  330. DBGLOG("Node %d waiting for queued data to be flushed", myIndex);
  331. Sleep(1000);
  332. }
  333. DBGLOG("All data sent");
  334. }
  335. else if (receiving)
  336. Sleep(3000000);
  337. receiver.stop(sentTotal);
  338. receiver.join();
  339. Sleep(10*1000); // possible receivers may request retries so should leave senders alive for a bit
  340. for (unsigned ii = 0; ii < numNodes; ii++)
  341. {
  342. if (packers[ii])
  343. packers[ii]->Release();
  344. }
  345. delete [] packers;
  346. delete [] sequences;
  347. }
  348. void rawSendTest()
  349. {
  350. unsigned startPort = 7002;
  351. for (unsigned senders = 0; senders < 10; senders++)
  352. {
  353. DBGLOG("Starting sender %d on port %d", senders+1, startPort);
  354. SendAsFastAsPossible *newSender = new SendAsFastAsPossible(startPort++, rawBufferSize);
  355. newSender->start();
  356. Sleep(10000);
  357. }
  358. }
  359. class SortMaster
  360. {
  361. unsigned __int64 receivingMask;
  362. unsigned __int64 sendingMask;
  363. unsigned __int64 *nodeData;
  364. int numSlaves;
  365. CriticalSection masterCrit;
  366. int *nextNode;
  367. Semaphore *receiveSem;
  368. int *receivesCompleted;
  369. public:
  370. SortMaster(int _numSlaves)
  371. {
  372. receivingMask = 0;
  373. sendingMask = 0;
  374. numSlaves = _numSlaves;
  375. nodeData = new unsigned __int64[numSlaves];
  376. for (int i = 0; i < numSlaves; i++)
  377. nodeData[i] = ((unsigned __int64) 1) << i;
  378. nextNode = NULL;
  379. receiveSem = NULL;
  380. receivesCompleted = NULL;
  381. if (simpleSequential)
  382. {
  383. nextNode = new int[numSlaves];
  384. receiveSem = new Semaphore[numSlaves];
  385. for (int i = 0; i < numSlaves; i++)
  386. {
  387. nextNode[i] = i+1;
  388. receiveSem[i].signal();
  389. }
  390. nextNode[numSlaves-1] = 0;
  391. }
  392. else
  393. {
  394. receivesCompleted = new int[numSlaves];
  395. for (int i = 0; i < numSlaves; i++)
  396. receivesCompleted[i] = 0;
  397. }
  398. }
  399. ~SortMaster()
  400. {
  401. delete [] nodeData;
  402. delete [] nextNode;
  403. delete [] receiveSem;
  404. delete [] receivesCompleted;
  405. }
  406. int requestToSend(int sendingNode)
  407. {
  408. int receivingNode = 0;
  409. if (simpleSequential)
  410. {
  411. {
  412. CriticalBlock b(masterCrit);
  413. receivingNode = nextNode[sendingNode];
  414. nextNode[sendingNode]++;
  415. if (nextNode[sendingNode] >= numSlaves)
  416. nextNode[sendingNode] = 0;
  417. sendingMask |= (((unsigned __int64) 1)<<sendingNode);
  418. receivingMask |= (((unsigned __int64) 1)<<receivingNode);
  419. }
  420. receiveSem[receivingNode].wait();
  421. }
  422. else
  423. {
  424. // Nigel's algorithm - find a node that this slave hasn't yet sent to, which is idle, which is furthest behind, and (if poss) which is not currently sending
  425. int bestScore = -1;
  426. while (bestScore == -1)
  427. {
  428. CriticalBlock b(masterCrit);
  429. for (int i = 0; i < numSlaves; i++)
  430. {
  431. if ((nodeData[sendingNode] & (((unsigned __int64) 1) << i)) == 0)
  432. {
  433. // I still need to send to this node
  434. if ((receivingMask & (((unsigned __int64) 1) << i)) == 0)
  435. {
  436. // and it is idle...
  437. int score = 2*receivesCompleted[i];
  438. if ((sendingMask & (((unsigned __int64) 1) << i)) != 0)
  439. score++;
  440. if (score < bestScore || bestScore==-1)
  441. {
  442. bestScore = score;
  443. receivingNode = i;
  444. }
  445. }
  446. }
  447. }
  448. if (bestScore == -1)
  449. {
  450. CriticalUnblock b(masterCrit);
  451. Sleep(10); // MORE - should wait until something changes then retry
  452. }
  453. else
  454. {
  455. sendingMask |= (((unsigned __int64) 1)<<sendingNode);
  456. receivingMask |= (((unsigned __int64) 1)<<receivingNode);
  457. nodeData[sendingNode] |= (((unsigned __int64) 1) << receivingNode);
  458. break;
  459. }
  460. }
  461. }
  462. return receivingNode;
  463. };
  464. void noteTransferStart(int sendingNode, int receivingNode)
  465. {
  466. // Nothing here at the moment - we set the masks in requestToSend to ensure atomicity
  467. };
  468. void noteTransferEnd(int sendingNode, int receivingNode)
  469. {
  470. CriticalBlock b(masterCrit);
  471. sendingMask &= ~(((unsigned __int64) 1)<<sendingNode);
  472. receivingMask &= ~(((unsigned __int64) 1)<<receivingNode);
  473. if (simpleSequential)
  474. receiveSem[receivingNode].signal();
  475. else
  476. receivesCompleted[receivingNode]++;
  477. };
  478. inline int queryNumSlaves() const
  479. {
  480. return numSlaves;
  481. }
  482. };
  483. class SortSlave : public Thread
  484. {
  485. SortMaster *master;
  486. int myIdx;
  487. int slavesDone;
  488. int dataSize(int targetIndex)
  489. {
  490. if (targetIndex==0 && slowNodeSkew)
  491. return (int)(1000 * slowNodeSkew);
  492. else
  493. return 1000;
  494. }
  495. public:
  496. SortSlave()
  497. {
  498. master = NULL;
  499. myIdx = -1;
  500. slavesDone = 0;
  501. }
  502. void init(SortMaster *_master, unsigned _myIdx)
  503. {
  504. master = _master;
  505. myIdx = _myIdx;
  506. slavesDone = 0;
  507. }
  508. void sendTo(unsigned datasize, unsigned slaveIdx)
  509. {
  510. assert(slaveIdx != myIdx);
  511. DBGLOG("Node %d sending %d bytes to node %d", myIdx, datasize, slaveIdx);
  512. master->noteTransferStart(myIdx, slaveIdx);
  513. Sleep(datasize);
  514. master->noteTransferEnd(myIdx, slaveIdx);
  515. }
  516. virtual int run()
  517. {
  518. while (slavesDone < (master->queryNumSlaves() - 1))
  519. {
  520. unsigned nextDest = master->requestToSend(myIdx);
  521. sendTo(dataSize(nextDest), nextDest);
  522. slavesDone++;
  523. }
  524. return 0;
  525. }
  526. };
  527. void sortSimulator()
  528. {
  529. // test out various ideas for determining the order in which n nodes should exchange data....
  530. SortMaster master(numSortSlaves);
  531. SortSlave *slaves = new SortSlave[numSortSlaves];
  532. unsigned start = msTick();
  533. for (unsigned i = 0; i < numSortSlaves; i++)
  534. {
  535. slaves[i].init(&master, i);
  536. slaves[i].start();
  537. }
  538. for (unsigned j = 0; j < numSortSlaves; j++)
  539. {
  540. slaves[j].join();
  541. }
  542. unsigned elapsed = msTick() - start;
  543. DBGLOG("Complete in %d.%03d seconds", elapsed / 1000, elapsed % 1000);
  544. DBGLOG("sequential=%d, skewFactor %f", (int) simpleSequential, slowNodeSkew);
  545. delete[] slaves;
  546. }
  547. int main(int argc, char * argv[] )
  548. {
  549. InitModuleObjects();
  550. if (argc < 2)
  551. usage();
  552. strdup("Make sure leak checking is working");
  553. queryStderrLogMsgHandler()->setMessageFields(MSGFIELD_time | MSGFIELD_thread | MSGFIELD_prefix);
  554. {
  555. Owned<IComponentLogFileCreator> lf = createComponentLogFileCreator("UDPTRANSPORT");
  556. lf->setCreateAliasFile(false);
  557. lf->setRolling(false);
  558. lf->setAppend(false);
  559. lf->setMaxDetail(TopDetail);
  560. lf->setMsgFields(MSGFIELD_STANDARD);
  561. lf->beginLogging();
  562. }
  563. StringBuffer cmdline;
  564. int c;
  565. for (c = 0; c < argc; c++) {
  566. if (c)
  567. cmdline.append(' ');
  568. cmdline.append(argv[c]);
  569. }
  570. DBGLOG("%s",cmdline.str());
  571. // queryLogMsgManager()->enterQueueingMode();
  572. // queryLogMsgManager()->setQueueDroppingLimit(512, 32);
  573. udpRequestToSendTimeout = 5000;
  574. for (c = 1; c < argc; c++)
  575. {
  576. const char *ip = argv[c];
  577. const char *dash = strchr(ip, '-');
  578. if (dash==ip)
  579. {
  580. if (strcmp(ip, "--udpQueueSize")==0)
  581. {
  582. c++;
  583. if (c==argc || !isdigit(*argv[c]))
  584. usage();
  585. udpQueueSize = atoi(argv[c]);
  586. }
  587. if (strcmp(ip, "--udpRTSTimeout")==0)
  588. {
  589. c++;
  590. if (c==argc || !isdigit(*argv[c]))
  591. usage();
  592. udpRequestToSendTimeout = atoi(argv[c]);
  593. }
  594. else if (strcmp(ip, "--jumboFrames")==0)
  595. {
  596. roxiemem::setDataAlignmentSize(0x2000);
  597. }
  598. else if (strcmp(ip, "--rawSpeedTest")==0)
  599. {
  600. doRawTest = true;
  601. }
  602. else if (strcmp(ip, "--udpLocalWriteSocketSize")==0)
  603. {
  604. c++;
  605. if (c==argc)
  606. usage();
  607. udpLocalWriteSocketSize = atoi(argv[c]);
  608. }
  609. else if (strcmp(ip, "--udpRetryBusySenders")==0)
  610. {
  611. c++;
  612. if (c==argc)
  613. usage();
  614. udpRetryBusySenders = atoi(argv[c]);
  615. }
  616. else if (strcmp(ip, "--maxPacketsPerSender")==0)
  617. {
  618. c++;
  619. if (c==argc)
  620. usage();
  621. maxPacketsPerSender = atoi(argv[c]);
  622. }
  623. else if (strcmp(ip, "--udpSnifferEnabled")==0)
  624. {
  625. c++;
  626. if (c==argc)
  627. usage();
  628. udpSnifferEnabled = atoi(argv[c]) != 0;
  629. }
  630. else if (strcmp(ip, "--udpTraceLevel")==0)
  631. {
  632. c++;
  633. if (c==argc)
  634. usage();
  635. udpTraceLevel = atoi(argv[c]);
  636. }
  637. else if (strcmp(ip, "--udpTraceCategories")==0)
  638. {
  639. c++;
  640. if (c==argc)
  641. usage();
  642. udpTraceCategories = atoi(argv[c]);
  643. }
  644. else if (strcmp(ip, "--dontSendToSelf")==0)
  645. {
  646. dontSendToSelf = true;
  647. }
  648. else if (strcmp(ip, "--sortSimulator")==0)
  649. {
  650. doSortSimulator = true;
  651. }
  652. else if (strcmp(ip, "--sendSize")==0)
  653. {
  654. c++;
  655. if (c==argc)
  656. usage();
  657. sendSize = (offset_t)atoi(argv[c])*(offset_t)0x100000;
  658. }
  659. else if (strcmp(ip, "--rawBufferSize")==0)
  660. {
  661. c++;
  662. if (c==argc)
  663. usage();
  664. rawBufferSize = atoi(argv[c]);
  665. }
  666. else
  667. usage();
  668. }
  669. else if (dash && isdigit(dash[1]) && dash>ip && isdigit(dash[-1]))
  670. {
  671. const char *startrange = dash-1;
  672. while (isdigit(startrange[-1]))
  673. startrange--;
  674. char *endptr;
  675. unsigned firstnum = atoi(startrange);
  676. unsigned lastnum = strtol(dash+1, &endptr, 10);
  677. while (firstnum <= lastnum)
  678. {
  679. StringBuffer ipstr;
  680. ipstr.append(startrange - ip, ip).append(firstnum).append(endptr);
  681. unsigned nodeIdx = addRoxieNode(ipstr.str());
  682. const IpAddress &nodeIP = getNodeAddress(nodeIdx);
  683. nodeIP.getIpText(ipstr.clear());
  684. printf("Node %u is %s\n", nodeIdx, ipstr.str());
  685. firstnum++;
  686. }
  687. }
  688. else
  689. {
  690. StringBuffer ipstr;
  691. unsigned nodeIdx = addRoxieNode(ip);
  692. const IpAddress &nodeIP = getNodeAddress(nodeIdx);
  693. nodeIP.getIpText(ipstr.clear());
  694. printf("Node %u is %s\n", nodeIdx, ipstr.str());
  695. }
  696. }
  697. if (doRawTest)
  698. rawSendTest();
  699. else if (doSortSimulator)
  700. sortSimulator();
  701. else
  702. {
  703. numNodes = getNumNodes();
  704. myIndex = addRoxieNode(GetCachedHostName());
  705. if (myIndex >= numNodes)
  706. {
  707. printf("ERROR: my ip does not appear to be in range\n");
  708. usage();
  709. }
  710. roxiemem::setTotalMemoryLimit(false, true, false, 1048576000, 0, NULL, NULL);
  711. testNxN();
  712. roxiemem::releaseRoxieHeap();
  713. }
  714. ExitModuleObjects();
  715. releaseAtoms();
  716. return 0;
  717. }
  718. #else
  719. // Ole's old test - look at sometime!
  720. #define MAX_PACKERS 10
  721. #define MAX_PACKETS 20
  722. struct PackerInfo {
  723. unsigned numPackets;
  724. unsigned packetsSizes[MAX_PACKETS];
  725. };
  726. char *progName;
  727. bool noendwait = false;
  728. unsigned thisTrace = 1;
  729. unsigned modeType = 0;
  730. unsigned myIndex = 0;
  731. unsigned destA = 0;
  732. unsigned destB = 0;
  733. char *multiCast = "239.1.1.2";
  734. unsigned udpNumQs = 3;
  735. unsigned numPackers = 2;
  736. unsigned numSizes = 4;
  737. unsigned numSends = 10;
  738. unsigned initSize = 100;
  739. unsigned sizeMulti = 2;
  740. unsigned delayPackers = 0;
  741. unsigned getUnpackerTimeout = 10000;
  742. unsigned packerHdrSize = 32;
  743. struct PackerInfo packersInfo[MAX_PACKERS]; // list of packers info, if used. each is alist of sizes (msgs).
  744. unsigned numPackersInfo = 0;
  745. void usage(char *err = NULL)
  746. {
  747. if (err) fprintf(stderr, "Usage Error: %s\n", err);
  748. fprintf(stderr, "Usage: %s [ -send [-destA IP] [-destB IP] ] [-receive]\n", progName);
  749. fprintf(stderr, " [-multiCast IP] [-udpTimeout sec] [-udpMaxTimeouts val]\n");
  750. fprintf(stderr, " [-udpNumQs val] [-udpQsPriority val] [-packerHdrSize val]\n");
  751. fprintf(stderr, " [-numPackers val] [-numSizes val] [-numSends val]\n");
  752. fprintf(stderr, " [-initSize val] [-sizeMulti val] [-delayPackers msec]\n");
  753. fprintf(stderr, " [-udpTrace val] [-thisTrace val] [-noendwait]\n");
  754. fprintf(stderr, " [-send] : Sets the mode to sender mode (i.e roxie slave like) <default dual mode>\n");
  755. fprintf(stderr, " [-receive] : Sets the mode to receiver mode (i.e roxie server like) <default dual mode>\n");
  756. fprintf(stderr, " [-destA IP] : Sets the sender destination ip address to IP (i.e roxie server IP) <default to local host>\n");
  757. fprintf(stderr, " [-destB IP] : Sets the sender second destination ip address to IP <default no sec dest>\n");
  758. fprintf(stderr, " [-multiCast IP] : Sets the sniffer multicast ip address to IP <default %s>\n", multiCast);
  759. fprintf(stderr, " [-udpTimeout msec] : Sets the sender udpRequestToSendTimeout value <default %i>\n", udpRequestToSendTimeout);
  760. fprintf(stderr, " [-udpMaxTimeouts val]: Sets the sender udpMaxRetryTimedoutReqs value <default %i>\n", udpMaxRetryTimedoutReqs);
  761. fprintf(stderr, " [-udpNumQs val] : Sets the sender's number of output queues <default %i>\n", udpNumQs);
  762. fprintf(stderr, " [-udpQsPriority val] : Sets the sender's output queues priority udpQsPriority <default %i>\n", udpOutQsPriority);
  763. fprintf(stderr, " [-packerHdrSize val] : Sets the packers header size (like RoxieHeader) <default %i>\n", packerHdrSize);
  764. fprintf(stderr, " [-numPackers val] : Sets the number of packers/unpackers to create/expect <default %i>\n", numPackers);
  765. fprintf(stderr, " [-packers val vale .]: Sets a packer specific packet sizes, this option can be repeated as many packers as needed\n");
  766. fprintf(stderr, " [-numSizes val] : Sets the number of packet data sizes to try sending/receiving <default %i>\n", numSizes);
  767. fprintf(stderr, " [-numSends val]] : Sets the number of msgs per size per packer to send <default %i>\n", numSends);
  768. fprintf(stderr, " [-initSize val] : Sets the size of the first msg(s) per packer to send <default %i>\n", initSize);
  769. fprintf(stderr, " [-sizeMulti val] : Sets the multiplier value of the size of subsequent msgs per packer <default %i>\n", sizeMulti);
  770. fprintf(stderr, " [-delayPackers msec] : Sets the delay value between sent packers (simulate roxie server/slave) <default %i>\n", delayPackers);
  771. fprintf(stderr, " [-getUnpackerTimeout msec] : Sets the timeout value used when calling getNextUnpacker <default %i>\n", getUnpackerTimeout);
  772. fprintf(stderr, " [-thisTrace val] : Sets the trace level of this program <default %i>\n", thisTrace);
  773. fprintf(stderr, " [-udpTrace val] : Sets the udpTraveLevel value <default %i>\n", udpTraceLevel);
  774. fprintf(stderr,"\n\nEnter q to terminate program : ");
  775. fflush(stdout);
  776. char tmpBuf[10]; scanf("%s", tmpBuf);
  777. exit(1);
  778. }
  779. #define SND_MODE_BIT 0x01
  780. #define RCV_MODE_BIT 0x02
  781. int main(int argc, char * argv[] )
  782. {
  783. InitModuleObjects();
  784. progName = argv[0];
  785. destA = myIndex = addRoxieNode(GetCachedHostName());
  786. udpRequestToSendTimeout = 5000;
  787. udpMaxRetryTimedoutReqs = 3;
  788. udpOutQsPriority = 5;
  789. udpTraceLevel = 1;
  790. setTotalMemoryLimit(104857600);
  791. char errBuff[100];
  792. for (int i = 1; i < argc; i++)
  793. {
  794. if (*argv[i] == '-')
  795. {
  796. if(stricmp(argv[i]+1,"send")==0)
  797. modeType |= SND_MODE_BIT;
  798. else if(stricmp(argv[i]+1,"receive")==0)
  799. modeType |= RCV_MODE_BIT;
  800. else if(stricmp(argv[i]+1,"noendwait")==0)
  801. noendwait = true;
  802. else if(stricmp(argv[i]+1,"destA")==0)
  803. {
  804. if (i+1 < argc)
  805. {
  806. destA = addRoxieNode(argv[++i]);
  807. }
  808. else
  809. {
  810. sprintf(errBuff,"Missing IP address after \"%s\"", argv[i]);
  811. usage(errBuff);
  812. }
  813. }
  814. else if(stricmp(argv[i]+1,"destB")==0)
  815. {
  816. if (i+1 < argc)
  817. {
  818. destB = addRoxieNode(argv[++i]);
  819. }
  820. else
  821. {
  822. sprintf(errBuff,"Missing IP address after \"%s\"", argv[i]);
  823. usage(errBuff);
  824. }
  825. }
  826. else if(stricmp(argv[i]+1,"multiCast")==0)
  827. {
  828. if (++i < argc)
  829. {
  830. multiCast = argv[i];
  831. }
  832. else
  833. {
  834. sprintf(errBuff,"Missing IP address after \"%s\"", argv[i-1]);
  835. usage(errBuff);
  836. }
  837. }
  838. else if(stricmp(argv[i]+1,"udpTimeout")==0)
  839. {
  840. if (++i < argc)
  841. {
  842. udpRequestToSendTimeout = atoi(argv[i]);
  843. }
  844. else
  845. {
  846. sprintf(errBuff,"Missing value after \"%s\"", argv[i-1]);
  847. usage(errBuff);
  848. }
  849. }
  850. else if(stricmp(argv[i]+1,"udpMaxTimeouts")==0)
  851. {
  852. if (++i < argc)
  853. {
  854. udpMaxRetryTimedoutReqs = atoi(argv[i]);
  855. }
  856. else
  857. {
  858. sprintf(errBuff,"Missing value after \"%s\"", argv[i-1]);
  859. usage(errBuff);
  860. }
  861. }
  862. else if(stricmp(argv[i]+1,"udpNumQs")==0)
  863. {
  864. if (++i < argc)
  865. {
  866. udpNumQs = atoi(argv[i]);
  867. }
  868. else
  869. {
  870. sprintf(errBuff,"Missing value after \"%s\"", argv[i-1]);
  871. usage(errBuff);
  872. }
  873. }
  874. else if(stricmp(argv[i]+1,"udpQsPriority")==0)
  875. {
  876. if (++i < argc)
  877. {
  878. udpOutQsPriority = atoi(argv[i]);
  879. }
  880. else
  881. {
  882. sprintf(errBuff,"Missing value after \"%s\"", argv[i-1]);
  883. usage(errBuff);
  884. }
  885. }
  886. else if(stricmp(argv[i]+1,"packerHdrSize")==0)
  887. {
  888. if (++i < argc)
  889. {
  890. packerHdrSize = atoi(argv[i]);
  891. }
  892. else
  893. {
  894. sprintf(errBuff,"Missing value after \"%s\"", argv[i-1]);
  895. usage(errBuff);
  896. }
  897. }
  898. else if(stricmp(argv[i]+1,"numPackers")==0)
  899. {
  900. if (++i < argc)
  901. {
  902. numPackers = atoi(argv[i]);
  903. }
  904. else
  905. {
  906. sprintf(errBuff,"Missing value after \"%s\"", argv[i-1]);
  907. usage(errBuff);
  908. }
  909. }
  910. else if(stricmp(argv[i]+1,"packer")==0)
  911. {
  912. if (numPackersInfo >= MAX_PACKERS)
  913. {
  914. sprintf(errBuff,"Too many packers are listed - max=%i", MAX_PACKERS);
  915. usage(errBuff);
  916. }
  917. struct PackerInfo &packerInfo = packersInfo[numPackersInfo];
  918. packerInfo.numPackets = 0;
  919. while ((++i < argc) && (*argv[i] != '-'))
  920. {
  921. if (packerInfo.numPackets >= MAX_PACKETS)
  922. {
  923. sprintf(errBuff,"Too many packets in packer - max=%i", MAX_PACKETS);
  924. usage(errBuff);
  925. }
  926. packerInfo.packetsSizes[packerInfo.numPackets] = atoi(argv[i]);
  927. packerInfo.numPackets++;
  928. }
  929. if (packerInfo.numPackets == 0)
  930. {
  931. sprintf(errBuff,"Missing packer packets info");
  932. usage(errBuff);
  933. }
  934. --i;
  935. numPackersInfo++;
  936. }
  937. else if(stricmp(argv[i]+1,"numSizes")==0)
  938. {
  939. if (++i < argc)
  940. {
  941. numSizes = atoi(argv[i]);
  942. }
  943. else
  944. {
  945. sprintf(errBuff,"Missing value after \"%s\"", argv[i-1]);
  946. usage(errBuff);
  947. }
  948. }
  949. else if(stricmp(argv[i]+1,"numSends")==0)
  950. {
  951. if (++i < argc)
  952. {
  953. numSends = atoi(argv[i]);
  954. }
  955. else
  956. {
  957. sprintf(errBuff,"Missing value after \"%s\"", argv[i-1]);
  958. usage(errBuff);
  959. }
  960. }
  961. else if(stricmp(argv[i]+1,"initSize")==0)
  962. {
  963. if (++i < argc)
  964. {
  965. initSize = atoi(argv[i]);
  966. }
  967. else
  968. {
  969. sprintf(errBuff,"Missing value after \"%s\"", argv[i-1]);
  970. usage(errBuff);
  971. }
  972. }
  973. else if(stricmp(argv[i]+1,"sizeMulti")==0)
  974. {
  975. if (++i < argc)
  976. {
  977. sizeMulti = atoi(argv[i]);
  978. }
  979. else
  980. {
  981. sprintf(errBuff,"Missing value after \"%s\"", argv[i-1]);
  982. usage(errBuff);
  983. }
  984. }
  985. else if(stricmp(argv[i]+1,"delayPackers")==0)
  986. {
  987. if (++i < argc)
  988. {
  989. delayPackers = atoi(argv[i]);
  990. }
  991. else
  992. {
  993. sprintf(errBuff,"Missing value after \"%s\"", argv[i-1]);
  994. usage(errBuff);
  995. }
  996. }
  997. else if(stricmp(argv[i]+1,"getUnpackerTimeout")==0)
  998. {
  999. if (++i < argc)
  1000. {
  1001. getUnpackerTimeout = atoi(argv[i]);
  1002. }
  1003. else
  1004. {
  1005. sprintf(errBuff,"Missing value after \"%s\"", argv[i-1]);
  1006. usage(errBuff);
  1007. }
  1008. }
  1009. else if(stricmp(argv[i]+1,"thisTrace")==0)
  1010. {
  1011. if (++i < argc)
  1012. {
  1013. thisTrace = atoi(argv[i]);
  1014. }
  1015. else
  1016. {
  1017. sprintf(errBuff,"Missing value after \"%s\"", argv[i-1]);
  1018. usage(errBuff);
  1019. }
  1020. }
  1021. else if(stricmp(argv[i]+1,"udpTrace")==0)
  1022. {
  1023. if (++i < argc)
  1024. {
  1025. udpTraceLevel = atoi(argv[i]);
  1026. }
  1027. else
  1028. {
  1029. sprintf(errBuff,"Missing value after \"%s\"", argv[i-1]);
  1030. usage(errBuff);
  1031. }
  1032. }
  1033. else
  1034. {
  1035. sprintf(errBuff,"Invalid argument option \"%s\"", argv[i]);
  1036. usage(errBuff);
  1037. }
  1038. }
  1039. else
  1040. {
  1041. sprintf(errBuff,"Argument option \"%s\" missing \"-\" ", argv[i]);
  1042. usage(errBuff);
  1043. }
  1044. }
  1045. // default is dual mode (send and receive)
  1046. if (!modeType) modeType = SND_MODE_BIT | RCV_MODE_BIT;
  1047. IReceiveManager *rcvMgr = NULL;
  1048. IRowManager *rowMgr = NULL;
  1049. IMessageCollator *msgCollA = NULL;
  1050. IMessageCollator *msgCollB = NULL;
  1051. ISendManager *sendMgr = NULL;
  1052. if (modeType & RCV_MODE_BIT)
  1053. {
  1054. rcvMgr = createReceiveManager(7000, 7001, 7002, 7003, multiCast, 100, 0x7fffffff, myIndex);
  1055. rowMgr = createRowManager(0, NULL, queryDummyContextLogger(), NULL);
  1056. msgCollA = rcvMgr->createMessageCollator(rowMgr, 100);
  1057. if (destB)
  1058. {
  1059. msgCollB = rcvMgr->createMessageCollator(rowMgr, 200);
  1060. }
  1061. Sleep(1000);
  1062. }
  1063. if (modeType & SND_MODE_BIT)
  1064. {
  1065. sendMgr = createSendManager(7000, 7001, 7002, 7003, multiCast, 100, udpNumQs, 100, NULL, myIndex);
  1066. Sleep(5000);
  1067. char locBuff[100000];
  1068. for (unsigned packerNum=0; packerNum < numPackers; packerNum++)
  1069. {
  1070. unsigned totalSize = 0;
  1071. char packAHdr[100];
  1072. char packBHdr[100];
  1073. sprintf(packAHdr,"helloA%i", packerNum);
  1074. if (thisTrace)
  1075. printf("Creating packer - hdrLen=%i header %s\n", packerHdrSize, packAHdr);
  1076. IMessagePacker *msgPackA = sendMgr->createMessagePacker(100, 0, packAHdr, packerHdrSize, destA, 1);
  1077. IMessagePacker *msgPackB = NULL;
  1078. if (destB)
  1079. {
  1080. sprintf(packBHdr,"helloB%i", packerNum);
  1081. if (thisTrace)
  1082. printf("Creating packer - hdrLen=%i header %s\n", packerHdrSize, packBHdr);
  1083. msgPackB = sendMgr->createMessagePacker(200, 0, packBHdr, packerHdrSize, destB, 0);
  1084. }
  1085. unsigned buffSize = initSize;
  1086. int pkIx = packerNum;
  1087. int nmSizes = numSizes;
  1088. if (numPackersInfo)
  1089. {
  1090. if (pkIx >= numPackersInfo) pkIx %= numPackersInfo;
  1091. nmSizes = packersInfo[pkIx].numPackets;
  1092. }
  1093. for (unsigned sizeNum=0; sizeNum < nmSizes; sizeNum++, buffSize *= sizeMulti)
  1094. {
  1095. unsigned nmSends = numSends;
  1096. if (numPackersInfo)
  1097. {
  1098. nmSends = 1;
  1099. buffSize = packersInfo[pkIx].packetsSizes[sizeNum];
  1100. }
  1101. for (unsigned sendNum=0; sendNum < nmSends; sendNum++)
  1102. {
  1103. sprintf(locBuff,"size=%i num=%i multi=%i packer=%i hello world",
  1104. buffSize, sendNum, sizeNum, packerNum);
  1105. if (thisTrace > 1)
  1106. printf("Sending data : %s\n", locBuff);
  1107. char *transBuff = (char*) msgPackA->getBuffer(buffSize, false);
  1108. strncpy(transBuff, locBuff, buffSize);
  1109. msgPackA->putBuffer(transBuff, buffSize, false);
  1110. if (msgPackB)
  1111. {
  1112. transBuff = (char*) msgPackB->getBuffer(buffSize, false);
  1113. strncpy(transBuff, locBuff, buffSize);
  1114. msgPackB->putBuffer(transBuff, buffSize, false);
  1115. }
  1116. totalSize += buffSize;
  1117. }
  1118. }
  1119. msgPackA->flush(true);
  1120. msgPackA->Release();
  1121. if (thisTrace)
  1122. printf("Packer %s total data size = %i\n", packAHdr, totalSize);
  1123. if (msgPackB)
  1124. {
  1125. msgPackB->flush(true);
  1126. msgPackB->Release();
  1127. if (thisTrace)
  1128. printf("Packer %s total data size = \n", packBHdr, totalSize);
  1129. }
  1130. if (delayPackers) Sleep(delayPackers);
  1131. }
  1132. while(!sendMgr->allDone()) Sleep(50);
  1133. }
  1134. if (modeType & RCV_MODE_BIT)
  1135. {
  1136. for (unsigned unpackerNum=0; unpackerNum < numPackers; unpackerNum++)
  1137. {
  1138. bool anyActivity_a;
  1139. bool anyActivity_b;
  1140. IMessageResult *resultA = msgCollA->getNextResult(getUnpackerTimeout, anyActivity_a);
  1141. if (!resultA)
  1142. {
  1143. printf("timeout waiting on msgCollA->getNextResult(%i,..)\n", getUnpackerTimeout);
  1144. }
  1145. IMessageResult *resultB = NULL;
  1146. if (msgCollB)
  1147. {
  1148. resultB = msgCollB->getNextResult(getUnpackerTimeout, anyActivity_b);
  1149. if (!resultB)
  1150. {
  1151. printf("timeout waiting on msgCollB->getNextResult(%i,..)\n", getUnpackerTimeout);
  1152. }
  1153. }
  1154. unsigned len;
  1155. const void *hdr;
  1156. char locBuff[100000];
  1157. if (resultA)
  1158. {
  1159. hdr = resultA->getMessageHeader(len);
  1160. if (thisTrace)
  1161. printf("Got unpacker - hdrLen=%i header %s\n", len, hdr);
  1162. }
  1163. if (resultB)
  1164. {
  1165. hdr = resultB->getMessageHeader(len);
  1166. if (thisTrace)
  1167. printf("Got unpacker - hdrLen=%i header \"%s\"\n", len, hdr);
  1168. }
  1169. if (!resultA && resultB)
  1170. {
  1171. resultA = resultB;
  1172. resultB = NULL;
  1173. }
  1174. if (!resultA) continue;
  1175. Owned<IMessageUnpackCursor> unpackA = resultA->getCursor(rowMgr);
  1176. Owned<IMessageUnpackCursor> unpackB = resultB ? resultB->getCursor(rowMgr) : NULL;
  1177. unsigned totalSize = 0;
  1178. unsigned buffSize = initSize;
  1179. if (unpackerNum)
  1180. {
  1181. int size;
  1182. if (thisTrace)
  1183. printf("Calling getNext() for all data available in packer \"%s\"\n", hdr);
  1184. void * p= unpackA->getNext(0x0ffffffff,&size);
  1185. totalSize += size;
  1186. }
  1187. else
  1188. {
  1189. if (thisTrace)
  1190. printf("Calling getNext() with diff sizes for packer \"%s\"\n", hdr);
  1191. buffSize = initSize;
  1192. int pkIx = unpackerNum;
  1193. int nmSizes = numSizes;
  1194. if (numPackersInfo)
  1195. {
  1196. if (pkIx >= numPackersInfo) pkIx %= numPackersInfo;
  1197. nmSizes = packersInfo[pkIx].numPackets;
  1198. }
  1199. for (unsigned sizeNum=0; sizeNum < nmSizes; sizeNum++, buffSize *= sizeMulti)
  1200. {
  1201. unsigned nmSends = numSends;
  1202. if (numPackersInfo)
  1203. {
  1204. nmSends = 1;
  1205. buffSize = packersInfo[pkIx].packetsSizes[sizeNum];
  1206. }
  1207. for (unsigned sendNum=0; sendNum < nmSends; sendNum++)
  1208. {
  1209. int size;
  1210. void *transBuff= unpackA->getNext(buffSize, &size);
  1211. if (!transBuff)
  1212. {
  1213. if (thisTrace > 1)
  1214. printf("end of data\n");
  1215. }
  1216. else {
  1217. totalSize += size;
  1218. memcpy(locBuff, transBuff, size);
  1219. locBuff[size]=0;
  1220. if (thisTrace > 1)
  1221. printf("Received (for size=%i num=%i multi=%i unpacker=%i) data : %s\n",
  1222. buffSize, sendNum, sizeNum, unpackerNum, locBuff);
  1223. }
  1224. }
  1225. }
  1226. }
  1227. if (thisTrace)
  1228. printf("Unpacker %s total data size = %i\n", hdr, totalSize);
  1229. buffSize=initSize;
  1230. if (thisTrace > 1)
  1231. printf("Trying to read more than written\n");
  1232. void *transBuff = unpackA->getNext(buffSize);
  1233. if (!transBuff)
  1234. {
  1235. if (thisTrace > 1)
  1236. printf("OK: Could not read more than written\n");
  1237. }
  1238. else
  1239. {
  1240. memcpy(locBuff, transBuff, buffSize);
  1241. locBuff[buffSize]=0;
  1242. printf("WARNING: read more than written: (%s)\n", locBuff);
  1243. }
  1244. printf("\n\n\n");
  1245. unpackA->Release();
  1246. if (unpackB) unpackB->Release();
  1247. }
  1248. }
  1249. if (msgCollA)
  1250. {
  1251. rcvMgr->detachCollator(msgCollA);
  1252. msgCollA->Release();
  1253. }
  1254. if (msgCollB)
  1255. {
  1256. rcvMgr->detachCollator(msgCollB);
  1257. msgCollB->Release();
  1258. }
  1259. if (sendMgr) sendMgr->Release();
  1260. if (rcvMgr) rcvMgr->Release();
  1261. if (!noendwait)
  1262. {
  1263. printf("\n\nEnter q to terminate program : ");
  1264. scanf("%s", errBuff);
  1265. }
  1266. return 0;
  1267. }
  1268. #endif