/*##############################################################################

    HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems®.

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
############################################################################## */
// Record structure describing a message position: a partition number plus an offset
EXPORT KafkaMessageOffset :=
    {
        UNSIGNED4   partitionNum,   // Partition the offset applies to
        INTEGER8    offset          // Message offset within that partition
    };
// Record structure used to return Kafka messages to ECL
EXPORT KafkaMessage :=
    {
        KafkaMessageOffset,         // Brings in the partitionNum and offset fields
        UTF8        message         // The message text
    };
// Service definition: ECL declarations for the entry points exported by the 'kafka' plugin
SHARED kafka := SERVICE : plugin('kafka'), namespace('KafkaPlugin')

    // Publish a single message, with a routing key, to a topic
    BOOLEAN PublishMessage
        (
            CONST VARSTRING brokers,
            CONST VARSTRING topic,
            CONST UTF8 message,
            CONST UTF8 key
        ) : cpp, action, context, entrypoint='publishMessage';

    // Report how many partitions a topic has
    INTEGER4 getTopicPartitionCount
        (
            CONST VARSTRING brokers,
            CONST VARSTRING topic
        ) : cpp, action, context, entrypoint='getTopicPartitionCount';

    // Stream messages from one partition of a topic on behalf of a consumer group
    STREAMED DATASET(KafkaMessage) GetMessageDataset
        (
            CONST VARSTRING brokers,
            CONST VARSTRING topic,
            CONST VARSTRING consumerGroup,
            INTEGER4 partitionNum,
            INTEGER8 maxRecords
        ) : cpp, action, context, entrypoint='getMessageDataset';

    // Set the offset of one partition of a topic for a consumer group
    INTEGER8 SetMessageOffset
        (
            CONST VARSTRING brokers,
            CONST VARSTRING topic,
            CONST VARSTRING consumerGroup,
            INTEGER4 partitionNum,
            INTEGER8 newOffset
        ) : cpp, action, context, entrypoint='setMessageOffset';

END;
/**
 * Module wrapping message publishing functions.
 *
 * @param   topic           The name of the topic this module will be publishing to;
 *                          cannot be an empty string; REQUIRED
 * @param   brokers         One or more Kafka brokers; each broker should be in the
 *                          form 'Name[:port]' where 'Name' may be either a DNS name
 *                          or an IP address; multiple brokers should be delimited
 *                          with a comma; brokers can also be set in the
 *                          kafka_global.conf configuration file, in which case
 *                          you should pass an empty string; OPTIONAL,
 *                          defaulting to 'localhost'
 */
EXPORT KafkaPublisher(VARSTRING topic, VARSTRING brokers = 'localhost') := MODULE

    /**
     * Ask the plugin how many partitions the module's topic currently has.
     *
     * @return  The partition count; zero when the topic does not exist or
     *          an error occurred
     */
    EXPORT INTEGER4 GetTopicPartitionCount() :=
        kafka.getTopicPartitionCount(brokers, topic);

    /**
     * Queue a single message for publishing to the module's topic.
     *
     * @param   message     The text to publish; must not be an empty string;
     *                      REQUIRED
     * @param   key         Key attached to the message; Kafka uses it to pick
     *                      the destination partition, so messages sharing a
     *                      key value land on the same partition; an empty
     *                      string means "no key"; OPTIONAL, defaults to an
     *                      empty string
     *
     * @return  TRUE
     */
    EXPORT BOOLEAN PublishMessage(CONST UTF8 message, CONST UTF8 key = U8'') :=
        kafka.PublishMessage(brokers, topic, message, key);

END;
/**
 * Module wrapping message consuming functions.
 *
 * @param   topic           The name of the topic this module will be consuming from;
 *                          cannot be an empty string; REQUIRED
 * @param   brokers         One or more Kafka brokers; each broker should be in the
 *                          form 'Name[:port]' where 'Name' may be either a DNS name
 *                          or an IP address; multiple brokers should be delimited
 *                          with a comma; brokers can also be set in the
 *                          kafka_global.conf configuration file, in which case
 *                          you should pass an empty string; OPTIONAL,
 *                          defaulting to 'localhost'
 * @param   consumerGroup   The name of the Kafka consumer group to use for any
 *                          message consumption;
 *                          (see https://kafka.apache.org/documentation.html#introduction);
 *                          OPTIONAL, defaults to 'hpcc'
 */
EXPORT KafkaConsumer(VARSTRING topic, VARSTRING brokers = 'localhost', VARSTRING consumerGroup = 'hpcc') := MODULE

    /**
     * Get the number of partitions currently set up for this topic
     *
     * @return  The number of partitions or zero if either the topic does not
     *          exist or there was an error
     */
    EXPORT INTEGER4 GetTopicPartitionCount() := kafka.getTopicPartitionCount(brokers, topic);

    /**
     * Consume previously-published messages from the current topic.
     *
     * The requested limit is split evenly across the topic's partitions,
     * with a minimum of one record per partition; when maxRecords is
     * smaller than the number of partitions the total number of records
     * returned can therefore exceed maxRecords.
     *
     * @param   maxRecords  The maximum number of records to retrieve; pass
     *                      zero to return as many messages as there are
     *                      queued (dangerous); REQUIRED
     *
     * @return  A new dataset containing the retrieved messages
     */
    EXPORT DATASET(KafkaMessage) GetMessages(INTEGER8 maxRecords) := FUNCTION

        // Record structure to hold messages from multiple partitions
        MultiNodeMessageRec := RECORD
            DATASET(KafkaMessage)   messages;
        END;

        numberOfPartitions := GetTopicPartitionCount() : INDEPENDENT;

        // Per-partition read limit:
        //   - a zero maxRecords is passed through untouched so the documented
        //     "no limit" request is not silently turned into a limit of one
        //     (NOTE(review): assumes the plugin treats a zero limit as
        //     unlimited, as the parameter documentation states -- confirm
        //     against the plugin implementation)
        //   - the division is skipped when no partitions were reported, so a
        //     missing topic cannot trigger a divide-by-zero; no partitions
        //     are read in that case, so the value chosen is never used
        //   - otherwise share the limit evenly, reading at least one record
        //     from every partition
        INTEGER8 maxRecordsPerNode := MAP
            (
                maxRecords = 0          =>  0,
                numberOfPartitions <= 0 =>  1,
                MAX(maxRecords DIV numberOfPartitions, 1)
            );

        // Container holding messages from all partitions; in a multi-node setup
        // the work will be distributed among the nodes (at least up to the
        // number of partitions); note that 'COUNTER - 1' is actually the
        // Kafka partition number that will be read
        messageContainer := DATASET
            (
                numberOfPartitions,
                TRANSFORM
                    (
                        MultiNodeMessageRec,
                        SELF.messages := kafka.GetMessageDataset(brokers, topic, consumerGroup, COUNTER - 1, maxRecordsPerNode)
                    ),
                DISTRIBUTED
            );

        // Map messages from multiple partitions back to final record structure
        resultDS := NORMALIZE
            (
                messageContainer,
                LEFT.messages,
                TRANSFORM
                    (
                        KafkaMessage,
                        SELF := RIGHT
                    ),
                LOCAL
            );

        RETURN resultDS;
    END;

    /**
     * Given a set of messages, presumably just consumed from an Apache Kafka
     * cluster, summarize the last-read message offsets on a per-partition basis.
     * This is useful for logging/saving the last messages read during a
     * particular run, which can then be used to restore system state if you
     * have to re-consume older messages (see SetMessageOffsets() function).
     *
     * @param   messages    A dataset of consumed messages; REQUIRED
     *
     * @return  A new dataset containing a summary of partitions and their
     *          associated last-read message offsets.
     */
    EXPORT DATASET(KafkaMessageOffset) LastMessageOffsets(DATASET(KafkaMessage) messages) := FUNCTION
        // One row per partition carrying the highest offset seen for it
        t := TABLE
            (
                messages,
                {
                    partitionNum,
                    INTEGER8    offset := MAX(GROUP, offset)
                },
                partitionNum,
                MERGE
            );

        // Convert the aggregate's anonymous layout to the exported layout
        f := PROJECT(t, TRANSFORM(KafkaMessageOffset, SELF := LEFT));

        RETURN f;
    END;

    /**
     * Resets the last-read partition offsets to the values in the given dataset.
     * This is useful for "rewinding" message reading to an earlier point.  The
     * next call to GetMessages() will start consuming at the points described
     * in the dataset.
     *
     * @param   offsets     A dataset of partitions and the offsets to which
     *                      you want to set each, like the result from a call
     *                      to LastMessageOffsets(); REQUIRED
     *
     * @return  The number of partitions set
     */
    EXPORT UNSIGNED4 SetMessageOffsets(DATASET(KafkaMessageOffset) offsets) := FUNCTION

        // Distribute the offset data so that each partition lines up on the right node
        distOffsets := DISTRIBUTE(offsets, partitionNum);

        // Temporary result layout that will capture a COUNTER value generated
        // by PROJECT, which in turn ensures that the LOCAL flag is actually used
        // and our data distribution is honored (the distribution is required in
        // order to ensure that kafka.SetMessageOffset() is called on the correct
        // Thor nodes)
        ResultLayout := RECORD
            KafkaMessageOffset;
            UNSIGNED4   c;
        END;

        // Set the offset for each partition on each node; the value returned
        // by the plugin call replaces the requested offset in the result row
        result := PROJECT
            (
                NOFOLD(distOffsets),
                TRANSFORM
                    (
                        ResultLayout,
                        SELF.offset := kafka.SetMessageOffset(brokers, topic, consumerGroup, LEFT.partitionNum, LEFT.offset),
                        SELF.c := COUNTER,
                        SELF := LEFT
                    ),
                LOCAL
            );

        // Only rows whose returned offset is non-negative are counted as set
        RETURN COUNT(result(offset >= 0));
    END;

    /**
     * Convenience function.  Resets all topic partitions to their earliest
     * point.
     *
     * @return  The number of partitions reset
     */
    EXPORT UNSIGNED4 ResetMessageOffsets() := FUNCTION

        numberOfPartitions := GetTopicPartitionCount() : INDEPENDENT;

        // One row per partition (numbered from zero), each requesting offset zero
        offsets := DATASET
            (
                numberOfPartitions,
                TRANSFORM
                    (
                        KafkaMessageOffset,
                        SELF.partitionNum := COUNTER - 1,
                        SELF.offset := 0
                    )
            );

        RETURN SetMessageOffsets(offsets);
    END;

END;
|