[HUDI-1818] Validate required fields for Flink HoodieTable (#2930)
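
This change makes HoodieTableFactory fail fast at table creation time: when the Flink table declares no primary key, every field configured as the record key must exist in the physical schema, and the configured pre-combine (ordering) field must exist as well; otherwise a ValidationException is thrown. The standalone sketch below mirrors that check with plain JDK types so it can be read outside of Flink/Hudi; the class name, field names, and option values are hypothetical, and IllegalArgumentException stands in for Flink's ValidationException — it is an illustration, not part of the patch.

// RequiredFieldCheckSketch.java -- illustration only, not part of the patch.
import java.util.Arrays;
import java.util.List;

public class RequiredFieldCheckSketch {

  // Mirrors the shape of the patch's validateRequiredFields logic.
  static void validate(List<String> schemaFields, boolean hasPrimaryKey,
                       String recordKeyOption, String preCombineOption) {
    if (!hasPrimaryKey) {
      // Every configured record key field must be a column of the schema.
      Arrays.stream(recordKeyOption.split(","))
          .filter(field -> !schemaFields.contains(field))
          .findAny()
          .ifPresent(field -> {
            throw new IllegalArgumentException("Field '" + field + "' does not exist in the table schema");
          });
    }
    // The pre-combine (ordering) field must also be a column of the schema.
    if (!schemaFields.contains(preCombineOption)) {
      throw new IllegalArgumentException("Field '" + preCombineOption + "' does not exist in the table schema");
    }
  }

  public static void main(String[] args) {
    List<String> fields = Arrays.asList("f0", "f1", "f2", "ts");
    validate(fields, true, "uuid", "ts");   // passes: primary key declared and "ts" exists
    validate(fields, false, "uuid", "ts");  // throws: no primary key and "uuid" is not a column
  }
}
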
@@ -40,9 +40,11 @@ import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * Hoodie data source/sink factory.
@@ -59,6 +61,7 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
 
     Configuration conf = (Configuration) helper.getOptions();
     TableSchema schema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+    validateRequiredFields(conf, schema);
     setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getCatalogTable(), schema);
 
     Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() ->
@@ -75,6 +78,7 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
   public DynamicTableSink createDynamicTableSink(Context context) {
     Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions());
     TableSchema schema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+    validateRequiredFields(conf, schema);
     setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getCatalogTable(), schema);
     return new HoodieTableSink(conf, schema);
   }
@@ -98,6 +102,33 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
   // Utilities
   // -------------------------------------------------------------------------
 
+  /** Validates the required options, e.g. the record key and pre-combine key.
+   *
+   * @param conf The table options
+   * @param schema The table schema
+   */
+  private void validateRequiredFields(Configuration conf, TableSchema schema) {
+    List<String> fields = Arrays.stream(schema.getFieldNames()).collect(Collectors.toList());
+
+    // validate the record key fields when no primary key is defined
+    if (!schema.getPrimaryKey().isPresent()) {
+      Arrays.stream(conf.get(FlinkOptions.RECORD_KEY_FIELD).split(","))
+          .filter(field -> !fields.contains(field))
+          .findAny()
+          .ifPresent(e -> {
+            throw new ValidationException("The " + e + " field does not exist in the table schema. "
+                + "Please define a primary key or modify the hoodie.datasource.write.recordkey.field option.");
+          });
+    }
+
+    // validate the pre-combine key
+    String preCombineField = conf.get(FlinkOptions.PRECOMBINE_FIELD);
+    if (!fields.contains(preCombineField)) {
+      throw new ValidationException("The " + preCombineField + " field does not exist in the table schema. "
+          + "Please check the write.precombine.field option.");
+    }
+  }
+
   /**
    * Setup the config options based on the table definition, for e.g the table name, primary key.
    *
@@ -28,6 +28,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -44,7 +45,9 @@ import java.util.Objects;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 /**
  * Test cases for {@link HoodieTableFactory}.
@@ -74,6 +77,43 @@ public class TestHoodieTableFactory {
     StreamerUtil.initTableIfNotExists(this.conf);
   }
 
+  @Test
+  void testRequiredOptionsForSource() {
+    // a missing primary key and pre-combine key should throw an exception
+    TableSchema schema1 = TableSchema.builder()
+        .field("f0", DataTypes.INT().notNull())
+        .field("f1", DataTypes.VARCHAR(20))
+        .field("f2", DataTypes.TIMESTAMP(3))
+        .build();
+    final MockContext sourceContext1 = MockContext.getInstance(this.conf, schema1, "f2");
+    assertThrows(ValidationException.class, () -> new HoodieTableFactory().createDynamicTableSource(sourceContext1));
+    assertThrows(ValidationException.class, () -> new HoodieTableFactory().createDynamicTableSink(sourceContext1));
+
+    // a primary key without the pre-combine key should still throw an exception
+    TableSchema schema2 = TableSchema.builder()
+        .field("f0", DataTypes.INT().notNull())
+        .field("f1", DataTypes.VARCHAR(20))
+        .field("f2", DataTypes.TIMESTAMP(3))
+        .primaryKey("f0")
+        .build();
+    final MockContext sourceContext2 = MockContext.getInstance(this.conf, schema2, "f2");
+    assertThrows(ValidationException.class, () -> new HoodieTableFactory().createDynamicTableSource(sourceContext2));
+    assertThrows(ValidationException.class, () -> new HoodieTableFactory().createDynamicTableSink(sourceContext2));
+
+    // both the primary key and the pre-combine key are given, so validation passes
+    TableSchema schema3 = TableSchema.builder()
+        .field("f0", DataTypes.INT().notNull())
+        .field("f1", DataTypes.VARCHAR(20))
+        .field("f2", DataTypes.TIMESTAMP(3))
+        .field("ts", DataTypes.TIMESTAMP(3))
+        .primaryKey("f0")
+        .build();
+    final MockContext sourceContext3 = MockContext.getInstance(this.conf, schema3, "f2");
+
+    assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSource(sourceContext3));
+    assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSink(sourceContext3));
+  }
+
   @Test
   void testInferAvroSchemaForSource() {
     // infer the schema if not specified
@@ -99,6 +139,7 @@ public class TestHoodieTableFactory {
         .field("f0", DataTypes.INT().notNull())
         .field("f1", DataTypes.VARCHAR(20))
         .field("f2", DataTypes.TIMESTAMP(3))
+        .field("ts", DataTypes.TIMESTAMP(3))
         .primaryKey("f0")
         .build();
     final MockContext sourceContext1 = MockContext.getInstance(this.conf, schema1, "f2");
@@ -113,6 +154,7 @@ public class TestHoodieTableFactory {
         .field("f0", DataTypes.INT().notNull())
         .field("f1", DataTypes.VARCHAR(20).notNull())
         .field("f2", DataTypes.TIMESTAMP(3))
+        .field("ts", DataTypes.TIMESTAMP(3))
         .primaryKey("f0", "f1")
         .build();
     final MockContext sourceContext2 = MockContext.getInstance(this.conf, schema2, "f2");
@@ -137,6 +179,7 @@ public class TestHoodieTableFactory {
         .field("f0", DataTypes.INT().notNull())
         .field("f1", DataTypes.VARCHAR(20))
         .field("f2", DataTypes.TIMESTAMP(3))
+        .field("ts", DataTypes.TIMESTAMP(3))
         .primaryKey("f0")
         .build();
     // set up new retains commits that is less than min archive commits
@@ -183,6 +226,7 @@ public class TestHoodieTableFactory {
         .field("f0", DataTypes.INT().notNull())
         .field("f1", DataTypes.VARCHAR(20))
         .field("f2", DataTypes.TIMESTAMP(3))
+        .field("ts", DataTypes.TIMESTAMP(3))
         .primaryKey("f0")
         .build();
     final MockContext sinkContext1 = MockContext.getInstance(this.conf, schema1, "f2");
@@ -197,6 +241,7 @@ public class TestHoodieTableFactory {
         .field("f0", DataTypes.INT().notNull())
         .field("f1", DataTypes.VARCHAR(20).notNull())
         .field("f2", DataTypes.TIMESTAMP(3))
+        .field("ts", DataTypes.TIMESTAMP(3))
         .primaryKey("f0", "f1")
         .build();
     final MockContext sinkContext2 = MockContext.getInstance(this.conf, schema2, "f2");
@@ -221,6 +266,7 @@ public class TestHoodieTableFactory {
         .field("f0", DataTypes.INT().notNull())
         .field("f1", DataTypes.VARCHAR(20))
         .field("f2", DataTypes.TIMESTAMP(3))
+        .field("ts", DataTypes.TIMESTAMP(3))
         .primaryKey("f0")
         .build();
     // set up new retains commits that is less than min archive commits