[HUDI-2633] Make precombine field optional for flink (#3874)
This commit is contained in:
@@ -241,6 +241,7 @@ public class FlinkOptions extends HoodieConfig {
|
||||
.defaultValue("upsert")
|
||||
.withDescription("The write operation, that this write should do");
|
||||
|
||||
/**
 * Sentinel value of the precombine field option meaning that no precombine field is used.
 * {@code OptionsResolver#getPreCombineField} returns {@code null} when the option holds this value.
 */
public static final String NO_PRE_COMBINE = "no_precombine";
|
||||
public static final ConfigOption<String> PRECOMBINE_FIELD = ConfigOptions
|
||||
.key("write.precombine.field")
|
||||
.stringType()
|
||||
|
||||
@@ -18,6 +18,7 @@
|
||||
|
||||
package org.apache.hudi.configuration;
|
||||
|
||||
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
|
||||
import org.apache.hudi.common.model.WriteOperationType;
|
||||
|
||||
import org.apache.flink.configuration.Configuration;
|
||||
@@ -67,4 +68,20 @@ public class OptionsResolver {
|
||||
.toUpperCase(Locale.ROOT)
|
||||
.equals(FlinkOptions.TABLE_TYPE_COPY_ON_WRITE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns whether the payload clazz is {@link DefaultHoodieRecordPayload}.
|
||||
*/
|
||||
public static boolean isDefaultHoodieRecordPayloadClazz(Configuration conf) {
|
||||
return conf.getString(FlinkOptions.PAYLOAD_CLASS_NAME).contains(DefaultHoodieRecordPayload.class.getSimpleName());
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the preCombine field
|
||||
* or null if the value is set as {@link FlinkOptions#NO_PRE_COMBINE}.
|
||||
*/
|
||||
public static String getPreCombineField(Configuration conf) {
|
||||
final String preCombineField = conf.getString(FlinkOptions.PRECOMBINE_FIELD);
|
||||
return preCombineField.equals(FlinkOptions.NO_PRE_COMBINE) ? null : preCombineField;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -238,8 +238,10 @@ public class StreamWriteOperatorCoordinator
|
||||
public void notifyCheckpointAborted(long checkpointId) {
|
||||
// once the checkpoint was aborted, unblock the writer tasks to
|
||||
// reuse the last instant.
|
||||
executor.execute(() -> sendCommitAckEvents(checkpointId),
|
||||
"unblock data write with aborted checkpoint %s", checkpointId);
|
||||
if (!WriteMetadataEvent.BOOTSTRAP_INSTANT.equals(this.instant)) {
|
||||
executor.execute(() -> sendCommitAckEvents(checkpointId),
|
||||
"unblock data write with aborted checkpoint %s", checkpointId);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -26,6 +26,7 @@ import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.ReflectionUtils;
|
||||
import org.apache.hudi.common.util.ValidationUtils;
|
||||
import org.apache.hudi.configuration.FlinkOptions;
|
||||
import org.apache.hudi.configuration.OptionsResolver;
|
||||
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.flink.configuration.Configuration;
|
||||
@@ -55,13 +56,14 @@ public class PayloadCreation implements Serializable {
|
||||
}
|
||||
|
||||
public static PayloadCreation instance(Configuration conf) throws Exception {
|
||||
boolean shouldCombine = conf.getBoolean(FlinkOptions.PRE_COMBINE)
|
||||
String preCombineField = OptionsResolver.getPreCombineField(conf);
|
||||
boolean needCombine = conf.getBoolean(FlinkOptions.PRE_COMBINE)
|
||||
|| WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)) == WriteOperationType.UPSERT;
|
||||
String preCombineField = null;
|
||||
boolean shouldCombine = needCombine && preCombineField != null;
|
||||
|
||||
final Class<?>[] argTypes;
|
||||
final Constructor<?> constructor;
|
||||
if (shouldCombine) {
|
||||
preCombineField = conf.getString(FlinkOptions.PRECOMBINE_FIELD);
|
||||
argTypes = new Class<?>[] {GenericRecord.class, Comparable.class};
|
||||
} else {
|
||||
argTypes = new Class<?>[] {Option.class};
|
||||
|
||||
@@ -18,6 +18,7 @@
|
||||
|
||||
package org.apache.hudi.table;
|
||||
|
||||
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
|
||||
import org.apache.hudi.configuration.FlinkOptions;
|
||||
import org.apache.hudi.configuration.OptionsResolver;
|
||||
import org.apache.hudi.exception.HoodieValidationException;
|
||||
@@ -117,20 +118,36 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
|
||||
|
||||
// validate record key in pk absence.
|
||||
if (!schema.getPrimaryKey().isPresent()) {
|
||||
Arrays.stream(conf.get(FlinkOptions.RECORD_KEY_FIELD).split(","))
|
||||
String[] recordKeys = conf.get(FlinkOptions.RECORD_KEY_FIELD).split(",");
|
||||
if (recordKeys.length == 1
|
||||
&& FlinkOptions.RECORD_KEY_FIELD.defaultValue().equals(recordKeys[0])
|
||||
&& !fields.contains(recordKeys[0])) {
|
||||
throw new HoodieValidationException("Primary key definition is required, use either PRIMARY KEY syntax "
|
||||
+ "or option '" + FlinkOptions.RECORD_KEY_FIELD.key() + "' to specify.");
|
||||
}
|
||||
|
||||
Arrays.stream(recordKeys)
|
||||
.filter(field -> !fields.contains(field))
|
||||
.findAny()
|
||||
.ifPresent(f -> {
|
||||
throw new ValidationException("Field '" + f + "' does not exist in the table schema."
|
||||
+ "Please define primary key or modify 'hoodie.datasource.write.recordkey.field' option.");
|
||||
throw new HoodieValidationException("Field '" + f + "' specified in option "
|
||||
+ "'" + FlinkOptions.RECORD_KEY_FIELD.key() + "' does not exist in the table schema.");
|
||||
});
|
||||
}
|
||||
|
||||
// validate pre_combine key
|
||||
String preCombineField = conf.get(FlinkOptions.PRECOMBINE_FIELD);
|
||||
if (!fields.contains(preCombineField)) {
|
||||
throw new ValidationException("Field " + preCombineField + " does not exist in the table schema."
|
||||
+ "Please check 'write.precombine.field' option.");
|
||||
if (OptionsResolver.isDefaultHoodieRecordPayloadClazz(conf)) {
|
||||
throw new HoodieValidationException("Option '" + FlinkOptions.PRECOMBINE_FIELD.key()
|
||||
+ "' is required for payload class: " + DefaultHoodieRecordPayload.class.getName());
|
||||
}
|
||||
if (preCombineField.equals(FlinkOptions.PRECOMBINE_FIELD.defaultValue())) {
|
||||
conf.setString(FlinkOptions.PRECOMBINE_FIELD, FlinkOptions.NO_PRE_COMBINE);
|
||||
} else {
|
||||
throw new HoodieValidationException("Field " + preCombineField + " does not exist in the table schema."
|
||||
+ "Please check '" + FlinkOptions.PRECOMBINE_FIELD.key() + "' option.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -394,6 +394,29 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
|
||||
+ "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("tableTypeAndPartitioningParams")
|
||||
void testWriteAndReadWithProctimeSequence(HoodieTableType tableType, boolean hiveStylePartitioning) {
|
||||
TableEnvironment tableEnv = batchTableEnv;
|
||||
String hoodieTableDDL = sql("t1")
|
||||
.field("uuid varchar(20)")
|
||||
.field("name varchar(10)")
|
||||
.field("age int")
|
||||
.field("tss timestamp(3)") // use a different field with default precombine field 'ts'
|
||||
.field("`partition` varchar(10)")
|
||||
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
|
||||
.option(FlinkOptions.TABLE_TYPE, tableType)
|
||||
.option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning)
|
||||
.end();
|
||||
tableEnv.executeSql(hoodieTableDDL);
|
||||
|
||||
execInsertSql(tableEnv, TestSQL.INSERT_SAME_KEY_T1);
|
||||
|
||||
List<Row> result1 = CollectionUtil.iterableToList(
|
||||
() -> tableEnv.sqlQuery("select * from t1").execute().collect());
|
||||
assertRowsEquals(result1, "[+I[id1, Danny, 23, 1970-01-01T00:00:01, par1]]");
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@EnumSource(value = HoodieTableType.class)
|
||||
void testBatchModeUpsertWithoutPartition(HoodieTableType tableType) {
|
||||
|
||||
@@ -18,7 +18,9 @@
|
||||
|
||||
package org.apache.hudi.table;
|
||||
|
||||
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
|
||||
import org.apache.hudi.configuration.FlinkOptions;
|
||||
import org.apache.hudi.exception.HoodieValidationException;
|
||||
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
|
||||
import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
|
||||
import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
|
||||
@@ -32,7 +34,6 @@ import org.apache.flink.configuration.Configuration;
|
||||
import org.apache.flink.configuration.ReadableConfig;
|
||||
import org.apache.flink.table.api.DataTypes;
|
||||
import org.apache.flink.table.api.Schema;
|
||||
import org.apache.flink.table.api.ValidationException;
|
||||
import org.apache.flink.table.catalog.CatalogTable;
|
||||
import org.apache.flink.table.catalog.ObjectIdentifier;
|
||||
import org.apache.flink.table.catalog.ResolvedCatalogTable;
|
||||
@@ -84,39 +85,62 @@ public class TestHoodieTableFactory {
|
||||
|
||||
@Test
|
||||
void testRequiredOptionsForSource() {
|
||||
// miss pk and pre combine key will throw exception
|
||||
// miss pk and precombine key will throw exception
|
||||
ResolvedSchema schema1 = SchemaBuilder.instance()
|
||||
.field("f0", DataTypes.INT().notNull())
|
||||
.field("f1", DataTypes.VARCHAR(20))
|
||||
.field("f2", DataTypes.TIMESTAMP(3))
|
||||
.build();
|
||||
final MockContext sourceContext1 = MockContext.getInstance(this.conf, schema1, "f2");
|
||||
assertThrows(ValidationException.class, () -> new HoodieTableFactory().createDynamicTableSource(sourceContext1));
|
||||
assertThrows(ValidationException.class, () -> new HoodieTableFactory().createDynamicTableSink(sourceContext1));
|
||||
assertThrows(HoodieValidationException.class, () -> new HoodieTableFactory().createDynamicTableSource(sourceContext1));
|
||||
assertThrows(HoodieValidationException.class, () -> new HoodieTableFactory().createDynamicTableSink(sourceContext1));
|
||||
|
||||
// given the pk and miss the pre combine key will throw exception
|
||||
// a non-exists precombine key will throw exception
|
||||
ResolvedSchema schema2 = SchemaBuilder.instance()
|
||||
.field("f0", DataTypes.INT().notNull())
|
||||
.field("f1", DataTypes.VARCHAR(20))
|
||||
.field("f2", DataTypes.TIMESTAMP(3))
|
||||
.build();
|
||||
this.conf.setString(FlinkOptions.PRECOMBINE_FIELD, "non_exist_field");
|
||||
final MockContext sourceContext2 = MockContext.getInstance(this.conf, schema2, "f2");
|
||||
assertThrows(HoodieValidationException.class, () -> new HoodieTableFactory().createDynamicTableSource(sourceContext2));
|
||||
assertThrows(HoodieValidationException.class, () -> new HoodieTableFactory().createDynamicTableSink(sourceContext2));
|
||||
this.conf.setString(FlinkOptions.PRECOMBINE_FIELD, FlinkOptions.PRECOMBINE_FIELD.defaultValue());
|
||||
|
||||
// given the pk but miss the pre combine key will be ok
|
||||
ResolvedSchema schema3 = SchemaBuilder.instance()
|
||||
.field("f0", DataTypes.INT().notNull())
|
||||
.field("f1", DataTypes.VARCHAR(20))
|
||||
.field("f2", DataTypes.TIMESTAMP(3))
|
||||
.primaryKey("f0")
|
||||
.build();
|
||||
final MockContext sourceContext2 = MockContext.getInstance(this.conf, schema2, "f2");
|
||||
assertThrows(ValidationException.class, () -> new HoodieTableFactory().createDynamicTableSource(sourceContext2));
|
||||
assertThrows(ValidationException.class, () -> new HoodieTableFactory().createDynamicTableSink(sourceContext2));
|
||||
final MockContext sourceContext3 = MockContext.getInstance(this.conf, schema3, "f2");
|
||||
HoodieTableSource tableSource = (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(sourceContext3);
|
||||
HoodieTableSink tableSink = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(sourceContext3);
|
||||
// the precombine field is overwritten
|
||||
assertThat(tableSource.getConf().getString(FlinkOptions.PRECOMBINE_FIELD), is(FlinkOptions.NO_PRE_COMBINE));
|
||||
assertThat(tableSink.getConf().getString(FlinkOptions.PRECOMBINE_FIELD), is(FlinkOptions.NO_PRE_COMBINE));
|
||||
|
||||
// given pk but miss the pre combine key with DefaultHoodieRecordPayload should throw
|
||||
this.conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, DefaultHoodieRecordPayload.class.getName());
|
||||
final MockContext sourceContext4 = MockContext.getInstance(this.conf, schema3, "f2");
|
||||
|
||||
assertThrows(HoodieValidationException.class, () -> new HoodieTableFactory().createDynamicTableSource(sourceContext4));
|
||||
assertThrows(HoodieValidationException.class, () -> new HoodieTableFactory().createDynamicTableSink(sourceContext4));
|
||||
this.conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, FlinkOptions.PAYLOAD_CLASS_NAME.defaultValue());
|
||||
|
||||
// given pk and pre combine key will be ok
|
||||
ResolvedSchema schema3 = SchemaBuilder.instance()
|
||||
ResolvedSchema schema4 = SchemaBuilder.instance()
|
||||
.field("f0", DataTypes.INT().notNull())
|
||||
.field("f1", DataTypes.VARCHAR(20))
|
||||
.field("f2", DataTypes.TIMESTAMP(3))
|
||||
.field("ts", DataTypes.TIMESTAMP(3))
|
||||
.primaryKey("f0")
|
||||
.build();
|
||||
final MockContext sourceContext3 = MockContext.getInstance(this.conf, schema3, "f2");
|
||||
final MockContext sourceContext5 = MockContext.getInstance(this.conf, schema4, "f2");
|
||||
|
||||
assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSource(sourceContext3));
|
||||
assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSink(sourceContext3));
|
||||
assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSource(sourceContext5));
|
||||
assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSink(sourceContext5));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
@@ -35,6 +35,13 @@ public class TestSQL {
|
||||
+ "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n"
|
||||
+ "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')";
|
||||
|
||||
public static final String INSERT_SAME_KEY_T1 = "insert into t1 values\n"
|
||||
+ "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:05','par1'),\n"
|
||||
+ "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:04','par1'),\n"
|
||||
+ "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:03','par1'),\n"
|
||||
+ "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:02','par1'),\n"
|
||||
+ "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1')";
|
||||
|
||||
public static final String UPDATE_INSERT_T1 = "insert into t1 values\n"
|
||||
+ "('id1','Danny',24,TIMESTAMP '1970-01-01 00:00:01','par1'),\n"
|
||||
+ "('id2','Stephen',34,TIMESTAMP '1970-01-01 00:00:02','par1'),\n"
|
||||
|
||||
Reference in New Issue
Block a user