[HUDI-2069] Fix KafkaAvroSchemaDeserializer to not rely on reflection (#3111)
[HUDI-2069] KafkaAvroSchemaDeserializer should get sourceSchema passed instead using Reflection
This commit is contained in:
committed by
GitHub
parent
84dd3ca18b
commit
b32855545b
@@ -382,8 +382,6 @@ object DataSourceWriteOptions {
|
|||||||
|
|
||||||
// Avro Kafka Source configs
|
// Avro Kafka Source configs
|
||||||
val KAFKA_AVRO_VALUE_DESERIALIZER = "hoodie.deltastreamer.source.kafka.value.deserializer.class"
|
val KAFKA_AVRO_VALUE_DESERIALIZER = "hoodie.deltastreamer.source.kafka.value.deserializer.class"
|
||||||
|
// Schema to be used in custom kakfa deserializer
|
||||||
// Schema provider class to be set to be used in custom kakfa deserializer
|
val KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA = "hoodie.deltastreamer.source.kafka.value.deserializer.schema"
|
||||||
val SCHEMA_PROVIDER_CLASS_PROP = "hoodie.deltastreamer.schemaprovider.class"
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -24,14 +24,11 @@ import org.apache.avro.Schema;
|
|||||||
|
|
||||||
import org.apache.hudi.DataSourceWriteOptions;
|
import org.apache.hudi.DataSourceWriteOptions;
|
||||||
import org.apache.hudi.common.config.TypedProperties;
|
import org.apache.hudi.common.config.TypedProperties;
|
||||||
import org.apache.hudi.common.util.ReflectionUtils;
|
|
||||||
import org.apache.hudi.exception.HoodieException;
|
import org.apache.hudi.exception.HoodieException;
|
||||||
import org.apache.hudi.utilities.schema.SchemaProvider;
|
|
||||||
import org.apache.kafka.common.errors.SerializationException;
|
import org.apache.kafka.common.errors.SerializationException;
|
||||||
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
import java.util.Objects;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Extending {@link KafkaAvroSchemaDeserializer} as we need to be able to inject reader schema during deserialization.
|
* Extending {@link KafkaAvroSchemaDeserializer} as we need to be able to inject reader schema during deserialization.
|
||||||
@@ -51,9 +48,7 @@ public class KafkaAvroSchemaDeserializer extends KafkaAvroDeserializer {
|
|||||||
super.configure(configs, isKey);
|
super.configure(configs, isKey);
|
||||||
try {
|
try {
|
||||||
TypedProperties props = getConvertToTypedProperties(configs);
|
TypedProperties props = getConvertToTypedProperties(configs);
|
||||||
String className = props.getString(DataSourceWriteOptions.SCHEMA_PROVIDER_CLASS_PROP());
|
sourceSchema = new Schema.Parser().parse(props.getString(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA()));
|
||||||
SchemaProvider schemaProvider = (SchemaProvider) ReflectionUtils.loadClass(className, props);
|
|
||||||
sourceSchema = Objects.requireNonNull(schemaProvider).getSourceSchema();
|
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
throw new HoodieException(e);
|
throw new HoodieException(e);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -63,11 +63,11 @@ public class AvroKafkaSource extends AvroSource {
|
|||||||
props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, KafkaAvroDeserializer.class);
|
props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, KafkaAvroDeserializer.class);
|
||||||
} else {
|
} else {
|
||||||
try {
|
try {
|
||||||
|
props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, Class.forName(deserializerClassName));
|
||||||
if (schemaProvider == null) {
|
if (schemaProvider == null) {
|
||||||
throw new HoodieIOException("SchemaProvider has to be set to use custom Deserializer");
|
throw new HoodieIOException("SchemaProvider has to be set to use custom Deserializer");
|
||||||
}
|
}
|
||||||
props.put(DataSourceWriteOptions.SCHEMA_PROVIDER_CLASS_PROP(), schemaProvider.getClass().getName());
|
props.put(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA(), schemaProvider.getSourceSchema().toString());
|
||||||
props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, Class.forName(deserializerClassName));
|
|
||||||
} catch (ClassNotFoundException e) {
|
} catch (ClassNotFoundException e) {
|
||||||
String error = "Could not load custom avro kafka deserializer: " + deserializerClassName;
|
String error = "Could not load custom avro kafka deserializer: " + deserializerClassName;
|
||||||
LOG.error(error);
|
LOG.error(error);
|
||||||
|
|||||||
@@ -176,7 +176,9 @@ public class KafkaOffsetGen {
|
|||||||
props.keySet().stream().filter(prop -> {
|
props.keySet().stream().filter(prop -> {
|
||||||
// In order to prevent printing unnecessary warn logs, here filter out the hoodie
|
// In order to prevent printing unnecessary warn logs, here filter out the hoodie
|
||||||
// configuration items before passing to kafkaParams
|
// configuration items before passing to kafkaParams
|
||||||
return !prop.toString().startsWith("hoodie.");
|
return !prop.toString().startsWith("hoodie.")
|
||||||
|
// We need to pass some properties to kafka client so that KafkaAvroSchemaDeserializer can use it
|
||||||
|
|| prop.toString().startsWith("hoodie.deltastreamer.source.kafka.value.deserializer.");
|
||||||
}).forEach(prop -> {
|
}).forEach(prop -> {
|
||||||
kafkaParams.put(prop.toString(), props.get(prop.toString()));
|
kafkaParams.put(prop.toString(), props.get(prop.toString()));
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -51,13 +51,12 @@ public class TestKafkaAvroSchemaDeserializer extends UtilitiesTestBase {
|
|||||||
private final String topic;
|
private final String topic;
|
||||||
private final Schema origSchema = createUserSchema();
|
private final Schema origSchema = createUserSchema();
|
||||||
private final Schema evolSchema = createExtendUserSchema();
|
private final Schema evolSchema = createExtendUserSchema();
|
||||||
private Properties defaultConfig = new Properties();
|
private Properties config = new Properties();
|
||||||
|
|
||||||
public TestKafkaAvroSchemaDeserializer() {
|
public TestKafkaAvroSchemaDeserializer() {
|
||||||
defaultConfig.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "bogus");
|
config.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "bogus");
|
||||||
defaultConfig.put("hoodie.deltastreamer.schemaprovider.class", SchemaTestProvider.class.getName());
|
|
||||||
schemaRegistry = new MockSchemaRegistryClient();
|
schemaRegistry = new MockSchemaRegistryClient();
|
||||||
avroSerializer = new KafkaAvroSerializer(schemaRegistry, new HashMap(defaultConfig));
|
avroSerializer = new KafkaAvroSerializer(schemaRegistry, new HashMap(config));
|
||||||
topic = "test";
|
topic = "test";
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -66,8 +65,7 @@ public class TestKafkaAvroSchemaDeserializer extends UtilitiesTestBase {
|
|||||||
+ "\"name\": \"User\","
|
+ "\"name\": \"User\","
|
||||||
+ "\"fields\": [{\"name\": \"name\", \"type\": \"string\"}]}";
|
+ "\"fields\": [{\"name\": \"name\", \"type\": \"string\"}]}";
|
||||||
Schema.Parser parser = new Schema.Parser();
|
Schema.Parser parser = new Schema.Parser();
|
||||||
Schema schema = parser.parse(userSchema);
|
return parser.parse(userSchema);
|
||||||
return schema;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private IndexedRecord createUserRecord() {
|
private IndexedRecord createUserRecord() {
|
||||||
@@ -83,8 +81,7 @@ public class TestKafkaAvroSchemaDeserializer extends UtilitiesTestBase {
|
|||||||
+ "\"fields\": [{\"name\": \"name\", \"type\": \"string\"}, "
|
+ "\"fields\": [{\"name\": \"name\", \"type\": \"string\"}, "
|
||||||
+ "{\"name\": \"age\", \"type\": [\"null\", \"int\"], \"default\": null}]}";
|
+ "{\"name\": \"age\", \"type\": [\"null\", \"int\"], \"default\": null}]}";
|
||||||
Schema.Parser parser = new Schema.Parser();
|
Schema.Parser parser = new Schema.Parser();
|
||||||
Schema schema = parser.parse(userSchema);
|
return parser.parse(userSchema);
|
||||||
return schema;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private IndexedRecord createExtendUserRecord() {
|
private IndexedRecord createExtendUserRecord() {
|
||||||
@@ -102,9 +99,10 @@ public class TestKafkaAvroSchemaDeserializer extends UtilitiesTestBase {
|
|||||||
public void testKafkaAvroSchemaDeserializer() {
|
public void testKafkaAvroSchemaDeserializer() {
|
||||||
byte[] bytesOrigRecord;
|
byte[] bytesOrigRecord;
|
||||||
IndexedRecord avroRecord = createUserRecord();
|
IndexedRecord avroRecord = createUserRecord();
|
||||||
SchemaTestProvider.schemaToReturn.set(origSchema);
|
config.put("hoodie.deltastreamer.source.kafka.value.deserializer.schema", origSchema.toString());
|
||||||
KafkaAvroSchemaDeserializer avroDeserializer = new KafkaAvroSchemaDeserializer(schemaRegistry, new HashMap(defaultConfig));
|
|
||||||
avroDeserializer.configure(new HashMap(defaultConfig), false);
|
KafkaAvroSchemaDeserializer avroDeserializer = new KafkaAvroSchemaDeserializer(schemaRegistry, new HashMap(config));
|
||||||
|
avroDeserializer.configure(new HashMap(config), false);
|
||||||
bytesOrigRecord = avroSerializer.serialize(topic, avroRecord);
|
bytesOrigRecord = avroSerializer.serialize(topic, avroRecord);
|
||||||
// record is serialized in orig schema and deserialized using same schema.
|
// record is serialized in orig schema and deserialized using same schema.
|
||||||
assertEquals(avroRecord, avroDeserializer.deserialize(false, topic, false, bytesOrigRecord, origSchema));
|
assertEquals(avroRecord, avroDeserializer.deserialize(false, topic, false, bytesOrigRecord, origSchema));
|
||||||
@@ -113,8 +111,9 @@ public class TestKafkaAvroSchemaDeserializer extends UtilitiesTestBase {
|
|||||||
byte[] bytesExtendedRecord = avroSerializer.serialize(topic, avroRecordWithAllField);
|
byte[] bytesExtendedRecord = avroSerializer.serialize(topic, avroRecordWithAllField);
|
||||||
|
|
||||||
SchemaTestProvider.schemaToReturn.set(evolSchema);
|
SchemaTestProvider.schemaToReturn.set(evolSchema);
|
||||||
avroDeserializer = new KafkaAvroSchemaDeserializer(schemaRegistry, new HashMap(defaultConfig));
|
config.put("hoodie.deltastreamer.source.kafka.value.deserializer.schema", evolSchema.toString());
|
||||||
avroDeserializer.configure(new HashMap(defaultConfig), false);
|
avroDeserializer = new KafkaAvroSchemaDeserializer(schemaRegistry, new HashMap(config));
|
||||||
|
avroDeserializer.configure(new HashMap(config), false);
|
||||||
// record is serialized w/ evolved schema, and deserialized w/ evolved schema
|
// record is serialized w/ evolved schema, and deserialized w/ evolved schema
|
||||||
IndexedRecord avroRecordWithAllFieldActual = (IndexedRecord) avroDeserializer.deserialize(false, topic, false, bytesExtendedRecord, evolSchema);
|
IndexedRecord avroRecordWithAllFieldActual = (IndexedRecord) avroDeserializer.deserialize(false, topic, false, bytesExtendedRecord, evolSchema);
|
||||||
assertEquals(avroRecordWithAllField, avroRecordWithAllFieldActual);
|
assertEquals(avroRecordWithAllField, avroRecordWithAllFieldActual);
|
||||||
|
|||||||
Reference in New Issue
Block a user