diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java index bc29be926..3a17a7fbe 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java @@ -157,9 +157,9 @@ public class KafkaOffsetGen { private static final String KAFKA_TOPIC_NAME = "hoodie.deltastreamer.source.kafka.topic"; private static final String MAX_EVENTS_FROM_KAFKA_SOURCE_PROP = "hoodie.deltastreamer.kafka.source.maxEvents"; - // "auto.reset.offsets" is kafka native config param. Do not change the config param name. - public static final String KAFKA_AUTO_RESET_OFFSETS = "auto.reset.offsets"; - private static final KafkaResetOffsetStrategies DEFAULT_KAFKA_AUTO_RESET_OFFSETS = KafkaResetOffsetStrategies.LATEST; + // "auto.offset.reset" is kafka native config param. Do not change the config param name. + public static final String KAFKA_AUTO_OFFSET_RESET = "auto.offset.reset"; + private static final KafkaResetOffsetStrategies DEFAULT_KAFKA_AUTO_OFFSET_RESET = KafkaResetOffsetStrategies.LATEST; public static final long DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE = 5000000; public static long maxEventsFromKafkaSource = DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE; } @@ -182,7 +182,7 @@ public class KafkaOffsetGen { }); DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.KAFKA_TOPIC_NAME)); topicName = props.getString(Config.KAFKA_TOPIC_NAME); - String kafkaAutoResetOffsetsStr = props.getString(Config.KAFKA_AUTO_RESET_OFFSETS, Config.DEFAULT_KAFKA_AUTO_RESET_OFFSETS.name().toLowerCase()); + String kafkaAutoResetOffsetsStr = props.getString(Config.KAFKA_AUTO_OFFSET_RESET, Config.DEFAULT_KAFKA_AUTO_OFFSET_RESET.name().toLowerCase()); boolean found = false; for (KafkaResetOffsetStrategies entry: KafkaResetOffsetStrategies.values()) { if (entry.name().toLowerCase().equals(kafkaAutoResetOffsetsStr)) { @@ -192,7 +192,7 @@ public class KafkaOffsetGen { } } if (!found) { - throw new HoodieDeltaStreamerException(Config.KAFKA_AUTO_RESET_OFFSETS + " config set to unknown value " + kafkaAutoResetOffsetsStr); + throw new HoodieDeltaStreamerException(Config.KAFKA_AUTO_OFFSET_RESET + " config set to unknown value " + kafkaAutoResetOffsetsStr); } } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java index f9d016214..7d4db2c12 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java @@ -267,7 +267,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { protected static void populateCommonKafkaProps(TypedProperties props) { //Kafka source properties props.setProperty("bootstrap.servers", testUtils.brokerAddress()); - props.setProperty(Config.KAFKA_AUTO_RESET_OFFSETS, "earliest"); + props.setProperty(Config.KAFKA_AUTO_OFFSET_RESET, "earliest"); props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", String.valueOf(5000)); @@ -1352,7 +1352,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { props.setProperty("hoodie.deltastreamer.source.kafka.topic",topicName); props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source_uber.avsc"); props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target_uber.avsc"); - props.setProperty(Config.KAFKA_AUTO_RESET_OFFSETS, autoResetValue); + props.setProperty(Config.KAFKA_AUTO_OFFSET_RESET, autoResetValue); UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + propsFileName); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java index cf05ae6be..0c0dcc322 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java @@ -88,7 +88,7 @@ public class TestKafkaSource extends UtilitiesTestBase { TypedProperties props = new TypedProperties(); props.setProperty("hoodie.deltastreamer.source.kafka.topic", TEST_TOPIC_NAME); props.setProperty("bootstrap.servers", testUtils.brokerAddress()); - props.setProperty(Config.KAFKA_AUTO_RESET_OFFSETS, resetStrategy); + props.setProperty(Config.KAFKA_AUTO_OFFSET_RESET, resetStrategy); props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", maxEventsToReadFromKafkaSource != null ? String.valueOf(maxEventsToReadFromKafkaSource) : String.valueOf(Config.maxEventsFromKafkaSource));