diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
index cf9e905bc..39340d098 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
@@ -69,7 +69,10 @@ public class JsonKafkaSource extends JsonSource {
 
   private JavaRDD<String> toRDD(OffsetRange[] offsetRanges) {
     return KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
-            LocationStrategies.PreferConsistent()).map(x -> (String) x.value());
+            LocationStrategies.PreferConsistent())
+        .map(x -> (String) x.value())
+        // Filter null messages from Kafka to prevent Exceptions
+        .filter(msgValue -> msgValue != null);
   }
 
   @Override
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
index da11035c9..2ed4c4258 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
@@ -151,6 +151,28 @@ public class TestJsonKafkaSource extends UtilitiesTestBase {
     assertEquals(Option.empty(), fetch4AsRows.getBatch());
   }
 
+  // test whether messages with null values are filtered out
+  @Test
+  public void testJsonKafkaSourceFilterNullMsg() {
+    // topic setup.
+    testUtils.createTopic(TEST_TOPIC_NAME, 2);
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    TypedProperties props = createPropsForJsonSource(null, "earliest");
+
+    Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics);
+    SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
+
+    // 1. Nothing has been sent to the topic yet => the batch is empty
+    assertEquals(Option.empty(), kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());
+    // 2. Send 1000 non-null messages to Kafka
+    testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+    // 3. Send 100 null messages to Kafka (a new String[100] holds only null elements)
+    testUtils.sendMessages(TEST_TOPIC_NAME, new String[100]);
+    InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
+    // Verify that messages with null values are filtered
+    assertEquals(1000, fetch1.getBatch().get().count());
+  }
+
   // test case with kafka offset reset strategy
   @Test
   public void testJsonKafkaSourceResetStrategy() {