
HPCC-23849 Fix memory leaks with Kafka plugin

Also fix up README so that it displays properly on GitHub.

Signed-off-by: Dan S. Camper <dan.camper@lexisnexisrisk.com>
Dan S. Camper 5 years ago
parent commit dfcb05c56c
2 changed files with 18 additions and 14 deletions
  1. plugins/kafka/README.md (+14, -14)
  2. plugins/kafka/kafka.cpp (+4, -0)

+ 14 - 14
plugins/kafka/README.md

@@ -1,4 +1,4 @@
-#ECL Apache Kafka Plugin
+# ECL Apache Kafka Plugin
 
 This is the ECL plugin to access [Apache Kafka](https://kafka.apache.org), a
 publish-subscribe messaging system.  ECL string data can be both published to
@@ -7,7 +7,7 @@ and consumed from Apache Kafka brokers.
 Client access is via a third-party C++ plugin,
 [librdkafka](https://github.com/edenhill/librdkafka).
 
-##Installation and Dependencies
+## Installation and Dependencies
 
 [librdkafka](https://github.com/edenhill/librdkafka) is included as a git
 submodule in HPCC-Platform.  It will be built and integrated automatically when
@@ -26,7 +26,7 @@ your installation by creating a topic and interacting with it.
 
 *Note:* Apache Kafka version 0.8.2 or later is recommended.
 
-##Plugin Configuration
+## Plugin Configuration
 
 The Apache Kafka plugin uses sensible default configuration values but these can
 be modified via configuration files.
@@ -79,7 +79,7 @@ overriding their normal default values:
     fetch.message.max.bytes=10000000
     auto.offset.reset=smallest
 
-##Publishing messages with the plugin
+## Publishing messages with the plugin
 
 Publishing string messages begins with instantiating an ECL module that defines
 the Apache Kafka cluster and the topic into which the messages will be posted. 
@@ -132,7 +132,7 @@ calling the following module function:
 `GetTopicPartitionCount()` returns zero if the topic has not been created or
 there has been an error.
 
-##Consuming messages with the plugin
+## Consuming messages with the plugin
 
 As with publishing, consuming string messages begins with instantiating an ECL
 module that defines the Apache Kafka cluster and the topic from which the
@@ -226,14 +226,14 @@ calling the following module function:
 `GetTopicPartitionCount()` returns zero if the topic has not been created or
 there has been an error.
 
-##Complete ECL Examples
+## Complete ECL Examples
 
 The following code will publish 100K messages to a topic named 'MyTestTopic' on
 an Apache Kafka broker located at address 10.211.55.13.  If you are running a
 single-node HPCC cluster and have installed Kafka on the same node, you can use
 'localhost' instead (or omit the parameter, as it defaults to 'localhost').
 
-###Publishing
+### Publishing
 
     IMPORT kafka;
 
@@ -256,7 +256,7 @@ single-node HPCC cluster and have installed Kafka on the same node, you can use
 
     APPLY(ds, ORDERED(p.PublishMessage(message)));
 
-###Consuming
+### Consuming
 
 This code will read the messages written by the publishing example, above.  It
 will also show the number of partitions in the topic and the offsets of the
@@ -275,7 +275,7 @@ last-read messages.
     OUTPUT(offsets, NAMED('LastMessageOffsets'));
     OUTPUT(partitionCount, NAMED('PartitionCount'));
 
-###Resetting Offsets
+### Resetting Offsets
 
 Resetting offsets is useful when you have a topic already published with
 messages and you need to reread its messages from the very beginning.
@@ -286,9 +286,9 @@ messages and you need to reread its messages from the very beginning.
 
     c.ResetMessageOffsets();
 
-##Behaviour and Implementation Details
+## Behaviour and Implementation Details
 
-###Partitioning within Apache Kafka Topics
+### Partitioning within Apache Kafka Topics
 
 Topic partitioning is covered in Apache Kafka's
 [introduction](https://kafka.apache.org/documentation.html#introduction).  There
@@ -321,7 +321,7 @@ has never been seen before if someone publishes to it, and that topic will have
 only one partition.  Both actions -- whether a topic is automatically created
 and how many partitions it will have -- are configurable within Apache Kafka.
 
-###Publisher Connections
+### Publisher Connections
 
 This plugin caches the internal publisher objects and their connections. 
 Publishing from ECL, technically, only writes the messages to a local cache. 
@@ -332,14 +332,14 @@ need to hang around for some additional time.  The upside is that the cached
 objects and connections will be reused for subsequent publish operations,
 speeding up the entire process.
 
-###Consumer Connections
+### Consumer Connections
 
 Unlike publisher objects, one consumer object is created per thread for each
 connection.  A connection is to a specific broker, topic, consumer group, and
 partition number combination.  The consumer objects and connections live only as
 long as needed.
 
-###Saved Topic Offsets
+### Saved Topic Offsets
 
 By default, consumers save to a file the offset of the last-read message from a
 given topic, consumer group, and partition combination.  The offset is saved so

+ 4 - 0
plugins/kafka/kafka.cpp

@@ -382,6 +382,7 @@ namespace KafkaPlugin
 
                     // Create the producer
                     producerPtr = RdKafka::Producer::create(globalConfig, errStr);
+                    delete globalConfig;
 
                     if (producerPtr)
                     {
@@ -393,6 +394,7 @@ namespace KafkaPlugin
 
                         // Create the topic
                         topicPtr.store(RdKafka::Topic::create(producerPtr, topic, topicConfPtr, errStr), std::memory_order_release);
+                        delete topicConfPtr;
 
                         if (topicPtr)
                         {
@@ -564,6 +566,7 @@ namespace KafkaPlugin
 
                     // Create the consumer
                     consumerPtr = RdKafka::Consumer::create(globalConfig, errStr);
+                    delete globalConfig;
 
                     if (consumerPtr)
                     {
@@ -588,6 +591,7 @@ namespace KafkaPlugin
 
                         // Create the topic
                         topicPtr.store(RdKafka::Topic::create(consumerPtr, topic, topicConfPtr, errStr), std::memory_order_release);
+                        delete topicConfPtr;
 
                         if (!topicPtr)
                         {