[HUDI-1233] Deltastreamer Kafka consumption delay reporting indicators (#2074)
This commit is contained in:
@@ -35,12 +35,15 @@ import org.apache.hudi.exception.HoodieException;
|
||||
import org.apache.hudi.exception.HoodieIOException;
|
||||
import org.apache.hudi.index.HoodieIndex;
|
||||
import org.apache.hudi.utilities.checkpointing.InitialCheckPointProvider;
|
||||
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
|
||||
import org.apache.hudi.utilities.schema.DelegatingSchemaProvider;
|
||||
import org.apache.hudi.utilities.schema.RowBasedSchemaProvider;
|
||||
import org.apache.hudi.utilities.schema.SchemaPostProcessor;
|
||||
import org.apache.hudi.utilities.schema.SchemaPostProcessor.Config;
|
||||
import org.apache.hudi.utilities.schema.SchemaProvider;
|
||||
import org.apache.hudi.utilities.schema.SchemaProviderWithPostProcessor;
|
||||
import org.apache.hudi.utilities.sources.AvroKafkaSource;
|
||||
import org.apache.hudi.utilities.sources.JsonKafkaSource;
|
||||
import org.apache.hudi.utilities.sources.Source;
|
||||
import org.apache.hudi.utilities.sources.helpers.DFSPathSelector;
|
||||
import org.apache.hudi.utilities.transform.ChainedTransformer;
|
||||
@@ -93,8 +96,16 @@ public class UtilHelpers {
|
||||
private static final Logger LOG = LogManager.getLogger(UtilHelpers.class);
|
||||
|
||||
public static Source createSource(String sourceClass, TypedProperties cfg, JavaSparkContext jssc,
|
||||
SparkSession sparkSession, SchemaProvider schemaProvider) throws IOException {
|
||||
SparkSession sparkSession, SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics metrics) throws IOException {
|
||||
|
||||
try {
|
||||
if (JsonKafkaSource.class.getName().equals(sourceClass)
|
||||
|| AvroKafkaSource.class.getName().equals(sourceClass)) {
|
||||
return (Source) ReflectionUtils.loadClass(sourceClass,
|
||||
new Class<?>[]{TypedProperties.class, JavaSparkContext.class, SparkSession.class, SchemaProvider.class, HoodieDeltaStreamerMetrics.class}, cfg,
|
||||
jssc, sparkSession, schemaProvider, metrics);
|
||||
}
|
||||
|
||||
return (Source) ReflectionUtils.loadClass(sourceClass,
|
||||
new Class<?>[] {TypedProperties.class, JavaSparkContext.class, SparkSession.class, SchemaProvider.class}, cfg,
|
||||
jssc, sparkSession, schemaProvider);
|
||||
|
||||
@@ -79,7 +79,6 @@ import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import scala.collection.JavaConversions;
|
||||
@@ -171,6 +170,8 @@ public class DeltaSync implements Serializable {
|
||||
*/
|
||||
private transient HoodieWriteClient writeClient;
|
||||
|
||||
private transient HoodieDeltaStreamerMetrics metrics;
|
||||
|
||||
public DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider,
|
||||
TypedProperties props, JavaSparkContext jssc, FileSystem fs, Configuration conf,
|
||||
Function<HoodieWriteClient, Boolean> onInitializingHoodieWriteClient) throws IOException {
|
||||
@@ -190,8 +191,10 @@ public class DeltaSync implements Serializable {
|
||||
this.transformer = UtilHelpers.createTransformer(cfg.transformerClassNames);
|
||||
this.keyGenerator = DataSourceUtils.createKeyGenerator(props);
|
||||
|
||||
this.metrics = new HoodieDeltaStreamerMetrics(getHoodieClientConfig(this.schemaProvider));
|
||||
|
||||
this.formatAdapter = new SourceFormatAdapter(
|
||||
UtilHelpers.createSource(cfg.sourceClassName, props, jssc, sparkSession, schemaProvider));
|
||||
UtilHelpers.createSource(cfg.sourceClassName, props, jssc, sparkSession, schemaProvider, metrics));
|
||||
this.conf = conf;
|
||||
}
|
||||
|
||||
@@ -226,7 +229,6 @@ public class DeltaSync implements Serializable {
|
||||
*/
|
||||
public Pair<Option<String>, JavaRDD<WriteStatus>> syncOnce() throws IOException {
|
||||
Pair<Option<String>, JavaRDD<WriteStatus>> result = null;
|
||||
HoodieDeltaStreamerMetrics metrics = new HoodieDeltaStreamerMetrics(getHoodieClientConfig(schemaProvider));
|
||||
Timer.Context overallTimerContext = metrics.getOverallTimerContext();
|
||||
|
||||
// Refresh Timeline
|
||||
|
||||
@@ -89,6 +89,12 @@ public class HoodieDeltaStreamerMetrics implements Serializable {
|
||||
}
|
||||
}
|
||||
|
||||
public void updateDeltaStreamerKafkaDelayCountMetrics(long kafkaDelayCount) {
|
||||
if (config.isMetricsOn()) {
|
||||
Metrics.registerGauge(getMetricsName("deltastreamer", "kafkaDelayCount"), kafkaDelayCount);
|
||||
}
|
||||
}
|
||||
|
||||
public long getDurationInMs(long ctxDuration) {
|
||||
return ctxDuration / 1000000;
|
||||
}
|
||||
|
||||
@@ -20,6 +20,7 @@ package org.apache.hudi.utilities.sources;
|
||||
|
||||
import org.apache.hudi.common.config.TypedProperties;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
|
||||
import org.apache.hudi.utilities.schema.SchemaProvider;
|
||||
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
|
||||
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
|
||||
@@ -45,9 +46,12 @@ public class AvroKafkaSource extends AvroSource {
|
||||
|
||||
private final KafkaOffsetGen offsetGen;
|
||||
|
||||
private final HoodieDeltaStreamerMetrics metrics;
|
||||
|
||||
public AvroKafkaSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
|
||||
SchemaProvider schemaProvider) {
|
||||
SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics metrics) {
|
||||
super(props, sparkContext, sparkSession, schemaProvider);
|
||||
this.metrics = metrics;
|
||||
props.put("key.deserializer", StringDeserializer.class);
|
||||
props.put("value.deserializer", KafkaAvroDeserializer.class);
|
||||
offsetGen = new KafkaOffsetGen(props);
|
||||
@@ -55,7 +59,7 @@ public class AvroKafkaSource extends AvroSource {
|
||||
|
||||
@Override
|
||||
protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) {
|
||||
OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit);
|
||||
OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit, metrics);
|
||||
long totalNewMsgs = CheckpointUtils.totalNewMessages(offsetRanges);
|
||||
LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
|
||||
if (totalNewMsgs <= 0) {
|
||||
|
||||
@@ -20,6 +20,7 @@ package org.apache.hudi.utilities.sources;
|
||||
|
||||
import org.apache.hudi.common.config.TypedProperties;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
|
||||
import org.apache.hudi.utilities.schema.SchemaProvider;
|
||||
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
|
||||
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
|
||||
@@ -43,9 +44,12 @@ public class JsonKafkaSource extends JsonSource {
|
||||
|
||||
private final KafkaOffsetGen offsetGen;
|
||||
|
||||
private final HoodieDeltaStreamerMetrics metrics;
|
||||
|
||||
public JsonKafkaSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession,
|
||||
SchemaProvider schemaProvider) {
|
||||
SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics metrics) {
|
||||
super(properties, sparkContext, sparkSession, schemaProvider);
|
||||
this.metrics = metrics;
|
||||
properties.put("key.deserializer", StringDeserializer.class);
|
||||
properties.put("value.deserializer", StringDeserializer.class);
|
||||
offsetGen = new KafkaOffsetGen(properties);
|
||||
@@ -53,7 +57,7 @@ public class JsonKafkaSource extends JsonSource {
|
||||
|
||||
@Override
|
||||
protected InputBatch<JavaRDD<String>> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) {
|
||||
OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit);
|
||||
OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit, metrics);
|
||||
long totalNewMsgs = CheckpointUtils.totalNewMessages(offsetRanges);
|
||||
LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
|
||||
if (totalNewMsgs <= 0) {
|
||||
|
||||
@@ -24,6 +24,7 @@ import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.exception.HoodieException;
|
||||
import org.apache.hudi.exception.HoodieNotSupportedException;
|
||||
|
||||
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||
import org.apache.kafka.common.PartitionInfo;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
@@ -166,7 +167,7 @@ public class KafkaOffsetGen {
|
||||
topicName = props.getString(Config.KAFKA_TOPIC_NAME);
|
||||
}
|
||||
|
||||
public OffsetRange[] getNextOffsetRanges(Option<String> lastCheckpointStr, long sourceLimit) {
|
||||
public OffsetRange[] getNextOffsetRanges(Option<String> lastCheckpointStr, long sourceLimit, HoodieDeltaStreamerMetrics metrics) {
|
||||
|
||||
// Obtain current metadata for the topic
|
||||
Map<TopicPartition, Long> fromOffsets;
|
||||
@@ -183,6 +184,7 @@ public class KafkaOffsetGen {
|
||||
// Determine the offset ranges to read from
|
||||
if (lastCheckpointStr.isPresent() && !lastCheckpointStr.get().isEmpty()) {
|
||||
fromOffsets = checkupValidOffsets(consumer, lastCheckpointStr, topicPartitions);
|
||||
metrics.updateDeltaStreamerKafkaDelayCountMetrics(delayOffsetCalculation(lastCheckpointStr, topicPartitions, consumer));
|
||||
} else {
|
||||
KafkaResetOffsetStrategies autoResetValue = KafkaResetOffsetStrategies
|
||||
.valueOf(props.getString("auto.offset.reset", Config.DEFAULT_AUTO_RESET_OFFSET.toString()).toUpperCase());
|
||||
@@ -233,6 +235,18 @@ public class KafkaOffsetGen {
|
||||
return checkpointOffsetReseter ? earliestOffsets : checkpointOffsets;
|
||||
}
|
||||
|
||||
private Long delayOffsetCalculation(Option<String> lastCheckpointStr, Set<TopicPartition> topicPartitions, KafkaConsumer consumer) {
|
||||
Long delayCount = 0L;
|
||||
Map<TopicPartition, Long> checkpointOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get());
|
||||
Map<TopicPartition, Long> lastOffsets = consumer.endOffsets(topicPartitions);
|
||||
|
||||
for (Map.Entry<TopicPartition, Long> entry : lastOffsets.entrySet()) {
|
||||
Long offect = checkpointOffsets.getOrDefault(entry.getKey(), 0L);
|
||||
delayCount += entry.getValue() - offect > 0 ? entry.getValue() - offect : 0L;
|
||||
}
|
||||
return delayCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if topic exists.
|
||||
* @param consumer kafka consumer
|
||||
|
||||
@@ -22,6 +22,7 @@ import org.apache.hudi.AvroConversionUtils;
|
||||
import org.apache.hudi.common.config.TypedProperties;
|
||||
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
|
||||
import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
|
||||
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
|
||||
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
|
||||
@@ -46,6 +47,7 @@ import java.util.HashMap;
|
||||
import java.util.UUID;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
/**
|
||||
* Tests against {@link AvroKafkaSource}.
|
||||
@@ -56,6 +58,7 @@ public class TestKafkaSource extends UtilitiesTestBase {
|
||||
|
||||
private FilebasedSchemaProvider schemaProvider;
|
||||
private KafkaTestUtils testUtils;
|
||||
private HoodieDeltaStreamerMetrics metrics = mock(HoodieDeltaStreamerMetrics.class);
|
||||
|
||||
@BeforeAll
|
||||
public static void initClass() throws Exception {
|
||||
@@ -101,7 +104,7 @@ public class TestKafkaSource extends UtilitiesTestBase {
|
||||
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
|
||||
TypedProperties props = createPropsForJsonSource(null, "earliest");
|
||||
|
||||
Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider);
|
||||
Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics);
|
||||
SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
|
||||
|
||||
// 1. Extract without any checkpoint => get all the data, respecting sourceLimit
|
||||
@@ -149,11 +152,11 @@ public class TestKafkaSource extends UtilitiesTestBase {
|
||||
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
|
||||
|
||||
TypedProperties earliestProps = createPropsForJsonSource(null, "earliest");
|
||||
Source earliestJsonSource = new JsonKafkaSource(earliestProps, jsc, sparkSession, schemaProvider);
|
||||
Source earliestJsonSource = new JsonKafkaSource(earliestProps, jsc, sparkSession, schemaProvider, metrics);
|
||||
SourceFormatAdapter earliestKafkaSource = new SourceFormatAdapter(earliestJsonSource);
|
||||
|
||||
TypedProperties latestProps = createPropsForJsonSource(null, "latest");
|
||||
Source latestJsonSource = new JsonKafkaSource(latestProps, jsc, sparkSession, schemaProvider);
|
||||
Source latestJsonSource = new JsonKafkaSource(latestProps, jsc, sparkSession, schemaProvider, metrics);
|
||||
SourceFormatAdapter latestKafkaSource = new SourceFormatAdapter(latestJsonSource);
|
||||
|
||||
// 1. Extract with a Kafka checkpoint that has no data
|
||||
@@ -181,7 +184,7 @@ public class TestKafkaSource extends UtilitiesTestBase {
|
||||
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
|
||||
TypedProperties props = createPropsForJsonSource(Long.MAX_VALUE, "earliest");
|
||||
|
||||
Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider);
|
||||
Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics);
|
||||
SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
|
||||
Config.maxEventsFromKafkaSource = 500;
|
||||
|
||||
@@ -210,7 +213,7 @@ public class TestKafkaSource extends UtilitiesTestBase {
|
||||
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
|
||||
TypedProperties props = createPropsForJsonSource(Long.MAX_VALUE, "earliest");
|
||||
|
||||
Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider);
|
||||
Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics);
|
||||
SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
|
||||
Config.maxEventsFromKafkaSource = 500;
|
||||
|
||||
@@ -242,7 +245,7 @@ public class TestKafkaSource extends UtilitiesTestBase {
|
||||
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
|
||||
TypedProperties props = createPropsForJsonSource(500L, "earliest");
|
||||
|
||||
Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider);
|
||||
Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics);
|
||||
SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
|
||||
|
||||
// 1. Extract without any checkpoint => get all the data, respecting sourceLimit
|
||||
|
||||
Reference in New Issue
Block a user