diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java index 843e9bf8b..59534cf15 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java +++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java @@ -21,6 +21,7 @@ package org.apache.hudi.streamer; import org.apache.hudi.client.utils.OperationConverter; import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.keygen.constant.KeyGeneratorType; import org.apache.hudi.util.StreamerUtil; @@ -155,6 +156,11 @@ public class FlinkStreamerConfig extends Configuration { conf.setString(FlinkOptions.RECORD_KEY_FIELD, config.recordKeyField); conf.setString(FlinkOptions.PARTITION_PATH_FIELD, config.partitionPathField); conf.setString(FlinkOptions.KEYGEN_CLASS, config.keygenClass); + if (!StringUtils.isNullOrEmpty(config.keygenClass)) { + conf.setString(FlinkOptions.KEYGEN_CLASS, config.keygenClass); + } else { + conf.setString(FlinkOptions.KEYGEN_TYPE, config.keygenType); + } conf.setInteger(FlinkOptions.WRITE_TASKS, config.writeTaskNum); return conf;