From 942a024e74af52e09cabbfe967f5da0ef108bdbb Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Sat, 10 Jul 2021 15:40:30 +0800 Subject: [PATCH] [HUDI-2143] Tweak the default compaction target IO to 500GB when flink async compaction is off (#3238) --- .../org/apache/hudi/table/HoodieTableFactory.java | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java index 2c2193c78..695fe0086 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java @@ -152,8 +152,8 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab } // hoodie key about options setupHoodieKeyOptions(conf, table); - // cleaning options - setupCleaningOptions(conf); + // compaction options + setupCompactionOptions(conf); // infer avro schema from physical DDL schema inferAvroSchema(conf, schema.toRowDataType().notNull().getLogicalType()); } @@ -192,9 +192,9 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab } /** - * Sets up the cleaning options from the table definition. + * Sets up the compaction options from the table definition. */ - private static void setupCleaningOptions(Configuration conf) { + private static void setupCompactionOptions(Configuration conf) { int commitsToRetain = conf.getInteger(FlinkOptions.CLEAN_RETAIN_COMMITS); int minCommitsToKeep = conf.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS); if (commitsToRetain >= minCommitsToKeep) { @@ -205,6 +205,12 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab conf.setInteger(FlinkOptions.ARCHIVE_MIN_COMMITS, commitsToRetain + 10); conf.setInteger(FlinkOptions.ARCHIVE_MAX_COMMITS, commitsToRetain + 20); } + if (conf.getBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED) + && !conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED) + && FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.COMPACTION_TARGET_IO)) { + // if compaction schedule is on, tweak the target io to 500GB + conf.setLong(FlinkOptions.COMPACTION_TARGET_IO, 500 * 1024L); + } } /**