From 6c84ef20ac737087ed76c12b2f3e40d9c3aa4b30 Mon Sep 17 00:00:00 2001 From: liujinhui <965147871@qq.com> Date: Wed, 16 Sep 2020 10:43:52 +0800 Subject: [PATCH] [HUDI-1282] Check whether the topic exists before deltastrmer consumes Kafka (#2090) --- .../utilities/sources/helpers/KafkaOffsetGen.java | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java index 23d3c8d61..e958f85ed 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java @@ -172,6 +172,9 @@ public class KafkaOffsetGen { Map fromOffsets; Map toOffsets; try (KafkaConsumer consumer = new KafkaConsumer(kafkaParams)) { + if (!checkTopicExists(consumer)) { + throw new HoodieException("Kafka topic:" + topicName + " does not exist"); + } List partitionInfoList; partitionInfoList = consumer.partitionsFor(topicName); Set topicPartitions = partitionInfoList.stream() @@ -230,6 +233,16 @@ public class KafkaOffsetGen { return checkpointOffsetReseter ? earliestOffsets : checkpointOffsets; } + /** + * Check if topic exists. + * @param consumer kafka consumer + * @return + */ + public boolean checkTopicExists(KafkaConsumer consumer) { + Map> result = consumer.listTopics(); + return result.containsKey(topicName); + } + public String getTopicName() { return topicName; }