udpsim.cpp 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2021 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "udplib.hpp"
  14. #include "udpsha.hpp"
  15. #include "udptrs.hpp"
  16. #include "udpipmap.hpp"
  17. #include "roxiemem.hpp"
  18. #include "jptree.hpp"
  19. #include "portlist.h"
  20. using roxiemem::DataBuffer;
  21. using roxiemem::IDataBufferManager;
  22. #ifdef SOCKET_SIMULATION
  23. Owned<IDataBufferManager> dbm;
  24. static unsigned numThreads = 20;
  25. static unsigned numReceiveSlots = 100;
  26. static unsigned packetsPerThread = 0;
  27. static unsigned minWork = 0;
  28. static unsigned maxWork = 0;
  29. static unsigned optWorkFrequency = 0;
  30. static bool restartSender = false;
  31. static bool restartReceiver = false;
  32. static bool sendFlowWithData = false;
  33. static constexpr const char * defaultYaml = R"!!(
  34. version: "1.0"
  35. udpsim:
  36. dropDataPackets: false
  37. dropDataPacketsPercent: 0
  38. dropOkToSendPackets: 0
  39. dropRequestReceivedPackets: 0
  40. dropRequestToSendPackets: 0
  41. dropRequestToSendMorePackets: 0
  42. dropSendStartPackets: 0
  43. dropSendCompletedPackets: 0
  44. help: false
  45. minWork: 0 # minimum amount of work
  46. maxWork: 0 # maximum work per set of packets, if 0 use minWork
  47. workFrequency: 5 # Do work once every 5 packets
  48. numThreads: 20
  49. numReceiveSlots: 100
  50. outputconfig: false
  51. packetsPerThread: 10000
  52. restartReceiver: false
  53. restartSender: false
  54. sanityCheckUdpSettings: true
  55. sendFlowWithData: false
  56. udpResendLostPackets: true
  57. udpFlowAckTimeout: 2
  58. updDataSendTimeout: 20
  59. udpRequestTimeout: 20
  60. udpPermitTimeout: 50
  61. udpResendDelay: 0
  62. udpMaxPermitDeadTimeouts: 5
  63. udpRequestDeadTimeout: 10000
  64. udpMaxPendingPermits: 10
  65. udpMaxClientPercent: 200
  66. udpMinSlotsPerSender: 1
  67. udpAssumeSequential: false
  68. udpAllowAsyncPermits: false
  69. udpTraceLevel: 1
  70. udpTraceTimeouts: true
  71. udpTestSocketDelay: 0
  72. udpTestSocketJitter: false
  73. udpTestVariableDelay: false
  74. udpTraceFlow: false
  75. useQueue: false
  76. udpAdjustThreadPriorities: false
  77. )!!";
  78. bool isNumeric(const char *str)
  79. {
  80. while (*str)
  81. {
  82. if (!isdigit(*str))
  83. return false;
  84. str++;
  85. }
  86. return true;
  87. }
  88. bool isBoolean(const char *str)
  89. {
  90. return streq(str, "true") || streq(str, "false");
  91. }
  92. void usage()
  93. {
  94. printf("USAGE: udpsim [options]\n");
  95. printf("Options are:\n");
  96. Owned<IPropertyTree> defaults = createPTreeFromYAMLString(defaultYaml);
  97. IPropertyTree * allowed = defaults->queryPropTree("udpsim");
  98. Owned<IAttributeIterator> aiter = allowed->getAttributes();
  99. ForEach(*aiter)
  100. {
  101. printf(" --%s", aiter->queryName()+1);
  102. if (isBoolean(aiter->queryValue()))
  103. printf("[=0|1]\n");
  104. else
  105. printf("=nn\n");
  106. }
  107. ExitModuleObjects();
  108. releaseAtoms();
  109. exit(2);
  110. }
  111. void initOptions(int argc, const char **argv)
  112. {
  113. Owned<IPropertyTree> defaults = createPTreeFromYAMLString(defaultYaml);
  114. IPropertyTree * allowed = defaults->queryPropTree("udpsim");
  115. for (unsigned argNo = 1; argNo < argc; argNo++)
  116. {
  117. const char *arg = argv[argNo];
  118. if (arg[0]=='-' && arg[1]=='-')
  119. {
  120. arg += 2;
  121. StringBuffer attrname("@");
  122. const char * eq = strchr(arg, '=');
  123. if (eq)
  124. attrname.append(eq-arg, arg);
  125. else
  126. attrname.append(arg);
  127. if (!allowed->hasProp(attrname))
  128. {
  129. printf("Unrecognized option %s\n\n", attrname.str()+1);
  130. usage();
  131. }
  132. if (!eq && !isBoolean(allowed->queryProp(attrname)))
  133. {
  134. printf("Option %s requires a value\n\n", attrname.str()+1);
  135. usage();
  136. }
  137. }
  138. else
  139. {
  140. printf("Unexpected argument %s\n\n", arg);
  141. usage();
  142. }
  143. }
  144. Owned<IPropertyTree> options = loadConfiguration(defaultYaml, argv, "udpsim", "UDPSIM", nullptr, nullptr);
  145. if (options->getPropBool("@help", false))
  146. usage();
  147. #ifdef TEST_DROPPED_PACKETS
  148. udpDropDataPackets = options->getPropBool("@dropDataPackets", false);
  149. udpDropDataPacketsPercent = options->getPropInt("@dropDataPacketsPercent", 0);
  150. udpDropFlowPackets[flowType::ok_to_send] = options->getPropInt("@dropOkToSendPackets", 0); // drop 1 in N
  151. udpDropFlowPackets[flowType::request_received] = options->getPropInt("@dropRequestReceivedPackets", 0); // drop 1 in N
  152. udpDropFlowPackets[flowType::request_to_send] = options->getPropInt("@dropRequestToSendPackets", 0); // drop 1 in N
  153. udpDropFlowPackets[flowType::request_to_send_more] = options->getPropInt("@dropRequestToSendMorePackets", 0); // drop 1 in N
  154. udpDropFlowPackets[flowType::send_start] = options->getPropInt("@dropSendStartPackets", 0); // drop 1 in N
  155. udpDropFlowPackets[flowType::send_completed] = options->getPropInt("@dropSendCompletedPackets", 0); // drop 1 in N
  156. #endif
  157. restartSender = options->getPropBool("@restartSender");
  158. restartReceiver = options->getPropBool("@restartReceiver");
  159. minWork = options->getPropInt("@minWork", 0);
  160. maxWork = options->getPropInt("@maxWork", 0);
  161. optWorkFrequency = options->getPropInt("@workFrequency", 0);
  162. numThreads = options->getPropInt("@numThreads", 0);
  163. udpTraceLevel = options->getPropInt("@udpTraceLevel", 1);
  164. udpTraceTimeouts = options->getPropBool("@udpTraceTimeouts", true);
  165. udpResendLostPackets = options->getPropBool("@udpResendLostPackets", true);
  166. udpPermitTimeout = options->getPropInt("@udpPermitTimeout", udpPermitTimeout);
  167. udpRequestTimeout = options->getPropInt("@udpRequestTimeout", udpRequestTimeout);
  168. udpFlowAckTimeout = options->getPropInt("@udpFlowAckTimeout", udpFlowAckTimeout);
  169. updDataSendTimeout = options->getPropInt("@udpDataSendTimeout", updDataSendTimeout);
  170. udpResendDelay = options->getPropInt("@udpResendDelay", udpResendDelay);
  171. udpMaxPermitDeadTimeouts = options->getPropInt("@udpMaxPermitDeadTimeouts", udpMaxPermitDeadTimeouts);
  172. udpRequestDeadTimeout = options->getPropInt("@udpRequestDeadTimeout", udpRequestDeadTimeout);
  173. udpAssumeSequential = options->getPropBool("@udpAssumeSequential", udpAssumeSequential);
  174. udpAllowAsyncPermits = options->getPropBool("@udpAllowAsyncPermits", udpAllowAsyncPermits);
  175. udpMaxPendingPermits = options->getPropInt("@udpMaxPendingPermits", 1);
  176. udpMinSlotsPerSender = options->getPropInt("@udpMinSlotsPerSender", udpMinSlotsPerSender);
  177. udpMaxClientPercent = options->getPropInt("@udpMaxClientPercent", udpMaxClientPercent);
  178. udpTraceFlow = options->getPropBool("@udpTraceFlow", false);
  179. udpTestUseUdpSockets = !options->getPropBool("@useQueue");
  180. udpTestSocketDelay = options->getPropInt("@udpTestSocketDelay", 0);
  181. udpTestSocketJitter = options->getPropBool("@udpTestSocketJitter");
  182. udpTestVariableDelay = options->getPropBool("@udpTestVariableDelay");
  183. if (udpTestSocketJitter && !udpTestSocketDelay)
  184. {
  185. printf("udpTestSocketDelay requires udpTestSocketDelay to be set - setting to 1\n");
  186. udpTestSocketDelay = 1;
  187. }
  188. if (udpTestVariableDelay && !udpTestSocketDelay)
  189. {
  190. printf("udpTestVariableDelay requires udpTestSocketDelay to be set - setting to 1\n");
  191. udpTestSocketDelay = 1;
  192. }
  193. if (udpTestSocketDelay && udpTestUseUdpSockets)
  194. {
  195. printf("udpTestSocketDelay requires queue mode (--useQueue=1) - setting it on\n");
  196. udpTestUseUdpSockets = false;
  197. }
  198. udpAdjustThreadPriorities = options->getPropBool("@udpAdjustThreadPriorities", udpAdjustThreadPriorities);
  199. packetsPerThread = options->getPropInt("@packetsPerThread");
  200. numReceiveSlots = options->getPropInt("@numReceiveSlots");
  201. isUdpTestMode = true;
  202. roxiemem::setTotalMemoryLimit(false, true, false, 20*1024*1024, 0, NULL, NULL);
  203. dbm.setown(roxiemem::createDataBufferManager(roxiemem::DATA_ALIGNMENT_SIZE));
  204. if (options->getPropBool("sanityCheckUdpSettings", true))
  205. {
  206. unsigned __int64 networkSpeed = options->getPropInt64("@udpNetworkSpeed", 10 * U64C(0x40000000));
  207. sanityCheckUdpSettings(numReceiveSlots, numThreads, networkSpeed);
  208. }
  209. }
  210. // How many times the simulated sender [i] should start
  211. unsigned numStarts(unsigned i)
  212. {
  213. if (i==1 && restartSender)
  214. return 2;
  215. return 1;
  216. }
  217. void simulateTraffic()
  218. {
  219. const unsigned maxSendQueueSize = 100;
  220. try
  221. {
  222. myNode.setIp(IpAddress("1.2.3.4"));
  223. Owned<IReceiveManager> rm = createReceiveManager(CCD_SERVER_FLOW_PORT, CCD_DATA_PORT, CCD_CLIENT_FLOW_PORT, numReceiveSlots, false);
  224. unsigned begin = msTick();
  225. std::atomic<unsigned> workValue{0};
  226. asyncFor(numThreads+1, numThreads+1, [&workValue, maxSendQueueSize, &rm](unsigned i)
  227. {
  228. if (!i)
  229. {
  230. if (restartReceiver)
  231. {
  232. Sleep(100);
  233. rm.clear();
  234. rm.setown(createReceiveManager(CCD_SERVER_FLOW_PORT, CCD_DATA_PORT, CCD_CLIENT_FLOW_PORT, numReceiveSlots, false));
  235. }
  236. }
  237. else
  238. {
  239. unsigned header = 0;
  240. const unsigned serverFlowPort = sendFlowWithData ? CCD_DATA_PORT : CCD_SERVER_FLOW_PORT;
  241. unsigned myStarts = numStarts(i);
  242. for (unsigned startNo = 0; startNo < myStarts; startNo++)
  243. {
  244. IpAddress pretendIP(VStringBuffer("8.8.8.%d", i));
  245. // Note - this is assuming we send flow on the data port (that option defaults true in roxie too)
  246. Owned<ISendManager> sm = createSendManager(serverFlowPort, CCD_DATA_PORT, CCD_CLIENT_FLOW_PORT, maxSendQueueSize, 3, pretendIP, nullptr, false);
  247. Owned<IMessagePacker> mp = sm->createMessagePacker(0, 0, &header, sizeof(header), myNode, 0);
  248. unsigned numPackets = packetsPerThread / myStarts;
  249. for (unsigned j = 0; j < packetsPerThread; j++)
  250. {
  251. if (minWork || maxWork)
  252. {
  253. if ((j % optWorkFrequency) == 0)
  254. {
  255. unsigned work = minWork;
  256. if (maxWork > minWork)
  257. {
  258. //Add some variability in the amount of work required for each packet
  259. unsigned extra = hashc((const byte *)&j, sizeof(j), i) % (maxWork - minWork);
  260. work += extra;
  261. }
  262. unsigned tally = 0;
  263. for (unsigned iWork=0; iWork < work; iWork++)
  264. {
  265. tally = hashc((const byte *)&iWork, sizeof(iWork), tally);
  266. }
  267. workValue += tally;
  268. }
  269. }
  270. void *buf = mp->getBuffer(500, false);
  271. memset(buf, i, 500);
  272. mp->putBuffer(buf, 500, false);
  273. mp->flush();
  274. }
  275. // Wait until all the packets have been sent and acknowledged, for last start only
  276. // For prior starts, we are trying to simulate a sender stopping abruptly (e.g. from a restart) so we don't want to close it down cleanly.
  277. if (startNo == myStarts-1)
  278. while (!sm->allDone())
  279. Sleep(50);
  280. DBGLOG("UdpSim sender thread %d sent %d packets", i, numPackets);
  281. }
  282. DBGLOG("UdpSim sender thread %d completed", i);
  283. }
  284. });
  285. printf("UdpSim test took %ums\n", msTick() - begin);
  286. }
  287. catch (IException * e)
  288. {
  289. EXCLOG(e);
  290. e->Release();
  291. }
  292. }
  293. int main(int argc, const char **argv)
  294. {
  295. InitModuleObjects();
  296. strdup("Make sure leak checking is working");
  297. queryStderrLogMsgHandler()->setMessageFields(MSGFIELD_time|MSGFIELD_microTime|MSGFIELD_milliTime|MSGFIELD_thread|MSGFIELD_prefix);
  298. initOptions(argc, argv);
  299. simulateTraffic();
  300. ExitModuleObjects();
  301. releaseAtoms();
  302. return 0;
  303. }
  304. #else
  305. int main(int argc, const char **arv)
  306. {
  307. printf("udpsim requires a build with SOCKET_SIMULATION enabled\n");
  308. return 2;
  309. }
  310. #endif