Browse Source

Merge pull request #14305 from dcamper/hpcc-24960-kafka-plugin-config-ssl

HPCC-24960 Kafka Plugin: GetTopicPartitionCount() now loads global configuration file

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Merged-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 4 years ago
parent
commit
ae7117afb9
2 changed files with 55 additions and 27 deletions
  1. 4 0
      plugins/kafka/README.md
  2. 51 27
      plugins/kafka/kafka.cpp

+ 4 - 0
plugins/kafka/README.md

@@ -59,6 +59,10 @@ consumer.  For a consumer, the naming convention is
 `kafka_consumer_topic_<TopicName>.conf`.  In both cases, `<TopicName>` is the
 name of the topic you are publishing to or consuming from.
 
+Settings that affect the protocol used to connect to the Kafka broker (such as
+using SSL) should be placed only in the global configuration file, not in
+any per-topic configuration file.
+
 Configuration parameters loaded from a file override those set by the plugin
 with one exception:  the `metadata.broker.list` setting, if found in a
 configuration file, is ignored.  Apache Kafka brokers are always set in ECL.

+ 51 - 27
plugins/kafka/kafka.cpp

@@ -40,6 +40,9 @@ namespace KafkaPlugin
     // File Constants
     //--------------------------------------------------------------------------
 
+    // Filename of global Kafka configuration file
+    const char* GLOBAL_CONFIG_FILENAME = "kafka_global.conf";
+
     // The minimum number of seconds that a cached object can live
     // without activity
     const time_t OBJECT_EXPIRE_TIMEOUT_SECONDS = 60 * 2;
@@ -374,7 +377,7 @@ namespace KafkaPlugin
 
                     // Set any global configurations from file, allowing
                     // overrides of above settings
-                    applyConfig("kafka_global.conf", globalConfig, traceLevel);
+                    applyConfig(GLOBAL_CONFIG_FILENAME, globalConfig, traceLevel);
 
                     // Set producer callbacks
                     globalConfig->set("event_cb", static_cast<RdKafka::EventCb*>(this), errStr);
@@ -559,7 +562,7 @@ namespace KafkaPlugin
 
                     // Set any global configurations from file, allowing
                     // overrides of above settings
-                    applyConfig("kafka_global.conf", globalConfig, traceLevel);
+                    applyConfig(GLOBAL_CONFIG_FILENAME, globalConfig, traceLevel);
 
                     // Set consumer callbacks
                     globalConfig->set("event_cb", static_cast<RdKafka::EventCb*>(this), errStr);
@@ -953,56 +956,77 @@ namespace KafkaPlugin
         // C API, so we are basically creating a brand-new connection from
         // scratch.
 
+        int traceLevel = ctx->queryContextLogger().queryTraceLevel();
         __int32 pCount = 0;
         char errstr[512];
-        rd_kafka_conf_t* conf = rd_kafka_conf_new();
-        rd_kafka_t* rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
+        RdKafka::Conf* globalConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
 
-        if (rk)
+        if (globalConfig)
         {
-            if (rd_kafka_brokers_add(rk, brokers) != 0)
-            {
-                rd_kafka_topic_conf_t* topic_conf = rd_kafka_topic_conf_new();
-                rd_kafka_topic_t* rkt = rd_kafka_topic_new(rk, topic, topic_conf);
+            // Load global config to pick up any protocol modifications
+            applyConfig(GLOBAL_CONFIG_FILENAME, globalConfig, traceLevel);
 
-                if (rkt)
+            // rd_kafka_new() takes ownership of the lower-level conf object, which in this case is a
+            // pointer currently owned by globalConfig; we need to pass a duplicate of
+            // the conf pointer to rd_kafka_new() so we don't mangle globalConfig's internals
+            rd_kafka_conf_t* conf = rd_kafka_conf_dup(globalConfig->c_ptr_global());
+            rd_kafka_t* rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
+            delete globalConfig;
+
+            if (rk)
+            {
+                if (rd_kafka_brokers_add(rk, brokers) != 0)
                 {
-                    const struct rd_kafka_metadata* metadata = NULL;
-                    rd_kafka_resp_err_t err = rd_kafka_metadata(rk, 0, rkt, &metadata, 5000);
+                    rd_kafka_topic_conf_t* topic_conf = rd_kafka_topic_conf_new();
+                    rd_kafka_topic_t* rkt = rd_kafka_topic_new(rk, topic, topic_conf);
 
-                    if (err == RD_KAFKA_RESP_ERR_NO_ERROR)
+                    if (rkt)
                     {
-                        pCount = metadata->topics[0].partition_cnt;
+                        const struct rd_kafka_metadata* metadata = NULL;
+                        rd_kafka_resp_err_t err = rd_kafka_metadata(rk, 0, rkt, &metadata, 5000);
 
-                        rd_kafka_metadata_destroy(metadata);
+                        if (err == RD_KAFKA_RESP_ERR_NO_ERROR)
+                        {
+                            pCount = metadata->topics[0].partition_cnt;
+
+                            rd_kafka_metadata_destroy(metadata);
+                        }
+                        else
+                        {
+                            DBGLOG("Kafka: Error retrieving metadata from topic: %s @ %s: '%s'", topic, brokers, rd_kafka_err2str(err));
+                        }
+
+                        rd_kafka_topic_destroy(rkt);
                     }
                     else
                     {
-                        if (ctx->queryContextLogger().queryTraceLevel() > 4)
+                        if (traceLevel > 4)
                         {
-                            DBGLOG("Kafka: Error retrieving metadata from topic: %s @ %s: '%s'", topic, brokers, rd_kafka_err2str(err));
+                            DBGLOG("Kafka: Could not create topic configuration object: %s @ %s", topic, brokers);
                         }
                     }
-
-                    rd_kafka_topic_destroy(rkt);
                 }
                 else
                 {
-                    if (ctx->queryContextLogger().queryTraceLevel() > 4)
+                    if (traceLevel > 4)
                     {
-                        DBGLOG("Kafka: Could not create topic object: %s @ %s", topic, brokers);
+                        DBGLOG("Kafka: Could not add brokers: %s @ %s", topic, brokers);
                     }
                 }
+
+                rd_kafka_destroy(rk);
             }
             else
             {
-                if (ctx->queryContextLogger().queryTraceLevel() > 4)
-                {
-                    DBGLOG("Kafka: Could not add brokers: %s @ %s", topic, brokers);
-                }
+                DBGLOG("Kafka: Could not create consumer configuration object : %s @ %s: '%s'", topic, brokers, errstr);
+            }
+        }
+        else
+        {
+            if (traceLevel > 4)
+            {
+                DBGLOG("Kafka: Could not create global configuration object: %s @ %s", topic, brokers);
             }
-
-            rd_kafka_destroy(rk);
         }
 
         if (pCount == 0)