Browse Source

HPCC-24960 GetTopicPartitionCount() now loads global configuration file

Global configuration should hold SSL connection information, among other things, which is needed for secured broker connection.

Updated README to clarify that SSL connection parameters should go into the global config and not a topic config.

Signed-off-by: Dan S. Camper <dan.camper@lexisnexisrisk.com>
Dan S. Camper 4 years ago
parent
commit
9f0e3d8b70
2 changed files with 55 additions and 27 deletions
  1. 4 0
      plugins/kafka/README.md
  2. 51 27
      plugins/kafka/kafka.cpp

+ 4 - 0
plugins/kafka/README.md

@@ -59,6 +59,10 @@ consumer.  For a publisher, the naming convention is
 `kafka_consumer_topic_<TopicName>.conf`.  In both cases, `<TopicName>` is the
 name of the topic you are publishing to or consuming from.
 
+Settings that affect the protocol used to connect to the Kafka broker (such as
+using SSL) should be placed only in the global configuration file, not in
+any per-topic configuration file.
+
 Configuration parameters loaded from a file override those set by the plugin
 with one exception:  the `metadata.broker.list` setting, if found in a
 configuration file, is ignored.  Apache Kafka brokers are always set in ECL.

+ 51 - 27
plugins/kafka/kafka.cpp

@@ -40,6 +40,9 @@ namespace KafkaPlugin
     // File Constants
     //--------------------------------------------------------------------------
 
+    // Filename of global Kafka configuration file
+    const char* GLOBAL_CONFIG_FILENAME = "kafka_global.conf";
+
     // The minimum number of seconds that a cached object can live
     // without activity
     const time_t OBJECT_EXPIRE_TIMEOUT_SECONDS = 60 * 2;
@@ -374,7 +377,7 @@ namespace KafkaPlugin
 
                     // Set any global configurations from file, allowing
                     // overrides of above settings
-                    applyConfig("kafka_global.conf", globalConfig, traceLevel);
+                    applyConfig(GLOBAL_CONFIG_FILENAME, globalConfig, traceLevel);
 
                     // Set producer callbacks
                     globalConfig->set("event_cb", static_cast<RdKafka::EventCb*>(this), errStr);
@@ -559,7 +562,7 @@ namespace KafkaPlugin
 
                     // Set any global configurations from file, allowing
                     // overrides of above settings
-                    applyConfig("kafka_global.conf", globalConfig, traceLevel);
+                    applyConfig(GLOBAL_CONFIG_FILENAME, globalConfig, traceLevel);
 
                     // Set consumer callbacks
                     globalConfig->set("event_cb", static_cast<RdKafka::EventCb*>(this), errStr);
@@ -953,56 +956,77 @@ namespace KafkaPlugin
         // C API, so we are basically creating a brand-new connection from
         // scratch.
 
+        int traceLevel = ctx->queryContextLogger().queryTraceLevel();
         __int32 pCount = 0;
         char errstr[512];
-        rd_kafka_conf_t* conf = rd_kafka_conf_new();
-        rd_kafka_t* rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
+        RdKafka::Conf* globalConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
 
-        if (rk)
+        if (globalConfig)
         {
-            if (rd_kafka_brokers_add(rk, brokers) != 0)
-            {
-                rd_kafka_topic_conf_t* topic_conf = rd_kafka_topic_conf_new();
-                rd_kafka_topic_t* rkt = rd_kafka_topic_new(rk, topic, topic_conf);
+            // Load global config to pick up any protocol modifications
+            applyConfig(GLOBAL_CONFIG_FILENAME, globalConfig, traceLevel);
 
-                if (rkt)
+            // rd_kafka_new() takes ownership of the lower-level conf object, which in this case is a
+            // pointer currently owned by globalConfig; we need to pass a duplicate
+            // the conf pointer to rd_kafka_new() so we don't mangle globalConfig's internals
+            rd_kafka_conf_t* conf = rd_kafka_conf_dup(globalConfig->c_ptr_global());
+            rd_kafka_t* rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
+            delete globalConfig;
+
+            if (rk)
+            {
+                if (rd_kafka_brokers_add(rk, brokers) != 0)
                 {
-                    const struct rd_kafka_metadata* metadata = NULL;
-                    rd_kafka_resp_err_t err = rd_kafka_metadata(rk, 0, rkt, &metadata, 5000);
+                    rd_kafka_topic_conf_t* topic_conf = rd_kafka_topic_conf_new();
+                    rd_kafka_topic_t* rkt = rd_kafka_topic_new(rk, topic, topic_conf);
 
-                    if (err == RD_KAFKA_RESP_ERR_NO_ERROR)
+                    if (rkt)
                     {
-                        pCount = metadata->topics[0].partition_cnt;
+                        const struct rd_kafka_metadata* metadata = NULL;
+                        rd_kafka_resp_err_t err = rd_kafka_metadata(rk, 0, rkt, &metadata, 5000);
 
-                        rd_kafka_metadata_destroy(metadata);
+                        if (err == RD_KAFKA_RESP_ERR_NO_ERROR)
+                        {
+                            pCount = metadata->topics[0].partition_cnt;
+
+                            rd_kafka_metadata_destroy(metadata);
+                        }
+                        else
+                        {
+                            DBGLOG("Kafka: Error retrieving metadata from topic: %s @ %s: '%s'", topic, brokers, rd_kafka_err2str(err));
+                        }
+
+                        rd_kafka_topic_destroy(rkt);
                     }
                     else
                     {
-                        if (ctx->queryContextLogger().queryTraceLevel() > 4)
+                        if (traceLevel > 4)
                         {
-                            DBGLOG("Kafka: Error retrieving metadata from topic: %s @ %s: '%s'", topic, brokers, rd_kafka_err2str(err));
+                            DBGLOG("Kafka: Could not create topic configuration object: %s @ %s", topic, brokers);
                         }
                     }
-
-                    rd_kafka_topic_destroy(rkt);
                 }
                 else
                 {
-                    if (ctx->queryContextLogger().queryTraceLevel() > 4)
+                    if (traceLevel > 4)
                     {
-                        DBGLOG("Kafka: Could not create topic object: %s @ %s", topic, brokers);
+                        DBGLOG("Kafka: Could not add brokers: %s @ %s", topic, brokers);
                     }
                 }
+
+                rd_kafka_destroy(rk);
             }
             else
             {
-                if (ctx->queryContextLogger().queryTraceLevel() > 4)
-                {
-                    DBGLOG("Kafka: Could not add brokers: %s @ %s", topic, brokers);
-                }
+                DBGLOG("Kafka: Could not create consumer configuration object : %s @ %s: '%s'", topic, brokers, errstr);
+            }
+        }
+        else
+        {
+            if (traceLevel > 4)
+            {
+                DBGLOG("Kafka: Could not create global configuration object: %s @ %s", topic, brokers);
             }
-
-            rd_kafka_destroy(rk);
         }
 
         if (pCount == 0)