From 4774c4248f3ea2ce863c6f76679639ae1cdddf9c Mon Sep 17 00:00:00 2001 From: Qi Ji Date: Mon, 13 Jun 2022 22:31:57 +0800 Subject: [PATCH] [HUDI-4006] failOnDataLoss on delta-streamer kafka sources (#5718) add new config key hoodie.deltastreamer.source.kafka.enable.failOnDataLoss when failOnDataLoss=false (current behaviour, the default), log a warning instead of seeking to earliest silently when failOnDataLoss is set, fail explicitly --- .../sources/helpers/KafkaOffsetGen.java | 19 ++++++++- .../sources/TestJsonKafkaSource.java | 39 +++++++++++++++++++ 2 files changed, 56 insertions(+), 2 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java index 1abd2616b..cc577621f 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java @@ -184,6 +184,11 @@ public class KafkaOffsetGen { .defaultValue(false) .withDocumentation("Automatically submits offset to kafka."); + public static final ConfigProperty ENABLE_FAIL_ON_DATA_LOSS = ConfigProperty + .key("hoodie.deltastreamer.source.kafka.enable.failOnDataLoss") + .defaultValue(false) + .withDocumentation("Fail when checkpoint goes out of bounds instead of seeking to earliest offsets."); + public static final ConfigProperty MAX_EVENTS_FROM_KAFKA_SOURCE_PROP = ConfigProperty .key("hoodie.deltastreamer.kafka.source.maxEvents") .defaultValue(5000000L) @@ -329,9 +334,19 @@ public class KafkaOffsetGen { Option lastCheckpointStr, Set topicPartitions) { Map earliestOffsets = consumer.beginningOffsets(topicPartitions); Map checkpointOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get()); - boolean checkpointOffsetReseter = checkpointOffsets.entrySet().stream() + boolean isCheckpointOutOfBounds = checkpointOffsets.entrySet().stream() .anyMatch(offset -> offset.getValue() < earliestOffsets.get(offset.getKey())); - return checkpointOffsetReseter ? earliestOffsets : checkpointOffsets; + if (isCheckpointOutOfBounds) { + if (this.props.getBoolean(Config.ENABLE_FAIL_ON_DATA_LOSS.key(), Config.ENABLE_FAIL_ON_DATA_LOSS.defaultValue())) { + throw new HoodieDeltaStreamerException("Some data may have been lost because they are not available in Kafka any more;" + + " either the data was aged out by Kafka or the topic may have been deleted before all the data in the topic was processed."); + } else { + LOG.warn("Some data may have been lost because they are not available in Kafka any more;" + + " either the data was aged out by Kafka or the topic may have been deleted before all the data in the topic was processed." + + " If you want delta streamer to fail on such cases, set \"" + Config.ENABLE_FAIL_ON_DATA_LOSS.key() + "\" to \"true\"."); + } + } + return isCheckpointOutOfBounds ? earliestOffsets : checkpointOffsets; } /** diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java index 87f1774e0..05d79e044 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java @@ -26,6 +26,7 @@ import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.testutils.SparkClientFunctionalTestHarness; import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics; import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter; +import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException; import org.apache.hudi.utilities.schema.FilebasedSchemaProvider; import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config; @@ -48,6 +49,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Properties; import java.util.UUID; import static org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET; @@ -367,4 +369,41 @@ public class TestJsonKafkaSource extends SparkClientFunctionalTestHarness { props.remove(ConsumerConfig.GROUP_ID_CONFIG); assertThrows(HoodieNotSupportedException.class,() -> kafkaSource.getSource().onCommit("")); } + + @Test + public void testFailOnDataLoss() throws Exception { + // create a topic with very short retention + final String topic = TEST_TOPIC_PREFIX + "testFailOnDataLoss"; + Properties topicConfig = new Properties(); + topicConfig.setProperty("retention.ms", "10000"); + testUtils.createTopic(topic, 1, topicConfig); + + HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); + TypedProperties failOnDataLossProps = createPropsForJsonSource(topic, null, "earliest"); + failOnDataLossProps.setProperty(Config.ENABLE_FAIL_ON_DATA_LOSS.key(), Boolean.toString(true)); + + Source jsonSource = new JsonKafkaSource(failOnDataLossProps, jsc(), spark(), schemaProvider, metrics); + SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource); + testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("000", 10))); + // send 10 records, extract 2 records to generate a checkpoint + InputBatch> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 2); + assertEquals(2, fetch1.getBatch().get().count()); + + // wait for the checkpoint to expire + Thread.sleep(10001); + Throwable t = assertThrows(HoodieDeltaStreamerException.class, () -> { + kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE); + }); + assertEquals( + "Some data may have been lost because they are not available in Kafka any more;" + + " either the data was aged out by Kafka or the topic may have been deleted before all the data in the topic was processed.", + t.getMessage()); + t = assertThrows(HoodieDeltaStreamerException.class, () -> { + kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE); + }); + assertEquals( + "Some data may have been lost because they are not available in Kafka any more;" + + " either the data was aged out by Kafka or the topic may have been deleted before all the data in the topic was processed.", + t.getMessage()); + } }