[HUDI-4122] Fix NPE caused by adding kafka nodes (#5632)
This commit is contained in:
@@ -22,17 +22,17 @@ import org.apache.hudi.DataSourceUtils;
|
|||||||
import org.apache.hudi.common.config.ConfigProperty;
|
import org.apache.hudi.common.config.ConfigProperty;
|
||||||
import org.apache.hudi.common.config.TypedProperties;
|
import org.apache.hudi.common.config.TypedProperties;
|
||||||
import org.apache.hudi.common.util.Option;
|
import org.apache.hudi.common.util.Option;
|
||||||
import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException;
|
|
||||||
import org.apache.hudi.exception.HoodieException;
|
import org.apache.hudi.exception.HoodieException;
|
||||||
import org.apache.hudi.exception.HoodieNotSupportedException;
|
import org.apache.hudi.exception.HoodieNotSupportedException;
|
||||||
|
|
||||||
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
|
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
|
||||||
|
import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException;
|
||||||
import org.apache.hudi.utilities.sources.AvroKafkaSource;
|
import org.apache.hudi.utilities.sources.AvroKafkaSource;
|
||||||
|
|
||||||
import org.apache.kafka.clients.consumer.CommitFailedException;
|
import org.apache.kafka.clients.consumer.CommitFailedException;
|
||||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||||
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
|
|
||||||
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
|
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
|
||||||
|
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
|
||||||
import org.apache.kafka.common.PartitionInfo;
|
import org.apache.kafka.common.PartitionInfo;
|
||||||
import org.apache.kafka.common.TopicPartition;
|
import org.apache.kafka.common.TopicPartition;
|
||||||
import org.apache.kafka.common.errors.TimeoutException;
|
import org.apache.kafka.common.errors.TimeoutException;
|
||||||
@@ -48,6 +48,7 @@ import java.util.HashSet;
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
import java.util.regex.Matcher;
|
import java.util.regex.Matcher;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
@@ -169,9 +170,14 @@ public class KafkaOffsetGen {
|
|||||||
.withDocumentation("Kafka topic name.");
|
.withDocumentation("Kafka topic name.");
|
||||||
|
|
||||||
public static final ConfigProperty<String> KAFKA_CHECKPOINT_TYPE = ConfigProperty
|
public static final ConfigProperty<String> KAFKA_CHECKPOINT_TYPE = ConfigProperty
|
||||||
.key("hoodie.deltastreamer.source.kafka.checkpoint.type")
|
.key("hoodie.deltastreamer.source.kafka.checkpoint.type")
|
||||||
.defaultValue("string")
|
.defaultValue("string")
|
||||||
.withDocumentation("Kafka chepoint type.");
|
.withDocumentation("Kafka checkpoint type.");
|
||||||
|
|
||||||
|
public static final ConfigProperty<Long> KAFKA_FETCH_PARTITION_TIME_OUT = ConfigProperty
|
||||||
|
.key("hoodie.deltastreamer.source.kafka.fetch_partition.time.out")
|
||||||
|
.defaultValue(300 * 1000L)
|
||||||
|
.withDocumentation("Time out for fetching partitions. 5min by default");
|
||||||
|
|
||||||
public static final ConfigProperty<Boolean> ENABLE_KAFKA_COMMIT_OFFSET = ConfigProperty
|
public static final ConfigProperty<Boolean> ENABLE_KAFKA_COMMIT_OFFSET = ConfigProperty
|
||||||
.key("hoodie.deltastreamer.source.kafka.enable.commit.offset")
|
.key("hoodie.deltastreamer.source.kafka.enable.commit.offset")
|
||||||
@@ -236,8 +242,7 @@ public class KafkaOffsetGen {
|
|||||||
if (!checkTopicExists(consumer)) {
|
if (!checkTopicExists(consumer)) {
|
||||||
throw new HoodieException("Kafka topic:" + topicName + " does not exist");
|
throw new HoodieException("Kafka topic:" + topicName + " does not exist");
|
||||||
}
|
}
|
||||||
List<PartitionInfo> partitionInfoList;
|
List<PartitionInfo> partitionInfoList = fetchPartitionInfos(consumer, topicName);
|
||||||
partitionInfoList = consumer.partitionsFor(topicName);
|
|
||||||
Set<TopicPartition> topicPartitions = partitionInfoList.stream()
|
Set<TopicPartition> topicPartitions = partitionInfoList.stream()
|
||||||
.map(x -> new TopicPartition(x.topic(), x.partition())).collect(Collectors.toSet());
|
.map(x -> new TopicPartition(x.topic(), x.partition())).collect(Collectors.toSet());
|
||||||
|
|
||||||
@@ -287,6 +292,32 @@ public class KafkaOffsetGen {
|
|||||||
return CheckpointUtils.computeOffsetRanges(fromOffsets, toOffsets, numEvents);
|
return CheckpointUtils.computeOffsetRanges(fromOffsets, toOffsets, numEvents);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Fetch partition infos for given topic.
|
||||||
|
*
|
||||||
|
* @param consumer
|
||||||
|
* @param topicName
|
||||||
|
*/
|
||||||
|
private List<PartitionInfo> fetchPartitionInfos(KafkaConsumer consumer, String topicName) {
|
||||||
|
long timeout = this.props.getLong(Config.KAFKA_FETCH_PARTITION_TIME_OUT.key(), Config.KAFKA_FETCH_PARTITION_TIME_OUT.defaultValue());
|
||||||
|
long start = System.currentTimeMillis();
|
||||||
|
|
||||||
|
List<PartitionInfo> partitionInfos;
|
||||||
|
do {
|
||||||
|
partitionInfos = consumer.partitionsFor(topicName);
|
||||||
|
try {
|
||||||
|
TimeUnit.SECONDS.sleep(10);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
LOG.error("Sleep failed while fetching partitions");
|
||||||
|
}
|
||||||
|
} while (partitionInfos == null && (System.currentTimeMillis() <= (start + timeout)));
|
||||||
|
|
||||||
|
if (partitionInfos == null) {
|
||||||
|
throw new HoodieDeltaStreamerException(String.format("Can not find metadata for topic %s from kafka cluster", topicName));
|
||||||
|
}
|
||||||
|
return partitionInfos;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Fetch checkpoint offsets for each partition.
|
* Fetch checkpoint offsets for each partition.
|
||||||
* @param consumer instance of {@link KafkaConsumer} to fetch offsets from.
|
* @param consumer instance of {@link KafkaConsumer} to fetch offsets from.
|
||||||
|
|||||||
Reference in New Issue
Block a user