[HUDI-2881] Compact the file group with larger log files to reduce write amplification (#4152)
This commit is contained in:
@@ -171,6 +171,12 @@ public class HoodieCompactionConfig extends HoodieConfig {
|
||||
.withDocumentation("Amount of MBs to spend during compaction run for the LogFileSizeBasedCompactionStrategy. "
|
||||
+ "This value helps bound ingestion latency while compaction is run inline mode.");
|
||||
|
||||
public static final ConfigProperty<Long> COMPACTION_LOG_FILE_SIZE_THRESHOLD = ConfigProperty
|
||||
.key("hoodie.compaction.logfile.size.threshold")
|
||||
.defaultValue(0L)
|
||||
.withDocumentation("Only if the log file size is greater than the threshold in bytes,"
|
||||
+ " the file group will be compacted.");
|
||||
|
||||
public static final ConfigProperty<String> COMPACTION_STRATEGY = ConfigProperty
|
||||
.key("hoodie.compaction.strategy")
|
||||
.defaultValue(LogFileSizeBasedCompactionStrategy.class.getName())
|
||||
@@ -598,6 +604,11 @@ public class HoodieCompactionConfig extends HoodieConfig {
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder withLogFileSizeThresholdBasedCompaction(long logFileSizeThreshold) {
|
||||
compactionConfig.setValue(COMPACTION_LOG_FILE_SIZE_THRESHOLD, String.valueOf(logFileSizeThreshold));
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder withCommitsArchivalBatchSize(int batchSize) {
|
||||
compactionConfig.setValue(COMMITS_ARCHIVAL_BATCH_SIZE, String.valueOf(batchSize));
|
||||
return this;
|
||||
|
||||
@@ -1133,6 +1133,10 @@ public class HoodieWriteConfig extends HoodieConfig {
|
||||
return getLong(HoodieCompactionConfig.TARGET_IO_PER_COMPACTION_IN_MB);
|
||||
}
|
||||
|
||||
public Long getCompactionLogFileSizeThreshold() {
|
||||
return getLong(HoodieCompactionConfig.COMPACTION_LOG_FILE_SIZE_THRESHOLD);
|
||||
}
|
||||
|
||||
public Boolean getCompactionLazyBlockReadEnabled() {
|
||||
return getBoolean(HoodieCompactionConfig.COMPACTION_LAZY_BLOCK_READ_ENABLE);
|
||||
}
|
||||
|
||||
@@ -27,7 +27,8 @@ import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* LogFileSizeBasedCompactionStrategy orders the compactions based on the total log files size and limits the
|
||||
* LogFileSizeBasedCompactionStrategy orders the compactions based on the total log files size,
|
||||
* keeps only the file groups whose total log file size is at least the threshold, and limits the
|
||||
* compactions within a configured IO bound.
|
||||
*
|
||||
* @see BoundedIOCompactionStrategy
|
||||
@@ -39,8 +40,12 @@ public class LogFileSizeBasedCompactionStrategy extends BoundedIOCompactionStrat
|
||||
@Override
|
||||
public List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig writeConfig,
|
||||
List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> pendingCompactionPlans) {
|
||||
// Keep only the file groups whose total log file size is at least the threshold (in bytes).
|
||||
// Order the operations based on the reverse size of the logs and limit them by the IO
|
||||
return super.orderAndFilter(writeConfig, operations.stream().sorted(this).collect(Collectors.toList()),
|
||||
long threshold = writeConfig.getCompactionLogFileSizeThreshold();
|
||||
return super.orderAndFilter(writeConfig, operations.stream()
|
||||
.filter(e -> e.getMetrics().getOrDefault(TOTAL_LOG_FILE_SIZE, 0d) >= threshold)
|
||||
.sorted(this).collect(Collectors.toList()),
|
||||
pendingCompactionPlans);
|
||||
}
|
||||
|
||||
|
||||
@@ -99,19 +99,20 @@ public class TestHoodieCompactionStrategy {
|
||||
sizesMap.put(90 * MB, Collections.singletonList(1024 * MB));
|
||||
LogFileSizeBasedCompactionStrategy strategy = new LogFileSizeBasedCompactionStrategy();
|
||||
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig(
|
||||
HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).withTargetIOPerCompactionInMB(400).build())
|
||||
HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).withTargetIOPerCompactionInMB(1205)
|
||||
.withLogFileSizeThresholdBasedCompaction(100 * 1024 * 1024).build())
|
||||
.build();
|
||||
List<HoodieCompactionOperation> operations = createCompactionOperations(writeConfig, sizesMap);
|
||||
List<HoodieCompactionOperation> returned = strategy.orderAndFilter(writeConfig, operations, new ArrayList<>());
|
||||
|
||||
assertTrue(returned.size() < operations.size(),
|
||||
"LogFileSizeBasedCompactionStrategy should have resulted in fewer compactions");
|
||||
assertEquals(1, returned.size(), "LogFileSizeBasedCompactionStrategy should have resulted in 1 compaction");
|
||||
assertEquals(2, returned.size(), "LogFileSizeBasedCompactionStrategy should have resulted in 2 compaction");
|
||||
// Total size of all the log files
|
||||
Long returnedSize = returned.stream().map(s -> s.getMetrics().get(BoundedIOCompactionStrategy.TOTAL_IO_MB))
|
||||
.map(Double::longValue).reduce(Long::sum).orElse(0L);
|
||||
assertEquals(1204, (long) returnedSize,
|
||||
"Should chose the first 2 compactions which should result in a total IO of 690 MB");
|
||||
assertEquals(1594, (long) returnedSize,
|
||||
"Should chose the first 2 compactions which should result in a total IO of 1594 MB");
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
Reference in New Issue
Block a user