From 4aed5c7338701310ea7f71459b80a532ad5eccd7 Mon Sep 17 00:00:00 2001
From: Nishith Agarwal
Date: Wed, 25 Oct 2017 13:30:15 -0700
Subject: [PATCH] Adding a new Partition/Time based compaction strategy

---
 .../compact/HoodieRealtimeTableCompactor.java |  1 +
 .../strategy/DayBasedCompactionStrategy.java  | 82 +++++++++++++++++++
 .../uber/hoodie/io/TestHoodieCompactor.java   |  2 +-
 .../TestHoodieCompactionStrategy.java         | 39 ++++++++++--
 4 files changed, 118 insertions(+), 6 deletions(-)
 create mode 100644 hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/DayBasedCompactionStrategy.java

diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java
index e794492eb..e4da087c1 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java
@@ -87,6 +87,7 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
                 .getLatestFileSlices(partitionPath)
                 .map(s -> new CompactionOperation(s.getDataFile().get(),
                     partitionPath, s.getLogFiles().collect(Collectors.toList()), config))
+                .filter(c -> !c.getDeltaFilePaths().isEmpty())
                 .collect(toList()).iterator()).collect();
     log.info("Total of " + operations.size() + " compactions are retrieved");
 
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/DayBasedCompactionStrategy.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/DayBasedCompactionStrategy.java
new file mode 100644
index 000000000..5aa967869
--- /dev/null
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/DayBasedCompactionStrategy.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package com.uber.hoodie.io.compact.strategy;
+
+import com.uber.hoodie.config.HoodieWriteConfig;
+import com.uber.hoodie.exception.HoodieException;
+import com.uber.hoodie.io.compact.CompactionOperation;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.List;
+import java.util.Locale;
+import java.util.stream.Collectors;
+
+/**
+ * This strategy orders compactions in reverse chronological order of their partition paths,
+ * which are parsed as {@code yyyy/MM/dd} dates. Data in the latest partitions is compacted first,
+ * followed by older partitions; the ordered list is then handed to the parent strategy, which
+ * accepts operations as long as they fit within the allowed IO.
+ */
+public class DayBasedCompactionStrategy extends BoundedIOCompactionStrategy {
+
+  // For now, use a SimpleDateFormat pattern as the default partition format
+  private static final String DATE_PARTITION_FORMAT = "yyyy/MM/dd";
+
+  // Sorts compactions in LastInFirstCompacted order, i.e. the latest partition comes first.
+  // SimpleDateFormat is not thread-safe, so a new instance is created for every comparison.
+  private static final Comparator<CompactionOperation> COMPARATOR =
+      (CompactionOperation leftC, CompactionOperation rightC) -> {
+        try {
+          SimpleDateFormat format = new SimpleDateFormat(DATE_PARTITION_FORMAT, Locale.ENGLISH);
+          Date left = format.parse(leftC.getPartitionPath());
+          Date right = format.parse(rightC.getPartitionPath());
+          // Reversed operands give descending (newest first) order
+          return right.compareTo(left);
+        } catch (ParseException e) {
+          throw new HoodieException("Invalid Partition Date Format", e);
+        }
+      };
+
+  /**
+   * Returns the comparator that orders compaction operations from the latest partition to the
+   * oldest one. Comparing fails with a {@link HoodieException} when a partition path cannot be
+   * parsed as a {@code yyyy/MM/dd} date.
+   */
+  public Comparator<CompactionOperation> getComparator() {
+    return COMPARATOR;
+  }
+
+  /**
+   * Sorts the operations so that the latest partitions come first and delegates the filtering
+   * to the parent strategy.
+   *
+   * @param writeConfig write config passed through to the parent strategy
+   * @param operations candidate compaction operations; the list itself is not modified
+   * @return the result of the parent strategy applied to the sorted operations
+   */
+  @Override
+  public List<CompactionOperation> orderAndFilter(HoodieWriteConfig writeConfig,
+      List<CompactionOperation> operations) {
+    // The parent accepts the sorted operations as long as we are within the IO limit
+    return super.orderAndFilter(writeConfig,
+        operations.stream().sorted(COMPARATOR).collect(Collectors.toList()));
+  }
+}
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java
index d13949c71..e8ea13055 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java
@@ -128,7 +128,7 @@ public class TestHoodieCompactor {
         compactor.compact(jsc, getConfig(), table, HoodieActiveTimeline.createNewCommitTime());
     String basePath = table.getMetaClient().getBasePath();
     assertTrue("If there is nothing to compact, result will be empty",
-        result.getFileIdAndFullPaths(basePath).isEmpty());
+        result == null || result.getFileIdAndFullPaths(basePath).isEmpty());
   }
 
   @Test
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/io/strategy/TestHoodieCompactionStrategy.java b/hoodie-client/src/test/java/com/uber/hoodie/io/strategy/TestHoodieCompactionStrategy.java
index c01e21522..55898c373 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/io/strategy/TestHoodieCompactionStrategy.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/io/strategy/TestHoodieCompactionStrategy.java
@@ -16,25 +16,29 @@
 
 package com.uber.hoodie.io.strategy;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
 import com.beust.jcommander.internal.Lists;
 import com.google.common.collect.Maps;
 import com.uber.hoodie.config.HoodieCompactionConfig;
 import com.uber.hoodie.config.HoodieWriteConfig;
 import com.uber.hoodie.io.compact.CompactionOperation;
 import com.uber.hoodie.io.compact.strategy.BoundedIOCompactionStrategy;
+import com.uber.hoodie.io.compact.strategy.DayBasedCompactionStrategy;
 import com.uber.hoodie.io.compact.strategy.LogFileSizeBasedCompactionStrategy;
 import com.uber.hoodie.io.compact.strategy.UnBoundedCompactionStrategy;
+import org.junit.Test;
+
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.stream.Collectors;
-import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class TestHoodieCompactionStrategy {
 
   private static final long MB = 1024 * 1024L;
+  private String[] partitionPaths = {"2017/01/01", "2017/01/02", "2017/01/03"};
 
   @Test
   public void testUnBounded() {
@@ -106,11 +110,36 @@ public class TestHoodieCompactionStrategy {
         1204, (long) returnedSize);
   }
 
+  @Test
+  public void testPartitionAwareCompactionSimple() {
+    Map<Long, List<Long>> sizesMap = Maps.newHashMap();
+    sizesMap.put(120 * MB, Lists.newArrayList(60 * MB, 10 * MB, 80 * MB));
+    sizesMap.put(110 * MB, Lists.newArrayList());
+    sizesMap.put(100 * MB, Lists.newArrayList(MB));
+    sizesMap.put(90 * MB, Lists.newArrayList(1024 * MB));
+    DayBasedCompactionStrategy strategy = new DayBasedCompactionStrategy();
+    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp")
+        .withCompactionConfig(
+            HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy)
+                .withTargetIOPerCompactionInMB(400).build()).build();
+    List<CompactionOperation> operations = createCompactionOperations(writeConfig, sizesMap);
+    List<CompactionOperation> returned = strategy.orderAndFilter(writeConfig, operations);
+
+    assertTrue("DayBasedCompactionStrategy should have resulted in fewer compactions",
+        returned.size() < operations.size());
+
+    int comparison = strategy.getComparator().compare(returned.get(returned.size() - 1), returned.get(0));
+    // The last partition must sort after the first one (descending order) or be equal to it
+    assertTrue("DayBasedCompactionStrategy should sort partitions in descending order",
+        comparison >= 0);
+  }
+
   private List<CompactionOperation> createCompactionOperations(HoodieWriteConfig config,
       Map<Long, List<Long>> sizesMap) {
     List<CompactionOperation> operations = Lists.newArrayList(sizesMap.size());
     sizesMap.forEach((k, v) -> {
-      operations.add(new CompactionOperation(TestHoodieDataFile.newDataFile(k), "",
+      operations.add(new CompactionOperation(TestHoodieDataFile.newDataFile(k),
+          partitionPaths[new Random().nextInt(partitionPaths.length)],
           v.stream().map(TestHoodieLogFile::newLogFile).collect(
               Collectors.toList()), config));
     });