From 9518f786102566398fa20001533d5d86f68deb5b Mon Sep 17 00:00:00 2001 From: zhangxiang17 Date: Sat, 12 Feb 2022 15:37:29 +0800 Subject: [PATCH] [HUDI-3413]fix jackson parse error when empty message from JsonKafkaSource Using HoodieDeltaStreamer (#4794) --- .../org/apache/hudi/utilities/sources/JsonKafkaSource.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java index 200b6450e..e1a540f82 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java @@ -20,6 +20,7 @@ package org.apache.hudi.utilities.sources; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics; import org.apache.hudi.utilities.exception.HoodieSourceTimeoutException; import org.apache.hudi.utilities.schema.SchemaProvider; @@ -79,7 +80,7 @@ public class JsonKafkaSource extends JsonSource { offsetGen.getKafkaParams(), offsetRanges, LocationStrategies.PreferConsistent()) - .filter(x -> Objects.nonNull(x.value())) + .filter(x -> !StringUtils.isNullOrEmpty((String)x.value())) .map(x -> x.value().toString()); }