From 465d553df8c2b157a49e837e1a91cee46ba889d4 Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Mon, 14 Mar 2022 14:22:07 +0800 Subject: [PATCH] [HUDI-3600] Tweak the default cleaning strategy to be more streaming friendly for flink (#5010) --- .../hudi/configuration/FlinkOptions.java | 12 +++++----- .../hudi/table/TestHoodieTableFactory.java | 22 ++++++++++--------- 2 files changed, 18 insertions(+), 16 deletions(-) diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index 1be906036..34bab1285 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -552,21 +552,21 @@ public class FlinkOptions extends HoodieConfig { public static final ConfigOption CLEAN_RETAIN_COMMITS = ConfigOptions .key("clean.retain_commits") .intType() - .defaultValue(10)// default 10 commits + .defaultValue(30)// default 30 commits .withDescription("Number of commits to retain. So data will be retained for num_of_commits * time_between_commits (scheduled).\n" - + "This also directly translates into how much you can incrementally pull on this table, default 10"); + + "This also directly translates into how much you can incrementally pull on this table, default 30"); public static final ConfigOption ARCHIVE_MAX_COMMITS = ConfigOptions .key("archive.max_commits") .intType() - .defaultValue(30)// default max 30 commits - .withDescription("Max number of commits to keep before archiving older commits into a sequential log, default 30"); + .defaultValue(50)// default max 50 commits + .withDescription("Max number of commits to keep before archiving older commits into a sequential log, default 50"); public static final ConfigOption ARCHIVE_MIN_COMMITS = ConfigOptions .key("archive.min_commits") .intType() - .defaultValue(20)// default min 20 commits - .withDescription("Min number of commits to keep before archiving older commits into a sequential log, default 20"); + .defaultValue(40)// default min 40 commits + .withDescription("Min number of commits to keep before archiving older commits into a sequential log, default 40"); // ------------------------------------------------------------------------ // Hive Sync Options diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java index a76e00816..c6a1b0068 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java @@ -253,17 +253,18 @@ public class TestHoodieTableFactory { final MockContext sourceContext1 = MockContext.getInstance(this.conf, schema1, "f2"); final HoodieTableSource tableSource1 = (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(sourceContext1); final Configuration conf1 = tableSource1.getConf(); - assertThat(conf1.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS), is(20)); - assertThat(conf1.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS), is(30)); + assertThat(conf1.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS), is(FlinkOptions.ARCHIVE_MIN_COMMITS.defaultValue())); + assertThat(conf1.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS), is(FlinkOptions.ARCHIVE_MAX_COMMITS.defaultValue())); // set up new retains commits that is greater than min archive commits - this.conf.setString(FlinkOptions.CLEAN_RETAIN_COMMITS.key(), "25"); + final int retainCommits = FlinkOptions.ARCHIVE_MIN_COMMITS.defaultValue() + 5; + this.conf.setInteger(FlinkOptions.CLEAN_RETAIN_COMMITS.key(), retainCommits); final MockContext sourceContext2 = MockContext.getInstance(this.conf, schema1, "f2"); final HoodieTableSource tableSource2 = (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(sourceContext2); final Configuration conf2 = tableSource2.getConf(); - assertThat(conf2.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS), is(35)); - assertThat(conf2.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS), is(45)); + assertThat(conf2.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS), is(retainCommits + 10)); + assertThat(conf2.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS), is(retainCommits + 20)); } @Test @@ -401,17 +402,18 @@ public class TestHoodieTableFactory { final MockContext sinkContext1 = MockContext.getInstance(this.conf, schema1, "f2"); final HoodieTableSink tableSink1 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(sinkContext1); final Configuration conf1 = tableSink1.getConf(); - assertThat(conf1.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS), is(20)); - assertThat(conf1.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS), is(30)); + assertThat(conf1.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS), is(FlinkOptions.ARCHIVE_MIN_COMMITS.defaultValue())); + assertThat(conf1.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS), is(FlinkOptions.ARCHIVE_MAX_COMMITS.defaultValue())); // set up new retains commits that is greater than min archive commits - this.conf.setString(FlinkOptions.CLEAN_RETAIN_COMMITS.key(), "25"); + final int retainCommits = FlinkOptions.ARCHIVE_MIN_COMMITS.defaultValue() + 5; + this.conf.setInteger(FlinkOptions.CLEAN_RETAIN_COMMITS.key(), retainCommits); final MockContext sinkContext2 = MockContext.getInstance(this.conf, schema1, "f2"); final HoodieTableSink tableSink2 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(sinkContext2); final Configuration conf2 = tableSink2.getConf(); - assertThat(conf2.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS), is(35)); - assertThat(conf2.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS), is(45)); + assertThat(conf2.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS), is(retainCommits + 10)); + assertThat(conf2.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS), is(retainCommits + 20)); } @Test