[HUDI-3413]fix jackson parse error when empty message from JsonKafkaSource Using HoodieDeltaStreamer (#4794)
This commit is contained in:
@@ -20,6 +20,7 @@ package org.apache.hudi.utilities.sources;
|
|||||||
|
|
||||||
import org.apache.hudi.common.config.TypedProperties;
|
import org.apache.hudi.common.config.TypedProperties;
|
||||||
import org.apache.hudi.common.util.Option;
|
import org.apache.hudi.common.util.Option;
|
||||||
|
import org.apache.hudi.common.util.StringUtils;
|
||||||
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
|
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
|
||||||
import org.apache.hudi.utilities.exception.HoodieSourceTimeoutException;
|
import org.apache.hudi.utilities.exception.HoodieSourceTimeoutException;
|
||||||
import org.apache.hudi.utilities.schema.SchemaProvider;
|
import org.apache.hudi.utilities.schema.SchemaProvider;
|
||||||
@@ -79,7 +80,7 @@ public class JsonKafkaSource extends JsonSource {
|
|||||||
offsetGen.getKafkaParams(),
|
offsetGen.getKafkaParams(),
|
||||||
offsetRanges,
|
offsetRanges,
|
||||||
LocationStrategies.PreferConsistent())
|
LocationStrategies.PreferConsistent())
|
||||||
.filter(x -> Objects.nonNull(x.value()))
|
.filter(x -> !StringUtils.isNullOrEmpty((String)x.value()))
|
||||||
.map(x -> x.value().toString());
|
.map(x -> x.value().toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user