[HUDI-4006] failOnDataLoss on delta-streamer kafka sources (#5718)
Add a new config key, hoodie.deltastreamer.source.kafka.enable.failOnDataLoss. When failOnDataLoss=false (the current behaviour, and the default), log a warning instead of silently seeking to the earliest offsets; when failOnDataLoss is set, fail explicitly.
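For illustration, opting in from user code might look like the following minimal sketch (the topic name is made up and the TypedProperties setup is illustrative; only the failOnDataLoss key is introduced by this change):

    import org.apache.hudi.common.config.TypedProperties;

    TypedProperties props = new TypedProperties();
    // Hypothetical source topic; any existing Kafka source settings stay as-is.
    props.setProperty("hoodie.deltastreamer.source.kafka.topic", "my_topic");
    // Default is false: an out-of-bounds checkpoint only logs a warning and
    // the source seeks back to the earliest available offsets.
    props.setProperty("hoodie.deltastreamer.source.kafka.enable.failOnDataLoss", "true");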
@@ -184,6 +184,11 @@ public class KafkaOffsetGen {
         .defaultValue(false)
         .withDocumentation("Automatically submits offset to kafka.");
 
+    public static final ConfigProperty<Boolean> ENABLE_FAIL_ON_DATA_LOSS = ConfigProperty
+        .key("hoodie.deltastreamer.source.kafka.enable.failOnDataLoss")
+        .defaultValue(false)
+        .withDocumentation("Fail when checkpoint goes out of bounds instead of seeking to earliest offsets.");
+
     public static final ConfigProperty<Long> MAX_EVENTS_FROM_KAFKA_SOURCE_PROP = ConfigProperty
         .key("hoodie.deltastreamer.kafka.source.maxEvents")
         .defaultValue(5000000L)
@@ -329,9 +334,19 @@ public class KafkaOffsetGen {
       Option<String> lastCheckpointStr, Set<TopicPartition> topicPartitions) {
     Map<TopicPartition, Long> earliestOffsets = consumer.beginningOffsets(topicPartitions);
     Map<TopicPartition, Long> checkpointOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get());
-    boolean checkpointOffsetReseter = checkpointOffsets.entrySet().stream()
+    boolean isCheckpointOutOfBounds = checkpointOffsets.entrySet().stream()
         .anyMatch(offset -> offset.getValue() < earliestOffsets.get(offset.getKey()));
-    return checkpointOffsetReseter ? earliestOffsets : checkpointOffsets;
+    if (isCheckpointOutOfBounds) {
+      if (this.props.getBoolean(Config.ENABLE_FAIL_ON_DATA_LOSS.key(), Config.ENABLE_FAIL_ON_DATA_LOSS.defaultValue())) {
+        throw new HoodieDeltaStreamerException("Some data may have been lost because they are not available in Kafka any more;"
+            + " either the data was aged out by Kafka or the topic may have been deleted before all the data in the topic was processed.");
+      } else {
+        LOG.warn("Some data may have been lost because they are not available in Kafka any more;"
+            + " either the data was aged out by Kafka or the topic may have been deleted before all the data in the topic was processed."
+            + " If you want delta streamer to fail on such cases, set \"" + Config.ENABLE_FAIL_ON_DATA_LOSS.key() + "\" to \"true\".");
+      }
+    }
+    return isCheckpointOutOfBounds ? earliestOffsets : checkpointOffsets;
   }
 
   /**
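To make the check above concrete, here is a self-contained sketch with made-up offsets (plain integer keys stand in for TopicPartition): after retention ages out records 0..11 on a partition, a checkpoint at offset 5 lies below the broker's earliest offset, so the source would now warn or throw instead of silently resuming at offset 12.

    import java.util.HashMap;
    import java.util.Map;

    public class CheckpointBoundsDemo {
      public static void main(String[] args) {
        // Earliest offsets still retained by the broker, per partition.
        Map<Integer, Long> earliestOffsets = new HashMap<>();
        earliestOffsets.put(0, 12L);
        // Offsets stored in the last delta-streamer checkpoint.
        Map<Integer, Long> checkpointOffsets = new HashMap<>();
        checkpointOffsets.put(0, 5L);
        // Same predicate as in the diff above: any checkpointed offset older
        // than the earliest retained offset means data was lost.
        boolean isCheckpointOutOfBounds = checkpointOffsets.entrySet().stream()
            .anyMatch(offset -> offset.getValue() < earliestOffsets.get(offset.getKey()));
        System.out.println(isCheckpointOutOfBounds); // prints true
      }
    }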
@@ -26,6 +26,7 @@ import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
 import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
 import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
+import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config;
 
@@ -48,6 +49,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Properties;
 import java.util.UUID;
 
 import static org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET;
@@ -367,4 +369,41 @@ public class TestJsonKafkaSource extends SparkClientFunctionalTestHarness {
     props.remove(ConsumerConfig.GROUP_ID_CONFIG);
     assertThrows(HoodieNotSupportedException.class,() -> kafkaSource.getSource().onCommit(""));
   }
+
+  @Test
+  public void testFailOnDataLoss() throws Exception {
+    // create a topic with very short retention
+    final String topic = TEST_TOPIC_PREFIX + "testFailOnDataLoss";
+    Properties topicConfig = new Properties();
+    topicConfig.setProperty("retention.ms", "10000");
+    testUtils.createTopic(topic, 1, topicConfig);
+
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    TypedProperties failOnDataLossProps = createPropsForJsonSource(topic, null, "earliest");
+    failOnDataLossProps.setProperty(Config.ENABLE_FAIL_ON_DATA_LOSS.key(), Boolean.toString(true));
+
+    Source jsonSource = new JsonKafkaSource(failOnDataLossProps, jsc(), spark(), schemaProvider, metrics);
+    SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
+    testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("000", 10)));
+    // send 10 records, extract 2 records to generate a checkpoint
+    InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 2);
+    assertEquals(2, fetch1.getBatch().get().count());
+
+    // wait for the checkpoint to expire
+    Thread.sleep(10001);
+    Throwable t = assertThrows(HoodieDeltaStreamerException.class, () -> {
+      kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
+    });
+    assertEquals(
+        "Some data may have been lost because they are not available in Kafka any more;"
+            + " either the data was aged out by Kafka or the topic may have been deleted before all the data in the topic was processed.",
+        t.getMessage());
+    t = assertThrows(HoodieDeltaStreamerException.class, () -> {
+      kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
+    });
+    assertEquals(
+        "Some data may have been lost because they are not available in Kafka any more;"
+            + " either the data was aged out by Kafka or the topic may have been deleted before all the data in the topic was processed.",
+        t.getMessage());
+  }
 }