diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
index 39340d098..3dfc61100 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
@@ -35,6 +35,8 @@ import org.apache.spark.streaming.kafka010.KafkaUtils;
 import org.apache.spark.streaming.kafka010.LocationStrategies;
 import org.apache.spark.streaming.kafka010.OffsetRange;
 
+import java.util.Objects;
+
 /**
  * Read json kafka data.
  */
@@ -68,17 +70,18 @@ public class JsonKafkaSource extends JsonSource {
   }
 
   private JavaRDD toRDD(OffsetRange[] offsetRanges) {
-    return KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
-            LocationStrategies.PreferConsistent()).filter(x -> {
-              String msgValue = (String) x.value();
-              //Filter null messages from Kafka to prevent Exceptions
-              return msgValue != null;
-            }).map(x -> (String) x.value());
+    return KafkaUtils.createRDD(sparkContext,
+            offsetGen.getKafkaParams(),
+            offsetRanges,
+            LocationStrategies.PreferConsistent())
+        .filter(x -> Objects.nonNull(x.value()))
+        .map(x -> x.value().toString());
   }
 
   @Override
   public void onCommit(String lastCkptStr) {
-    if (this.props.getBoolean(KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.key(), KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.defaultValue())) {
+    if (this.props.getBoolean(KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.key(),
+        KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.defaultValue())) {
       offsetGen.commitOffsetToKafka(lastCkptStr);
     }
   }