diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java index 23d3c8d61..e958f85ed 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java @@ -172,6 +172,9 @@ public class KafkaOffsetGen { Map fromOffsets; Map toOffsets; try (KafkaConsumer consumer = new KafkaConsumer(kafkaParams)) { + if (!checkTopicExists(consumer)) { + throw new HoodieException("Kafka topic:" + topicName + " does not exist"); + } List partitionInfoList; partitionInfoList = consumer.partitionsFor(topicName); Set topicPartitions = partitionInfoList.stream() @@ -230,6 +233,16 @@ public class KafkaOffsetGen { return checkpointOffsetReseter ? earliestOffsets : checkpointOffsets; } + /** + * Check if topic exists. + * @param consumer kafka consumer + * @return + */ + public boolean checkTopicExists(KafkaConsumer consumer) { + Map> result = consumer.listTopics(); + return result.containsKey(topicName); + } + public String getTopicName() { return topicName; }