diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java index 200b6450e..e1a540f82 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java @@ -20,6 +20,7 @@ package org.apache.hudi.utilities.sources; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics; import org.apache.hudi.utilities.exception.HoodieSourceTimeoutException; import org.apache.hudi.utilities.schema.SchemaProvider; @@ -79,7 +80,7 @@ public class JsonKafkaSource extends JsonSource { offsetGen.getKafkaParams(), offsetRanges, LocationStrategies.PreferConsistent()) - .filter(x -> Objects.nonNull(x.value())) + .filter(x -> !StringUtils.isNullOrEmpty((String)x.value())) .map(x -> x.value().toString()); }