[HUDI-596] Close KafkaConsumer every time (#1303)
This commit is contained in:
@@ -172,33 +172,35 @@ public class KafkaOffsetGen {
|
|||||||
public OffsetRange[] getNextOffsetRanges(Option<String> lastCheckpointStr, long sourceLimit) {
|
public OffsetRange[] getNextOffsetRanges(Option<String> lastCheckpointStr, long sourceLimit) {
|
||||||
|
|
||||||
// Obtain current metadata for the topic
|
// Obtain current metadata for the topic
|
||||||
KafkaConsumer consumer = new KafkaConsumer(kafkaParams);
|
|
||||||
List<PartitionInfo> partitionInfoList;
|
|
||||||
partitionInfoList = consumer.partitionsFor(topicName);
|
|
||||||
Set<TopicPartition> topicPartitions = partitionInfoList.stream()
|
|
||||||
.map(x -> new TopicPartition(x.topic(), x.partition())).collect(Collectors.toSet());
|
|
||||||
|
|
||||||
// Determine the offset ranges to read from
|
|
||||||
Map<TopicPartition, Long> fromOffsets;
|
Map<TopicPartition, Long> fromOffsets;
|
||||||
if (lastCheckpointStr.isPresent()) {
|
Map<TopicPartition, Long> toOffsets;
|
||||||
fromOffsets = checkupValidOffsets(consumer, lastCheckpointStr, topicPartitions);
|
try (KafkaConsumer consumer = new KafkaConsumer(kafkaParams)) {
|
||||||
} else {
|
List<PartitionInfo> partitionInfoList;
|
||||||
KafkaResetOffsetStrategies autoResetValue = KafkaResetOffsetStrategies
|
partitionInfoList = consumer.partitionsFor(topicName);
|
||||||
.valueOf(props.getString("auto.offset.reset", Config.DEFAULT_AUTO_RESET_OFFSET.toString()).toUpperCase());
|
Set<TopicPartition> topicPartitions = partitionInfoList.stream()
|
||||||
switch (autoResetValue) {
|
.map(x -> new TopicPartition(x.topic(), x.partition())).collect(Collectors.toSet());
|
||||||
case EARLIEST:
|
|
||||||
fromOffsets = consumer.beginningOffsets(topicPartitions);
|
|
||||||
break;
|
|
||||||
case LATEST:
|
|
||||||
fromOffsets = consumer.endOffsets(topicPartitions);
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
throw new HoodieNotSupportedException("Auto reset value must be one of 'smallest' or 'largest' ");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Obtain the latest offsets.
|
// Determine the offset ranges to read from
|
||||||
Map<TopicPartition, Long> toOffsets = consumer.endOffsets(topicPartitions);
|
if (lastCheckpointStr.isPresent()) {
|
||||||
|
fromOffsets = checkupValidOffsets(consumer, lastCheckpointStr, topicPartitions);
|
||||||
|
} else {
|
||||||
|
KafkaResetOffsetStrategies autoResetValue = KafkaResetOffsetStrategies
|
||||||
|
.valueOf(props.getString("auto.offset.reset", Config.DEFAULT_AUTO_RESET_OFFSET.toString()).toUpperCase());
|
||||||
|
switch (autoResetValue) {
|
||||||
|
case EARLIEST:
|
||||||
|
fromOffsets = consumer.beginningOffsets(topicPartitions);
|
||||||
|
break;
|
||||||
|
case LATEST:
|
||||||
|
fromOffsets = consumer.endOffsets(topicPartitions);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
throw new HoodieNotSupportedException("Auto reset value must be one of 'smallest' or 'largest' ");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Obtain the latest offsets.
|
||||||
|
toOffsets = consumer.endOffsets(topicPartitions);
|
||||||
|
}
|
||||||
|
|
||||||
// Come up with final set of OffsetRanges to read (account for new partitions, limit number of events)
|
// Come up with final set of OffsetRanges to read (account for new partitions, limit number of events)
|
||||||
long maxEventsToReadFromKafka = props.getLong(Config.MAX_EVENTS_FROM_KAFKA_SOURCE_PROP,
|
long maxEventsToReadFromKafka = props.getLong(Config.MAX_EVENTS_FROM_KAFKA_SOURCE_PROP,
|
||||||
|
|||||||
Reference in New Issue
Block a user