Sfoglia il codice sorgente

HPCC-24155 Update Kafka plugin to support UTF-8 encoded messages

Signed-off-by: Dan S. Camper <dan.camper@lexisnexisrisk.com>
Dan S. Camper 5 anni fa
parent
commit
558e7a37af

+ 4 - 4
plugins/kafka/README.md

@@ -103,7 +103,7 @@ Example instantiating a publishing module:
 
 The module contains an exported function for publishing a message, defined as:
 
-    BOOLEAN PublishMessage(CONST VARSTRING message, CONST VARSTRING key = '');
+    BOOLEAN PublishMessage(CONST UTF8 message, CONST UTF8 key = '');
 
 The module function requires a UTF-8 message and allows you to specify a 'key'
 that affects how Apache Kafka stores the message.  Key values act a lot like the
@@ -167,7 +167,7 @@ defined in the module.  The layout for that dataset is:
     KafkaMessage := RECORD
         UNSIGNED4   partitionNum;
         INTEGER8    offset;
-        STRING      message;
+        UTF8        message;
     END;
 
 Example retrieving up to 10,000 messages:
@@ -238,7 +238,7 @@ single-node HPCC cluster and have installed Kafka on the same node, you can use
     IMPORT kafka;
 
     MyDataLayout := RECORD
-        STRING  message;
+        UTF8    message;
     END;
 
     ds := DATASET
@@ -247,7 +247,7 @@ single-node HPCC cluster and have installed Kafka on the same node, you can use
             TRANSFORM
                 (
                     MyDataLayout,
-                    SELF.message := 'Test message ' + (STRING)COUNTER
+                    SELF.message := U8'Test message ' + (UTF8)COUNTER
                 ),
             DISTRIBUTED
         );

+ 9 - 6
plugins/kafka/kafka.cpp

@@ -173,12 +173,12 @@ namespace KafkaPlugin
                                     //  EXPORT KafkaMessage := RECORD
                                     //      UNSIGNED4   partitionNum;
                                     //      INTEGER8    offset;
-                                    //      STRING      message;
+                                    //      UTF8        message;
                                     //  END;
 
                                     *(__int32*)(row) = messageObjPtr->partition();
                                     *(__int64*)(row + sizeof(__int32)) = messageObjPtr->offset();
-                                    *(size32_t*)(row + sizeof(__int32) + sizeof(__int64)) = messageObjPtr->len();
+                                    *(size32_t*)(row + sizeof(__int32) + sizeof(__int64)) = rtlUtf8Length(messageObjPtr->len(), messageObjPtr->payload());
                                     memcpy(row + sizeof(__int32) + sizeof(__int64) + sizeof(size32_t), messageObjPtr->payload(), messageObjPtr->len());
 
                                     result = rowBuilder.finalizeRowClear(len);
@@ -921,13 +921,15 @@ namespace KafkaPlugin
     // Advertised Entry Point Functions
     //--------------------------------------------------------------------------
 
-    ECL_KAFKA_API bool ECL_KAFKA_CALL publishMessage(ICodeContext* ctx, const char* brokers, const char* topic, const char* message, const char* key)
+    ECL_KAFKA_API bool ECL_KAFKA_CALL publishMessage(ICodeContext* ctx, const char* brokers, const char* topic, size32_t lenMessage, const char* message, size32_t lenKey, const char* key)
     {
         std::call_once(pubCacheInitFlag, setupPublisherCache, ctx->queryContextLogger().queryTraceLevel());
 
-        Publisher* pubObjPtr = publisherCache->getPublisher(brokers, topic, POLL_TIMEOUT);
+        Publisher*          pubObjPtr = publisherCache->getPublisher(brokers, topic, POLL_TIMEOUT);
+        std::string         messageStr(message, rtlUtf8Size(lenMessage, message));
+        std::string         keyStr(key, rtlUtf8Size(lenKey, key));
 
-        pubObjPtr->sendMessage(message, key);
+        pubObjPtr->sendMessage(messageStr, keyStr);
 
         return true;
     }
@@ -1031,9 +1033,10 @@ namespace KafkaPlugin
 // Plugin Initialization and Teardown
 //==============================================================================
 
-#define CURRENT_KAFKA_VERSION "kafka plugin 1.0.0"
+#define CURRENT_KAFKA_VERSION "kafka plugin 1.1.0"
 
 static const char* kafkaCompatibleVersions[] = {
+    "kafka plugin 1.0.0",
     CURRENT_KAFKA_VERSION,
     NULL };
 

+ 3 - 3
plugins/kafka/kafka.ecllib

@@ -24,13 +24,13 @@ END;
 // Record structure that will be used to return Kafka messages to ECL
 EXPORT KafkaMessage := RECORD
     KafkaMessageOffset;
-    STRING      message;
+    UTF8        message;
 END;
 
 // Service definition
 SHARED kafka := SERVICE : plugin('kafka'), namespace('KafkaPlugin')
 
-    BOOLEAN PublishMessage(CONST VARSTRING brokers, CONST VARSTRING topic, CONST VARSTRING message, CONST VARSTRING key) : cpp,action,context,entrypoint='publishMessage';
+    BOOLEAN PublishMessage(CONST VARSTRING brokers, CONST VARSTRING topic, CONST UTF8 message, CONST UTF8 key) : cpp,action,context,entrypoint='publishMessage';
     INTEGER4 getTopicPartitionCount(CONST VARSTRING brokers, CONST VARSTRING topic) : cpp,action,context,entrypoint='getTopicPartitionCount';
     STREAMED DATASET(KafkaMessage) GetMessageDataset(CONST VARSTRING brokers, CONST VARSTRING topic, CONST VARSTRING consumerGroup, INTEGER4 partitionNum, INTEGER8 maxRecords) : cpp,action,context,entrypoint='getMessageDataset';
     INTEGER8 SetMessageOffset(CONST VARSTRING brokers, CONST VARSTRING topic, CONST VARSTRING consumerGroup, INTEGER4 partitionNum, INTEGER8 newOffset) : cpp,action,context,entrypoint='setMessageOffset';
@@ -73,7 +73,7 @@ EXPORT KafkaPublisher(VARSTRING topic, VARSTRING brokers = 'localhost') := MODUL
      *
      * @return  TRUE
      */
-    EXPORT BOOLEAN PublishMessage(CONST VARSTRING message, CONST VARSTRING key = '') := kafka.PublishMessage(brokers, topic, message, key);
+    EXPORT BOOLEAN PublishMessage(CONST UTF8 message, CONST UTF8 key = U8'') := kafka.PublishMessage(brokers, topic, message, key);
 
 END;
 

+ 7 - 3
plugins/kafka/kafka.hpp

@@ -356,12 +356,16 @@ extern "C++"
          *                              multiple brokers can be delimited
          *                              with commas
          * @param   topic               The name of the topic
-         * @param   message             The message to send
-         * @param   key                 The key to use for the message
+         * @param   lenMessage          Length (in characters, not bytes)
+         *                              of message
+         * @param   message             The UTF-8 message to send
+         * @param   lenKey              Length (in characters, not bytes)
+         *                              of key
+         * @param   key                 The UTF-8 key to use for the message
          *
          * @return  true if the message was cached successfully
          */
-        ECL_KAFKA_API bool ECL_KAFKA_CALL publishMessage(const char* brokers, const char* topic, const char* message, const char* key);
+        ECL_KAFKA_API bool ECL_KAFKA_CALL publishMessage(ICodeContext* ctx, const char* brokers, const char* topic, size32_t lenMessage, const char* message, size32_t lenKey, const char* key);
 
         /**
          * Get the number of partitions currently set up for a topic on a cluster.

+ 13 - 2
testing/regress/ecl/kafkatest.ecl

@@ -52,8 +52,8 @@ SEQUENTIAL
 
         OUTPUT(p.PublishMessage('Regular message'), NAMED('PublishMessageUnkeyed'));
 
-        OUTPUT(p.PublishMessage('Keyed message'), NAMED('PublishMessageKeyed'));
-        
+        OUTPUT(p.PublishMessage('Keyed message', 'mykey'), NAMED('PublishMessageKeyed'));
+
         // Idle while Kafka publishes
         Std.System.Debug.Sleep(1000);
 
@@ -72,4 +72,15 @@ SEQUENTIAL
         OUTPUT(c.ResetMessageOffsets(), NAMED('ConsumerResetMessageOffsets3'));
 
         OUTPUT(c.LastMessageOffsets(c.GetMessages(10)), NAMED('ConsumerLastMessageOffsets'));
+
+        // Test UTF-8 support
+
+        OUTPUT(p.PublishMessage(U8'UTF-8: Blah – Unkeyed'), NAMED('PublishMessageUnkeyedUTF8')); // contains en-dash
+
+        OUTPUT(p.PublishMessage(U8'UTF-8: Blah – Keyed', U8'My–Key'), NAMED('PublishMessageKeyedUTF8')); // contains en-dashes
+
+        // Idle while Kafka publishes
+        Std.System.Debug.Sleep(1000);
+
+        OUTPUT(c.GetMessages(10), NAMED('GetMessagesUTF8'));
     );

+ 10 - 0
testing/regress/ecl/key/kafkatest.xml

@@ -41,3 +41,13 @@
 <Dataset name='ConsumerLastMessageOffsets'>
  <Row><partitionnum>0</partitionnum><offset>1</offset></Row>
 </Dataset>
+<Dataset name='PublishMessageUnkeyedUTF8'>
+ <Row><PublishMessageUnkeyedUTF8>true</PublishMessageUnkeyedUTF8></Row>
+</Dataset>
+<Dataset name='PublishMessageKeyedUTF8'>
+ <Row><PublishMessageKeyedUTF8>true</PublishMessageKeyedUTF8></Row>
+</Dataset>
+<Dataset name='GetMessagesUTF8'>
+ <Row><partitionnum>0</partitionnum><offset>2</offset><message>UTF-8: Blah – Unkeyed</message></Row>
+ <Row><partitionnum>0</partitionnum><offset>3</offset><message>UTF-8: Blah – Keyed</message></Row>
+</Dataset>