diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 89faa3bbb..06c2ea369 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -382,8 +382,6 @@ object DataSourceWriteOptions {
 
   // Avro Kafka Source configs
   val KAFKA_AVRO_VALUE_DESERIALIZER = "hoodie.deltastreamer.source.kafka.value.deserializer.class"
-
-  // Schema provider class to be set to be used in custom kakfa deserializer
-  val SCHEMA_PROVIDER_CLASS_PROP = "hoodie.deltastreamer.schemaprovider.class"
-
+  // Schema to be used in custom kafka deserializer
+  val KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA = "hoodie.deltastreamer.source.kafka.value.deserializer.schema"
 }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deser/KafkaAvroSchemaDeserializer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deser/KafkaAvroSchemaDeserializer.java
index 5d0a116ca..da04b632a 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deser/KafkaAvroSchemaDeserializer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deser/KafkaAvroSchemaDeserializer.java
@@ -24,14 +24,11 @@
 import org.apache.avro.Schema;
 import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.kafka.common.errors.SerializationException;
 
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Objects;
 
 /**
  * Extending {@link KafkaAvroSchemaDeserializer} as we need to be able to inject reader schema during deserialization.
@@ -51,9 +48,7 @@ public class KafkaAvroSchemaDeserializer extends KafkaAvroDeserializer {
     super.configure(configs, isKey);
     try {
       TypedProperties props = getConvertToTypedProperties(configs);
-      String className = props.getString(DataSourceWriteOptions.SCHEMA_PROVIDER_CLASS_PROP());
-      SchemaProvider schemaProvider = (SchemaProvider) ReflectionUtils.loadClass(className, props);
-      sourceSchema = Objects.requireNonNull(schemaProvider).getSourceSchema();
+      sourceSchema = new Schema.Parser().parse(props.getString(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA()));
     } catch (Throwable e) {
       throw new HoodieException(e);
     }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
index 511a72c28..124487941 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
@@ -63,11 +63,11 @@ public class AvroKafkaSource extends AvroSource {
       props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, KafkaAvroDeserializer.class);
     } else {
       try {
+        props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, Class.forName(deserializerClassName));
         if (schemaProvider == null) {
           throw new HoodieIOException("SchemaProvider has to be set to use custom Deserializer");
         }
-        props.put(DataSourceWriteOptions.SCHEMA_PROVIDER_CLASS_PROP(), schemaProvider.getClass().getName());
-        props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, Class.forName(deserializerClassName));
+        props.put(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA(), schemaProvider.getSourceSchema().toString());
       } catch (ClassNotFoundException e) {
         String error = "Could not load custom avro kafka deserializer: " + deserializerClassName;
         LOG.error(error);
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
index c14046ee4..be23002f8 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
@@ -176,7 +176,9 @@ public class KafkaOffsetGen {
     props.keySet().stream().filter(prop -> {
       // In order to prevent printing unnecessary warn logs, here filter out the hoodie
       // configuration items before passing to kafkaParams
-      return !prop.toString().startsWith("hoodie.");
+      return !prop.toString().startsWith("hoodie.")
+          // We still need to pass the deserializer properties to the Kafka client so that KafkaAvroSchemaDeserializer can use them
+          || prop.toString().startsWith("hoodie.deltastreamer.source.kafka.value.deserializer.");
     }).forEach(prop -> {
       kafkaParams.put(prop.toString(), props.get(prop.toString()));
     });
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deser/TestKafkaAvroSchemaDeserializer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deser/TestKafkaAvroSchemaDeserializer.java
index 14c5f0107..714d636bd 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deser/TestKafkaAvroSchemaDeserializer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deser/TestKafkaAvroSchemaDeserializer.java
@@ -51,13 +51,12 @@ public class TestKafkaAvroSchemaDeserializer extends UtilitiesTestBase {
   private final String topic;
   private final Schema origSchema = createUserSchema();
   private final Schema evolSchema = createExtendUserSchema();
-  private Properties defaultConfig = new Properties();
+  private Properties config = new Properties();
 
   public TestKafkaAvroSchemaDeserializer() {
-    defaultConfig.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "bogus");
-    defaultConfig.put("hoodie.deltastreamer.schemaprovider.class", SchemaTestProvider.class.getName());
+    config.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "bogus");
     schemaRegistry = new MockSchemaRegistryClient();
-    avroSerializer = new KafkaAvroSerializer(schemaRegistry, new HashMap(defaultConfig));
+    avroSerializer = new KafkaAvroSerializer(schemaRegistry, new HashMap(config));
     topic = "test";
   }
 
@@ -66,8 +65,7 @@ public class TestKafkaAvroSchemaDeserializer extends UtilitiesTestBase {
         + "\"name\": \"User\","
        + "\"fields\": [{\"name\": \"name\", \"type\": \"string\"}]}";
     Schema.Parser parser = new Schema.Parser();
-    Schema schema = parser.parse(userSchema);
-    return schema;
+    return parser.parse(userSchema);
   }
 
   private IndexedRecord createUserRecord() {
@@ -83,8 +81,7 @@ public class TestKafkaAvroSchemaDeserializer extends UtilitiesTestBase {
         + "\"fields\": [{\"name\": \"name\", \"type\": \"string\"}, "
         + "{\"name\": \"age\", \"type\": [\"null\", \"int\"], \"default\": null}]}";
     Schema.Parser parser = new Schema.Parser();
-    Schema schema = parser.parse(userSchema);
-    return schema;
+    return parser.parse(userSchema);
   }
 
   private IndexedRecord createExtendUserRecord() {
@@ -102,9 +99,10 @@ public class TestKafkaAvroSchemaDeserializer extends UtilitiesTestBase {
   public void testKafkaAvroSchemaDeserializer() {
     byte[] bytesOrigRecord;
     IndexedRecord avroRecord = createUserRecord();
-    SchemaTestProvider.schemaToReturn.set(origSchema);
-    KafkaAvroSchemaDeserializer avroDeserializer = new KafkaAvroSchemaDeserializer(schemaRegistry, new HashMap(defaultConfig));
-    avroDeserializer.configure(new HashMap(defaultConfig), false);
+    config.put("hoodie.deltastreamer.source.kafka.value.deserializer.schema", origSchema.toString());
+
+    KafkaAvroSchemaDeserializer avroDeserializer = new KafkaAvroSchemaDeserializer(schemaRegistry, new HashMap(config));
+    avroDeserializer.configure(new HashMap(config), false);
     bytesOrigRecord = avroSerializer.serialize(topic, avroRecord);
     // record is serialized in orig schema and deserialized using same schema.
     assertEquals(avroRecord, avroDeserializer.deserialize(false, topic, false, bytesOrigRecord, origSchema));
@@ -113,8 +111,9 @@ public class TestKafkaAvroSchemaDeserializer extends UtilitiesTestBase {
     byte[] bytesExtendedRecord = avroSerializer.serialize(topic, avroRecordWithAllField);
 
     SchemaTestProvider.schemaToReturn.set(evolSchema);
-    avroDeserializer = new KafkaAvroSchemaDeserializer(schemaRegistry, new HashMap(defaultConfig));
-    avroDeserializer.configure(new HashMap(defaultConfig), false);
+    config.put("hoodie.deltastreamer.source.kafka.value.deserializer.schema", evolSchema.toString());
+    avroDeserializer = new KafkaAvroSchemaDeserializer(schemaRegistry, new HashMap(config));
+    avroDeserializer.configure(new HashMap(config), false);
     // record is serialized w/ evolved schema, and deserialized w/ evolved schema
     IndexedRecord avroRecordWithAllFieldActual = (IndexedRecord) avroDeserializer.deserialize(false, topic, false, bytesExtendedRecord, evolSchema);
     assertEquals(avroRecordWithAllField, avroRecordWithAllFieldActual);
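
Note for reviewers: the following is a minimal, self-contained sketch (not part of the patch) that shows the new contract end to end, mirroring the updated test. The reader schema travels as a plain JSON string under hoodie.deltastreamer.source.kafka.value.deserializer.schema (in DeltaStreamer this value is produced by AvroKafkaSource from schemaProvider.getSourceSchema().toString()), and KafkaAvroSchemaDeserializer.configure() parses it instead of instantiating a SchemaProvider via reflection. The class name, the package placement (needed only to call the protected test-style deserialize overload), the MockSchemaRegistryClient, the bogus registry URL, and the User schema are illustrative stand-ins; every call below is the same one the updated test exercises.

package org.apache.hudi.utilities.deser; // same package as the deserializer so the protected deserialize overload is callable

import java.util.HashMap;
import java.util.Map;

import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializer;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.IndexedRecord;

public class KafkaAvroSchemaDeserializerSketch {

  public static void main(String[] args) {
    // Reader schema as a JSON string; in DeltaStreamer this is schemaProvider.getSourceSchema().toString().
    Schema userSchema = new Schema.Parser().parse(
        "{\"type\": \"record\", \"name\": \"User\", "
            + "\"fields\": [{\"name\": \"name\", \"type\": \"string\"}]}");

    Map<String, Object> config = new HashMap<>();
    // The Confluent base class requires a registry URL; the mock client below never contacts it.
    config.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "bogus");
    // The new property introduced by this patch (KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA).
    config.put("hoodie.deltastreamer.source.kafka.value.deserializer.schema", userSchema.toString());

    MockSchemaRegistryClient schemaRegistry = new MockSchemaRegistryClient();
    KafkaAvroSerializer serializer = new KafkaAvroSerializer(schemaRegistry, config);

    KafkaAvroSchemaDeserializer deserializer = new KafkaAvroSchemaDeserializer(schemaRegistry, config);
    // configure() parses the schema string and injects it as the reader schema for every record.
    deserializer.configure(config, false);

    // Round-trip one record: serialize with the writer schema, deserialize with the injected reader schema.
    GenericData.Record user = new GenericData.Record(userSchema);
    user.put("name", "alice");
    byte[] bytes = serializer.serialize("test", user);
    IndexedRecord decoded = (IndexedRecord) deserializer.deserialize(false, "test", false, bytes, userSchema);
    System.out.println(decoded);
  }
}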