HPCC-14393 New Kafka plugin for Thor and Roxie

Initial commit.

Signed-off-by: Dan S. Camper <dan.camper@lexisnexis.com>
Dan S. Camper 9 years ago
parent
commit
0f517b4ef3

+ 3 - 0
.gitmodules

@@ -31,3 +31,6 @@
 [submodule "esp/src/crossfilter"]
 	path = esp/src/crossfilter
 	url = https://github.com/hpcc-systems/crossfilter.git
+[submodule "plugins/kafka/librdkafka"]
+	path = plugins/kafka/librdkafka
+	url = https://github.com/hpcc-systems/librdkafka.git

+ 2 - 0
cmake_modules/commonSetup.cmake

@@ -93,6 +93,7 @@ IF ("${COMMONSETUP_DONE}" STREQUAL "")
   option(USE_RINSIDE "Enable R support" ON)
   option(USE_MEMCACHED "Enable Memcached support" ON)
   option(USE_REDIS "Enable Redis support" ON)
+  option(USE_KAFKA "Enable Kafka support" ON)
 
   if (APPLE OR WIN32)
       option(USE_TBB "Enable Threading Building Block support" OFF)
@@ -112,6 +113,7 @@ IF ("${COMMONSETUP_DONE}" STREQUAL "")
       set( USE_CASSANDRA OFF )
       set( USE_MEMCACHED OFF )
       set( USE_REDIS OFF )
+      set( USE_KAFKA OFF )
   endif()
 
   if ( USE_XALAN AND USE_LIBXSLT )

+ 1 - 0
plugins/CMakeLists.txt

@@ -33,3 +33,4 @@ add_subdirectory (Rembed)
 add_subdirectory (cassandra)
 add_subdirectory (memcached)
 add_subdirectory (redis)
+add_subdirectory (kafka)

+ 127 - 0
plugins/kafka/CMakeLists.txt

@@ -0,0 +1,127 @@
+################################################################################
+#    HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems®.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License");
+#    you may not use this file except in compliance with the License.
+#    You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+################################################################################
+
+# Component: kafka
+
+#####################################################
+# Description:
+# ------------
+#    Cmake Input File for kafka
+#####################################################
+
+project( kafka )
+
+if (USE_KAFKA)
+
+    ADD_PLUGIN(kafka PACKAGES OPTION MAKE_KAFKA)
+
+    if ( MAKE_KAFKA )
+
+        # librdkafka packages do not include everything we need to properly
+        # build against it; until/if they are ever fixed we will need to build
+        # our own version of librdkafka from source and include it ourselves
+
+        if( NOT EXISTS "${PROJECT_SOURCE_DIR}/librdkafka/configure" )
+            message( FATAL_ERROR
+"   The librdkafka submodule is not available.
+   This normally indicates that the git submodule has not been fetched.
+   Please run git submodule update --init --recursive")
+        endif()
+
+        if (APPLE)
+            set ( LIBRDKAFKA_LIB ${PROJECT_BINARY_DIR}/lib/librdkafka.dylib )
+            set ( LIBRDKAFKA_LIB_REAL ${PROJECT_BINARY_DIR}/lib/librdkafka.1.dylib )
+            set ( LIBRDKAFKACPP_LIB ${PROJECT_BINARY_DIR}/lib/librdkafka++.dylib )
+            set ( LIBRDKAFKACPP_LIB_REAL ${PROJECT_BINARY_DIR}/lib/librdkafka++.1.dylib )
+        else()
+            set ( LIBRDKAFKA_LIB ${PROJECT_BINARY_DIR}/lib/librdkafka.so )
+            set ( LIBRDKAFKA_LIB_REAL ${PROJECT_BINARY_DIR}/lib/librdkafka.so.1 )
+            set ( LIBRDKAFKACPP_LIB ${PROJECT_BINARY_DIR}/lib/librdkafka++.so )
+            set ( LIBRDKAFKACPP_LIB_REAL ${PROJECT_BINARY_DIR}/lib/librdkafka++.so.1 )
+        endif()
+
+        # librdkafka does not support out-of-source builds, so let's copy all
+        # of its source code to our binary directory and build there; further,
+        # we need to pull some working directory shenanigans for each command
+        # in order to make the built scripts function correctly
+
+        add_custom_command ( OUTPUT ${LIBRDKAFKA_LIB}
+                COMMAND cp -r ${PROJECT_SOURCE_DIR}/librdkafka ${PROJECT_BINARY_DIR}/src
+                COMMAND cd ${PROJECT_BINARY_DIR}/src && ./configure --prefix=${PROJECT_BINARY_DIR}
+                COMMAND cd ${PROJECT_BINARY_DIR}/src && make && make install
+                COMMENT Copying and building librdkafka
+            )
+
+        add_custom_target ( librdkafka-build ALL DEPENDS ${LIBRDKAFKA_LIB} )
+
+        # Add both libraries from librdkafka
+
+        add_library ( librdkafka SHARED IMPORTED )
+        set_property ( TARGET librdkafka PROPERTY IMPORTED_LOCATION ${LIBRDKAFKA_LIB} )
+        add_dependencies ( librdkafka librdkafka-build )
+
+        add_library ( librdkafkacpp STATIC IMPORTED )
+        set_property ( TARGET librdkafkacpp PROPERTY IMPORTED_LOCATION ${LIBRDKAFKACPP_LIB} )
+        add_dependencies ( librdkafkacpp librdkafka-build )
+
+        set (   SRCS
+                kafka.hpp
+                kafka.cpp
+            )
+
+        include_directories (
+                ./../../system/include
+                ./../../rtl/eclrtl
+                ./../../rtl/include
+                ./../../common/deftype
+                ./../../system/jlib
+                ${PROJECT_BINARY_DIR}/include
+                ${CMAKE_BINARY_DIR}
+            )
+
+        ADD_DEFINITIONS( -D_USRDLL -DECL_KAFKA_EXPORTS )
+        HPCC_ADD_LIBRARY( kafka SHARED ${SRCS} )
+
+        if (${CMAKE_VERSION} VERSION_LESS "2.8.9")
+            message("WARNING: Cannot set NO_SONAME. shlibdeps will give warnings when package is installed")
+        elseif(NOT APPLE)
+            set_target_properties( kafka PROPERTIES NO_SONAME 1 )
+        endif()
+
+        install ( TARGETS kafka
+                DESTINATION plugins
+            )
+
+        # Install our built librdkafka libraries into the RPM
+        install ( FILES ${LIBRDKAFKA_LIB} ${LIBRDKAFKA_LIB_REAL} ${LIBRDKAFKACPP_LIB} ${LIBRDKAFKACPP_LIB_REAL}
+                DESTINATION ${LIB_DIR}
+                COMPONENT Runtime
+            )
+
+        target_link_libraries ( kafka
+                librdkafka
+                librdkafkacpp
+                eclrtl
+                jlib
+                ${ZLIB_LIBRARIES}
+            )
+
+    endif()
+
+endif()
+
+#Even if not making the kafka plugin, we want to install the header
+install ( FILES ${CMAKE_CURRENT_SOURCE_DIR}/kafka.ecllib DESTINATION plugins COMPONENT Runtime)

+ 359 - 0
plugins/kafka/README.md

@@ -0,0 +1,359 @@
+#ECL Apache Kafka Plugin
+
+This is the ECL plugin to access [Apache Kafka](https://kafka.apache.org), a
+publish-subscribe messaging system.  ECL string data can be both published to
+and consumed from Apache Kafka brokers.
+
+Client access is via a third-party C++ library,
+[librdkafka](https://github.com/edenhill/librdkafka).
+
+##Installation and Dependencies
+
+[librdkafka](https://github.com/edenhill/librdkafka) is included as a git
+submodule in HPCC-Platform.  It will be built and integrated automatically when
+you build the HPCC-Platform project.
+
+The recommended method for obtaining Apache Kafka is via
+[download](https://kafka.apache.org/downloads.html).
+
+Note that Apache Kafka has its own set of dependencies, most notably
+[zookeeper](https://zookeeper.apache.org).  The Kafka download file does contain
+a Zookeeper installation, so for testing purposes you need to download only
+Apache Kafka and follow the excellent
+[instructions](https://kafka.apache.org/documentation.html#quickstart).  Those
+instructions will tell you how to start Zookeeper and Apache Kafka, then test
+your installation by creating a topic and interacting with it.
+
+*Note:* Apache Kafka version 0.8.2 or later is recommended.
+
+##Plugin Configuration
+
+The Apache Kafka plugin uses sensible default configuration values but these can
+be modified via configuration files.
+
+There are two types of configurations:  Global and per-topic.  Some
+configuration parameters are applicable only to publishers (producers, in Apache
+Kafka's terminology), others only to consumers, and some to both.  Details on
+the supported configuration parameters can be found on the [librdkafka
+configuration
+page](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md).
+
+A configuration file is a simple text document with a series of key/value
+parameters, formatted like:
+
+    key=value
+    key=value
+    ...
+    key=value
+
+A '#' character at the beginning of a line denotes a comment.  Note that this is
+the only kind of comment supported in configuration files.
+
+Whenever a new connection is created (either publisher or consumer) the plugin
+will scan for configuration files.  All configuration files will reside in the
+HPCC configuration directory, which is `/etc/HPCCSystems`.  The global
+configuration file should be named `kafka_global.conf`.  Per-topic configuration
+files are also supported, and they can be different for a publisher or a
+consumer.  For a publisher, the naming convention is
+`kafka_publisher_topic_<TopicName>.conf` and for a consumer it is
+`kafka_consumer_topic_<TopicName>.conf`.  In both cases, `<TopicName>` is the
+name of the topic you are publishing to or consuming from.
+
+Configuration parameters loaded from a file override those set by the plugin
+with one exception:  the `metadata.broker.list` setting, if found in a
+configuration file, is ignored.  Apache Kafka brokers are always set in ECL.
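+
+For example, a hypothetical `kafka_global.conf` that overrides two of the
+publisher defaults shown below might contain (the parameter names come from
+librdkafka's configuration page; the values here are purely illustrative):
+
+    # Use gzip rather than the plugin's default snappy compression
+    compression.codec=gzip
+    # Allow a couple of extra retries for failed sends
+    message.send.max.retries=5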
+
+The following configuration parameters are set by the plugin for publishers,
+overriding their normal default values:
+
+    queue.buffering.max.messages=1000000
+    compression.codec=snappy
+    message.send.max.retries=3
+    retry.backoff.ms=500
+
+The following configuration parameters are set by the plugin for consumers,
+overriding their normal default values:
+
+    compression.codec=snappy
+    queued.max.messages.kbytes=10000000
+    fetch.message.max.bytes=10000000
+    auto.offset.reset=smallest
+
+##Publishing messages with the plugin
+
+Publishing string messages begins with instantiating an ECL module that defines
+the Apache Kafka cluster and the topic into which the messages will be posted. 
+The definition of the module is:
+
+    Publisher(VARSTRING topic, VARSTRING brokers = 'localhost') := MODULE
+        ...
+    END
+
+The module requires you to designate a topic by name and, optionally, at least
+one Apache Kafka broker.  The format of the broker is `BrokerName[:port]` where
+`BrokerName` is either an IP address or a DNS name of a broker.  You can
+optionally include a port number if the default Apache Kafka broker port is not
+used.  Multiple brokers can be listed, separated by commas.  Only one broker in
+an Apache Kafka cluster is required; the rest can be discovered once a
+connection is made.
+
+Example instantiating a publishing module:
+
+    p := kafka.Publisher('MyTopic', '10.211.55.13');
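+
+If your cluster has more than one broker you can supply a comma-delimited list,
+each entry with an optional port (the addresses below are hypothetical):
+
+    p := kafka.Publisher('MyTopic', '10.211.55.13:9092,10.211.55.14:9092');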
+
+The module contains an exported function for publishing a message, defined as:
+
+    BOOLEAN PublishMessage(CONST VARSTRING message, CONST VARSTRING key = '');
+
+The module function requires a string message and allows you to specify a 'key'
+that affects how Apache Kafka stores the message.  Key values act a lot like the
+expression argument in ECL's DISTRIBUTE() function:  Messages with the same key
+value wind up on the same Apache Kafka partition within the topic.  This can
+affect how consumers retrieve the published messages.  More details regarding
+partitions and how keys are used can be found in Apache Kafka's
+[introduction](https://kafka.apache.org/documentation.html#introduction).  If a
+key value is not supplied then the messages are distributed among the available
+partitions for that topic.
+
+Examples:
+
+    p.PublishMessage('This is a test message');
+    p.PublishMessage('A keyed message', 'MyKey');
+    p.PublishMessage('Another keyed message', 'MyKey');
+
+Note that keys are not retrieved by the ECL Apache Kafka consumer.  They are
+used only to determine how the messages are stored.
+
+You can find out how many partitions are available in a publisher's topic by
+calling the following module function:
+
+    partitionCount := p.GetTopicPartitionCount();
+
+`GetTopicPartitionCount()` returns zero if the topic has not been created or
+there has been an error.
+
+##Consuming messages with the plugin
+
+As with publishing, consuming string messages begins with instantiating an ECL
+module that defines the Apache Kafka cluster and the topic from which the
+messages will be read.  The definition of the module is:
+
+    Consumer(VARSTRING topic,
+             VARSTRING brokers = 'localhost',
+             VARSTRING consumerGroup = 'hpcc') := MODULE
+        ...
+    END
+
+The module requires you to designate a topic by name.  Optionally, you may also
+specify at least one Apache Kafka broker and a consumer group.  The format and
+requirements for a broker are the same as for instantiating a Publisher module.
+Consumer groups in Apache Kafka allow multiple consumer instances, like Thor
+nodes, to form a "logical consumer" and be able to retrieve messages in parallel
+and without duplication.  See the "Consumers" subtopic in Apache Kafka's
+[introduction](https://kafka.apache.org/documentation.html#introduction) for
+more details.
+
+Example:
+
+    c := kafka.Consumer('MyTopic', '10.211.55.13');
+
+The module contains an exported function for consuming messages, defined as:
+
+    DATASET(KafkaMessage) GetMessages(INTEGER4 maxRecords);
+
+This function returns a new dataset containing messages consumed from the topic
+defined in the module.  The layout for that dataset is:
+
+    KafkaMessage := RECORD
+        UNSIGNED4   partition;
+        INTEGER8    offset;
+        STRING      message;
+    END;
+
+Example retrieving up to 10,000 messages:
+
+    myMessages := c.GetMessages(10000);
+
+After you consume some messages it may be beneficial to track the last-read
+offset from each Apache Kafka topic partition.  The following module function
+does that:
+
+    DATASET(KafkaMessageOffset) LastMessageOffsets(DATASET(KafkaMessage) messages);
+
+Basically, you pass in the just-consumed message dataset to the function and get
+back a small dataset containing just the partition numbers and the last-read
+message's offset.  The layout of the returned dataset is:
+
+    KafkaMessageOffset := RECORD
+        UNSIGNED4   partitionNum;
+        INTEGER8    offset;
+    END;
+
+Example call:
+
+    myOffsets := c.LastMessageOffsets(myMessages);
+
+If you later find out that you need to "rewind" your consumption -- read old
+messages, in other words -- you can use the data within a KafkaMessageOffset
+dataset to reset your consumers, making the next `GetMessages()` call pick up
+from that point.  Use the following module function to reset the offsets:
+
+    UNSIGNED4 SetMessageOffsets(DATASET(KafkaMessageOffset) offsets);
+
+The function returns the number of partitions reset (which should equal the
+number of records you're handing the function).
+
+Example call:
+
+    numPartitionsReset := c.SetMessageOffsets(myOffsets);
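+
+The offsets dataset can also be built inline.  As a sketch, rewinding partition
+0 back to offset 0 (hypothetical values) would look like:
+
+    numPartitionsReset := c.SetMessageOffsets(DATASET([{0, 0}], kafka.KafkaMessageOffset));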
+
+You can easily reset all topic partitions to their earliest point with the
+following module function:
+
+    UNSIGNED4 ResetMessageOffsets();
+
+This function returns the number of partitions reset.
+
+Example call:
+
+    numPartitionsReset := c.ResetMessageOffsets();
+
+You can find out how many partitions are available in a consumer's topic by
+calling the following module function:
+
+    partitionCount := c.GetTopicPartitionCount();
+
+`GetTopicPartitionCount()` returns zero if the topic has not been created or
+there has been an error.
+
+##Complete ECL Examples
+
+The following code will publish 100K messages to a topic named 'MyTestTopic' on
+an Apache Kafka broker located at address 10.211.55.13.  If you are running a
+single-node HPCC cluster and have installed Kafka on the same node, you can use
+'localhost' instead (or omit the parameter, as it defaults to 'localhost').
+
+###Publishing
+
+    IMPORT kafka;
+
+    MyDataLayout := RECORD
+        STRING  message;
+    END;
+
+    ds := DATASET
+        (
+            100000,
+            TRANSFORM
+                (
+                    MyDataLayout,
+                    SELF.message := 'Test message ' + (STRING)COUNTER
+                ),
+            DISTRIBUTED
+        );
+
+    p := kafka.Publisher('MyTestTopic', brokers := '10.211.55.13');
+
+    APPLY(ds, ORDERED(p.PublishMessage(message)));
+
+###Consuming
+
+This code will read the messages written by the publishing example, above.  It
+will also show the number of partitions in the topic and the offsets of the
+last-read messages.
+
+    IMPORT kafka;
+
+    c := kafka.Consumer('MyTestTopic', brokers := '10.211.55.13');
+
+    ds := c.GetMessages(200000);
+    offsets := c.LastMessageOffsets(ds);
+    partitionCount := c.GetTopicPartitionCount();
+
+    OUTPUT(ds, NAMED('MessageSample'));
+    OUTPUT(COUNT(ds), NAMED('MessageCount'));
+    OUTPUT(offsets, NAMED('LastMessageOffsets'));
+    OUTPUT(partitionCount, NAMED('PartitionCount'));
+
+###Resetting Offsets
+
+Resetting offsets is useful when you have a topic already published with
+messages and you need to reread its messages from the very beginning.
+
+    IMPORT kafka;
+
+    c := kafka.Consumer('MyTestTopic', brokers := '10.211.55.13');
+
+    c.ResetMessageOffsets();
+
+##Behaviour and Implementation Details
+
+###Partitioning within Apache Kafka Topics
+
+Topic partitioning is covered in Apache Kafka's
+[introduction](https://kafka.apache.org/documentation.html#introduction).  There
+is a performance relationship between the number of partitions in a topic and
+the size of the HPCC cluster when consuming messages.  Ideally, the number of
+partitions will exactly equal the number of HPCC nodes consuming messages.  For
+Thor, this means the total number of slaves rather than the number of nodes, as
+that can be different in a multi-slave setup.  For Roxie, the number is always
+one.  If there are fewer partitions than nodes (slaves) then not all of your
+cluster will be utilized when consuming messages; if there are more partitions
+than nodes (slaves) then some nodes will be performing extra work, consuming
+from multiple partitions.  In either mismatch case, you may want to consider
+using the ECL DISTRIBUTE() function to redistribute your data before processing.
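+
+As a minimal sketch, assuming the consuming example shown earlier, the consumed
+messages could be spread evenly across all nodes before further processing:
+
+    IMPORT kafka;
+
+    c := kafka.Consumer('MyTestTopic', brokers := '10.211.55.13');
+    ds := c.GetMessages(200000);
+
+    // Redistribute by a hash of partition and offset so that subsequent
+    // LOCAL operations see a roughly even share of messages on every node
+    balanced := DISTRIBUTE(ds, HASH32(partition, offset));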
+
+When messages are published without a 'key' argument to a topic that has more
+than one partition, Apache Kafka will distribute those messages among the
+partitions.  The distribution is not perfect.  For example, if you publish 20
+messages to a topic with two partitions, one partition may wind up with 7
+messages and the other with 13 (or some other mix of message counts that total
+20).  When testing your code, be aware of this behavior and always request more
+messages than you publish.  In the examples above, 100K messages were published
+but up to 200K messages were requested.  This ensures that you receive all of
+the messages you publish.  This is typically not an issue in a production
+environment, as your requested consumption message count is more a function of
+how much data you're willing to process in one step than of how many messages
+are actually stored in the topic.
+
+Be aware that, by default, Apache Kafka will automatically create a topic that
+has never been seen before if someone publishes to it, and that topic will have
+only one partition.  Both actions -- whether a topic is automatically created
+and how many partitions it will have -- are configurable within Apache Kafka.
+
+###Publisher Connections
+
+This plugin caches the internal publisher objects and their connections. 
+Publishing from ECL, technically, only writes the messages to a local cache. 
+Those messages are batched and sent to Apache Kafka in a background thread for
+higher performance.  Because this batching can extend far beyond the time ECL
+spends sending the data to the local cache, the objects (and their connections)
+need to hang around for some additional time.  The upside is that the cached
+objects and connections will be reused for subsequent publish operations,
+speeding up the entire process.
+
+###Consumer Connections
+
+Unlike publisher objects, one consumer object is created per thread for each
+connection.  A connection is defined by a specific combination of broker, topic,
+consumer group, and partition number.  Consumer objects and their connections
+live only as long as they are needed.
+
+###Saved Topic Offsets
+
+By default, consumers save to a file the offset of the last-read message from a
+given topic, consumer group, and partition combination.  The offset is saved so
+that the next time the consumer is fired up for that particular connection
+combination, the consumption process can pick up where it left off.  The file is
+saved to the HPCC engine's data directory which is typically
+`/var/lib/HPCCSystems/mythor/`, `/var/lib/HPCCSystems/myroxie/` or
+`/var/lib/HPCCSystems/myeclagent/` depending on the engine you're using (the
+exact path may be different if you have named an engine differently in your HPCC
+configuration).  The format of the saved offset filename is
+`<TopicName>-<PartitionNum>-<ConsumerGroup>.offset`.
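+
+For example, partition 0 of a topic named `MyTestTopic`, read with the default
+`hpcc` consumer group, would be tracked in a file named
+`MyTestTopic-0-hpcc.offset`.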
+
+Note that saving partition offsets is engine-specific.  One practical
+consideration of this is that you cannot have one engine (e.g. Thor) consume
+from a given topic and then have another engine (e.g. Roxie) consume the next
+set of messages from that topic.  Both engines can consume messages without a
+problem, but they will not track each other's last-read positions.

File diff suppressed because it is too large
+ 1068 - 0
plugins/kafka/kafka.cpp


+ 255 - 0
plugins/kafka/kafka.ecllib

@@ -0,0 +1,255 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the License);
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an AS IS BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+// Record structure containing message offset positioning
+EXPORT KafkaMessageOffset := RECORD
+    UNSIGNED4   partitionNum;
+    INTEGER8    offset;
+END;
+
+// Record structure that will be used to return Kafka messages to ECL
+EXPORT KafkaMessage := RECORD
+    KafkaMessageOffset;
+    STRING      message;
+END;
+
+// Service definition
+SHARED kafka := SERVICE : plugin('kafka'), namespace('KafkaPlugin')
+
+    BOOLEAN PublishMessage(CONST VARSTRING brokers, CONST VARSTRING topic, CONST VARSTRING message, CONST VARSTRING key) : cpp,action,context,entrypoint='publishMessage';
+    INTEGER4 getTopicPartitionCount(CONST VARSTRING brokers, CONST VARSTRING topic) : cpp,action,context,entrypoint='getTopicPartitionCount';
+    STREAMED DATASET(KafkaMessage) GetMessageDataset(CONST VARSTRING brokers, CONST VARSTRING topic, CONST VARSTRING consumerGroup, INTEGER4 partitionNum, INTEGER8 maxRecords) : cpp,action,context,entrypoint='getMessageDataset';
+    INTEGER8 SetMessageOffset(CONST VARSTRING brokers, CONST VARSTRING topic, CONST VARSTRING consumerGroup, INTEGER4 partitionNum, INTEGER8 newOffset) : cpp,action,context,entrypoint='setMessageOffset';
+
+END;
+
+/**
+ * Module wrapping message publishing functions.
+ *
+ * @param   topic           The name of the topic this module will be publishing to;
+ *                          cannot be an empty string; REQUIRED
+ * @param   brokers         One or more Kafka brokers; each broker should be in the
+ *                          form 'Name[:port]' where 'Name' may be either a DNS name
+ *                          or an IP address; multiple brokers should be delimited
+ *                          with a comma; brokers can also be set in the
+ *                          kafka_global.conf configuration file, in which case
+ *                          you should pass an empty string; OPTIONAL,
+ *                          defaulting to 'localhost'
+ */
+EXPORT Publisher(VARSTRING topic, VARSTRING brokers = 'localhost') := MODULE
+
+    /**
+     * Get the number of partitions currently set up for this topic
+     *
+     * @return  The number of partitions or zero if either the topic does not
+     *          exist or there was an error
+     */
+    EXPORT INTEGER4 GetTopicPartitionCount() := kafka.getTopicPartitionCount(brokers, topic);
+
+    /**
+     * Queue one message for publishing to the current Kafka topic
+     *
+     * @param   message     The message to publish; must not be an empty string;
+     *                      REQUIRED
+     * @param   key         A key to attach to the message, used by Kafka to
+     *                      route the message to a particular partition (keys
+     *                      with the same value wind up on the same partition);
+     *                      an empty string indicates no key value; OPTIONAL,
+     *                      defaults to an empty string
+     *
+     * @return  TRUE
+     */
+    EXPORT BOOLEAN PublishMessage(CONST VARSTRING message, CONST VARSTRING key = '') := kafka.PublishMessage(brokers, topic, message, key);
+
+END;
+
+/**
+ * Module wrapping message consuming functions.
+ *
+ * @param   topic           The name of the topic this module will be consuming from;
+ *                          cannot be an empty string; REQUIRED
+ * @param   brokers         One or more Kafka brokers; each broker should be in the
+ *                          form 'Name[:port]' where 'Name' may be either a DNS name
+ *                          or an IP address; multiple brokers should be delimited
+ *                          with a comma; brokers can also be set in the
+ *                          kafka_global.conf configuration file, in which case
+ *                          you should pass an empty string; OPTIONAL,
+ *                          defaulting to 'localhost'
+ * @param   consumerGroup   The name of the Kafka consumer group to use for any
+ *                          message consumption;
+ *                          (see https://kafka.apache.org/documentation.html#introduction);
+ *                          OPTIONAL, defaults to 'hpcc'
+ */
+EXPORT Consumer(VARSTRING topic, VARSTRING brokers = 'localhost', VARSTRING consumerGroup = 'hpcc') := MODULE
+
+    /**
+     * Get the number of partitions currently set up for this topic
+     *
+     * @return  The number of partitions or zero if either the topic does not
+     *          exist or there was an error
+     */
+    EXPORT INTEGER4 GetTopicPartitionCount() := kafka.getTopicPartitionCount(brokers, topic);
+
+    /**
+     * Consume previously-published messages from the current topic.
+     *
+     * @param   maxRecords  The maximum number of records to retrieve; pass
+     *                      zero to return as many messages as there are
+     *                      queued (dangerous); REQUIRED
+     *
+     * @return  A new dataset containing the retrieved messages
+     */
+    EXPORT DATASET(KafkaMessage) GetMessages(INTEGER8 maxRecords) := FUNCTION
+
+        // Record structure to hold messages from multiple partitions
+        MultiNodeMessageRec := RECORD
+            DATASET(KafkaMessage)   messages;
+        END;
+
+        numberOfPartitions := GetTopicPartitionCount() : INDEPENDENT;
+        maxRecordsPerNode := MAX(maxRecords DIV numberOfPartitions, 1);
+
+        // Container holding messages from all partitions; in a multi-node setup
+        // the work will be distributed among the nodes (at least up to the
+        // number of partitions); note that 'COUNTER - 1' is actually the
+        // Kafka partition number that will be read
+        messageContainer := DATASET
+            (
+                numberOfPartitions,
+                TRANSFORM
+                    (
+                        MultiNodeMessageRec,
+                        SELF.messages := kafka.GetMessageDataset(brokers, topic, consumerGroup, COUNTER - 1, maxRecordsPerNode)
+                    ),
+                DISTRIBUTED
+            );
+
+        // Map messages from multiple partitions back to final record structure
+        resultDS := NORMALIZE
+            (
+                messageContainer,
+                LEFT.messages,
+                TRANSFORM
+                    (
+                        KafkaMessage,
+                        SELF := RIGHT
+                    ),
+                LOCAL
+            );
+
+        RETURN resultDS;
+
+    END;
+
+    /**
+     * Given a set of messages, presumably just consumed from an Apache Kafka
+     * cluster, summarize the last-read message offsets on a per-partition basis.
+     * This is useful for logging/saving the last messages read during a
+     * particular run, which can then be used to restore system state if you
+     * have to re-consume older messages (see SetMessageOffsets() function).
+     *
+     * @param   messages    A dataset of consumed messages; REQUIRED
+     *
+     * @return  A new dataset containing a summary of partitions and their
+     *          associated last-read message offsets.
+     */
+    EXPORT DATASET(KafkaMessageOffset) LastMessageOffsets(DATASET(KafkaMessage) messages) := FUNCTION
+        t := TABLE
+            (
+                messages,
+                {
+                    partitionNum,
+                    INTEGER8    offset := MAX(GROUP, offset)
+                },
+                partitionNum,
+                MERGE
+            );
+
+        f := PROJECT(t, TRANSFORM(KafkaMessageOffset, SELF := LEFT));
+
+        RETURN f;
+    END;
+
+    /**
+     * Resets the last-read partition offsets to the values in the given dataset.
+     * This is useful for "rewinding" message reading to an earlier point.  The
+     * next call to GetMessages() will start consuming at the points described
+     * in the dataset.
+     *
+     * @param   offsets     A dataset of partitions and the offsets to which
+     *                      you want to set each, like the result from a call
+     *                      to LastMessageOffsets(); REQUIRED
+     *
+     * @return  The number of partitions set
+     */
+    EXPORT UNSIGNED4 SetMessageOffsets(DATASET(KafkaMessageOffset) offsets) := FUNCTION
+
+        // Distribute the offset data so that each partition lines up on the right node
+        distOffsets := DISTRIBUTE(offsets, partitionNum);
+
+        // Temporary result layout that will capture a COUNTER value generated
+        // by PROJECT, which in turn ensures that the LOCAL flag is actually used
+        // and our data distribution is honored (the distribution is required in
+        // order to ensure that kafka.SetMessageOffset() is called on the correct
+        // Thor nodes)
+        ResultLayout := RECORD
+            KafkaMessageOffset;
+            UNSIGNED4   c;
+        END;
+
+        // Set the offset for each partition on each node
+        result := PROJECT
+            (
+                distOffsets,
+                TRANSFORM
+                    (
+                        ResultLayout,
+                        SELF.offset := kafka.SetMessageOffset(brokers, topic, consumerGroup, LEFT.partitionNum, LEFT.offset),
+                        SELF.c := COUNTER,
+                        SELF := LEFT
+                    ),
+                LOCAL
+            );
+
+        RETURN COUNT(result(offset >= -1));
+    END;
+
+    /**
+     * Convenience function.  Resets all topic partitions to their earliest
+     * point.
+     *
+     * @return  The number of partitions reset
+     */
+    EXPORT UNSIGNED4 ResetMessageOffsets() := FUNCTION
+
+        numberOfPartitions := GetTopicPartitionCount() : INDEPENDENT;
+
+        offsets := DATASET
+            (
+                numberOfPartitions,
+                TRANSFORM
+                    (
+                        KafkaMessageOffset,
+                        SELF.partitionNum := COUNTER - 1,
+                        SELF.offset := -1
+                    )
+            );
+
+        RETURN SetMessageOffsets(offsets);
+    END;
+
+END;

+ 441 - 0
plugins/kafka/kafka.hpp

@@ -0,0 +1,441 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+#ifndef ECL_KAFKA_INCL
+#define ECL_KAFKA_INCL
+
+#ifdef _WIN32
+#define ECL_KAFKA_CALL _cdecl
+#ifdef ECL_KAFKA_EXPORTS
+#define ECL_KAFKA_API __declspec(dllexport)
+#else
+#define ECL_KAFKA_API __declspec(dllimport)
+#endif
+#else
+#define ECL_KAFKA_CALL
+#define ECL_KAFKA_API
+#endif
+
+#include "platform.h"
+#include "jthread.hpp"
+#include "hqlplugins.hpp"
+#include "eclrtl_imp.hpp"
+#include "eclhelper.hpp"
+
+#include <atomic>
+#include <string>
+#include <time.h>
+
+#include "librdkafka/rdkafkacpp.h"
+
+#ifdef ECL_KAFKA_EXPORTS
+extern "C"
+{
+    ECL_KAFKA_API bool getECLPluginDefinition(ECLPluginDefinitionBlock *pb);
+}
+#endif
+
+extern "C++"
+{
+    namespace KafkaPlugin
+    {
+        class KafkaObj;
+        class Poller;
+        class Publisher;
+        class Consumer;
+        class KafkaStreamedDataset;
+
+        /** @class KafkaObj
+         *
+         *  Parent class for both Publisher and Consumer classes.  Provides
+         *  easy way for a Poller object to access either for callbacks, etc.
+         */
+        class KafkaObj
+        {
+            public:
+
+                /**
+                 * Returns a pointer to the librdkafka object that can be either
+                 * a producer or consumer.
+                 */
+                virtual RdKafka::Handle* handle() = 0;
+        };
+
+        //----------------------------------------------------------------------
+
+        /** @class Poller
+         *
+         *  Background execution of librdkafka's poll() function, which is
+         *  required in order to batch I/O.  One Poller will be created
+         *  for each Publisher and Consumer object actively used
+         *  for each Publisher and Consumer object actively used.
+        class Poller : public Thread
+        {
+            public:
+
+                /**
+                 * Constructor
+                 *
+                 * @param   _parentPtr      Pointer to Publisher or Consumer object
+                 *                          that created this object
+                 * @param   _pollTimeout    The number of milliseconds to wait
+                 *                          for events within librdkafka
+                 */
+                Poller(KafkaObj* _parentPtr, __int32 _pollTimeout);
+
+                /**
+                 * Starts execution of the thread main event loop
+                 */
+                virtual void start();
+
+                /**
+                 * Stops execution of the thread main event loop.  Note that we
+                 * wait until the main event loop has actually stopped before
+                 * returning.
+                 */
+                void stop();
+
+                /**
+                 * Entry point to the thread main event loop.  Exiting this
+                 * method means that the thread should stop.
+                 */
+                virtual int run();
+
+            private:
+
+                std::atomic_bool    shouldRun;      //!< If true, we should execute our thread's main event loop
+                KafkaObj*           parentPtr;      //!< Pointer to object that started this threaded execution
+                __int32             pollTimeout;    //!< The amount of time (in ms) we give to librdkafka's poll() function
+        };
+
+        //----------------------------------------------------------------------
+
+        class Publisher : public KafkaObj, public RdKafka::EventCb, public RdKafka::DeliveryReportCb
+        {
+            public:
+
+                /**
+                 * Constructor
+                 *
+                 * @param   _brokers        One or more Kafka brokers, in the
+                 *                          format 'name[:port]' where 'name'
+                 *                          is either a host name or IP address;
+                 *                          multiple brokers can be delimited
+                 *                          with commas
+                 * @param   _topic          The name of the topic we will be
+                 *                          publishing to
+                 * @param   _pollTimeout    The number of milliseconds to wait
+                 *                          for events within librdkafka
+                 * @param   _traceLevel     Current logging level
+                 */
+                Publisher(const std::string& _brokers, const std::string& _topic, __int32 _pollTimeout, int _traceLevel);
+
+                virtual ~Publisher();
+
+                /**
+                 * @return  A pointer to the librdkafka producer object.
+                 */
+                virtual RdKafka::Handle* handle();
+
+                /**
+                 * @return  Updates the touch time and returns it.
+                 */
+                time_t updateTimeTouched();
+
+                /**
+                 * @return  The time at which this object was created
+                 */
+                time_t getTimeTouched() const;
+
+                /**
+                 * If needed, establish connection to Kafka cluster using the
+                 * parameters stored within this object.
+                 */
+                void ensureSetup();
+
+                /**
+                 * Stops the attached poller's main event loop.  This should be
+                 * called before deletion.
+                 */
+                void shutdownPoller();
+
+                /**
+                 * @return  Returns the number of messages currently waiting
+                 *          in the local outbound queue, ready for transmission
+                 *          to the Kafka cluster
+                 */
+                __int32 messagesWaitingInQueue();
+
+                /**
+                 * Send one message
+                 *
+                 * @param   message     The message to send
+                 * @param   key         The key to attach to the message
+                 */
+                void sendMessage(const std::string& message, const std::string& key);
+
+                /**
+                 * Callback function.  librdkafka will call here, outside of a
+                 * poll(), when it has interesting things to tell us
+                 *
+                 * @param   event       Reference to an Event object provided
+                 *                      by librdkafka
+                 */
+                virtual void event_cb(RdKafka::Event& event);
+
+                /**
+                 * Callback function.  librdkafka will call here to notify
+                 * us of problems with delivering messages to the server
+                 *
+                 * @param   message     Reference to a Message object provided
+                 *                      by librdkafka
+                 */
+                virtual void dr_cb (RdKafka::Message& message);
+
+            private:
+
+                std::string                     brokers;        //!< One or more Kafka bootstrap brokers; comma-delimited; NameOrIP[:port]
+                std::string                     topic;          //!< The name of the topic to publish to
+                RdKafka::Producer*              producerPtr;    //!< Pointer to librdkafka producer object
+                std::atomic<RdKafka::Topic*>    topicPtr;       //!< Pointer to librdkafka topic object
+                CriticalSection                 lock;           //!< Mutex to ensure that only one thread creates the librdkafka object pointers
+                Poller*                         pollerPtr;      //!< Pointer to the threaded Poller object that gives time to librdkafka
+                __int32                         pollTimeout;    //!< The amount of time (in ms) we give to librdkafka's poll() function
+                time_t                          timeCreated;    //!< The time at which this object was created
+                int                             traceLevel;     //!< The current logging level
+        };
+
+        //----------------------------------------------------------------------
+
+        class Consumer : public KafkaObj, public RdKafka::EventCb
+        {
+            public:
+
+                /**
+                 * Constructor
+                 *
+                 * @param   _brokers        One or more Kafka brokers, in the
+                 *                          format 'name[:port]' where 'name'
+                 *                          is either a host name or IP address;
+                 *                          multiple brokers can be delimited
+                 *                          with commas
+                 * @param   _topic          The name of the topic we will be
+                 *                          consuming from
+                 * @param   _partitionNum   The topic partition number we will be
+                 *                          consuming from
+                 * @param   _traceLevel     Current logging level
+                 */
+                Consumer(const std::string& _brokers, const std::string& _topic, const std::string& _consumerGroup, __int32 _partitionNum, int _traceLevel);
+
+                virtual ~Consumer();
+
+                /**
+                 * @return  A pointer to the librdkafka consumer object.
+                 */
+                virtual RdKafka::Handle* handle();
+
+                /**
+                 * If needed, establish connection to Kafka cluster using the
+                 * parameters stored within this object.
+                 */
+                void ensureSetup();
+
+                /**
+                 * @return  Returns one new message from the inbound Kafka
+                 *          topic.  A NON-NULL RESULT MUST EVENTUALLY BE
+                 *          DISPOSED OF WITH A CALL TO delete().
+                 */
+                RdKafka::Message* getOneMessage();
+
+                /**
+                 * Retrieves many messages from the inbound Kafka topic and
+                 * returns them as a streamed dataset.  Note that this is a
+                 * per-brokers/per-topic/per-partition retrieval.
+                 *
+                 * @param   allocator       The allocator to use with RowBuilder
+                 * @param   maxRecords      The maximum number of records
+                 *                          to retrieve
+                 *
+                 * @return  An IRowStream streamed dataset object pointer
+                 */
+                KafkaStreamedDataset* getMessageDataset(IEngineRowAllocator* allocator, __int64 maxRecords = 1);
+
+                /**
+                 * @return  StringBuffer object containing the path to this
+                 *          consumer's offset file
+                 */
+                StringBuffer offsetFilePath() const;
+
+                /**
+                 * Commits the given offset to storage so we can pick up
+                 * where we left off in a subsequent read.
+                 *
+                 * @param   offset          The offset to store
+                 */
+                void commitOffset(__int64 offset) const;
+
+                /**
+                 * If the offset file does not exist, create one with a
+                 * default offset
+                 */
+                void initFileOffsetIfNotExist() const;
+
+                /**
+                 * Callback function.  librdkafka will call here, outside of a
+                 * poll(), when it has interesting things to tell us
+                 *
+                 * @param   event       Reference to an Event object provided
+                 *                      by librdkafka
+                 */
+                virtual void event_cb(RdKafka::Event& event);
+
+            private:
+
+                std::string                     brokers;        //!< One or more Kafka bootstrap brokers; comma-delimited; NameOrIP[:port]
+                std::string                     topic;          //!< The name of the topic to consume from
+                std::string                     consumerGroup;  //!< The name of the consumer group for this consumer object
+                RdKafka::Consumer*              consumerPtr;    //!< Pointer to librdkafka consumer object
+                std::atomic<RdKafka::Topic*>    topicPtr;       //!< Pointer to librdkafka topic object
+                CriticalSection                 lock;           //!< Mutex to ensure that only one thread creates the librdkafka object pointers or starts/stops the queue
+                __int32                         partitionNum;   //!< The partition within the topic from which we will be pulling messages
+                bool                            queueStarted;   //!< If true, we have started the process of reading from the queue
+                int                             traceLevel;     //!< The current logging level
+        };
+
+        //----------------------------------------------------------------------
+
+        class KafkaStreamedDataset : public RtlCInterface, implements IRowStream
+        {
+            public:
+
+                /**
+                 * Constructor
+                 *
+                 * @param   _consumerPtr        Pointer to the Consumer object
+                 *                              from which we will be retrieving
+                 *                              records
+                 * @param   _resultAllocator    The memory allocator used to build
+                 *                              the result rows; this is provided
+                 *                              by the platform during the
+                 *                              plugin call
+                 * @param   _traceLevel         The current logging level
+                 * @param   _maxRecords         The maximum number of records
+                 *                              to return; use 0 to return all
+                 *                              available records
+                 */
+                KafkaStreamedDataset(Consumer* _consumerPtr, IEngineRowAllocator* _resultAllocator, int _traceLevel, __int64 _maxRecords = -1);
+
+                virtual ~KafkaStreamedDataset();
+
+                RTLIMPLEMENT_IINTERFACE
+
+                virtual const void* nextRow();
+
+                virtual void stop();
+
+            private:
+
+                Consumer*                       consumerPtr;        //!< Pointer to the Consumer object that we will read from
+                Linked<IEngineRowAllocator>     resultAllocator;    //!< Pointer to allocator used when building result rows
+                int                             traceLevel;         //!< The current logging level
+                bool                            shouldRead;         //!< If true, we should continue trying to read more messages
+                __int64                         maxRecords;         //!< The maximum number of messages to read
+                __int64                         consumedRecCount;   //!< The number of messages actually read
+                __int64                         lastMsgOffset;      //!< The offset of the last message read from the consumer
+        };
+
+        //----------------------------------------------------------------------
+
+        /**
+         * Queues the message for publishing to a topic on a Kafka cluster.
+         *
+         * @param   brokers             One or more Kafka brokers, in the
+         *                              format 'name[:port]' where 'name'
+         *                              is either a host name or IP address;
+         *                              multiple brokers can be delimited
+         *                              with commas
+         * @param   topic               The name of the topic
+         * @param   message             The message to send
+         * @param   key                 The key to use for the message
+         *
+         * @return  true if the message was cached successfully
+         */
+        ECL_KAFKA_API bool ECL_KAFKA_CALL publishMessage(const char* brokers, const char* topic, const char* message, const char* key);
+
+        /**
+         * Get the number of partitions currently set up for a topic on a cluster.
+         *
+         * @param   brokers             One or more Kafka brokers, in the
+         *                              format 'name[:port]' where 'name'
+         *                              is either a host name or IP address;
+         *                              multiple brokers can be delimited
+         *                              with commas
+         * @param   topic               The name of the topic
+         *
+         * @return  The number of partitions or zero if either the topic does not
+         *          exist or there was an error
+         */
+        ECL_KAFKA_API __int32 ECL_KAFKA_CALL getTopicPartitionCount(const char* brokers, const char* topic);
+
+        /**
+         * Retrieves a set of messages on a topic from a Kafka cluster.
+         *
+         * @param   ctx                 Platform-provided context pointer
+         * @param   allocator           Platform-provided memory allocator used
+         *                              to help build data rows for returning
+         * @param   brokers             One or more Kafka brokers, in the
+         *                              format 'name[:port]' where 'name'
+         *                              is either a host name or IP address;
+         *                              multiple brokers can be delimited
+         *                              with commas
+         * @param   topic               The name of the topic
+         * @param   consumerGroup       The name of the consumer group to use; see
+         *                              https://kafka.apache.org/documentation.html#introduction
+         * @param   partitionNum        The topic partition from which to pull
+         *                              messages; this is a zero-based index
+         * @param   maxRecords          The maximum number of records to return;
+         *                              pass zero to return as many messages
+         *                              as possible (dangerous)
+         *
+         * @return  An IRowStream pointer representing the fetched messages
+         *          or NULL if no messages could be retrieved
+         */
+        ECL_KAFKA_API IRowStream* ECL_KAFKA_CALL getMessageDataset(ICodeContext* ctx, IEngineRowAllocator* allocator, const char* brokers, const char* topic, const char* consumerGroup, __int32 partitionNum, __int64 maxRecords);
+
+        /**
+         * Resets the saved offsets for a partition.
+         *
+         * @param   ctx                 Platform-provided context pointer
+         * @param   brokers             One or more Kafka brokers, in the
+         *                              format 'name[:port]' where 'name'
+         *                              is either a host name or IP address;
+         *                              multiple brokers can be delimited
+         *                              with commas
+         * @param   topic               The name of the topic
+         * @param   consumerGroup       The name of the consumer group to use; see
+         *                              https://kafka.apache.org/documentation.html#introduction
+         * @param   partitionNum        The topic partition from which to pull
+         *                              messages; this is a zero-based index
+         * @param   newOffset           The new offset to save
+         *
+         * @return  The offset that was saved
+         */
+        ECL_KAFKA_API __int64 ECL_KAFKA_CALL setMessageOffset(ICodeContext* ctx, const char* brokers, const char* topic, const char* consumerGroup, __int32 partitionNum, __int64 newOffset);
+    }
+}
+
+#endif

+ 1 - 0
plugins/kafka/librdkafka

@@ -0,0 +1 @@
+Subproject commit 3e1babf4f26a7d12bbd272c1cdf4aa6a44000d4a

+ 75 - 0
testing/regress/ecl/kafkatest.ecl

@@ -0,0 +1,75 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+//class=embedded
+//class=3rdparty
+
+IMPORT kafka;
+IMPORT Std;
+
+/*******************************************************************************
+ * These tests assume a Kafka instance running on the local host with a default
+ * Kafka configuration.  Each iteration of the test creates a new Kafka topic
+ * named after the WUID of the test.  These should be periodically cleaned
+ * off Kafka (or the Kafka instance itself be refreshed).
+ ******************************************************************************/
+
+KAFKA_BROKER := '127.0.0.1';
+KAFKA_TEST_TOPIC := Std.System.Job.WUID() : INDEPENDENT;
+KAFKA_CONSUMER_GROUP := 'regress';
+
+p := kafka.Publisher(KAFKA_TEST_TOPIC, KAFKA_BROKER);
+c := kafka.Consumer(KAFKA_TEST_TOPIC, KAFKA_BROKER, KAFKA_CONSUMER_GROUP);
+
+SEQUENTIAL
+    (
+        // Action to prompt Kafka to create a new topic for us; this will
+        // result in a partition count of zero, which is normal
+        OUTPUT(c.GetTopicPartitionCount(), NAMED('PingToCreateTopic'));
+
+        // Idle while Kafka prepares topic
+        Std.System.Debug.Sleep(1000);
+
+        OUTPUT(c.GetTopicPartitionCount(), NAMED('ConsumerGetTopicPartitionCount'));
+
+        OUTPUT(c.ResetMessageOffsets(), NAMED('ConsumerResetMessageOffsets1'));
+
+        OUTPUT(p.GetTopicPartitionCount(), NAMED('PublisherGetTopicPartitionCount'));
+
+        OUTPUT(p.PublishMessage('Regular message'), NAMED('PublishMessageUnkeyed'));
+
+        OUTPUT(p.PublishMessage('Keyed message'), NAMED('PublishMessageKeyed'));
+        
+        // Idle while Kafka publishes
+        Std.System.Debug.Sleep(1000);
+
+        OUTPUT(c.GetMessages(10), NAMED('GetMessages1'));
+
+        OUTPUT(c.GetMessages(10), NAMED('GetMessagesEmpty'));
+
+        OUTPUT(c.ResetMessageOffsets(), NAMED('ConsumerResetMessageOffsets2'));
+
+        OUTPUT(c.GetMessages(10), NAMED('GetMessages2'));
+
+        OUTPUT(c.SetMessageOffsets(DATASET([{0,0}], kafka.KafkaMessageOffset)), NAMED('ConsumerSetExplicitMessageOffsets'));
+
+        OUTPUT(c.GetMessages(10), NAMED('GetMessages3'));
+
+        OUTPUT(c.ResetMessageOffsets(), NAMED('ConsumerResetMessageOffsets3'));
+
+        OUTPUT(c.LastMessageOffsets(c.GetMessages(10)), NAMED('ConsumerLastMessageOffsets'));
+    );

+ 43 - 0
testing/regress/ecl/key/kafkatest.xml

@@ -0,0 +1,43 @@
+<Dataset name='PingToCreateTopic'>
+ <Row><PingToCreateTopic>0</PingToCreateTopic></Row>
+</Dataset>
+<Dataset name='ConsumerGetTopicPartitionCount'>
+ <Row><ConsumerGetTopicPartitionCount>1</ConsumerGetTopicPartitionCount></Row>
+</Dataset>
+<Dataset name='ConsumerResetMessageOffsets1'>
+ <Row><ConsumerResetMessageOffsets1>1</ConsumerResetMessageOffsets1></Row>
+</Dataset>
+<Dataset name='PublisherGetTopicPartitionCount'>
+ <Row><PublisherGetTopicPartitionCount>1</PublisherGetTopicPartitionCount></Row>
+</Dataset>
+<Dataset name='PublishMessageUnkeyed'>
+ <Row><PublishMessageUnkeyed>true</PublishMessageUnkeyed></Row>
+</Dataset>
+<Dataset name='PublishMessageKeyed'>
+ <Row><PublishMessageKeyed>true</PublishMessageKeyed></Row>
+</Dataset>
+<Dataset name='GetMessages1'>
+ <Row><partitionnum>0</partitionnum><offset>0</offset><message>Regular message</message></Row>
+ <Row><partitionnum>0</partitionnum><offset>1</offset><message>Keyed message</message></Row>
+</Dataset>
+<Dataset name='GetMessagesEmpty'>
+</Dataset>
+<Dataset name='ConsumerResetMessageOffsets2'>
+ <Row><ConsumerResetMessageOffsets2>1</ConsumerResetMessageOffsets2></Row>
+</Dataset>
+<Dataset name='GetMessages2'>
+ <Row><partitionnum>0</partitionnum><offset>0</offset><message>Regular message</message></Row>
+ <Row><partitionnum>0</partitionnum><offset>1</offset><message>Keyed message</message></Row>
+</Dataset>
+<Dataset name='ConsumerSetExplicitMessageOffsets'>
+ <Row><ConsumerSetExplicitMessageOffsets>1</ConsumerSetExplicitMessageOffsets></Row>
+</Dataset>
+<Dataset name='GetMessages3'>
+ <Row><partitionnum>0</partitionnum><offset>1</offset><message>Keyed message</message></Row>
+</Dataset>
+<Dataset name='ConsumerResetMessageOffsets3'>
+ <Row><ConsumerResetMessageOffsets3>1</ConsumerResetMessageOffsets3></Row>
+</Dataset>
+<Dataset name='ConsumerLastMessageOffsets'>
+ <Row><partitionnum>0</partitionnum><offset>1</offset></Row>
+</Dataset>