소스 검색

HPCC-23440 Need access to aeron configuration settings

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 5 년 전
부모
커밋
66756a0bd8
3개의 변경된 파일26개의 추가작업 그리고 7개의 파일을 삭제
  1. 2 0
      roxie/ccd/ccdmain.cpp
  2. 23 7
      roxie/udplib/udpaeron.cpp
  3. 1 0
      roxie/udplib/udplib.hpp

+ 2 - 0
roxie/ccd/ccdmain.cpp

@@ -1122,6 +1122,8 @@ int STARTQUERY_API start_query(int argc, const char *argv[])
             else
                 throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - multicastLast not set");
         }
+        if (useAeron)
+            setAeronProperties(topology);
 
         if (useDynamicServers)
         {

+ 23 - 7
roxie/udplib/udpaeron.cpp

@@ -40,6 +40,25 @@ unsigned aeronConnectTimeout = 5000;
 unsigned aeronPollFragmentsLimit = 10;
 unsigned aeronIdleSleepMs = 1;
 
+unsigned aeronMtuLength = 0;
+unsigned aeronSocketRcvbuf = 0;
+unsigned aeronSocketSndbuf = 0;
+unsigned aeronInitialWindow = 0;
+
+extern UDPLIB_API void setAeronProperties(const IPropertyTree *config)
+{
+    useEmbeddedAeronDriver = config->getPropBool("@aeronUseEmbeddedDriver", true);
+    aeronConnectTimeout = config->getPropInt("@aeronConnectTimeout", 5000);
+    aeronPollFragmentsLimit = config->getPropInt("@aeronPollFragmentsLimit", 10);
+    aeronIdleSleepMs = config->getPropInt("@aeronIdleSleepMs", 1);
+
+    aeronMtuLength = config->getPropInt("@aeronMtuLength", 0);
+    aeronSocketRcvbuf = config->getPropInt("@aeronSocketRcvbuf", 0);
+    aeronSocketSndbuf = config->getPropInt("@aeronSocketSndbuf", 0);
+    aeronInitialWindow = config->getPropInt("@aeronInitialWindow", 0);
+
+}
+
 static std::thread aeronDriverThread;
 static InterruptableSemaphore driverStarted;
 
@@ -81,13 +100,10 @@ int startAeronDriver()
         context->warn_if_dirs_exist = false;
         context->term_buffer_sparse_file = false;
 
-        // MORE - should possibly allow these to be configured, or experiment to find what values work well for Roxie
-        // In my (very rudimentary and non-representative) tests these values seem ok.
-
-        context->mtu_length=16384;
-        context->socket_rcvbuf=2097152;
-        context->socket_sndbuf=2097152;
-        context->initial_window_length=2097152;
+        if (aeronMtuLength) context->mtu_length = aeronMtuLength;
+        if (aeronSocketRcvbuf) context->socket_rcvbuf = aeronSocketRcvbuf;
+        if (aeronSocketSndbuf) context->socket_sndbuf = aeronSocketSndbuf;
+        if (aeronInitialWindow) context->initial_window_length = aeronInitialWindow;
 
         if (aeron_driver_init(&driver, context) < 0)
             throw makeStringExceptionV(MSGAUD_operator, -1, "AERON: error initializing driver (%d) %s", aeron_errcode(), aeron_errmsg());

+ 1 - 0
roxie/udplib/udplib.hpp

@@ -142,6 +142,7 @@ interface ISendManager : extends IInterface
 extern UDPLIB_API IReceiveManager *createReceiveManager(int server_flow_port, int data_port, int client_flow_port, int sniffer_port, const IpAddress &sniffer_multicast_ip, int queue_size, unsigned maxSlotsPerSender);
 extern UDPLIB_API ISendManager *createSendManager(int server_flow_port, int data_port, int client_flow_port, int sniffer_port, const IpAddress &sniffer_multicast_ip, int queue_size_pr_server, int queues_pr_server, TokenBucket *rateLimiter);
 
+extern UDPLIB_API void setAeronProperties(const IPropertyTree *config);
 extern UDPLIB_API IReceiveManager *createAeronReceiveManager(const SocketEndpoint &ep);
 extern UDPLIB_API ISendManager *createAeronSendManager(unsigned dataPort, unsigned numQueues, const IpAddress &myIP);