From b7586a563253082a00d62ac81e3b5acbb378472a Mon Sep 17 00:00:00 2001
From: Danny Chan
Date: Fri, 6 Aug 2021 10:30:52 +0800
Subject: [PATCH] [HUDI-2274] Allows INSERT duplicates for Flink MOR table (#3403)

---
 .../table/HoodieFlinkMergeOnReadTable.java         | 10 ++++++----
 .../hudi/configuration/FlinkOptions.java           |  6 +++---
 .../hudi/streamer/FlinkStreamerConfig.java         |  6 +++---
 .../apache/hudi/table/HoodieTableFactory.java      | 11 ++++++-----
 .../org/apache/hudi/util/StreamerUtil.java         |  2 +-
 .../apache/hudi/sink/TestWriteCopyOnWrite.java     |  1 +
 .../apache/hudi/sink/TestWriteMergeOnRead.java     |  6 ------
 .../sink/TestWriteMergeOnReadWithCompact.java      |  6 ------
 .../hudi/table/TestHoodieTableFactory.java         | 18 ------------------
 9 files changed, 20 insertions(+), 46 deletions(-)

diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java
index de485dbaa..bfe8b6f49 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java
@@ -82,10 +82,12 @@ public class HoodieFlinkMergeOnReadTable
       HoodieWriteHandle<?, ?, ?, ?> writeHandle,
       String instantTime,
       List<HoodieRecord<T>> hoodieRecords) {
-    ValidationUtils.checkArgument(writeHandle instanceof FlinkAppendHandle,
-        "MOR write handle should always be a FlinkAppendHandle");
-    FlinkAppendHandle<?, ?, ?, ?> appendHandle = (FlinkAppendHandle<?, ?, ?, ?>) writeHandle;
-    return new FlinkUpsertDeltaCommitActionExecutor<>(context, appendHandle, config, this, instantTime, hoodieRecords).execute();
+    if (writeHandle instanceof FlinkAppendHandle) {
+      FlinkAppendHandle<?, ?, ?, ?> appendHandle = (FlinkAppendHandle<?, ?, ?, ?>) writeHandle;
+      return new FlinkUpsertDeltaCommitActionExecutor<>(context, appendHandle, config, this, instantTime, hoodieRecords).execute();
+    } else {
+      return super.insert(context, writeHandle, instantTime, hoodieRecords);
+    }
   }
 
   @Override
diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 34b050c92..c0d721ad7 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -209,11 +209,11 @@ public class FlinkOptions extends HoodieConfig {
       .defaultValue(TABLE_TYPE_COPY_ON_WRITE)
       .withDescription("Type of table to write. COPY_ON_WRITE (or) MERGE_ON_READ");
 
-  public static final ConfigOption<Boolean> INSERT_ALLOW_DUP = ConfigOptions
-      .key("write.insert.allow_dup")
+  public static final ConfigOption<Boolean> INSERT_DEDUP = ConfigOptions
+      .key("write.insert.deduplicate")
       .booleanType()
       .defaultValue(true)
-      .withDescription("Whether to allow data duplication for INSERT operation, if enabled, writes the base files directly, default true");
+      .withDescription("Whether to deduplicate for INSERT operation, if disabled, writes the base files directly, default true");
 
   public static final ConfigOption<String> OPERATION = ConfigOptions
       .key("write.operation")
diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
index a7ef14fa4..a4591c835 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
@@ -69,8 +69,8 @@ public class FlinkStreamerConfig extends Configuration {
   @Parameter(names = {"--table-type"}, description = "Type of table. COPY_ON_WRITE (or) MERGE_ON_READ.", required = true)
   public String tableType;
 
-  @Parameter(names = {"--insert-allow-dup"}, description = "Whether to allow data duplication for INSERT operation, if enabled, writes the base files directly.", required = true)
-  public Boolean insertAllowDup = true;
+  @Parameter(names = {"--insert-dedup"}, description = "Whether to deduplicate for INSERT operation, if disabled, writes the base files directly.", required = true)
+  public Boolean insertDedup = true;
 
   @Parameter(names = {"--props"}, description = "Path to properties file on localfs or dfs, with configurations for "
       + "hoodie client, schema provider, key generator and data source. For hoodie client props, sane defaults are "
@@ -308,7 +308,7 @@ public class FlinkStreamerConfig extends Configuration {
     conf.setString(FlinkOptions.TABLE_NAME, config.targetTableName);
     // copy_on_write works same as COPY_ON_WRITE
     conf.setString(FlinkOptions.TABLE_TYPE, config.tableType.toUpperCase());
-    conf.setBoolean(FlinkOptions.INSERT_ALLOW_DUP, config.insertAllowDup);
+    conf.setBoolean(FlinkOptions.INSERT_DEDUP, config.insertDedup);
     conf.setString(FlinkOptions.OPERATION, config.operation.value());
     conf.setString(FlinkOptions.PRECOMBINE_FIELD, config.sourceOrderingField);
     conf.setString(FlinkOptions.PAYLOAD_CLASS, config.payloadClassName);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index d3c538825..413f395fc 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -23,6 +23,7 @@ import org.apache.hudi.hive.MultiPartKeysValueExtractor;
 import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
 import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
 import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
@@ -129,11 +130,6 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
       throw new ValidationException("Field " + preCombineField + " does not exist in the table schema."
           + "Please check 'write.precombine.field' option.");
     }
-
-    if (conf.getString(FlinkOptions.TABLE_TYPE).toUpperCase().equals(FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
-        && conf.getBoolean(FlinkOptions.INSERT_ALLOW_DUP)) {
-      throw new ValidationException("Option 'write.insert.allow_dup' is only allowed for COPY_ON_WRITE table.");
-    }
   }
 
   /**
@@ -214,6 +210,11 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
       // if compaction schedule is on, tweak the target io to 500GB
       conf.setLong(FlinkOptions.COMPACTION_TARGET_IO, 500 * 1024L);
     }
+    if (StreamerUtil.allowDuplicateInserts(conf)) {
+      // no need for compaction if insert duplicates is allowed
+      conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
+      conf.setBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, false);
+    }
   }
 
   /**
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 7967b69d9..3d53a07b4 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -350,6 +350,6 @@ public class StreamerUtil {
 
   public static boolean allowDuplicateInserts(Configuration conf) {
     WriteOperationType operationType = WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION));
-    return operationType == WriteOperationType.INSERT && conf.getBoolean(FlinkOptions.INSERT_ALLOW_DUP);
+    return operationType == WriteOperationType.INSERT && !conf.getBoolean(FlinkOptions.INSERT_DEDUP);
   }
 }
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index 29a7455b6..d5639c580 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -538,6 +538,7 @@ public class TestWriteCopyOnWrite {
     // reset the config option
     conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0006); // 630 bytes batch size
     conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT.value());
+    conf.setBoolean(FlinkOptions.INSERT_DEDUP, false);
     funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
 
     // open the function and ingest data
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
index fa4f92bdc..07e23b56e 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
@@ -37,7 +37,6 @@ import org.apache.avro.Schema;
 import org.apache.flink.configuration.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
 
 import java.io.File;
 import java.util.Comparator;
@@ -68,11 +67,6 @@ public class TestWriteMergeOnRead extends TestWriteCopyOnWrite {
     conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
   }
 
-  @Test
-  public void testInsertAllowsDuplication() {
-    // ignore the test because only COW table supports INSERT duplication
-  }
-
   @Override
   protected void checkWrittenData(File baseFile, Map<String, String> expected, int partitions) throws Exception {
     HoodieTableMetaClient metaClient = HoodieFlinkTable.create(writeConfig, context).getMetaClient();
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
index de54d90c3..acce120f4 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
@@ -22,7 +22,6 @@ import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.configuration.FlinkOptions;
 
 import org.apache.flink.configuration.Configuration;
-import org.junit.jupiter.api.Test;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -38,11 +37,6 @@ public class TestWriteMergeOnReadWithCompact extends TestWriteCopyOnWrite {
     conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
   }
 
-  @Test
-  public void testInsertAllowsDuplication() {
-    // ignore the test because only COW table supports INSERT duplication
-  }
-
   @Override
   protected Map<String, String> getExpectedBeforeCheckpointComplete() {
     return EXPECTED1;
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
index 21f1647a9..799739cfc 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
@@ -340,24 +340,6 @@ public class TestHoodieTableFactory {
     assertThat(conf2.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS), is(45));
   }
 
-  @Test
-  void testMorTableInsertAllowDuplication() {
-    TableSchema schema = TableSchema.builder()
-        .field("f0", DataTypes.INT().notNull())
-        .field("f1", DataTypes.VARCHAR(20))
-        .field("f2", DataTypes.TIMESTAMP(3))
-        .field("ts", DataTypes.TIMESTAMP(3))
-        .primaryKey("f0")
-        .build();
-    // overwrite the operation
-    this.conf.setString(FlinkOptions.OPERATION.key(), "insert");
-    this.conf.setString(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
-
-    final MockContext sinkContext = MockContext.getInstance(this.conf, schema, "f2");
-    assertThrows(ValidationException.class, () -> new HoodieTableFactory().createDynamicTableSink(sinkContext),
-        "Option 'write.insert.allow_dup' is only allowed for COPY_ON_WRITE table.");
-  }
-
   // -------------------------------------------------------------------------
   //  Inner Class
   // -------------------------------------------------------------------------
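
Note (not part of the patch above): a minimal sketch of how the reworked option could be exercised from a Flink job after this change. It builds a write configuration for a MERGE_ON_READ table with the INSERT operation and deduplication disabled, which is the combination the patch starts to accept; the table path and name are hypothetical placeholders, and the standard FlinkOptions.PATH / TABLE_NAME options are assumed.

```java
import org.apache.flink.configuration.Configuration;

import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.util.StreamerUtil;

public class InsertDuplicatesSketch {
  public static void main(String[] args) {
    // Hypothetical job configuration; path and table name are placeholders.
    Configuration conf = new Configuration();
    conf.setString(FlinkOptions.PATH, "file:///tmp/hudi/t_demo");
    conf.setString(FlinkOptions.TABLE_NAME, "t_demo");
    conf.setString(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
    conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT.value());
    // With this patch, disabling deduplication on a MOR table is no longer
    // rejected by HoodieTableFactory#sanityCheck; records are appended as-is.
    conf.setBoolean(FlinkOptions.INSERT_DEDUP, false);

    // INSERT + deduplication disabled: the helper reports duplicates are allowed,
    // and HoodieTableFactory additionally switches compaction off for such jobs.
    boolean allowDup = StreamerUtil.allowDuplicateInserts(conf);
    System.out.println("allow duplicate inserts: " + allowDup);
  }
}
```

For the Flink SQL / HoodieFlinkStreamer path, the same behavior is reached through the renamed knobs introduced here: the table option 'write.insert.deduplicate' = 'false' or the streamer flag --insert-dedup false.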