
[HUDI-1910] Commit Offset to Kafka after successful Hudi commit (#3092)

Vinay Patil
2021-06-28 19:22:05 +05:30
committed by GitHub
parent 34fc8a8880
commit 039aeb6dce
8 changed files with 170 additions and 16 deletions

SourceCommitCallback.java

@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.utilities.callback;

/**
 * A callback interface that gives the Source an option to perform an action on a successful Hudi commit.
 */
public interface SourceCommitCallback {

  /**
   * Performs some action on a successful Hudi commit, such as committing offsets to Kafka.
   *
   * @param lastCkptStr last checkpoint string.
   */
  default void onCommit(String lastCkptStr) {
  }
}
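
For illustration only, the sketch below shows a standalone implementation of the new callback contract; the class name and the println body are assumptions, and only the SourceCommitCallback/onCommit signature comes from this commit.

import org.apache.hudi.utilities.callback.SourceCommitCallback;

// Hypothetical implementer of the callback introduced above (not part of this commit).
public class LoggingCommitCallback implements SourceCommitCallback {
  @Override
  public void onCommit(String lastCkptStr) {
    // React to a successful Hudi commit, e.g. acknowledge progress to an upstream system.
    System.out.println("Hudi committed up to checkpoint: " + lastCkptStr);
  }
}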

DeltaSync.java

@@ -473,7 +473,7 @@ public class DeltaSync implements Serializable {
boolean success = writeClient.commit(instantTime, writeStatusRDD, Option.of(checkpointCommitMetadata));
if (success) {
LOG.info("Commit " + instantTime + " successful!");
this.formatAdapter.getSource().onCommit(checkpointStr);
// Schedule compaction if needed
if (cfg.isAsyncCompactionEnabled()) {
scheduledCompactionInstant = writeClient.scheduleCompaction(Option.empty());

SourceFormatAdapter.java

@@ -118,4 +118,8 @@ public final class SourceFormatAdapter {
throw new IllegalArgumentException("Unknown source type (" + source.getSourceType() + ")");
}
}
public Source getSource() {
return source;
}
}

AvroKafkaSource.java

@@ -40,6 +40,9 @@ import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.OffsetRange;
import static org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config.DEFAULT_ENABLE_KAFKA_COMMIT_OFFSET;
import static org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET;
/**
* Reads avro serialized Kafka data, based on the confluent schema-registry.
*/
@@ -95,4 +98,11 @@ public class AvroKafkaSource extends AvroSource {
return KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
LocationStrategies.PreferConsistent()).map(obj -> (GenericRecord) obj.value());
}
@Override
public void onCommit(String lastCkptStr) {
if (this.props.getBoolean(ENABLE_KAFKA_COMMIT_OFFSET, DEFAULT_ENABLE_KAFKA_COMMIT_OFFSET)) {
offsetGen.commitOffsetToKafka(lastCkptStr);
}
}
}

JsonKafkaSource.java

@@ -35,6 +35,9 @@ import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.OffsetRange;
import static org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config.DEFAULT_ENABLE_KAFKA_COMMIT_OFFSET;
import static org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET;
/**
* Read json kafka data.
*/
@@ -71,4 +74,11 @@ public class JsonKafkaSource extends JsonSource {
return KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
LocationStrategies.PreferConsistent()).map(x -> (String) x.value());
}
@Override
public void onCommit(String lastCkptStr) {
if (this.props.getBoolean(ENABLE_KAFKA_COMMIT_OFFSET, DEFAULT_ENABLE_KAFKA_COMMIT_OFFSET)) {
offsetGen.commitOffsetToKafka(lastCkptStr);
}
}
}

Source.java

@@ -23,6 +23,7 @@ import org.apache.hudi.PublicAPIClass;
import org.apache.hudi.PublicAPIMethod;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.callback.SourceCommitCallback;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.spark.api.java.JavaSparkContext;
@@ -34,7 +35,7 @@ import java.io.Serializable;
* Represents a source from which we can tail data. Assumes a constructor that takes properties.
*/
@PublicAPIClass(maturity = ApiMaturityLevel.STABLE)
public abstract class Source<T> implements Serializable {
public abstract class Source<T> implements SourceCommitCallback, Serializable {
public enum SourceType {
JSON, AVRO, ROW

KafkaOffsetGen.java

@@ -26,9 +26,13 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.streaming.kafka010.OffsetRange;
@@ -157,6 +161,8 @@ public class KafkaOffsetGen {
private static final String KAFKA_TOPIC_NAME = "hoodie.deltastreamer.source.kafka.topic";
private static final String MAX_EVENTS_FROM_KAFKA_SOURCE_PROP = "hoodie.deltastreamer.kafka.source.maxEvents";
public static final String ENABLE_KAFKA_COMMIT_OFFSET = "hoodie.deltastreamer.source.kafka.enable.commit.offset";
public static final Boolean DEFAULT_ENABLE_KAFKA_COMMIT_OFFSET = false;
// "auto.offset.reset" is kafka native config param. Do not change the config param name.
public static final String KAFKA_AUTO_OFFSET_RESET = "auto.offset.reset";
private static final KafkaResetOffsetStrategies DEFAULT_KAFKA_AUTO_OFFSET_RESET = KafkaResetOffsetStrategies.LATEST;
@@ -164,24 +170,14 @@ public class KafkaOffsetGen {
public static long maxEventsFromKafkaSource = DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE;
}
private final HashMap<String, Object> kafkaParams;
private final Map<String, Object> kafkaParams;
private final TypedProperties props;
protected final String topicName;
private KafkaResetOffsetStrategies autoResetValue;
public KafkaOffsetGen(TypedProperties props) {
this.props = props;
kafkaParams = new HashMap<>();
props.keySet().stream().filter(prop -> {
// In order to prevent printing unnecessary warn logs, here filter out the hoodie
// configuration items before passing to kafkaParams
return !prop.toString().startsWith("hoodie.")
// We need to pass some properties to kafka client so that KafkaAvroSchemaDeserializer can use it
|| prop.toString().startsWith("hoodie.deltastreamer.source.kafka.value.deserializer.");
}).forEach(prop -> {
kafkaParams.put(prop.toString(), props.get(prop.toString()));
});
kafkaParams = excludeHoodieConfigs(props);
DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.KAFKA_TOPIC_NAME));
topicName = props.getString(Config.KAFKA_TOPIC_NAME);
String kafkaAutoResetOffsetsStr = props.getString(Config.KAFKA_AUTO_OFFSET_RESET, Config.DEFAULT_KAFKA_AUTO_OFFSET_RESET.name().toLowerCase());
@@ -299,7 +295,36 @@ public class KafkaOffsetGen {
return topicName;
}
public HashMap<String, Object> getKafkaParams() {
public Map<String, Object> getKafkaParams() {
return kafkaParams;
}
private Map<String, Object> excludeHoodieConfigs(TypedProperties props) {
Map<String, Object> kafkaParams = new HashMap<>();
props.keySet().stream().filter(prop -> {
// Filter out the hoodie configuration items before passing them to kafkaParams, to avoid printing unnecessary warn logs
return !prop.toString().startsWith("hoodie.");
}).forEach(prop -> {
kafkaParams.put(prop.toString(), props.get(prop.toString()));
});
return kafkaParams;
}
/**
* Commit offsets to Kafka only after hoodie commit is successful.
* @param checkpointStr checkpoint string containing offsets.
*/
public void commitOffsetToKafka(String checkpointStr) {
DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(ConsumerConfig.GROUP_ID_CONFIG));
Map<TopicPartition, Long> offsetMap = CheckpointUtils.strToOffsets(checkpointStr);
Map<String, Object> kafkaParams = excludeHoodieConfigs(props);
Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMap = new HashMap<>(offsetMap.size());
try (KafkaConsumer consumer = new KafkaConsumer(kafkaParams)) {
offsetMap.forEach((topicPartition, offset) -> offsetAndMetadataMap.put(topicPartition, new OffsetAndMetadata(offset)));
consumer.commitSync(offsetAndMetadataMap);
} catch (CommitFailedException | TimeoutException e) {
LOG.warn("Committing offsets to Kafka failed, this does not impact processing of records", e);
}
}
}
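
Putting the new pieces together, here is a minimal configuration sketch, assuming a typical DeltaStreamer properties setup: the topic key and the enable flag are the constants from Config above, group.id is the property that commitOffsetToKafka requires, and the concrete topic and group id values are made up.

import org.apache.hudi.common.config.TypedProperties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class KafkaCommitOffsetConfigSketch {
  // Illustrative only: offsets are committed back to Kafka when the enable flag is true,
  // and commitOffsetToKafka() checks that a consumer group id is present.
  static TypedProperties buildProps() {
    TypedProperties props = new TypedProperties();
    props.setProperty("hoodie.deltastreamer.source.kafka.topic", "test_topic");           // assumed topic name
    props.setProperty("hoodie.deltastreamer.source.kafka.enable.commit.offset", "true");
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "hudi-deltastreamer");              // assumed group id
    return props;
  }
}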

TestKafkaSource.java

@@ -22,6 +22,7 @@ import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
@@ -30,6 +31,9 @@ import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -40,9 +44,15 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import static org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
/**
@@ -58,7 +68,7 @@ public class TestKafkaSource extends UtilitiesTestBase {
@BeforeAll
public static void initClass() throws Exception {
UtilitiesTestBase.initClass();
UtilitiesTestBase.initClass(false);
}
@AfterAll
@@ -85,6 +95,7 @@ public class TestKafkaSource extends UtilitiesTestBase {
props.setProperty("hoodie.deltastreamer.source.kafka.topic", TEST_TOPIC_NAME);
props.setProperty("bootstrap.servers", testUtils.brokerAddress());
props.setProperty(Config.KAFKA_AUTO_OFFSET_RESET, resetStrategy);
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents",
maxEventsToReadFromKafkaSource != null ? String.valueOf(maxEventsToReadFromKafkaSource) :
String.valueOf(Config.maxEventsFromKafkaSource));
@@ -276,4 +287,64 @@ public class TestKafkaSource extends UtilitiesTestBase {
kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch4.getCheckpointForNextBatch()), Long.MAX_VALUE);
assertEquals(Option.empty(), fetch6.getBatch());
}
@Test
public void testCommitOffsetToKafka() {
// topic setup.
testUtils.createTopic(TEST_TOPIC_NAME, 2);
List<TopicPartition> topicPartitions = new ArrayList<>();
TopicPartition topicPartition0 = new TopicPartition(TEST_TOPIC_NAME, 0);
topicPartitions.add(topicPartition0);
TopicPartition topicPartition1 = new TopicPartition(TEST_TOPIC_NAME, 1);
topicPartitions.add(topicPartition1);
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
TypedProperties props = createPropsForJsonSource(null, "earliest");
props.put(ENABLE_KAFKA_COMMIT_OFFSET, "true");
Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics);
SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
// 1. Extract without any checkpoint => get all the data, respecting sourceLimit
assertEquals(Option.empty(), kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());
testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 599);
// commit to Kafka after the first batch
kafkaSource.getSource().onCommit(fetch1.getCheckpointForNextBatch());
try (KafkaConsumer consumer = new KafkaConsumer(props)) {
consumer.assign(topicPartitions);
OffsetAndMetadata offsetAndMetadata = consumer.committed(topicPartition0);
assertNotNull(offsetAndMetadata);
assertEquals(300, offsetAndMetadata.offset());
offsetAndMetadata = consumer.committed(topicPartition1);
assertNotNull(offsetAndMetadata);
assertEquals(299, offsetAndMetadata.offset());
// end offsets will point to 500 for each partition because we consumed fewer messages than were available in the first batch
Map endOffsets = consumer.endOffsets(topicPartitions);
assertEquals(500L, endOffsets.get(topicPartition0));
assertEquals(500L, endOffsets.get(topicPartition1));
testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 500)));
InputBatch<Dataset<Row>> fetch2 =
kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
// commit to Kafka after second batch is processed completely
kafkaSource.getSource().onCommit(fetch2.getCheckpointForNextBatch());
offsetAndMetadata = consumer.committed(topicPartition0);
assertNotNull(offsetAndMetadata);
assertEquals(750, offsetAndMetadata.offset());
offsetAndMetadata = consumer.committed(topicPartition1);
assertNotNull(offsetAndMetadata);
assertEquals(750, offsetAndMetadata.offset());
endOffsets = consumer.endOffsets(topicPartitions);
assertEquals(750L, endOffsets.get(topicPartition0));
assertEquals(750L, endOffsets.get(topicPartition1));
}
// check failure case
props.remove(ConsumerConfig.GROUP_ID_CONFIG);
assertThrows(HoodieNotSupportedException.class, () -> kafkaSource.getSource().onCommit(""));
}
}