From ede6c9bda4dba8a9981252bd05f0725ff1024b95 Mon Sep 17 00:00:00 2001 From: Litianye Date: Sun, 14 Jun 2020 18:01:44 +0800 Subject: [PATCH] [HUDI-1006] Deltastreamer use kafkaSource with offset reset strategy:latest can't consume data (#1719) --- .../utilities/deltastreamer/DeltaSync.java | 5 ++- .../utilities/sources/AvroKafkaSource.java | 5 +-- .../utilities/sources/JsonKafkaSource.java | 6 +-- .../sources/helpers/KafkaOffsetGen.java | 3 -- .../utilities/sources/TestKafkaSource.java | 43 ++++++++++++++++--- 5 files changed, 47 insertions(+), 15 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index e36495dac..1a3e43c49 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -258,7 +258,10 @@ public class DeltaSync implements Serializable { if (cfg.checkpoint != null && !cfg.checkpoint.equals(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))) { resumeCheckpointStr = Option.of(cfg.checkpoint); } else if (commitMetadata.getMetadata(CHECKPOINT_KEY) != null) { - resumeCheckpointStr = Option.of(commitMetadata.getMetadata(CHECKPOINT_KEY)); + //if previous checkpoint is an empty string, skip resume use Option.empty() + if (!commitMetadata.getMetadata(CHECKPOINT_KEY).isEmpty()) { + resumeCheckpointStr = Option.of(commitMetadata.getMetadata(CHECKPOINT_KEY)); + } } else { throw new HoodieDeltaStreamerException( "Unable to find previous checkpoint. Please double check if this table " diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java index 202dbc471..66a38e2a4 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java @@ -57,10 +57,9 @@ public class AvroKafkaSource extends AvroSource { protected InputBatch> fetchNewData(Option lastCheckpointStr, long sourceLimit) { OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit); long totalNewMsgs = CheckpointUtils.totalNewMessages(offsetRanges); + LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName()); if (totalNewMsgs <= 0) { - return new InputBatch<>(Option.empty(), lastCheckpointStr.isPresent() ? lastCheckpointStr.get() : ""); - } else { - LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName()); + return new InputBatch<>(Option.empty(), CheckpointUtils.offsetsToStr(offsetRanges)); } JavaRDD newDataRDD = toRDD(offsetRanges); return new InputBatch<>(Option.of(newDataRDD), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges)); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java index 4c9785b61..833c6c630 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java @@ -55,10 +55,10 @@ public class JsonKafkaSource extends JsonSource { protected InputBatch> fetchNewData(Option lastCheckpointStr, long sourceLimit) { OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit); long totalNewMsgs = CheckpointUtils.totalNewMessages(offsetRanges); - if (totalNewMsgs <= 0) { - return new InputBatch<>(Option.empty(), lastCheckpointStr.isPresent() ? lastCheckpointStr.get() : ""); - } LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName()); + if (totalNewMsgs <= 0) { + return new InputBatch<>(Option.empty(), CheckpointUtils.offsetsToStr(offsetRanges)); + } JavaRDD newDataRDD = toRDD(offsetRanges); return new InputBatch<>(Option.of(newDataRDD), CheckpointUtils.offsetsToStr(offsetRanges)); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java index 933127487..cf8e3d8f0 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java @@ -55,9 +55,6 @@ public class KafkaOffsetGen { */ public static HashMap strToOffsets(String checkpointStr) { HashMap offsetMap = new HashMap<>(); - if (checkpointStr.length() == 0) { - return offsetMap; - } String[] splits = checkpointStr.split(","); String topic = splits[0]; for (int i = 1; i < splits.length; i++) { diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java index 33c0a7f69..aeedca366 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java @@ -81,11 +81,11 @@ public class TestKafkaSource extends UtilitiesTestBase { testUtils.teardown(); } - private TypedProperties createPropsForJsonSource(Long maxEventsToReadFromKafkaSource) { + private TypedProperties createPropsForJsonSource(Long maxEventsToReadFromKafkaSource, String resetStrategy) { TypedProperties props = new TypedProperties(); props.setProperty("hoodie.deltastreamer.source.kafka.topic", TEST_TOPIC_NAME); props.setProperty("bootstrap.servers", testUtils.brokerAddress()); - props.setProperty("auto.offset.reset", "earliest"); + props.setProperty("auto.offset.reset", resetStrategy); props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", maxEventsToReadFromKafkaSource != null ? String.valueOf(maxEventsToReadFromKafkaSource) : String.valueOf(Config.maxEventsFromKafkaSource)); @@ -99,7 +99,7 @@ public class TestKafkaSource extends UtilitiesTestBase { // topic setup. testUtils.createTopic(TEST_TOPIC_NAME, 2); HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); - TypedProperties props = createPropsForJsonSource(null); + TypedProperties props = createPropsForJsonSource(null, "earliest"); Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider); SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource); @@ -141,12 +141,45 @@ public class TestKafkaSource extends UtilitiesTestBase { assertEquals(Option.empty(), fetch4AsRows.getBatch()); } + // test case with kafka offset reset strategy + @Test + public void testJsonKafkaSourceResetStrategy() { + // topic setup. + testUtils.createTopic(TEST_TOPIC_NAME, 2); + HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); + + TypedProperties earliestProps = createPropsForJsonSource(null, "earliest"); + Source earliestJsonSource = new JsonKafkaSource(earliestProps, jsc, sparkSession, schemaProvider); + SourceFormatAdapter earliestKafkaSource = new SourceFormatAdapter(earliestJsonSource); + + TypedProperties latestProps = createPropsForJsonSource(null, "latest"); + Source latestJsonSource = new JsonKafkaSource(latestProps, jsc, sparkSession, schemaProvider); + SourceFormatAdapter latestKafkaSource = new SourceFormatAdapter(latestJsonSource); + + // 1. Extract with a none data kafka checkpoint + // => get a checkpoint string like "hoodie_test,0:0,1:0", latest checkpoint should be equals to earliest checkpoint + InputBatch> earFetch0 = earliestKafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE); + InputBatch> latFetch0 = latestKafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE); + assertEquals(earFetch0.getBatch(), latFetch0.getBatch()); + assertEquals(earFetch0.getCheckpointForNextBatch(), latFetch0.getCheckpointForNextBatch()); + + testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000))); + + // 2. Extract new checkpoint with a null / empty string pre checkpoint + // => earliest fetch with max source limit will get all of data and a end offset checkpoint + InputBatch> earFetch1 = earliestKafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE); + + // => [a null pre checkpoint] latest reset fetch will get a end offset checkpoint same to earliest + InputBatch> latFetch1 = latestKafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE); + assertEquals(earFetch1.getCheckpointForNextBatch(), latFetch1.getCheckpointForNextBatch()); + } + @Test public void testJsonKafkaSourceWithDefaultUpperCap() { // topic setup. testUtils.createTopic(TEST_TOPIC_NAME, 2); HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); - TypedProperties props = createPropsForJsonSource(Long.MAX_VALUE); + TypedProperties props = createPropsForJsonSource(Long.MAX_VALUE, "earliest"); Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider); SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource); @@ -175,7 +208,7 @@ public class TestKafkaSource extends UtilitiesTestBase { // topic setup. testUtils.createTopic(TEST_TOPIC_NAME, 2); HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); - TypedProperties props = createPropsForJsonSource(500L); + TypedProperties props = createPropsForJsonSource(500L, "earliest"); Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider); SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);