diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java index 2e4caa08b..84c6fd815 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java @@ -65,12 +65,12 @@ public class AvroKafkaSource extends AvroSource { SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics metrics) { super(props, sparkContext, sparkSession, schemaProvider); - props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class); + props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class.getName()); deserializerClassName = props.getString(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().key(), DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().defaultValue()); try { - props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, Class.forName(deserializerClassName)); + props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, Class.forName(deserializerClassName).getName()); if (deserializerClassName.equals(KafkaAvroSchemaDeserializer.class.getName())) { if (schemaProvider == null) { throw new HoodieIOException("SchemaProvider has to be set to use KafkaAvroSchemaDeserializer"); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java index e8bd577db..d6152a177 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java @@ -52,8 +52,8 @@ public class JsonKafkaSource extends JsonSource { SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics metrics) { super(properties, sparkContext, sparkSession, schemaProvider); this.metrics = metrics; - properties.put("key.deserializer", StringDeserializer.class); - properties.put("value.deserializer", StringDeserializer.class); + properties.put("key.deserializer", StringDeserializer.class.getName()); + properties.put("value.deserializer", StringDeserializer.class.getName()); offsetGen = new KafkaOffsetGen(properties); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java index 7018419c2..d9be692b5 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java @@ -82,12 +82,12 @@ public abstract class DebeziumSource extends RowSource { HoodieDeltaStreamerMetrics metrics) { super(props, sparkContext, sparkSession, schemaProvider); - props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class); + props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class.getName()); deserializerClassName = props.getString(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().key(), DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().defaultValue()); try { - props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, Class.forName(deserializerClassName)); + props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, Class.forName(deserializerClassName).getName()); } catch (ClassNotFoundException e) { String error = "Could not load custom avro kafka deserializer: " + deserializerClassName; LOG.error(error);