Fixing kafka auto.reset.offsets config param key (#2691)
This commit is contained in:
committed by
GitHub
parent
55a489c769
commit
161d530f93
@@ -157,7 +157,8 @@ public class KafkaOffsetGen {
|
||||
|
||||
private static final String KAFKA_TOPIC_NAME = "hoodie.deltastreamer.source.kafka.topic";
|
||||
private static final String MAX_EVENTS_FROM_KAFKA_SOURCE_PROP = "hoodie.deltastreamer.kafka.source.maxEvents";
|
||||
private static final String KAFKA_AUTO_RESET_OFFSETS = "hoodie.deltastreamer.source.kafka.auto.reset.offsets";
|
||||
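// This key is meant to mirror Kafka's native consumer config param; do not change the config param name. NOTE(review): Kafka documents the native key as "auto.offset.reset" - confirm that "auto.reset.offsets" is the intended spelling.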
|
||||
public static final String KAFKA_AUTO_RESET_OFFSETS = "auto.reset.offsets";
|
||||
private static final KafkaResetOffsetStrategies DEFAULT_KAFKA_AUTO_RESET_OFFSETS = KafkaResetOffsetStrategies.LATEST;
|
||||
public static final long DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE = 5000000;
|
||||
public static long maxEventsFromKafkaSource = DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE;
|
||||
@@ -206,8 +207,8 @@ public class KafkaOffsetGen {
|
||||
.map(x -> new TopicPartition(x.topic(), x.partition())).collect(Collectors.toSet());
|
||||
|
||||
// Determine the offset ranges to read from
|
||||
if (lastCheckpointStr.isPresent() && !lastCheckpointStr.get().isEmpty()) {
|
||||
fromOffsets = checkupValidOffsets(consumer, lastCheckpointStr, topicPartitions);
|
||||
if (lastCheckpointStr.isPresent() && !lastCheckpointStr.get().isEmpty() && checkTopicCheckpoint(lastCheckpointStr)) {
|
||||
fromOffsets = fetchValidOffsets(consumer, lastCheckpointStr, topicPartitions);
|
||||
metrics.updateDeltaStreamerKafkaDelayCountMetrics(delayOffsetCalculation(lastCheckpointStr, topicPartitions, consumer));
|
||||
} else {
|
||||
switch (autoResetValue) {
|
||||
@@ -245,27 +246,20 @@ public class KafkaOffsetGen {
|
||||
return CheckpointUtils.computeOffsetRanges(fromOffsets, toOffsets, numEvents);
|
||||
}
|
||||
|
||||
// Check whether the checkpoint offsets are valid: if they are, return the checkpoint offsets;
|
||||
// otherwise return the earliest offsets.
|
||||
private Map<TopicPartition, Long> checkupValidOffsets(KafkaConsumer consumer,
|
||||
/**
|
||||
* Fetch checkpoint offsets for each partition.
|
||||
* @param consumer instance of {@link KafkaConsumer} to fetch offsets from.
|
||||
* @param lastCheckpointStr last checkpoint string.
|
||||
* @param topicPartitions set of topic partitions.
|
||||
* @return a map of Topic partitions to offsets.
|
||||
*/
|
||||
private Map<TopicPartition, Long> fetchValidOffsets(KafkaConsumer consumer,
|
||||
Option<String> lastCheckpointStr, Set<TopicPartition> topicPartitions) {
|
||||
Map<TopicPartition, Long> earliestOffsets = consumer.beginningOffsets(topicPartitions);
|
||||
if (checkTopicCheckpoint(lastCheckpointStr)) {
|
||||
Map<TopicPartition, Long> checkpointOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get());
|
||||
boolean checkpointOffsetReseter = checkpointOffsets.entrySet().stream()
|
||||
.anyMatch(offset -> offset.getValue() < earliestOffsets.get(offset.getKey()));
|
||||
return checkpointOffsetReseter ? earliestOffsets : checkpointOffsets;
|
||||
}
|
||||
|
||||
switch (autoResetValue) {
|
||||
case EARLIEST:
|
||||
return earliestOffsets;
|
||||
case LATEST:
|
||||
return consumer.endOffsets(topicPartitions);
|
||||
default:
|
||||
throw new HoodieNotSupportedException("Auto reset value must be one of 'earliest' or 'latest' ");
|
||||
}
|
||||
|
||||
Map<TopicPartition, Long> checkpointOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get());
|
||||
boolean checkpointOffsetReseter = checkpointOffsets.entrySet().stream()
|
||||
.anyMatch(offset -> offset.getValue() < earliestOffsets.get(offset.getKey()));
|
||||
return checkpointOffsetReseter ? earliestOffsets : checkpointOffsets;
|
||||
}
|
||||
|
||||
private Long delayOffsetCalculation(Option<String> lastCheckpointStr, Set<TopicPartition> topicPartitions, KafkaConsumer consumer) {
|
||||
|
||||
@@ -53,6 +53,7 @@ import org.apache.hudi.utilities.sources.InputBatch;
|
||||
import org.apache.hudi.utilities.sources.JsonKafkaSource;
|
||||
import org.apache.hudi.utilities.sources.ParquetDFSSource;
|
||||
import org.apache.hudi.utilities.sources.TestDataSource;
|
||||
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config;
|
||||
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
|
||||
import org.apache.hudi.utilities.testutils.sources.DistributedTestDataSource;
|
||||
import org.apache.hudi.utilities.testutils.sources.config.SourceConfigs;
|
||||
@@ -259,7 +260,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
|
||||
protected static void populateCommonKafkaProps(TypedProperties props) {
|
||||
//Kafka source properties
|
||||
props.setProperty("bootstrap.servers", testUtils.brokerAddress());
|
||||
props.setProperty("hoodie.deltastreamer.source.kafka.auto.reset.offsets", "earliest");
|
||||
props.setProperty(Config.KAFKA_AUTO_RESET_OFFSETS, "earliest");
|
||||
props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
|
||||
props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
|
||||
props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", String.valueOf(5000));
|
||||
@@ -1344,7 +1345,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
|
||||
props.setProperty("hoodie.deltastreamer.source.kafka.topic",topicName);
|
||||
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source_uber.avsc");
|
||||
props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target_uber.avsc");
|
||||
props.setProperty("hoodie.deltastreamer.source.kafka.auto.reset.offsets", autoResetValue);
|
||||
props.setProperty(Config.KAFKA_AUTO_RESET_OFFSETS, autoResetValue);
|
||||
|
||||
UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + propsFileName);
|
||||
}
|
||||
|
||||
@@ -88,7 +88,7 @@ public class TestKafkaSource extends UtilitiesTestBase {
|
||||
TypedProperties props = new TypedProperties();
|
||||
props.setProperty("hoodie.deltastreamer.source.kafka.topic", TEST_TOPIC_NAME);
|
||||
props.setProperty("bootstrap.servers", testUtils.brokerAddress());
|
||||
props.setProperty("hoodie.deltastreamer.source.kafka.auto.reset.offsets", resetStrategy);
|
||||
props.setProperty(Config.KAFKA_AUTO_RESET_OFFSETS, resetStrategy);
|
||||
props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents",
|
||||
maxEventsToReadFromKafkaSource != null ? String.valueOf(maxEventsToReadFromKafkaSource) :
|
||||
String.valueOf(Config.maxEventsFromKafkaSource));
|
||||
|
||||
Reference in New Issue
Block a user