diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java index 3d10d1b73..d1584aac0 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java @@ -104,6 +104,10 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig { private static final String DEFAULT_CLEANER_COMMITS_RETAINED = "24"; private static final String DEFAULT_MAX_COMMITS_TO_KEEP = String.valueOf(128); private static final String DEFAULT_MIN_COMMITS_TO_KEEP = String.valueOf(96); + public static final String TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP = "hoodie.compaction.daybased.target" + + ".partitions"; + // 500GB of target IO per compaction (both read and write) + public static final String DEFAULT_TARGET_PARTITIONS_PER_DAYBASED_COMPACTION = String.valueOf(10); private HoodieCompactionConfig(Properties props) { super(props); @@ -230,6 +234,12 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig { return this; } + public Builder withTargetPartitionsPerDayBasedCompaction(int targetPartitionsPerCompaction) { + props.setProperty(TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP, + String.valueOf(targetPartitionsPerCompaction)); + return this; + } + public HoodieCompactionConfig build() { HoodieCompactionConfig config = new HoodieCompactionConfig(props); setDefaultOnCondition(props, !props.containsKey(AUTO_CLEAN_PROP), AUTO_CLEAN_PROP, @@ -269,6 +279,8 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig { COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED); setDefaultOnCondition(props, !props.containsKey(COMPACTION_REVERSE_LOG_READ_ENABLED_PROP), COMPACTION_REVERSE_LOG_READ_ENABLED_PROP, DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED); + setDefaultOnCondition(props, !props.containsKey(TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP), + TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP, DEFAULT_TARGET_PARTITIONS_PER_DAYBASED_COMPACTION); HoodieCleaningPolicy.valueOf(props.getProperty(CLEANER_POLICY_PROP)); Preconditions.checkArgument(Integer.parseInt(props.getProperty(MAX_COMMITS_TO_KEEP)) > Integer diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java index c974d2892..6c1e394df 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java @@ -237,6 +237,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return props.getProperty(HoodieCompactionConfig.PAYLOAD_CLASS_PROP); } + public int getTargetPartitionsPerDayBasedCompaction() { + return Integer + .parseInt(props.getProperty(HoodieCompactionConfig.TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP)); + } + /** * index properties **/ diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/DayBasedCompactionStrategy.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/DayBasedCompactionStrategy.java index 3e5045952..714dad54e 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/DayBasedCompactionStrategy.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/DayBasedCompactionStrategy.java @@ -17,6 +17,7 @@ package com.uber.hoodie.io.compact.strategy; +import com.google.common.annotations.VisibleForTesting; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.exception.HoodieException; import com.uber.hoodie.io.compact.CompactionOperation; @@ -26,39 +27,46 @@ import java.util.Comparator; import java.util.Date; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.stream.Collectors; /** * This strategy orders compactions in reverse order of creation of Hive Partitions. It helps to * compact data in latest partitions first and then older capped at the Total_IO allowed. */ -public class DayBasedCompactionStrategy extends BoundedIOCompactionStrategy { +public class DayBasedCompactionStrategy extends CompactionStrategy { // For now, use SimpleDateFormat as default partition format private static String datePartitionFormat = "yyyy/MM/dd"; // Sorts compaction in LastInFirstCompacted order - private static Comparator comparator = (CompactionOperation leftC, - CompactionOperation rightC) -> { + private static Comparator comparator = (String leftPartition, + String rightPartition) -> { try { Date left = new SimpleDateFormat(datePartitionFormat, Locale.ENGLISH) - .parse(leftC.getPartitionPath()); + .parse(leftPartition); Date right = new SimpleDateFormat(datePartitionFormat, Locale.ENGLISH) - .parse(rightC.getPartitionPath()); + .parse(rightPartition); return left.after(right) ? -1 : right.after(left) ? 1 : 0; } catch (ParseException e) { throw new HoodieException("Invalid Partition Date Format", e); } }; - public Comparator getComparator() { + @VisibleForTesting + public Comparator getComparator() { return comparator; } @Override public List orderAndFilter(HoodieWriteConfig writeConfig, List operations) { - // Iterate through the operations and accept operations as long as we are within the IO limit - return super.orderAndFilter(writeConfig, - operations.stream().sorted(comparator).collect(Collectors.toList())); + // Iterate through the operations and accept operations as long as we are within the configured target partitions + // limit + List filteredList = operations.stream() + .collect(Collectors.groupingBy(CompactionOperation::getPartitionPath)).entrySet().stream() + .sorted(Map.Entry.comparingByKey(comparator)).limit(writeConfig.getTargetPartitionsPerDayBasedCompaction()) + .flatMap(e -> e.getValue().stream()) + .collect(Collectors.toList()); + return filteredList; } } \ No newline at end of file diff --git a/hoodie-client/src/test/java/com/uber/hoodie/io/strategy/TestHoodieCompactionStrategy.java b/hoodie-client/src/test/java/com/uber/hoodie/io/strategy/TestHoodieCompactionStrategy.java index 9965cfcba..cb53fdd8d 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/io/strategy/TestHoodieCompactionStrategy.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/io/strategy/TestHoodieCompactionStrategy.java @@ -33,6 +33,7 @@ import java.util.Map; import java.util.Optional; import java.util.Random; import java.util.stream.Collectors; +import org.junit.Assert; import org.junit.Test; public class TestHoodieCompactionStrategy { @@ -109,17 +110,33 @@ public class TestHoodieCompactionStrategy { sizesMap.put(110 * MB, Lists.newArrayList()); sizesMap.put(100 * MB, Lists.newArrayList(MB)); sizesMap.put(90 * MB, Lists.newArrayList(1024 * MB)); + sizesMap.put(90 * MB, Lists.newArrayList(1024 * MB)); + sizesMap.put(90 * MB, Lists.newArrayList(1024 * MB)); + sizesMap.put(90 * MB, Lists.newArrayList(1024 * MB)); + DayBasedCompactionStrategy strategy = new DayBasedCompactionStrategy(); HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig( - HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).withTargetIOPerCompactionInMB(400).build()) - .build(); - List operations = createCompactionOperations(writeConfig, sizesMap); + HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy) + .withTargetPartitionsPerDayBasedCompaction(1) + .build()).build(); + + List operations = Lists.newArrayList(sizesMap.size()); + int partitionPathIndex = 0; + for (Map.Entry> entry : sizesMap.entrySet()) { + operations.add(new CompactionOperation(Optional.of(TestHoodieDataFile.newDataFile(entry.getKey())), + partitionPaths[(partitionPathIndex % (partitionPaths.length - 1))], + entry.getValue().stream().map(TestHoodieLogFile::newLogFile).collect(Collectors.toList()), writeConfig)); + partitionPathIndex++; + } List returned = strategy.orderAndFilter(writeConfig, operations); assertTrue("DayBasedCompactionStrategy should have resulted in fewer compactions", returned.size() < operations.size()); + Assert.assertEquals("DayBasedCompactionStrategy should have resulted in fewer compactions", + returned.size(), 2); - int comparision = strategy.getComparator().compare(returned.get(returned.size() - 1), returned.get(0)); + int comparision = strategy.getComparator().compare(returned.get(returned.size() - 1).getPartitionPath(), returned + .get(0).getPartitionPath()); // Either the partition paths are sorted in descending order or they are equal assertTrue("DayBasedCompactionStrategy should sort partitions in descending order", comparision >= 0); }