[HUDI-1006] DeltaStreamer with a Kafka source and offset reset strategy "latest" can't consume data (#1719)
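Summary: with auto.offset.reset=latest and no usable previous checkpoint, the Kafka sources returned the previous checkpoint string (or "") whenever a fetch found no new messages, so the offsets resolved from the "latest" strategy were never committed and, as the issue title suggests, DeltaStreamer could keep skipping data. The changes below (1) make DeltaSync treat an empty-string checkpoint as Option.empty(), (2) make AvroKafkaSource and JsonKafkaSource checkpoint the resolved offset ranges even when no new messages arrive, (3) drop the now-unneeded empty-string guard in KafkaOffsetGen.strToOffsets, and (4) parameterize the TestKafkaSource props with a reset strategy and add a test comparing "earliest" and "latest" behavior.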
@@ -258,7 +258,10 @@ public class DeltaSync implements Serializable {
       if (cfg.checkpoint != null && !cfg.checkpoint.equals(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))) {
         resumeCheckpointStr = Option.of(cfg.checkpoint);
       } else if (commitMetadata.getMetadata(CHECKPOINT_KEY) != null) {
+        // if the previous checkpoint is an empty string, skip resuming and use Option.empty()
+        if (!commitMetadata.getMetadata(CHECKPOINT_KEY).isEmpty()) {
           resumeCheckpointStr = Option.of(commitMetadata.getMetadata(CHECKPOINT_KEY));
+        }
       } else {
         throw new HoodieDeltaStreamerException(
             "Unable to find previous checkpoint. Please double check if this table "
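To see what the DeltaSync change buys, here is a minimal standalone sketch of the resume decision, using java.util.Optional in place of Hudi's Option; ResumeSketch and resolveResume are hypothetical names for illustration, not Hudi API:

import java.util.Optional;

final class ResumeSketch {
  // An empty-string checkpoint now resolves to empty, so the Kafka source
  // falls back to its auto.offset.reset strategy instead of trying to parse "".
  static Optional<String> resolveResume(String previousCheckpoint) {
    if (previousCheckpoint != null && !previousCheckpoint.isEmpty()) {
      return Optional.of(previousCheckpoint);
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    System.out.println(resolveResume("hoodie_test,0:0,1:0")); // Optional[hoodie_test,0:0,1:0]
    System.out.println(resolveResume(""));                    // Optional.empty
  }
}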
@@ -57,10 +57,9 @@ public class AvroKafkaSource extends AvroSource {
   protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) {
     OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit);
     long totalNewMsgs = CheckpointUtils.totalNewMessages(offsetRanges);
-    if (totalNewMsgs <= 0) {
-      return new InputBatch<>(Option.empty(), lastCheckpointStr.isPresent() ? lastCheckpointStr.get() : "");
-    } else {
-      LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
+    LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
+    if (totalNewMsgs <= 0) {
+      return new InputBatch<>(Option.empty(), CheckpointUtils.offsetsToStr(offsetRanges));
     }
     JavaRDD<GenericRecord> newDataRDD = toRDD(offsetRanges);
     return new InputBatch<>(Option.of(newDataRDD), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
@@ -55,10 +55,10 @@ public class JsonKafkaSource extends JsonSource {
   protected InputBatch<JavaRDD<String>> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) {
     OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit);
     long totalNewMsgs = CheckpointUtils.totalNewMessages(offsetRanges);
-    if (totalNewMsgs <= 0) {
-      return new InputBatch<>(Option.empty(), lastCheckpointStr.isPresent() ? lastCheckpointStr.get() : "");
-    }
     LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
+    if (totalNewMsgs <= 0) {
+      return new InputBatch<>(Option.empty(), CheckpointUtils.offsetsToStr(offsetRanges));
+    }
     JavaRDD<String> newDataRDD = toRDD(offsetRanges);
     return new InputBatch<>(Option.of(newDataRDD), CheckpointUtils.offsetsToStr(offsetRanges));
   }
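Both source hunks replace "echo the previous checkpoint (or empty string)" with a checkpoint built from the resolved offset ranges, so even an empty fetch under the "latest" strategy commits concrete offsets. Below is a sketch of the "topic,partition:offset,..." string that CheckpointUtils.offsetsToStr produces, inferred from the "hoodie_test,0:0,1:0" example in the test comments further down; OffsetsToStrSketch is an illustrative re-implementation, not Hudi's code:

import java.util.LinkedHashMap;
import java.util.Map;

final class OffsetsToStrSketch {
  // Formats a topic plus per-partition offsets the way the checkpoint
  // string in the tests looks: "hoodie_test,0:0,1:0".
  static String offsetsToStr(String topic, Map<Integer, Long> untilOffsets) {
    StringBuilder sb = new StringBuilder(topic);
    for (Map.Entry<Integer, Long> e : untilOffsets.entrySet()) {
      sb.append(',').append(e.getKey()).append(':').append(e.getValue());
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    Map<Integer, Long> offsets = new LinkedHashMap<>();
    offsets.put(0, 0L);
    offsets.put(1, 0L);
    // With reset strategy "latest" and no new data, the fixed sources now
    // checkpoint something like this instead of an empty string:
    System.out.println(offsetsToStr("hoodie_test", offsets)); // hoodie_test,0:0,1:0
  }
}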
@@ -55,9 +55,6 @@ public class KafkaOffsetGen {
    */
   public static HashMap<TopicPartition, Long> strToOffsets(String checkpointStr) {
     HashMap<TopicPartition, Long> offsetMap = new HashMap<>();
-    if (checkpointStr.length() == 0) {
-      return offsetMap;
-    }
     String[] splits = checkpointStr.split(",");
     String topic = splits[0];
     for (int i = 1; i < splits.length; i++) {
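The guard becomes removable because DeltaSync no longer passes an empty checkpoint string down to strToOffsets. For context, here is a plausible completion of the parsing loop shown above, assuming each element after the topic is "partition:offset" (the inverse of offsetsToStr); the loop body is an assumption, not part of this diff:

import java.util.HashMap;

import org.apache.kafka.common.TopicPartition;

final class StrToOffsetsSketch {
  static HashMap<TopicPartition, Long> strToOffsets(String checkpointStr) {
    HashMap<TopicPartition, Long> offsetMap = new HashMap<>();
    // "hoodie_test,0:0,1:0" -> topic "hoodie_test", partition 0 at offset 0, partition 1 at offset 0
    String[] splits = checkpointStr.split(",");
    String topic = splits[0];
    for (int i = 1; i < splits.length; i++) {
      String[] subSplits = splits[i].split(":");
      offsetMap.put(new TopicPartition(topic, Integer.parseInt(subSplits[0])),
          Long.parseLong(subSplits[1]));
    }
    return offsetMap;
  }
}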
@@ -81,11 +81,11 @@ public class TestKafkaSource extends UtilitiesTestBase {
     testUtils.teardown();
   }
 
-  private TypedProperties createPropsForJsonSource(Long maxEventsToReadFromKafkaSource) {
+  private TypedProperties createPropsForJsonSource(Long maxEventsToReadFromKafkaSource, String resetStrategy) {
     TypedProperties props = new TypedProperties();
     props.setProperty("hoodie.deltastreamer.source.kafka.topic", TEST_TOPIC_NAME);
     props.setProperty("bootstrap.servers", testUtils.brokerAddress());
-    props.setProperty("auto.offset.reset", "earliest");
+    props.setProperty("auto.offset.reset", resetStrategy);
     props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents",
         maxEventsToReadFromKafkaSource != null ? String.valueOf(maxEventsToReadFromKafkaSource) :
             String.valueOf(Config.maxEventsFromKafkaSource));
@@ -99,7 +99,7 @@ public class TestKafkaSource extends UtilitiesTestBase {
     // topic setup.
     testUtils.createTopic(TEST_TOPIC_NAME, 2);
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
-    TypedProperties props = createPropsForJsonSource(null);
+    TypedProperties props = createPropsForJsonSource(null, "earliest");
 
     Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider);
     SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
@@ -141,12 +141,45 @@ public class TestKafkaSource extends UtilitiesTestBase {
     assertEquals(Option.empty(), fetch4AsRows.getBatch());
   }
 
+  // test case for the kafka offset reset strategy
+  @Test
+  public void testJsonKafkaSourceResetStrategy() {
+    // topic setup.
+    testUtils.createTopic(TEST_TOPIC_NAME, 2);
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+
+    TypedProperties earliestProps = createPropsForJsonSource(null, "earliest");
+    Source earliestJsonSource = new JsonKafkaSource(earliestProps, jsc, sparkSession, schemaProvider);
+    SourceFormatAdapter earliestKafkaSource = new SourceFormatAdapter(earliestJsonSource);
+
+    TypedProperties latestProps = createPropsForJsonSource(null, "latest");
+    Source latestJsonSource = new JsonKafkaSource(latestProps, jsc, sparkSession, schemaProvider);
+    SourceFormatAdapter latestKafkaSource = new SourceFormatAdapter(latestJsonSource);
+
+    // 1. Extract when the Kafka topic has no data
+    // => both yield a checkpoint string like "hoodie_test,0:0,1:0"; the latest checkpoint should equal the earliest checkpoint
+    InputBatch<JavaRDD<GenericRecord>> earFetch0 = earliestKafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
+    InputBatch<JavaRDD<GenericRecord>> latFetch0 = latestKafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
+    assertEquals(earFetch0.getBatch(), latFetch0.getBatch());
+    assertEquals(earFetch0.getCheckpointForNextBatch(), latFetch0.getCheckpointForNextBatch());
+
+    testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+
+    // 2. Extract a new checkpoint with a null / empty-string previous checkpoint
+    // => an earliest fetch with the max source limit gets all of the data and an end-offset checkpoint
+    InputBatch<JavaRDD<GenericRecord>> earFetch1 = earliestKafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
+
+    // => [null previous checkpoint] a latest-reset fetch gets an end-offset checkpoint equal to the earliest one
+    InputBatch<JavaRDD<GenericRecord>> latFetch1 = latestKafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
+    assertEquals(earFetch1.getCheckpointForNextBatch(), latFetch1.getCheckpointForNextBatch());
+  }
+
   @Test
   public void testJsonKafkaSourceWithDefaultUpperCap() {
     // topic setup.
     testUtils.createTopic(TEST_TOPIC_NAME, 2);
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
-    TypedProperties props = createPropsForJsonSource(Long.MAX_VALUE);
+    TypedProperties props = createPropsForJsonSource(Long.MAX_VALUE, "earliest");
 
     Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider);
     SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
@@ -175,7 +208,7 @@ public class TestKafkaSource extends UtilitiesTestBase {
     // topic setup.
     testUtils.createTopic(TEST_TOPIC_NAME, 2);
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
-    TypedProperties props = createPropsForJsonSource(500L);
+    TypedProperties props = createPropsForJsonSource(500L, "earliest");
 
     Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider);
     SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);