[HUDI-3521] Fixing kakfa key and value serializer value type from class to string (#4919)
This commit is contained in:
committed by
GitHub
parent
c77b2591d0
commit
2f99e8458a
@@ -65,12 +65,12 @@ public class AvroKafkaSource extends AvroSource {
|
|||||||
SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics metrics) {
|
SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics metrics) {
|
||||||
super(props, sparkContext, sparkSession, schemaProvider);
|
super(props, sparkContext, sparkSession, schemaProvider);
|
||||||
|
|
||||||
props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class);
|
props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class.getName());
|
||||||
deserializerClassName = props.getString(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().key(),
|
deserializerClassName = props.getString(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().key(),
|
||||||
DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().defaultValue());
|
DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().defaultValue());
|
||||||
|
|
||||||
try {
|
try {
|
||||||
props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, Class.forName(deserializerClassName));
|
props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, Class.forName(deserializerClassName).getName());
|
||||||
if (deserializerClassName.equals(KafkaAvroSchemaDeserializer.class.getName())) {
|
if (deserializerClassName.equals(KafkaAvroSchemaDeserializer.class.getName())) {
|
||||||
if (schemaProvider == null) {
|
if (schemaProvider == null) {
|
||||||
throw new HoodieIOException("SchemaProvider has to be set to use KafkaAvroSchemaDeserializer");
|
throw new HoodieIOException("SchemaProvider has to be set to use KafkaAvroSchemaDeserializer");
|
||||||
|
|||||||
@@ -52,8 +52,8 @@ public class JsonKafkaSource extends JsonSource {
|
|||||||
SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics metrics) {
|
SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics metrics) {
|
||||||
super(properties, sparkContext, sparkSession, schemaProvider);
|
super(properties, sparkContext, sparkSession, schemaProvider);
|
||||||
this.metrics = metrics;
|
this.metrics = metrics;
|
||||||
properties.put("key.deserializer", StringDeserializer.class);
|
properties.put("key.deserializer", StringDeserializer.class.getName());
|
||||||
properties.put("value.deserializer", StringDeserializer.class);
|
properties.put("value.deserializer", StringDeserializer.class.getName());
|
||||||
offsetGen = new KafkaOffsetGen(properties);
|
offsetGen = new KafkaOffsetGen(properties);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -82,12 +82,12 @@ public abstract class DebeziumSource extends RowSource {
|
|||||||
HoodieDeltaStreamerMetrics metrics) {
|
HoodieDeltaStreamerMetrics metrics) {
|
||||||
super(props, sparkContext, sparkSession, schemaProvider);
|
super(props, sparkContext, sparkSession, schemaProvider);
|
||||||
|
|
||||||
props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class);
|
props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class.getName());
|
||||||
deserializerClassName = props.getString(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().key(),
|
deserializerClassName = props.getString(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().key(),
|
||||||
DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().defaultValue());
|
DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().defaultValue());
|
||||||
|
|
||||||
try {
|
try {
|
||||||
props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, Class.forName(deserializerClassName));
|
props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, Class.forName(deserializerClassName).getName());
|
||||||
} catch (ClassNotFoundException e) {
|
} catch (ClassNotFoundException e) {
|
||||||
String error = "Could not load custom avro kafka deserializer: " + deserializerClassName;
|
String error = "Could not load custom avro kafka deserializer: " + deserializerClassName;
|
||||||
LOG.error(error);
|
LOG.error(error);
|
||||||
|
|||||||
Reference in New Issue
Block a user