kafka.hpp 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #ifndef ECL_KAFKA_INCL
  14. #define ECL_KAFKA_INCL
  15. #ifdef _WIN32
  16. #define ECL_KAFKA_CALL _cdecl
  17. #ifdef ECL_KAFKA_EXPORTS
  18. #define ECL_KAFKA_API __declspec(dllexport)
  19. #else
  20. #define ECL_KAFKA_API __declspec(dllimport)
  21. #endif
  22. #else
  23. #define ECL_KAFKA_CALL
  24. #define ECL_KAFKA_API
  25. #endif
  26. #include "platform.h"
  27. #include "jthread.hpp"
  28. #include "hqlplugins.hpp"
  29. #include "eclrtl_imp.hpp"
  30. #include "eclhelper.hpp"
  31. #include <atomic>
  32. #include <string>
  33. #include <time.h>
  34. #include "librdkafka/rdkafkacpp.h"
  #ifdef ECL_KAFKA_EXPORTS
  // Plugin definition hook; declared only when building the plugin itself.
  // C linkage keeps the symbol name unmangled (presumably so the host
  // platform can look it up by name — TODO confirm).
  extern "C" ECL_KAFKA_API bool getECLPluginDefinition(ECLPluginDefinitionBlock *pb);
  #endif
  41. extern "C++"
  42. {
  43. namespace KafkaPlugin
  44. {
  // Forward declarations so the classes below can refer to one another
  // before their full definitions appear.
  class Consumer;
  class KafkaObj;
  class KafkaStreamedDataset;
  class Poller;
  class Publisher;
  50. /** @class KafkaObj
  51. *
  52. * Parent class for both Publisher and Consumer classes. Provides
  53. * easy way for a Poller object to access either for callbacks, etc.
  54. */
  55. class KafkaObj
  56. {
  57. public:
  58. /**
  59. * Returns a pointer to the librdkafka object that can be either
  60. * a producer or consumer.
  61. */
  62. virtual RdKafka::Handle* handle() = 0;
  63. };
  64. //----------------------------------------------------------------------
  65. /** @class Poller
  66. *
  67. * Background execution of librdkafka's poll() function, which is
  68. * required in order to batch I/O. One Poller will be created
  69. * for each Publisher and Consumer object actively used
  70. */
  71. class Poller : public Thread
  72. {
  73. public:
  74. /**
  75. * Constructor
  76. *
  77. * @param _parentPtr Pointer to Publisher or Consumer object
  78. * that created this object
  79. * @param _pollTimeout The number of milliseconds to wait
  80. * for events within librdkafka
  81. */
  82. Poller(KafkaObj* _parentPtr, __int32 _pollTimeout);
  83. /**
  84. * Starts execution of the thread main event loop
  85. */
  86. virtual void start();
  87. /**
  88. * Stops execution of the thread main event loop. Note that we
  89. * wait until the main event loop has actually stopped before
  90. * returning.
  91. */
  92. void stop();
  93. /**
  94. * Entry point to the thread main event loop. Exiting this
  95. * method means that the thread should stop.
  96. */
  97. virtual int run();
  98. private:
  99. std::atomic_bool shouldRun; //!< If true, we should execute our thread's main event loop
  100. KafkaObj* parentPtr; //!< Pointer to object that started this threaded execution
  101. __int32 pollTimeout; //!< The amount of time (in ms) we give to librdkafka's poll() function
  102. };
  103. //----------------------------------------------------------------------
  104. class Publisher : public KafkaObj, public RdKafka::EventCb, public RdKafka::DeliveryReportCb
  105. {
  106. public:
  107. /**
  108. * Constructor
  109. *
  110. * @param _brokers One or more Kafka brokers, in the
  111. * format 'name[:port]' where 'name'
  112. * is either a host name or IP address;
  113. * multiple brokers can be delimited
  114. * with commas
  115. * @param _topic The name of the topic we will be
  116. * publishing to
  117. * @param _pollTimeout The number of milliseconds to wait
  118. * for events within librdkafka
  119. * @param _traceLevel Current logging level
  120. */
  121. Publisher(const std::string& _brokers, const std::string& _topic, __int32 _pollTimeout, int _traceLevel);
  122. virtual ~Publisher();
  123. /**
  124. * @return A pointer to the librdkafka producer object.
  125. */
  126. virtual RdKafka::Handle* handle();
  127. /**
  128. * @return Updates the touch time and returns it.
  129. */
  130. time_t updateTimeTouched();
  131. /**
  132. * @return The time at which this object was created
  133. */
  134. time_t getTimeTouched() const;
  135. /**
  136. * If needed, establish connection to Kafka cluster using the
  137. * parameters stored within this object.
  138. */
  139. void ensureSetup();
  140. /**
  141. * Stops the attached poller's main event loop. This should be
  142. * called before deletion.
  143. */
  144. void shutdownPoller();
  145. /**
  146. * @return Returns the number of messages currently waiting
  147. * in the local outbound queue, ready for transmission
  148. * to the Kafka cluster
  149. */
  150. __int32 messagesWaitingInQueue();
  151. /**
  152. * Send one message
  153. *
  154. * @param message The message to send
  155. * @param key The key to attach to the message
  156. */
  157. void sendMessage(const std::string& message, const std::string& key);
  158. /**
  159. * Callback function. librdkafka will call here, outside of a
  160. * poll(), when it has interesting things to tell us
  161. *
  162. * @param event Reference to an Event object provided
  163. * by librdkafka
  164. */
  165. virtual void event_cb(RdKafka::Event& event);
  166. /**
  167. * Callback function. librdkafka will call here to notify
  168. * us of problems with delivering messages to the server
  169. *
  170. * @param message Reference to an Message object provided
  171. * by librdkafka
  172. */
  173. virtual void dr_cb (RdKafka::Message& message);
  174. private:
  175. std::string brokers; //!< One or more Kafka bootstrap brokers; comma-delimited; NameOrIP[:port]
  176. std::string topic; //!< The name of the topic to publish to
  177. RdKafka::Producer* producerPtr; //!< Pointer to librdkafka producer object
  178. std::atomic<RdKafka::Topic*> topicPtr; //!< Pointer to librdkafka topic object
  179. CriticalSection lock; //!< Mutex to ensure that only one thread creates the librdkafka object pointers
  180. Poller* pollerPtr; //!< Pointer to the threaded Poller object that gives time to librdkafka
  181. __int32 pollTimeout; //!< The amount of time (in ms) we give to librdkafka's poll() function
  182. time_t timeCreated; //!< The time at which this object was created
  183. int traceLevel; //!< The current logging level
  184. };
  185. //----------------------------------------------------------------------
  186. class Consumer : public KafkaObj, public RdKafka::EventCb
  187. {
  188. public:
  189. /**
  190. * Constructor
  191. *
  192. * @param _brokers One or more Kafka brokers, in the
  193. * format 'name[:port]' where 'name'
  194. * is either a host name or IP address;
  195. * multiple brokers can be delimited
  196. * with commas
  197. * @param _topic The name of the topic we will be
  198. * consuming from
  199. * @param _partitionNum The topic partition number we will be
  200. * consuming from
  201. * @param _traceLevel Current logging level
  202. */
  203. Consumer(const std::string& _brokers, const std::string& _topic, const std::string& _consumerGroup, __int32 _partitionNum, int _traceLevel);
  204. virtual ~Consumer();
  205. /**
  206. * @return A pointer to the librdkafka consumer object.
  207. */
  208. virtual RdKafka::Handle* handle();
  209. /**
  210. * If needed, establish connection to Kafka cluster using the
  211. * parameters stored within this object.
  212. */
  213. void ensureSetup();
  214. /**
  215. * @return Returns one new message from the inbound Kafka
  216. * topic. A NON-NULL RESULT MUST EVENTUALLY BE
  217. * DISPOSED OF WITH A CALL TO delete().
  218. */
  219. RdKafka::Message* getOneMessage();
  220. /**
  221. * Retrieves many messages from the inbound Kafka topic and
  222. * returns them as a streamed dataset. Note that this is a
  223. * per-brokers/per-topic/per-partition retrieval.
  224. *
  225. * @param allocator The allocator to use with RowBuilder
  226. * @param maxRecords The maximum number of records
  227. * to retrieved
  228. *
  229. * @return An IRowStream streamed dataset object pointer
  230. */
  231. KafkaStreamedDataset* getMessageDataset(IEngineRowAllocator* allocator, __int64 maxRecords = 1);
  232. /**
  233. * @return StringBuffer object containing the path to this
  234. * consumer's offset file
  235. */
  236. StringBuffer offsetFilePath() const;
  237. /**
  238. * Commits the given offset to storage so we can pick up
  239. * where we left off in a subsequent read.
  240. *
  241. * @param offset The offset to store
  242. */
  243. void commitOffset(__int64 offset) const;
  244. /**
  245. * If the offset file does not exist, create one with a
  246. * default offset
  247. */
  248. void initFileOffsetIfNotExist() const;
  249. /**
  250. * Callback function. librdkafka will call here, outside of a
  251. * poll(), when it has interesting things to tell us
  252. *
  253. * @param event Reference to an Event object provided
  254. * by librdkafka
  255. */
  256. virtual void event_cb(RdKafka::Event& event);
  257. private:
  258. std::string brokers; //!< One or more Kafka bootstrap brokers; comma-delimited; NameOrIP[:port]
  259. std::string topic; //!< The name of the topic to consume from
  260. std::string consumerGroup; //!< The name of the consumer group for this consumer object
  261. RdKafka::Consumer* consumerPtr; //!< Pointer to librdkafka consumer object
  262. std::atomic<RdKafka::Topic*> topicPtr; //!< Pointer to librdkafka topic object
  263. CriticalSection lock; //!< Mutex to ensure that only one thread creates the librdkafka object pointers or starts/stops the queue
  264. __int32 partitionNum; //!< The partition within the topic from which we will be pulling messages
  265. bool queueStarted; //!< If true, we have started the process of reading from the queue
  266. int traceLevel; //!< The current logging level
  267. };
  268. //----------------------------------------------------------------------
  269. class KafkaStreamedDataset : public RtlCInterface, implements IRowStream
  270. {
  271. public:
  272. /**
  273. * Constructor
  274. *
  275. * @param _consumerPtr Pointer to the Consumer object
  276. * from which we will be retrieving
  277. * records
  278. * @param _resultAllocator The memory allocator used to build
  279. * the result rows; this is provided
  280. * by the platform during the
  281. * plugin call
  282. * @param _traceLevel The current logging level
  283. * @param _maxRecords The maximum number of records
  284. * to return; use 0 to return all
  285. * available records
  286. */
  287. KafkaStreamedDataset(Consumer* _consumerPtr, IEngineRowAllocator* _resultAllocator, int _traceLevel, __int64 _maxRecords = -1);
  288. virtual ~KafkaStreamedDataset();
  289. RTLIMPLEMENT_IINTERFACE
  290. virtual const void* nextRow();
  291. virtual void stop();
  292. private:
  293. Consumer* consumerPtr; //!< Pointer to the Consumer object that we will read from
  294. Linked<IEngineRowAllocator> resultAllocator; //!< Pointer to allocator used when building result rows
  295. int traceLevel; //!< The current logging level
  296. bool shouldRead; //!< If true, we should continue trying to read more messages
  297. __int64 maxRecords; //!< The maximum number of messages to read
  298. __int64 consumedRecCount; //!< The number of messages actually read
  299. __int64 lastMsgOffset; //!< The offset of the last message read from the consumer
  300. };
  301. //----------------------------------------------------------------------
  302. /**
  303. * Queues the message for publishing to a topic on a Kafka cluster.
  304. *
  305. * @param brokers One or more Kafka brokers, in the
  306. * format 'name[:port]' where 'name'
  307. * is either a host name or IP address;
  308. * multiple brokers can be delimited
  309. * with commas
  310. * @param topic The name of the topic
  311. * @param message The message to send
  312. * @param key The key to use for the message
  313. *
  314. * @return true if the message was cached successfully
  315. */
  316. ECL_KAFKA_API bool ECL_KAFKA_CALL publishMessage(const char* brokers, const char* topic, const char* message, const char* key);
  317. /**
  318. * Get the number of partitions currently set up for a topic on a cluster.
  319. *
  320. * @param brokers One or more Kafka brokers, in the
  321. * format 'name[:port]' where 'name'
  322. * is either a host name or IP address;
  323. * multiple brokers can be delimited
  324. * with commas
  325. * @param topic The name of the topic
  326. *
  327. * @return The number of partitions or zero if either the topic does not
  328. * exist or there was an error
  329. */
  330. ECL_KAFKA_API __int32 ECL_KAFKA_CALL getTopicPartitionCount(const char* brokers, const char* topic);
  331. /**
  332. * Retrieves a set of messages on a topic from a Kafka cluster.
  333. *
  334. * @param ctx Platform-provided context point
  335. * @param allocator Platform-provided memory allocator used
  336. * to help build data rows for returning
  337. * @param brokers One or more Kafka brokers, in the
  338. * format 'name[:port]' where 'name'
  339. * is either a host name or IP address;
  340. * multiple brokers can be delimited
  341. * with commas
  342. * @param topic The name of the topic
  343. * @param consumerGroup The name of the consumer group to use; see
  344. * https://kafka.apache.org/documentation.html#introduction
  345. * @param partitionNum The topic partition from which to pull
  346. * messages; this is a zero-based index
  347. * @param maxRecords The maximum number of records return;
  348. * pass zero to return as many messages
  349. * as possible (dangerous)
  350. *
  351. * @return An IRowStream pointer representing the fetched messages
  352. * or NULL if no messages could be retrieved
  353. */
  354. ECL_KAFKA_API IRowStream* ECL_KAFKA_CALL getMessageDataset(ICodeContext* ctx, IEngineRowAllocator* allocator, const char* brokers, const char* topic, const char* consumerGroup, __int32 partitionNum, __int64 maxRecords);
  355. /**
  356. * Resets the saved offsets for a partition.
  357. *
  358. * @param ctx Platform-provided context point
  359. * @param brokers One or more Kafka brokers, in the
  360. * format 'name[:port]' where 'name'
  361. * is either a host name or IP address;
  362. * multiple brokers can be delimited
  363. * with commas
  364. * @param topic The name of the topic
  365. * @param consumerGroup The name of the consumer group to use; see
  366. * https://kafka.apache.org/documentation.html#introduction
  367. * @param partitionNum The topic partition from which to pull
  368. * messages; this is a zero-based index
  369. * @param newOffset The new offset to save
  370. *
  371. * @return The offset that was saved
  372. */
  373. ECL_KAFKA_API __int64 ECL_KAFKA_CALL setMessageOffset(ICodeContext* ctx, const char* brokers, const char* topic, const char* consumerGroup, __int32 partitionNum, __int64 newOffset);
  374. }
  375. }
  376. #endif