1
0

Implement Compaction policy abstraction. Implement LogSizeBased Bounded IO Compaction as the default strategy

This commit is contained in:
Prasanna Rajaperumal
2017-04-04 12:37:28 -07:00
committed by prazanna
parent 82b211d2e6
commit 91b088f29f
18 changed files with 585 additions and 68 deletions

View File

@@ -32,7 +32,6 @@ import com.uber.hoodie.config.HoodieIndexConfig;
import com.uber.hoodie.config.HoodieStorageConfig;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.index.HoodieIndex;
import com.uber.hoodie.io.compact.CompactionFilter;
import com.uber.hoodie.io.compact.HoodieCompactor;
import com.uber.hoodie.io.compact.HoodieRealtimeTableCompactor;
import com.uber.hoodie.table.HoodieTable;
@@ -165,7 +164,7 @@ public class TestMergeOnReadTable {
HoodieCompactor compactor = new HoodieRealtimeTableCompactor();
HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig());
compactor.compact(jsc, getConfig(), table, CompactionFilter.allowAll());
compactor.compact(jsc, getConfig(), table);
metaClient = new HoodieTableMetaClient(fs, cfg.getBasePath());
allFiles = HoodieTestUtils.listAllDataFilesInPath(fs, cfg.getBasePath());

View File

@@ -36,7 +36,6 @@ import com.uber.hoodie.config.HoodieStorageConfig;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.index.HoodieBloomIndex;
import com.uber.hoodie.index.HoodieIndex;
import com.uber.hoodie.io.compact.CompactionFilter;
import com.uber.hoodie.io.compact.HoodieCompactor;
import com.uber.hoodie.io.compact.HoodieRealtimeTableCompactor;
import com.uber.hoodie.table.HoodieTable;
@@ -113,7 +112,7 @@ public class TestHoodieCompactor {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(FSUtils.getFs(), basePath);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig());
compactor.compact(jsc, getConfig(), table, CompactionFilter.allowAll());
compactor.compact(jsc, getConfig(), table);
}
@Test
@@ -129,7 +128,7 @@ public class TestHoodieCompactor {
writeClient.insert(recordsRDD, newCommitTime).collect();
HoodieCompactionMetadata result =
compactor.compact(jsc, getConfig(), table, CompactionFilter.allowAll());
compactor.compact(jsc, getConfig(), table);
assertTrue("If there is nothing to compact, result will be empty",
result.getFileIdAndFullPaths().isEmpty());
}
@@ -177,7 +176,7 @@ public class TestHoodieCompactor {
table = HoodieTable.getHoodieTable(metaClient, config);
HoodieCompactionMetadata result =
compactor.compact(jsc, getConfig(), table, CompactionFilter.allowAll());
compactor.compact(jsc, getConfig(), table);
// Verify that recently written compacted data file has no log file
metaClient = new HoodieTableMetaClient(fs, basePath);

View File

@@ -0,0 +1,123 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.io.strategy;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import com.beust.jcommander.internal.Lists;
import com.google.common.collect.Maps;
import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.table.log.HoodieLogFile;
import com.uber.hoodie.config.HoodieCompactionConfig;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.io.compact.CompactionOperation;
import com.uber.hoodie.io.compact.strategy.BoundedIOCompactionStrategy;
import com.uber.hoodie.io.compact.strategy.LogFileSizeBasedCompactionStrategy;
import com.uber.hoodie.io.compact.strategy.UnBoundedCompactionStrategy;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.junit.Test;
/**
 * Unit tests for the compaction strategy abstraction. Each test builds a set of synthetic
 * {@link CompactionOperation}s from a map of (data-file size -> list of log-file sizes)
 * and verifies how a given strategy orders and filters them.
 */
public class TestHoodieCompactionStrategy {
  private static final long MB = 1024 * 1024L;

  @Test
  public void testUnBounded() {
    Map<Long, List<Long>> sizesMap = Maps.newHashMap();
    sizesMap.put(120 * MB, Lists.newArrayList(60 * MB, 10 * MB, 80 * MB));
    sizesMap.put(110 * MB, Lists.newArrayList());
    sizesMap.put(100 * MB, Lists.newArrayList(MB));
    sizesMap.put(90 * MB, Lists.newArrayList(1024 * MB));
    UnBoundedCompactionStrategy strategy = new UnBoundedCompactionStrategy();
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp")
        .withCompactionConfig(
            HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).build()).build();
    List<CompactionOperation> operations = createCompactionOperations(writeConfig, sizesMap);
    List<CompactionOperation> returned = strategy.orderAndFilter(writeConfig, operations);
    // The unbounded strategy is a pass-through: same operations, same order.
    assertEquals("UnBounded should not re-order or filter", operations, returned);
  }

  @Test
  public void testBoundedIOSimple() {
    Map<Long, List<Long>> sizesMap = Maps.newHashMap();
    sizesMap.put(120 * MB, Lists.newArrayList(60 * MB, 10 * MB, 80 * MB));
    sizesMap.put(110 * MB, Lists.newArrayList());
    sizesMap.put(100 * MB, Lists.newArrayList(MB));
    sizesMap.put(90 * MB, Lists.newArrayList(1024 * MB));
    BoundedIOCompactionStrategy strategy = new BoundedIOCompactionStrategy();
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp")
        .withCompactionConfig(
            HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy)
                .withTargetIOPerCompactionInMB(400).build()).build();
    List<CompactionOperation> operations = createCompactionOperations(writeConfig, sizesMap);
    List<CompactionOperation> returned = strategy.orderAndFilter(writeConfig, operations);
    assertTrue("BoundedIOCompaction should have resulted in fewer compactions",
        returned.size() < operations.size());
    assertEquals("BoundedIOCompaction should have resulted in 2 compactions being chosen",
        2, returned.size());
    // Sum the per-operation IO estimate reported by the strategy.
    Long returnedSize = returned.stream()
        .map(s -> s.getMetrics().get(BoundedIOCompactionStrategy.TOTAL_IO_MB)).map(s -> (Long) s)
        .reduce((size1, size2) -> size1 + size2).orElse(0L);
    // NOTE: message previously claimed 690 MB while asserting 610 — kept consistent now.
    assertEquals("Should choose the first 2 compactions which should result in a total IO of 610 MB",
        610, (long) returnedSize);
  }

  @Test
  public void testLogFileSizeCompactionSimple() {
    Map<Long, List<Long>> sizesMap = Maps.newHashMap();
    sizesMap.put(120 * MB, Lists.newArrayList(60 * MB, 10 * MB, 80 * MB));
    sizesMap.put(110 * MB, Lists.newArrayList());
    sizesMap.put(100 * MB, Lists.newArrayList(MB));
    sizesMap.put(90 * MB, Lists.newArrayList(1024 * MB));
    LogFileSizeBasedCompactionStrategy strategy = new LogFileSizeBasedCompactionStrategy();
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp")
        .withCompactionConfig(
            HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy)
                .withTargetIOPerCompactionInMB(400).build()).build();
    List<CompactionOperation> operations = createCompactionOperations(writeConfig, sizesMap);
    List<CompactionOperation> returned = strategy.orderAndFilter(writeConfig, operations);
    assertTrue("LogFileSizeBasedCompactionStrategy should have resulted in fewer compactions",
        returned.size() < operations.size());
    assertEquals("LogFileSizeBasedCompactionStrategy should have resulted in 1 compaction",
        1, returned.size());
    // Sum the per-operation IO estimate reported by the strategy.
    Long returnedSize = returned.stream()
        .map(s -> s.getMetrics().get(BoundedIOCompactionStrategy.TOTAL_IO_MB)).map(s -> (Long) s)
        .reduce((size1, size2) -> size1 + size2).orElse(0L);
    // NOTE: message previously said "first 2 compactions ... 690 MB" (stale copy-paste);
    // the assertion above pins exactly 1 compaction totalling 1204 MB of IO.
    assertEquals("Should choose the single compaction with the largest log files, "
            + "resulting in a total IO of 1204 MB",
        1204, (long) returnedSize);
  }

  /**
   * Builds one {@link CompactionOperation} per map entry, using stub data/log files
   * that report the given sizes.
   *
   * @param config   write config passed through to each operation
   * @param sizesMap data-file size -> sizes of the log files attached to it
   */
  private List<CompactionOperation> createCompactionOperations(HoodieWriteConfig config,
      Map<Long, List<Long>> sizesMap) {
    // NOTE(review): jcommander's Lists.newArrayList(int) is assumed to be a capacity
    // hint (not an element) — confirm against the jcommander internal API.
    List<CompactionOperation> operations = Lists.newArrayList(sizesMap.size());
    sizesMap.forEach((k, v) -> {
      operations.add(new CompactionOperation(TestHoodieDataFile.newDataFile(k), "",
          v.stream().map(TestHoodieLogFile::newLogFile).collect(
              Collectors.toList()), config));
    });
    return operations;
  }
}

View File

@@ -0,0 +1,57 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.io.strategy;
import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.util.FSUtils;
import java.util.UUID;
import org.apache.hadoop.fs.FileStatus;
/**
 * Stub {@link HoodieDataFile} for compaction-strategy tests: reports a fixed,
 * caller-supplied file size and canned path/commit metadata, with a random file id.
 */
public class TestHoodieDataFile extends HoodieDataFile {
  // Size (in bytes) this fake data file pretends to have.
  private final long size;

  /** Static factory used by the tests to keep call sites terse. */
  public static HoodieDataFile newDataFile(long size) {
    return new TestHoodieDataFile(size);
  }

  public TestHoodieDataFile(long size) {
    super(null);
    this.size = size;
  }

  @Override
  public String getPath() {
    return "/tmp/test";
  }

  @Override
  public String getFileId() {
    return UUID.randomUUID().toString();
  }

  @Override
  public String getCommitTime() {
    return "100";
  }

  @Override
  public long getFileSize() {
    return size;
  }
}

View File

@@ -0,0 +1,46 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.io.strategy;
import com.uber.hoodie.common.table.log.HoodieLogFile;
import java.util.Optional;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
/**
 * Stub {@link HoodieLogFile} for compaction-strategy tests: reports a fixed,
 * caller-supplied file size and a canned path.
 */
public class TestHoodieLogFile extends HoodieLogFile {
  // Size (in bytes) this fake log file pretends to have.
  private final long size;

  /** Static factory used by the tests to keep call sites terse. */
  public static HoodieLogFile newLogFile(long size) {
    return new TestHoodieLogFile(size);
  }

  public TestHoodieLogFile(long size) {
    super((Path) null);
    this.size = size;
  }

  @Override
  public Path getPath() {
    return new Path("/tmp/test-log");
  }

  @Override
  public Optional<Long> getFileSize() {
    return Optional.of(size);
  }
}