[HUDI-4260] Change KEYGEN_CLASS_NAME without default value (#5877)
* Change KEYGEN_CLASS_NAME without default value Co-authored-by: 854194341@qq.com <loukey_7821>
This commit is contained in:
@@ -363,7 +363,7 @@ public class FlinkOptions extends HoodieConfig {
|
|||||||
public static final ConfigOption<String> KEYGEN_CLASS_NAME = ConfigOptions
|
public static final ConfigOption<String> KEYGEN_CLASS_NAME = ConfigOptions
|
||||||
.key(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key())
|
.key(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key())
|
||||||
.stringType()
|
.stringType()
|
||||||
.defaultValue("")
|
.noDefaultValue()
|
||||||
.withDescription("Key generator class, that implements will extract the key out of incoming record");
|
.withDescription("Key generator class, that implements will extract the key out of incoming record");
|
||||||
|
|
||||||
public static final ConfigOption<String> KEYGEN_TYPE = ConfigOptions
|
public static final ConfigOption<String> KEYGEN_TYPE = ConfigOptions
|
||||||
|
|||||||
@@ -116,7 +116,7 @@ public class RowDataKeyGen implements Serializable {
|
|||||||
|
|
||||||
public static RowDataKeyGen instance(Configuration conf, RowType rowType) {
|
public static RowDataKeyGen instance(Configuration conf, RowType rowType) {
|
||||||
Option<TimestampBasedAvroKeyGenerator> keyGeneratorOpt = Option.empty();
|
Option<TimestampBasedAvroKeyGenerator> keyGeneratorOpt = Option.empty();
|
||||||
if (conf.getString(FlinkOptions.KEYGEN_CLASS_NAME).equals(TimestampBasedAvroKeyGenerator.class.getName())) {
|
if (TimestampBasedAvroKeyGenerator.class.getName().equals(conf.getString(FlinkOptions.KEYGEN_CLASS_NAME))) {
|
||||||
try {
|
try {
|
||||||
keyGeneratorOpt = Option.of(new TimestampBasedAvroKeyGenerator(StreamerUtil.flinkConf2TypedProperties(conf)));
|
keyGeneratorOpt = Option.of(new TimestampBasedAvroKeyGenerator(StreamerUtil.flinkConf2TypedProperties(conf)));
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
|
|||||||
@@ -189,7 +189,7 @@ public class TestHoodieTableFactory {
|
|||||||
assertThat(conf1.get(FlinkOptions.KEYGEN_CLASS_NAME), is("dummyKeyGenClass"));
|
assertThat(conf1.get(FlinkOptions.KEYGEN_CLASS_NAME), is("dummyKeyGenClass"));
|
||||||
|
|
||||||
// definition with complex primary keys and partition paths
|
// definition with complex primary keys and partition paths
|
||||||
this.conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, FlinkOptions.KEYGEN_CLASS_NAME.defaultValue());
|
this.conf.removeConfig(FlinkOptions.KEYGEN_CLASS_NAME);
|
||||||
ResolvedSchema schema2 = SchemaBuilder.instance()
|
ResolvedSchema schema2 = SchemaBuilder.instance()
|
||||||
.field("f0", DataTypes.INT().notNull())
|
.field("f0", DataTypes.INT().notNull())
|
||||||
.field("f1", DataTypes.VARCHAR(20).notNull())
|
.field("f1", DataTypes.VARCHAR(20).notNull())
|
||||||
@@ -204,7 +204,7 @@ public class TestHoodieTableFactory {
|
|||||||
assertThat(conf2.get(FlinkOptions.KEYGEN_CLASS_NAME), is(ComplexAvroKeyGenerator.class.getName()));
|
assertThat(conf2.get(FlinkOptions.KEYGEN_CLASS_NAME), is(ComplexAvroKeyGenerator.class.getName()));
|
||||||
|
|
||||||
// definition with complex primary keys and empty partition paths
|
// definition with complex primary keys and empty partition paths
|
||||||
this.conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, FlinkOptions.KEYGEN_CLASS_NAME.defaultValue());
|
this.conf.removeConfig(FlinkOptions.KEYGEN_CLASS_NAME);
|
||||||
final MockContext sourceContext3 = MockContext.getInstance(this.conf, schema2, "");
|
final MockContext sourceContext3 = MockContext.getInstance(this.conf, schema2, "");
|
||||||
final HoodieTableSource tableSource3 = (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(sourceContext3);
|
final HoodieTableSource tableSource3 = (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(sourceContext3);
|
||||||
final Configuration conf3 = tableSource3.getConf();
|
final Configuration conf3 = tableSource3.getConf();
|
||||||
@@ -328,7 +328,7 @@ public class TestHoodieTableFactory {
|
|||||||
assertThat(conf1.get(FlinkOptions.KEYGEN_CLASS_NAME), is("dummyKeyGenClass"));
|
assertThat(conf1.get(FlinkOptions.KEYGEN_CLASS_NAME), is("dummyKeyGenClass"));
|
||||||
|
|
||||||
// definition with complex primary keys and partition paths
|
// definition with complex primary keys and partition paths
|
||||||
this.conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, FlinkOptions.KEYGEN_CLASS_NAME.defaultValue());
|
this.conf.removeConfig(FlinkOptions.KEYGEN_CLASS_NAME);
|
||||||
ResolvedSchema schema2 = SchemaBuilder.instance()
|
ResolvedSchema schema2 = SchemaBuilder.instance()
|
||||||
.field("f0", DataTypes.INT().notNull())
|
.field("f0", DataTypes.INT().notNull())
|
||||||
.field("f1", DataTypes.VARCHAR(20).notNull())
|
.field("f1", DataTypes.VARCHAR(20).notNull())
|
||||||
@@ -343,7 +343,7 @@ public class TestHoodieTableFactory {
|
|||||||
assertThat(conf2.get(FlinkOptions.KEYGEN_CLASS_NAME), is(ComplexAvroKeyGenerator.class.getName()));
|
assertThat(conf2.get(FlinkOptions.KEYGEN_CLASS_NAME), is(ComplexAvroKeyGenerator.class.getName()));
|
||||||
|
|
||||||
// definition with complex primary keys and empty partition paths
|
// definition with complex primary keys and empty partition paths
|
||||||
this.conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, FlinkOptions.KEYGEN_CLASS_NAME.defaultValue());
|
this.conf.removeConfig(FlinkOptions.KEYGEN_CLASS_NAME);
|
||||||
final MockContext sinkContext3 = MockContext.getInstance(this.conf, schema2, "");
|
final MockContext sinkContext3 = MockContext.getInstance(this.conf, schema2, "");
|
||||||
final HoodieTableSink tableSink3 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(sinkContext3);
|
final HoodieTableSink tableSink3 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(sinkContext3);
|
||||||
final Configuration conf3 = tableSink3.getConf();
|
final Configuration conf3 = tableSink3.getConf();
|
||||||
|
|||||||
Reference in New Issue
Block a user