@@ -130,12 +130,15 @@ namespace KafkaPlugin
 
     KafkaStreamedDataset::~KafkaStreamedDataset()
     {
-        if (consumedRecCount > 0)
+        if (consumerPtr)
         {
-            consumerPtr->commitOffset(lastMsgOffset);
-        }
+            if (consumedRecCount > 0)
+            {
+                consumerPtr->commitOffset(lastMsgOffset);
+            }
 
-        delete(consumerPtr);
+            delete(consumerPtr);
+        }
     }
 
     const void* KafkaStreamedDataset::nextRow()
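The reordered destructor reads more easily with its guards spelled out; a sketch (annotated restatement, not the patch itself):

    // Sketch of the destructor logic above, with explanatory comments added:
    KafkaStreamedDataset::~KafkaStreamedDataset()
    {
        if (consumerPtr)                      // nothing to do if no consumer is attached
        {
            if (consumedRecCount > 0)         // persist progress only if rows were actually read
                consumerPtr->commitOffset(lastMsgOffset);

            delete(consumerPtr);              // the dataset owns its consumer and must release it
        }
    }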
@@ -145,7 +148,7 @@ namespace KafkaPlugin
         __int32 timeoutWait = 100; //!< Amount of time to wait between retries
         __int32 attemptNum = 0;
 
-        if (maxRecords <= 0 || consumedRecCount < maxRecords)
+        if (consumerPtr && (maxRecords <= 0 || consumedRecCount < maxRecords))
         {
             RdKafka::Message* messageObjPtr = NULL;
             bool messageConsumed = false;
@@ -497,6 +500,22 @@ namespace KafkaPlugin
     {
         consumerPtr = NULL;
         topicPtr = NULL;
+
+        char cpath[_MAX_DIR];
+
+        GetCurrentDirectory(_MAX_DIR, cpath);
+        offsetPath.append(cpath);
+        addPathSepChar(offsetPath);
+
+        offsetPath.append(topic.c_str());
+        offsetPath.append("-");
+        offsetPath.append(partitionNum);
+        if (!consumerGroup.empty())
+        {
+            offsetPath.append("-");
+            offsetPath.append(consumerGroup.c_str());
+        }
+        offsetPath.append(".offset");
     }
 
     Consumer::~Consumer()
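The offset-file path is now computed once, at construction, rooted at the process's current working directory. A standalone sketch of the naming scheme (hypothetical helper using plain std::string in place of jlib's StringBuffer):

    #include <string>

    // Sketch of the path scheme built above:
    //   <cwd>/<topic>-<partition>[-<consumerGroup>].offset
    static std::string offsetFileName(const std::string &cwd, const std::string &topic,
                                      int partitionNum, const std::string &consumerGroup)
    {
        std::string path = cwd + "/" + topic + "-" + std::to_string(partitionNum);
        if (!consumerGroup.empty())
            path += "-" + consumerGroup;
        return path + ".offset";
    }

    // offsetFileName("/tmp", "mytopic", 0, "mygroup") -> "/tmp/mytopic-0-mygroup.offset"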
@@ -562,6 +581,10 @@ namespace KafkaPlugin
         // Ensure that some items are set a certain way
         // by setting them after loading the external conf
         topicConfPtr->set("auto.commit.enable", "false", errStr);
+        // Additional settings for updated librdkafka
+        topicConfPtr->set("enable.auto.commit", "false", errStr);
+        topicConfPtr->set("offset.store.method", "file", errStr);
+        topicConfPtr->set("offset.store.path", offsetPath.str(), errStr);
 
         // Create the topic
         topicPtr.store(RdKafka::Topic::create(consumerPtr, topic, topicConfPtr, errStr), std::memory_order_release);
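For reference, the added properties are ordinary librdkafka topic-configuration keys (enable.auto.commit is the newer alias of auto.commit.enable); a self-contained sketch of applying them outside the plugin, with error handling elided:

    #include <librdkafka/rdkafkacpp.h>
    #include <string>

    // Sketch: disable auto-commit and store offsets in a local file,
    // mirroring the settings added in the hunk above.
    RdKafka::Conf *makeTopicConf(const std::string &offsetPath)
    {
        std::string errstr;
        RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
        tconf->set("auto.commit.enable", "false", errstr);   // legacy name
        tconf->set("enable.auto.commit", "false", errstr);   // newer alias
        tconf->set("offset.store.method", "file", errstr);
        tconf->set("offset.store.path", offsetPath, errstr);
        return tconf;
    }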
@@ -589,7 +612,7 @@ namespace KafkaPlugin
         return consumerPtr->consume(topicPtr, partitionNum, POLL_TIMEOUT);
     }
 
-    KafkaStreamedDataset* Consumer::getMessageDataset(IEngineRowAllocator* allocator, __int64 maxRecords)
+    void Consumer::prepForMessageFetch()
     {
         // Make sure we have a valid connection to the Kafka cluster
         ensureSetup();
@@ -608,28 +631,11 @@ namespace KafkaPlugin
         {
             throw MakeStringException(-1, "Kafka: Failed to start Consumer read for %s:%d @ %s; error: %d", topic.c_str(), partitionNum, brokers.c_str(), startErr);
         }
-
-        return new KafkaStreamedDataset(this, allocator, traceLevel, maxRecords);
-    }
-
-    StringBuffer &Consumer::offsetFilePath(StringBuffer &offsetPath) const
-    {
-        offsetPath.append(topic.c_str());
-        offsetPath.append("-");
-        offsetPath.append(partitionNum);
-        if (!consumerGroup.empty())
-        {
-            offsetPath.append("-");
-            offsetPath.append(consumerGroup.c_str());
-        }
-        offsetPath.append(".offset");
-
-        return offsetPath;
     }
 
     void Consumer::commitOffset(__int64 offset) const
     {
-        if (offset >= -1)
+        if (offset >= 0)
        {
             // Not using librdkafka's offset_store because it seems to be broken
             // topicPtr->offset_store(partitionNum, offset);
@@ -639,9 +645,6 @@ namespace KafkaPlugin
             // we left off; NOTE: librdkafka does not clean the topic name
             // or consumer group name when constructing this path
             // (which is actually a security concern), so we can't clean, either
-            StringBuffer offsetPath;
-            offsetFilePath(offsetPath);
-
             std::ofstream outFile(offsetPath.str(), std::ofstream::trunc);
             outFile << offset;
 
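commitOffset() persists the offset as a single integer in a text file; a matching read-back sketch (hypothetical helper, with long long standing in for the platform's __int64 typedef):

    #include <fstream>

    // Sketch: recover the offset written by commitOffset() above.
    // Returns fallback if the file is missing or unreadable.
    static long long readStoredOffset(const char* path, long long fallback)
    {
        long long offset = fallback;
        std::ifstream inFile(path);
        if (inFile)
            inFile >> offset;    // a single integer, as written with operator<<
        return offset;
    }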
@@ -654,12 +657,9 @@ namespace KafkaPlugin
 
     void Consumer::initFileOffsetIfNotExist() const
     {
-        StringBuffer offsetPath;
-        offsetFilePath(offsetPath);
-
         if (!checkFileExists(offsetPath.str()))
         {
-            commitOffset(-1);
+            commitOffset(0);
 
             if (traceLevel > 4)
             {
@@ -1000,7 +1000,17 @@ namespace KafkaPlugin
     {
         Consumer* consumerObjPtr = new Consumer(brokers, topic, consumerGroup, partitionNum, ctx->queryContextLogger().queryTraceLevel());
 
-        return consumerObjPtr->getMessageDataset(allocator, maxRecords);
+        try
+        {
+            consumerObjPtr->prepForMessageFetch();
+        }
+        catch(...)
+        {
+            delete(consumerObjPtr);
+            throw;
+        }
+
+        return new KafkaStreamedDataset(consumerObjPtr, allocator, ctx->queryContextLogger().queryTraceLevel(), maxRecords);
     }
 
     ECL_KAFKA_API __int64 ECL_KAFKA_CALL setMessageOffset(ICodeContext* ctx, const char* brokers, const char* topic, const char* consumerGroup, __int32 partitionNum, __int64 newOffset)
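The try/catch above releases the half-constructed consumer if preparation throws, before ownership passes to the dataset (whose destructor, in the first hunk, deletes it). The same guarantee can be sketched with std::unique_ptr, assuming the constructor signatures in this diff:

    #include <memory>

    // Sketch: unique_ptr keeps the consumer alive across the call that may
    // throw, then releases it into the dataset, which owns it from then on.
    std::unique_ptr<Consumer> consumer(new Consumer(brokers, topic, consumerGroup,
            partitionNum, ctx->queryContextLogger().queryTraceLevel()));
    consumer->prepForMessageFetch();    // may throw; unique_ptr cleans up
    return new KafkaStreamedDataset(consumer.release(), allocator,
            ctx->queryContextLogger().queryTraceLevel(), maxRecords);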