|
@@ -85,7 +85,7 @@ Publishing string messages begins with instantiating an ECL module that defines
|
|
|
the Apache Kafka cluster and the topic into which the messages will be posted.
|
|
|
The definition of the module is:
|
|
|
|
|
|
- Publisher(VARSTRING topic, VARSTRING brokers = 'localhost') := MODULE
|
|
|
+ KafkaPublisher(VARSTRING topic, VARSTRING brokers = 'localhost') := MODULE
|
|
|
...
|
|
|
END
|
|
|
|
|
@@ -99,7 +99,7 @@ connection is made.
|
|
|
|
|
|
Example instantiating a publishing module:
|
|
|
|
|
|
- p := kafka.Publisher('MyTopic', '10.211.55.13');
|
|
|
+ p := kafka.KafkaPublisher('MyTopic', '10.211.55.13');
|
|
|
|
|
|
The module contains an exported function for publishing a message, defined as:
|
|
|
|
|
@@ -138,24 +138,24 @@ As with publishing, consuming string messages begins with instantiating an ECL
|
|
|
module that defines the Apache Kafka cluster and the topic from which the
|
|
|
messages will be read. The definition of the module is:
|
|
|
|
|
|
- Consumer(VARSTRING topic,
|
|
|
- VARSTRING brokers = 'localhost',
|
|
|
- VARSTRING consumerGroup = 'hpcc') := MODULE
|
|
|
+ KafkaConsumer(VARSTRING topic,
|
|
|
+ VARSTRING brokers = 'localhost',
|
|
|
+ VARSTRING consumerGroup = 'hpcc') := MODULE
|
|
|
...
|
|
|
END
|
|
|
|
|
|
The module requires you to designate a topic by name. Optionally, you may also
|
|
|
cite at least one Apache Kafka broker and a consumer group. The format and
|
|
|
-requirements for a broker are the same as for instantiating a Producer module.
|
|
|
-Consumer groups in Apache Kafka allow multiple consumer instances, like Thor
|
|
|
-nodes, to form a "logical consumer" and be able to retrieve messages in parallel
|
|
|
-and without duplication. See the "Consumers" subtopic in Apache Kafka's
|
|
|
-[introduction](https://kafka.apache.org/documentation.html#introduction) for
|
|
|
-more details.
|
|
|
+requirements for a broker are the same as for instantiating a KafkaPublisher
|
|
|
+module. Consumer groups in Apache Kafka allow multiple consumer instances, like
|
|
|
+Thor nodes, to form a "logical consumer" and be able to retrieve messages in
|
|
|
+parallel and without duplication. See the "Consumers" subtopic in Apache
|
|
|
+Kafka's [introduction](https://kafka.apache.org/documentation.html#introduction)
|
|
|
+for more details.
|
|
|
|
|
|
Example:
|
|
|
|
|
|
- c := kafka.Consumer('MyTopic', '10.211.55.13');
|
|
|
+ c := kafka.KafkaConsumer('MyTopic', '10.211.55.13');
|
|
|
|
|
|
The module contains an exported function for consuming messages, defined as:
|
|
|
|
|
@@ -252,7 +252,7 @@ single-node HPCC cluster and have installed Kafka on the same node, you can use
|
|
|
DISTRIBUTED
|
|
|
);
|
|
|
|
|
|
- p := kafka.Publisher('MyTestTopic', brokers := '10.211.55.13');
|
|
|
+ p := kafka.KafkaPublisher('MyTestTopic', brokers := '10.211.55.13');
|
|
|
|
|
|
APPLY(ds, ORDERED(p.PublishMessage(message)));
|
|
|
|
|
@@ -264,7 +264,7 @@ last-read messages.
|
|
|
|
|
|
IMPORT kafka;
|
|
|
|
|
|
- c := kafka.Consumer('MyTestTopic', brokers := '10.211.55.13');
|
|
|
+ c := kafka.KafkaConsumer('MyTestTopic', brokers := '10.211.55.13');
|
|
|
|
|
|
ds := c.GetMessages(200000);
|
|
|
offsets := c.LastMessageOffsets(ds);
|
|
@@ -282,7 +282,7 @@ messages and you need to reread its messages from the very beginning.
|
|
|
|
|
|
IMPORT kafka;
|
|
|
|
|
|
- c := kafka.Consumer('MyTestTopic', brokers := '10.211.55.13');
|
|
|
+ c := kafka.KafkaConsumer('MyTestTopic', brokers := '10.211.55.13');
|
|
|
|
|
|
c.ResetMessageOffsets();
|
|
|
|