kafka.ecllib 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  /*##############################################################################

      HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems®.

      Licensed under the Apache License, Version 2.0 (the "License");
      you may not use this file except in compliance with the License.
      You may obtain a copy of the License at

         http://www.apache.org/licenses/LICENSE-2.0

      Unless required by applicable law or agreed to in writing, software
      distributed under the License is distributed on an "AS IS" BASIS,
      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      See the License for the specific language governing permissions and
      limitations under the License.
  ############################################################################## */
  // Layout describing a read position within a Kafka topic: a partition
  // number paired with a message offset inside that partition
  EXPORT KafkaMessageOffset :=
      {
          UNSIGNED4   partitionNum,   // Kafka partition number
          INTEGER8    offset          // Message offset within that partition
      };
  // Layout of a single Kafka message as handed back to ECL: the positioning
  // fields of KafkaMessageOffset followed by the message payload
  EXPORT KafkaMessage :=
      {
          KafkaMessageOffset,         // partitionNum and offset of this message
          UTF8        message         // Message payload
      };
  // Bindings to the C++ entry points of the 'kafka' plugin (namespace
  // KafkaPlugin); SHARED, so these are reachable only through the wrapper
  // modules defined in this file
  SHARED kafka := SERVICE : plugin('kafka'), namespace('KafkaPlugin')

      // Publish one message, with an accompanying key, to a topic
      BOOLEAN PublishMessage
          (
              CONST VARSTRING brokers,
              CONST VARSTRING topic,
              CONST UTF8 message,
              CONST UTF8 key
          ) : cpp, action, context, entrypoint='publishMessage';

      // Number of partitions configured for a topic
      INTEGER4 getTopicPartitionCount
          (
              CONST VARSTRING brokers,
              CONST VARSTRING topic
          ) : cpp, action, context, entrypoint='getTopicPartitionCount';

      // Stream messages from a single partition of a topic on behalf of a
      // consumer group
      STREAMED DATASET(KafkaMessage) GetMessageDataset
          (
              CONST VARSTRING brokers,
              CONST VARSTRING topic,
              CONST VARSTRING consumerGroup,
              INTEGER4 partitionNum,
              INTEGER8 maxRecords
          ) : cpp, action, context, entrypoint='getMessageDataset';

      // Set the read offset of a single partition for a consumer group
      INTEGER8 SetMessageOffset
          (
              CONST VARSTRING brokers,
              CONST VARSTRING topic,
              CONST VARSTRING consumerGroup,
              INTEGER4 partitionNum,
              INTEGER8 newOffset
          ) : cpp, action, context, entrypoint='setMessageOffset';

  END;
  /**
   * Publishing-side wrapper around the Kafka plugin, bound to one topic and
   * one broker list.
   *
   * @param   topic           Name of the topic that messages will be published
   *                          to; cannot be an empty string; REQUIRED
   * @param   brokers         One or more Kafka brokers, comma-delimited; each
   *                          entry takes the form 'Name[:port]', where 'Name'
   *                          is either a DNS name or an IP address; pass an
   *                          empty string when the brokers are configured in
   *                          the kafka_global.conf configuration file;
   *                          OPTIONAL, defaulting to 'localhost'
   */
  EXPORT KafkaPublisher(VARSTRING topic, VARSTRING brokers = 'localhost') := MODULE

      /**
       * Report how many partitions are currently set up for this topic.
       *
       * @return  The partition count, or zero when the topic does not exist
       *          or an error occurred
       */
      EXPORT INTEGER4 GetTopicPartitionCount() :=
          kafka.getTopicPartitionCount(brokers, topic);

      /**
       * Queue a single message for publishing to this module's topic.
       *
       * @param   message     The text to publish; must not be an empty
       *                      string; REQUIRED
       * @param   key         Key attached to the message; Kafka uses it to
       *                      route the message to a partition, so messages
       *                      sharing a key land on the same partition; an
       *                      empty string means no key; OPTIONAL, defaults
       *                      to an empty string
       *
       * @return  TRUE
       */
      EXPORT BOOLEAN PublishMessage(CONST UTF8 message, CONST UTF8 key = U8'') :=
          kafka.PublishMessage(brokers, topic, message, key);

  END;
  /**
   * Module wrapping message consuming functions.
   *
   * @param   topic           The name of the topic this module will be
   *                          consuming from; cannot be an empty string;
   *                          REQUIRED
   * @param   brokers         One or more Kafka brokers; each broker should be
   *                          in the form 'Name[:port]' where 'Name' may be
   *                          either a DNS name or an IP address; multiple
   *                          brokers should be delimited with a comma;
   *                          brokers can also be set in the kafka_global.conf
   *                          configuration file, in which case you should
   *                          pass an empty string; OPTIONAL, defaulting to
   *                          'localhost'
   * @param   consumerGroup   The name of the Kafka consumer group to use for
   *                          any message consumption;
   *                          (see https://kafka.apache.org/documentation.html#introduction);
   *                          OPTIONAL, defaults to 'hpcc'
   */
  EXPORT KafkaConsumer(VARSTRING topic, VARSTRING brokers = 'localhost', VARSTRING consumerGroup = 'hpcc') := MODULE

      /**
       * Get the number of partitions currently set up for this topic
       *
       * @return  The number of partitions or zero if either the topic does not
       *          exist or there was an error
       */
      EXPORT INTEGER4 GetTopicPartitionCount() := kafka.getTopicPartitionCount(brokers, topic);

      /**
       * Consume previously-published messages from the current topic.
       *
       * A non-zero limit is divided evenly among the topic's partitions, with
       * at least one message requested from each partition, so the number of
       * messages returned can differ from maxRecords when maxRecords is not a
       * multiple of the partition count.
       *
       * @param   maxRecords  The maximum number of records to retrieve; pass
       *                      zero to return as many messages as there are
       *                      queued (dangerous); REQUIRED
       *
       * @return  A new dataset containing the retrieved messages
       */
      EXPORT DATASET(KafkaMessage) GetMessages(INTEGER8 maxRecords) := FUNCTION

          // Record structure to hold messages from multiple partitions
          MultiNodeMessageRec := RECORD
              DATASET(KafkaMessage)   messages;
          END;

          numberOfPartitions := GetTopicPartitionCount() : INDEPENDENT;

          // Per-partition record limit.  A maxRecords of zero is handed to the
          // plugin unchanged so that the documented "no limit" request is not
          // silently turned into one record per partition.
          // NOTE(review): this relies on the plugin treating a limit of zero
          // as "no limit", which is not visible in this file -- confirm
          // against the plugin implementation.
          // The division is skipped when there are no partitions (unknown
          // topic or error); in that case the dataset below is empty and the
          // value is never used.
          INTEGER8 maxRecordsPerNode := IF
              (
                  maxRecords = 0 OR numberOfPartitions <= 0,
                  0,
                  MAX(maxRecords DIV numberOfPartitions, 1)
              );

          // Container holding messages from all partitions; in a multi-node setup
          // the work will be distributed among the nodes (at least up to the
          // number of partitions); note that 'COUNTER - 1' is actually the
          // Kafka partition number that will be read
          messageContainer := DATASET
              (
                  numberOfPartitions,
                  TRANSFORM
                      (
                          MultiNodeMessageRec,
                          SELF.messages := kafka.GetMessageDataset(brokers, topic, consumerGroup, COUNTER - 1, maxRecordsPerNode)
                      ),
                  DISTRIBUTED
              );

          // Map messages from multiple partitions back to final record structure
          resultDS := NORMALIZE
              (
                  messageContainer,
                  LEFT.messages,
                  TRANSFORM
                      (
                          KafkaMessage,
                          SELF := RIGHT
                      ),
                  LOCAL
              );

          RETURN resultDS;
      END;

      /**
       * Given a set of messages, presumably just consumed from an Apache Kafka
       * cluster, summarize the last-read message offsets on a per-partition basis.
       * This is useful for logging/saving the last messages read during a
       * particular run, which can then be used to restore system state if you
       * have to re-consume older messages (see SetMessageOffsets() function).
       *
       * @param   messages    A dataset of consumed messages; REQUIRED
       *
       * @return  A new dataset containing one record per partition found in
       *          the input, holding the highest message offset seen for
       *          that partition.
       */
      EXPORT DATASET(KafkaMessageOffset) LastMessageOffsets(DATASET(KafkaMessage) messages) := FUNCTION
          // Highest offset per partition
          t := TABLE
              (
                  messages,
                  {
                      partitionNum,
                      INTEGER8    offset := MAX(GROUP, offset)
                  },
                  partitionNum,
                  MERGE
              );

          // Coerce the summary into the exported offset layout
          f := PROJECT(t, TRANSFORM(KafkaMessageOffset, SELF := LEFT));

          RETURN f;
      END;

      /**
       * Resets the last-read partition offsets to the values in the given dataset.
       * This is useful for "rewinding" message reading to an earlier point.  The
       * next call to GetMessages() will start consuming at the points described
       * in the dataset.
       *
       * @param   offsets     A dataset of partitions and the offsets to which
       *                      you want to set each, like the result from a call
       *                      to LastMessageOffsets(); REQUIRED
       *
       * @return  The number of partitions set (those for which the plugin
       *          returned a non-negative offset)
       */
      EXPORT UNSIGNED4 SetMessageOffsets(DATASET(KafkaMessageOffset) offsets) := FUNCTION

          // Distribute the offset data so that each partition lines up on the right node
          distOffsets := DISTRIBUTE(offsets, partitionNum);

          // Temporary result layout that will capture a COUNTER value generated
          // by PROJECT, which in turn ensures that the LOCAL flag is actually used
          // and our data distribution is honored (the distribution is required in
          // order to ensure that kafka.SetMessageOffset() is called on the correct
          // Thor nodes)
          ResultLayout := RECORD
              KafkaMessageOffset;
              UNSIGNED4   c;
          END;

          // Set the offset for each partition on each node
          result := PROJECT
              (
                  NOFOLD(distOffsets),
                  TRANSFORM
                      (
                          ResultLayout,
                          SELF.offset := kafka.SetMessageOffset(brokers, topic, consumerGroup, LEFT.partitionNum, LEFT.offset),
                          SELF.c := COUNTER,
                          SELF := LEFT
                      ),
                  LOCAL
              );

          // Only rows whose returned offset is non-negative are counted
          RETURN COUNT(result(offset >= 0));
      END;

      /**
       * Convenience function.  Resets all topic partitions to their earliest
       * point.
       *
       * @return  The number of partitions reset
       */
      EXPORT UNSIGNED4 ResetMessageOffsets() := FUNCTION

          numberOfPartitions := GetTopicPartitionCount() : INDEPENDENT;

          // One row per partition (numbered from zero), each with offset zero
          offsets := DATASET
              (
                  numberOfPartitions,
                  TRANSFORM
                      (
                          KafkaMessageOffset,
                          SELF.partitionNum := COUNTER - 1,
                          SELF.offset := 0
                      )
              );

          RETURN SetMessageOffsets(offsets);
      END;

  END;