diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index 17eee61d8..59192381d 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -377,4 +377,12 @@ public class FlinkOptions { map.forEach(configuration::setString); return configuration; } + + /** + * Returns whether the option {@code option} is either absent from the given conf or set to a value equal to its default value. + */ + public static boolean isDefaultValueDefined(Configuration conf, ConfigOption option) { + return !conf.getOptional(option).isPresent() + || conf.get(option).equals(option.defaultValue()); + } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java index 36020f48c..a2dac3639 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java @@ -19,11 +19,14 @@ package org.apache.hudi.table; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.keygen.ComplexAvroKeyGenerator; import org.apache.hudi.util.AvroSchemaConverter; import org.apache.flink.configuration.Configuration; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.api.constraints.UniqueConstraint; +import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.data.RowData; import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.factories.TableSinkFactory; @@ -33,6 +36,8 @@ import org.apache.flink.table.sources.TableSource; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.utils.TableSchemaUtils; import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import 
org.slf4j.LoggerFactory; import java.util.Collections; import java.util.HashMap; @@ -43,19 +48,19 @@ import java.util.Map; * Hoodie data source/sink factory. */ public class HoodieTableFactory implements TableSourceFactory, TableSinkFactory { + private static final Logger LOG = LoggerFactory.getLogger(HoodieTableFactory.class); + public static final String FACTORY_ID = "hudi"; @Override public TableSource createTableSource(TableSourceFactory.Context context) { Configuration conf = FlinkOptions.fromMap(context.getTable().getOptions()); - conf.setString(FlinkOptions.TABLE_NAME.key(), context.getObjectIdentifier().getObjectName()); - conf.setString(FlinkOptions.PARTITION_PATH_FIELD, String.join(",", context.getTable().getPartitionKeys())); + TableSchema schema = TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema()); + setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getTable(), schema); Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() -> new ValidationException("Option [path] should be not empty."))); - TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema()); - inferAvroSchema(conf, tableSchema.toRowDataType().notNull().getLogicalType()); return new HoodieTableSource( - tableSchema, + schema, path, context.getTable().getPartitionKeys(), conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME), @@ -65,11 +70,9 @@ public class HoodieTableFactory implements TableSourceFactory, TableSin @Override public TableSink createTableSink(TableSinkFactory.Context context) { Configuration conf = FlinkOptions.fromMap(context.getTable().getOptions()); - conf.setString(FlinkOptions.TABLE_NAME.key(), context.getObjectIdentifier().getObjectName()); - conf.setString(FlinkOptions.PARTITION_PATH_FIELD, String.join(",", context.getTable().getPartitionKeys())); - TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema()); - inferAvroSchema(conf, 
tableSchema.toRowDataType().notNull().getLogicalType()); - return new HoodieTableSink(conf, tableSchema); + TableSchema schema = TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema()); + setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getTable(), schema); + return new HoodieTableSink(conf, schema); } @Override @@ -89,6 +92,52 @@ public class HoodieTableFactory implements TableSourceFactory, TableSin // Utilities // ------------------------------------------------------------------------- + /** + * Sets up the config options based on the table definition: the table name, the hoodie key options and the inferred avro schema. + * + * @param conf The configuration to set up + * @param tableName The table name + * @param table The catalog table + * @param schema The physical schema + */ + private static void setupConfOptions( + Configuration conf, + String tableName, + CatalogTable table, + TableSchema schema) { + // table name + conf.setString(FlinkOptions.TABLE_NAME.key(), tableName); + // hoodie key related options + setupHoodieKeyOptions(conf, table); + // infer avro schema from physical DDL schema + inferAvroSchema(conf, schema.toRowDataType().notNull().getLogicalType()); + } + + /** + * Sets up the hoodie key options (record key, partition path and key generator class) from the table definition. 
+ */ + private static void setupHoodieKeyOptions(Configuration conf, CatalogTable table) { + List pkColumns = table.getSchema().getPrimaryKey() + .map(UniqueConstraint::getColumns).orElse(Collections.emptyList()); + if (pkColumns.size() > 0) { + // the PRIMARY KEY syntax always has higher priority than option FlinkOptions#RECORD_KEY_FIELD + String recordKey = String.join(",", pkColumns); + conf.setString(FlinkOptions.RECORD_KEY_FIELD, recordKey); + } + List partitions = table.getPartitionKeys(); + if (partitions.size() > 0) { + // the PARTITIONED BY syntax always has higher priority than option FlinkOptions#PARTITION_PATH_FIELD + conf.setString(FlinkOptions.PARTITION_PATH_FIELD, String.join(",", partitions)); + } + // use the complex key generator when the key gen class is unset or default and the record key or partition path has multiple fields + boolean complexHoodieKey = pkColumns.size() > 1 || partitions.size() > 1; + if (complexHoodieKey && FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.KEYGEN_CLASS)) { + conf.setString(FlinkOptions.KEYGEN_CLASS, ComplexAvroKeyGenerator.class.getName()); + LOG.info("Table option [{}] is reset to {} because record key or partition path has two or more fields", + FlinkOptions.KEYGEN_CLASS.key(), ComplexAvroKeyGenerator.class.getName()); + } + } + /** * Inferences the deserialization Avro schema from the table schema (e.g. 
the DDL) * if both options {@link FlinkOptions#READ_AVRO_SCHEMA_PATH} and @@ -97,7 +146,7 @@ public class HoodieTableFactory implements TableSourceFactory, TableSin * @param conf The configuration * @param rowType The specified table row type */ - private void inferAvroSchema(Configuration conf, LogicalType rowType) { + private static void inferAvroSchema(Configuration conf, LogicalType rowType) { if (!conf.getOptional(FlinkOptions.READ_AVRO_SCHEMA_PATH).isPresent() && !conf.getOptional(FlinkOptions.READ_AVRO_SCHEMA).isPresent()) { String inferredSchema = AvroSchemaConverter.convertToSchema(rowType).toString(); diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java index f64808e95..44c030aaf 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java @@ -19,11 +19,14 @@ package org.apache.hudi.table; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.keygen.ComplexAvroKeyGenerator; import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.utils.TestConfigurations; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.CatalogTableImpl; import org.apache.flink.table.catalog.ObjectIdentifier; @@ -36,6 +39,7 @@ import org.junit.jupiter.api.io.TempDir; import java.io.File; import java.io.IOException; import java.util.Collections; +import java.util.List; import java.util.Objects; import static org.hamcrest.CoreMatchers.is; @@ -86,6 +90,38 @@ public class TestHoodieTableFactory { assertNull(conf2.get(FlinkOptions.READ_AVRO_SCHEMA), "expect schema string as null"); } + @Test + void 
testSetupHoodieKeyOptionsForSource() { + this.conf.setString(FlinkOptions.RECORD_KEY_FIELD, "dummyField"); + this.conf.setString(FlinkOptions.KEYGEN_CLASS, "dummyKeyGenClass"); + // definition with a single-field primary key and partition path: PRIMARY KEY overrides the record key option, the user-defined key gen class is kept + TableSchema schema1 = TableSchema.builder() + .field("f0", DataTypes.INT().notNull()) + .field("f1", DataTypes.VARCHAR(20)) + .field("f2", DataTypes.TIMESTAMP(3)) + .primaryKey("f0") + .build(); + final MockSourceContext sourceContext1 = MockSourceContext.getInstance(this.conf, schema1, "f2"); + final HoodieTableSource tableSource1 = (HoodieTableSource) new HoodieTableFactory().createTableSource(sourceContext1); + final Configuration conf1 = tableSource1.getConf(); + assertThat(conf1.get(FlinkOptions.RECORD_KEY_FIELD), is("f0")); + assertThat(conf1.get(FlinkOptions.KEYGEN_CLASS), is("dummyKeyGenClass")); + + // definition with a two-field primary key and a single partition path: the default key gen class is switched to the complex one + this.conf.setString(FlinkOptions.KEYGEN_CLASS, FlinkOptions.KEYGEN_CLASS.defaultValue()); + TableSchema schema2 = TableSchema.builder() + .field("f0", DataTypes.INT().notNull()) + .field("f1", DataTypes.VARCHAR(20).notNull()) + .field("f2", DataTypes.TIMESTAMP(3)) + .primaryKey("f0", "f1") + .build(); + final MockSourceContext sourceContext2 = MockSourceContext.getInstance(this.conf, schema2, "f2"); + final HoodieTableSource tableSource2 = (HoodieTableSource) new HoodieTableFactory().createTableSource(sourceContext2); + final Configuration conf2 = tableSource2.getConf(); + assertThat(conf2.get(FlinkOptions.RECORD_KEY_FIELD), is("f0,f1")); + assertThat(conf2.get(FlinkOptions.KEYGEN_CLASS), is(ComplexAvroKeyGenerator.class.getName())); + } + @Test void testInferAvroSchemaForSink() { // infer the schema if not specified @@ -102,6 +138,38 @@ public class TestHoodieTableFactory { 
assertNull(conf2.get(FlinkOptions.READ_AVRO_SCHEMA), "expect schema string as null"); } + @Test + void testSetupHoodieKeyOptionsForSink() { + this.conf.setString(FlinkOptions.RECORD_KEY_FIELD, 
"dummyField"); + this.conf.setString(FlinkOptions.KEYGEN_CLASS, "dummyKeyGenClass"); + // definition with a single-field primary key and partition path: PRIMARY KEY overrides the record key option, the user-defined key gen class is kept + TableSchema schema1 = TableSchema.builder() + .field("f0", DataTypes.INT().notNull()) + .field("f1", DataTypes.VARCHAR(20)) + .field("f2", DataTypes.TIMESTAMP(3)) + .primaryKey("f0") + .build(); + final MockSinkContext sinkContext1 = MockSinkContext.getInstance(this.conf, schema1, "f2"); + final HoodieTableSink tableSink1 = (HoodieTableSink) new HoodieTableFactory().createTableSink(sinkContext1); + final Configuration conf1 = tableSink1.getConf(); + assertThat(conf1.get(FlinkOptions.RECORD_KEY_FIELD), is("f0")); + assertThat(conf1.get(FlinkOptions.KEYGEN_CLASS), is("dummyKeyGenClass")); + + // definition with a two-field primary key and a single partition path: the default key gen class is switched to the complex one + this.conf.setString(FlinkOptions.KEYGEN_CLASS, FlinkOptions.KEYGEN_CLASS.defaultValue()); + TableSchema schema2 = TableSchema.builder() + .field("f0", DataTypes.INT().notNull()) + .field("f1", DataTypes.VARCHAR(20).notNull()) + .field("f2", DataTypes.TIMESTAMP(3)) + .primaryKey("f0", "f1") + .build(); + final MockSinkContext sinkContext2 = MockSinkContext.getInstance(this.conf, schema2, "f2"); + final HoodieTableSink tableSink2 = (HoodieTableSink) new HoodieTableFactory().createTableSink(sinkContext2); + final Configuration conf2 = tableSink2.getConf(); + assertThat(conf2.get(FlinkOptions.RECORD_KEY_FIELD), is("f0,f1")); + assertThat(conf2.get(FlinkOptions.KEYGEN_CLASS), is(ComplexAvroKeyGenerator.class.getName())); + } + // ------------------------------------------------------------------------- // Inner Class // ------------------------------------------------------------------------- @@ -111,13 +179,25 @@ public class TestHoodieTableFactory { */ private static class MockSourceContext implements 
TableSourceFactory.Context { private final Configuration conf; + private final TableSchema schema; + private final List partitions; - private MockSourceContext(Configuration conf) 
{ + private MockSourceContext(Configuration conf, TableSchema schema, List partitions) { this.conf = conf; + this.schema = schema; + this.partitions = partitions; } static MockSourceContext getInstance(Configuration conf) { - return new MockSourceContext(conf); + return getInstance(conf, TestConfigurations.TABLE_SCHEMA, Collections.singletonList("partition")); + } + + static MockSourceContext getInstance(Configuration conf, TableSchema schema, String partition) { + return getInstance(conf, schema, Collections.singletonList(partition)); + } + + static MockSourceContext getInstance(Configuration conf, TableSchema schema, List partitions) { + return new MockSourceContext(conf, schema, partitions); } @Override @@ -127,8 +207,7 @@ public class TestHoodieTableFactory { @Override public CatalogTable getTable() { - return new CatalogTableImpl(TestConfigurations.TABLE_SCHEMA, Collections.singletonList("partition"), - conf.toMap(), "mock source table"); + return new CatalogTableImpl(schema, partitions, conf.toMap(), "mock source table"); } @Override @@ -142,13 +221,25 @@ public class TestHoodieTableFactory { */ private static class MockSinkContext implements TableSinkFactory.Context { private final Configuration conf; + private final TableSchema schema; + private final List partitions; - private MockSinkContext(Configuration conf) { + private MockSinkContext(Configuration conf, TableSchema schema, List partitions) { this.conf = conf; + this.schema = schema; + this.partitions = partitions; } static MockSinkContext getInstance(Configuration conf) { - return new MockSinkContext(conf); + return getInstance(conf, TestConfigurations.TABLE_SCHEMA, "partition"); + } + + static MockSinkContext getInstance(Configuration conf, TableSchema schema, String partition) { + return getInstance(conf, schema, Collections.singletonList(partition)); + } + + static MockSinkContext getInstance(Configuration conf, TableSchema schema, List partitions) { + return new MockSinkContext(conf, schema, 
partitions); } @Override @@ -158,8 +249,7 @@ public class TestHoodieTableFactory { @Override public CatalogTable getTable() { - return new CatalogTableImpl(TestConfigurations.TABLE_SCHEMA, Collections.singletonList("partition"), - conf.toMap(), "mock sink table"); + return new CatalogTableImpl(this.schema, this.partitions, conf.toMap(), "mock sink table"); } @Override diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java index e32b9c021..eaf597976 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java @@ -73,7 +73,8 @@ public class TestConfigurations { + " name varchar(10),\n" + " age int,\n" + " ts timestamp(3),\n" - + " `partition` varchar(20)\n" + + " `partition` varchar(20),\n" + + " PRIMARY KEY(uuid) NOT ENFORCED\n" + ")\n" + "PARTITIONED BY (`partition`)\n" + "with (\n"