From 67c31243522268002c8b1bf5285dfe3c07cf5783 Mon Sep 17 00:00:00 2001 From: vinoyang Date: Thu, 17 Jun 2021 21:22:13 +0800 Subject: [PATCH] [HUDI-2032] Make keygen class and keygen type optional for FlinkStreamerConfig (#3104) * [HUDI-2032] Make keygen class and keygen type optional for FlinkStreamerConfig * Address the review suggestion --- .../java/org/apache/hudi/streamer/FlinkStreamerConfig.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java index 843e9bf8b..59534cf15 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java +++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java @@ -21,6 +21,7 @@ package org.apache.hudi.streamer; import org.apache.hudi.client.utils.OperationConverter; import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.keygen.constant.KeyGeneratorType; import org.apache.hudi.util.StreamerUtil; @@ -155,6 +156,11 @@ public class FlinkStreamerConfig extends Configuration { conf.setString(FlinkOptions.RECORD_KEY_FIELD, config.recordKeyField); conf.setString(FlinkOptions.PARTITION_PATH_FIELD, config.partitionPathField); conf.setString(FlinkOptions.KEYGEN_CLASS, config.keygenClass); + if (!StringUtils.isNullOrEmpty(config.keygenClass)) { + conf.setString(FlinkOptions.KEYGEN_CLASS, config.keygenClass); + } else { + conf.setString(FlinkOptions.KEYGEN_TYPE, config.keygenType); + } conf.setInteger(FlinkOptions.WRITE_TASKS, config.writeTaskNum); return conf;