Adding a new Partition/Time based compaction strategy
This commit is contained in:
committed by
vinoth chandar
parent
051f600b7f
commit
4aed5c7338
@@ -87,6 +87,7 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
|
|||||||
.getLatestFileSlices(partitionPath)
|
.getLatestFileSlices(partitionPath)
|
||||||
.map(s -> new CompactionOperation(s.getDataFile().get(),
|
.map(s -> new CompactionOperation(s.getDataFile().get(),
|
||||||
partitionPath, s.getLogFiles().collect(Collectors.toList()), config))
|
partitionPath, s.getLogFiles().collect(Collectors.toList()), config))
|
||||||
|
.filter(c -> !c.getDeltaFilePaths().isEmpty())
|
||||||
.collect(toList()).iterator()).collect();
|
.collect(toList()).iterator()).collect();
|
||||||
log.info("Total of " + operations.size() + " compactions are retrieved");
|
log.info("Total of " + operations.size() + " compactions are retrieved");
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,62 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
|
||||||
|
package com.uber.hoodie.io.compact.strategy;
|
||||||
|
|
||||||
|
import com.uber.hoodie.config.HoodieWriteConfig;
|
||||||
|
import com.uber.hoodie.exception.HoodieException;
|
||||||
|
import com.uber.hoodie.io.compact.CompactionOperation;
|
||||||
|
|
||||||
|
import java.text.ParseException;
|
||||||
|
import java.text.SimpleDateFormat;
|
||||||
|
import java.util.Comparator;
|
||||||
|
import java.util.Date;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Locale;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This strategy orders compactions in reverse order of creation of Hive Partitions.
|
||||||
|
* It helps to compact data in latest partitions first and then older capped at the Total_IO allowed.
|
||||||
|
*/
|
||||||
|
public class DayBasedCompactionStrategy extends BoundedIOCompactionStrategy {
|
||||||
|
|
||||||
|
// For now, use SimpleDateFormat as default partition format
|
||||||
|
private static String datePartitionFormat = "yyyy/MM/dd";
|
||||||
|
// Sorts compaction in LastInFirstCompacted order
|
||||||
|
private static Comparator<CompactionOperation> comparator = (CompactionOperation leftC, CompactionOperation rightC) -> {
|
||||||
|
try {
|
||||||
|
Date left = new SimpleDateFormat(datePartitionFormat, Locale.ENGLISH)
|
||||||
|
.parse(leftC.getPartitionPath());
|
||||||
|
Date right = new SimpleDateFormat(datePartitionFormat, Locale.ENGLISH)
|
||||||
|
.parse(rightC.getPartitionPath());
|
||||||
|
return left.after(right) ? -1 : right.after(left) ? 1 : 0;
|
||||||
|
} catch (ParseException e) {
|
||||||
|
throw new HoodieException("Invalid Partition Date Format", e);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
public Comparator<CompactionOperation> getComparator() {
|
||||||
|
return comparator;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<CompactionOperation> orderAndFilter(HoodieWriteConfig writeConfig, List<CompactionOperation> operations) {
|
||||||
|
// Iterate through the operations and accept operations as long as we are within the IO limit
|
||||||
|
return super.orderAndFilter(writeConfig, operations.stream().sorted(comparator).collect(Collectors.toList()));
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -128,7 +128,7 @@ public class TestHoodieCompactor {
|
|||||||
compactor.compact(jsc, getConfig(), table, HoodieActiveTimeline.createNewCommitTime());
|
compactor.compact(jsc, getConfig(), table, HoodieActiveTimeline.createNewCommitTime());
|
||||||
String basePath = table.getMetaClient().getBasePath();
|
String basePath = table.getMetaClient().getBasePath();
|
||||||
assertTrue("If there is nothing to compact, result will be empty",
|
assertTrue("If there is nothing to compact, result will be empty",
|
||||||
result.getFileIdAndFullPaths(basePath).isEmpty());
|
result == null || result.getFileIdAndFullPaths(basePath).isEmpty());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|||||||
@@ -16,25 +16,29 @@
|
|||||||
|
|
||||||
package com.uber.hoodie.io.strategy;
|
package com.uber.hoodie.io.strategy;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
|
|
||||||
import com.beust.jcommander.internal.Lists;
|
import com.beust.jcommander.internal.Lists;
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
import com.uber.hoodie.config.HoodieCompactionConfig;
|
import com.uber.hoodie.config.HoodieCompactionConfig;
|
||||||
import com.uber.hoodie.config.HoodieWriteConfig;
|
import com.uber.hoodie.config.HoodieWriteConfig;
|
||||||
import com.uber.hoodie.io.compact.CompactionOperation;
|
import com.uber.hoodie.io.compact.CompactionOperation;
|
||||||
import com.uber.hoodie.io.compact.strategy.BoundedIOCompactionStrategy;
|
import com.uber.hoodie.io.compact.strategy.BoundedIOCompactionStrategy;
|
||||||
|
import com.uber.hoodie.io.compact.strategy.DayBasedCompactionStrategy;
|
||||||
import com.uber.hoodie.io.compact.strategy.LogFileSizeBasedCompactionStrategy;
|
import com.uber.hoodie.io.compact.strategy.LogFileSizeBasedCompactionStrategy;
|
||||||
import com.uber.hoodie.io.compact.strategy.UnBoundedCompactionStrategy;
|
import com.uber.hoodie.io.compact.strategy.UnBoundedCompactionStrategy;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Random;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import org.junit.Test;
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
public class TestHoodieCompactionStrategy {
|
public class TestHoodieCompactionStrategy {
|
||||||
|
|
||||||
private static final long MB = 1024 * 1024L;
|
private static final long MB = 1024 * 1024L;
|
||||||
|
private String [] partitionPaths = {"2017/01/01", "2017/01/02", "2017/01/03"};
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testUnBounded() {
|
public void testUnBounded() {
|
||||||
@@ -106,11 +110,36 @@ public class TestHoodieCompactionStrategy {
|
|||||||
1204, (long) returnedSize);
|
1204, (long) returnedSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPartitionAwareCompactionSimple() {
|
||||||
|
Map<Long, List<Long>> sizesMap = Maps.newHashMap();
|
||||||
|
sizesMap.put(120 * MB, Lists.newArrayList(60 * MB, 10 * MB, 80 * MB));
|
||||||
|
sizesMap.put(110 * MB, Lists.newArrayList());
|
||||||
|
sizesMap.put(100 * MB, Lists.newArrayList(MB));
|
||||||
|
sizesMap.put(90 * MB, Lists.newArrayList(1024 * MB));
|
||||||
|
DayBasedCompactionStrategy strategy = new DayBasedCompactionStrategy();
|
||||||
|
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp")
|
||||||
|
.withCompactionConfig(
|
||||||
|
HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy)
|
||||||
|
.withTargetIOPerCompactionInMB(400).build()).build();
|
||||||
|
List<CompactionOperation> operations = createCompactionOperations(writeConfig, sizesMap);
|
||||||
|
List<CompactionOperation> returned = strategy.orderAndFilter(writeConfig, operations);
|
||||||
|
|
||||||
|
assertTrue("DayBasedCompactionStrategy should have resulted in fewer compactions",
|
||||||
|
returned.size() < operations.size());
|
||||||
|
|
||||||
|
int comparision = strategy.getComparator().compare(returned.get(returned.size() - 1), returned.get(0));
|
||||||
|
// Either the partition paths are sorted in descending order or they are equal
|
||||||
|
assertTrue("DayBasedCompactionStrategy should sort partitions in descending order",
|
||||||
|
comparision >= 0);
|
||||||
|
}
|
||||||
|
|
||||||
private List<CompactionOperation> createCompactionOperations(HoodieWriteConfig config,
|
private List<CompactionOperation> createCompactionOperations(HoodieWriteConfig config,
|
||||||
Map<Long, List<Long>> sizesMap) {
|
Map<Long, List<Long>> sizesMap) {
|
||||||
List<CompactionOperation> operations = Lists.newArrayList(sizesMap.size());
|
List<CompactionOperation> operations = Lists.newArrayList(sizesMap.size());
|
||||||
sizesMap.forEach((k, v) -> {
|
sizesMap.forEach((k, v) -> {
|
||||||
operations.add(new CompactionOperation(TestHoodieDataFile.newDataFile(k), "",
|
operations.add(new CompactionOperation(TestHoodieDataFile.newDataFile(k),
|
||||||
|
partitionPaths[new Random().nextInt(partitionPaths.length - 1)],
|
||||||
v.stream().map(TestHoodieLogFile::newLogFile).collect(
|
v.stream().map(TestHoodieLogFile::newLogFile).collect(
|
||||||
Collectors.toList()), config));
|
Collectors.toList()), config));
|
||||||
});
|
});
|
||||||
|
|||||||
Reference in New Issue
Block a user