浏览代码

HPCC-26859 Move udp simulation tests out of unittests framework

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 3 年之前
父节点
当前提交
709a54090e

+ 1 - 0
roxie/udplib/CMakeLists.txt

@@ -28,4 +28,5 @@ project (AllProjects)
 
 include ( udplib.cmake)
 include ( udptransport.cmake)
+include ( udpsim.cmake)
 

+ 1 - 0
roxie/udplib/udplib.hpp

@@ -161,6 +161,7 @@ extern UDPLIB_API RelaxedAtomic<unsigned> unwantedDiscarded;
 
 extern UDPLIB_API bool udpTraceFlow;
 extern UDPLIB_API bool udpTraceTimeouts;
+
 extern UDPLIB_API unsigned udpTraceLevel;
 extern UDPLIB_API unsigned udpOutQsPriority;
 extern UDPLIB_API void queryMemoryPoolStats(StringBuffer &memStats);

+ 6 - 80
roxie/udplib/udpsha.cpp

@@ -41,6 +41,12 @@ unsigned udpFlowSocketsSize = 131072;
 unsigned udpLocalWriteSocketSize = 1024000;
 unsigned udpStatsReportInterval = 60000;
 
+#ifdef TEST_DROPPED_PACKETS
+bool udpDropDataPackets = false;
+unsigned udpDropFlowPackets[flowType::max_flow_cmd] = {};
+unsigned flowPacketsSent[flowType::max_flow_cmd] = {};
+#endif
+
 unsigned multicastTTL = 1;
 
 MODULE_INIT(INIT_PRIORITY_STANDARD)
@@ -673,7 +679,6 @@ fake read socket that
 #ifdef SOCKET_SIMULATION
 bool isUdpTestMode = false;
 
-
 CSimulatedQueueWriteSocket* CSimulatedQueueWriteSocket::udp_connect(const SocketEndpoint &ep)
 {
     return new CSimulatedQueueWriteSocket(ep);
@@ -844,83 +849,4 @@ void CSimulatedUdpWriteSocket::close()
     realSocket->close();
 }
 
-
-
-//-----------------------------------------------------------------------------------------------------
-
-#ifdef _USE_CPPUNIT
-
-class SimulatedUdpStressTest : public CppUnit::TestFixture
-{
-    CPPUNIT_TEST_SUITE(SimulatedUdpStressTest);
-    CPPUNIT_TEST(simulateTraffic);
-    CPPUNIT_TEST_SUITE_END();
-
-    Owned<IDataBufferManager> dbm;
-    bool initialized = false;
-
-    void testInit()
-    {
-        if (!initialized)
-        {
-            udpTraceLevel = 1;
-            udpTraceTimeouts = true;
-            udpResendLostPackets = true;
-            udpRequestToSendTimeout = 10000;
-            udpRequestToSendAckTimeout = 10000;
-            udpMaxPendingPermits = 1;
-            udpTraceFlow = 0;
-            isUdpTestMode = true;
-            roxiemem::setTotalMemoryLimit(false, false, false, 20*1024*1024, 0, NULL, NULL);
-            dbm.setown(roxiemem::createDataBufferManager(roxiemem::DATA_ALIGNMENT_SIZE));
-            initialized = true;
-        }
-    }
-
-    void simulateTraffic()
-    {
-        constexpr unsigned numReceiveSlots = 100;
-        constexpr unsigned maxSlotsPerClient = 100;
-        constexpr unsigned maxSendQueueSize = 100;
-        try
-        {
-            testInit();
-            myNode.setIp(IpAddress("1.2.3.4"));
-            Owned<IReceiveManager> rm = createReceiveManager(CCD_SERVER_FLOW_PORT, CCD_DATA_PORT, CCD_CLIENT_FLOW_PORT, numReceiveSlots, maxSlotsPerClient, false);
-            unsigned begin = msTick();
-            printf("Start test\n");
-            asyncFor(20, 20, [](unsigned i)
-            {
-                unsigned header = 0;
-                IpAddress pretendIP(VStringBuffer("8.8.8.%d", i));
-                // Note - this is assuming we send flow on the data port (that option defaults true in roxie too)
-                Owned<ISendManager> sm = createSendManager(CCD_DATA_PORT, CCD_DATA_PORT, CCD_CLIENT_FLOW_PORT, maxSendQueueSize, 3, pretendIP, nullptr, false);
-                Owned<IMessagePacker> mp = sm->createMessagePacker(0, 0, &header, sizeof(header), myNode, 0);
-                for (unsigned i = 0; i < 10000; i++)
-                {
-                    void *buf = mp->getBuffer(500, false);
-                    memset(buf, i, 500);
-                    mp->putBuffer(buf, 500, false);
-                }
-                mp->flush();
-
-                //wait until all the packets have been sent and acknowledged
-                while(!sm->allDone())
-                    Sleep(50);
-            });
-            printf("End test %u\n", msTick() - begin);
-        }
-        catch (IException * e)
-        {
-            StringBuffer msg;
-            printf("Exception: %s\n", e->errorMessage(msg).str());
-            throw;
-        }
-    }
-};
-
-CPPUNIT_TEST_SUITE_REGISTRATION( SimulatedUdpStressTest );
-CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( SimulatedUdpStressTest, "SimulatedUdpStressTest" );
-
-#endif
 #endif

+ 12 - 2
roxie/udplib/udpsha.hpp

@@ -202,7 +202,7 @@ public:
 
 class flowType {
 public:
-    enum flowCmd : unsigned short { ok_to_send, request_received, request_to_send, send_completed, request_to_send_more };
+    enum flowCmd : unsigned short { ok_to_send, request_received, request_to_send, send_completed, request_to_send_more, max_flow_cmd };
     static const char *name(flowCmd m)
     {
         switch (m)
@@ -267,7 +267,17 @@ inline bool checkTraceLevel(unsigned category, unsigned level)
 #define SOCKET_SIMULATION_UDP
 
 #ifdef SOCKET_SIMULATION
-extern bool isUdpTestMode;
+#ifdef _DEBUG
+#define TEST_DROPPED_PACKETS
+#endif
+
+#ifdef TEST_DROPPED_PACKETS
+extern UDPLIB_API bool udpDropDataPackets;
+extern UDPLIB_API unsigned udpDropFlowPackets[flowType::max_flow_cmd];
+extern unsigned flowPacketsSent[flowType::max_flow_cmd];
+#endif
+
+extern UDPLIB_API bool isUdpTestMode;
 
 class CSocketSimulator : public CInterfaceOf<ISocket>
 {

+ 48 - 0
roxie/udplib/udpsim.cmake

@@ -0,0 +1,48 @@
+################################################################################
+#    HPCC SYSTEMS software Copyright (C) 2021 HPCC Systems®.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License");
+#    you may not use this file except in compliance with the License.
+#    You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+################################################################################
+
+# Component: udptransport 
+
+#####################################################
+# Description:
+# ------------
+#    Cmake Input File for udpsim
+#####################################################
+
+
+project( udptransport ) 
+
+set (    SRCS 
+         udpsim.cpp 
+    )
+
+include_directories ( 
+         ./../../roxie/roxiemem 
+         ./../../system/include 
+         ./../../system/jlib 
+         ./../../roxie/ccd 
+    )
+
+HPCC_ADD_EXECUTABLE ( udpsim ${SRCS} )
+
+#We don't currently ship this - it's for developer use only
+#install ( TARGETS udpsim RUNTIME DESTINATION ${EXEC_DIR} )  
+
+target_link_libraries ( udpsim
+         jlib
+         roxiemem
+         udplib 
+    )

+ 210 - 0
roxie/udplib/udpsim.cpp

@@ -0,0 +1,210 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2021 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+#include "udplib.hpp"
+#include "udpsha.hpp"
+#include "udptrs.hpp"
+#include "udpipmap.hpp"
+#include "roxiemem.hpp"
+#include "jptree.hpp"
+#include "portlist.h"
+
+using roxiemem::DataBuffer;
+using roxiemem::IDataBufferManager;
+
+#ifdef SOCKET_SIMULATION
+
+Owned<IDataBufferManager> dbm;
+
+unsigned numThreads = 20;
+
+static constexpr const char * defaultYaml = R"!!(
+version: "1.0"
+udpsim:
+  dropDataPackets: false
+  dropOkToSendPackets: 0
+  dropRequestReceivedPackets: 0
+  dropRequestToSendPackets: 0
+  dropRequestToSendMorePackets: 0
+  dropSendCompletedPackets: 0
+  help: false
+  numThreads: 20
+  outputconfig: false
+  udpTraceLevel: 1
+  udpTraceTimeouts: true
+  udpResendLostPackets: true
+  udpRequestToSendTimeout: 1000
+  udpRequestToSendAckTimeout: 1000
+  udpMaxPendingPermits: 1
+  udpTraceFlow: false
+)!!";
+
+bool isNumeric(const char *str)
+{
+    while (*str)
+    {
+        if (!isdigit(*str))
+            return false;
+        str++;
+    }
+    return true;
+}
+
+bool isBoolean(const char *str)
+{
+    return streq(str, "true") || streq(str, "false");
+}
+
+void usage()
+{
+    printf("USAGE: udpsim [options]\n");
+    printf("Options are:\n");
+    Owned<IPropertyTree> defaults = createPTreeFromYAMLString(defaultYaml);
+    IPropertyTree * allowed = defaults->queryPropTree("udpsim");
+    Owned<IAttributeIterator> aiter = allowed->getAttributes();
+    ForEach(*aiter)
+    {
+        printf("  --%s", aiter->queryName()+1);
+        if (isBoolean(aiter->queryValue()))
+            printf("[=0|1]\n");
+        else
+            printf("=nn\n");
+    }
+    ExitModuleObjects();
+    releaseAtoms();
+    exit(2);
+}
+
+void initOptions(int argc, const char **argv)
+{
+    Owned<IPropertyTree> defaults = createPTreeFromYAMLString(defaultYaml);
+    IPropertyTree * allowed = defaults->queryPropTree("udpsim");
+    for (unsigned argNo = 1; argNo < argc; argNo++)
+    {
+        const char *arg = argv[argNo];
+        if (arg[0]=='-' && arg[1]=='-')
+        {
+            arg += 2;
+            StringBuffer attrname("@");
+            const char * eq = strchr(arg, '=');
+            if (eq)
+                attrname.append(eq-arg, arg);
+            else
+                attrname.append(arg);
+            if (!allowed->hasProp(attrname))
+            {
+                printf("Unrecognized option %s\n\n", attrname.str()+1);
+                usage();
+            }
+            if (!eq && !isBoolean(allowed->queryProp(attrname)))
+            {
+                printf("Option %s requires a value\n\n", attrname.str()+1);
+                usage();
+            }
+        }
+        else
+        {
+            printf("Unexpected argument %s\n\n", arg);
+            usage();
+        }
+    }
+
+    Owned<IPropertyTree> options = loadConfiguration(defaultYaml, argv, "udpsim", "UDPSIM", nullptr, nullptr);
+    if (options->getPropBool("@help", false))
+        usage();
+#ifdef TEST_DROPPED_PACKETS
+    udpDropDataPackets = options->getPropBool("@dropDataPackets", false);
+    udpDropFlowPackets[flowType::ok_to_send] = options->getPropInt("@dropOkToSendPackets", 0);  // drop 1 in N
+    udpDropFlowPackets[flowType::request_received] = options->getPropInt("@dropRequestReceivedPackets", 0);  // drop 1 in N
+    udpDropFlowPackets[flowType::request_to_send] = options->getPropInt("@dropRequestToSendPackets", 0);  // drop 1 in N
+    udpDropFlowPackets[flowType::request_to_send_more] = options->getPropInt("@dropRequestToSendMorePackets", 0);  // drop 1 in N
+    udpDropFlowPackets[flowType::send_completed] = options->getPropInt("@dropSendCompletedPackets", 0);  // drop 1 in N
+#endif
+    numThreads = options->getPropInt("@numThreads", 0);
+    udpTraceLevel = options->getPropInt("@udpTraceLevel", 1);
+    udpTraceTimeouts = options->getPropBool("@udpTraceTimeouts", true);
+    udpResendLostPackets = options->getPropBool("@udpResendLostPackets", true);
+    udpRequestToSendTimeout = options->getPropInt("@udpRequestToSendTimeout", 1000);
+    udpRequestToSendAckTimeout = options->getPropInt("@udpRequestToSendAckTimeout", 1000);
+    udpMaxPendingPermits = options->getPropInt("@udpMaxPendingPermits", 1);
+    udpTraceFlow = options->getPropBool("@udpTraceFlow", false);
+
+    isUdpTestMode = true;
+    roxiemem::setTotalMemoryLimit(false, false, false, 20*1024*1024, 0, NULL, NULL);
+    dbm.setown(roxiemem::createDataBufferManager(roxiemem::DATA_ALIGNMENT_SIZE));
+}
+
+void simulateTraffic()
+{
+    constexpr unsigned numReceiveSlots = 100;
+    constexpr unsigned maxSlotsPerClient = 100;
+    constexpr unsigned maxSendQueueSize = 100;
+    try
+    {
+        myNode.setIp(IpAddress("1.2.3.4"));
+        Owned<IReceiveManager> rm = createReceiveManager(CCD_SERVER_FLOW_PORT, CCD_DATA_PORT, CCD_CLIENT_FLOW_PORT, numReceiveSlots, maxSlotsPerClient, false);
+        unsigned begin = msTick();
+        printf("Start test\n");
+        asyncFor(numThreads, numThreads, [maxSendQueueSize](unsigned i)
+        {
+            unsigned header = 0;
+            IpAddress pretendIP(VStringBuffer("8.8.8.%d", i));
+            // Note - this is assuming we send flow on the data port (that option defaults true in roxie too)
+            Owned<ISendManager> sm = createSendManager(CCD_DATA_PORT, CCD_DATA_PORT, CCD_CLIENT_FLOW_PORT, maxSendQueueSize, 3, pretendIP, nullptr, false);
+            Owned<IMessagePacker> mp = sm->createMessagePacker(0, 0, &header, sizeof(header), myNode, 0);
+            for (unsigned j = 0; j < 10000; j++)
+            {
+                void *buf = mp->getBuffer(500, false);
+                memset(buf, i, 500);
+                mp->putBuffer(buf, 500, false);
+            }
+            mp->flush();
+
+            //wait until all the packets have been sent and acknowledged
+            while(!sm->allDone())
+                Sleep(50);
+        });
+        printf("End test %u\n", msTick() - begin);
+    }
+    catch (IException * e)
+    {
+        StringBuffer msg;
+        printf("Exception: %s\n", e->errorMessage(msg).str());
+    }
+}
+
+int main(int argc, const char **argv)
+{
+    InitModuleObjects();
+    strdup("Make sure leak checking is working");
+    queryStderrLogMsgHandler()->setMessageFields(MSGFIELD_time|MSGFIELD_microTime|MSGFIELD_milliTime|MSGFIELD_thread);
+    initOptions(argc, argv);
+    simulateTraffic();
+    ExitModuleObjects();
+    releaseAtoms();
+    return 0;
+}
+
+#else
+
+int main(int argc, const char **arv)
+{
+    printf("udpsim requires a build with SOCKET_SIMULATION enabled\n");
+    return 2;
+}
+
+#endif

+ 18 - 0
roxie/udplib/udptrr.cpp

@@ -210,6 +210,15 @@ class CReceiveManager : implements IReceiveManager, public CInterface
                     StringBuffer ipStr;
                     DBGLOG("UdpReceiver: sending request_received msg seq %" SEQF "u to node=%s", _flowSeq, dest.getIpText(ipStr).str());
                 }
+#ifdef TEST_DROPPED_PACKETS
+                flowPacketsSent[msg.cmd]++;
+                if (udpDropFlowPackets[msg.cmd] && flowPacketsSent[msg.cmd]%udpDropFlowPackets[msg.cmd]==0)
+                {
+                    StringBuffer ipStr;
+                    DBGLOG("UdpReceiver: deliberately dropping request_received msg seq %" SEQF "u to node=%s", _flowSeq, dest.getIpText(ipStr).str());
+                }
+                else
+#endif
                 flowSocket->write(&msg, udpResendLostPackets ? sizeof(UdpPermitToSendMsg) : offsetof(UdpPermitToSendMsg, seen));
                 flowPermitsSent++;
 
@@ -241,6 +250,15 @@ class CReceiveManager : implements IReceiveManager, public CInterface
                     StringBuffer ipStr;
                     DBGLOG("UdpReceiver: sending ok_to_send %u msg seq %" SEQF "u to node=%s", maxTransfer, flowSeq, dest.getIpText(ipStr).str());
                 }
+#ifdef TEST_DROPPED_PACKETS
+                flowPacketsSent[msg.cmd]++;
+                if (udpDropFlowPackets[msg.cmd] && flowPacketsSent[msg.cmd]%udpDropFlowPackets[msg.cmd]==0)
+                {
+                    StringBuffer ipStr;
+                    DBGLOG("UdpReceiver: deliberately dropping ok_to_send %u msg seq %" SEQF "u to node=%s", maxTransfer, flowSeq, dest.getIpText(ipStr).str());
+                }
+                else
+#endif
                 flowSocket->write(&msg, udpResendLostPackets ? sizeof(UdpPermitToSendMsg) : offsetof(UdpPermitToSendMsg, seen));
                 flowPermitsSent++;
             }

+ 10 - 5
roxie/udplib/udptrs.cpp

@@ -40,10 +40,6 @@ unsigned udpMaxRetryTimedoutReqs = 0; // 0 means off (keep retrying forever)
 unsigned udpRequestToSendTimeout = 0; // value in milliseconds - 0 means calculate from query timeouts
 unsigned udpRequestToSendAckTimeout = 10; // value in milliseconds
 
-#ifdef _DEBUG
-//#define TEST_DROPPED_PACKETS
-#endif
-
 using roxiemem::DataBuffer;
 /*
  *
@@ -216,6 +212,15 @@ private:
                 StringBuffer s, s2;
                 DBGLOG("UdpSender[%s]: sending flowType::%s msg %" SEQF "u flowSeq %" SEQF "u to node=%s", msg.sourceNode.getTraceText(s2).str(), flowType::name(msg.cmd), msg.sendSeq, msg.flowSeq, ip.getIpText(s).str());
             }
+#ifdef TEST_DROPPED_PACKETS
+            flowPacketsSent[msg.cmd]++;
+            if (udpDropFlowPackets[msg.cmd] && flowPacketsSent[msg.cmd]%udpDropFlowPackets[msg.cmd] == 0)
+            {
+                StringBuffer s, s2;
+                DBGLOG("UdpSender[%s]: deliberately dropping flowType::%s msg %" SEQF "u flowSeq %" SEQF "u to node=%s", msg.sourceNode.getTraceText(s2).str(), flowType::name(msg.cmd), msg.sendSeq, msg.flowSeq, ip.getIpText(s).str());
+            }
+            else
+#endif
             send_flow_socket->write(&msg, sizeof(UdpRequestToSendMsg));
             flowRequestsSent++;
         }
@@ -418,7 +423,7 @@ public:
             try
             {
 #ifdef TEST_DROPPED_PACKETS
-                if (((header->pktSeq & UDP_PACKET_RESENT)==0) && (header->pktSeq==0 || header->pktSeq==10 || ((header->pktSeq&UDP_PACKET_COMPLETE) != 0)))
+                if (udpDropDataPackets && ((header->pktSeq & UDP_PACKET_RESENT)==0) && (header->pktSeq==0 || header->pktSeq==10 || ((header->pktSeq&UDP_PACKET_COMPLETE) != 0)))
                     DBGLOG("Deliberately dropping packet %" SEQF "u", header->sendSeq);
                 else
 #endif

+ 3 - 3
system/jlib/jptree.cpp

@@ -8414,7 +8414,7 @@ void mergeConfiguration(IPropertyTree & target, const IPropertyTree & source, co
         bool first = false;
         bool endprior = false;
         bool sequence = checkInSequence(child, seqname, first, endprior);
-        if (first && (!name || isScalarItem(child))) //arrays of unamed objects or scalars are replaced
+        if (first && (!name || isScalarItem(child))) //arrays of unnamed objects or scalars are replaced
             target.removeProp(tag);
 
         IPropertyTree * match = ensureMergeConfigTarget(target, tag, altname ? altNameAttribute : "@name", name, sequence);
@@ -8565,7 +8565,7 @@ static void applyCommandLineOption(IPropertyTree * config, const char * option,
     config->setProp(path, value);
 }
 
-static void applyCommandLineOption(IPropertyTree * config, const char * option, std::initializer_list<const char *> ignoreOptions)
+static void applyCommandLineOption(IPropertyTree * config, const char * option, std::initializer_list<const std::string> ignoreOptions)
 {
     const char * eq = strchr(option, '=');
     StringBuffer name;
@@ -8638,7 +8638,7 @@ Owned<IPropertyTree> getGlobalConfigSP()
     return getGlobalConfig();
 }
 
-jlib_decl IPropertyTree * loadArgsIntoConfiguration(IPropertyTree *config, const char * * argv, std::initializer_list<const char *> ignoreOptions)
+jlib_decl IPropertyTree * loadArgsIntoConfiguration(IPropertyTree *config, const char * * argv, std::initializer_list<const std::string> ignoreOptions)
 {
     for (const char * * pArg = argv; *pArg; pArg++)
     {

+ 1 - 1
system/jlib/jptree.hpp

@@ -316,7 +316,7 @@ inline static bool isValidXPathChr(char c)
 //export for unit test
 jlib_decl void mergeConfiguration(IPropertyTree & target, const IPropertyTree & source, const char *altNameAttribute=nullptr, bool overwriteAttr=true);
 
-jlib_decl IPropertyTree * loadArgsIntoConfiguration(IPropertyTree *config, const char * * argv, std::initializer_list<const char *> ignoreOptions = {});
+jlib_decl IPropertyTree * loadArgsIntoConfiguration(IPropertyTree *config, const char * * argv, std::initializer_list<const std::string> ignoreOptions = {});
 jlib_decl IPropertyTree * loadConfiguration(IPropertyTree * defaultConfig, const char * * argv, const char * componentTag, const char * envPrefix, const char * legacyFilename, IPropertyTree * (mapper)(IPropertyTree *), const char *altNameAttribute=nullptr, bool monitor=true);
 jlib_decl IPropertyTree * loadConfiguration(const char * defaultYaml, const char * * argv, const char * componentTag, const char * envPrefix, const char * legacyFilename, IPropertyTree * (mapper)(IPropertyTree *), const char *altNameAttribute=nullptr, bool monitor=true);
 jlib_decl IPropertyTree * getCostsConfiguration();