From 59978ef4a91fab67c4bb028f13de54fe8234a40d Mon Sep 17 00:00:00 2001 From: luokey <854194341@qq.com> Date: Fri, 24 Jun 2022 15:05:03 +0800 Subject: [PATCH] [HUDI-4260] Change KEYGEN_CLASS_NAME without default value (#5877) * Change KEYGEN_CLASS_NAME without default value Co-authored-by: 854194341@qq.com --- .../java/org/apache/hudi/configuration/FlinkOptions.java | 2 +- .../java/org/apache/hudi/sink/bulk/RowDataKeyGen.java | 2 +- .../org/apache/hudi/table/TestHoodieTableFactory.java | 8 ++++---- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index f34c67351..a436963bf 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -363,7 +363,7 @@ public class FlinkOptions extends HoodieConfig { public static final ConfigOption KEYGEN_CLASS_NAME = ConfigOptions .key(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key()) .stringType() - .defaultValue("") + .noDefaultValue() .withDescription("Key generator class, that implements will extract the key out of incoming record"); public static final ConfigOption KEYGEN_TYPE = ConfigOptions diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java index 3f84b2799..dd0cd7403 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java @@ -116,7 +116,7 @@ public class RowDataKeyGen implements Serializable { public static RowDataKeyGen instance(Configuration conf, RowType rowType) { Option keyGeneratorOpt = Option.empty(); - if (conf.getString(FlinkOptions.KEYGEN_CLASS_NAME).equals(TimestampBasedAvroKeyGenerator.class.getName())) { + if (TimestampBasedAvroKeyGenerator.class.getName().equals(conf.getString(FlinkOptions.KEYGEN_CLASS_NAME))) { try { keyGeneratorOpt = Option.of(new TimestampBasedAvroKeyGenerator(StreamerUtil.flinkConf2TypedProperties(conf))); } catch (IOException e) { diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java index efd365064..0a7a62204 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java @@ -189,7 +189,7 @@ public class TestHoodieTableFactory { assertThat(conf1.get(FlinkOptions.KEYGEN_CLASS_NAME), is("dummyKeyGenClass")); // definition with complex primary keys and partition paths - this.conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, FlinkOptions.KEYGEN_CLASS_NAME.defaultValue()); + this.conf.removeConfig(FlinkOptions.KEYGEN_CLASS_NAME); ResolvedSchema schema2 = SchemaBuilder.instance() .field("f0", DataTypes.INT().notNull()) .field("f1", DataTypes.VARCHAR(20).notNull()) @@ -204,7 +204,7 @@ public class TestHoodieTableFactory { assertThat(conf2.get(FlinkOptions.KEYGEN_CLASS_NAME), is(ComplexAvroKeyGenerator.class.getName())); // definition with complex primary keys and empty partition paths - this.conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, FlinkOptions.KEYGEN_CLASS_NAME.defaultValue()); + this.conf.removeConfig(FlinkOptions.KEYGEN_CLASS_NAME); final MockContext sourceContext3 = MockContext.getInstance(this.conf, schema2, ""); final HoodieTableSource tableSource3 = (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(sourceContext3); final Configuration conf3 = tableSource3.getConf(); @@ -328,7 +328,7 @@ public class TestHoodieTableFactory { assertThat(conf1.get(FlinkOptions.KEYGEN_CLASS_NAME), is("dummyKeyGenClass")); // definition with complex primary keys and partition paths - this.conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, FlinkOptions.KEYGEN_CLASS_NAME.defaultValue()); + this.conf.removeConfig(FlinkOptions.KEYGEN_CLASS_NAME); ResolvedSchema schema2 = SchemaBuilder.instance() .field("f0", DataTypes.INT().notNull()) .field("f1", DataTypes.VARCHAR(20).notNull()) @@ -343,7 +343,7 @@ public class TestHoodieTableFactory { assertThat(conf2.get(FlinkOptions.KEYGEN_CLASS_NAME), is(ComplexAvroKeyGenerator.class.getName())); // definition with complex primary keys and empty partition paths - this.conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, FlinkOptions.KEYGEN_CLASS_NAME.defaultValue()); + this.conf.removeConfig(FlinkOptions.KEYGEN_CLASS_NAME); final MockContext sinkContext3 = MockContext.getInstance(this.conf, schema2, ""); final HoodieTableSink tableSink3 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(sinkContext3); final Configuration conf3 = tableSink3.getConf();