[HUDI-2487] Fix JsonKafkaSource cannot filter empty messages from kafka (#3715)
This commit is contained in:
@@ -69,7 +69,11 @@ public class JsonKafkaSource extends JsonSource {
|
|||||||
|
|
||||||
/**
 * Creates a Spark RDD of message values for the given Kafka offset ranges.
 *
 * <p>This hunk shows the method before and after the change, line by line. The earlier form maps
 * every consumed record straight to its value cast to {@code String}; the later form first filters
 * out records whose value is {@code null} and only then performs the same mapping.
 *
 * @param offsetRanges the Kafka offset ranges handed to {@code KafkaUtils.createRDD}
 * @return an RDD holding each remaining record's value as a {@code String}
 */
private JavaRDD<String> toRDD(OffsetRange[] offsetRanges) {
|
private JavaRDD<String> toRDD(OffsetRange[] offsetRanges) {
|
||||||
return KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
|
return KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
|
||||||
LocationStrategies.PreferConsistent()).map(x -> (String) x.value());
|
LocationStrategies.PreferConsistent()).filter(x -> {
|
||||||
|
String msgValue = (String) x.value();
|
||||||
|
// Keep only records with a non-null value; without this filter the mapping below would emit
// null strings (the original note says this is done to prevent exceptions).
|
||||||
|
return msgValue != null;
|
||||||
|
}).map(x -> (String) x.value());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@@ -151,6 +151,28 @@ public class TestJsonKafkaSource extends UtilitiesTestBase {
|
|||||||
assertEquals(Option.empty(), fetch4AsRows.getBatch());
|
assertEquals(Option.empty(), fetch4AsRows.getBatch());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Verifies that Kafka messages whose value is null are filtered out, so that only the
// non-null messages are counted in the fetched batch.
|
||||||
|
@Test
|
||||||
|
public void testJsonKafkaSourceFilterNullMsg() {
|
||||||
|
// Topic setup (the second argument is presumably the partition count; confirm against createTopic).
|
||||||
|
testUtils.createTopic(TEST_TOPIC_NAME, 2);
|
||||||
|
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
|
||||||
|
TypedProperties props = createPropsForJsonSource(null, "earliest");
|
||||||
|
|
||||||
|
Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics);
|
||||||
|
SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
|
||||||
|
|
||||||
|
// 1. Fetch with no checkpoint before anything has been sent => expect an empty batch.
|
||||||
|
assertEquals(Option.empty(), kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());
|
||||||
|
// 2. Send 1000 non-null messages (generated inserts passed through Helpers.jsonifyRecords).
|
||||||
|
testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
|
||||||
|
// 3. Send 100 null messages: new String[100] is an array whose 100 elements are all null.
|
||||||
|
testUtils.sendMessages(TEST_TOPIC_NAME,new String[100]);
|
||||||
|
InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
|
||||||
|
// 4. Only the 1000 non-null messages should be counted; the 100 null ones must have been filtered.
|
||||||
|
assertEquals(1000, fetch1.getBatch().get().count());
|
||||||
|
}
|
||||||
|
|
||||||
// test case with kafka offset reset strategy
|
// test case with kafka offset reset strategy
|
||||||
@Test
|
@Test
|
||||||
public void testJsonKafkaSourceResetStrategy() {
|
public void testJsonKafkaSourceResetStrategy() {
|
||||||
|
|||||||
Reference in New Issue
Block a user