1
0

HUDI-105 : Fix up offsets not available on leader exception (#650)

* Fix up offsets not available on leader exception
This commit is contained in:
leiline
2019-05-24 10:32:31 +08:00
committed by Balaji Varadarajan
parent 2fe526d548
commit f120427607

View File

@@ -204,8 +204,9 @@ public class KafkaOffsetGen {
// Determine the offset ranges to read from
HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> fromOffsets;
HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> checkpointOffsets;
if (lastCheckpointStr.isPresent()) {
fromOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get());
fromOffsets = checkupValidOffsets(cluster, lastCheckpointStr, topicPartitions);
} else {
KafkaResetOffsetStrategies autoResetValue = KafkaResetOffsetStrategies.valueOf(
props.getString("auto.offset.reset", Config.DEFAULT_AUTO_RESET_OFFSET.toString()).toUpperCase());
@@ -235,6 +236,26 @@ public class KafkaOffsetGen {
return offsetRanges;
}
// check up checkpoint offsets is valid or not, if true, return checkpoint offsets,
// else return earliest offsets
private HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> checkupValidOffsets(
KafkaCluster cluster,
Optional<String> lastCheckpointStr,
Set<TopicAndPartition> topicPartitions) {
HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> checkpointOffsets =
CheckpointUtils.strToOffsets(lastCheckpointStr.get());
HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> earliestOffsets =
new HashMap(ScalaHelpers.toJavaMap(
cluster.getEarliestLeaderOffsets(topicPartitions).right().get()));
boolean checkpointOffsetReseter = checkpointOffsets.entrySet()
.stream()
.anyMatch(offset -> offset.getValue().offset()
< earliestOffsets.get(offset.getKey()).offset());
return checkpointOffsetReseter ? earliestOffsets : checkpointOffsets;
}
public String getTopicName() {
return topicName;
}