[HUDI-1835] Fixing kafka native config param for auto offset reset (#2864)
This commit is contained in:
committed by
GitHub
parent
1b27259b53
commit
3e4fa170cf
@@ -157,9 +157,9 @@ public class KafkaOffsetGen {
|
||||
|
||||
private static final String KAFKA_TOPIC_NAME = "hoodie.deltastreamer.source.kafka.topic";
|
||||
private static final String MAX_EVENTS_FROM_KAFKA_SOURCE_PROP = "hoodie.deltastreamer.kafka.source.maxEvents";
|
||||
// "auto.reset.offsets" is kafka native config param. Do not change the config param name.
|
||||
public static final String KAFKA_AUTO_RESET_OFFSETS = "auto.reset.offsets";
|
||||
private static final KafkaResetOffsetStrategies DEFAULT_KAFKA_AUTO_RESET_OFFSETS = KafkaResetOffsetStrategies.LATEST;
|
||||
// "auto.offset.reset" is kafka native config param. Do not change the config param name.
|
||||
public static final String KAFKA_AUTO_OFFSET_RESET = "auto.offset.reset";
|
||||
private static final KafkaResetOffsetStrategies DEFAULT_KAFKA_AUTO_OFFSET_RESET = KafkaResetOffsetStrategies.LATEST;
|
||||
public static final long DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE = 5000000;
|
||||
public static long maxEventsFromKafkaSource = DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE;
|
||||
}
|
||||
@@ -182,7 +182,7 @@ public class KafkaOffsetGen {
|
||||
});
|
||||
DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.KAFKA_TOPIC_NAME));
|
||||
topicName = props.getString(Config.KAFKA_TOPIC_NAME);
|
||||
String kafkaAutoResetOffsetsStr = props.getString(Config.KAFKA_AUTO_RESET_OFFSETS, Config.DEFAULT_KAFKA_AUTO_RESET_OFFSETS.name().toLowerCase());
|
||||
String kafkaAutoResetOffsetsStr = props.getString(Config.KAFKA_AUTO_OFFSET_RESET, Config.DEFAULT_KAFKA_AUTO_OFFSET_RESET.name().toLowerCase());
|
||||
boolean found = false;
|
||||
for (KafkaResetOffsetStrategies entry: KafkaResetOffsetStrategies.values()) {
|
||||
if (entry.name().toLowerCase().equals(kafkaAutoResetOffsetsStr)) {
|
||||
@@ -192,7 +192,7 @@ public class KafkaOffsetGen {
|
||||
}
|
||||
}
|
||||
if (!found) {
|
||||
throw new HoodieDeltaStreamerException(Config.KAFKA_AUTO_RESET_OFFSETS + " config set to unknown value " + kafkaAutoResetOffsetsStr);
|
||||
throw new HoodieDeltaStreamerException(Config.KAFKA_AUTO_OFFSET_RESET + " config set to unknown value " + kafkaAutoResetOffsetsStr);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -267,7 +267,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
|
||||
protected static void populateCommonKafkaProps(TypedProperties props) {
|
||||
//Kafka source properties
|
||||
props.setProperty("bootstrap.servers", testUtils.brokerAddress());
|
||||
props.setProperty(Config.KAFKA_AUTO_RESET_OFFSETS, "earliest");
|
||||
props.setProperty(Config.KAFKA_AUTO_OFFSET_RESET, "earliest");
|
||||
props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
|
||||
props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
|
||||
props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", String.valueOf(5000));
|
||||
@@ -1352,7 +1352,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
|
||||
props.setProperty("hoodie.deltastreamer.source.kafka.topic",topicName);
|
||||
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source_uber.avsc");
|
||||
props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target_uber.avsc");
|
||||
props.setProperty(Config.KAFKA_AUTO_RESET_OFFSETS, autoResetValue);
|
||||
props.setProperty(Config.KAFKA_AUTO_OFFSET_RESET, autoResetValue);
|
||||
|
||||
UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + propsFileName);
|
||||
}
|
||||
|
||||
@@ -88,7 +88,7 @@ public class TestKafkaSource extends UtilitiesTestBase {
|
||||
TypedProperties props = new TypedProperties();
|
||||
props.setProperty("hoodie.deltastreamer.source.kafka.topic", TEST_TOPIC_NAME);
|
||||
props.setProperty("bootstrap.servers", testUtils.brokerAddress());
|
||||
props.setProperty(Config.KAFKA_AUTO_RESET_OFFSETS, resetStrategy);
|
||||
props.setProperty(Config.KAFKA_AUTO_OFFSET_RESET, resetStrategy);
|
||||
props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents",
|
||||
maxEventsToReadFromKafkaSource != null ? String.valueOf(maxEventsToReadFromKafkaSource) :
|
||||
String.valueOf(Config.maxEventsFromKafkaSource));
|
||||
|
||||
Reference in New Issue
Block a user