/*############################################################################## HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems®. Licensed under the Apache License, Version 2.0 (the License); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an AS IS BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. ############################################################################## */ // Record structure containing message offset positioning EXPORT KafkaMessageOffset := RECORD UNSIGNED4 partitionNum; INTEGER8 offset; END; // Record structure that will be used to return Kafka messages to ECL EXPORT KafkaMessage := RECORD KafkaMessageOffset; UTF8 message; END; // Service definition SHARED kafka := SERVICE : plugin('kafka'), namespace('KafkaPlugin') BOOLEAN PublishMessage(CONST VARSTRING brokers, CONST VARSTRING topic, CONST UTF8 message, CONST UTF8 key) : cpp,action,context,entrypoint='publishMessage'; INTEGER4 getTopicPartitionCount(CONST VARSTRING brokers, CONST VARSTRING topic) : cpp,action,context,entrypoint='getTopicPartitionCount'; STREAMED DATASET(KafkaMessage) GetMessageDataset(CONST VARSTRING brokers, CONST VARSTRING topic, CONST VARSTRING consumerGroup, INTEGER4 partitionNum, INTEGER8 maxRecords) : cpp,action,context,entrypoint='getMessageDataset'; INTEGER8 SetMessageOffset(CONST VARSTRING brokers, CONST VARSTRING topic, CONST VARSTRING consumerGroup, INTEGER4 partitionNum, INTEGER8 newOffset) : cpp,action,context,entrypoint='setMessageOffset'; END; /** * Module wrapping message publishing functions. * * @param topic The name of the topic this module will be publishing to; * cannot be an empty string; REQUIRED * @param brokers One or more Kafka broker; each broker should be in the * form 'Name[:port]' where 'Name' may be either a DNS name * or an IP address; multiple brokers should be delimited * with a comma; brokers can also be set in the * kafka_global.conf configuration file, in which case * you should pass an empty string; OPTIONAL, * defaulting to 'localhost' */ EXPORT KafkaPublisher(VARSTRING topic, VARSTRING brokers = 'localhost') := MODULE /** * Get the number of partitions currently set up for this topic * * @return The number of partitions or zero if either the topic does not * exist or there was an error */ EXPORT INTEGER4 GetTopicPartitionCount() := kafka.getTopicPartitionCount(brokers, topic); /** * Queue one message for publishing to the current Kafka topic * * @param message The message to publish; must not be an empty string; * REQUIRED * @param key A key to attach to the message, used by Kafka to * route the message to a particular partition (keys * with the same value wind up on the same partition); * an empty string indicates no key value; OPTIONAL, * defaults to an empty string * * @return TRUE */ EXPORT BOOLEAN PublishMessage(CONST UTF8 message, CONST UTF8 key = U8'') := kafka.PublishMessage(brokers, topic, message, key); END; /** * Module wrapping message consuming functions. * * @param topic The name of the topic this module will be publishing to; * cannot be an empty string; REQUIRED * @param brokers One or more Kafka broker; each broker should be in the * form 'Name[:port]' where 'Name' may be either a DNS name * or an IP address; multiple brokers should be delimited * with a comma; brokers can also be set in the * kafka_global.conf configuration file, in which case * you should pass an empty string; OPTIONAL, * defaulting to 'localhost' * @param consumerGroup The name of the Kafka consumer group to use for any * message consumption; * (see https://kafka.apache.org/documentation.html#introduction); * OPTIONAL, defaults to 'hpcc' */ EXPORT KafkaConsumer(VARSTRING topic, VARSTRING brokers = 'localhost', VARSTRING consumerGroup = 'hpcc') := MODULE /** * Get the number of partitions currently set up for this topic * * @return The number of partitions or zero if either the topic does not * exist or there was an error */ EXPORT INTEGER4 GetTopicPartitionCount() := kafka.getTopicPartitionCount(brokers, topic); /** * Consume previously-published messages from the current topic. * * @param maxRecords The maximum number of records to retrieve; pass * zero to return as many messages as there are * queued (dangerous); REQUIRED * * @return A new dataset containing the retrieved messages */ EXPORT DATASET(KafkaMessage) GetMessages(INTEGER8 maxRecords) := FUNCTION // Record structure to hold messages from multiple partitions MultiNodeMessageRec := RECORD DATASET(KafkaMessage) messages; END; numberOfPartitions := GetTopicPartitionCount() : INDEPENDENT; maxRecordsPerNode := MAX(maxRecords DIV numberOfPartitions, 1); // Container holding messages from all partitions; in a multi-node setup // the work will be distributed among the nodes (at least up to the // number of partitions); note that 'COUNTER - 1' is actually the // Kafka partition number that will be read messageContainer := DATASET ( numberOfPartitions, TRANSFORM ( MultiNodeMessageRec, SELF.messages := kafka.GetMessageDataset(brokers, topic, consumerGroup, COUNTER - 1, maxRecordsPerNode) ), DISTRIBUTED ); // Map messages from multiple partitions back to final record structure resultDS := NORMALIZE ( messageContainer, LEFT.messages, TRANSFORM ( KafkaMessage, SELF := RIGHT ), LOCAL ); RETURN resultDS; END; /** * Given a set of messages, presumably just consumed from an Apache Kafka * cluster, summarize the last-read message offsets on a per-partition basis. * This is useful for logging/saving the last messages read during a * particular run, which can then be used to restore system state if you * have to re-consume older messages (see SetMessageOffsets() function). * * @param messages A dataset of consumed messages; REQUIRED * * @return A new dataset containing a summary of partitions and their * associated last-read message offsets. */ EXPORT DATASET(KafkaMessageOffset) LastMessageOffsets(DATASET(KafkaMessage) messages) := FUNCTION t := TABLE ( messages, { partitionNum, INTEGER8 offset := MAX(GROUP, offset) }, partitionNum, MERGE ); f := PROJECT(t, TRANSFORM(KafkaMessageOffset, SELF := LEFT)); RETURN f; END; /** * Resets the last-read partition offsets to the values in the given dataset. * This is useful for "rewinding" message reading to an earlier point. The * next call to GetMessages() will start consuming at the points described * in the dataset. * * @param offsets A dataset of of partitions and the offsets to which * you want to set each, like the result from a call * to LastMessageOffsets(); REQUIRED * * @return The number of partitions set */ EXPORT UNSIGNED4 SetMessageOffsets(DATASET(KafkaMessageOffset) offsets) := FUNCTION // Distribute the offset data so that each partition lines up on the right node distOffsets := DISTRIBUTE(offsets, partitionNum); // Temporary result layout that will capture a COUNTER value generated // by PROJECT, which in turn ensures that the LOCAL flag is actually used // and our data distribution is honored (the distribution is required in // order to ensure that kafka.SetMessageOffset() is called on the correct // Thor nodes) ResultLayout := RECORD KafkaMessageOffset; UNSIGNED4 c; END; // Set the offset for each partition on each node result := PROJECT ( NOFOLD(distOffsets), TRANSFORM ( ResultLayout, SELF.offset := kafka.SetMessageOffset(brokers, topic, consumerGroup, LEFT.partitionNum, LEFT.offset), SELF.c := COUNTER, SELF := LEFT ), LOCAL ); RETURN COUNT(result(offset >= 0)); END; /** * Convenience function. Resets all topic partitions to their earliest * point. * * @return The number of partitions reset */ EXPORT UNSIGNED4 ResetMessageOffsets() := FUNCTION numberOfPartitions := GetTopicPartitionCount() : INDEPENDENT; offsets := DATASET ( numberOfPartitions, TRANSFORM ( KafkaMessageOffset, SELF.partitionNum := COUNTER - 1, SELF.offset := 0 ) ); RETURN SetMessageOffsets(offsets); END; END;