diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml
index 4dcc96631..089b7801b 100644
--- a/hudi-utilities/pom.xml
+++ b/hudi-utilities/pom.xml
@@ -254,7 +254,7 @@

     <dependency>
      <groupId>com.twitter</groupId>
      <artifactId>bijection-avro_${scala.binary.version}</artifactId>
-      <version>0.9.3</version>
+      <version>0.9.7</version>
    </dependency>

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
index 500c41271..ff8ea5a7a 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
@@ -26,11 +26,13 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
 import org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer;
 import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.AvroConvertor;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
 
 import org.apache.avro.generic.GenericRecord;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
@@ -55,13 +57,15 @@ public class AvroKafkaSource extends AvroSource {
 
   private final KafkaOffsetGen offsetGen;
   private final HoodieDeltaStreamerMetrics metrics;
+  private final SchemaProvider schemaProvider;
+  private final String deserializerClassName;
 
   public AvroKafkaSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
       SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics metrics) {
     super(props, sparkContext, sparkSession, schemaProvider);
 
     props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class);
-    String deserializerClassName = props.getString(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().key(),
+    deserializerClassName = props.getString(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().key(),
         DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().defaultValue());
 
     try {
@@ -78,6 +82,7 @@ public class AvroKafkaSource extends AvroSource {
       throw new HoodieException(error, e);
     }
 
+    this.schemaProvider = schemaProvider;
     this.metrics = metrics;
     offsetGen = new KafkaOffsetGen(props);
   }
@@ -91,12 +96,21 @@ public class AvroKafkaSource extends AvroSource {
       return new InputBatch<>(Option.empty(), CheckpointUtils.offsetsToStr(offsetRanges));
     }
     JavaRDD<GenericRecord> newDataRDD = toRDD(offsetRanges);
-    return new InputBatch<>(Option.of(newDataRDD), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
+    return new InputBatch<>(Option.of(newDataRDD), CheckpointUtils.offsetsToStr(offsetRanges));
   }
 
   private JavaRDD<GenericRecord> toRDD(OffsetRange[] offsetRanges) {
-    return KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
-        LocationStrategies.PreferConsistent()).map(obj -> (GenericRecord) obj.value());
+    if (deserializerClassName.equals(ByteArrayDeserializer.class.getName())) {
+      if (schemaProvider == null) {
+        throw new HoodieException("Please provide a valid schema provider class when use ByteArrayDeserializer!");
+      }
+      AvroConvertor convertor = new AvroConvertor(schemaProvider.getSourceSchema());
+      return KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
+          LocationStrategies.PreferConsistent()).map(obj -> convertor.fromAvroBinary(obj.value()));
+    } else {
+      return KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
+          LocationStrategies.PreferConsistent()).map(obj -> (GenericRecord) obj.value());
+    }
   }
 
   @Override