diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java index ac4880585..0f4a64cd2 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java @@ -35,12 +35,15 @@ import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.utilities.checkpointing.InitialCheckPointProvider; +import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics; import org.apache.hudi.utilities.schema.DelegatingSchemaProvider; import org.apache.hudi.utilities.schema.RowBasedSchemaProvider; import org.apache.hudi.utilities.schema.SchemaPostProcessor; import org.apache.hudi.utilities.schema.SchemaPostProcessor.Config; import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.hudi.utilities.schema.SchemaProviderWithPostProcessor; +import org.apache.hudi.utilities.sources.AvroKafkaSource; +import org.apache.hudi.utilities.sources.JsonKafkaSource; import org.apache.hudi.utilities.sources.Source; import org.apache.hudi.utilities.sources.helpers.DFSPathSelector; import org.apache.hudi.utilities.transform.ChainedTransformer; @@ -93,8 +96,16 @@ public class UtilHelpers { private static final Logger LOG = LogManager.getLogger(UtilHelpers.class); public static Source createSource(String sourceClass, TypedProperties cfg, JavaSparkContext jssc, - SparkSession sparkSession, SchemaProvider schemaProvider) throws IOException { + SparkSession sparkSession, SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics metrics) throws IOException { + try { + if (JsonKafkaSource.class.getName().equals(sourceClass) + || AvroKafkaSource.class.getName().equals(sourceClass)) { + return (Source) ReflectionUtils.loadClass(sourceClass, + new Class[]{TypedProperties.class, 
JavaSparkContext.class, SparkSession.class, SchemaProvider.class, HoodieDeltaStreamerMetrics.class}, cfg, + jssc, sparkSession, schemaProvider, metrics); + } + return (Source) ReflectionUtils.loadClass(sourceClass, new Class[] {TypedProperties.class, JavaSparkContext.class, SparkSession.class, SchemaProvider.class}, cfg, jssc, sparkSession, schemaProvider); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index 2b0de9355..afe686213 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -79,7 +79,6 @@ import java.util.List; import java.util.Objects; import java.util.Properties; import java.util.Set; - import java.util.stream.Collectors; import scala.collection.JavaConversions; @@ -171,6 +170,8 @@ public class DeltaSync implements Serializable { */ private transient HoodieWriteClient writeClient; + private transient HoodieDeltaStreamerMetrics metrics; + public DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider, TypedProperties props, JavaSparkContext jssc, FileSystem fs, Configuration conf, Function onInitializingHoodieWriteClient) throws IOException { @@ -190,8 +191,10 @@ public class DeltaSync implements Serializable { this.transformer = UtilHelpers.createTransformer(cfg.transformerClassNames); this.keyGenerator = DataSourceUtils.createKeyGenerator(props); + this.metrics = new HoodieDeltaStreamerMetrics(getHoodieClientConfig(this.schemaProvider)); + this.formatAdapter = new SourceFormatAdapter( - UtilHelpers.createSource(cfg.sourceClassName, props, jssc, sparkSession, schemaProvider)); + UtilHelpers.createSource(cfg.sourceClassName, props, jssc, sparkSession, schemaProvider, metrics)); this.conf = conf; } @@ -226,7 +229,6 @@ public class DeltaSync 
implements Serializable { */ public Pair, JavaRDD> syncOnce() throws IOException { Pair, JavaRDD> result = null; - HoodieDeltaStreamerMetrics metrics = new HoodieDeltaStreamerMetrics(getHoodieClientConfig(schemaProvider)); Timer.Context overallTimerContext = metrics.getOverallTimerContext(); // Refresh Timeline diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java index 8e3d25326..056713771 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java @@ -89,6 +89,12 @@ public class HoodieDeltaStreamerMetrics implements Serializable { } } + public void updateDeltaStreamerKafkaDelayCountMetrics(long kafkaDelayCount) { + if (config.isMetricsOn()) { + Metrics.registerGauge(getMetricsName("deltastreamer", "kafkaDelayCount"), kafkaDelayCount); + } + } + public long getDurationInMs(long ctxDuration) { return ctxDuration / 1000000; } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java index 66a38e2a4..256516bd2 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java @@ -20,6 +20,7 @@ package org.apache.hudi.utilities.sources; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.util.Option; +import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics; import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen; import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils; @@ 
-45,9 +46,12 @@ public class AvroKafkaSource extends AvroSource { private final KafkaOffsetGen offsetGen; + private final HoodieDeltaStreamerMetrics metrics; + public AvroKafkaSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, - SchemaProvider schemaProvider) { + SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics metrics) { super(props, sparkContext, sparkSession, schemaProvider); + this.metrics = metrics; props.put("key.deserializer", StringDeserializer.class); props.put("value.deserializer", KafkaAvroDeserializer.class); offsetGen = new KafkaOffsetGen(props); @@ -55,7 +59,7 @@ public class AvroKafkaSource extends AvroSource { @Override protected InputBatch> fetchNewData(Option lastCheckpointStr, long sourceLimit) { - OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit); + OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit, metrics); long totalNewMsgs = CheckpointUtils.totalNewMessages(offsetRanges); LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName()); if (totalNewMsgs <= 0) { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java index 833c6c630..cedaba48c 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java @@ -20,6 +20,7 @@ package org.apache.hudi.utilities.sources; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.util.Option; +import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics; import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen; import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils; @@ 
-43,9 +44,12 @@ public class JsonKafkaSource extends JsonSource { private final KafkaOffsetGen offsetGen; + private final HoodieDeltaStreamerMetrics metrics; + public JsonKafkaSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession, - SchemaProvider schemaProvider) { + SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics metrics) { super(properties, sparkContext, sparkSession, schemaProvider); + this.metrics = metrics; properties.put("key.deserializer", StringDeserializer.class); properties.put("value.deserializer", StringDeserializer.class); offsetGen = new KafkaOffsetGen(properties); @@ -53,7 +57,7 @@ public class JsonKafkaSource extends JsonSource { @Override protected InputBatch> fetchNewData(Option lastCheckpointStr, long sourceLimit) { - OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit); + OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit, metrics); long totalNewMsgs = CheckpointUtils.totalNewMessages(offsetRanges); LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName()); if (totalNewMsgs <= 0) { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java index e958f85ed..fc7ba7909 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieNotSupportedException; +import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.PartitionInfo; import 
org.apache.kafka.common.TopicPartition; @@ -166,7 +167,7 @@ public class KafkaOffsetGen { topicName = props.getString(Config.KAFKA_TOPIC_NAME); } - public OffsetRange[] getNextOffsetRanges(Option lastCheckpointStr, long sourceLimit) { + public OffsetRange[] getNextOffsetRanges(Option lastCheckpointStr, long sourceLimit, HoodieDeltaStreamerMetrics metrics) { // Obtain current metadata for the topic Map fromOffsets; @@ -183,6 +184,7 @@ public class KafkaOffsetGen { // Determine the offset ranges to read from if (lastCheckpointStr.isPresent() && !lastCheckpointStr.get().isEmpty()) { fromOffsets = checkupValidOffsets(consumer, lastCheckpointStr, topicPartitions); + metrics.updateDeltaStreamerKafkaDelayCountMetrics(delayOffsetCalculation(lastCheckpointStr, topicPartitions, consumer)); } else { KafkaResetOffsetStrategies autoResetValue = KafkaResetOffsetStrategies .valueOf(props.getString("auto.offset.reset", Config.DEFAULT_AUTO_RESET_OFFSET.toString()).toUpperCase()); @@ -233,6 +235,18 @@ public class KafkaOffsetGen { return checkpointOffsetReseter ? earliestOffsets : checkpointOffsets; } + private long delayOffsetCalculation(Option<String> lastCheckpointStr, Set<TopicPartition> topicPartitions, KafkaConsumer consumer) { + long delayCount = 0L; + Map<TopicPartition, Long> checkpointOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get()); + Map<TopicPartition, Long> lastOffsets = consumer.endOffsets(topicPartitions); + + for (Map.Entry<TopicPartition, Long> entry : lastOffsets.entrySet()) { + long offset = checkpointOffsets.getOrDefault(entry.getKey(), 0L); + delayCount += entry.getValue() - offset > 0 ? entry.getValue() - offset : 0L; + } + return delayCount; + } + /** * Check if topic exists.
* @param consumer kafka consumer diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java index ea77e26ce..e8cb2a6f1 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java @@ -22,6 +22,7 @@ import org.apache.hudi.AvroConversionUtils; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.util.Option; +import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics; import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter; import org.apache.hudi.utilities.schema.FilebasedSchemaProvider; import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils; @@ -46,6 +47,7 @@ import java.util.HashMap; import java.util.UUID; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; /** * Tests against {@link AvroKafkaSource}. @@ -56,6 +58,7 @@ public class TestKafkaSource extends UtilitiesTestBase { private FilebasedSchemaProvider schemaProvider; private KafkaTestUtils testUtils; + private HoodieDeltaStreamerMetrics metrics = mock(HoodieDeltaStreamerMetrics.class); @BeforeAll public static void initClass() throws Exception { @@ -101,7 +104,7 @@ public class TestKafkaSource extends UtilitiesTestBase { HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); TypedProperties props = createPropsForJsonSource(null, "earliest"); - Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider); + Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics); SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource); // 1. 
Extract without any checkpoint => get all the data, respecting sourceLimit @@ -149,11 +152,11 @@ public class TestKafkaSource extends UtilitiesTestBase { HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); TypedProperties earliestProps = createPropsForJsonSource(null, "earliest"); - Source earliestJsonSource = new JsonKafkaSource(earliestProps, jsc, sparkSession, schemaProvider); + Source earliestJsonSource = new JsonKafkaSource(earliestProps, jsc, sparkSession, schemaProvider, metrics); SourceFormatAdapter earliestKafkaSource = new SourceFormatAdapter(earliestJsonSource); TypedProperties latestProps = createPropsForJsonSource(null, "latest"); - Source latestJsonSource = new JsonKafkaSource(latestProps, jsc, sparkSession, schemaProvider); + Source latestJsonSource = new JsonKafkaSource(latestProps, jsc, sparkSession, schemaProvider, metrics); SourceFormatAdapter latestKafkaSource = new SourceFormatAdapter(latestJsonSource); // 1. Extract with a none data kafka checkpoint @@ -181,7 +184,7 @@ public class TestKafkaSource extends UtilitiesTestBase { HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); TypedProperties props = createPropsForJsonSource(Long.MAX_VALUE, "earliest"); - Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider); + Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics); SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource); Config.maxEventsFromKafkaSource = 500; @@ -210,7 +213,7 @@ public class TestKafkaSource extends UtilitiesTestBase { HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); TypedProperties props = createPropsForJsonSource(Long.MAX_VALUE, "earliest"); - Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider); + Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics); SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource); 
Config.maxEventsFromKafkaSource = 500; @@ -242,7 +245,7 @@ public class TestKafkaSource extends UtilitiesTestBase { HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); TypedProperties props = createPropsForJsonSource(500L, "earliest"); - Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider); + Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics); SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource); // 1. Extract without any checkpoint => get all the data, respecting sourceLimit