/*##############################################################################

    HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems®.

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
############################################################################## */

#ifndef ECL_KAFKA_INCL
#define ECL_KAFKA_INCL

#ifdef _WIN32
#define ECL_KAFKA_CALL _cdecl
#else
#define ECL_KAFKA_CALL
#endif

#ifdef ECL_KAFKA_EXPORTS
#define ECL_KAFKA_API DECL_EXPORT
#else
#define ECL_KAFKA_API DECL_IMPORT
#endif

#include "platform.h"
#include "jthread.hpp"
#include "hqlplugins.hpp"
#include "eclrtl_imp.hpp"
#include "eclhelper.hpp"

#include <atomic>
#include <string>
#include <time.h>

#include "librdkafka/rdkafkacpp.h"

#ifdef ECL_KAFKA_EXPORTS
extern "C"
{
    ECL_KAFKA_API bool getECLPluginDefinition(ECLPluginDefinitionBlock *pb);
}
#endif
extern "C++"
{
    namespace KafkaPlugin
    {
        class KafkaObj;
        class Poller;
        class Publisher;
        class Consumer;
        class KafkaStreamedDataset;

        /** @class KafkaObj
         *
         * Parent class for both Publisher and Consumer classes. Provides an
         * easy way for a Poller object to access either for callbacks, etc.
         */
        class KafkaObj
        {
            public:
                /**
                 * Returns a pointer to the librdkafka object that can be either
                 * a producer or consumer.
                 */
                virtual RdKafka::Handle* handle() = 0;
        };

        //----------------------------------------------------------------------

        /** @class Poller
         *
         * Background execution of librdkafka's poll() function, which is
         * required in order to batch I/O. One Poller will be created
         * for each Publisher and Consumer object actively used.
         */
        class Poller : public Thread
        {
            public:
                /**
                 * Constructor
                 *
                 * @param _parentPtr    Pointer to the Publisher or Consumer object
                 *                      that created this object
                 * @param _pollTimeout  The number of milliseconds to wait
                 *                      for events within librdkafka
                 */
                Poller(KafkaObj* _parentPtr, __int32 _pollTimeout);

                /**
                 * Starts execution of the thread's main event loop.
                 */
                virtual void start();

                /**
                 * Stops execution of the thread's main event loop. Note that we
                 * wait until the main event loop has actually stopped before
                 * returning.
                 */
                void stop();

                /**
                 * Entry point to the thread's main event loop. Exiting this
                 * method means that the thread should stop.
                 */
                virtual int run();

            private:
                std::atomic_bool shouldRun;     //!< If true, we should execute our thread's main event loop
                KafkaObj* parentPtr;            //!< Pointer to the object that started this threaded execution
                __int32 pollTimeout;            //!< The amount of time (in ms) we give to librdkafka's poll() function
        };
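
        /*
         * Conceptual sketch of what the poller thread does (an assumption based
         * only on the documented contract above; the real implementation lives
         * in kafka.cpp and may differ):
         *
         *     int Poller::run()
         *     {
         *         RdKafka::Handle* handle = parentPtr->handle();
         *         while (shouldRun)
         *             handle->poll(pollTimeout);  // give librdkafka time to fire queued callbacks
         *         return 0;
         *     }
         */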
        //----------------------------------------------------------------------

        class Publisher : public KafkaObj, public RdKafka::EventCb, public RdKafka::DeliveryReportCb
        {
            public:
                /**
                 * Constructor
                 *
                 * @param _brokers      One or more Kafka brokers, in the
                 *                      format 'name[:port]' where 'name'
                 *                      is either a host name or IP address;
                 *                      multiple brokers can be delimited
                 *                      with commas
                 * @param _topic        The name of the topic we will be
                 *                      publishing to
                 * @param _pollTimeout  The number of milliseconds to wait
                 *                      for events within librdkafka
                 * @param _traceLevel   Current logging level
                 */
                Publisher(const std::string& _brokers, const std::string& _topic, __int32 _pollTimeout, int _traceLevel);

                virtual ~Publisher();

                /**
                 * @return A pointer to the librdkafka producer object.
                 */
                virtual RdKafka::Handle* handle();

                /**
                 * Updates the touch time for this object.
                 *
                 * @return The updated touch time
                 */
                time_t updateTimeTouched();

                /**
                 * @return The time at which this object was created
                 */
                time_t getTimeTouched() const;

                /**
                 * If needed, establishes a connection to the Kafka cluster using
                 * the parameters stored within this object.
                 */
                void ensureSetup();

                /**
                 * Stops the attached poller's main event loop. This should be
                 * called before deletion.
                 */
                void shutdownPoller();

                /**
                 * @return The number of messages currently waiting in the
                 *         local outbound queue, ready for transmission
                 *         to the Kafka cluster
                 */
                __int32 messagesWaitingInQueue();

                /**
                 * Sends one message.
                 *
                 * @param message   The message to send
                 * @param key       The key to attach to the message
                 */
                void sendMessage(const std::string& message, const std::string& key);
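
                /*
                 * Illustrative usage sketch (an assumption, not taken from this
                 * header: the exported publishMessage() functions declared below
                 * are the intended entry points and manage Publisher instances
                 * internally; broker and topic values are placeholders):
                 *
                 *     Publisher pub("localhost:9092", "MyTopic", 1000, 0);
                 *     pub.ensureSetup();                  // connect on first use
                 *     pub.sendMessage("payload", "key1"); // queue one message for delivery
                 *     pub.shutdownPoller();               // stop background polling before destruction
                 */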

                /**
                 * Callback function. librdkafka will call here, outside of a
                 * poll(), when it has interesting things to tell us.
                 *
                 * @param event     Reference to an Event object provided
                 *                  by librdkafka
                 */
                virtual void event_cb(RdKafka::Event& event);

                /**
                 * Callback function. librdkafka will call here to notify
                 * us of problems with delivering messages to the server.
                 *
                 * @param message   Reference to a Message object provided
                 *                  by librdkafka
                 */
                virtual void dr_cb(RdKafka::Message& message);

            private:
                std::string brokers;                    //!< One or more Kafka bootstrap brokers; comma-delimited; NameOrIP[:port]
                std::string topic;                      //!< The name of the topic to publish to
                RdKafka::Producer* producerPtr;         //!< Pointer to librdkafka producer object
                std::atomic<RdKafka::Topic*> topicPtr;  //!< Pointer to librdkafka topic object
                CriticalSection lock;                   //!< Mutex to ensure that only one thread creates the librdkafka object pointers
                Poller* pollerPtr;                      //!< Pointer to the threaded Poller object that gives time to librdkafka
                __int32 pollTimeout;                    //!< The amount of time (in ms) we give to librdkafka's poll() function
                time_t timeCreated;                     //!< The time at which this object was created
                int traceLevel;                         //!< The current logging level
        };

        //----------------------------------------------------------------------

        class Consumer : public KafkaObj, public RdKafka::EventCb
        {
            public:
                /**
                 * Constructor
                 *
                 * @param _brokers          One or more Kafka brokers, in the
                 *                          format 'name[:port]' where 'name'
                 *                          is either a host name or IP address;
                 *                          multiple brokers can be delimited
                 *                          with commas
                 * @param _topic            The name of the topic we will be
                 *                          consuming from
                 * @param _consumerGroup    The name of the consumer group to use
                 * @param _partitionNum     The topic partition number we will be
                 *                          consuming from
                 * @param _traceLevel       Current logging level
                 */
                Consumer(const std::string& _brokers, const std::string& _topic, const std::string& _consumerGroup, __int32 _partitionNum, int _traceLevel);

                virtual ~Consumer();

                /**
                 * @return A pointer to the librdkafka consumer object.
                 */
                virtual RdKafka::Handle* handle();

                /**
                 * If needed, establishes a connection to the Kafka cluster using
                 * the parameters stored within this object.
                 */
                void ensureSetup();

                /**
                 * @return One new message from the inbound Kafka
                 *         topic. A NON-NULL RESULT MUST EVENTUALLY BE
                 *         DISPOSED OF WITH A CALL TO delete().
                 */
                RdKafka::Message* getOneMessage();
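
                /*
                 * Illustrative usage sketch (an assumption, not taken from this
                 * header; it simply restates the ownership rule documented above
                 * using librdkafka's C++ API):
                 *
                 *     RdKafka::Message* msg = myConsumer->getOneMessage();
                 *     if (msg)
                 *     {
                 *         // msg->payload(), msg->len(), and msg->offset() carry the data
                 *         delete msg;  // the caller owns a non-NULL result
                 *     }
                 */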

                /**
                 * Initializes the object and prepares it to receive
                 * messages from a specific broker/topic/partition.
                 */
                void prepForMessageFetch();

                /**
                 * Commits the given offset to storage so we can pick up
                 * where we left off in a subsequent read.
                 *
                 * @param offset    The offset to store
                 */
                void commitOffset(__int64 offset) const;

                /**
                 * If the offset file does not exist, create one with a
                 * default offset.
                 */
                void initFileOffsetIfNotExist() const;

                /**
                 * Callback function. librdkafka will call here, outside of a
                 * poll(), when it has interesting things to tell us.
                 *
                 * @param event     Reference to an Event object provided
                 *                  by librdkafka
                 */
                virtual void event_cb(RdKafka::Event& event);

            private:
                std::string brokers;                    //!< One or more Kafka bootstrap brokers; comma-delimited; NameOrIP[:port]
                std::string topic;                      //!< The name of the topic to consume from
                std::string consumerGroup;              //!< The name of the consumer group for this consumer object
                StringBuffer offsetPath;                //!< Full path to the Kafka topic offset file
                RdKafka::Consumer* consumerPtr;         //!< Pointer to librdkafka consumer object
                std::atomic<RdKafka::Topic*> topicPtr;  //!< Pointer to librdkafka topic object
                CriticalSection lock;                   //!< Mutex to ensure that only one thread creates the librdkafka object pointers or starts/stops the queue
                __int32 partitionNum;                   //!< The partition within the topic from which we will be pulling messages
                int traceLevel;                         //!< The current logging level
        };

        //----------------------------------------------------------------------

        class KafkaStreamedDataset : implements IRowStream, public RtlCInterface
        {
            public:
                /**
                 * Constructor
                 *
                 * @param _consumerPtr      Pointer to the Consumer object
                 *                          from which we will be retrieving
                 *                          records
                 * @param _resultAllocator  The memory allocator used to build
                 *                          the result rows; this is provided
                 *                          by the platform during the
                 *                          plugin call
                 * @param _traceLevel       The current logging level
                 * @param _maxRecords       The maximum number of records
                 *                          to return; use 0 to return all
                 *                          available records
                 */
                KafkaStreamedDataset(Consumer* _consumerPtr, IEngineRowAllocator* _resultAllocator, int _traceLevel, __int64 _maxRecords = -1);

                virtual ~KafkaStreamedDataset();

                RTLIMPLEMENT_IINTERFACE

                virtual const void* nextRow();

                virtual void stop();

            private:
                Consumer* consumerPtr;                          //!< Pointer to the Consumer object that we will read from
                Linked<IEngineRowAllocator> resultAllocator;    //!< Pointer to the allocator used when building result rows
                int traceLevel;                                 //!< The current logging level
                bool shouldRead;                                //!< If true, we should continue trying to read more messages
                __int64 maxRecords;                             //!< The maximum number of messages to read
                __int64 consumedRecCount;                       //!< The number of messages actually read
                __int64 lastMsgOffset;                          //!< The offset of the last message read from the consumer
        };
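
        /*
         * Illustrative consumption sketch (an assumption, not taken from this
         * header: 'stream' stands for the IRowStream* returned by
         * getMessageDataset() below, and row lifetime follows the engine's
         * normal rules for rows built by an IEngineRowAllocator):
         *
         *     const void* row;
         *     while ((row = stream->nextRow()) != NULL)
         *     {
         *         // ... hand the row back to the engine or process it ...
         *     }
         *     stream->stop();
         *     stream->Release();
         */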

        //----------------------------------------------------------------------

        /**
         * Queues the message for publishing to a topic on a Kafka cluster.
         *
         * @param ctx       The execution context
         * @param brokers   One or more Kafka brokers, in the
         *                  format 'name[:port]' where 'name'
         *                  is either a host name or IP address;
         *                  multiple brokers can be delimited
         *                  with commas
         * @param topic     The name of the topic
         * @param message   The message to send
         * @param key       The key to use for the message
         *
         * @return true if the message was cached successfully
         */
        ECL_KAFKA_API bool ECL_KAFKA_CALL publishMessage(ICodeContext* ctx, const char* brokers, const char* topic, const char* message, const char* key);
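
        /*
         * Illustrative call sketch (an assumption, not taken from this header:
         * ctx is the ICodeContext* supplied by the engine when the plugin entry
         * point is invoked, and the broker, topic, message, and key values are
         * placeholders):
         *
         *     bool queued = KafkaPlugin::publishMessage(ctx, "localhost:9092",
         *                                               "MyTopic", "payload", "key1");
         */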

        //----------------------------------------------------------------------

        /**
         * Queues the message for publishing to a topic on a Kafka cluster.
         *
         * @param ctx           The execution context
         * @param brokers       One or more Kafka brokers, in the
         *                      format 'name[:port]' where 'name'
         *                      is either a host name or IP address;
         *                      multiple brokers can be delimited
         *                      with commas
         * @param topic         The name of the topic
         * @param lenMessage    Length (in characters, not bytes)
         *                      of message
         * @param message       The UTF-8 message to send
         * @param lenKey        Length (in characters, not bytes)
         *                      of key
         * @param key           The UTF-8 key to use for the message
         *
         * @return true if the message was cached successfully
         */
        ECL_KAFKA_API bool ECL_KAFKA_CALL publishMessage(ICodeContext* ctx, const char* brokers, const char* topic, size32_t lenMessage, const char* message, size32_t lenKey, const char* key);
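
        /*
         * Illustrative call sketch for the UTF-8 overload (an assumption, not
         * taken from this header; note that the lengths are counted in UTF-8
         * characters rather than bytes, per the documentation above):
         *
         *     // "payload" is 7 characters and "key1" is 4 characters (all ASCII here)
         *     bool queued = KafkaPlugin::publishMessage(ctx, "localhost:9092", "MyTopic",
         *                                               7, "payload", 4, "key1");
         */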

        /**
         * Get the number of partitions currently set up for a topic on a cluster.
         *
         * @param brokers   One or more Kafka brokers, in the
         *                  format 'name[:port]' where 'name'
         *                  is either a host name or IP address;
         *                  multiple brokers can be delimited
         *                  with commas
         * @param topic     The name of the topic
         *
         * @return The number of partitions or zero if either the topic does not
         *         exist or there was an error
         */
        ECL_KAFKA_API __int32 ECL_KAFKA_CALL getTopicPartitionCount(const char* brokers, const char* topic);
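
        /*
         * Illustrative call sketch (an assumption, not taken from this header;
         * the broker and topic values are placeholders):
         *
         *     __int32 numPartitions = KafkaPlugin::getTopicPartitionCount("localhost:9092", "MyTopic");
         *     // numPartitions == 0 means the topic does not exist or an error occurred
         */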

        /**
         * Retrieves a set of messages on a topic from a Kafka cluster.
         *
         * @param ctx               Platform-provided context pointer
         * @param allocator         Platform-provided memory allocator used
         *                          to help build data rows for returning
         * @param brokers           One or more Kafka brokers, in the
         *                          format 'name[:port]' where 'name'
         *                          is either a host name or IP address;
         *                          multiple brokers can be delimited
         *                          with commas
         * @param topic             The name of the topic
         * @param consumerGroup     The name of the consumer group to use; see
         *                          https://kafka.apache.org/documentation.html#introduction
         * @param partitionNum      The topic partition from which to pull
         *                          messages; this is a zero-based index
         * @param maxRecords        The maximum number of records to return;
         *                          pass zero to return as many messages
         *                          as possible (dangerous)
         *
         * @return An IRowStream pointer representing the fetched messages
         *         or NULL if no messages could be retrieved
         */
        ECL_KAFKA_API IRowStream* ECL_KAFKA_CALL getMessageDataset(ICodeContext* ctx, IEngineRowAllocator* allocator, const char* brokers, const char* topic, const char* consumerGroup, __int32 partitionNum, __int64 maxRecords);
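
        /*
         * Illustrative call sketch (an assumption, not taken from this header:
         * ctx and allocator are the ICodeContext* and IEngineRowAllocator*
         * supplied by the engine; the remaining values are placeholders):
         *
         *     IRowStream* stream = KafkaPlugin::getMessageDataset(ctx, allocator,
         *                                                         "localhost:9092", "MyTopic",
         *                                                         "MyConsumerGroup", 0, 10000);
         *     // Up to 10000 messages from partition 0, or NULL if none could be retrieved
         */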

        /**
         * Resets the saved offsets for a partition.
         *
         * @param ctx               Platform-provided context pointer
         * @param brokers           One or more Kafka brokers, in the
         *                          format 'name[:port]' where 'name'
         *                          is either a host name or IP address;
         *                          multiple brokers can be delimited
         *                          with commas
         * @param topic             The name of the topic
         * @param consumerGroup     The name of the consumer group to use; see
         *                          https://kafka.apache.org/documentation.html#introduction
         * @param partitionNum      The topic partition from which to pull
         *                          messages; this is a zero-based index
         * @param newOffset         The new offset to save
         *
         * @return The offset that was saved
         */
        ECL_KAFKA_API __int64 ECL_KAFKA_CALL setMessageOffset(ICodeContext* ctx, const char* brokers, const char* topic, const char* consumerGroup, __int32 partitionNum, __int64 newOffset);
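
        /*
         * Illustrative call sketch (an assumption, not taken from this header:
         * ctx is the engine-supplied ICodeContext*; the remaining values are
         * placeholders):
         *
         *     // Rewind partition 0 of MyTopic to offset 0 for this consumer group
         *     __int64 saved = KafkaPlugin::setMessageOffset(ctx, "localhost:9092", "MyTopic",
         *                                                   "MyConsumerGroup", 0, 0);
         */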
    }
}

#endif