From 2c370cbae084a41162fedbcc0b1e66558629dcbe Mon Sep 17 00:00:00 2001
From: Danny Chan
Date: Thu, 14 Oct 2021 19:42:56 +0800
Subject: [PATCH] [HUDI-2556] Tweak some default config options for flink
 (#3800)

* rename write.insert.drop.duplicates to write.precombine and set it to true
  for COW table
* set index.global.enabled default to true
* set compaction.target_io default to 500GB
---
 .../hudi/configuration/FlinkOptions.java      | 26 ++++++++++---------
 .../apache/hudi/sink/StreamWriteFunction.java |  4 +--
 .../sink/StreamWriteOperatorCoordinator.java  | 10 +++----
 .../hudi/sink/utils/PayloadCreation.java      |  2 +-
 .../hudi/streamer/FlinkStreamerConfig.java    |  8 +++---
 .../apache/hudi/table/HoodieTableFactory.java | 19 +++++++++-----
 .../org/apache/hudi/util/StreamerUtil.java    | 12 +--------
 .../hudi/sink/TestWriteCopyOnWrite.java       |  4 +--
 .../hudi/table/HoodieDataSourceITCase.java    |  4 +--
 .../hudi/table/TestHoodieTableFactory.java    | 15 +++++++++++
 10 files changed, 57 insertions(+), 47 deletions(-)
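
For orientation, a minimal Java sketch of what the renamed option and the new defaults look like after this patch; it is not part of the change itself and only assumes the patched hudi-flink module on the classpath:

    import org.apache.hudi.configuration.FlinkOptions;

    public class RenamedOptionCheck {
      public static void main(String[] args) {
        // INSERT_DROP_DUPS ('write.insert.drop.duplicates') is renamed to
        // PRE_COMBINE ('write.precombine'); the flag itself still defaults to
        // false, and the table factory flips it to true for COW tables.
        System.out.println(FlinkOptions.PRE_COMBINE.key());                   // write.precombine
        System.out.println(FlinkOptions.PRE_COMBINE.defaultValue());          // false
        System.out.println(FlinkOptions.INDEX_GLOBAL_ENABLED.defaultValue()); // true
        System.out.println(FlinkOptions.COMPACTION_TARGET_IO.defaultValue()); // 512000 (MB), i.e. 500 GB
      }
    }
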
diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index b2359f4b3..fe0237065 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -115,15 +115,15 @@ public class FlinkOptions extends HoodieConfig {
   public static final ConfigOption<Double> INDEX_STATE_TTL = ConfigOptions
       .key("index.state.ttl")
       .doubleType()
-      .defaultValue(1.5D)
-      .withDescription("Index state ttl in days, default 1.5 day");
+      .defaultValue(0D)
+      .withDescription("Index state ttl in days, the default (0) stores the index permanently");
 
   public static final ConfigOption<Boolean> INDEX_GLOBAL_ENABLED = ConfigOptions
       .key("index.global.enabled")
       .booleanType()
-      .defaultValue(false)
+      .defaultValue(true)
       .withDescription("Whether to update index for the old partition path\n"
-          + "if same key record with different partition path came in, default false");
+          + "if a record with the same key but a different partition path comes in, default true");
 
   public static final ConfigOption<String> INDEX_PARTITION_REGEX = ConfigOptions
       .key("index.partition.regex")
@@ -255,15 +255,17 @@ public class FlinkOptions extends HoodieConfig {
       + "This will render any value set for the option in-effective");
 
   /**
-   * Flag to indicate whether to drop duplicates upon insert.
-   * By default insert will accept duplicates, to gain extra performance.
+   * Flag to indicate whether to drop duplicates before insert/upsert.
+   * By default false, to gain extra performance.
    */
-  public static final ConfigOption<Boolean> INSERT_DROP_DUPS = ConfigOptions
-      .key("write.insert.drop.duplicates")
+  public static final ConfigOption<Boolean> PRE_COMBINE = ConfigOptions
+      .key("write.precombine")
       .booleanType()
       .defaultValue(false)
-      .withDescription("Flag to indicate whether to drop duplicates upon insert.\n"
-          + "By default insert will accept duplicates, to gain extra performance");
+      .withDescription("Flag to indicate whether to drop duplicates before insert/upsert.\n"
+          + "By default these cases will accept duplicates, to gain extra performance:\n"
+          + "1) insert operation;\n"
+          + "2) upsert for MOR table, the MOR table deduplicates on reading");
 
   public static final ConfigOption<Integer> RETRY_TIMES = ConfigOptions
       .key("write.retry.times")
@@ -496,8 +498,8 @@ public class FlinkOptions extends HoodieConfig {
   public static final ConfigOption<Long> COMPACTION_TARGET_IO = ConfigOptions
       .key("compaction.target_io")
       .longType()
-      .defaultValue(5120L) // default 5 GB
-      .withDescription("Target IO per compaction (both read and write), default 5 GB");
+      .defaultValue(500 * 1024L) // default 500 GB
+      .withDescription("Target IO per compaction (both read and write), default 500 GB");
 
   public static final ConfigOption<Boolean> CLEAN_ASYNC_ENABLED = ConfigOptions
       .key("clean.async.enabled")
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index f8eea2e89..d510de2b1 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -419,7 +419,7 @@ public class StreamWriteFunction extends AbstractStreamWriteFunction {
     List<HoodieRecord> records = bucket.writeBuffer();
     ValidationUtils.checkState(records.size() > 0, "Data bucket to flush has no buffering records");
-    if (config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)) {
+    if (config.getBoolean(FlinkOptions.PRE_COMBINE)) {
       records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1);
     }
     bucket.preWrite(records);
@@ -454,7 +454,7 @@ public class StreamWriteFunction extends AbstractStreamWriteFunction {
         .forEach(bucket -> {
           List<HoodieRecord> records = bucket.writeBuffer();
           if (records.size() > 0) {
-            if (config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)) {
+            if (config.getBoolean(FlinkOptions.PRE_COMBINE)) {
              records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1);
            }
            bucket.preWrite(records);
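
To make the flush-time behavior concrete, here is a plain-Java sketch of what precombine deduplication means: among buffered records sharing a key, only the record with the greatest ordering value survives. The Record type and its fields below are illustrative stand-ins for HoodieRecord and its precombine field, not Hudi API:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class PreCombineSketch {
      // Illustrative record type; Hudi's real type is HoodieRecord with a payload.
      static final class Record {
        final String key;
        final long orderingVal;
        final String data;

        Record(String key, long orderingVal, String data) {
          this.key = key;
          this.orderingVal = orderingVal;
          this.data = data;
        }
      }

      // Keep only the latest record per key, mirroring what the write function
      // does before flushing a bucket when write.precombine is true.
      static List<Record> deduplicate(List<Record> records) {
        Map<String, Record> latest = new HashMap<>();
        for (Record r : records) {
          latest.merge(r.key, r, (oldV, newV) -> oldV.orderingVal >= newV.orderingVal ? oldV : newV);
        }
        return new ArrayList<>(latest.values());
      }

      public static void main(String[] args) {
        List<Record> buffer = Arrays.asList(
            new Record("id1", 1, "v1"),
            new Record("id1", 2, "v2"),
            new Record("id2", 1, "v1"));
        deduplicate(buffer).forEach(r -> System.out.println(r.key + " -> " + r.data)); // id1 -> v2, id2 -> v1
      }
    }
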
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 51280c3da..a7faeca5c 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -211,13 +211,11 @@ public class StreamWriteOperatorCoordinator
     // the stream write task snapshot and flush the data buffer synchronously in sequence,
     // so a successful checkpoint subsumes the old one(follows the checkpoint subsuming contract)
     final boolean committed = commitInstant(this.instant);
-    if (tableState.scheduleCompaction) {
-      // if async compaction is on, schedule the compaction
-      if (committed || tableState.timeCompactionTriggerStrategy) {
+    if (committed) {
+      if (tableState.scheduleCompaction) {
+        // if async compaction is on, schedule the compaction
         writeClient.scheduleCompaction(Option.empty());
       }
-    }
-    if (committed) {
       // start new instant.
       startInstant();
       // sync Hive if is enabled
@@ -532,7 +530,6 @@ public class StreamWriteOperatorCoordinator
     final String commitAction;
     final boolean isOverwrite;
     final boolean scheduleCompaction;
-    final boolean timeCompactionTriggerStrategy;
     final boolean syncHive;
     final boolean syncMetadata;
 
@@ -542,7 +539,6 @@
           HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE).toUpperCase(Locale.ROOT)));
       this.isOverwrite = WriteOperationType.isOverwrite(this.operationType);
       this.scheduleCompaction = StreamerUtil.needsScheduleCompaction(conf);
-      this.timeCompactionTriggerStrategy = StreamerUtil.isTimeCompactionTriggerStrategy(conf);
       this.syncHive = conf.getBoolean(FlinkOptions.HIVE_SYNC_ENABLED);
       this.syncMetadata = conf.getBoolean(FlinkOptions.METADATA_ENABLED);
     }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java
index d10447f81..f2cb60d51 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java
@@ -55,7 +55,7 @@ public class PayloadCreation implements Serializable {
   }
 
   public static PayloadCreation instance(Configuration conf) throws Exception {
-    boolean shouldCombine = conf.getBoolean(FlinkOptions.INSERT_DROP_DUPS)
+    boolean shouldCombine = conf.getBoolean(FlinkOptions.PRE_COMBINE)
         || WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)) == WriteOperationType.UPSERT;
     String preCombineField = null;
     final Class<?>[] argTypes;
diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
index c552bed22..7ca91f7e2 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
@@ -112,7 +112,7 @@ public class FlinkStreamerConfig extends Configuration {
 
   @Parameter(names = {"--filter-dupes"},
       description = "Should duplicate records from source be dropped/filtered out before insert/bulk-insert.")
-  public Boolean filterDupes = false;
+  public Boolean preCombine = false;
 
   @Parameter(names = {"--commit-on-errors"}, description = "Commit even when some records failed to be written.")
   public Boolean commitOnErrors = false;
@@ -220,8 +220,8 @@ public class FlinkStreamerConfig extends Configuration {
   @Parameter(names = {"--compaction-max-memory"}, description = "Max memory in MB for compaction spillable map, default 100MB")
   public Integer compactionMaxMemory = 100;
 
-  @Parameter(names = {"--compaction-target-io"}, description = "Target IO per compaction (both read and write), default 5 GB")
-  public Long compactionTargetIo = 5120L;
+  @Parameter(names = {"--compaction-target-io"}, description = "Target IO per compaction (both read and write), default 500 GB")
+  public Long compactionTargetIo = 512000L;
 
   @Parameter(names = {"--clean-async-enabled"}, description = "Whether to cleanup the old commits immediately on new commits, enabled by default")
   public Boolean cleanAsyncEnabled = true;
@@ -312,7 +312,7 @@ public class FlinkStreamerConfig extends Configuration {
     conf.setString(FlinkOptions.OPERATION, config.operation.value());
     conf.setString(FlinkOptions.PRECOMBINE_FIELD, config.sourceOrderingField);
     conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, config.payloadClassName);
-    conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, config.filterDupes);
+    conf.setBoolean(FlinkOptions.PRE_COMBINE, config.preCombine);
     conf.setInteger(FlinkOptions.RETRY_TIMES, Integer.parseInt(config.instantRetryTimes));
     conf.setLong(FlinkOptions.RETRY_INTERVAL_MS, Long.parseLong(config.instantRetryInterval));
     conf.setBoolean(FlinkOptions.IGNORE_FAILED, config.commitOnErrors);
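
The streamer change above is a pure rename at the CLI boundary: the boolean parsed from the existing --filter-dupes flag now feeds FlinkOptions.PRE_COMBINE. A hedged sketch of that mapping follows; toConf is a hypothetical helper that condenses the real conversion method, not the streamer's actual entry point:

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.configuration.FlinkOptions;

    public class StreamerMappingSketch {
      // Hypothetical helper mirroring the hunk above: the value of --filter-dupes
      // lands on FlinkOptions.PRE_COMBINE instead of the removed INSERT_DROP_DUPS.
      static Configuration toConf(boolean filterDupes) {
        Configuration conf = new Configuration();
        conf.setBoolean(FlinkOptions.PRE_COMBINE, filterDupes);
        return conf;
      }

      public static void main(String[] args) {
        System.out.println(toConf(true).getBoolean(FlinkOptions.PRE_COMBINE)); // true
      }
    }
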
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index 627bc2c29..c19c83104 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table;
 
+import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieValidationException;
 import org.apache.hudi.hive.MultiPartKeysValueExtractor;
@@ -157,6 +158,8 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
     setupHiveOptions(conf);
     // read options
     setupReadOptions(conf);
+    // write options
+    setupWriteOptions(conf);
     // infer avro schema from physical DDL schema
     inferAvroSchema(conf, schema.toPhysicalRowDataType().notNull().getLogicalType());
   }
@@ -249,12 +252,6 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
       conf.setInteger(FlinkOptions.ARCHIVE_MIN_COMMITS, commitsToRetain + 10);
       conf.setInteger(FlinkOptions.ARCHIVE_MAX_COMMITS, commitsToRetain + 20);
     }
-    if (conf.getBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED)
-        && !conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED)
-        && FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.COMPACTION_TARGET_IO)) {
-      // if compaction schedule is on, tweak the target io to 500GB
-      conf.setLong(FlinkOptions.COMPACTION_TARGET_IO, 500 * 1024L);
-    }
     if (StreamerUtil.allowDuplicateInserts(conf)) {
       // no need for compaction if insert duplicates is allowed
       conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
@@ -282,6 +279,16 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
     }
   }
 
+  /**
+   * Sets up the write options from the table definition.
+   */
+  private static void setupWriteOptions(Configuration conf) {
+    if (FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.OPERATION)
+        && HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE)) == HoodieTableType.COPY_ON_WRITE) {
+      conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
+    }
+  }
+
   /**
    * Inferences the deserialization Avro schema from the table schema (e.g. the DDL)
    * if both options {@link FlinkOptions#SOURCE_AVRO_SCHEMA_PATH} and
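
With the sanity-check tweak removed above, a scheduled-compaction job no longer needs special-casing: 500 GB is simply the option default now. A quick check of the arithmetic (the option is expressed in MB, so 500 * 1024 = 512000), assuming the patched module on the classpath:

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.configuration.FlinkOptions;

    public class CompactionTargetIoCheck {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        long targetIoMb = conf.getLong(FlinkOptions.COMPACTION_TARGET_IO);
        // Prints "512000 MB = 500 GB": the default now matches what the factory
        // previously set only when compaction scheduling was on.
        System.out.println(targetIoMb + " MB = " + (targetIoMb / 1024) + " GB");
      }
    }
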
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 7fb550d47..7e7bfaa3d 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -154,7 +154,7 @@ public class StreamerUtil {
         HoodieWriteConfig.newBuilder()
             .withEngineType(EngineType.FLINK)
             .withPath(conf.getString(FlinkOptions.PATH))
-            .combineInput(conf.getBoolean(FlinkOptions.INSERT_DROP_DUPS), true)
+            .combineInput(conf.getBoolean(FlinkOptions.PRE_COMBINE), true)
             .withMergeAllowDuplicateOnInserts(allowDuplicateInserts(conf))
             .withCompactionConfig(
                 HoodieCompactionConfig.newBuilder()
@@ -302,16 +302,6 @@ public class StreamerUtil {
         && conf.getBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED);
   }
 
-  /**
-   * Returns whether the compaction trigger strategy is time based.
-   *
-   * @param conf The flink configuration.
-   */
-  public static boolean isTimeCompactionTriggerStrategy(Configuration conf) {
-    final String strategy = conf.getString(FlinkOptions.COMPACTION_TRIGGER_STRATEGY);
-    return FlinkOptions.TIME_ELAPSED.equalsIgnoreCase(strategy) || FlinkOptions.NUM_OR_TIME.equalsIgnoreCase(strategy);
-  }
-
   /**
    * Creates the meta client for reader.
    *
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index 624a8e8c4..da418f965 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -275,7 +275,7 @@ public class TestWriteCopyOnWrite {
   @Test
   public void testInsertDuplicates() throws Exception {
     // reset the config option
-    conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, true);
+    conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
     funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
 
     // open the function and ingest data
@@ -470,7 +470,7 @@ public class TestWriteCopyOnWrite {
   public void testInsertWithDeduplication() throws Exception {
     // reset the config option
     conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0008); // 839 bytes batch size
-    conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, true);
+    conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
     funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
 
     // open the function and ingest data
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index 621cd1c43..9b4ea0084 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -652,7 +652,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
         .option(FlinkOptions.INDEX_GLOBAL_ENABLED, true)
-        .option(FlinkOptions.INSERT_DROP_DUPS, true)
+        .option(FlinkOptions.PRE_COMBINE, true)
         .end();
     streamTableEnv.executeSql(hoodieTableDDL);
 
@@ -674,7 +674,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
         .option(FlinkOptions.INDEX_GLOBAL_ENABLED, false)
-        .option(FlinkOptions.INSERT_DROP_DUPS, true)
+        .option(FlinkOptions.PRE_COMBINE, true)
         .end();
     streamTableEnv.executeSql(hoodieTableDDL);
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
index bbbb49d42..ad55dbf23 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
@@ -394,6 +394,21 @@ public class TestHoodieTableFactory {
         is("UTC"));
   }
 
+  @Test
+  void testSetupWriteOptionsForSink() {
+    final HoodieTableSink tableSink1 =
+        (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(MockContext.getInstance(this.conf));
+    final Configuration conf1 = tableSink1.getConf();
+    assertThat(conf1.get(FlinkOptions.PRE_COMBINE), is(true));
+
+    // set up operation as 'insert'
+    this.conf.setString(FlinkOptions.OPERATION, "insert");
+    HoodieTableSink tableSink2 =
+        (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(MockContext.getInstance(this.conf));
+    Configuration conf2 = tableSink2.getConf();
+    assertThat(conf2.get(FlinkOptions.PRE_COMBINE), is(false));
+  }
+
   // -------------------------------------------------------------------------
   //  Inner Class
   // -------------------------------------------------------------------------
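
The new test pins down the scope of the COW default: precombine is turned on only when the write operation is left at its default (upsert). A short sketch that mirrors the factory branch, re-implementing the check for illustration rather than calling into HoodieTableFactory:

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.common.model.HoodieTableType;
    import org.apache.hudi.configuration.FlinkOptions;

    public class PreCombineDefaultCheck {
      // Mirrors HoodieTableFactory#setupWriteOptions for illustration.
      static boolean effectivePreCombine(Configuration conf) {
        if (FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.OPERATION)
            && HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE)) == HoodieTableType.COPY_ON_WRITE) {
          conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
        }
        return conf.getBoolean(FlinkOptions.PRE_COMBINE);
      }

      public static void main(String[] args) {
        Configuration upsertConf = new Configuration();
        System.out.println(effectivePreCombine(upsertConf)); // true: default upsert on a COW table

        Configuration insertConf = new Configuration();
        insertConf.setString(FlinkOptions.OPERATION, "insert");
        System.out.println(effectivePreCombine(insertConf)); // false: explicit insert keeps duplicates
      }
    }
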