udpsim.cpp 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2021 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "udplib.hpp"
  14. #include "udpsha.hpp"
  15. #include "udptrs.hpp"
  16. #include "udpipmap.hpp"
  17. #include "roxiemem.hpp"
  18. #include "jptree.hpp"
  19. #include "portlist.h"
  20. using roxiemem::DataBuffer;
  21. using roxiemem::IDataBufferManager;
  22. #ifdef SOCKET_SIMULATION
  23. Owned<IDataBufferManager> dbm;
  24. static unsigned numThreads = 20;
  25. static unsigned packetsPerThread = 0;
  26. static bool restartSender = false;
  27. static bool restartReceiver = false;
  28. static constexpr const char * defaultYaml = R"!!(
  29. version: "1.0"
  30. udpsim:
  31. dropDataPackets: false
  32. dropOkToSendPackets: 0
  33. dropRequestReceivedPackets: 0
  34. dropRequestToSendPackets: 0
  35. dropRequestToSendMorePackets: 0
  36. dropSendCompletedPackets: 0
  37. help: false
  38. numThreads: 20
  39. outputconfig: false
  40. packetsPerThread: 10000
  41. restartReceiver: false
  42. restartSender: false
  43. udpTraceLevel: 1
  44. udpTraceTimeouts: true
  45. udpResendLostPackets: true
  46. udpRequestToSendTimeout: 1000
  47. udpRequestToSendAckTimeout: 1000
  48. udpMaxPendingPermits: 1
  49. udpTraceFlow: false
  50. useQueue: false
  51. )!!";
  52. bool isNumeric(const char *str)
  53. {
  54. while (*str)
  55. {
  56. if (!isdigit(*str))
  57. return false;
  58. str++;
  59. }
  60. return true;
  61. }
  62. bool isBoolean(const char *str)
  63. {
  64. return streq(str, "true") || streq(str, "false");
  65. }
  66. void usage()
  67. {
  68. printf("USAGE: udpsim [options]\n");
  69. printf("Options are:\n");
  70. Owned<IPropertyTree> defaults = createPTreeFromYAMLString(defaultYaml);
  71. IPropertyTree * allowed = defaults->queryPropTree("udpsim");
  72. Owned<IAttributeIterator> aiter = allowed->getAttributes();
  73. ForEach(*aiter)
  74. {
  75. printf(" --%s", aiter->queryName()+1);
  76. if (isBoolean(aiter->queryValue()))
  77. printf("[=0|1]\n");
  78. else
  79. printf("=nn\n");
  80. }
  81. ExitModuleObjects();
  82. releaseAtoms();
  83. exit(2);
  84. }
  85. void initOptions(int argc, const char **argv)
  86. {
  87. Owned<IPropertyTree> defaults = createPTreeFromYAMLString(defaultYaml);
  88. IPropertyTree * allowed = defaults->queryPropTree("udpsim");
  89. for (unsigned argNo = 1; argNo < argc; argNo++)
  90. {
  91. const char *arg = argv[argNo];
  92. if (arg[0]=='-' && arg[1]=='-')
  93. {
  94. arg += 2;
  95. StringBuffer attrname("@");
  96. const char * eq = strchr(arg, '=');
  97. if (eq)
  98. attrname.append(eq-arg, arg);
  99. else
  100. attrname.append(arg);
  101. if (!allowed->hasProp(attrname))
  102. {
  103. printf("Unrecognized option %s\n\n", attrname.str()+1);
  104. usage();
  105. }
  106. if (!eq && !isBoolean(allowed->queryProp(attrname)))
  107. {
  108. printf("Option %s requires a value\n\n", attrname.str()+1);
  109. usage();
  110. }
  111. }
  112. else
  113. {
  114. printf("Unexpected argument %s\n\n", arg);
  115. usage();
  116. }
  117. }
  118. Owned<IPropertyTree> options = loadConfiguration(defaultYaml, argv, "udpsim", "UDPSIM", nullptr, nullptr);
  119. if (options->getPropBool("@help", false))
  120. usage();
  121. #ifdef TEST_DROPPED_PACKETS
  122. udpDropDataPackets = options->getPropBool("@dropDataPackets", false);
  123. udpDropFlowPackets[flowType::ok_to_send] = options->getPropInt("@dropOkToSendPackets", 0); // drop 1 in N
  124. udpDropFlowPackets[flowType::request_received] = options->getPropInt("@dropRequestReceivedPackets", 0); // drop 1 in N
  125. udpDropFlowPackets[flowType::request_to_send] = options->getPropInt("@dropRequestToSendPackets", 0); // drop 1 in N
  126. udpDropFlowPackets[flowType::request_to_send_more] = options->getPropInt("@dropRequestToSendMorePackets", 0); // drop 1 in N
  127. udpDropFlowPackets[flowType::send_completed] = options->getPropInt("@dropSendCompletedPackets", 0); // drop 1 in N
  128. #endif
  129. restartSender = options->getPropBool("@restartSender");
  130. restartReceiver = options->getPropBool("@restartReceiver");
  131. numThreads = options->getPropInt("@numThreads", 0);
  132. udpTraceLevel = options->getPropInt("@udpTraceLevel", 1);
  133. udpTraceTimeouts = options->getPropBool("@udpTraceTimeouts", true);
  134. udpResendLostPackets = options->getPropBool("@udpResendLostPackets", true);
  135. udpRequestToSendTimeout = options->getPropInt("@udpRequestToSendTimeout", 1000);
  136. udpRequestToSendAckTimeout = options->getPropInt("@udpRequestToSendAckTimeout", 1000);
  137. udpMaxPendingPermits = options->getPropInt("@udpMaxPendingPermits", 1);
  138. udpTraceFlow = options->getPropBool("@udpTraceFlow", false);
  139. packetsPerThread = options->getPropInt("@packetsPerThread");
  140. udpTestUseUdpSockets = !options->getPropBool("@useQueue");
  141. isUdpTestMode = true;
  142. roxiemem::setTotalMemoryLimit(false, false, false, 20*1024*1024, 0, NULL, NULL);
  143. dbm.setown(roxiemem::createDataBufferManager(roxiemem::DATA_ALIGNMENT_SIZE));
  144. }
  145. // How many times the simulated sender [i] should start
  146. unsigned numStarts(unsigned i)
  147. {
  148. if (i==1 && restartSender)
  149. return 2;
  150. return 1;
  151. }
  152. void simulateTraffic()
  153. {
  154. constexpr unsigned numReceiveSlots = 100;
  155. constexpr unsigned maxSlotsPerClient = 100;
  156. constexpr unsigned maxSendQueueSize = 100;
  157. try
  158. {
  159. myNode.setIp(IpAddress("1.2.3.4"));
  160. Owned<IReceiveManager> rm = createReceiveManager(CCD_SERVER_FLOW_PORT, CCD_DATA_PORT, CCD_CLIENT_FLOW_PORT, numReceiveSlots, maxSlotsPerClient, false);
  161. unsigned begin = msTick();
  162. printf("Start test\n");
  163. asyncFor(numThreads+1, numThreads+1, [maxSendQueueSize, numReceiveSlots, maxSlotsPerClient, &rm](unsigned i)
  164. {
  165. if (!i)
  166. {
  167. if (restartReceiver)
  168. {
  169. Sleep(100);
  170. rm.clear();
  171. rm.setown(createReceiveManager(CCD_SERVER_FLOW_PORT, CCD_DATA_PORT, CCD_CLIENT_FLOW_PORT, numReceiveSlots, maxSlotsPerClient, false));
  172. }
  173. }
  174. else
  175. {
  176. unsigned header = 0;
  177. unsigned myStarts = numStarts(i);
  178. for (unsigned startNo = 0; startNo < myStarts; startNo++)
  179. {
  180. IpAddress pretendIP(VStringBuffer("8.8.8.%d", i));
  181. // Note - this is assuming we send flow on the data port (that option defaults true in roxie too)
  182. Owned<ISendManager> sm = createSendManager(CCD_DATA_PORT, CCD_DATA_PORT, CCD_CLIENT_FLOW_PORT, maxSendQueueSize, 3, pretendIP, nullptr, false);
  183. unsigned numPackets = packetsPerThread / myStarts;
  184. for (unsigned j = 0; j < packetsPerThread; j++)
  185. {
  186. Owned<IMessagePacker> mp = sm->createMessagePacker(0, 0, &header, sizeof(header), myNode, 0);
  187. void *buf = mp->getBuffer(500, false);
  188. memset(buf, i, 500);
  189. mp->putBuffer(buf, 500, false);
  190. mp->flush();
  191. }
  192. // Wait until all the packets have been sent and acknowledged, for last start only
  193. // For prior starts, we are trying to simulate a sender stopping abruptly (e.g. from a restart) so we don't want to close it down cleanly.
  194. if (startNo == myStarts-1)
  195. while (!sm->allDone())
  196. Sleep(50);
  197. DBGLOG("UdpSim sender thread %d sent %d packets", i, numPackets);
  198. }
  199. DBGLOG("UdpSim sender thread %d completed", i);
  200. }
  201. });
  202. printf("UdpSim test took %ums\n", msTick() - begin);
  203. }
  204. catch (IException * e)
  205. {
  206. EXCLOG(e);
  207. e->Release();
  208. }
  209. }
  210. int main(int argc, const char **argv)
  211. {
  212. InitModuleObjects();
  213. strdup("Make sure leak checking is working");
  214. queryStderrLogMsgHandler()->setMessageFields(MSGFIELD_time|MSGFIELD_microTime|MSGFIELD_milliTime|MSGFIELD_thread);
  215. initOptions(argc, argv);
  216. simulateTraffic();
  217. ExitModuleObjects();
  218. releaseAtoms();
  219. return 0;
  220. }
  221. #else
  222. int main(int argc, const char **arv)
  223. {
  224. printf("udpsim requires a build with SOCKET_SIMULATION enabled\n");
  225. return 2;
  226. }
  227. #endif