Fixes HUDI-9: Check precondition minInstantsToKeep > cleanerCommitsRetained

- Added a precondition check; otherwise incremental pull could miss commits
- Lowered default cleaner retention to 10, so the defaults are easier for newcomers to understand
- Bumped down the min/max instants to retain as well
commit 372fbc4733
parent 3d9041e216
committed by vinoth chandar
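For illustration, a minimal, hypothetical usage sketch of the new behavior (the class name, table path, table name, and import package are assumptions; the builder methods retainCommits and archiveCommitsWith come from the diff below): a config whose hoodie.keep.min.commits is not greater than hoodie.cleaner.commits.retained now fails fast when the compaction config is built.

import com.uber.hoodie.config.HoodieCompactionConfig;  // package assumed from the codebase of this era
import com.uber.hoodie.config.HoodieWriteConfig;

public class PreconditionSketch {
  public static void main(String[] args) {
    // cleanerCommitsRetained (20) >= minInstantsToKeep (10), so the new
    // Preconditions.checkArgument inside the compaction config's build()
    // throws IllegalArgumentException instead of silently archiving
    // uncleaned instants later.
    HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie-example")              // hypothetical path
        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
            .retainCommits(20)                        // hoodie.cleaner.commits.retained = 20
            .archiveCommitsWith(10, 30)               // hoodie.keep.min.commits = 10, max = 30
            .build())                                 // throws here: 10 > 20 is false
        .forTable("example-table")                    // hypothetical table name
        .build();
  }
}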
@@ -43,8 +43,8 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
   public static final String CLEANER_FILE_VERSIONS_RETAINED_PROP =
       "hoodie.cleaner.fileversions" + ".retained";
   public static final String CLEANER_COMMITS_RETAINED_PROP = "hoodie.cleaner.commits.retained";
-  public static final String MAX_COMMITS_TO_KEEP = "hoodie.keep.max.commits";
-  public static final String MIN_COMMITS_TO_KEEP = "hoodie.keep.min.commits";
+  public static final String MAX_COMMITS_TO_KEEP_PROP = "hoodie.keep.max.commits";
+  public static final String MIN_COMMITS_TO_KEEP_PROP = "hoodie.keep.min.commits";
   // Upsert uses this file size to compact new data onto existing files..
   public static final String PARQUET_SMALL_FILE_LIMIT_BYTES = "hoodie.parquet.small.file.limit";
   // Turned off by default
@@ -101,9 +101,9 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
   private static final String DEFAULT_INLINE_COMPACT = "false";
   private static final String DEFAULT_INLINE_COMPACT_NUM_DELTA_COMMITS = "1";
   private static final String DEFAULT_CLEANER_FILE_VERSIONS_RETAINED = "3";
-  private static final String DEFAULT_CLEANER_COMMITS_RETAINED = "24";
-  private static final String DEFAULT_MAX_COMMITS_TO_KEEP = String.valueOf(128);
-  private static final String DEFAULT_MIN_COMMITS_TO_KEEP = String.valueOf(96);
+  private static final String DEFAULT_CLEANER_COMMITS_RETAINED = "10";
+  private static final String DEFAULT_MAX_COMMITS_TO_KEEP = "30";
+  private static final String DEFAULT_MIN_COMMITS_TO_KEEP = "20";
   public static final String TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP = "hoodie.compaction.daybased.target"
       + ".partitions";
   // 500GB of target IO per compaction (both read and write)
@@ -168,8 +168,8 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
     }

     public Builder archiveCommitsWith(int minToKeep, int maxToKeep) {
-      props.setProperty(MIN_COMMITS_TO_KEEP, String.valueOf(minToKeep));
-      props.setProperty(MAX_COMMITS_TO_KEEP, String.valueOf(maxToKeep));
+      props.setProperty(MIN_COMMITS_TO_KEEP_PROP, String.valueOf(minToKeep));
+      props.setProperty(MAX_COMMITS_TO_KEEP_PROP, String.valueOf(maxToKeep));
       return this;
     }

@@ -254,9 +254,9 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
           CLEANER_FILE_VERSIONS_RETAINED_PROP, DEFAULT_CLEANER_FILE_VERSIONS_RETAINED);
       setDefaultOnCondition(props, !props.containsKey(CLEANER_COMMITS_RETAINED_PROP),
           CLEANER_COMMITS_RETAINED_PROP, DEFAULT_CLEANER_COMMITS_RETAINED);
-      setDefaultOnCondition(props, !props.containsKey(MAX_COMMITS_TO_KEEP), MAX_COMMITS_TO_KEEP,
+      setDefaultOnCondition(props, !props.containsKey(MAX_COMMITS_TO_KEEP_PROP), MAX_COMMITS_TO_KEEP_PROP,
           DEFAULT_MAX_COMMITS_TO_KEEP);
-      setDefaultOnCondition(props, !props.containsKey(MIN_COMMITS_TO_KEEP), MIN_COMMITS_TO_KEEP,
+      setDefaultOnCondition(props, !props.containsKey(MIN_COMMITS_TO_KEEP_PROP), MIN_COMMITS_TO_KEEP_PROP,
           DEFAULT_MIN_COMMITS_TO_KEEP);
       setDefaultOnCondition(props, !props.containsKey(PARQUET_SMALL_FILE_LIMIT_BYTES),
           PARQUET_SMALL_FILE_LIMIT_BYTES, DEFAULT_PARQUET_SMALL_FILE_LIMIT_BYTES);
@@ -283,10 +283,19 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
           TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP, DEFAULT_TARGET_PARTITIONS_PER_DAYBASED_COMPACTION);

       HoodieCleaningPolicy.valueOf(props.getProperty(CLEANER_POLICY_PROP));
-      Preconditions.checkArgument(Integer.parseInt(props.getProperty(MAX_COMMITS_TO_KEEP)) > Integer
-          .parseInt(props.getProperty(MIN_COMMITS_TO_KEEP)));
+
+      // Ensure minInstantsToKeep > cleanerCommitsRetained, otherwise we will archive some
+      // commit instant on timeline, that still has not been cleaned. Could miss some data via incr pull
+      int minInstantsToKeep = Integer.parseInt(props.getProperty(HoodieCompactionConfig.MIN_COMMITS_TO_KEEP_PROP));
+      int maxInstantsToKeep = Integer.parseInt(props.getProperty(HoodieCompactionConfig.MAX_COMMITS_TO_KEEP_PROP));
+      int cleanerCommitsRetained = Integer
+          .parseInt(props.getProperty(HoodieCompactionConfig.CLEANER_COMMITS_RETAINED_PROP));
+      Preconditions.checkArgument(maxInstantsToKeep > minInstantsToKeep);
+      Preconditions.checkArgument(minInstantsToKeep > cleanerCommitsRetained,
+          String.format("Increase %s=%d to be greater than %s=%d. Otherwise, there is risk of incremental pull "
+              + "missing data from few instants.", HoodieCompactionConfig.MIN_COMMITS_TO_KEEP_PROP,
+              minInstantsToKeep, HoodieCompactionConfig.CLEANER_COMMITS_RETAINED_PROP, cleanerCommitsRetained));
       return config;
     }
   }
 }
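Worked against the new defaults above, the check holds: maxInstantsToKeep = 30 > minInstantsToKeep = 20 > cleanerCommitsRetained = 10, so both Preconditions.checkArgument calls pass and configs built entirely from defaults are unaffected.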
@@ -176,11 +176,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
   }

   public int getMaxCommitsToKeep() {
-    return Integer.parseInt(props.getProperty(HoodieCompactionConfig.MAX_COMMITS_TO_KEEP));
+    return Integer.parseInt(props.getProperty(HoodieCompactionConfig.MAX_COMMITS_TO_KEEP_PROP));
   }

   public int getMinCommitsToKeep() {
-    return Integer.parseInt(props.getProperty(HoodieCompactionConfig.MIN_COMMITS_TO_KEEP));
+    return Integer.parseInt(props.getProperty(HoodieCompactionConfig.MIN_COMMITS_TO_KEEP_PROP));
   }

   public int getParquetSmallFileLimit() {
@@ -34,8 +34,9 @@ public class HoodieWriteConfigTest {
   public void testPropertyLoading() throws IOException {
     Builder builder = HoodieWriteConfig.newBuilder().withPath("/tmp");
     Map<String, String> params = Maps.newHashMap();
-    params.put(HoodieCompactionConfig.MAX_COMMITS_TO_KEEP, "5");
-    params.put(HoodieCompactionConfig.MIN_COMMITS_TO_KEEP, "2");
+    params.put(HoodieCompactionConfig.CLEANER_COMMITS_RETAINED_PROP, "1");
+    params.put(HoodieCompactionConfig.MAX_COMMITS_TO_KEEP_PROP, "5");
+    params.put(HoodieCompactionConfig.MIN_COMMITS_TO_KEEP_PROP, "2");
     ByteArrayOutputStream outStream = saveParamsIntoOutputStream(params);
     ByteArrayInputStream inputStream = new ByteArrayInputStream(outStream.toByteArray());
     try {
@@ -134,7 +134,7 @@ public class TestHoodieCommitArchiveLog {
     HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath)
         .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
         .withCompactionConfig(
-            HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 4).build())
+            HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 4).build())
         .forTable("test-trip-table").build();
     HoodieTestUtils.init(hadoopConf, basePath);
     // Requested Compaction
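The retainCommits(1) added here (and in the hunks below) is what keeps these tests valid under the new precondition: archiveCommitsWith(2, 4) sets hoodie.keep.min.commits to 2, and against the new default hoodie.cleaner.commits.retained of 10 the check 2 > 10 would fail, while retaining just 1 commit satisfies 2 > 1.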
@@ -279,7 +279,7 @@ public class TestHoodieCommitArchiveLog {
     HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath)
         .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
         .forTable("test-trip-table").withCompactionConfig(
-            HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 5).build()).build();
+            HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build()).build();
     HoodieTableMetaClient metaClient = new HoodieTableMetaClient(dfs.getConf(), basePath);
     HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
     // Requested Compaction
@@ -346,7 +346,7 @@ public class TestHoodieCommitArchiveLog {
     HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath)
         .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
         .forTable("test-trip-table").withCompactionConfig(
-            HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 5).build()).build();
+            HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build()).build();
     HoodieTableMetaClient metaClient = new HoodieTableMetaClient(dfs.getConf(), basePath);
     HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
     HoodieTestDataGenerator.createCommitFile(basePath, "100", dfs.getConf());
@@ -372,7 +372,7 @@ public class TestHoodieCommitArchiveLog {
     HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath)
        .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
        .forTable("test-trip-table").withCompactionConfig(
-            HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 5).build()).build();
+            HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build()).build();
     HoodieTableMetaClient metaClient = new HoodieTableMetaClient(dfs.getConf(), basePath);
     HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
     HoodieTestDataGenerator.createCommitFile(basePath, "100", dfs.getConf());
@@ -404,7 +404,7 @@ public class TestHoodieCommitArchiveLog {
     HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath)
         .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
         .forTable("test-trip-table").withCompactionConfig(
-            HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 5).build()).build();
+            HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build()).build();
     HoodieTableMetaClient metaClient = new HoodieTableMetaClient(dfs.getConf(), basePath);
     HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
     HoodieTestDataGenerator.createCommitFile(basePath, "100", dfs.getConf());