From 372fbc4733824d6f61db7f4b7d5e289dc902e634 Mon Sep 17 00:00:00 2001 From: Vinoth Chandar Date: Tue, 26 Mar 2019 18:52:33 -0700 Subject: [PATCH] Fixes HUDI-9 : Check precondition minInstantsToKeep > cleanerCommitsRetained - Added a precondition check, otherwise incr pull could miss commits - Lowered default cleaner retention to 10, to enable simpler understanding for newbies - Bumped down min/max instants to retain as well --- .../hoodie/config/HoodieCompactionConfig.java | 33 ++++++++++++------- .../uber/hoodie/config/HoodieWriteConfig.java | 4 +-- .../hoodie/config/HoodieWriteConfigTest.java | 5 +-- .../hoodie/io/TestHoodieCommitArchiveLog.java | 10 +++--- 4 files changed, 31 insertions(+), 21 deletions(-) diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java index b1709955a..2cde40138 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java @@ -43,8 +43,8 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig { public static final String CLEANER_FILE_VERSIONS_RETAINED_PROP = "hoodie.cleaner.fileversions" + ".retained"; public static final String CLEANER_COMMITS_RETAINED_PROP = "hoodie.cleaner.commits.retained"; - public static final String MAX_COMMITS_TO_KEEP = "hoodie.keep.max.commits"; - public static final String MIN_COMMITS_TO_KEEP = "hoodie.keep.min.commits"; + public static final String MAX_COMMITS_TO_KEEP_PROP = "hoodie.keep.max.commits"; + public static final String MIN_COMMITS_TO_KEEP_PROP = "hoodie.keep.min.commits"; // Upsert uses this file size to compact new data onto existing files.. public static final String PARQUET_SMALL_FILE_LIMIT_BYTES = "hoodie.parquet.small.file.limit"; // Turned off by default @@ -101,9 +101,9 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig { private static final String DEFAULT_INLINE_COMPACT = "false"; private static final String DEFAULT_INLINE_COMPACT_NUM_DELTA_COMMITS = "1"; private static final String DEFAULT_CLEANER_FILE_VERSIONS_RETAINED = "3"; - private static final String DEFAULT_CLEANER_COMMITS_RETAINED = "24"; - private static final String DEFAULT_MAX_COMMITS_TO_KEEP = String.valueOf(128); - private static final String DEFAULT_MIN_COMMITS_TO_KEEP = String.valueOf(96); + private static final String DEFAULT_CLEANER_COMMITS_RETAINED = "10"; + private static final String DEFAULT_MAX_COMMITS_TO_KEEP = "30"; + private static final String DEFAULT_MIN_COMMITS_TO_KEEP = "20"; public static final String TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP = "hoodie.compaction.daybased.target" + ".partitions"; // 500GB of target IO per compaction (both read and write) @@ -168,8 +168,8 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig { } public Builder archiveCommitsWith(int minToKeep, int maxToKeep) { - props.setProperty(MIN_COMMITS_TO_KEEP, String.valueOf(minToKeep)); - props.setProperty(MAX_COMMITS_TO_KEEP, String.valueOf(maxToKeep)); + props.setProperty(MIN_COMMITS_TO_KEEP_PROP, String.valueOf(minToKeep)); + props.setProperty(MAX_COMMITS_TO_KEEP_PROP, String.valueOf(maxToKeep)); return this; } @@ -254,9 +254,9 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig { CLEANER_FILE_VERSIONS_RETAINED_PROP, DEFAULT_CLEANER_FILE_VERSIONS_RETAINED); setDefaultOnCondition(props, !props.containsKey(CLEANER_COMMITS_RETAINED_PROP), CLEANER_COMMITS_RETAINED_PROP, DEFAULT_CLEANER_COMMITS_RETAINED); - setDefaultOnCondition(props, !props.containsKey(MAX_COMMITS_TO_KEEP), MAX_COMMITS_TO_KEEP, + setDefaultOnCondition(props, !props.containsKey(MAX_COMMITS_TO_KEEP_PROP), MAX_COMMITS_TO_KEEP_PROP, DEFAULT_MAX_COMMITS_TO_KEEP); - setDefaultOnCondition(props, !props.containsKey(MIN_COMMITS_TO_KEEP), MIN_COMMITS_TO_KEEP, + setDefaultOnCondition(props, !props.containsKey(MIN_COMMITS_TO_KEEP_PROP), MIN_COMMITS_TO_KEEP_PROP, DEFAULT_MIN_COMMITS_TO_KEEP); setDefaultOnCondition(props, !props.containsKey(PARQUET_SMALL_FILE_LIMIT_BYTES), PARQUET_SMALL_FILE_LIMIT_BYTES, DEFAULT_PARQUET_SMALL_FILE_LIMIT_BYTES); @@ -283,10 +283,19 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig { TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP, DEFAULT_TARGET_PARTITIONS_PER_DAYBASED_COMPACTION); HoodieCleaningPolicy.valueOf(props.getProperty(CLEANER_POLICY_PROP)); - Preconditions.checkArgument(Integer.parseInt(props.getProperty(MAX_COMMITS_TO_KEEP)) > Integer - .parseInt(props.getProperty(MIN_COMMITS_TO_KEEP))); + + // Ensure minInstantsToKeep > cleanerCommitsRetained, otherwise we will archive some + // commit instant on timeline, that still has not been cleaned. Could miss some data via incr pull + int minInstantsToKeep = Integer.parseInt(props.getProperty(HoodieCompactionConfig.MIN_COMMITS_TO_KEEP_PROP)); + int maxInstantsToKeep = Integer.parseInt(props.getProperty(HoodieCompactionConfig.MAX_COMMITS_TO_KEEP_PROP)); + int cleanerCommitsRetained = Integer + .parseInt(props.getProperty(HoodieCompactionConfig.CLEANER_COMMITS_RETAINED_PROP)); + Preconditions.checkArgument(maxInstantsToKeep > minInstantsToKeep); + Preconditions.checkArgument(minInstantsToKeep > cleanerCommitsRetained, + String.format("Increase %s=%d to be greater than %s=%d. Otherwise, there is risk of incremental pull " + + "missing data from few instants.", HoodieCompactionConfig.MIN_COMMITS_TO_KEEP_PROP, + minInstantsToKeep, HoodieCompactionConfig.CLEANER_COMMITS_RETAINED_PROP, cleanerCommitsRetained)); return config; } - } } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java index d1a5e467f..67b38d553 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java @@ -176,11 +176,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { } public int getMaxCommitsToKeep() { - return Integer.parseInt(props.getProperty(HoodieCompactionConfig.MAX_COMMITS_TO_KEEP)); + return Integer.parseInt(props.getProperty(HoodieCompactionConfig.MAX_COMMITS_TO_KEEP_PROP)); } public int getMinCommitsToKeep() { - return Integer.parseInt(props.getProperty(HoodieCompactionConfig.MIN_COMMITS_TO_KEEP)); + return Integer.parseInt(props.getProperty(HoodieCompactionConfig.MIN_COMMITS_TO_KEEP_PROP)); } public int getParquetSmallFileLimit() { diff --git a/hoodie-client/src/test/java/com/uber/hoodie/config/HoodieWriteConfigTest.java b/hoodie-client/src/test/java/com/uber/hoodie/config/HoodieWriteConfigTest.java index 3e6fc8e68..0835e67d6 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/config/HoodieWriteConfigTest.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/config/HoodieWriteConfigTest.java @@ -34,8 +34,9 @@ public class HoodieWriteConfigTest { public void testPropertyLoading() throws IOException { Builder builder = HoodieWriteConfig.newBuilder().withPath("/tmp"); Map params = Maps.newHashMap(); - params.put(HoodieCompactionConfig.MAX_COMMITS_TO_KEEP, "5"); - params.put(HoodieCompactionConfig.MIN_COMMITS_TO_KEEP, "2"); + params.put(HoodieCompactionConfig.CLEANER_COMMITS_RETAINED_PROP, "1"); + params.put(HoodieCompactionConfig.MAX_COMMITS_TO_KEEP_PROP, "5"); + params.put(HoodieCompactionConfig.MIN_COMMITS_TO_KEEP_PROP, "2"); ByteArrayOutputStream outStream = saveParamsIntoOutputStream(params); ByteArrayInputStream inputStream = new ByteArrayInputStream(outStream.toByteArray()); try { diff --git a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCommitArchiveLog.java b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCommitArchiveLog.java index 76c364115..2d6f9578e 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCommitArchiveLog.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCommitArchiveLog.java @@ -134,7 +134,7 @@ public class TestHoodieCommitArchiveLog { HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath) .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) .withCompactionConfig( - HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 4).build()) + HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 4).build()) .forTable("test-trip-table").build(); HoodieTestUtils.init(hadoopConf, basePath); // Requested Compaction @@ -279,7 +279,7 @@ public class TestHoodieCommitArchiveLog { HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath) .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) .forTable("test-trip-table").withCompactionConfig( - HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 5).build()).build(); + HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build()).build(); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(dfs.getConf(), basePath); HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient); // Requested Compaction @@ -346,7 +346,7 @@ public class TestHoodieCommitArchiveLog { HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath) .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) .forTable("test-trip-table").withCompactionConfig( - HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 5).build()).build(); + HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build()).build(); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(dfs.getConf(), basePath); HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient); HoodieTestDataGenerator.createCommitFile(basePath, "100", dfs.getConf()); @@ -372,7 +372,7 @@ public class TestHoodieCommitArchiveLog { HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath) .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) .forTable("test-trip-table").withCompactionConfig( - HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 5).build()).build(); + HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build()).build(); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(dfs.getConf(), basePath); HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient); HoodieTestDataGenerator.createCommitFile(basePath, "100", dfs.getConf()); @@ -404,7 +404,7 @@ public class TestHoodieCommitArchiveLog { HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath) .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) .forTable("test-trip-table").withCompactionConfig( - HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 5).build()).build(); + HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build()).build(); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(dfs.getConf(), basePath); HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient); HoodieTestDataGenerator.createCommitFile(basePath, "100", dfs.getConf());