[HUDI-2320] Add support ByteArrayDeserializer in AvroKafkaSource (#3502)
@@ -254,7 +254,7 @@
     <dependency>
       <groupId>com.twitter</groupId>
       <artifactId>bijection-avro_${scala.binary.version}</artifactId>
-      <version>0.9.3</version>
+      <version>0.9.7</version>
     </dependency>

     <!-- Kafka -->
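The bijection-avro dependency bumped above wraps Avro binary encoding behind Twitter's Injection abstraction and can produce exactly the kind of schema-less byte-array payloads the new ByteArrayDeserializer path consumes. A minimal round-trip sketch, assuming an illustrative schema and class name (nothing below is taken from the commit itself):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;

public class BijectionRoundTripSketch {
  public static void main(String[] args) {
    // Illustrative record schema (an assumption, not from the commit).
    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Row\","
            + "\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}");

    // Builds an Injection<GenericRecord, byte[]> for schema-less Avro binary.
    Injection<GenericRecord, byte[]> injection = GenericAvroCodecs.toBinary(schema);

    GenericRecord record = new GenericData.Record(schema);
    record.put("id", "k1");

    byte[] bytes = injection.apply(record);                 // encode to Avro binary
    GenericRecord decoded = injection.invert(bytes).get();  // decode back
    System.out.println(decoded);
  }
}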
@@ -26,11 +26,13 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
 import org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer;
 import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.AvroConvertor;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;

 import org.apache.avro.generic.GenericRecord;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
@@ -55,13 +57,15 @@ public class AvroKafkaSource extends AvroSource {

   private final KafkaOffsetGen offsetGen;
   private final HoodieDeltaStreamerMetrics metrics;
+  private final SchemaProvider schemaProvider;
+  private final String deserializerClassName;

   public AvroKafkaSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
       SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics metrics) {
     super(props, sparkContext, sparkSession, schemaProvider);

     props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class);
-    String deserializerClassName = props.getString(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().key(),
+    deserializerClassName = props.getString(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().key(),
         DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().defaultValue());

     try {
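The constructor change above promotes the deserializer class name from a local variable to a field, so toRDD() can branch on it later. A sketch of how a caller might opt into the byte-array path via TypedProperties; the string key shown is an assumption about what KAFKA_AVRO_VALUE_DESERIALIZER_CLASS() resolves to:

import org.apache.hudi.common.config.TypedProperties;

public class DeserializerConfigSketch {
  public static void main(String[] args) {
    TypedProperties props = new TypedProperties();

    // Assumed key backing DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().
    props.put("hoodie.deltastreamer.source.kafka.value.deserializer.class",
        "org.apache.kafka.common.serialization.ByteArrayDeserializer");

    // With this setting, AvroKafkaSource takes the ByteArrayDeserializer branch
    // in toRDD(), which requires a non-null SchemaProvider to decode the bytes.
    System.out.println(props.getString(
        "hoodie.deltastreamer.source.kafka.value.deserializer.class"));
  }
}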
@@ -78,6 +82,7 @@ public class AvroKafkaSource extends AvroSource {
       throw new HoodieException(error, e);
     }

+    this.schemaProvider = schemaProvider;
     this.metrics = metrics;
     offsetGen = new KafkaOffsetGen(props);
   }
@@ -91,12 +96,21 @@ public class AvroKafkaSource extends AvroSource {
       return new InputBatch<>(Option.empty(), CheckpointUtils.offsetsToStr(offsetRanges));
     }
     JavaRDD<GenericRecord> newDataRDD = toRDD(offsetRanges);
-    return new InputBatch<>(Option.of(newDataRDD), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
+    return new InputBatch<>(Option.of(newDataRDD), CheckpointUtils.offsetsToStr(offsetRanges));
   }

   private JavaRDD<GenericRecord> toRDD(OffsetRange[] offsetRanges) {
-    return KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
-        LocationStrategies.PreferConsistent()).map(obj -> (GenericRecord) obj.value());
+    if (deserializerClassName.equals(ByteArrayDeserializer.class.getName())) {
+      if (schemaProvider == null) {
+        throw new HoodieException("Please provide a valid schema provider class when use ByteArrayDeserializer!");
+      }
+      AvroConvertor convertor = new AvroConvertor(schemaProvider.getSourceSchema());
+      return KafkaUtils.<String, byte[]>createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
+          LocationStrategies.PreferConsistent()).map(obj -> convertor.fromAvroBinary(obj.value()));
+    } else {
+      return KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
+          LocationStrategies.PreferConsistent()).map(obj -> (GenericRecord) obj.value());
+    }
   }

   @Override
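The new branch decodes raw Avro binary (no embedded schema) with AvroConvertor.fromAvroBinary, using the schema supplied by the SchemaProvider; hence the null check. The convertor itself is not shown in this diff; a standalone equivalent built on the standard Avro decoder APIs might look roughly like this (a sketch, not the actual Hudi implementation):

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;

public class AvroBinaryDecodeSketch {
  private final GenericDatumReader<GenericRecord> reader;

  public AvroBinaryDecodeSketch(Schema schema) {
    // Reader configured with the source schema from the SchemaProvider.
    this.reader = new GenericDatumReader<>(schema);
  }

  /** Decodes raw Avro binary (no embedded schema) into a GenericRecord. */
  public GenericRecord fromAvroBinary(byte[] bytes) {
    try {
      BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
      return reader.read(null, decoder);
    } catch (IOException e) {
      throw new RuntimeException("Failed to decode Avro binary payload", e);
    }
  }
}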