|
@@ -30,7 +30,10 @@ using roxiemem::IDataBufferManager;
|
|
|
|
|
|
Owned<IDataBufferManager> dbm;
|
|
|
|
|
|
-unsigned numThreads = 20;
|
|
|
+static unsigned numThreads = 20;
|
|
|
+static unsigned packetsPerThread = 0;
|
|
|
+static bool restartSender = false;
|
|
|
+static bool restartReceiver = false;
|
|
|
|
|
|
static constexpr const char * defaultYaml = R"!!(
|
|
|
version: "1.0"
|
|
@@ -44,6 +47,9 @@ udpsim:
|
|
|
help: false
|
|
|
numThreads: 20
|
|
|
outputconfig: false
|
|
|
+ packetsPerThread: 10000
|
|
|
+ restartReceiver: false
|
|
|
+ restartSender: false
|
|
|
udpTraceLevel: 1
|
|
|
udpTraceTimeouts: true
|
|
|
udpResendLostPackets: true
|
|
@@ -51,6 +57,7 @@ udpsim:
|
|
|
udpRequestToSendAckTimeout: 1000
|
|
|
udpMaxPendingPermits: 1
|
|
|
udpTraceFlow: false
|
|
|
+ useQueue: false
|
|
|
)!!";
|
|
|
|
|
|
bool isNumeric(const char *str)
|
|
@@ -134,6 +141,9 @@ void initOptions(int argc, const char **argv)
|
|
|
udpDropFlowPackets[flowType::request_to_send_more] = options->getPropInt("@dropRequestToSendMorePackets", 0); // drop 1 in N
|
|
|
udpDropFlowPackets[flowType::send_completed] = options->getPropInt("@dropSendCompletedPackets", 0); // drop 1 in N
|
|
|
#endif
|
|
|
+ restartSender = options->getPropBool("@restartSender");
|
|
|
+ restartReceiver = options->getPropBool("@restartReceiver");
|
|
|
+
|
|
|
numThreads = options->getPropInt("@numThreads", 0);
|
|
|
udpTraceLevel = options->getPropInt("@udpTraceLevel", 1);
|
|
|
udpTraceTimeouts = options->getPropBool("@udpTraceTimeouts", true);
|
|
@@ -142,12 +152,22 @@ void initOptions(int argc, const char **argv)
|
|
|
udpRequestToSendAckTimeout = options->getPropInt("@udpRequestToSendAckTimeout", 1000);
|
|
|
udpMaxPendingPermits = options->getPropInt("@udpMaxPendingPermits", 1);
|
|
|
udpTraceFlow = options->getPropBool("@udpTraceFlow", false);
|
|
|
+ packetsPerThread = options->getPropInt("@packetsPerThread");
|
|
|
+ udpTestUseUdpSockets = !options->getPropBool("@useQueue");
|
|
|
|
|
|
isUdpTestMode = true;
|
|
|
roxiemem::setTotalMemoryLimit(false, false, false, 20*1024*1024, 0, NULL, NULL);
|
|
|
dbm.setown(roxiemem::createDataBufferManager(roxiemem::DATA_ALIGNMENT_SIZE));
|
|
|
}
|
|
|
|
|
|
+// How many times the simulated sender [i] should start
|
|
|
+unsigned numStarts(unsigned i)
|
|
|
+{
|
|
|
+ if (i==1 && restartSender)
|
|
|
+ return 2;
|
|
|
+ return 1;
|
|
|
+}
|
|
|
+
|
|
|
void simulateTraffic()
|
|
|
{
|
|
|
constexpr unsigned numReceiveSlots = 100;
|
|
@@ -159,31 +179,52 @@ void simulateTraffic()
|
|
|
Owned<IReceiveManager> rm = createReceiveManager(CCD_SERVER_FLOW_PORT, CCD_DATA_PORT, CCD_CLIENT_FLOW_PORT, numReceiveSlots, maxSlotsPerClient, false);
|
|
|
unsigned begin = msTick();
|
|
|
printf("Start test\n");
|
|
|
- asyncFor(numThreads, numThreads, [maxSendQueueSize](unsigned i)
|
|
|
+ asyncFor(numThreads+1, numThreads+1, [maxSendQueueSize, numReceiveSlots, maxSlotsPerClient, &rm](unsigned i)
|
|
|
{
|
|
|
- unsigned header = 0;
|
|
|
- IpAddress pretendIP(VStringBuffer("8.8.8.%d", i));
|
|
|
- // Note - this is assuming we send flow on the data port (that option defaults true in roxie too)
|
|
|
- Owned<ISendManager> sm = createSendManager(CCD_DATA_PORT, CCD_DATA_PORT, CCD_CLIENT_FLOW_PORT, maxSendQueueSize, 3, pretendIP, nullptr, false);
|
|
|
- Owned<IMessagePacker> mp = sm->createMessagePacker(0, 0, &header, sizeof(header), myNode, 0);
|
|
|
- for (unsigned j = 0; j < 10000; j++)
|
|
|
+ if (!i)
|
|
|
{
|
|
|
- void *buf = mp->getBuffer(500, false);
|
|
|
- memset(buf, i, 500);
|
|
|
- mp->putBuffer(buf, 500, false);
|
|
|
+ if (restartReceiver)
|
|
|
+ {
|
|
|
+ Sleep(100);
|
|
|
+ rm.clear();
|
|
|
+ rm.setown(createReceiveManager(CCD_SERVER_FLOW_PORT, CCD_DATA_PORT, CCD_CLIENT_FLOW_PORT, numReceiveSlots, maxSlotsPerClient, false));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ unsigned header = 0;
|
|
|
+ unsigned myStarts = numStarts(i);
|
|
|
+ for (unsigned startNo = 0; startNo < myStarts; startNo++)
|
|
|
+ {
|
|
|
+ IpAddress pretendIP(VStringBuffer("8.8.8.%d", i));
|
|
|
+ // Note - this is assuming we send flow on the data port (that option defaults true in roxie too)
|
|
|
+ Owned<ISendManager> sm = createSendManager(CCD_DATA_PORT, CCD_DATA_PORT, CCD_CLIENT_FLOW_PORT, maxSendQueueSize, 3, pretendIP, nullptr, false);
|
|
|
+ unsigned numPackets = packetsPerThread / myStarts;
|
|
|
+ for (unsigned j = 0; j < packetsPerThread; j++)
|
|
|
+ {
|
|
|
+ Owned<IMessagePacker> mp = sm->createMessagePacker(0, 0, &header, sizeof(header), myNode, 0);
|
|
|
+ void *buf = mp->getBuffer(500, false);
|
|
|
+ memset(buf, i, 500);
|
|
|
+ mp->putBuffer(buf, 500, false);
|
|
|
+ mp->flush();
|
|
|
+ }
|
|
|
+
|
|
|
+ // Wait until all the packets have been sent and acknowledged, for last start only
|
|
|
+ // For prior starts, we are trying to simulate a sender stopping abruptly (e.g. from a restart) so we don't want to close it down cleanly.
|
|
|
+ if (startNo == myStarts-1)
|
|
|
+ while (!sm->allDone())
|
|
|
+ Sleep(50);
|
|
|
+ DBGLOG("UdpSim sender thread %d sent %d packets", i, numPackets);
|
|
|
+ }
|
|
|
+ DBGLOG("UdpSim sender thread %d completed", i);
|
|
|
}
|
|
|
- mp->flush();
|
|
|
-
|
|
|
- //wait until all the packets have been sent and acknowledged
|
|
|
- while(!sm->allDone())
|
|
|
- Sleep(50);
|
|
|
});
|
|
|
- printf("End test %u\n", msTick() - begin);
|
|
|
+ printf("UdpSim test took %ums\n", msTick() - begin);
|
|
|
}
|
|
|
catch (IException * e)
|
|
|
{
|
|
|
- StringBuffer msg;
|
|
|
- printf("Exception: %s\n", e->errorMessage(msg).str());
|
|
|
+ EXCLOG(e);
|
|
|
+ e->Release();
|
|
|
}
|
|
|
}
|
|
|
|