1
0

[HUDI-2069] Refactored String constants (#3172)

This commit is contained in:
Sebastian Bernauer
2021-07-07 20:22:00 +02:00
committed by GitHub
parent ea9e5d0e8b
commit 8f7ad8b178
5 changed files with 33 additions and 30 deletions

View File

@@ -25,6 +25,7 @@ import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.{HiveSyncTool, SlashEncodedDayPartitionValueExtractor}
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.hudi.keygen.{CustomKeyGenerator, SimpleKeyGenerator}
import org.apache.log4j.LogManager
import org.apache.spark.sql.execution.datasources.{DataSourceUtils => SparkDataSourceUtils}
@@ -473,15 +474,11 @@ object DataSourceWriteOptions {
.defaultValue("true")
.withDocumentation("")
val KAFKA_AVRO_VALUE_DESERIALIZER: ConfigProperty[String] = ConfigProperty
val KAFKA_AVRO_VALUE_DESERIALIZER_CLASS: ConfigProperty[String] = ConfigProperty
.key("hoodie.deltastreamer.source.kafka.value.deserializer.class")
.noDefaultValue()
.withDocumentation("")
val KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA: ConfigProperty[String] = ConfigProperty
.key("hoodie.deltastreamer.source.kafka.value.deserializer.schema")
.noDefaultValue()
.withDocumentation("")
.defaultValue("io.confluent.kafka.serializers.KafkaAvroDeserializer")
.sinceVersion("0.9.0")
.withDocumentation("This class is used by kafka client to deserialize the records")
}
object DataSourceOptionsHelper {

View File

@@ -18,14 +18,14 @@
package org.apache.hudi.utilities.deser;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.exception.HoodieException;
import org.apache.avro.Schema;
import org.apache.hudi.utilities.sources.AvroKafkaSource;
import org.apache.kafka.common.errors.SerializationException;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.Schema;
import org.apache.kafka.common.errors.SerializationException;
import java.util.Map;
import java.util.Map.Entry;
@@ -48,7 +48,7 @@ public class KafkaAvroSchemaDeserializer extends KafkaAvroDeserializer {
super.configure(configs, isKey);
try {
TypedProperties props = getConvertToTypedProperties(configs);
sourceSchema = new Schema.Parser().parse(props.getString(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA().key()));
sourceSchema = new Schema.Parser().parse(props.getString(AvroKafkaSource.KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA));
} catch (Throwable e) {
throw new HoodieException(e);
}

View File

@@ -24,11 +24,11 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.log4j.LogManager;
@@ -40,8 +40,8 @@ import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.OffsetRange;
import static org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config.DEFAULT_ENABLE_KAFKA_COMMIT_OFFSET;
import static org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET;
import static org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config.DEFAULT_ENABLE_KAFKA_COMMIT_OFFSET;
/**
* Reads avro serialized Kafka data, based on the confluent schema-registry.
@@ -52,6 +52,10 @@ public class AvroKafkaSource extends AvroSource {
// these are native kafka's config. do not change the config names.
private static final String NATIVE_KAFKA_KEY_DESERIALIZER_PROP = "key.deserializer";
private static final String NATIVE_KAFKA_VALUE_DESERIALIZER_PROP = "value.deserializer";
// These are settings used to pass things to KafkaAvroDeserializer
public static final String KAFKA_AVRO_VALUE_DESERIALIZER_PROPERTY_PREFIX = "hoodie.deltastreamer.source.kafka.value.deserializer.";
public static final String KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA = KAFKA_AVRO_VALUE_DESERIALIZER_PROPERTY_PREFIX + "schema";
private final KafkaOffsetGen offsetGen;
private final HoodieDeltaStreamerMetrics metrics;
@@ -60,22 +64,21 @@ public class AvroKafkaSource extends AvroSource {
super(props, sparkContext, sparkSession, schemaProvider);
props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class);
String deserializerClassName = props.getString(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER().key(), "");
String deserializerClassName = props.getString(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().key(),
DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().defaultValue());
if (deserializerClassName.isEmpty()) {
props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, KafkaAvroDeserializer.class);
} else {
try {
props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, Class.forName(deserializerClassName));
try {
props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, Class.forName(deserializerClassName));
if (deserializerClassName.equals(KafkaAvroSchemaDeserializer.class.getName())) {
if (schemaProvider == null) {
throw new HoodieIOException("SchemaProvider has to be set to use custom Deserializer");
throw new HoodieIOException("SchemaProvider has to be set to use KafkaAvroSchemaDeserializer");
}
props.put(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA().key(), schemaProvider.getSourceSchema().toString());
} catch (ClassNotFoundException e) {
String error = "Could not load custom avro kafka deserializer: " + deserializerClassName;
LOG.error(error);
throw new HoodieException(error, e);
props.put(KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA, schemaProvider.getSourceSchema().toString());
}
} catch (ClassNotFoundException e) {
String error = "Could not load custom avro kafka deserializer: " + deserializerClassName;
LOG.error(error);
throw new HoodieException(error, e);
}
this.metrics = metrics;

View File

@@ -26,6 +26,7 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.hudi.utilities.sources.AvroKafkaSource;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -310,7 +311,9 @@ public class KafkaOffsetGen {
props.keySet().stream().filter(prop -> {
// In order to prevent printing unnecessary warn logs, here filter out the hoodie
// configuration items before passing to kafkaParams
return !prop.toString().startsWith("hoodie.");
return !prop.toString().startsWith("hoodie.")
// We need to pass some properties to kafka client so that KafkaAvroSchemaDeserializer can use it
|| prop.toString().startsWith(AvroKafkaSource.KAFKA_AVRO_VALUE_DESERIALIZER_PROPERTY_PREFIX);
}).forEach(prop -> {
kafkaParams.put(prop.toString(), props.get(prop.toString()));
});

View File

@@ -18,8 +18,8 @@
package org.apache.hudi.utilities.deser;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.sources.AvroKafkaSource;
import org.apache.hudi.utilities.sources.helpers.SchemaTestProvider;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
@@ -100,7 +100,7 @@ public class TestKafkaAvroSchemaDeserializer extends UtilitiesTestBase {
public void testKafkaAvroSchemaDeserializer() {
byte[] bytesOrigRecord;
IndexedRecord avroRecord = createUserRecord();
config.put(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA().key(), origSchema.toString());
config.put(AvroKafkaSource.KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA, origSchema.toString());
KafkaAvroSchemaDeserializer avroDeserializer = new KafkaAvroSchemaDeserializer(schemaRegistry, new HashMap(config));
avroDeserializer.configure(new HashMap(config), false);
@@ -112,7 +112,7 @@ public class TestKafkaAvroSchemaDeserializer extends UtilitiesTestBase {
byte[] bytesExtendedRecord = avroSerializer.serialize(topic, avroRecordWithAllField);
SchemaTestProvider.schemaToReturn.set(evolSchema);
config.put(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA().key(), evolSchema.toString());
config.put(AvroKafkaSource.KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA, evolSchema.toString());
avroDeserializer = new KafkaAvroSchemaDeserializer(schemaRegistry, new HashMap(config));
avroDeserializer.configure(new HashMap(config), false);
// record is serialized w/ evolved schema, and deserialized w/ evolved schema