From e5b6b8602c242c89cdb45440df8d2996a6c301f1 Mon Sep 17 00:00:00 2001
From: Danny Chan
Date: Thu, 28 Oct 2021 13:52:06 +0800
Subject: [PATCH] [HUDI-2633] Make precombine field optional for flink (#3874)

---
 .../hudi/configuration/FlinkOptions.java      |  1 +
 .../hudi/configuration/OptionsResolver.java   | 17 +++++++
 .../sink/StreamWriteOperatorCoordinator.java  |  6 ++-
 .../hudi/sink/utils/PayloadCreation.java      |  8 ++--
 .../apache/hudi/table/HoodieTableFactory.java | 27 +++++++++--
 .../hudi/table/HoodieDataSourceITCase.java    | 23 +++++++++
 .../hudi/table/TestHoodieTableFactory.java    | 48 ++++++++++++++-----
 .../java/org/apache/hudi/utils/TestSQL.java   |  7 +++
 8 files changed, 115 insertions(+), 22 deletions(-)

diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index a0e7c7ae9..6f8c1ff5b 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -241,6 +241,7 @@ public class FlinkOptions extends HoodieConfig {
       .defaultValue("upsert")
       .withDescription("The write operation, that this write should do");

+  public static final String NO_PRE_COMBINE = "no_precombine";
   public static final ConfigOption<String> PRECOMBINE_FIELD = ConfigOptions
       .key("write.precombine.field")
       .stringType()
diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index fa8ee49f1..075736fe3 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -18,6 +18,7 @@
 package org.apache.hudi.configuration;

+import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.WriteOperationType;

 import org.apache.flink.configuration.Configuration;
@@ -67,4 +68,20 @@ public class OptionsResolver {
         .toUpperCase(Locale.ROOT)
         .equals(FlinkOptions.TABLE_TYPE_COPY_ON_WRITE);
   }
+
+  /**
+   * Returns whether the payload clazz is {@link DefaultHoodieRecordPayload}.
+   */
+  public static boolean isDefaultHoodieRecordPayloadClazz(Configuration conf) {
+    return conf.getString(FlinkOptions.PAYLOAD_CLASS_NAME).contains(DefaultHoodieRecordPayload.class.getSimpleName());
+  }
+
+  /**
+   * Returns the preCombine field
+   * or null if the value is set as {@link FlinkOptions#NO_PRE_COMBINE}.
+   */
+  public static String getPreCombineField(Configuration conf) {
+    final String preCombineField = conf.getString(FlinkOptions.PRECOMBINE_FIELD);
+    return preCombineField.equals(FlinkOptions.NO_PRE_COMBINE) ? null : preCombineField;
+  }
 }
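
The OptionsResolver change above is the pivot of the patch: 'write.precombine.field' keeps its default value 'ts', and the reserved value 'no_precombine' is resolved to null so that downstream code can treat the precombine field as absent. Below is a minimal standalone sketch of that lookup, not part of the diff; a plain string map stands in for Flink's Configuration, and the 'ts' default mirrors FlinkOptions.PRECOMBINE_FIELD:

  import java.util.Map;

  public class PreCombineResolutionSketch {
    static final String NO_PRE_COMBINE = "no_precombine"; // FlinkOptions.NO_PRE_COMBINE
    static final String KEY = "write.precombine.field";   // FlinkOptions.PRECOMBINE_FIELD key
    static final String DEFAULT = "ts";                   // PRECOMBINE_FIELD default value

    // Mirrors OptionsResolver.getPreCombineField: the sentinel resolves to null.
    static String getPreCombineField(Map<String, String> conf) {
      String preCombineField = conf.getOrDefault(KEY, DEFAULT);
      return preCombineField.equals(NO_PRE_COMBINE) ? null : preCombineField;
    }

    public static void main(String[] args) {
      System.out.println(getPreCombineField(Map.of()));                    // ts
      System.out.println(getPreCombineField(Map.of(KEY, "tss")));          // tss
      System.out.println(getPreCombineField(Map.of(KEY, NO_PRE_COMBINE))); // null
    }
  }
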
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 2a4a51cf9..f9c810861 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -238,8 +238,10 @@ public class StreamWriteOperatorCoordinator
   public void notifyCheckpointAborted(long checkpointId) {
     // once the checkpoint was aborted, unblock the writer tasks to
     // reuse the last instant.
-    executor.execute(() -> sendCommitAckEvents(checkpointId),
-        "unblock data write with aborted checkpoint %s", checkpointId);
+    if (!WriteMetadataEvent.BOOTSTRAP_INSTANT.equals(this.instant)) {
+      executor.execute(() -> sendCommitAckEvents(checkpointId),
+          "unblock data write with aborted checkpoint %s", checkpointId);
+    }
   }

   @Override
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java
index f2cb60d51..64facf3b1 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsResolver;

 import org.apache.avro.generic.GenericRecord;
 import org.apache.flink.configuration.Configuration;
@@ -55,13 +56,14 @@ public class PayloadCreation implements Serializable {
   }

   public static PayloadCreation instance(Configuration conf) throws Exception {
-    boolean shouldCombine = conf.getBoolean(FlinkOptions.PRE_COMBINE)
+    String preCombineField = OptionsResolver.getPreCombineField(conf);
+    boolean needCombine = conf.getBoolean(FlinkOptions.PRE_COMBINE)
         || WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)) == WriteOperationType.UPSERT;
-    String preCombineField = null;
+    boolean shouldCombine = needCombine && preCombineField != null;
+
     final Class<?>[] argTypes;
     final Constructor<?> constructor;
     if (shouldCombine) {
-      preCombineField = conf.getString(FlinkOptions.PRECOMBINE_FIELD);
       argTypes = new Class<?>[] {GenericRecord.class, Comparable.class};
     } else {
       argTypes = new Class<?>[] {Option.class};
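
PayloadCreation.instance above now separates "the write wants combining" (an explicit 'write.precombine' flag or an UPSERT operation) from "combining is possible" (a precombine field actually resolved). A standalone sketch of that decision, not part of the diff; the enum and the boolean flag stand in for the FlinkOptions lookups:

  public class CombineDecisionSketch {
    enum Op { UPSERT, INSERT, BULK_INSERT } // stand-in for WriteOperationType

    // Mirrors the new logic: combine only when requested AND a field exists.
    static boolean shouldCombine(boolean preCombineEnabled, Op op, String preCombineField) {
      boolean needCombine = preCombineEnabled || op == Op.UPSERT;
      return needCombine && preCombineField != null;
    }

    public static void main(String[] args) {
      System.out.println(shouldCombine(false, Op.UPSERT, "ts")); // true: upsert with a field
      System.out.println(shouldCombine(false, Op.UPSERT, null)); // false: field resolved to null
      System.out.println(shouldCombine(true, Op.INSERT, null));  // false: nothing to order on
      System.out.println(shouldCombine(false, Op.INSERT, "ts")); // false: combining not requested
    }
  }

When shouldCombine is true the payload constructor takes (GenericRecord, Comparable); otherwise the single-argument Option constructor is used, which is why the field lookup moved out of the branch.
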
- + "Please define primary key or modify 'hoodie.datasource.write.recordkey.field' option."); + throw new HoodieValidationException("Field '" + f + "' specified in option " + + "'" + FlinkOptions.RECORD_KEY_FIELD.key() + "' does not exist in the table schema."); }); } // validate pre_combine key String preCombineField = conf.get(FlinkOptions.PRECOMBINE_FIELD); if (!fields.contains(preCombineField)) { - throw new ValidationException("Field " + preCombineField + " does not exist in the table schema." - + "Please check 'write.precombine.field' option."); + if (OptionsResolver.isDefaultHoodieRecordPayloadClazz(conf)) { + throw new HoodieValidationException("Option '" + FlinkOptions.PRECOMBINE_FIELD.key() + + "' is required for payload class: " + DefaultHoodieRecordPayload.class.getName()); + } + if (preCombineField.equals(FlinkOptions.PRECOMBINE_FIELD.defaultValue())) { + conf.setString(FlinkOptions.PRECOMBINE_FIELD, FlinkOptions.NO_PRE_COMBINE); + } else { + throw new HoodieValidationException("Field " + preCombineField + " does not exist in the table schema." + + "Please check '" + FlinkOptions.PRECOMBINE_FIELD.key() + "' option."); + } } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java index 2d500ad39..8e366bba4 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java @@ -394,6 +394,29 @@ public class HoodieDataSourceITCase extends AbstractTestBase { + "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]"); } + @ParameterizedTest + @MethodSource("tableTypeAndPartitioningParams") + void testWriteAndReadWithProctimeSequence(HoodieTableType tableType, boolean hiveStylePartitioning) { + TableEnvironment tableEnv = batchTableEnv; + String hoodieTableDDL = sql("t1") + .field("uuid varchar(20)") + .field("name varchar(10)") + .field("age int") + .field("tss timestamp(3)") // use a different field with default precombine field 'ts' + .field("`partition` varchar(10)") + .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) + .option(FlinkOptions.TABLE_TYPE, tableType) + .option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning) + .end(); + tableEnv.executeSql(hoodieTableDDL); + + execInsertSql(tableEnv, TestSQL.INSERT_SAME_KEY_T1); + + List result1 = CollectionUtil.iterableToList( + () -> tableEnv.sqlQuery("select * from t1").execute().collect()); + assertRowsEquals(result1, "[+I[id1, Danny, 23, 1970-01-01T00:00:01, par1]]"); + } + @ParameterizedTest @EnumSource(value = HoodieTableType.class) void testBatchModeUpsertWithoutPartition(HoodieTableType tableType) { diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java index ad55dbf23..01acbd9f5 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java @@ -18,7 +18,9 @@ package org.apache.hudi.table; +import org.apache.hudi.common.model.DefaultHoodieRecordPayload; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.exception.HoodieValidationException; import org.apache.hudi.hive.MultiPartKeysValueExtractor; import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor; import org.apache.hudi.keygen.ComplexAvroKeyGenerator; @@ -32,7 +34,6 @@ import 
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
index ad55dbf23..01acbd9f5 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
@@ -18,7 +18,9 @@
 package org.apache.hudi.table;

+import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieValidationException;
 import org.apache.hudi.hive.MultiPartKeysValueExtractor;
 import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
 import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
@@ -32,7 +34,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.Schema;
-import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
@@ -84,39 +85,62 @@ public class TestHoodieTableFactory {

   @Test
   void testRequiredOptionsForSource() {
-    // miss pk and pre combine key will throw exception
+    // missing pk and precombine key will throw an exception
     ResolvedSchema schema1 = SchemaBuilder.instance()
         .field("f0", DataTypes.INT().notNull())
         .field("f1", DataTypes.VARCHAR(20))
         .field("f2", DataTypes.TIMESTAMP(3))
         .build();
     final MockContext sourceContext1 = MockContext.getInstance(this.conf, schema1, "f2");
-    assertThrows(ValidationException.class, () -> new HoodieTableFactory().createDynamicTableSource(sourceContext1));
-    assertThrows(ValidationException.class, () -> new HoodieTableFactory().createDynamicTableSink(sourceContext1));
+    assertThrows(HoodieValidationException.class, () -> new HoodieTableFactory().createDynamicTableSource(sourceContext1));
+    assertThrows(HoodieValidationException.class, () -> new HoodieTableFactory().createDynamicTableSink(sourceContext1));

-    // given the pk and miss the pre combine key will throw exception
+    // a non-existent precombine field will throw an exception
     ResolvedSchema schema2 = SchemaBuilder.instance()
+        .field("f0", DataTypes.INT().notNull())
+        .field("f1", DataTypes.VARCHAR(20))
+        .field("f2", DataTypes.TIMESTAMP(3))
+        .build();
+    this.conf.setString(FlinkOptions.PRECOMBINE_FIELD, "non_exist_field");
+    final MockContext sourceContext2 = MockContext.getInstance(this.conf, schema2, "f2");
+    assertThrows(HoodieValidationException.class, () -> new HoodieTableFactory().createDynamicTableSource(sourceContext2));
+    assertThrows(HoodieValidationException.class, () -> new HoodieTableFactory().createDynamicTableSink(sourceContext2));
+    this.conf.setString(FlinkOptions.PRECOMBINE_FIELD, FlinkOptions.PRECOMBINE_FIELD.defaultValue());
+
+    // given the pk but missing the precombine key is ok
+    ResolvedSchema schema3 = SchemaBuilder.instance()
         .field("f0", DataTypes.INT().notNull())
         .field("f1", DataTypes.VARCHAR(20))
         .field("f2", DataTypes.TIMESTAMP(3))
         .primaryKey("f0")
         .build();
-    final MockContext sourceContext2 = MockContext.getInstance(this.conf, schema2, "f2");
-    assertThrows(ValidationException.class, () -> new HoodieTableFactory().createDynamicTableSource(sourceContext2));
-    assertThrows(ValidationException.class, () -> new HoodieTableFactory().createDynamicTableSink(sourceContext2));
+    final MockContext sourceContext3 = MockContext.getInstance(this.conf, schema3, "f2");
+    HoodieTableSource tableSource = (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(sourceContext3);
+    HoodieTableSink tableSink = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(sourceContext3);
+    // the precombine field is overwritten
+    assertThat(tableSource.getConf().getString(FlinkOptions.PRECOMBINE_FIELD), is(FlinkOptions.NO_PRE_COMBINE));
+    assertThat(tableSink.getConf().getString(FlinkOptions.PRECOMBINE_FIELD), is(FlinkOptions.NO_PRE_COMBINE));
+
+    // given pk but missing the precombine key with DefaultHoodieRecordPayload should throw
+    this.conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, DefaultHoodieRecordPayload.class.getName());
+    final MockContext sourceContext4 = MockContext.getInstance(this.conf, schema3, "f2");
+
+    assertThrows(HoodieValidationException.class, () -> new HoodieTableFactory().createDynamicTableSource(sourceContext4));
+    assertThrows(HoodieValidationException.class, () -> new HoodieTableFactory().createDynamicTableSink(sourceContext4));
+    this.conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, FlinkOptions.PAYLOAD_CLASS_NAME.defaultValue());

     // given pk and pre combine key will be ok
-    ResolvedSchema schema3 = SchemaBuilder.instance()
+    ResolvedSchema schema4 = SchemaBuilder.instance()
         .field("f0", DataTypes.INT().notNull())
         .field("f1", DataTypes.VARCHAR(20))
         .field("f2", DataTypes.TIMESTAMP(3))
         .field("ts", DataTypes.TIMESTAMP(3))
         .primaryKey("f0")
         .build();
-    final MockContext sourceContext3 = MockContext.getInstance(this.conf, schema3, "f2");
+    final MockContext sourceContext5 = MockContext.getInstance(this.conf, schema4, "f2");

-    assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSource(sourceContext3));
-    assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSink(sourceContext3));
+    assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSource(sourceContext5));
+    assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSink(sourceContext5));
   }

   @Test
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java
index 8822a6f79..9dc78aa4c 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java
@@ -35,6 +35,13 @@ public class TestSQL {
       + "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n"
       + "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')";

+  public static final String INSERT_SAME_KEY_T1 = "insert into t1 values\n"
+      + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:05','par1'),\n"
+      + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:04','par1'),\n"
+      + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:03','par1'),\n"
+      + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:02','par1'),\n"
+      + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1')";
+
   public static final String UPDATE_INSERT_T1 = "insert into t1 values\n"
       + "('id1','Danny',24,TIMESTAMP '1970-01-01 00:00:01','par1'),\n"
       + "('id2','Stephen',34,TIMESTAMP '1970-01-01 00:00:02','par1'),\n"
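
For end users the net effect, exercised by testWriteAndReadWithProctimeSequence and INSERT_SAME_KEY_T1 above, is that a Flink SQL table no longer needs a precombine column unless event-time ordering matters; without one, the record processed last wins (processing-time sequence). An illustrative DDL in the style the tests assemble, not part of the diff; the table name and path are hypothetical, and with DefaultHoodieRecordPayload configured this same DDL would be rejected:

  public class NoPreCombineDdlSketch {
    public static void main(String[] args) {
      // No 'ts' column here, so the factory rewrites 'write.precombine.field'
      // to 'no_precombine' instead of failing the CREATE TABLE.
      String ddl = "create table t1 (\n"
          + "  uuid varchar(20),\n"
          + "  name varchar(10),\n"
          + "  age int,\n"
          + "  tss timestamp(3),\n" // a field other than the default precombine field 'ts'
          + "  `partition` varchar(10),\n"
          + "  PRIMARY KEY(uuid) NOT ENFORCED\n"
          + ") partitioned by (`partition`) with (\n"
          + "  'connector' = 'hudi',\n"
          + "  'path' = '/tmp/hudi_t1'\n" // hypothetical path
          + ")";
      System.out.println(ddl); // would be passed to tableEnv.executeSql(ddl)
    }
  }
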