1
0

[MINOR] Minor improvement in JsonkafkaSource (#4620)

This commit is contained in:
wangxianghu
2022-01-18 11:13:05 +04:00
committed by GitHub
parent f18447406d
commit 3d93e857cc

View File

@@ -35,6 +35,8 @@ import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies; import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.OffsetRange; import org.apache.spark.streaming.kafka010.OffsetRange;
import java.util.Objects;
/** /**
* Read json kafka data. * Read json kafka data.
*/ */
@@ -68,17 +70,18 @@ public class JsonKafkaSource extends JsonSource {
} }
private JavaRDD<String> toRDD(OffsetRange[] offsetRanges) { private JavaRDD<String> toRDD(OffsetRange[] offsetRanges) {
return KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges, return KafkaUtils.createRDD(sparkContext,
LocationStrategies.PreferConsistent()).filter(x -> { offsetGen.getKafkaParams(),
String msgValue = (String) x.value(); offsetRanges,
//Filter null messages from Kafka to prevent Exceptions LocationStrategies.PreferConsistent())
return msgValue != null; .filter(x -> Objects.nonNull(x.value()))
}).map(x -> (String) x.value()); .map(x -> x.value().toString());
} }
@Override @Override
public void onCommit(String lastCkptStr) { public void onCommit(String lastCkptStr) {
if (this.props.getBoolean(KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.key(), KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.defaultValue())) { if (this.props.getBoolean(KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.key(),
KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.defaultValue())) {
offsetGen.commitOffsetToKafka(lastCkptStr); offsetGen.commitOffsetToKafka(lastCkptStr);
} }
} }