From 039aeb6dcee0a8eb4372c079ec07b8fc2582e41f Mon Sep 17 00:00:00 2001 From: Vinay Patil <52563354+veenaypatil@users.noreply.github.com> Date: Mon, 28 Jun 2021 19:22:05 +0530 Subject: [PATCH] [HUDI-1910] Commit Offset to Kafka after successful Hudi commit (#3092) --- .../callback/SourceCommitCallback.java | 33 +++++++++ .../utilities/deltastreamer/DeltaSync.java | 2 +- .../deltastreamer/SourceFormatAdapter.java | 4 + .../utilities/sources/AvroKafkaSource.java | 10 +++ .../utilities/sources/JsonKafkaSource.java | 10 +++ .../apache/hudi/utilities/sources/Source.java | 3 +- .../sources/helpers/KafkaOffsetGen.java | 51 +++++++++---- .../utilities/sources/TestKafkaSource.java | 73 ++++++++++++++++++- 8 files changed, 170 insertions(+), 16 deletions(-) create mode 100644 hudi-utilities/src/main/java/org/apache/hudi/utilities/callback/SourceCommitCallback.java diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/callback/SourceCommitCallback.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/callback/SourceCommitCallback.java new file mode 100644 index 000000000..3d3597320 --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/callback/SourceCommitCallback.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.callback; + +/** + * A callback interface that provides the Source an option to perform action on successful Hudi commit. + */ +public interface SourceCommitCallback { + + /** + * Performs some action on successful Hudi commit like committing offsets to Kafka. + * + * @param lastCkptStr last checkpoint string. + */ + default void onCommit(String lastCkptStr) { + } +} diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index c60449a79..cdea9120c 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -473,7 +473,7 @@ public class DeltaSync implements Serializable { boolean success = writeClient.commit(instantTime, writeStatusRDD, Option.of(checkpointCommitMetadata)); if (success) { LOG.info("Commit " + instantTime + " successful!"); - + this.formatAdapter.getSource().onCommit(checkpointStr); // Schedule compaction if needed if (cfg.isAsyncCompactionEnabled()) { scheduledCompactionInstant = writeClient.scheduleCompaction(Option.empty()); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java index 379cc4b75..e9367707e 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java @@ -118,4 +118,8 @@ public final class SourceFormatAdapter { throw new IllegalArgumentException("Unknown source type (" + source.getSourceType() + ")"); } } + + public Source getSource() { + return source; + } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java index 124487941..652e442a8 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java @@ -40,6 +40,9 @@ import org.apache.spark.streaming.kafka010.KafkaUtils; import org.apache.spark.streaming.kafka010.LocationStrategies; import org.apache.spark.streaming.kafka010.OffsetRange; +import static org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config.DEFAULT_ENABLE_KAFKA_COMMIT_OFFSET; +import static org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET; + /** * Reads avro serialized Kafka data, based on the confluent schema-registry. */ @@ -95,4 +98,11 @@ public class AvroKafkaSource extends AvroSource { return KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges, LocationStrategies.PreferConsistent()).map(obj -> (GenericRecord) obj.value()); } + + @Override + public void onCommit(String lastCkptStr) { + if (this.props.getBoolean(ENABLE_KAFKA_COMMIT_OFFSET, DEFAULT_ENABLE_KAFKA_COMMIT_OFFSET)) { + offsetGen.commitOffsetToKafka(lastCkptStr); + } + } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java index cedaba48c..c1e2e3dad 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java @@ -35,6 +35,9 @@ import org.apache.spark.streaming.kafka010.KafkaUtils; import org.apache.spark.streaming.kafka010.LocationStrategies; import org.apache.spark.streaming.kafka010.OffsetRange; +import static org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config.DEFAULT_ENABLE_KAFKA_COMMIT_OFFSET; +import static org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET; + /** * Read json kafka data. */ @@ -71,4 +74,11 @@ public class JsonKafkaSource extends JsonSource { return KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges, LocationStrategies.PreferConsistent()).map(x -> (String) x.value()); } + + @Override + public void onCommit(String lastCkptStr) { + if (this.props.getBoolean(ENABLE_KAFKA_COMMIT_OFFSET, DEFAULT_ENABLE_KAFKA_COMMIT_OFFSET)) { + offsetGen.commitOffsetToKafka(lastCkptStr); + } + } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java index 4d25d479a..6d610d5c8 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java @@ -23,6 +23,7 @@ import org.apache.hudi.PublicAPIClass; import org.apache.hudi.PublicAPIMethod; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.util.Option; +import org.apache.hudi.utilities.callback.SourceCommitCallback; import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.spark.api.java.JavaSparkContext; @@ -34,7 +35,7 @@ import java.io.Serializable; * Represents a source from which we can tail data. Assumes a constructor that takes properties. */ @PublicAPIClass(maturity = ApiMaturityLevel.STABLE) -public abstract class Source implements Serializable { +public abstract class Source implements SourceCommitCallback, Serializable { public enum SourceType { JSON, AVRO, ROW diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java index be23002f8..d3f410d02 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java @@ -26,9 +26,13 @@ import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics; +import org.apache.kafka.clients.consumer.CommitFailedException; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.streaming.kafka010.OffsetRange; @@ -157,6 +161,8 @@ public class KafkaOffsetGen { private static final String KAFKA_TOPIC_NAME = "hoodie.deltastreamer.source.kafka.topic"; private static final String MAX_EVENTS_FROM_KAFKA_SOURCE_PROP = "hoodie.deltastreamer.kafka.source.maxEvents"; + public static final String ENABLE_KAFKA_COMMIT_OFFSET = "hoodie.deltastreamer.source.kafka.enable.commit.offset"; + public static final Boolean DEFAULT_ENABLE_KAFKA_COMMIT_OFFSET = false; // "auto.offset.reset" is kafka native config param. Do not change the config param name. public static final String KAFKA_AUTO_OFFSET_RESET = "auto.offset.reset"; private static final KafkaResetOffsetStrategies DEFAULT_KAFKA_AUTO_OFFSET_RESET = KafkaResetOffsetStrategies.LATEST; @@ -164,24 +170,14 @@ public class KafkaOffsetGen { public static long maxEventsFromKafkaSource = DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE; } - private final HashMap kafkaParams; + private final Map kafkaParams; private final TypedProperties props; protected final String topicName; private KafkaResetOffsetStrategies autoResetValue; public KafkaOffsetGen(TypedProperties props) { this.props = props; - - kafkaParams = new HashMap<>(); - props.keySet().stream().filter(prop -> { - // In order to prevent printing unnecessary warn logs, here filter out the hoodie - // configuration items before passing to kafkaParams - return !prop.toString().startsWith("hoodie.") - // We need to pass some properties to kafka client so that KafkaAvroSchemaDeserializer can use it - || prop.toString().startsWith("hoodie.deltastreamer.source.kafka.value.deserializer."); - }).forEach(prop -> { - kafkaParams.put(prop.toString(), props.get(prop.toString())); - }); + kafkaParams = excludeHoodieConfigs(props); DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.KAFKA_TOPIC_NAME)); topicName = props.getString(Config.KAFKA_TOPIC_NAME); String kafkaAutoResetOffsetsStr = props.getString(Config.KAFKA_AUTO_OFFSET_RESET, Config.DEFAULT_KAFKA_AUTO_OFFSET_RESET.name().toLowerCase()); @@ -299,7 +295,36 @@ public class KafkaOffsetGen { return topicName; } - public HashMap getKafkaParams() { + public Map getKafkaParams() { return kafkaParams; } + + private Map excludeHoodieConfigs(TypedProperties props) { + Map kafkaParams = new HashMap<>(); + props.keySet().stream().filter(prop -> { + // In order to prevent printing unnecessary warn logs, here filter out the hoodie + // configuration items before passing to kafkaParams + return !prop.toString().startsWith("hoodie."); + }).forEach(prop -> { + kafkaParams.put(prop.toString(), props.get(prop.toString())); + }); + return kafkaParams; + } + + /** + * Commit offsets to Kafka only after hoodie commit is successful. + * @param checkpointStr checkpoint string containing offsets. + */ + public void commitOffsetToKafka(String checkpointStr) { + DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(ConsumerConfig.GROUP_ID_CONFIG)); + Map offsetMap = CheckpointUtils.strToOffsets(checkpointStr); + Map kafkaParams = excludeHoodieConfigs(props); + Map offsetAndMetadataMap = new HashMap<>(offsetMap.size()); + try (KafkaConsumer consumer = new KafkaConsumer(kafkaParams)) { + offsetMap.forEach((topicPartition, offset) -> offsetAndMetadataMap.put(topicPartition, new OffsetAndMetadata(offset))); + consumer.commitSync(offsetAndMetadataMap); + } catch (CommitFailedException | TimeoutException e) { + LOG.warn("Committing offsets to Kafka failed, this does not impact processing of records", e); + } + } } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java index ec8a945c4..a1a00faa5 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java @@ -22,6 +22,7 @@ import org.apache.hudi.AvroConversionUtils; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics; import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter; import org.apache.hudi.utilities.schema.FilebasedSchemaProvider; @@ -30,6 +31,9 @@ import org.apache.hudi.utilities.testutils.UtilitiesTestBase; import org.apache.avro.generic.GenericRecord; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; @@ -40,9 +44,15 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; import java.util.UUID; +import static org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.mock; /** @@ -58,7 +68,7 @@ public class TestKafkaSource extends UtilitiesTestBase { @BeforeAll public static void initClass() throws Exception { - UtilitiesTestBase.initClass(); + UtilitiesTestBase.initClass(false); } @AfterAll @@ -85,6 +95,7 @@ public class TestKafkaSource extends UtilitiesTestBase { props.setProperty("hoodie.deltastreamer.source.kafka.topic", TEST_TOPIC_NAME); props.setProperty("bootstrap.servers", testUtils.brokerAddress()); props.setProperty(Config.KAFKA_AUTO_OFFSET_RESET, resetStrategy); + props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", maxEventsToReadFromKafkaSource != null ? String.valueOf(maxEventsToReadFromKafkaSource) : String.valueOf(Config.maxEventsFromKafkaSource)); @@ -276,4 +287,64 @@ public class TestKafkaSource extends UtilitiesTestBase { kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch4.getCheckpointForNextBatch()), Long.MAX_VALUE); assertEquals(Option.empty(), fetch6.getBatch()); } + + @Test + public void testCommitOffsetToKafka() { + // topic setup. + testUtils.createTopic(TEST_TOPIC_NAME, 2); + List topicPartitions = new ArrayList<>(); + TopicPartition topicPartition0 = new TopicPartition(TEST_TOPIC_NAME, 0); + topicPartitions.add(topicPartition0); + TopicPartition topicPartition1 = new TopicPartition(TEST_TOPIC_NAME, 1); + topicPartitions.add(topicPartition1); + + HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); + TypedProperties props = createPropsForJsonSource(null, "earliest"); + props.put(ENABLE_KAFKA_COMMIT_OFFSET, "true"); + Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics); + SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource); + + // 1. Extract without any checkpoint => get all the data, respecting sourceLimit + assertEquals(Option.empty(), kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch()); + testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000))); + + InputBatch> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 599); + // commit to kafka after first batch + kafkaSource.getSource().onCommit(fetch1.getCheckpointForNextBatch()); + try (KafkaConsumer consumer = new KafkaConsumer(props)) { + consumer.assign(topicPartitions); + + OffsetAndMetadata offsetAndMetadata = consumer.committed(topicPartition0); + assertNotNull(offsetAndMetadata); + assertEquals(300, offsetAndMetadata.offset()); + offsetAndMetadata = consumer.committed(topicPartition1); + assertNotNull(offsetAndMetadata); + assertEquals(299, offsetAndMetadata.offset()); + // end offsets will point to 500 for each partition because we consumed less messages from first batch + Map endOffsets = consumer.endOffsets(topicPartitions); + assertEquals(500L, endOffsets.get(topicPartition0)); + assertEquals(500L, endOffsets.get(topicPartition1)); + + testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 500))); + InputBatch> fetch2 = + kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE); + + // commit to Kafka after second batch is processed completely + kafkaSource.getSource().onCommit(fetch2.getCheckpointForNextBatch()); + + offsetAndMetadata = consumer.committed(topicPartition0); + assertNotNull(offsetAndMetadata); + assertEquals(750, offsetAndMetadata.offset()); + offsetAndMetadata = consumer.committed(topicPartition1); + assertNotNull(offsetAndMetadata); + assertEquals(750, offsetAndMetadata.offset()); + + endOffsets = consumer.endOffsets(topicPartitions); + assertEquals(750L, endOffsets.get(topicPartition0)); + assertEquals(750L, endOffsets.get(topicPartition1)); + } + // check failure case + props.remove(ConsumerConfig.GROUP_ID_CONFIG); + assertThrows(HoodieNotSupportedException.class,() -> kafkaSource.getSource().onCommit("")); + } }