[HUDI-1282] Check whether the topic exists before deltastrmer consumes Kafka (#2090)
This commit is contained in:
@@ -172,6 +172,9 @@ public class KafkaOffsetGen {
|
|||||||
Map<TopicPartition, Long> fromOffsets;
|
Map<TopicPartition, Long> fromOffsets;
|
||||||
Map<TopicPartition, Long> toOffsets;
|
Map<TopicPartition, Long> toOffsets;
|
||||||
try (KafkaConsumer consumer = new KafkaConsumer(kafkaParams)) {
|
try (KafkaConsumer consumer = new KafkaConsumer(kafkaParams)) {
|
||||||
|
if (!checkTopicExists(consumer)) {
|
||||||
|
throw new HoodieException("Kafka topic:" + topicName + " does not exist");
|
||||||
|
}
|
||||||
List<PartitionInfo> partitionInfoList;
|
List<PartitionInfo> partitionInfoList;
|
||||||
partitionInfoList = consumer.partitionsFor(topicName);
|
partitionInfoList = consumer.partitionsFor(topicName);
|
||||||
Set<TopicPartition> topicPartitions = partitionInfoList.stream()
|
Set<TopicPartition> topicPartitions = partitionInfoList.stream()
|
||||||
@@ -230,6 +233,16 @@ public class KafkaOffsetGen {
|
|||||||
return checkpointOffsetReseter ? earliestOffsets : checkpointOffsets;
|
return checkpointOffsetReseter ? earliestOffsets : checkpointOffsets;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if topic exists.
|
||||||
|
* @param consumer kafka consumer
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
public boolean checkTopicExists(KafkaConsumer consumer) {
|
||||||
|
Map<String, List<PartitionInfo>> result = consumer.listTopics();
|
||||||
|
return result.containsKey(topicName);
|
||||||
|
}
|
||||||
|
|
||||||
public String getTopicName() {
|
public String getTopicName() {
|
||||||
return topicName;
|
return topicName;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user