From 91b088f29fe71fabbbd7602748f4f7d048b4d108 Mon Sep 17 00:00:00 2001 From: Prasanna Rajaperumal Date: Tue, 4 Apr 2017 12:37:28 -0700 Subject: [PATCH] Implement Compaction policy abstraction. Implement LogSizeBased Bounded IO Compaction as the default strategy --- .../hoodie/config/HoodieCompactionConfig.java | 23 ++++ .../uber/hoodie/config/HoodieWriteConfig.java | 16 ++- .../io/compact/CompactionOperation.java | 89 +++++++------ .../hoodie/io/compact/HoodieCompactor.java | 2 +- .../compact/HoodieRealtimeTableCompactor.java | 6 +- .../strategy/BoundedIOCompactionStrategy.java | 80 ++++++++++++ .../compact/strategy/CompactionStrategy.java | 62 +++++++++ .../LogFileSizeBasedCompactionStrategy.java | 70 ++++++++++ .../strategy/UnBoundedCompactionStrategy.java | 47 +++++++ .../hoodie/table/HoodieMergeOnReadTable.java | 3 +- .../com/uber/hoodie/TestMergeOnReadTable.java | 3 +- .../uber/hoodie/io/TestHoodieCompactor.java | 7 +- .../TestHoodieCompactionStrategy.java | 123 ++++++++++++++++++ .../io/strategy/TestHoodieDataFile.java | 57 ++++++++ .../io/strategy/TestHoodieLogFile.java} | 40 ++++-- .../common/table/log/HoodieLogFile.java | 3 + .../com/uber/hoodie/common/util/FSUtils.java | 4 + .../hoodie/common/util/ReflectionUtils.java | 18 +++ 18 files changed, 585 insertions(+), 68 deletions(-) create mode 100644 hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/BoundedIOCompactionStrategy.java create mode 100644 hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/CompactionStrategy.java create mode 100644 hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/LogFileSizeBasedCompactionStrategy.java create mode 100644 hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/UnBoundedCompactionStrategy.java create mode 100644 hoodie-client/src/test/java/com/uber/hoodie/io/strategy/TestHoodieCompactionStrategy.java create mode 100644 hoodie-client/src/test/java/com/uber/hoodie/io/strategy/TestHoodieDataFile.java rename 
hoodie-client/src/{main/java/com/uber/hoodie/io/compact/CompactionFilter.java => test/java/com/uber/hoodie/io/strategy/TestHoodieLogFile.java} (51%) diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java index da365619b..7b61548af 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java @@ -19,6 +19,8 @@ package com.uber.hoodie.config; import com.google.common.base.Preconditions; import com.uber.hoodie.common.model.HoodieCleaningPolicy; +import com.uber.hoodie.io.compact.strategy.CompactionStrategy; +import com.uber.hoodie.io.compact.strategy.LogFileSizeBasedCompactionStrategy; import javax.annotation.concurrent.Immutable; import java.io.File; import java.io.FileReader; @@ -83,6 +85,13 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig { public static final String CLEANER_PARALLELISM = "hoodie.cleaner.parallelism"; public static final String DEFAULT_CLEANER_PARALLELISM = String.valueOf(200); + public static final String TARGET_IO_PER_COMPACTION_IN_MB_PROP = "hoodie.compaction.target.io"; + // 500GB of target IO per compaction (both read and write) + public static final String DEFAULT_TARGET_IO_PER_COMPACTION_IN_MB = String.valueOf(500 * 1024); + + public static final String COMPACTION_STRATEGY_PROP = "hoodie.compaction.strategy"; + // Fully qualified class name of the default CompactionStrategy implementation + public static final String DEFAULT_COMPACTION_STRATEGY = LogFileSizeBasedCompactionStrategy.class.getName(); private HoodieCompactionConfig(Properties props) { super(props); @@ -167,6 +176,16 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig { return this; } + public Builder withCompactionStrategy(CompactionStrategy compactionStrategy) { + props.setProperty(COMPACTION_STRATEGY_PROP, compactionStrategy.getClass().getName()); + return 
this; + } + + public Builder withTargetIOPerCompactionInMB(long targetIOPerCompactionInMB) { + props.setProperty(TARGET_IO_PER_COMPACTION_IN_MB_PROP, String.valueOf(targetIOPerCompactionInMB)); + return this; + } + public HoodieCompactionConfig build() { HoodieCompactionConfig config = new HoodieCompactionConfig(props); setDefaultOnCondition(props, !props.containsKey(AUTO_CLEAN_PROP), @@ -195,6 +214,10 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig { COPY_ON_WRITE_TABLE_RECORD_SIZE_ESTIMATE, DEFAULT_COPY_ON_WRITE_TABLE_RECORD_SIZE_ESTIMATE); setDefaultOnCondition(props, !props.containsKey(CLEANER_PARALLELISM), CLEANER_PARALLELISM, DEFAULT_CLEANER_PARALLELISM); + setDefaultOnCondition(props, !props.containsKey(COMPACTION_STRATEGY_PROP), + COMPACTION_STRATEGY_PROP, DEFAULT_COMPACTION_STRATEGY); + setDefaultOnCondition(props, !props.containsKey(TARGET_IO_PER_COMPACTION_IN_MB_PROP), + TARGET_IO_PER_COMPACTION_IN_MB_PROP, DEFAULT_TARGET_IO_PER_COMPACTION_IN_MB); HoodieCleaningPolicy.valueOf(props.getProperty(CLEANER_POLICY_PROP)); Preconditions.checkArgument( diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java index bb190fda7..ce70942a3 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java @@ -19,9 +19,9 @@ package com.uber.hoodie.config; import com.google.common.base.Preconditions; import com.uber.hoodie.common.model.HoodieCleaningPolicy; -import com.uber.hoodie.config.HoodieCompactionConfig.Builder; +import com.uber.hoodie.common.util.ReflectionUtils; import com.uber.hoodie.index.HoodieIndex; -import com.uber.hoodie.io.HoodieCleaner; +import com.uber.hoodie.io.compact.strategy.CompactionStrategy; import com.uber.hoodie.metrics.MetricsReporterType; import org.apache.spark.storage.StorageLevel; @@ -164,6 +164,14 @@ public class 
HoodieWriteConfig extends DefaultHoodieConfig { props.getProperty(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP)); } + public CompactionStrategy getCompactionStrategy() { + return ReflectionUtils.loadClass(props.getProperty(HoodieCompactionConfig.COMPACTION_STRATEGY_PROP)); + } + + public Long getTargetIOPerCompactionInMB() { + return Long.parseLong(props.getProperty(HoodieCompactionConfig.TARGET_IO_PER_COMPACTION_IN_MB_PROP)); + } + /** * index properties **/ @@ -238,7 +246,9 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return new Builder(); } - public static class Builder { + + + public static class Builder { private final Properties props = new Properties(); private boolean isIndexConfigSet = false; private boolean isStorageConfigSet = false; diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/CompactionOperation.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/CompactionOperation.java index 785d4a53a..ed3645ddd 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/CompactionOperation.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/CompactionOperation.java @@ -19,61 +19,72 @@ package com.uber.hoodie.io.compact; import com.uber.hoodie.common.model.HoodieDataFile; import com.uber.hoodie.common.table.log.HoodieLogFile; +import com.uber.hoodie.config.HoodieWriteConfig; +import com.uber.hoodie.io.compact.strategy.CompactionStrategy; import java.io.Serializable; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; /** * Encapsulates all the needed information about a compaction * and make a decision whether this compaction is effective or not * - * @see CompactionFilter + * @see CompactionStrategy */ public class CompactionOperation implements Serializable { - private String dataFileCommitTime; - private long dataFileSize; - private List deltaFilePaths; - private String dataFilePath; - private String fileId; - private String partitionPath; - 
//Only for serialization/de-serialization - @Deprecated - public CompactionOperation() { - } + private String dataFileCommitTime; + private long dataFileSize; + private List deltaFilePaths; + private String dataFilePath; + private String fileId; + private String partitionPath; + private Map metrics; - public CompactionOperation(HoodieDataFile dataFile, String partitionPath, - List value) { - this.dataFilePath = dataFile.getPath(); - this.fileId = dataFile.getFileId(); - this.partitionPath = partitionPath; - this.dataFileCommitTime = dataFile.getCommitTime(); - this.dataFileSize = dataFile.getFileStatus().getLen(); - this.deltaFilePaths = value.stream().map(s -> s.getPath().toString()).collect( - Collectors.toList()); - } + //Only for serialization/de-serialization + @Deprecated + public CompactionOperation() { + } - public String getDataFileCommitTime() { - return dataFileCommitTime; - } + public CompactionOperation(HoodieDataFile dataFile, String partitionPath, + List logFiles, HoodieWriteConfig writeConfig) { + this.dataFilePath = dataFile.getPath(); + this.fileId = dataFile.getFileId(); + this.partitionPath = partitionPath; + this.dataFileCommitTime = dataFile.getCommitTime(); + this.dataFileSize = dataFile.getFileSize(); + this.deltaFilePaths = logFiles.stream().map(s -> s.getPath().toString()).collect( + Collectors.toList()); + this.metrics = writeConfig.getCompactionStrategy() + .captureMetrics(dataFile, partitionPath, logFiles); + } - public long getDataFileSize() { - return dataFileSize; - } + public String getDataFileCommitTime() { + return dataFileCommitTime; + } - public List getDeltaFilePaths() { - return deltaFilePaths; - } + public long getDataFileSize() { + return dataFileSize; + } - public String getDataFilePath() { - return dataFilePath; - } + public List getDeltaFilePaths() { + return deltaFilePaths; + } - public String getFileId() { - return fileId; - } + public String getDataFilePath() { + return dataFilePath; + } - public String 
getPartitionPath() { - return partitionPath; - } + public String getFileId() { + return fileId; + } + + public String getPartitionPath() { + return partitionPath; + } + + public Map getMetrics() { + return metrics; + } } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieCompactor.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieCompactor.java index d7d541bfd..8032cec41 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieCompactor.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieCompactor.java @@ -36,7 +36,7 @@ public interface HoodieCompactor extends Serializable { * @throws Exception */ HoodieCompactionMetadata compact(JavaSparkContext jsc, final HoodieWriteConfig config, - HoodieTable hoodieTable, CompactionFilter compactionFilter) throws Exception; + HoodieTable hoodieTable) throws Exception; // Helper methods diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java index 3cf9d7869..e7efcfa45 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java @@ -64,7 +64,7 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor { @Override public HoodieCompactionMetadata compact(JavaSparkContext jsc, HoodieWriteConfig config, - HoodieTable hoodieTable, CompactionFilter compactionFilter) throws IOException { + HoodieTable hoodieTable) throws IOException { Preconditions.checkArgument( hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ, "HoodieRealtimeTableCompactor can only compact table of type " @@ -86,12 +86,12 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor { .getFileSystemView() .groupLatestDataFileWithLogFiles(partitionPath).entrySet() .stream() - 
.map(s -> new CompactionOperation(s.getKey(), partitionPath, s.getValue())) + .map(s -> new CompactionOperation(s.getKey(), partitionPath, s.getValue(), config)) .collect(toList()).iterator()).collect(); log.info("Total of " + operations.size() + " compactions are retrieved"); // Filter the compactions with the passed in filter. This lets us choose most effective compactions only - operations = compactionFilter.filter(operations); + operations = config.getCompactionStrategy().orderAndFilter(config, operations); if (operations.isEmpty()) { log.warn("After filtering, Nothing to compact for " + metaClient.getBasePath()); return null; diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/BoundedIOCompactionStrategy.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/BoundedIOCompactionStrategy.java new file mode 100644 index 000000000..81d2b378d --- /dev/null +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/BoundedIOCompactionStrategy.java @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.uber.hoodie.io.compact.strategy; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.uber.hoodie.common.model.HoodieDataFile; +import com.uber.hoodie.common.table.log.HoodieLogFile; +import com.uber.hoodie.common.util.FSUtils; +import com.uber.hoodie.config.HoodieWriteConfig; +import com.uber.hoodie.io.compact.CompactionOperation; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +/** + * CompactionStrategy which looks at total IO to be done for the compaction (read + write) + * and limits the list of compactions to be under a configured limit on the IO + * + * @see CompactionStrategy + */ +public class BoundedIOCompactionStrategy implements CompactionStrategy { + + public static final String TOTAL_IO_READ_MB = "TOTAL_IO_READ_MB"; + public static final String TOTAL_IO_WRITE_MB = "TOTAL_IO_WRITE_MB"; + public static final String TOTAL_IO_MB = "TOTAL_IO_MB"; + + @Override + public Map captureMetrics(HoodieDataFile dataFile, String partitionPath, + List logFiles) { + Map metrics = Maps.newHashMap(); + // Total size of all the log files + Long totalLogFileSize = logFiles.stream().map(HoodieLogFile::getFileSize).filter( + Optional::isPresent).map(Optional::get).reduce( + (size1, size2) -> size1 + size2).orElse(0L); + // Total read will be the base file + all the log files + Long totalIORead = FSUtils.getSizeInMB(dataFile.getFileSize() + totalLogFileSize); + // Total write will be similar to the size of the base file + Long totalIOWrite = FSUtils.getSizeInMB(dataFile.getFileSize()); + // Total IO will be the IO for read + write + Long totalIO = totalIORead + totalIOWrite; + // Save these metrics and we will use during the filter + metrics.put(TOTAL_IO_READ_MB, totalIORead); + metrics.put(TOTAL_IO_WRITE_MB, totalIOWrite); + metrics.put(TOTAL_IO_MB, totalIO); + return metrics; + + } + + @Override + public List orderAndFilter(HoodieWriteConfig writeConfig, List operations) { + // 
Iterate through the operations in order and accept operations as long as we are within the IO limit + // Preserves the original ordering of compactions + List finalOperations = Lists.newArrayList(); + long targetIORemaining = writeConfig.getTargetIOPerCompactionInMB(); + for (CompactionOperation op : operations) { + long opIo = (Long) op.getMetrics().get(TOTAL_IO_MB); + targetIORemaining -= opIo; + finalOperations.add(op); + if (targetIORemaining <= 0) { + return finalOperations; + } + } + return finalOperations; + } +} diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/CompactionStrategy.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/CompactionStrategy.java new file mode 100644 index 000000000..b0133cec1 --- /dev/null +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/CompactionStrategy.java @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.io.compact.strategy; + +import com.uber.hoodie.common.model.HoodieDataFile; +import com.uber.hoodie.common.table.log.HoodieLogFile; +import com.uber.hoodie.config.HoodieWriteConfig; +import com.uber.hoodie.io.compact.CompactionOperation; +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +/** + * Strategy for compaction. Pluggable implementation to define how compaction should be done. 
+ * The implementations of this interface can capture the relevant metrics to order and filter + * the final list of compaction operations to run in a single compaction. + * + * Implementation of CompactionStrategy cannot hold any state. + * Different instantiations can be passed in every time + * + * @see com.uber.hoodie.io.compact.HoodieRealtimeTableCompactor + * @see CompactionOperation + */ +public interface CompactionStrategy extends Serializable { + + /** + * Callback hook when a CompactionOperation is created. Individual strategies can + * capture the metrics they need to decide on the priority. + * + * @param dataFile - Base file to compact + * @param partitionPath - Partition path + * @param logFiles - List of log files to compact with the base file + * @return Map[String, Object] - metrics captured + */ + Map captureMetrics(HoodieDataFile dataFile, String partitionPath, + List logFiles); + + /** + * Order and Filter the list of compactions. Use the metrics captured with the + * captureMetrics to order and filter out compactions + * + * @param writeConfig - HoodieWriteConfig - config for this compaction is passed in + * @param operations - list of compactions collected + * @return list of compactions to perform in this run + */ + List orderAndFilter(HoodieWriteConfig writeConfig, + List operations); +} diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/LogFileSizeBasedCompactionStrategy.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/LogFileSizeBasedCompactionStrategy.java new file mode 100644 index 000000000..592f1124f --- /dev/null +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/LogFileSizeBasedCompactionStrategy.java @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.io.compact.strategy; + +import com.uber.hoodie.common.model.HoodieDataFile; +import com.uber.hoodie.common.table.log.HoodieLogFile; +import com.uber.hoodie.config.HoodieWriteConfig; +import com.uber.hoodie.io.compact.CompactionOperation; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * LogFileSizeBasedCompactionStrategy orders the compactions based on the total log files size + * and limits the compactions within a configured IO bound + * + * @see BoundedIOCompactionStrategy + * @see CompactionStrategy + */ +public class LogFileSizeBasedCompactionStrategy extends BoundedIOCompactionStrategy implements + Comparator { + + private static final String TOTAL_LOG_FILE_SIZE = "TOTAL_LOG_FILE_SIZE"; + + @Override + public Map captureMetrics(HoodieDataFile dataFile, String partitionPath, + List logFiles) { + + Map metrics = super.captureMetrics(dataFile, partitionPath, logFiles); + // Total size of all the log files + Long totalLogFileSize = logFiles.stream().map(HoodieLogFile::getFileSize).filter( + Optional::isPresent).map(Optional::get).reduce( + (size1, size2) -> size1 + size2).orElse(0L); + // save the metrics needed during the order + metrics.put(TOTAL_LOG_FILE_SIZE, totalLogFileSize); + return metrics; + } + + @Override + public List orderAndFilter(HoodieWriteConfig writeConfig, + List operations) { + // Order the operations based on the reverse size of the logs and limit them by the IO + return super + 
.orderAndFilter(writeConfig, operations.stream().sorted(this).collect(Collectors.toList())); + } + + @Override + public int compare(CompactionOperation op1, CompactionOperation op2) { + Long totalLogSize1 = (Long) op1.getMetrics().get(TOTAL_LOG_FILE_SIZE); + Long totalLogSize2 = (Long) op2.getMetrics().get(TOTAL_LOG_FILE_SIZE); + // Reverse the comparison order - so that larger log file size is compacted first + return totalLogSize2.compareTo(totalLogSize1); + } +} diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/UnBoundedCompactionStrategy.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/UnBoundedCompactionStrategy.java new file mode 100644 index 000000000..0693e1cf4 --- /dev/null +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/UnBoundedCompactionStrategy.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.io.compact.strategy; + +import com.google.common.collect.Maps; +import com.uber.hoodie.common.model.HoodieDataFile; +import com.uber.hoodie.common.table.log.HoodieLogFile; +import com.uber.hoodie.config.HoodieWriteConfig; +import com.uber.hoodie.io.compact.CompactionOperation; +import java.util.List; +import java.util.Map; + +/** + * UnBoundedCompactionStrategy will not change ordering or filter any compaction. 
+ * It is a pass-through and will compact all the base files which has a log file. + * This usually means no-intelligence on compaction. + * + * @see CompactionStrategy + */ +public class UnBoundedCompactionStrategy implements CompactionStrategy { + + @Override + public Map captureMetrics(HoodieDataFile dataFile, String partitionPath, + List logFiles) { + return Maps.newHashMap(); + } + + @Override + public List orderAndFilter(HoodieWriteConfig config, + List operations) { + return operations; + } +} diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java index 7ee050c5d..04f5e9300 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java @@ -25,7 +25,6 @@ import com.uber.hoodie.common.table.timeline.HoodieInstant; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.exception.HoodieCompactionException; import com.uber.hoodie.io.HoodieAppendHandle; -import com.uber.hoodie.io.compact.CompactionFilter; import com.uber.hoodie.io.compact.HoodieRealtimeTableCompactor; import java.util.Optional; import org.apache.log4j.LogManager; @@ -89,7 +88,7 @@ public class HoodieMergeOnReadTable extends Hoodi logger.info("Compacting merge on read table " + config.getBasePath()); HoodieRealtimeTableCompactor compactor = new HoodieRealtimeTableCompactor(); try { - return Optional.of(compactor.compact(jsc, config, this, CompactionFilter.allowAll())); + return Optional.of(compactor.compact(jsc, config, this)); } catch (IOException e) { throw new HoodieCompactionException("Could not compact " + config.getBasePath(), e); } diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestMergeOnReadTable.java b/hoodie-client/src/test/java/com/uber/hoodie/TestMergeOnReadTable.java index 16f422d5d..a76dcf807 100644 --- 
a/hoodie-client/src/test/java/com/uber/hoodie/TestMergeOnReadTable.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestMergeOnReadTable.java @@ -32,7 +32,6 @@ import com.uber.hoodie.config.HoodieIndexConfig; import com.uber.hoodie.config.HoodieStorageConfig; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.index.HoodieIndex; -import com.uber.hoodie.io.compact.CompactionFilter; import com.uber.hoodie.io.compact.HoodieCompactor; import com.uber.hoodie.io.compact.HoodieRealtimeTableCompactor; import com.uber.hoodie.table.HoodieTable; @@ -165,7 +164,7 @@ public class TestMergeOnReadTable { HoodieCompactor compactor = new HoodieRealtimeTableCompactor(); HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig()); - compactor.compact(jsc, getConfig(), table, CompactionFilter.allowAll()); + compactor.compact(jsc, getConfig(), table); metaClient = new HoodieTableMetaClient(fs, cfg.getBasePath()); allFiles = HoodieTestUtils.listAllDataFilesInPath(fs, cfg.getBasePath()); diff --git a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java index f40aa6ee1..a230364f0 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java @@ -36,7 +36,6 @@ import com.uber.hoodie.config.HoodieStorageConfig; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.index.HoodieBloomIndex; import com.uber.hoodie.index.HoodieIndex; -import com.uber.hoodie.io.compact.CompactionFilter; import com.uber.hoodie.io.compact.HoodieCompactor; import com.uber.hoodie.io.compact.HoodieRealtimeTableCompactor; import com.uber.hoodie.table.HoodieTable; @@ -113,7 +112,7 @@ public class TestHoodieCompactor { HoodieTableMetaClient metaClient = new HoodieTableMetaClient(FSUtils.getFs(), basePath); HoodieTable table = HoodieTable.getHoodieTable(metaClient, 
getConfig()); - compactor.compact(jsc, getConfig(), table, CompactionFilter.allowAll()); + compactor.compact(jsc, getConfig(), table); } @Test @@ -129,7 +128,7 @@ public class TestHoodieCompactor { writeClient.insert(recordsRDD, newCommitTime).collect(); HoodieCompactionMetadata result = - compactor.compact(jsc, getConfig(), table, CompactionFilter.allowAll()); + compactor.compact(jsc, getConfig(), table); assertTrue("If there is nothing to compact, result will be empty", result.getFileIdAndFullPaths().isEmpty()); } @@ -177,7 +176,7 @@ public class TestHoodieCompactor { table = HoodieTable.getHoodieTable(metaClient, config); HoodieCompactionMetadata result = - compactor.compact(jsc, getConfig(), table, CompactionFilter.allowAll()); + compactor.compact(jsc, getConfig(), table); // Verify that recently written compacted data file has no log file metaClient = new HoodieTableMetaClient(fs, basePath); diff --git a/hoodie-client/src/test/java/com/uber/hoodie/io/strategy/TestHoodieCompactionStrategy.java b/hoodie-client/src/test/java/com/uber/hoodie/io/strategy/TestHoodieCompactionStrategy.java new file mode 100644 index 000000000..42e705f94 --- /dev/null +++ b/hoodie-client/src/test/java/com/uber/hoodie/io/strategy/TestHoodieCompactionStrategy.java @@ -0,0 +1,123 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package com.uber.hoodie.io.strategy;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import com.beust.jcommander.internal.Lists;
+import com.google.common.collect.Maps;
+import com.uber.hoodie.common.model.HoodieDataFile;
+import com.uber.hoodie.common.table.log.HoodieLogFile;
+import com.uber.hoodie.config.HoodieCompactionConfig;
+import com.uber.hoodie.config.HoodieWriteConfig;
+import com.uber.hoodie.io.compact.CompactionOperation;
+import com.uber.hoodie.io.compact.strategy.BoundedIOCompactionStrategy;
+import com.uber.hoodie.io.compact.strategy.LogFileSizeBasedCompactionStrategy;
+import com.uber.hoodie.io.compact.strategy.UnBoundedCompactionStrategy;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.junit.Test;
+
+/**
+ * Tests for the compaction strategies. Every test feeds the same four synthetic compaction
+ * operations to a strategy and checks which operations come back from orderAndFilter.
+ */
+public class TestHoodieCompactionStrategy {
+
+  private static final long MB = 1024 * 1024L;
+
+  /**
+   * The unbounded strategy must hand back exactly the operations it was given, in order.
+   */
+  @Test
+  public void testUnBounded() {
+    UnBoundedCompactionStrategy strategy = new UnBoundedCompactionStrategy();
+    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp")
+        .withCompactionConfig(
+            HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).build()).build();
+    List<CompactionOperation> operations = createCompactionOperations(writeConfig, newSizesMap());
+    List<CompactionOperation> returned = strategy.orderAndFilter(writeConfig, operations);
+    assertEquals("UnBounded should not re-order or filter", operations, returned);
+  }
+
+  /**
+   * With a 400 MB target, the bounded strategy is expected to keep two operations whose
+   * TOTAL_IO_MB metrics add up to 610.
+   */
+  @Test
+  public void testBoundedIOSimple() {
+    BoundedIOCompactionStrategy strategy = new BoundedIOCompactionStrategy();
+    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp")
+        .withCompactionConfig(
+            HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy)
+                .withTargetIOPerCompactionInMB(400).build()).build();
+    List<CompactionOperation> operations = createCompactionOperations(writeConfig, newSizesMap());
+    List<CompactionOperation> returned = strategy.orderAndFilter(writeConfig, operations);
+
+    assertTrue("BoundedIOCompaction should have resulted in fewer compactions",
+        returned.size() < operations.size());
+    assertEquals("BoundedIOCompaction should have resulted in 2 compactions being chosen",
+        2, returned.size());
+    // NOTE(review): 610 presumably is read + write IO of the first two operations
+    // (390 + 220); confirm against how BoundedIOCompactionStrategy computes TOTAL_IO_MB.
+    assertEquals("Should choose the first 2 compactions which should result in a total IO of 610 MB",
+        610, totalIoInMb(returned));
+  }
+
+  /**
+   * With a 400 MB target, the log-file-size based strategy is expected to keep a single
+   * operation whose TOTAL_IO_MB metric is 1204.
+   */
+  @Test
+  public void testLogFileSizeCompactionSimple() {
+    LogFileSizeBasedCompactionStrategy strategy = new LogFileSizeBasedCompactionStrategy();
+    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp")
+        .withCompactionConfig(
+            HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy)
+                .withTargetIOPerCompactionInMB(400).build()).build();
+    List<CompactionOperation> operations = createCompactionOperations(writeConfig, newSizesMap());
+    List<CompactionOperation> returned = strategy.orderAndFilter(writeConfig, operations);
+
+    assertTrue("LogFileSizeBasedCompactionStrategy should have resulted in fewer compactions",
+        returned.size() < operations.size());
+    assertEquals("LogFileSizeBasedCompactionStrategy should have resulted in 1 compaction",
+        1, returned.size());
+    assertEquals("Should choose a single compaction which should result in a total IO of 1204 MB",
+        1204, totalIoInMb(returned));
+  }
+
+  /**
+   * Builds the shared fixture: data file size mapped to the sizes of its log files, in bytes.
+   * The map is insertion-ordered so the operations reach the strategy in the order listed
+   * here; a plain HashMap gives no iteration-order guarantee.
+   */
+  private static Map<Long, List<Long>> newSizesMap() {
+    Map<Long, List<Long>> sizesMap = Maps.newLinkedHashMap();
+    sizesMap.put(120 * MB, Lists.newArrayList(60 * MB, 10 * MB, 80 * MB));
+    sizesMap.put(110 * MB, Lists.newArrayList());
+    sizesMap.put(100 * MB, Lists.newArrayList(MB));
+    sizesMap.put(90 * MB, Lists.newArrayList(1024 * MB));
+    return sizesMap;
+  }
+
+  /**
+   * Adds up the TOTAL_IO_MB metric of the given operations.
+   */
+  private static long totalIoInMb(List<CompactionOperation> operations) {
+    return operations.stream()
+        .mapToLong(op -> (Long) op.getMetrics().get(BoundedIOCompactionStrategy.TOTAL_IO_MB))
+        .sum();
+  }
+
+  /**
+   * Creates one compaction operation per map entry, using stub data and log files that
+   * report the given sizes.
+   *
+   * @param config write config handed to every operation
+   * @param sizesMap data file size mapped to the sizes of its log files
+   * @return the operations, in the iteration order of {@code sizesMap}
+   */
+  private List<CompactionOperation> createCompactionOperations(HoodieWriteConfig config,
+      Map<Long, List<Long>> sizesMap) {
+    return sizesMap.entrySet().stream()
+        .map(entry -> new CompactionOperation(
+            TestHoodieDataFile.newDataFile(entry.getKey()), "",
+            entry.getValue().stream().map(TestHoodieLogFile::newLogFile)
+                .collect(Collectors.toList()),
+            config))
+        .collect(Collectors.toList());
+  }
+}
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/io/strategy/TestHoodieDataFile.java b/hoodie-client/src/test/java/com/uber/hoodie/io/strategy/TestHoodieDataFile.java
new file mode 100644
index 000000000..6d6219ff6
--- /dev/null
+++ b/hoodie-client/src/test/java/com/uber/hoodie/io/strategy/TestHoodieDataFile.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.io.strategy;
+
+import com.uber.hoodie.common.model.HoodieDataFile;
+import com.uber.hoodie.common.util.FSUtils;
+import java.util.UUID;
+import org.apache.hadoop.fs.FileStatus;
+
+/**
+ * Stub data file for strategy tests: it has no backing file status and reports a fixed
+ * path, a fixed commit time and a caller-supplied size.
+ */
+public class TestHoodieDataFile extends HoodieDataFile {
+
+  private final long size;
+  // Generated once per instance so that repeated getFileId() calls on the same stub agree.
+  private final String fileId = UUID.randomUUID().toString();
+
+  /**
+   * @param size value to report from {@link #getFileSize()}
+   */
+  public TestHoodieDataFile(long size) {
+    super(null);
+    this.size = size;
+  }
+
+  @Override
+  public String getPath() {
+    return "/tmp/test";
+  }
+
+  /**
+   * Returns a random id that is unique to this stub and stable across calls.
+   */
+  @Override
+  public String getFileId() {
+    return fileId;
+  }
+
+  @Override
+  public String getCommitTime() {
+    return "100";
+  }
+
+  @Override
+  public long getFileSize() {
+    return size;
+  }
+
+  /**
+   * Factory used by the tests; returns a stub reporting the given size.
+   */
+  public static HoodieDataFile newDataFile(long size) {
+    return new TestHoodieDataFile(size);
+  }
+}
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/CompactionFilter.java b/hoodie-client/src/test/java/com/uber/hoodie/io/strategy/TestHoodieLogFile.java
similarity index 51%
rename from hoodie-client/src/main/java/com/uber/hoodie/io/compact/CompactionFilter.java
rename to hoodie-client/src/test/java/com/uber/hoodie/io/strategy/TestHoodieLogFile.java
index 81762144f..45e0db584 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/CompactionFilter.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/io/strategy/TestHoodieLogFile.java
@@ -14,21 +14,33 @@
  * limitations under the License.
  */
 
-package com.uber.hoodie.io.compact;
+package com.uber.hoodie.io.strategy;
 
-import java.util.List;
+import com.uber.hoodie.common.table.log.HoodieLogFile;
+import java.util.Optional;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
 
-/**
- * Implementations of CompactionFilter allows prioritizing and filtering certain type of
- * compactions over other compactions.
- *
- * e.g.
Filter in-efficient compaction like compacting a very large old parquet file with a small avro file
- */
-public interface CompactionFilter {
-    List<CompactionOperation> filter(List<CompactionOperation> input);
+public class TestHoodieLogFile extends HoodieLogFile {
 
-    // Default implementation - do not filter anything
-    static CompactionFilter allowAll() {
-        return s -> s;
-    }
+  private final long size;
+
+  public TestHoodieLogFile(long size) {
+    super((Path) null);
+    this.size = size;
+  }
+
+  @Override
+  public Path getPath() {
+    return new Path("/tmp/test-log");
+  }
+
+  @Override
+  public Optional<Long> getFileSize() {
+    return Optional.of(size);
+  }
+
+  public static HoodieLogFile newLogFile(long size) {
+    return new TestHoodieLogFile(size);
+  }
 }
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFile.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFile.java
index 8759c86a5..eb38679b6 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFile.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFile.java
@@ -75,6 +75,13 @@ public class HoodieLogFile {
         return fileStatus;
     }
 
+    /**
+     * Returns the length in bytes reported by the attached {@link FileStatus}, or an empty
+     * Optional when this log file carries no file status.
+     */
+    public Optional<Long> getFileSize() {
+        return fileStatus.map(FileStatus::getLen);
+    }
 
     public HoodieLogFile rollOver(FileSystem fs) throws IOException {
         String fileId = getFileId();
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java
index d973cdbf1..78ad5035f 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java
@@ -351,4 +351,12 @@ public class FSUtils {
             fs.mkdirs(partitionPath);
         }
     }
+
+    /**
+     * Converts a byte count to whole megabytes (1024 * 1024 bytes) using integer division,
+     * so any fractional megabyte is dropped; anything below 1 MB yields 0.
+     */
+    public static Long getSizeInMB(long sizeInBytes) {
+        return sizeInBytes / (1024 * 1024);
+    }
 }
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/ReflectionUtils.java
b/hoodie-common/src/main/java/com/uber/hoodie/common/util/ReflectionUtils.java
index e32ddc558..422e22d40 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/ReflectionUtils.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/ReflectionUtils.java
@@ -18,6 +18,7 @@ package com.uber.hoodie.common.util;
 
 import com.uber.hoodie.common.model.HoodieRecordPayload;
+import com.uber.hoodie.exception.HoodieException;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -40,4 +41,33 @@ public class ReflectionUtils {
             throw new IOException("Could not load payload class " + recordPayloadClass, e);
         }
     }
+
+    /**
+     * Instantiates the class named by {@code fqcn} through its no-argument constructor,
+     * caching the resolved {@link Class} so later calls skip {@code Class.forName}.
+     *
+     * @param fqcn fully qualified name of the class to instantiate
+     * @param <T> type the caller expects; the cast is unchecked, so a mismatch only
+     *            surfaces as a ClassCastException at the call site
+     * @return a new instance of the named class
+     * @throws HoodieException if the class cannot be found or cannot be instantiated
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> T loadClass(String fqcn) {
+        try {
+            // NOTE(review): this check-then-put is not atomic; confirm clazzCache is
+            // never populated from concurrent threads.
+            Class<?> clazz = clazzCache.get(fqcn);
+            if (clazz == null) {
+                clazz = Class.forName(fqcn);
+                clazzCache.put(fqcn, clazz);
+            }
+            return (T) clazz.newInstance();
+        } catch (ClassNotFoundException e) {
+            throw new HoodieException("Could not load class " + fqcn, e);
+        } catch (InstantiationException | IllegalAccessException e) {
+            throw new HoodieException("Could not instantiate class " + fqcn, e);
+        }
+    }
+
 }