[HUDI-1704] Use PRIMARY KEY syntax to define record keys for Flink Hudi table (#2694)
The SQL PRIMARY KEY semantics are essentially the same as those of the Hoodie record key, so using PRIMARY KEY is a more straightforward way to define it than the table option hoodie.datasource.write.recordkey.field. After this change, both PRIMARY KEY and the table option can define the Hoodie record key, and PRIMARY KEY takes priority if both are defined. Note: a column with a PRIMARY KEY constraint is forced to be non-nullable.
This commit is contained in:
@@ -377,4 +377,12 @@ public class FlinkOptions {
|
||||
map.forEach(configuration::setString);
|
||||
return configuration;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns whether the given conf defines default value for the option {@code option}.
|
||||
*/
|
||||
public static <T> boolean isDefaultValueDefined(Configuration conf, ConfigOption<T> option) {
|
||||
return !conf.getOptional(option).isPresent()
|
||||
|| conf.get(option).equals(option.defaultValue());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,11 +19,14 @@
|
||||
package org.apache.hudi.table;
|
||||
|
||||
import org.apache.hudi.configuration.FlinkOptions;
|
||||
import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
|
||||
import org.apache.hudi.util.AvroSchemaConverter;
|
||||
|
||||
import org.apache.flink.configuration.Configuration;
|
||||
import org.apache.flink.table.api.TableSchema;
|
||||
import org.apache.flink.table.api.ValidationException;
|
||||
import org.apache.flink.table.api.constraints.UniqueConstraint;
|
||||
import org.apache.flink.table.catalog.CatalogTable;
|
||||
import org.apache.flink.table.data.RowData;
|
||||
import org.apache.flink.table.factories.FactoryUtil;
|
||||
import org.apache.flink.table.factories.TableSinkFactory;
|
||||
@@ -33,6 +36,8 @@ import org.apache.flink.table.sources.TableSource;
|
||||
import org.apache.flink.table.types.logical.LogicalType;
|
||||
import org.apache.flink.table.utils.TableSchemaUtils;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
@@ -43,19 +48,19 @@ import java.util.Map;
|
||||
* Hoodie data source/sink factory.
|
||||
*/
|
||||
public class HoodieTableFactory implements TableSourceFactory<RowData>, TableSinkFactory<RowData> {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(HoodieTableFactory.class);
|
||||
|
||||
public static final String FACTORY_ID = "hudi";
|
||||
|
||||
@Override
|
||||
public TableSource<RowData> createTableSource(TableSourceFactory.Context context) {
|
||||
Configuration conf = FlinkOptions.fromMap(context.getTable().getOptions());
|
||||
conf.setString(FlinkOptions.TABLE_NAME.key(), context.getObjectIdentifier().getObjectName());
|
||||
conf.setString(FlinkOptions.PARTITION_PATH_FIELD, String.join(",", context.getTable().getPartitionKeys()));
|
||||
TableSchema schema = TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema());
|
||||
setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getTable(), schema);
|
||||
Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() ->
|
||||
new ValidationException("Option [path] should be not empty.")));
|
||||
TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema());
|
||||
inferAvroSchema(conf, tableSchema.toRowDataType().notNull().getLogicalType());
|
||||
return new HoodieTableSource(
|
||||
tableSchema,
|
||||
schema,
|
||||
path,
|
||||
context.getTable().getPartitionKeys(),
|
||||
conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME),
|
||||
@@ -65,11 +70,9 @@ public class HoodieTableFactory implements TableSourceFactory<RowData>, TableSin
|
||||
@Override
|
||||
public TableSink<RowData> createTableSink(TableSinkFactory.Context context) {
|
||||
Configuration conf = FlinkOptions.fromMap(context.getTable().getOptions());
|
||||
conf.setString(FlinkOptions.TABLE_NAME.key(), context.getObjectIdentifier().getObjectName());
|
||||
conf.setString(FlinkOptions.PARTITION_PATH_FIELD, String.join(",", context.getTable().getPartitionKeys()));
|
||||
TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema());
|
||||
inferAvroSchema(conf, tableSchema.toRowDataType().notNull().getLogicalType());
|
||||
return new HoodieTableSink(conf, tableSchema);
|
||||
TableSchema schema = TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema());
|
||||
setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getTable(), schema);
|
||||
return new HoodieTableSink(conf, schema);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -89,6 +92,52 @@ public class HoodieTableFactory implements TableSourceFactory<RowData>, TableSin
|
||||
// Utilities
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* Setup the config options based on the table definition, for e.g the table name, primary key.
|
||||
*
|
||||
* @param conf The configuration to setup
|
||||
* @param tableName The table name
|
||||
* @param table The catalog table
|
||||
* @param schema The physical schema
|
||||
*/
|
||||
private static void setupConfOptions(
|
||||
Configuration conf,
|
||||
String tableName,
|
||||
CatalogTable table,
|
||||
TableSchema schema) {
|
||||
// table name
|
||||
conf.setString(FlinkOptions.TABLE_NAME.key(), tableName);
|
||||
// hoodie key about options
|
||||
setupHoodieKeyOptions(conf, table);
|
||||
// infer avro schema from physical DDL schema
|
||||
inferAvroSchema(conf, schema.toRowDataType().notNull().getLogicalType());
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets up the hoodie key options (e.g. record key and partition key) from the table definition.
|
||||
*/
|
||||
private static void setupHoodieKeyOptions(Configuration conf, CatalogTable table) {
|
||||
List<String> pkColumns = table.getSchema().getPrimaryKey()
|
||||
.map(UniqueConstraint::getColumns).orElse(Collections.emptyList());
|
||||
if (pkColumns.size() > 0) {
|
||||
// the PRIMARY KEY syntax always has higher priority than option FlinkOptions#RECORD_KEY_FIELD
|
||||
String recordKey = String.join(",", pkColumns);
|
||||
conf.setString(FlinkOptions.RECORD_KEY_FIELD, recordKey);
|
||||
}
|
||||
List<String> partitions = table.getPartitionKeys();
|
||||
if (partitions.size() > 0) {
|
||||
// the PARTITIONED BY syntax always has higher priority than option FlinkOptions#PARTITION_PATH_FIELD
|
||||
conf.setString(FlinkOptions.PARTITION_PATH_FIELD, String.join(",", partitions));
|
||||
}
|
||||
// tweak the key gen class if possible
|
||||
boolean complexHoodieKey = pkColumns.size() > 1 || partitions.size() > 1;
|
||||
if (complexHoodieKey && FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.KEYGEN_CLASS)) {
|
||||
conf.setString(FlinkOptions.KEYGEN_CLASS, ComplexAvroKeyGenerator.class.getName());
|
||||
LOG.info("Table option [{}] is reset to {} because record key or partition path has two or more fields",
|
||||
FlinkOptions.KEYGEN_CLASS.key(), ComplexAvroKeyGenerator.class.getName());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Infers the deserialization Avro schema from the table schema (e.g. the DDL)
|
||||
* if both options {@link FlinkOptions#READ_AVRO_SCHEMA_PATH} and
|
||||
@@ -97,7 +146,7 @@ public class HoodieTableFactory implements TableSourceFactory<RowData>, TableSin
|
||||
* @param conf The configuration
|
||||
* @param rowType The specified table row type
|
||||
*/
|
||||
private void inferAvroSchema(Configuration conf, LogicalType rowType) {
|
||||
private static void inferAvroSchema(Configuration conf, LogicalType rowType) {
|
||||
if (!conf.getOptional(FlinkOptions.READ_AVRO_SCHEMA_PATH).isPresent()
|
||||
&& !conf.getOptional(FlinkOptions.READ_AVRO_SCHEMA).isPresent()) {
|
||||
String inferredSchema = AvroSchemaConverter.convertToSchema(rowType).toString();
|
||||
|
||||
@@ -19,11 +19,14 @@
|
||||
package org.apache.hudi.table;
|
||||
|
||||
import org.apache.hudi.configuration.FlinkOptions;
|
||||
import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
|
||||
import org.apache.hudi.util.StreamerUtil;
|
||||
import org.apache.hudi.utils.TestConfigurations;
|
||||
|
||||
import org.apache.flink.configuration.Configuration;
|
||||
import org.apache.flink.configuration.ReadableConfig;
|
||||
import org.apache.flink.table.api.DataTypes;
|
||||
import org.apache.flink.table.api.TableSchema;
|
||||
import org.apache.flink.table.catalog.CatalogTable;
|
||||
import org.apache.flink.table.catalog.CatalogTableImpl;
|
||||
import org.apache.flink.table.catalog.ObjectIdentifier;
|
||||
@@ -36,6 +39,7 @@ import org.junit.jupiter.api.io.TempDir;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.is;
|
||||
@@ -86,6 +90,38 @@ public class TestHoodieTableFactory {
|
||||
assertNull(conf2.get(FlinkOptions.READ_AVRO_SCHEMA), "expect schema string as null");
|
||||
}
|
||||
|
||||
@Test
|
||||
void testSetupHoodieKeyOptionsForSource() {
|
||||
this.conf.setString(FlinkOptions.RECORD_KEY_FIELD, "dummyField");
|
||||
this.conf.setString(FlinkOptions.KEYGEN_CLASS, "dummyKeyGenClass");
|
||||
// definition with simple primary key and partition path
|
||||
TableSchema schema1 = TableSchema.builder()
|
||||
.field("f0", DataTypes.INT().notNull())
|
||||
.field("f1", DataTypes.VARCHAR(20))
|
||||
.field("f2", DataTypes.TIMESTAMP(3))
|
||||
.primaryKey("f0")
|
||||
.build();
|
||||
final MockSourceContext sourceContext1 = MockSourceContext.getInstance(this.conf, schema1, "f2");
|
||||
final HoodieTableSource tableSource1 = (HoodieTableSource) new HoodieTableFactory().createTableSource(sourceContext1);
|
||||
final Configuration conf1 = tableSource1.getConf();
|
||||
assertThat(conf1.get(FlinkOptions.RECORD_KEY_FIELD), is("f0"));
|
||||
assertThat(conf1.get(FlinkOptions.KEYGEN_CLASS), is("dummyKeyGenClass"));
|
||||
|
||||
// definition with complex primary keys and partition paths
|
||||
this.conf.setString(FlinkOptions.KEYGEN_CLASS, FlinkOptions.KEYGEN_CLASS.defaultValue());
|
||||
TableSchema schema2 = TableSchema.builder()
|
||||
.field("f0", DataTypes.INT().notNull())
|
||||
.field("f1", DataTypes.VARCHAR(20).notNull())
|
||||
.field("f2", DataTypes.TIMESTAMP(3))
|
||||
.primaryKey("f0", "f1")
|
||||
.build();
|
||||
final MockSourceContext sourceContext2 = MockSourceContext.getInstance(this.conf, schema2, "f2");
|
||||
final HoodieTableSource tableSource2 = (HoodieTableSource) new HoodieTableFactory().createTableSource(sourceContext2);
|
||||
final Configuration conf2 = tableSource2.getConf();
|
||||
assertThat(conf2.get(FlinkOptions.RECORD_KEY_FIELD), is("f0,f1"));
|
||||
assertThat(conf2.get(FlinkOptions.KEYGEN_CLASS), is(ComplexAvroKeyGenerator.class.getName()));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testInferAvroSchemaForSink() {
|
||||
// infer the schema if not specified
|
||||
@@ -102,6 +138,38 @@ public class TestHoodieTableFactory {
|
||||
assertNull(conf2.get(FlinkOptions.READ_AVRO_SCHEMA), "expect schema string as null");
|
||||
}
|
||||
|
||||
@Test
|
||||
void testSetupHoodieKeyOptionsForSink() {
|
||||
this.conf.setString(FlinkOptions.RECORD_KEY_FIELD, "dummyField");
|
||||
this.conf.setString(FlinkOptions.KEYGEN_CLASS, "dummyKeyGenClass");
|
||||
// definition with simple primary key and partition path
|
||||
TableSchema schema1 = TableSchema.builder()
|
||||
.field("f0", DataTypes.INT().notNull())
|
||||
.field("f1", DataTypes.VARCHAR(20))
|
||||
.field("f2", DataTypes.TIMESTAMP(3))
|
||||
.primaryKey("f0")
|
||||
.build();
|
||||
final MockSinkContext sinkContext1 = MockSinkContext.getInstance(this.conf, schema1, "f2");
|
||||
final HoodieTableSink tableSink1 = (HoodieTableSink) new HoodieTableFactory().createTableSink(sinkContext1);
|
||||
final Configuration conf1 = tableSink1.getConf();
|
||||
assertThat(conf1.get(FlinkOptions.RECORD_KEY_FIELD), is("f0"));
|
||||
assertThat(conf1.get(FlinkOptions.KEYGEN_CLASS), is("dummyKeyGenClass"));
|
||||
|
||||
// definition with complex primary keys and partition paths
|
||||
this.conf.setString(FlinkOptions.KEYGEN_CLASS, FlinkOptions.KEYGEN_CLASS.defaultValue());
|
||||
TableSchema schema2 = TableSchema.builder()
|
||||
.field("f0", DataTypes.INT().notNull())
|
||||
.field("f1", DataTypes.VARCHAR(20).notNull())
|
||||
.field("f2", DataTypes.TIMESTAMP(3))
|
||||
.primaryKey("f0", "f1")
|
||||
.build();
|
||||
final MockSinkContext sinkContext2 = MockSinkContext.getInstance(this.conf, schema2, "f2");
|
||||
final HoodieTableSink tableSink2 = (HoodieTableSink) new HoodieTableFactory().createTableSink(sinkContext2);
|
||||
final Configuration conf2 = tableSink2.getConf();
|
||||
assertThat(conf2.get(FlinkOptions.RECORD_KEY_FIELD), is("f0,f1"));
|
||||
assertThat(conf2.get(FlinkOptions.KEYGEN_CLASS), is(ComplexAvroKeyGenerator.class.getName()));
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Inner Class
|
||||
// -------------------------------------------------------------------------
|
||||
@@ -111,13 +179,25 @@ public class TestHoodieTableFactory {
|
||||
*/
|
||||
private static class MockSourceContext implements TableSourceFactory.Context {
|
||||
private final Configuration conf;
|
||||
private final TableSchema schema;
|
||||
private final List<String> partitions;
|
||||
|
||||
private MockSourceContext(Configuration conf) {
|
||||
private MockSourceContext(Configuration conf, TableSchema schema, List<String> partitions) {
|
||||
this.conf = conf;
|
||||
this.schema = schema;
|
||||
this.partitions = partitions;
|
||||
}
|
||||
|
||||
static MockSourceContext getInstance(Configuration conf) {
|
||||
return new MockSourceContext(conf);
|
||||
return getInstance(conf, TestConfigurations.TABLE_SCHEMA, Collections.singletonList("partition"));
|
||||
}
|
||||
|
||||
static MockSourceContext getInstance(Configuration conf, TableSchema schema, String partition) {
|
||||
return getInstance(conf, schema, Collections.singletonList(partition));
|
||||
}
|
||||
|
||||
static MockSourceContext getInstance(Configuration conf, TableSchema schema, List<String> partitions) {
|
||||
return new MockSourceContext(conf, schema, partitions);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -127,8 +207,7 @@ public class TestHoodieTableFactory {
|
||||
|
||||
@Override
|
||||
public CatalogTable getTable() {
|
||||
return new CatalogTableImpl(TestConfigurations.TABLE_SCHEMA, Collections.singletonList("partition"),
|
||||
conf.toMap(), "mock source table");
|
||||
return new CatalogTableImpl(schema, partitions, conf.toMap(), "mock source table");
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -142,13 +221,25 @@ public class TestHoodieTableFactory {
|
||||
*/
|
||||
private static class MockSinkContext implements TableSinkFactory.Context {
|
||||
private final Configuration conf;
|
||||
private final TableSchema schema;
|
||||
private final List<String> partitions;
|
||||
|
||||
private MockSinkContext(Configuration conf) {
|
||||
private MockSinkContext(Configuration conf, TableSchema schema, List<String> partitions) {
|
||||
this.conf = conf;
|
||||
this.schema = schema;
|
||||
this.partitions = partitions;
|
||||
}
|
||||
|
||||
static MockSinkContext getInstance(Configuration conf) {
|
||||
return new MockSinkContext(conf);
|
||||
return getInstance(conf, TestConfigurations.TABLE_SCHEMA, "partition");
|
||||
}
|
||||
|
||||
static MockSinkContext getInstance(Configuration conf, TableSchema schema, String partition) {
|
||||
return getInstance(conf, schema, Collections.singletonList(partition));
|
||||
}
|
||||
|
||||
static MockSinkContext getInstance(Configuration conf, TableSchema schema, List<String> partitions) {
|
||||
return new MockSinkContext(conf, schema, partitions);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -158,8 +249,7 @@ public class TestHoodieTableFactory {
|
||||
|
||||
@Override
|
||||
public CatalogTable getTable() {
|
||||
return new CatalogTableImpl(TestConfigurations.TABLE_SCHEMA, Collections.singletonList("partition"),
|
||||
conf.toMap(), "mock sink table");
|
||||
return new CatalogTableImpl(this.schema, this.partitions, conf.toMap(), "mock sink table");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -73,7 +73,8 @@ public class TestConfigurations {
|
||||
+ " name varchar(10),\n"
|
||||
+ " age int,\n"
|
||||
+ " ts timestamp(3),\n"
|
||||
+ " `partition` varchar(20)\n"
|
||||
+ " `partition` varchar(20),\n"
|
||||
+ " PRIMARY KEY(uuid) NOT ENFORCED\n"
|
||||
+ ")\n"
|
||||
+ "PARTITIONED BY (`partition`)\n"
|
||||
+ "with (\n"
|
||||
|
||||
Reference in New Issue
Block a user