diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java index 7a3f82b07..789daf188 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java @@ -157,7 +157,8 @@ public class KafkaOffsetGen { private static final String KAFKA_TOPIC_NAME = "hoodie.deltastreamer.source.kafka.topic"; private static final String MAX_EVENTS_FROM_KAFKA_SOURCE_PROP = "hoodie.deltastreamer.kafka.source.maxEvents"; - private static final String KAFKA_AUTO_RESET_OFFSETS = "hoodie.deltastreamer.source.kafka.auto.reset.offsets"; + // NOTE(review): Kafka's native consumer config param is "auto.offset.reset", not "auto.reset.offsets"; confirm the intended key. Do not change the config param name without checking existing user configs. + public static final String KAFKA_AUTO_RESET_OFFSETS = "auto.reset.offsets"; private static final KafkaResetOffsetStrategies DEFAULT_KAFKA_AUTO_RESET_OFFSETS = KafkaResetOffsetStrategies.LATEST; public static final long DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE = 5000000; public static long maxEventsFromKafkaSource = DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE; @@ -206,8 +207,8 @@ public class KafkaOffsetGen { .map(x -> new TopicPartition(x.topic(), x.partition())).collect(Collectors.toSet()); // Determine the offset ranges to read from - if (lastCheckpointStr.isPresent() && !lastCheckpointStr.get().isEmpty()) { - fromOffsets = checkupValidOffsets(consumer, lastCheckpointStr, topicPartitions); + if (lastCheckpointStr.isPresent() && !lastCheckpointStr.get().isEmpty() && checkTopicCheckpoint(lastCheckpointStr)) { + fromOffsets = fetchValidOffsets(consumer, lastCheckpointStr, topicPartitions); metrics.updateDeltaStreamerKafkaDelayCountMetrics(delayOffsetCalculation(lastCheckpointStr, topicPartitions, consumer)); } else { switch (autoResetValue) { @@ -245,27 +246,20 @@ public class KafkaOffsetGen { return 
CheckpointUtils.computeOffsetRanges(fromOffsets, toOffsets, numEvents); } - // check up checkpoint offsets is valid or not, if true, return checkpoint offsets, - // else return earliest offsets - private Map checkupValidOffsets(KafkaConsumer consumer, + /** + * Fetch checkpoint offsets for each partition, falling back to the earliest offsets of all partitions when any checkpointed offset is smaller than the earliest offset available for its partition. + * @param consumer instance of {@link KafkaConsumer} to fetch offsets from. + * @param lastCheckpointStr last checkpoint string. + * @param topicPartitions set of topic partitions. + * @return a map of topic partitions to offsets. + */ + private Map fetchValidOffsets(KafkaConsumer consumer, Option lastCheckpointStr, Set topicPartitions) { Map earliestOffsets = consumer.beginningOffsets(topicPartitions); - if (checkTopicCheckpoint(lastCheckpointStr)) { - Map checkpointOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get()); - boolean checkpointOffsetReseter = checkpointOffsets.entrySet().stream() - .anyMatch(offset -> offset.getValue() < earliestOffsets.get(offset.getKey())); - return checkpointOffsetReseter ? earliestOffsets : checkpointOffsets; - } - - switch (autoResetValue) { - case EARLIEST: - return earliestOffsets; - case LATEST: - return consumer.endOffsets(topicPartitions); - default: - throw new HoodieNotSupportedException("Auto reset value must be one of 'earliest' or 'latest' "); - } - + Map checkpointOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get()); + boolean checkpointOffsetReseter = checkpointOffsets.entrySet().stream() + .anyMatch(offset -> offset.getValue() < earliestOffsets.get(offset.getKey())); + return checkpointOffsetReseter ? 
earliestOffsets : checkpointOffsets; } private Long delayOffsetCalculation(Option lastCheckpointStr, Set topicPartitions, KafkaConsumer consumer) { diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java index 8f3e04521..29dd3ae4d 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java @@ -53,6 +53,7 @@ import org.apache.hudi.utilities.sources.InputBatch; import org.apache.hudi.utilities.sources.JsonKafkaSource; import org.apache.hudi.utilities.sources.ParquetDFSSource; import org.apache.hudi.utilities.sources.TestDataSource; +import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config; import org.apache.hudi.utilities.testutils.UtilitiesTestBase; import org.apache.hudi.utilities.testutils.sources.DistributedTestDataSource; import org.apache.hudi.utilities.testutils.sources.config.SourceConfigs; @@ -259,7 +260,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { protected static void populateCommonKafkaProps(TypedProperties props) { //Kafka source properties props.setProperty("bootstrap.servers", testUtils.brokerAddress()); - props.setProperty("hoodie.deltastreamer.source.kafka.auto.reset.offsets", "earliest"); + props.setProperty(Config.KAFKA_AUTO_RESET_OFFSETS, "earliest"); props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", String.valueOf(5000)); @@ -1344,7 +1345,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { props.setProperty("hoodie.deltastreamer.source.kafka.topic",topicName); 
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source_uber.avsc"); props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target_uber.avsc"); - props.setProperty("hoodie.deltastreamer.source.kafka.auto.reset.offsets", autoResetValue); + props.setProperty(Config.KAFKA_AUTO_RESET_OFFSETS, autoResetValue); UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + propsFileName); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java index 9004c661b..cf05ae6be 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java @@ -88,7 +88,7 @@ public class TestKafkaSource extends UtilitiesTestBase { TypedProperties props = new TypedProperties(); props.setProperty("hoodie.deltastreamer.source.kafka.topic", TEST_TOPIC_NAME); props.setProperty("bootstrap.servers", testUtils.brokerAddress()); - props.setProperty("hoodie.deltastreamer.source.kafka.auto.reset.offsets", resetStrategy); + props.setProperty(Config.KAFKA_AUTO_RESET_OFFSETS, resetStrategy); props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", maxEventsToReadFromKafkaSource != null ? String.valueOf(maxEventsToReadFromKafkaSource) : String.valueOf(Config.maxEventsFromKafkaSource));