Changing Day based compaction strategy to be IO agnostic
This commit is contained in:
committed by
vinoth chandar
parent
3da063f83b
commit
a6fe96fdfe
@@ -104,6 +104,10 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
|
|||||||
private static final String DEFAULT_CLEANER_COMMITS_RETAINED = "24";
|
private static final String DEFAULT_CLEANER_COMMITS_RETAINED = "24";
|
||||||
private static final String DEFAULT_MAX_COMMITS_TO_KEEP = String.valueOf(128);
|
private static final String DEFAULT_MAX_COMMITS_TO_KEEP = String.valueOf(128);
|
||||||
private static final String DEFAULT_MIN_COMMITS_TO_KEEP = String.valueOf(96);
|
private static final String DEFAULT_MIN_COMMITS_TO_KEEP = String.valueOf(96);
|
||||||
|
public static final String TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP = "hoodie.compaction.daybased.target"
|
||||||
|
+ ".partitions";
|
||||||
|
// 500GB of target IO per compaction (both read and write)
|
||||||
|
public static final String DEFAULT_TARGET_PARTITIONS_PER_DAYBASED_COMPACTION = String.valueOf(10);
|
||||||
|
|
||||||
private HoodieCompactionConfig(Properties props) {
|
private HoodieCompactionConfig(Properties props) {
|
||||||
super(props);
|
super(props);
|
||||||
@@ -230,6 +234,12 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Builder withTargetPartitionsPerDayBasedCompaction(int targetPartitionsPerCompaction) {
|
||||||
|
props.setProperty(TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP,
|
||||||
|
String.valueOf(targetPartitionsPerCompaction));
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
public HoodieCompactionConfig build() {
|
public HoodieCompactionConfig build() {
|
||||||
HoodieCompactionConfig config = new HoodieCompactionConfig(props);
|
HoodieCompactionConfig config = new HoodieCompactionConfig(props);
|
||||||
setDefaultOnCondition(props, !props.containsKey(AUTO_CLEAN_PROP), AUTO_CLEAN_PROP,
|
setDefaultOnCondition(props, !props.containsKey(AUTO_CLEAN_PROP), AUTO_CLEAN_PROP,
|
||||||
@@ -269,6 +279,8 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
|
|||||||
COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED);
|
COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED);
|
||||||
setDefaultOnCondition(props, !props.containsKey(COMPACTION_REVERSE_LOG_READ_ENABLED_PROP),
|
setDefaultOnCondition(props, !props.containsKey(COMPACTION_REVERSE_LOG_READ_ENABLED_PROP),
|
||||||
COMPACTION_REVERSE_LOG_READ_ENABLED_PROP, DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED);
|
COMPACTION_REVERSE_LOG_READ_ENABLED_PROP, DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED);
|
||||||
|
setDefaultOnCondition(props, !props.containsKey(TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP),
|
||||||
|
TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP, DEFAULT_TARGET_PARTITIONS_PER_DAYBASED_COMPACTION);
|
||||||
|
|
||||||
HoodieCleaningPolicy.valueOf(props.getProperty(CLEANER_POLICY_PROP));
|
HoodieCleaningPolicy.valueOf(props.getProperty(CLEANER_POLICY_PROP));
|
||||||
Preconditions.checkArgument(Integer.parseInt(props.getProperty(MAX_COMMITS_TO_KEEP)) > Integer
|
Preconditions.checkArgument(Integer.parseInt(props.getProperty(MAX_COMMITS_TO_KEEP)) > Integer
|
||||||
|
|||||||
@@ -237,6 +237,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
|
|||||||
return props.getProperty(HoodieCompactionConfig.PAYLOAD_CLASS_PROP);
|
return props.getProperty(HoodieCompactionConfig.PAYLOAD_CLASS_PROP);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getTargetPartitionsPerDayBasedCompaction() {
|
||||||
|
return Integer
|
||||||
|
.parseInt(props.getProperty(HoodieCompactionConfig.TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* index properties
|
* index properties
|
||||||
**/
|
**/
|
||||||
|
|||||||
@@ -17,6 +17,7 @@
|
|||||||
|
|
||||||
package com.uber.hoodie.io.compact.strategy;
|
package com.uber.hoodie.io.compact.strategy;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.uber.hoodie.config.HoodieWriteConfig;
|
import com.uber.hoodie.config.HoodieWriteConfig;
|
||||||
import com.uber.hoodie.exception.HoodieException;
|
import com.uber.hoodie.exception.HoodieException;
|
||||||
import com.uber.hoodie.io.compact.CompactionOperation;
|
import com.uber.hoodie.io.compact.CompactionOperation;
|
||||||
@@ -26,39 +27,46 @@ import java.util.Comparator;
|
|||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Locale;
|
import java.util.Locale;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This strategy orders compactions in reverse order of creation of Hive Partitions. It helps to
|
* This strategy orders compactions in reverse order of creation of Hive Partitions. It helps to
|
||||||
* compact data in latest partitions first and then older capped at the Total_IO allowed.
|
* compact data in latest partitions first and then older capped at the Total_IO allowed.
|
||||||
*/
|
*/
|
||||||
public class DayBasedCompactionStrategy extends BoundedIOCompactionStrategy {
|
public class DayBasedCompactionStrategy extends CompactionStrategy {
|
||||||
|
|
||||||
// For now, use SimpleDateFormat as default partition format
|
// For now, use SimpleDateFormat as default partition format
|
||||||
private static String datePartitionFormat = "yyyy/MM/dd";
|
private static String datePartitionFormat = "yyyy/MM/dd";
|
||||||
// Sorts compaction in LastInFirstCompacted order
|
// Sorts compaction in LastInFirstCompacted order
|
||||||
private static Comparator<CompactionOperation> comparator = (CompactionOperation leftC,
|
private static Comparator<String> comparator = (String leftPartition,
|
||||||
CompactionOperation rightC) -> {
|
String rightPartition) -> {
|
||||||
try {
|
try {
|
||||||
Date left = new SimpleDateFormat(datePartitionFormat, Locale.ENGLISH)
|
Date left = new SimpleDateFormat(datePartitionFormat, Locale.ENGLISH)
|
||||||
.parse(leftC.getPartitionPath());
|
.parse(leftPartition);
|
||||||
Date right = new SimpleDateFormat(datePartitionFormat, Locale.ENGLISH)
|
Date right = new SimpleDateFormat(datePartitionFormat, Locale.ENGLISH)
|
||||||
.parse(rightC.getPartitionPath());
|
.parse(rightPartition);
|
||||||
return left.after(right) ? -1 : right.after(left) ? 1 : 0;
|
return left.after(right) ? -1 : right.after(left) ? 1 : 0;
|
||||||
} catch (ParseException e) {
|
} catch (ParseException e) {
|
||||||
throw new HoodieException("Invalid Partition Date Format", e);
|
throw new HoodieException("Invalid Partition Date Format", e);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
public Comparator<CompactionOperation> getComparator() {
|
@VisibleForTesting
|
||||||
|
public Comparator<String> getComparator() {
|
||||||
return comparator;
|
return comparator;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<CompactionOperation> orderAndFilter(HoodieWriteConfig writeConfig,
|
public List<CompactionOperation> orderAndFilter(HoodieWriteConfig writeConfig,
|
||||||
List<CompactionOperation> operations) {
|
List<CompactionOperation> operations) {
|
||||||
// Iterate through the operations and accept operations as long as we are within the IO limit
|
// Iterate through the operations and accept operations as long as we are within the configured target partitions
|
||||||
return super.orderAndFilter(writeConfig,
|
// limit
|
||||||
operations.stream().sorted(comparator).collect(Collectors.toList()));
|
List<CompactionOperation> filteredList = operations.stream()
|
||||||
|
.collect(Collectors.groupingBy(CompactionOperation::getPartitionPath)).entrySet().stream()
|
||||||
|
.sorted(Map.Entry.comparingByKey(comparator)).limit(writeConfig.getTargetPartitionsPerDayBasedCompaction())
|
||||||
|
.flatMap(e -> e.getValue().stream())
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
return filteredList;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -33,6 +33,7 @@ import java.util.Map;
|
|||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
public class TestHoodieCompactionStrategy {
|
public class TestHoodieCompactionStrategy {
|
||||||
@@ -109,17 +110,33 @@ public class TestHoodieCompactionStrategy {
|
|||||||
sizesMap.put(110 * MB, Lists.newArrayList());
|
sizesMap.put(110 * MB, Lists.newArrayList());
|
||||||
sizesMap.put(100 * MB, Lists.newArrayList(MB));
|
sizesMap.put(100 * MB, Lists.newArrayList(MB));
|
||||||
sizesMap.put(90 * MB, Lists.newArrayList(1024 * MB));
|
sizesMap.put(90 * MB, Lists.newArrayList(1024 * MB));
|
||||||
|
sizesMap.put(90 * MB, Lists.newArrayList(1024 * MB));
|
||||||
|
sizesMap.put(90 * MB, Lists.newArrayList(1024 * MB));
|
||||||
|
sizesMap.put(90 * MB, Lists.newArrayList(1024 * MB));
|
||||||
|
|
||||||
DayBasedCompactionStrategy strategy = new DayBasedCompactionStrategy();
|
DayBasedCompactionStrategy strategy = new DayBasedCompactionStrategy();
|
||||||
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig(
|
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig(
|
||||||
HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).withTargetIOPerCompactionInMB(400).build())
|
HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy)
|
||||||
.build();
|
.withTargetPartitionsPerDayBasedCompaction(1)
|
||||||
List<CompactionOperation> operations = createCompactionOperations(writeConfig, sizesMap);
|
.build()).build();
|
||||||
|
|
||||||
|
List<CompactionOperation> operations = Lists.newArrayList(sizesMap.size());
|
||||||
|
int partitionPathIndex = 0;
|
||||||
|
for (Map.Entry<Long, List<Long>> entry : sizesMap.entrySet()) {
|
||||||
|
operations.add(new CompactionOperation(Optional.of(TestHoodieDataFile.newDataFile(entry.getKey())),
|
||||||
|
partitionPaths[(partitionPathIndex % (partitionPaths.length - 1))],
|
||||||
|
entry.getValue().stream().map(TestHoodieLogFile::newLogFile).collect(Collectors.toList()), writeConfig));
|
||||||
|
partitionPathIndex++;
|
||||||
|
}
|
||||||
List<CompactionOperation> returned = strategy.orderAndFilter(writeConfig, operations);
|
List<CompactionOperation> returned = strategy.orderAndFilter(writeConfig, operations);
|
||||||
|
|
||||||
assertTrue("DayBasedCompactionStrategy should have resulted in fewer compactions",
|
assertTrue("DayBasedCompactionStrategy should have resulted in fewer compactions",
|
||||||
returned.size() < operations.size());
|
returned.size() < operations.size());
|
||||||
|
Assert.assertEquals("DayBasedCompactionStrategy should have resulted in fewer compactions",
|
||||||
|
returned.size(), 2);
|
||||||
|
|
||||||
int comparision = strategy.getComparator().compare(returned.get(returned.size() - 1), returned.get(0));
|
int comparision = strategy.getComparator().compare(returned.get(returned.size() - 1).getPartitionPath(), returned
|
||||||
|
.get(0).getPartitionPath());
|
||||||
// Either the partition paths are sorted in descending order or they are equal
|
// Either the partition paths are sorted in descending order or they are equal
|
||||||
assertTrue("DayBasedCompactionStrategy should sort partitions in descending order", comparision >= 0);
|
assertTrue("DayBasedCompactionStrategy should sort partitions in descending order", comparision >= 0);
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user