123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252 |
- /*##############################################################################
- HPCC SYSTEMS software Copyright (C) 2021 HPCC Systems®.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ############################################################################## */
- #include "udplib.hpp"
- #include "udpsha.hpp"
- #include "udptrs.hpp"
- #include "udpipmap.hpp"
- #include "roxiemem.hpp"
- #include "jptree.hpp"
- #include "portlist.h"
- using roxiemem::DataBuffer;
- using roxiemem::IDataBufferManager;
- #ifdef SOCKET_SIMULATION
- Owned<IDataBufferManager> dbm;
- static unsigned numThreads = 20;
- static unsigned packetsPerThread = 0;
- static bool restartSender = false;
- static bool restartReceiver = false;
- static constexpr const char * defaultYaml = R"!!(
- version: "1.0"
- udpsim:
- dropDataPackets: false
- dropOkToSendPackets: 0
- dropRequestReceivedPackets: 0
- dropRequestToSendPackets: 0
- dropRequestToSendMorePackets: 0
- dropSendCompletedPackets: 0
- help: false
- numThreads: 20
- outputconfig: false
- packetsPerThread: 10000
- restartReceiver: false
- restartSender: false
- udpTraceLevel: 1
- udpTraceTimeouts: true
- udpResendLostPackets: true
- udpRequestToSendTimeout: 1000
- udpRequestToSendAckTimeout: 1000
- udpMaxPendingPermits: 1
- udpTraceFlow: false
- useQueue: false
- )!!";
- bool isNumeric(const char *str)
- {
- while (*str)
- {
- if (!isdigit(*str))
- return false;
- str++;
- }
- return true;
- }
- bool isBoolean(const char *str)
- {
- return streq(str, "true") || streq(str, "false");
- }
- void usage()
- {
- printf("USAGE: udpsim [options]\n");
- printf("Options are:\n");
- Owned<IPropertyTree> defaults = createPTreeFromYAMLString(defaultYaml);
- IPropertyTree * allowed = defaults->queryPropTree("udpsim");
- Owned<IAttributeIterator> aiter = allowed->getAttributes();
- ForEach(*aiter)
- {
- printf(" --%s", aiter->queryName()+1);
- if (isBoolean(aiter->queryValue()))
- printf("[=0|1]\n");
- else
- printf("=nn\n");
- }
- ExitModuleObjects();
- releaseAtoms();
- exit(2);
- }
- void initOptions(int argc, const char **argv)
- {
- Owned<IPropertyTree> defaults = createPTreeFromYAMLString(defaultYaml);
- IPropertyTree * allowed = defaults->queryPropTree("udpsim");
- for (unsigned argNo = 1; argNo < argc; argNo++)
- {
- const char *arg = argv[argNo];
- if (arg[0]=='-' && arg[1]=='-')
- {
- arg += 2;
- StringBuffer attrname("@");
- const char * eq = strchr(arg, '=');
- if (eq)
- attrname.append(eq-arg, arg);
- else
- attrname.append(arg);
- if (!allowed->hasProp(attrname))
- {
- printf("Unrecognized option %s\n\n", attrname.str()+1);
- usage();
- }
- if (!eq && !isBoolean(allowed->queryProp(attrname)))
- {
- printf("Option %s requires a value\n\n", attrname.str()+1);
- usage();
- }
- }
- else
- {
- printf("Unexpected argument %s\n\n", arg);
- usage();
- }
- }
- Owned<IPropertyTree> options = loadConfiguration(defaultYaml, argv, "udpsim", "UDPSIM", nullptr, nullptr);
- if (options->getPropBool("@help", false))
- usage();
- #ifdef TEST_DROPPED_PACKETS
- udpDropDataPackets = options->getPropBool("@dropDataPackets", false);
- udpDropFlowPackets[flowType::ok_to_send] = options->getPropInt("@dropOkToSendPackets", 0); // drop 1 in N
- udpDropFlowPackets[flowType::request_received] = options->getPropInt("@dropRequestReceivedPackets", 0); // drop 1 in N
- udpDropFlowPackets[flowType::request_to_send] = options->getPropInt("@dropRequestToSendPackets", 0); // drop 1 in N
- udpDropFlowPackets[flowType::request_to_send_more] = options->getPropInt("@dropRequestToSendMorePackets", 0); // drop 1 in N
- udpDropFlowPackets[flowType::send_completed] = options->getPropInt("@dropSendCompletedPackets", 0); // drop 1 in N
- #endif
- restartSender = options->getPropBool("@restartSender");
- restartReceiver = options->getPropBool("@restartReceiver");
- numThreads = options->getPropInt("@numThreads", 0);
- udpTraceLevel = options->getPropInt("@udpTraceLevel", 1);
- udpTraceTimeouts = options->getPropBool("@udpTraceTimeouts", true);
- udpResendLostPackets = options->getPropBool("@udpResendLostPackets", true);
- udpRequestToSendTimeout = options->getPropInt("@udpRequestToSendTimeout", 1000);
- udpRequestToSendAckTimeout = options->getPropInt("@udpRequestToSendAckTimeout", 1000);
- udpMaxPendingPermits = options->getPropInt("@udpMaxPendingPermits", 1);
- udpTraceFlow = options->getPropBool("@udpTraceFlow", false);
- packetsPerThread = options->getPropInt("@packetsPerThread");
- udpTestUseUdpSockets = !options->getPropBool("@useQueue");
- isUdpTestMode = true;
- roxiemem::setTotalMemoryLimit(false, false, false, 20*1024*1024, 0, NULL, NULL);
- dbm.setown(roxiemem::createDataBufferManager(roxiemem::DATA_ALIGNMENT_SIZE));
- }
- // How many times the simulated sender [i] should start
- unsigned numStarts(unsigned i)
- {
- if (i==1 && restartSender)
- return 2;
- return 1;
- }
- void simulateTraffic()
- {
- constexpr unsigned numReceiveSlots = 100;
- constexpr unsigned maxSlotsPerClient = 100;
- constexpr unsigned maxSendQueueSize = 100;
- try
- {
- myNode.setIp(IpAddress("1.2.3.4"));
- Owned<IReceiveManager> rm = createReceiveManager(CCD_SERVER_FLOW_PORT, CCD_DATA_PORT, CCD_CLIENT_FLOW_PORT, numReceiveSlots, maxSlotsPerClient, false);
- unsigned begin = msTick();
- printf("Start test\n");
- asyncFor(numThreads+1, numThreads+1, [maxSendQueueSize, numReceiveSlots, maxSlotsPerClient, &rm](unsigned i)
- {
- if (!i)
- {
- if (restartReceiver)
- {
- Sleep(100);
- rm.clear();
- rm.setown(createReceiveManager(CCD_SERVER_FLOW_PORT, CCD_DATA_PORT, CCD_CLIENT_FLOW_PORT, numReceiveSlots, maxSlotsPerClient, false));
- }
- }
- else
- {
- unsigned header = 0;
- unsigned myStarts = numStarts(i);
- for (unsigned startNo = 0; startNo < myStarts; startNo++)
- {
- IpAddress pretendIP(VStringBuffer("8.8.8.%d", i));
- // Note - this is assuming we send flow on the data port (that option defaults true in roxie too)
- Owned<ISendManager> sm = createSendManager(CCD_DATA_PORT, CCD_DATA_PORT, CCD_CLIENT_FLOW_PORT, maxSendQueueSize, 3, pretendIP, nullptr, false);
- unsigned numPackets = packetsPerThread / myStarts;
- for (unsigned j = 0; j < packetsPerThread; j++)
- {
- Owned<IMessagePacker> mp = sm->createMessagePacker(0, 0, &header, sizeof(header), myNode, 0);
- void *buf = mp->getBuffer(500, false);
- memset(buf, i, 500);
- mp->putBuffer(buf, 500, false);
- mp->flush();
- }
- // Wait until all the packets have been sent and acknowledged, for last start only
- // For prior starts, we are trying to simulate a sender stopping abruptly (e.g. from a restart) so we don't want to close it down cleanly.
- if (startNo == myStarts-1)
- while (!sm->allDone())
- Sleep(50);
- DBGLOG("UdpSim sender thread %d sent %d packets", i, numPackets);
- }
- DBGLOG("UdpSim sender thread %d completed", i);
- }
- });
- printf("UdpSim test took %ums\n", msTick() - begin);
- }
- catch (IException * e)
- {
- EXCLOG(e);
- e->Release();
- }
- }
- int main(int argc, const char **argv)
- {
- InitModuleObjects();
- strdup("Make sure leak checking is working");
- queryStderrLogMsgHandler()->setMessageFields(MSGFIELD_time|MSGFIELD_microTime|MSGFIELD_milliTime|MSGFIELD_thread);
- initOptions(argc, argv);
- simulateTraffic();
- ExitModuleObjects();
- releaseAtoms();
- return 0;
- }
- #else
- int main(int argc, const char **arv)
- {
- printf("udpsim requires a build with SOCKET_SIMULATION enabled\n");
- return 2;
- }
- #endif
|