/*##############################################################################

    HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems®.

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
############################################################################## */

#include "kafka.hpp"

#include "rtlds_imp.hpp"
#include "jlog.hpp"
#include "jmutex.hpp"
#include "jprop.hpp"
#include "jfile.hpp"
#include "build-config.h"

#include "librdkafka/rdkafka.h"

#include <map>
#include <fstream>
#include <mutex>

//==============================================================================
// Kafka Interface Code
//==============================================================================

namespace KafkaPlugin
{
    //--------------------------------------------------------------------------
    // File Constants
    //--------------------------------------------------------------------------

    // Filename of global Kafka configuration file
    const char* GLOBAL_CONFIG_FILENAME = "kafka_global.conf";

    // The minimum number of seconds that a cached object can live
    // without activity
    const time_t OBJECT_EXPIRE_TIMEOUT_SECONDS = 60 * 2;

    // The number of milliseconds given to librdkafka to perform explicit
    // background activity
    const __int32 POLL_TIMEOUT = 1000;

    //--------------------------------------------------------------------------
    // Static Variables
    //--------------------------------------------------------------------------

    static std::once_flag pubCacheInitFlag;

    //--------------------------------------------------------------------------
    // Static Methods (internal)
    //--------------------------------------------------------------------------

    /**
     * Look for an optional configuration file and apply any found configuration
     * parameters to a librdkafka configuration object.
     *
     * @param   configFilePath      The path to a configuration file; it is not
     *                              necessary for the file to exist
     * @param   globalConfigPtr     A pointer to the configuration object that
     *                              will receive any found parameters
     * @param   traceLevel          The current log trace level
     */
    static void applyConfig(const char* configFilePath, RdKafka::Conf* globalConfigPtr, int traceLevel)
    {
        if (configFilePath && *configFilePath && globalConfigPtr)
        {
            std::string errStr;
            StringBuffer fullConfigPath;

            fullConfigPath.append(CONFIG_DIR).append(PATHSEPSTR).append(configFilePath);

            Owned<IProperties> properties = createProperties(fullConfigPath.str(), true);
            Owned<IPropertyIterator> props = properties->getIterator();

            ForEach(*props)
            {
                StringBuffer key(props->getPropKey());

                key.trim();

                if (key.length() > 0 && key.charAt(0) != '#')
                {
                    if (strcmp(key.str(), "metadata.broker.list") != 0)
                    {
                        const char* value = properties->queryProp(key);

                        if (value && *value)
                        {
                            if (globalConfigPtr->set(key.str(), value, errStr) != RdKafka::Conf::CONF_OK)
                            {
                                DBGLOG("Kafka: Failed to set config param from file %s: '%s' = '%s'; error: '%s'", configFilePath, key.str(), value, errStr.c_str());
                            }
                            else if (traceLevel > 4)
                            {
                                DBGLOG("Kafka: Set config param from file %s: '%s' = '%s'", configFilePath, key.str(), value);
                            }
                        }
                    }
                    else
                    {
                        DBGLOG("Kafka: Setting '%s' ignored in config file %s", key.str(), configFilePath);
                    }
                }
            }
        }
    }
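
    // As a purely illustrative sketch, a kafka_global.conf in CONFIG_DIR could
    // contain any standard librdkafka global properties, one 'key = value' pair
    // per line; '#' lines are treated as comments, and 'metadata.broker.list'
    // is ignored per the logic above. The property names below are real
    // librdkafka settings, but the values are hypothetical:
    //
    //     # example overrides
    //     compression.codec = gzip
    //     message.send.max.retries = 5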

    //--------------------------------------------------------------------------
    // Plugin Classes
    //--------------------------------------------------------------------------

    KafkaStreamedDataset::KafkaStreamedDataset(Consumer* _consumerPtr, IEngineRowAllocator* _resultAllocator, int _traceLevel, __int64 _maxRecords)
        :   consumerPtr(_consumerPtr),
            resultAllocator(_resultAllocator),
            traceLevel(_traceLevel),
            maxRecords(_maxRecords)
    {
        shouldRead = true;
        consumedRecCount = 0;
        lastMsgOffset = 0;
    }

    KafkaStreamedDataset::~KafkaStreamedDataset()
    {
        if (consumerPtr)
        {
            if (consumedRecCount > 0)
            {
                consumerPtr->commitOffset(lastMsgOffset);
            }

            delete(consumerPtr);
        }
    }

    const void* KafkaStreamedDataset::nextRow()
    {
        const void* result = NULL;
        __int32 maxAttempts = 10;   //!< Maximum number of tries if local queue is full
        __int32 timeoutWait = 100;  //!< Amount of time (ms) to wait between retries
        __int32 attemptNum = 0;

        if (consumerPtr && (maxRecords <= 0 || consumedRecCount < maxRecords))
        {
            RdKafka::Message* messageObjPtr = NULL;
            bool messageConsumed = false;

            while (!messageConsumed && shouldRead && attemptNum < maxAttempts)
            {
                messageObjPtr = consumerPtr->getOneMessage(); // messageObjPtr must be deleted when we are through with it

                if (messageObjPtr)
                {
                    try
                    {
                        switch (messageObjPtr->err())
                        {
                            case RdKafka::ERR_NO_ERROR:
                                {
                                    RtlDynamicRowBuilder rowBuilder(resultAllocator);
                                    unsigned len = sizeof(__int32) + sizeof(__int64) + sizeof(size32_t) + messageObjPtr->len();
                                    byte* row = rowBuilder.ensureCapacity(len, NULL);

                                    // Populating this structure:
                                    //  EXPORT KafkaMessage := RECORD
                                    //      UNSIGNED4   partitionNum;
                                    //      UNSIGNED8   offset;
                                    //      UTF8        message;
                                    //  END;
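                                    //
                                    // Resulting row layout: a 4-byte partition
                                    // number, an 8-byte offset, then the UTF8
                                    // field stored as a size32_t character count
                                    // followed by the raw UTF-8 payload bytes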
                                    *(__int32*)(row) = messageObjPtr->partition();
                                    *(__int64*)(row + sizeof(__int32)) = messageObjPtr->offset();
                                    *(size32_t*)(row + sizeof(__int32) + sizeof(__int64)) = rtlUtf8Length(messageObjPtr->len(), messageObjPtr->payload());
                                    memcpy(row + sizeof(__int32) + sizeof(__int64) + sizeof(size32_t), messageObjPtr->payload(), messageObjPtr->len());

                                    result = rowBuilder.finalizeRowClear(len);
                                    lastMsgOffset = messageObjPtr->offset();
                                    ++consumedRecCount;

                                    // Give opportunity for consumer to pull in any additional messages
                                    consumerPtr->handle()->poll(0);

                                    // Mark as loaded so we don't retry
                                    messageConsumed = true;
                                }
                                break;

                            case RdKafka::ERR__TIMED_OUT:
                                // No new messages arrived and we timed out waiting
                                ++attemptNum;
                                consumerPtr->handle()->poll(timeoutWait);
                                break;

                            case RdKafka::ERR__PARTITION_EOF:
                                // We reached the end of the messages in the partition
                                if (traceLevel > 4)
                                {
                                    DBGLOG("Kafka: EOF reading message from partition %d", messageObjPtr->partition());
                                }
                                shouldRead = false;
                                break;

                            case RdKafka::ERR__UNKNOWN_PARTITION:
                                // Unknown partition; don't throw an error here because
                                // in some configurations (e.g. more Thor slaves than
                                // partitions) not all consumers will have a partition
                                // to read
                                if (traceLevel > 4)
                                {
                                    DBGLOG("Kafka: Unknown partition while trying to read");
                                }
                                shouldRead = false;
                                break;
                            case RdKafka::ERR__UNKNOWN_TOPIC:
                                throw MakeStringException(-1, "Kafka: Error while reading message: '%s'", messageObjPtr->errstr().c_str());
                                break;

                            default:
                                // Surface any other error rather than retrying;
                                // only ERR__TIMED_OUT advances attemptNum, so an
                                // unhandled error code would otherwise spin here
                                throw MakeStringException(-1, "Kafka: Error while reading message: '%s'", messageObjPtr->errstr().c_str());
                                break;
                        }
                    }
                    catch (...)
                    {
                        delete(messageObjPtr);
                        throw;
                    }

                    delete(messageObjPtr);
                    messageObjPtr = NULL;
                }
            }
        }

        return result;
    }

    void KafkaStreamedDataset::stop()
    {
        shouldRead = false;
    }

    //--------------------------------------------------------------------------

    Poller::Poller(KafkaObj* _parentPtr, __int32 _pollTimeout)
        :   Thread("Kafka::Poller"),
            parentPtr(_parentPtr),
            pollTimeout(_pollTimeout),
            shouldRun(false)
    {
    }

    void Poller::start()
    {
        if (!isAlive() && parentPtr)
        {
            shouldRun = true;
            Thread::start();
        }
    }

    void Poller::stop()
    {
        if (isAlive())
        {
            shouldRun = false;
            join();
        }
    }
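
    // Note: librdkafka dispatches registered callbacks (event_cb, dr_cb) from
    // within poll(), so this background loop is also what delivers the
    // Publisher's delivery reports and log events.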

    int Poller::run()
    {
        RdKafka::Handle* handle = parentPtr->handle();

        while (shouldRun)
        {
            handle->poll(pollTimeout);
        }

        return 0;
    }

    //--------------------------------------------------------------------------

    Publisher::Publisher(const std::string& _brokers, const std::string& _topic, __int32 _pollTimeout, int _traceLevel)
        :   brokers(_brokers),
            topic(_topic),
            pollTimeout(_pollTimeout),
            traceLevel(_traceLevel)
    {
        producerPtr = NULL;
        topicPtr = NULL;
        pollerPtr = new Poller(this, _pollTimeout);
        updateTimeTouched();
    }

    Publisher::~Publisher()
    {
        delete(pollerPtr);
        delete(topicPtr.load());
        delete(producerPtr);
    }

    RdKafka::Handle* Publisher::handle()
    {
        return static_cast<RdKafka::Handle*>(producerPtr);
    }

    time_t Publisher::updateTimeTouched()
    {
        timeCreated = time(NULL);

        return timeCreated;
    }

    time_t Publisher::getTimeTouched() const
    {
        return timeCreated;
    }

    void Publisher::shutdownPoller()
    {
        if (pollerPtr)
        {
            // Wait until we send all messages
            while (messagesWaitingInQueue() > 0)
            {
                usleep(pollTimeout);
            }

            // Tell poller to stop
            pollerPtr->stop();
        }
    }

    __int32 Publisher::messagesWaitingInQueue()
    {
        __int32 queueLength = 0;

        if (producerPtr)
        {
            queueLength = producerPtr->outq_len();
        }

        return queueLength;
    }
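
    // Lazily creates the producer and topic objects on first use, using
    // double-checked locking: the release store of topicPtr below pairs with
    // the acquire load here, and the relaxed re-check is safe because it is
    // performed while holding the lock.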
    void Publisher::ensureSetup()
    {
        if (!topicPtr.load(std::memory_order_acquire))
        {
            CriticalBlock block(lock);

            if (!topicPtr.load(std::memory_order_relaxed))
            {
                std::string errStr;
                RdKafka::Conf* globalConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);

                if (globalConfig)
                {
                    // Set global configuration parameters, used mainly at the producer level
                    globalConfig->set("metadata.broker.list", brokers, errStr);
                    globalConfig->set("queue.buffering.max.messages", "1000000", errStr);
                    globalConfig->set("compression.codec", "snappy", errStr);
                    globalConfig->set("message.send.max.retries", "3", errStr);
                    globalConfig->set("retry.backoff.ms", "500", errStr);

                    // Set any global configurations from file, allowing
                    // overrides of above settings
                    applyConfig(GLOBAL_CONFIG_FILENAME, globalConfig, traceLevel);

                    // Set producer callbacks
                    globalConfig->set("event_cb", static_cast<RdKafka::EventCb*>(this), errStr);
                    globalConfig->set("dr_cb", static_cast<RdKafka::DeliveryReportCb*>(this), errStr);

                    // Create the producer
                    producerPtr = RdKafka::Producer::create(globalConfig, errStr);
                    delete globalConfig;

                    if (producerPtr)
                    {
                        RdKafka::Conf* topicConfPtr = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);

                        // Set any topic configurations from file
                        std::string confName = "kafka_publisher_topic_" + topic + ".conf";
                        applyConfig(confName.c_str(), topicConfPtr, traceLevel);

                        // Create the topic
                        topicPtr.store(RdKafka::Topic::create(producerPtr, topic, topicConfPtr, errStr), std::memory_order_release);
                        delete topicConfPtr;

                        if (topicPtr)
                        {
                            // Start the attached background poller
                            pollerPtr->start();
                        }
                        else
                        {
                            throw MakeStringException(-1, "Kafka: Unable to create producer topic object for topic '%s'; error: '%s'", topic.c_str(), errStr.c_str());
                        }
                    }
                    else
                    {
                        throw MakeStringException(-1, "Kafka: Unable to create producer object for brokers '%s'; error: '%s'", brokers.c_str(), errStr.c_str());
                    }
                }
                else
                {
                    throw MakeStringException(-1, "Kafka: Unable to create producer global configuration object for brokers '%s'; error: '%s'", brokers.c_str(), errStr.c_str());
                }
            }
        }
    }

    void Publisher::sendMessage(const std::string& message, const std::string& key)
    {
        __int32 maxAttempts = 10;   //!< Maximum number of tries if local queue is full
        __int32 attemptNum = 0;

        // Make sure we have a valid connection to the Kafka cluster
        ensureSetup();

        // Actually send the message
        while (true)
        {
            RdKafka::ErrorCode resp = producerPtr->produce(topicPtr, RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY, const_cast<char*>(message.c_str()), message.size(), (key.empty() ? NULL : &key), NULL);

            if (resp == RdKafka::ERR_NO_ERROR)
            {
                break;
            }
            else if (resp == RdKafka::ERR__QUEUE_FULL)
            {
                if (attemptNum < maxAttempts)
                {
                    usleep(pollTimeout);
                    ++attemptNum;
                }
                else
                {
                    throw MakeStringException(-1, "Kafka: Unable to send message to topic '%s'; error: '%s'", topic.c_str(), RdKafka::err2str(resp).c_str());
                }
            }
            else
            {
                throw MakeStringException(-1, "Kafka: Unable to send message to topic '%s'; error: '%s'", topic.c_str(), RdKafka::err2str(resp).c_str());
            }
        }
    }
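
    // A note on sendMessage(): RK_MSG_COPY makes librdkafka copy the payload
    // into its own queue, so the caller's buffers may be freed as soon as
    // produce() returns. Delivery itself is asynchronous; success or failure
    // is reported later via dr_cb() when the poller thread calls poll().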

    void Publisher::event_cb(RdKafka::Event& event)
    {
        if (traceLevel > 4)
        {
            switch (event.type())
            {
                case RdKafka::Event::EVENT_ERROR:
                    DBGLOG("Kafka: Error: %s", event.str().c_str());
                    break;

                case RdKafka::Event::EVENT_STATS:
                    DBGLOG("Kafka: Stats: %s", event.str().c_str());
                    break;

                case RdKafka::Event::EVENT_LOG:
                    DBGLOG("Kafka: Log: %s", event.str().c_str());
                    break;
            }
        }
    }

    void Publisher::dr_cb(RdKafka::Message& message)
    {
        if (message.err() != RdKafka::ERR_NO_ERROR)
        {
            StringBuffer payloadStr;

            if (message.len() == 0)
                payloadStr.append("<no message>");
            else
                payloadStr.append(message.len(), static_cast<const char*>(message.payload()));

            DBGLOG("Kafka: Error publishing message: %d (%s); message: '%s'", message.err(), message.errstr().c_str(), payloadStr.str());
        }
    }

    //--------------------------------------------------------------------------

    Consumer::Consumer(const std::string& _brokers, const std::string& _topic, const std::string& _consumerGroup, __int32 _partitionNum, int _traceLevel)
        :   brokers(_brokers),
            topic(_topic),
            consumerGroup(_consumerGroup),
            partitionNum(_partitionNum),
            traceLevel(_traceLevel)
    {
        consumerPtr = NULL;
        topicPtr = NULL;

        char cpath[_MAX_DIR];

        GetCurrentDirectory(_MAX_DIR, cpath);
        offsetPath.append(cpath);
        addPathSepChar(offsetPath);

        offsetPath.append(topic.c_str());
        offsetPath.append("-");
        offsetPath.append(partitionNum);

        if (!consumerGroup.empty())
        {
            offsetPath.append("-");
            offsetPath.append(consumerGroup.c_str());
        }

        offsetPath.append(".offset");
    }

    Consumer::~Consumer()
    {
        if (consumerPtr && topicPtr)
        {
            consumerPtr->stop(topicPtr, partitionNum);
        }

        delete(topicPtr.load());
        delete(consumerPtr);
    }

    RdKafka::Handle* Consumer::handle()
    {
        return static_cast<RdKafka::Handle*>(consumerPtr);
    }
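
    // Same double-checked locking pattern as Publisher::ensureSetup(); note
    // that automatic offset commits are deliberately disabled below, since
    // offsets are committed explicitly (see Consumer::commitOffset(), called
    // from KafkaStreamedDataset's destructor).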
    void Consumer::ensureSetup()
    {
        if (!topicPtr.load(std::memory_order_acquire))
        {
            CriticalBlock block(lock);

            if (!topicPtr.load(std::memory_order_relaxed))
            {
                initFileOffsetIfNotExist();

                std::string errStr;
                RdKafka::Conf* globalConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);

                if (globalConfig)
                {
                    // Set global configuration parameters, used mainly at the consumer level
                    globalConfig->set("metadata.broker.list", brokers, errStr);
                    globalConfig->set("compression.codec", "snappy", errStr);
                    globalConfig->set("queued.max.messages.kbytes", "10000000", errStr);
                    globalConfig->set("fetch.message.max.bytes", "10000000", errStr);

                    // Set any global configurations from file, allowing
                    // overrides of above settings
                    applyConfig(GLOBAL_CONFIG_FILENAME, globalConfig, traceLevel);

                    // Set consumer callbacks
                    globalConfig->set("event_cb", static_cast<RdKafka::EventCb*>(this), errStr);

                    // Create the consumer
                    consumerPtr = RdKafka::Consumer::create(globalConfig, errStr);
                    delete globalConfig;

                    if (consumerPtr)
                    {
                        RdKafka::Conf* topicConfPtr = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);

                        // Set the per-topic configuration parameters
                        topicConfPtr->set("group.id", consumerGroup, errStr);
                        topicConfPtr->set("auto.offset.reset", "smallest", errStr);

                        // Set any topic configurations from file, allowing
                        // overrides of above settings
                        std::string confName = "kafka_consumer_topic_" + topic + ".conf";
                        applyConfig(confName.c_str(), topicConfPtr, traceLevel);

                        // Ensure that some items are set a certain way
                        // by setting them after loading the external conf
                        topicConfPtr->set("auto.commit.enable", "false", errStr);

                        // Additional settings for updated librdkafka
                        topicConfPtr->set("enable.auto.commit", "false", errStr);
                        topicConfPtr->set("offset.store.method", "file", errStr);
                        topicConfPtr->set("offset.store.path", offsetPath.str(), errStr);

                        // Create the topic
                        topicPtr.store(RdKafka::Topic::create(consumerPtr, topic, topicConfPtr, errStr), std::memory_order_release);
                        delete topicConfPtr;

                        if (!topicPtr)
                        {
                            throw MakeStringException(-1, "Kafka: Unable to create consumer topic object for topic '%s'; error: '%s'", topic.c_str(), errStr.c_str());
                        }
                    }
                    else
                    {
                        throw MakeStringException(-1, "Kafka: Unable to create consumer object for brokers '%s'; error: '%s'", brokers.c_str(), errStr.c_str());
                    }
                }
                else
                {
                    throw MakeStringException(-1, "Kafka: Unable to create consumer global configuration object for brokers '%s'; error: '%s'", brokers.c_str(), errStr.c_str());
                }
            }
        }
    }

    RdKafka::Message* Consumer::getOneMessage()
    {
        return consumerPtr->consume(topicPtr, partitionNum, POLL_TIMEOUT);
    }

    void Consumer::prepForMessageFetch()
    {
        // Make sure we have a valid connection to the Kafka cluster
        ensureSetup();

        // Start the local read queue
        RdKafka::ErrorCode startErr = consumerPtr->start(topicPtr, partitionNum, RdKafka::Topic::OFFSET_STORED);

        if (startErr == RdKafka::ERR_NO_ERROR)
        {
            if (traceLevel > 4)
            {
                DBGLOG("Kafka: Started Consumer for %s:%d @ %s", topic.c_str(), partitionNum, brokers.c_str());
            }
        }
        else
        {
            throw MakeStringException(-1, "Kafka: Failed to start Consumer read for %s:%d @ %s; error: %d", topic.c_str(), partitionNum, brokers.c_str(), startErr);
        }
    }

    void Consumer::commitOffset(__int64 offset) const
    {
        if (offset >= 0)
        {
            // Not using librdkafka's offset_store because it seems to be broken
            // topicPtr->offset_store(partitionNum, offset);

            // Create/overwrite a file using the same naming convention and
            // file contents that librdkafka uses so it can pick up where
            // we left off; NOTE: librdkafka does not clean the topic name
            // or consumer group name when constructing this path
            // (which is actually a security concern), so we can't clean, either
            std::ofstream outFile(offsetPath.str(), std::ofstream::trunc);

            outFile << offset;

            if (traceLevel > 4)
            {
                DBGLOG("Kafka: Saved offset %lld to %s", offset, offsetPath.str());
            }
        }
    }
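
    // For example (names are hypothetical): reading partition 0 of topic
    // "mytopic" as consumer group "mygroup" persists the last-read offset to
    // "<cwd>/mytopic-0-mygroup.offset", a plain-text file whose entire
    // contents are the offset value, e.g. "42".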

    void Consumer::initFileOffsetIfNotExist() const
    {
        if (!checkFileExists(offsetPath.str()))
        {
            commitOffset(0);

            if (traceLevel > 4)
            {
                DBGLOG("Kafka: Creating initial offset file %s", offsetPath.str());
            }
        }
    }

    void Consumer::event_cb(RdKafka::Event& event)
    {
        if (traceLevel > 4)
        {
            switch (event.type())
            {
                case RdKafka::Event::EVENT_ERROR:
                    DBGLOG("Kafka: Error: %s", event.str().c_str());
                    break;

                case RdKafka::Event::EVENT_STATS:
                    DBGLOG("Kafka: Stats: %s", event.str().c_str());
                    break;

                case RdKafka::Event::EVENT_LOG:
                    DBGLOG("Kafka: Log: %s", event.str().c_str());
                    break;
            }
        }
    }

    //--------------------------------------------------------------------------

    /** @class PublisherCacheObj
     *
     * Class used to create and cache publisher objects and connections
     */
    static class PublisherCacheObj
    {
        private:

            typedef std::map<std::string, Publisher*> ObjMap;

        public:

            /**
             * Constructor
             *
             * @param   _traceLevel     The current logging level
             */
            PublisherCacheObj(int _traceLevel)
                :   traceLevel(_traceLevel)
            {
            }

            void deleteAll()
            {
                CriticalBlock block(lock);

                for (ObjMap::iterator x = cachedPublishers.begin(); x != cachedPublishers.end(); x++)
                {
                    if (x->second)
                    {
                        // Shutdown the attached poller before deleting
                        x->second->shutdownPoller();

                        // Now delete
                        delete(x->second);
                    }
                }

                cachedPublishers.clear();
            }

            /**
             * Remove previously-created objects that have been inactive
             * for a while
             */
            void expire()
            {
                if (!cachedPublishers.empty())
                {
                    CriticalBlock block(lock);
                    time_t oldestAllowedTime = time(NULL) - OBJECT_EXPIRE_TIMEOUT_SECONDS;
                    __int32 expireCount = 0;

                    for (ObjMap::iterator x = cachedPublishers.begin(); x != cachedPublishers.end(); /* increment handled explicitly */)
                    {
                        // Expire only if the publisher has been inactive and if
                        // there are no messages in the outbound queue
                        if (x->second && x->second->getTimeTouched() < oldestAllowedTime && x->second->messagesWaitingInQueue() == 0)
                        {
                            // Shutdown the attached poller before deleting
                            x->second->shutdownPoller();

                            // Delete the object
                            delete(x->second);

                            // Erase from map
                            cachedPublishers.erase(x++);

                            ++expireCount;
                        }
                        else
                        {
                            x++;
                        }
                    }

                    if (traceLevel > 4 && expireCount > 0)
                    {
                        DBGLOG("Kafka: Expired %d cached publisher%s", expireCount, (expireCount == 1 ? "" : "s"));
                    }
                }
            }

            /**
             * Gets an established Publisher, based on unique broker/topic
             * pairs, or creates a new one.
             *
             * @param   brokers         One or more Kafka brokers, in the
             *                          format 'name[:port]' where 'name'
             *                          is either a host name or IP address;
             *                          multiple brokers can be delimited
             *                          with commas
             * @param   topic           The name of the topic
             * @param   pollTimeout     The number of milliseconds to give
             *                          to librdkafka when executing
             *                          asynchronous activities
             *
             * @return  A pointer to a Publisher object.
             */
            Publisher* getPublisher(const std::string& brokers, const std::string& topic, __int32 pollTimeout)
            {
                Publisher* pubObjPtr = NULL;
                StringBuffer suffixStr;
                std::string key;

                // Create the key used to look up previously-created objects
                suffixStr.append(pollTimeout);
                key = brokers + "+" + topic + "+" + suffixStr.str();

                {
                    CriticalBlock block(lock);

                    // Try to find a cached publisher
                    pubObjPtr = cachedPublishers[key];

                    if (pubObjPtr)
                    {
                        pubObjPtr->updateTimeTouched();
                    }
                    else
                    {
                        // Publisher for that set of brokers and topic does not exist; create one
                        pubObjPtr = new Publisher(brokers, topic, pollTimeout, traceLevel);
                        cachedPublishers[key] = pubObjPtr;

                        if (traceLevel > 4)
                        {
                            DBGLOG("Kafka: Created and cached new publisher object: %s @ %s", topic.c_str(), brokers.c_str());
                        }
                    }
                }

                if (!pubObjPtr)
                {
                    throw MakeStringException(-1, "Kafka: Unable to create publisher for brokers '%s' and topic '%s'", brokers.c_str(), topic.c_str());
                }

                return pubObjPtr;
            }

        private:

            ObjMap          cachedPublishers;   //!< std::map of created Publisher object pointers
            CriticalSection lock;               //!< Mutex guarding modifications to cachedPublishers
            int             traceLevel;         //!< The current logging level
    } *publisherCache;

    //--------------------------------------------------------------------------

    /** @class PublisherCacheExpirerObj
     * Class used to expire old publisher objects held within publisherCache
     */
    static class PublisherCacheExpirerObj : public Thread
    {
        public:

            PublisherCacheExpirerObj()
                :   Thread("Kafka::PublisherExpirer"),
                    shouldRun(false)
            {
            }

            virtual void start()
            {
                if (!isAlive())
                {
                    shouldRun = true;
                    Thread::start();
                }
            }

            virtual void stop()
            {
                if (isAlive())
                {
                    shouldRun = false;
                    join();
                }
            }

            virtual int run()
            {
                while (shouldRun)
                {
                    if (publisherCache)
                    {
                        publisherCache->expire();
                    }

                    usleep(1000);
                }

                return 0;
            }

        private:

            std::atomic_bool    shouldRun;      //!< If true, we should execute our thread's main event loop
    } *publisherCacheExpirer;

    //--------------------------------------------------------------------------
    // Lazy Initialization
    //--------------------------------------------------------------------------

    /**
     * Make sure the publisher object cache is initialized as well as the
     * associated background thread for expiring idle publishers. This is
     * called only once.
     *
     * @param   traceLevel      Current logging level
     */
    static void setupPublisherCache(int traceLevel)
    {
        KafkaPlugin::publisherCache = new KafkaPlugin::PublisherCacheObj(traceLevel);

        KafkaPlugin::publisherCacheExpirer = new KafkaPlugin::PublisherCacheExpirerObj;
        KafkaPlugin::publisherCacheExpirer->start();
    }

    //--------------------------------------------------------------------------
    // Advertised Entry Point Functions
    //--------------------------------------------------------------------------
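
    // These are the functions exposed to ECL. A purely illustrative calling
    // sketch (ctx and allocator are supplied by the engine; the broker, topic,
    // and group names here are hypothetical):
    //
    //     publishMessage(ctx, "localhost:9092", "mytopic", "hello world", "");
    //     __int32 parts = getTopicPartitionCount(ctx, "localhost:9092", "mytopic");
    //     IRowStream* rows = getMessageDataset(ctx, allocator, "localhost:9092", "mytopic", "mygroup", 0, 1000);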

    ECL_KAFKA_API bool ECL_KAFKA_CALL publishMessage(ICodeContext* ctx, const char* brokers, const char* topic, const char* message, const char* key)
    {
        std::call_once(pubCacheInitFlag, setupPublisherCache, ctx->queryContextLogger().queryTraceLevel());

        Publisher* pubObjPtr = publisherCache->getPublisher(brokers, topic, POLL_TIMEOUT);

        pubObjPtr->sendMessage(message, key);

        return true;
    }

    ECL_KAFKA_API bool ECL_KAFKA_CALL publishMessage(ICodeContext* ctx, const char* brokers, const char* topic, size32_t lenMessage, const char* message, size32_t lenKey, const char* key)
    {
        std::call_once(pubCacheInitFlag, setupPublisherCache, ctx->queryContextLogger().queryTraceLevel());

        Publisher* pubObjPtr = publisherCache->getPublisher(brokers, topic, POLL_TIMEOUT);
        std::string messageStr(message, rtlUtf8Size(lenMessage, message));
        std::string keyStr(key, rtlUtf8Size(lenKey, key));

        pubObjPtr->sendMessage(messageStr, keyStr);

        return true;
    }

    ECL_KAFKA_API __int32 ECL_KAFKA_CALL getTopicPartitionCount(ICodeContext* ctx, const char* brokers, const char* topic)
    {
        // We have to use librdkafka's C API for this right now, as the C++ API
        // does not expose a topic's metadata. In addition, there is no easy
        // link between the exposed C++ objects and the structs used by the
        // C API, so we are basically creating a brand-new connection from
        // scratch.
        int traceLevel = ctx->queryContextLogger().queryTraceLevel();
        __int32 pCount = 0;
        char errstr[512];
        RdKafka::Conf* globalConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);

        if (globalConfig)
        {
            // Load global config to pick up any protocol modifications
            applyConfig(GLOBAL_CONFIG_FILENAME, globalConfig, traceLevel);

            // rd_kafka_new() takes ownership of the lower-level conf object,
            // which in this case is a pointer currently owned by globalConfig;
            // we need to pass a duplicate of the conf pointer to rd_kafka_new()
            // so we don't mangle globalConfig's internals
            rd_kafka_conf_t* conf = rd_kafka_conf_dup(globalConfig->c_ptr_global());
            rd_kafka_t* rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
            delete globalConfig;

            if (rk)
            {
                if (rd_kafka_brokers_add(rk, brokers) != 0)
                {
                    rd_kafka_topic_conf_t* topic_conf = rd_kafka_topic_conf_new();
                    rd_kafka_topic_t* rkt = rd_kafka_topic_new(rk, topic, topic_conf);

                    if (rkt)
                    {
                        const struct rd_kafka_metadata* metadata = NULL;
                        rd_kafka_resp_err_t err = rd_kafka_metadata(rk, 0, rkt, &metadata, 5000);

                        if (err == RD_KAFKA_RESP_ERR_NO_ERROR)
                        {
                            pCount = metadata->topics[0].partition_cnt;
                            rd_kafka_metadata_destroy(metadata);
                        }
                        else
                        {
                            DBGLOG("Kafka: Error retrieving metadata from topic: %s @ %s: '%s'", topic, brokers, rd_kafka_err2str(err));
                        }

                        rd_kafka_topic_destroy(rkt);
                    }
                    else
                    {
                        if (traceLevel > 4)
                        {
                            DBGLOG("Kafka: Could not create topic object: %s @ %s", topic, brokers);
                        }
                    }
                }
                else
                {
                    if (traceLevel > 4)
                    {
                        DBGLOG("Kafka: Could not add brokers: %s @ %s", topic, brokers);
                    }
                }

                rd_kafka_destroy(rk);
            }
            else
            {
                DBGLOG("Kafka: Could not create consumer object: %s @ %s: '%s'", topic, brokers, errstr);
            }
        }
        else
        {
            if (traceLevel > 4)
            {
                DBGLOG("Kafka: Could not create global configuration object: %s @ %s", topic, brokers);
            }
        }

        if (pCount == 0)
        {
            DBGLOG("Kafka: Unable to retrieve partition count from topic: %s @ %s", topic, brokers);
        }

        return pCount;
    }

    ECL_KAFKA_API IRowStream* ECL_KAFKA_CALL getMessageDataset(ICodeContext* ctx, IEngineRowAllocator* allocator, const char* brokers, const char* topic, const char* consumerGroup, __int32 partitionNum, __int64 maxRecords)
    {
        Consumer* consumerObjPtr = new Consumer(brokers, topic, consumerGroup, partitionNum, ctx->queryContextLogger().queryTraceLevel());

        try
        {
            consumerObjPtr->prepForMessageFetch();
        }
        catch (...)
        {
            delete(consumerObjPtr);
            throw;
        }

        return new KafkaStreamedDataset(consumerObjPtr, allocator, ctx->queryContextLogger().queryTraceLevel(), maxRecords);
    }

    ECL_KAFKA_API __int64 ECL_KAFKA_CALL setMessageOffset(ICodeContext* ctx, const char* brokers, const char* topic, const char* consumerGroup, __int32 partitionNum, __int64 newOffset)
    {
        Consumer consumerObj(brokers, topic, consumerGroup, partitionNum, ctx->queryContextLogger().queryTraceLevel());

        consumerObj.commitOffset(newOffset);

        return newOffset;
    }
}

//==============================================================================
// Plugin Initialization and Teardown
//==============================================================================

#define CURRENT_KAFKA_VERSION "kafka plugin 1.1.0"

static const char* kafkaCompatibleVersions[] = {
    "kafka plugin 1.0.0",
    CURRENT_KAFKA_VERSION,
    NULL };

ECL_KAFKA_API bool getECLPluginDefinition(ECLPluginDefinitionBlock* pb)
{
    if (pb->size == sizeof(ECLPluginDefinitionBlockEx))
    {
        ECLPluginDefinitionBlockEx* pbx = static_cast<ECLPluginDefinitionBlockEx*>(pb);
        pbx->compatibleVersions = kafkaCompatibleVersions;
    }
    else if (pb->size != sizeof(ECLPluginDefinitionBlock))
    {
        return false;
    }

    pb->magicVersion = PLUGIN_VERSION;
    pb->version = CURRENT_KAFKA_VERSION;
    pb->moduleName = "kafka";
    pb->ECL = NULL;
    pb->flags = PLUGIN_IMPLICIT_MODULE;
    pb->description = "ECL plugin library for the C++ API in librdkafka++";

    return true;
}

MODULE_INIT(INIT_PRIORITY_STANDARD)
{
    KafkaPlugin::publisherCache = NULL;
    KafkaPlugin::publisherCacheExpirer = NULL;

    return true;
}

MODULE_EXIT()
{
    // Delete the background thread expiring items from the publisher cache
    // before deleting the publisher cache
    if (KafkaPlugin::publisherCacheExpirer)
    {
        KafkaPlugin::publisherCacheExpirer->stop();
        delete(KafkaPlugin::publisherCacheExpirer);
        KafkaPlugin::publisherCacheExpirer = NULL;
    }

    if (KafkaPlugin::publisherCache)
    {
        KafkaPlugin::publisherCache->deleteAll();
        delete(KafkaPlugin::publisherCache);
        KafkaPlugin::publisherCache = NULL;
    }

    RdKafka::wait_destroyed(3000);
}