From f120427607986de82f99f2ba9a2eb0ff136c2bae Mon Sep 17 00:00:00 2001
From: leiline
Date: Fri, 24 May 2019 10:32:31 +0800
Subject: [PATCH] HUDI-105 : Fix up offsets not available on leader exception
 (#650)

* Fix up offsets not available on leader exception

When a checkpointed offset is older than the earliest offset still
available on the partition leader, resume from the earliest offsets
instead of the checkpoint.
---
Review notes (this section is ignored by git am):
- Generic type arguments (TopicAndPartition, KafkaCluster.LeaderOffset) were
  missing from the reviewed copy of this patch and are reconstructed from the
  KafkaCluster API; confirm them against the target file.
- The first hunk no longer adds the unused local "checkpointOffsets" and now
  starts at the "if" line.
- The blob hashes on the index line were not recomputed after these changes.

 .../sources/helpers/KafkaOffsetGen.java       | 33 +++++++++++++++++++++++++-
 1 file changed, 32 insertions(+), 1 deletion(-)

diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/helpers/KafkaOffsetGen.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/helpers/KafkaOffsetGen.java
index 947f3c48a..5a4b72796 100644
--- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/helpers/KafkaOffsetGen.java
+++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/helpers/KafkaOffsetGen.java
@@ -207,5 +207,5 @@ public class KafkaOffsetGen {
     if (lastCheckpointStr.isPresent()) {
-      fromOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get());
+      fromOffsets = checkupValidOffsets(cluster, lastCheckpointStr, topicPartitions);
     } else {
       KafkaResetOffsetStrategies autoResetValue = KafkaResetOffsetStrategies.valueOf(
           props.getString("auto.offset.reset", Config.DEFAULT_AUTO_RESET_OFFSET.toString()).toUpperCase());
@@ -235,6 +235,37 @@ public class KafkaOffsetGen {
     return offsetRanges;
   }
 
+  /**
+   * Validates the checkpointed offsets against the earliest offsets reported by the
+   * partition leaders.
+   *
+   * <p>If any checkpointed partition has no earliest offset, or its checkpointed offset is
+   * lower than the earliest offset, the checkpoint is treated as stale and the earliest
+   * offsets are returned instead; otherwise the checkpointed offsets are returned unchanged.
+   *
+   * @param cluster Kafka cluster used to look up the earliest leader offsets
+   * @param lastCheckpointStr last checkpoint string; must be present
+   * @param topicPartitions partitions whose earliest offsets are looked up
+   * @return the checkpointed offsets when all of them are still valid, else the earliest offsets
+   */
+  private HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> checkupValidOffsets(
+      KafkaCluster cluster,
+      Optional<String> lastCheckpointStr,
+      Set<TopicAndPartition> topicPartitions) {
+    HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> checkpointOffsets =
+        CheckpointUtils.strToOffsets(lastCheckpointStr.get());
+    HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> earliestOffsets =
+        new HashMap<>(ScalaHelpers.toJavaMap(
+            cluster.getEarliestLeaderOffsets(topicPartitions).right().get()));
+
+    boolean checkpointIsStale = checkpointOffsets.entrySet().stream().anyMatch(entry -> {
+      KafkaCluster.LeaderOffset earliest = earliestOffsets.get(entry.getKey());
+      // A partition without an earliest offset cannot be resumed from the checkpoint.
+      return earliest == null || entry.getValue().offset() < earliest.offset();
+    });
+    return checkpointIsStale ? earliestOffsets : checkpointOffsets;
+  }
+
   public String getTopicName() {
     return topicName;
   }