diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java
index d1584aac0..97c5e5c68 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java
@@ -108,6 +108,7 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
       + ".partitions";
   // 500GB of target IO per compaction (both read and write)
   public static final String DEFAULT_TARGET_PARTITIONS_PER_DAYBASED_COMPACTION = String.valueOf(10);
+  public static final String DEFAULT_COMPACTOR_ID = "default";
 
   private HoodieCompactionConfig(Properties props) {
     super(props);
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieCompactor.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieCompactor.java
index 7475176c4..ad1c18e2f 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieCompactor.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieCompactor.java
@@ -17,13 +17,11 @@
 package com.uber.hoodie.io.compact;
 
 import com.uber.hoodie.WriteStatus;
-import com.uber.hoodie.common.table.HoodieTimeline;
-import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
-import com.uber.hoodie.common.table.timeline.HoodieInstant;
+import com.uber.hoodie.avro.model.HoodieCompactionPlan;
 import com.uber.hoodie.config.HoodieWriteConfig;
 import com.uber.hoodie.table.HoodieTable;
+import java.io.IOException;
 import java.io.Serializable;
-import java.util.Date;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 
@@ -34,17 +32,31 @@ public interface HoodieCompactor extends Serializable {
 
   /**
   * Compact the delta files with the data files
+   *
+   * @deprecated : Will be removed in next PR
   */
+  @Deprecated
  JavaRDD<WriteStatus> compact(JavaSparkContext jsc, final HoodieWriteConfig config,
      HoodieTable hoodieTable, String compactionCommitTime) throws Exception;
 
+  /**
+   * Generate a new compaction plan for scheduling
+   *
+   * @param jsc Spark Context
+   * @param hoodieTable Hoodie Table
+   * @param config Hoodie Write Configuration
+   * @param compactionCommitTime scheduled compaction commit time
+   * @return Compaction Plan
+   * @throws IOException when encountering errors
+   */
+  HoodieCompactionPlan generateCompactionPlan(JavaSparkContext jsc,
+      HoodieTable hoodieTable, HoodieWriteConfig config, String compactionCommitTime)
+      throws IOException;
 
-  // Helper methods
-  default String startCompactionCommit(HoodieTable hoodieTable) {
-    String commitTime = HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date());
-    HoodieActiveTimeline activeTimeline = hoodieTable.getActiveTimeline();
-    activeTimeline
-        .createInflight(new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, commitTime));
-    return commitTime;
-  }
-}
+  /**
+   * Execute compaction operations and report back status
+   */
+  JavaRDD<WriteStatus> compact(JavaSparkContext jsc,
+      HoodieCompactionPlan compactionPlan, HoodieTable hoodieTable, HoodieWriteConfig config,
+      String compactionCommitTime) throws IOException;
+}
\ No newline at end of file
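The interface above splits compaction into a scheduling phase and an execution phase. A minimal sketch of how a caller might drive the new two-phase contract — the Spark context, table, config and commit time are assumed to be set up elsewhere; this is illustrative, not code from this PR:

```java
import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.avro.model.HoodieCompactionPlan;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.io.compact.HoodieCompactor;
import com.uber.hoodie.io.compact.HoodieRealtimeTableCompactor;
import com.uber.hoodie.table.HoodieTable;
import java.io.IOException;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class TwoPhaseCompactionExample {

  // jsc, table, config and compactionCommitTime are assumed to come from the caller
  public static JavaRDD<WriteStatus> runCompaction(JavaSparkContext jsc, HoodieTable table,
      HoodieWriteConfig config, String compactionCommitTime) throws IOException {
    HoodieCompactor compactor = new HoodieRealtimeTableCompactor();
    // Phase 1: plan - decide which file slices to compact; the plan is Avro-serializable metadata
    HoodieCompactionPlan plan = compactor.generateCompactionPlan(jsc, table, config, compactionCommitTime);
    if (plan.getOperations() == null || plan.getOperations().isEmpty()) {
      return jsc.emptyRDD();
    }
    // Phase 2: execute the previously generated plan
    return compactor.compact(jsc, plan, table, config, compactionCommitTime);
  }
}
```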
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java
index 5a60f0d8a..b314e8eb3 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java
@@ -22,6 +22,10 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.uber.hoodie.WriteStatus;
+import com.uber.hoodie.avro.model.HoodieCompactionOperation;
+import com.uber.hoodie.avro.model.HoodieCompactionPlan;
+import com.uber.hoodie.common.model.CompactionOperation;
+import com.uber.hoodie.common.model.HoodieDataFile;
 import com.uber.hoodie.common.model.HoodieLogFile;
 import com.uber.hoodie.common.model.HoodieTableType;
 import com.uber.hoodie.common.model.HoodieWriteStat.RuntimeStats;
@@ -29,6 +33,7 @@ import com.uber.hoodie.common.table.HoodieTableMetaClient;
 import com.uber.hoodie.common.table.HoodieTimeline;
 import com.uber.hoodie.common.table.TableFileSystemView;
 import com.uber.hoodie.common.table.log.HoodieMergedLogRecordScanner;
+import com.uber.hoodie.common.util.CompactionUtils;
 import com.uber.hoodie.common.util.FSUtils;
 import com.uber.hoodie.common.util.HoodieAvroUtils;
 import com.uber.hoodie.config.HoodieWriteConfig;
@@ -36,9 +41,11 @@ import com.uber.hoodie.io.compact.strategy.CompactionStrategy;
 import com.uber.hoodie.table.HoodieCopyOnWriteTable;
 import com.uber.hoodie.table.HoodieTable;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 import org.apache.avro.Schema;
@@ -70,26 +77,25 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
   public JavaRDD<WriteStatus> compact(JavaSparkContext jsc, HoodieWriteConfig config,
       HoodieTable hoodieTable, String compactionCommitTime) throws IOException {
-    totalLogFiles = new LongAccumulator();
-    totalFileSlices = new LongAccumulator();
-    jsc.sc().register(totalLogFiles);
-    jsc.sc().register(totalFileSlices);
-
-    List<CompactionOperation> operations = getCompactionWorkload(jsc, hoodieTable, config,
+    HoodieCompactionPlan compactionPlan = generateCompactionPlan(jsc, hoodieTable, config,
         compactionCommitTime);
-    if (operations == null) {
+    List<HoodieCompactionOperation> operations = compactionPlan.getOperations();
+    if ((operations == null) || (operations.isEmpty())) {
       return jsc.emptyRDD();
     }
-    return executeCompaction(jsc, operations, hoodieTable, config, compactionCommitTime);
+    return compact(jsc, compactionPlan, hoodieTable, config, compactionCommitTime);
   }
 
-  private JavaRDD<WriteStatus> executeCompaction(JavaSparkContext jsc,
-      List<CompactionOperation> operations, HoodieTable hoodieTable, HoodieWriteConfig config,
+  @Override
+  public JavaRDD<WriteStatus> compact(JavaSparkContext jsc,
+      HoodieCompactionPlan compactionPlan, HoodieTable hoodieTable, HoodieWriteConfig config,
       String compactionCommitTime) throws IOException {
     HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
     // Compacting is very similar to applying updates to existing file
     HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc);
-    log.info("After filtering, Compacting " + operations + " files");
+    List<CompactionOperation> operations = compactionPlan.getOperations().stream()
+        .map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
+    log.info("Compactor " + compactionPlan.getCompactorId() + " running, Compacting " + operations + " files");
     return jsc.parallelize(operations, operations.size())
         .map(s -> compact(table, metaClient, config, s, compactionCommitTime))
         .flatMap(writeStatusesItr -> writeStatusesItr.iterator());
@@ -144,8 +150,8 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
       s.getStat().setTotalLogFilesCompacted(scanner.getTotalLogFiles());
       s.getStat().setTotalLogRecords(scanner.getTotalLogRecords());
       s.getStat().setPartitionPath(operation.getPartitionPath());
-      s.getStat().setTotalLogSizeCompacted((long) operation.getMetrics().get(
-          CompactionStrategy.TOTAL_LOG_FILE_SIZE));
+      s.getStat().setTotalLogSizeCompacted(operation.getMetrics().get(
+          CompactionStrategy.TOTAL_LOG_FILE_SIZE).longValue());
       s.getStat().setTotalLogBlocks(scanner.getTotalLogBlocks());
       s.getStat().setTotalCorruptLogBlock(scanner.getTotalCorruptBlocks());
       s.getStat().setTotalRollbackBlocks(scanner.getTotalRollbacks());
@@ -156,10 +162,16 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
     }).collect(toList());
   }
 
-  private List<CompactionOperation> getCompactionWorkload(JavaSparkContext jsc,
+  @Override
+  public HoodieCompactionPlan generateCompactionPlan(JavaSparkContext jsc,
       HoodieTable hoodieTable, HoodieWriteConfig config, String compactionCommitTime)
       throws IOException {
+    totalLogFiles = new LongAccumulator();
+    totalFileSlices = new LongAccumulator();
+    jsc.sc().register(totalLogFiles);
+    jsc.sc().register(totalFileSlices);
+
     Preconditions
         .checkArgument(hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ,
             "HoodieRealtimeTableCompactor can only compact table of type "
@@ -176,7 +188,7 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
     TableFileSystemView.RealtimeView fileSystemView = hoodieTable.getRTFileSystemView();
     log.info("Compaction looking for files to compact in " + partitionPaths + " partitions");
 
-    List<CompactionOperation> operations =
+    List<HoodieCompactionOperation> operations =
         jsc.parallelize(partitionPaths, partitionPaths.size())
             .flatMap((FlatMapFunction<String, CompactionOperation>) partitionPath -> fileSystemView
                 .getLatestFileSlices(partitionPath).map(
@@ -185,10 +197,16 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
                       .getBaseInstantAndLogVersionComparator().reversed()).collect(Collectors.toList());
                   totalLogFiles.add((long) logFiles.size());
                   totalFileSlices.add(1L);
-                  return new CompactionOperation(s.getDataFile(), partitionPath, logFiles, config);
+                  // Avro generated classes are not inheriting Serializable. Using CompactionOperation POJO
+                  // for spark Map operations and collecting them finally in Avro generated classes for storing
+                  // into meta files.
+                  Optional<HoodieDataFile> dataFile = s.getDataFile();
+                  return new CompactionOperation(dataFile, partitionPath, logFiles,
+                      config.getCompactionStrategy().captureMetrics(config, dataFile, partitionPath, logFiles));
                 })
                 .filter(c -> !c.getDeltaFilePaths().isEmpty())
-                .collect(toList()).iterator()).collect();
+                .collect(toList()).iterator()).collect().stream().map(CompactionUtils::buildHoodieCompactionOperation)
+        .collect(toList());
     log.info("Total of " + operations.size() + " compactions are retrieved");
     log.info("Total number of latest file slices " + totalFileSlices.value());
     log.info("Total number of log files " + totalLogFiles.value());
@@ -196,12 +214,13 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
 
     // Filter the compactions with the passed in filter. This lets us choose most effective
     // compactions only
-    operations = config.getCompactionStrategy().orderAndFilter(config, operations);
-    if (operations.isEmpty()) {
+    // TODO: In subsequent PRs, pending Compaction plans will be wired in. Strategy can look at pending compaction
+    // plans to schedule next compaction plan
+    HoodieCompactionPlan compactionPlan = config.getCompactionStrategy().generateCompactionPlan(config, operations,
+        new ArrayList<>());
+    if (compactionPlan.getOperations().isEmpty()) {
       log.warn("After filtering, Nothing to compact for " + metaClient.getBasePath());
-      return null;
     }
-    return operations;
+    return compactionPlan;
   }
-
-}
\ No newline at end of file
+}
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/BoundedIOCompactionStrategy.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/BoundedIOCompactionStrategy.java
index 4f4cdf128..409d786da 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/BoundedIOCompactionStrategy.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/BoundedIOCompactionStrategy.java
@@ -17,8 +17,9 @@
 package com.uber.hoodie.io.compact.strategy;
 
 import com.google.common.collect.Lists;
+import com.uber.hoodie.avro.model.HoodieCompactionOperation;
+import com.uber.hoodie.avro.model.HoodieCompactionPlan;
 import com.uber.hoodie.config.HoodieWriteConfig;
-import com.uber.hoodie.io.compact.CompactionOperation;
 import java.util.List;
 
 /**
@@ -30,15 +31,15 @@ import java.util.List;
 public class BoundedIOCompactionStrategy extends CompactionStrategy {
 
   @Override
-  public List<CompactionOperation> orderAndFilter(HoodieWriteConfig writeConfig,
-      List<CompactionOperation> operations) {
+  public List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig writeConfig,
+      List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> pendingCompactionPlans) {
     // Iterate through the operations in order and accept operations as long as we are within the
     // IO limit
     // Preserves the original ordering of compactions
-    List<CompactionOperation> finalOperations = Lists.newArrayList();
+    List<HoodieCompactionOperation> finalOperations = Lists.newArrayList();
     long targetIORemaining = writeConfig.getTargetIOPerCompactionInMB();
-    for (CompactionOperation op : operations) {
-      long opIo = (Long) op.getMetrics().get(TOTAL_IO_MB);
+    for (HoodieCompactionOperation op : operations) {
+      long opIo = op.getMetrics().get(TOTAL_IO_MB).longValue();
       targetIORemaining -= opIo;
       finalOperations.add(op);
       if (targetIORemaining <= 0) {
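BoundedIOCompactionStrategy keeps accepting operations until the configured IO budget is exhausted; note that the operation which crosses the budget is still included. A standalone re-statement of that loop with plain numbers — the values below are illustrative, not the test fixtures:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BoundedIoBudgetDemo {

  // Same accept-until-budget-exhausted logic as the strategy, over plain longs
  // instead of HoodieCompactionOperation metrics.
  public static List<Long> filterByIoBudget(List<Long> opIoMbs, long targetIoMb) {
    List<Long> accepted = new ArrayList<>();
    long remaining = targetIoMb;
    for (long opIo : opIoMbs) {
      remaining -= opIo;
      accepted.add(opIo);   // the op that crosses the budget is still accepted
      if (remaining <= 0) {
        break;              // everything after it is dropped
      }
    }
    return accepted;
  }

  public static void main(String[] args) {
    // With a 400 MB budget, 250 + 360 MB are accepted (610 MB total), the rest dropped.
    System.out.println(filterByIoBudget(Arrays.asList(250L, 360L, 500L, 100L), 400L));
  }
}
```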
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/CompactionStrategy.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/CompactionStrategy.java
index 2a7e65954..2b5070dba 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/CompactionStrategy.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/CompactionStrategy.java
@@ -17,11 +17,13 @@
 package com.uber.hoodie.io.compact.strategy;
 
 import com.google.common.collect.Maps;
+import com.uber.hoodie.avro.model.HoodieCompactionOperation;
+import com.uber.hoodie.avro.model.HoodieCompactionPlan;
 import com.uber.hoodie.common.model.HoodieDataFile;
 import com.uber.hoodie.common.model.HoodieLogFile;
 import com.uber.hoodie.common.util.FSUtils;
+import com.uber.hoodie.config.HoodieCompactionConfig;
 import com.uber.hoodie.config.HoodieWriteConfig;
-import com.uber.hoodie.io.compact.CompactionOperation;
 import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
@@ -35,7 +37,6 @@ import java.util.Optional;
  * passed in every time
  *
  * @see com.uber.hoodie.io.compact.HoodieRealtimeTableCompactor
- * @see CompactionOperation
  */
 public abstract class CompactionStrategy implements Serializable {
 
@@ -46,7 +47,7 @@ public abstract class CompactionStrategy implements Serializable {
   public static final String TOTAL_LOG_FILES = "TOTAL_LOG_FILES";
 
   /**
-   * Callback hook when a CompactionOperation is created. Individual strategies can capture the
+   * Callback hook when a HoodieCompactionOperation is created. Individual strategies can capture the
    * metrics they need to decide on the priority.
    *
    * @param dataFile - Base file to compact
@@ -54,9 +55,9 @@ public abstract class CompactionStrategy implements Serializable {
    * @param logFiles - List of log files to compact with the base file
    * @return Map[String, Object] - metrics captured
    */
-  public Map<String, Object> captureMetrics(HoodieWriteConfig writeConfig, Optional<HoodieDataFile> dataFile, String
-      partitionPath, List<HoodieLogFile> logFiles) {
-    Map<String, Object> metrics = Maps.newHashMap();
+  public Map<String, Double> captureMetrics(HoodieWriteConfig writeConfig, Optional<HoodieDataFile> dataFile,
+      String partitionPath, List<HoodieLogFile> logFiles) {
+    Map<String, Double> metrics = Maps.newHashMap();
     Long defaultMaxParquetFileSize = writeConfig.getParquetMaxFileSize();
     // Total size of all the log files
     Long totalLogFileSize = logFiles.stream().map(HoodieLogFile::getFileSize).filter(Optional::isPresent)
@@ -70,25 +71,44 @@ public abstract class CompactionStrategy implements Serializable {
     // Total IO will be the IO for read + write
     Long totalIO = totalIORead + totalIOWrite;
     // Save these metrics and we will use during the filter
-    metrics.put(TOTAL_IO_READ_MB, totalIORead);
-    metrics.put(TOTAL_IO_WRITE_MB, totalIOWrite);
-    metrics.put(TOTAL_IO_MB, totalIO);
-    metrics.put(TOTAL_LOG_FILE_SIZE, totalLogFileSize);
-    metrics.put(TOTAL_LOG_FILES, logFiles.size());
+    metrics.put(TOTAL_IO_READ_MB, totalIORead.doubleValue());
+    metrics.put(TOTAL_IO_WRITE_MB, totalIOWrite.doubleValue());
+    metrics.put(TOTAL_IO_MB, totalIO.doubleValue());
+    metrics.put(TOTAL_LOG_FILE_SIZE, totalLogFileSize.doubleValue());
+    metrics.put(TOTAL_LOG_FILES, Double.valueOf(logFiles.size()));
     return metrics;
-
   }
 
   /**
-   * Order and Filter the list of compactions. Use the metrics captured with the captureMetrics to
-   * order and filter out compactions
+   * Generate Compaction plan. Allows clients to order and filter the list of compactions to be set. The default
+   * implementation takes care of setting the compactor Id from configuration, allowing subclasses to only worry about
+   * ordering and filtering compaction operations
    *
-   * @param writeConfig - HoodieWriteConfig - config for this compaction is passed in
-   * @param operations - list of compactions collected
+   * @param writeConfig Hoodie Write Config
+   * @param operations Compaction Operations to be ordered and filtered
+   * @param pendingCompactionPlans Pending Compaction Plans for strategy to schedule next compaction plan
+   * @return Compaction plan to be scheduled.
+   */
+  public HoodieCompactionPlan generateCompactionPlan(HoodieWriteConfig writeConfig,
+      List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> pendingCompactionPlans) {
+    // Strategy implementation can overload this method to set specific compactor-id
+    return HoodieCompactionPlan.newBuilder().setCompactorId(HoodieCompactionConfig.DEFAULT_COMPACTOR_ID)
+        .setOperations(orderAndFilter(writeConfig, operations, pendingCompactionPlans))
+        .build();
+  }
+
+  /**
+   * Order and Filter the list of compactions. Use the metrics captured with the captureMetrics to order and filter out
+   * compactions
+   *
+   * @param writeConfig config for this compaction is passed in
+   * @param operations list of compactions collected
+   * @param pendingCompactionPlans Pending Compaction Plans for strategy to schedule next compaction plan
    * @return list of compactions to perform in this run
    */
-  public List<CompactionOperation> orderAndFilter(HoodieWriteConfig writeConfig,
-      List<CompactionOperation> operations) {
+  protected List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig writeConfig,
+      List<HoodieCompactionOperation> operations,
+      List<HoodieCompactionPlan> pendingCompactionPlans) {
     return operations;
   }
 }
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/DayBasedCompactionStrategy.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/DayBasedCompactionStrategy.java
index 714dad54e..3360feb78 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/DayBasedCompactionStrategy.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/DayBasedCompactionStrategy.java
@@ -18,9 +18,10 @@
 package com.uber.hoodie.io.compact.strategy;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.uber.hoodie.avro.model.HoodieCompactionOperation;
+import com.uber.hoodie.avro.model.HoodieCompactionPlan;
 import com.uber.hoodie.config.HoodieWriteConfig;
 import com.uber.hoodie.exception.HoodieException;
-import com.uber.hoodie.io.compact.CompactionOperation;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Comparator;
@@ -58,12 +59,12 @@ public class DayBasedCompactionStrategy extends CompactionStrategy {
   }
 
   @Override
-  public List<CompactionOperation> orderAndFilter(HoodieWriteConfig writeConfig,
-      List<CompactionOperation> operations) {
+  public List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig writeConfig,
+      List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> pendingCompactionPlans) {
     // Iterate through the operations and accept operations as long as we are within the configured target partitions
     // limit
-    List<CompactionOperation> filteredList = operations.stream()
-        .collect(Collectors.groupingBy(CompactionOperation::getPartitionPath)).entrySet().stream()
+    List<HoodieCompactionOperation> filteredList = operations.stream()
+        .collect(Collectors.groupingBy(HoodieCompactionOperation::getPartitionPath)).entrySet().stream()
         .sorted(Map.Entry.comparingByKey(comparator)).limit(writeConfig.getTargetPartitionsPerDayBasedCompaction())
         .flatMap(e -> e.getValue().stream())
         .collect(Collectors.toList());
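Since orderAndFilter is now protected and receives the pending plans, a custom strategy only overrides the filtering step, while the base class's generateCompactionPlan sets the compactor id and assembles the plan. A hypothetical subclass, for illustration only (TopNCompactionStrategy and its limit are invented names, not part of this PR):

```java
import com.uber.hoodie.avro.model.HoodieCompactionOperation;
import com.uber.hoodie.avro.model.HoodieCompactionPlan;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.io.compact.strategy.CompactionStrategy;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical strategy: keep at most N operations per plan.
public class TopNCompactionStrategy extends CompactionStrategy {

  private static final int MAX_OPERATIONS_PER_PLAN = 10;

  @Override
  protected List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig writeConfig,
      List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> pendingCompactionPlans) {
    // pendingCompactionPlans is unused here; per the TODO in this PR it will be
    // wired in so strategies can account for already-scheduled work.
    return operations.stream().limit(MAX_OPERATIONS_PER_PLAN).collect(Collectors.toList());
  }
}
```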
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/LogFileSizeBasedCompactionStrategy.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/LogFileSizeBasedCompactionStrategy.java
index 39f66b2fe..8dd36695a 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/LogFileSizeBasedCompactionStrategy.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/LogFileSizeBasedCompactionStrategy.java
@@ -16,10 +16,11 @@
 
 package com.uber.hoodie.io.compact.strategy;
 
+import com.uber.hoodie.avro.model.HoodieCompactionOperation;
+import com.uber.hoodie.avro.model.HoodieCompactionPlan;
 import com.uber.hoodie.common.model.HoodieDataFile;
 import com.uber.hoodie.common.model.HoodieLogFile;
 import com.uber.hoodie.config.HoodieWriteConfig;
-import com.uber.hoodie.io.compact.CompactionOperation;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
@@ -34,37 +35,37 @@ import java.util.stream.Collectors;
  * @see CompactionStrategy
  */
 public class LogFileSizeBasedCompactionStrategy extends BoundedIOCompactionStrategy implements
-    Comparator<CompactionOperation> {
+    Comparator<HoodieCompactionOperation> {
 
   private static final String TOTAL_LOG_FILE_SIZE = "TOTAL_LOG_FILE_SIZE";
 
   @Override
-  public Map<String, Object> captureMetrics(HoodieWriteConfig config, Optional<HoodieDataFile> dataFile, String
-      partitionPath,
-      List<HoodieLogFile> logFiles) {
+  public Map<String, Double> captureMetrics(HoodieWriteConfig config, Optional<HoodieDataFile> dataFile,
+      String partitionPath, List<HoodieLogFile> logFiles) {
+    Map<String, Double> metrics = super.captureMetrics(config, dataFile, partitionPath, logFiles);
 
-    Map<String, Object> metrics = super.captureMetrics(config, dataFile, partitionPath, logFiles);
     // Total size of all the log files
     Long totalLogFileSize = logFiles.stream().map(HoodieLogFile::getFileSize)
         .filter(Optional::isPresent).map(Optional::get).reduce((size1, size2) -> size1 + size2)
         .orElse(0L);
     // save the metrics needed during the order
-    metrics.put(TOTAL_LOG_FILE_SIZE, totalLogFileSize);
+    metrics.put(TOTAL_LOG_FILE_SIZE, totalLogFileSize.doubleValue());
     return metrics;
   }
 
   @Override
-  public List<CompactionOperation> orderAndFilter(HoodieWriteConfig writeConfig,
-      List<CompactionOperation> operations) {
+  public List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig writeConfig,
+      List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> pendingCompactionPlans) {
     // Order the operations based on the reverse size of the logs and limit them by the IO
     return super
-        .orderAndFilter(writeConfig, operations.stream().sorted(this).collect(Collectors.toList()));
+        .orderAndFilter(writeConfig,
+            operations.stream().sorted(this).collect(Collectors.toList()), pendingCompactionPlans);
   }
 
   @Override
-  public int compare(CompactionOperation op1, CompactionOperation op2) {
-    Long totalLogSize1 = (Long) op1.getMetrics().get(TOTAL_LOG_FILE_SIZE);
-    Long totalLogSize2 = (Long) op2.getMetrics().get(TOTAL_LOG_FILE_SIZE);
+  public int compare(HoodieCompactionOperation op1, HoodieCompactionOperation op2) {
+    Long totalLogSize1 = op1.getMetrics().get(TOTAL_LOG_FILE_SIZE).longValue();
+    Long totalLogSize2 = op2.getMetrics().get(TOTAL_LOG_FILE_SIZE).longValue();
     // Reverse the comparison order - so that larger log file size is compacted first
     return totalLogSize2.compareTo(totalLogSize1);
   }
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/UnBoundedCompactionStrategy.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/UnBoundedCompactionStrategy.java
index 3f8297f28..f8edc2ce1 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/UnBoundedCompactionStrategy.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/UnBoundedCompactionStrategy.java
@@ -16,8 +16,9 @@
 
 package com.uber.hoodie.io.compact.strategy;
 
+import com.uber.hoodie.avro.model.HoodieCompactionOperation;
+import com.uber.hoodie.avro.model.HoodieCompactionPlan;
 import com.uber.hoodie.config.HoodieWriteConfig;
-import com.uber.hoodie.io.compact.CompactionOperation;
 import java.util.List;
 
 /**
@@ -30,8 +31,8 @@ import java.util.List;
 public class UnBoundedCompactionStrategy extends CompactionStrategy {
 
   @Override
-  public List<CompactionOperation> orderAndFilter(HoodieWriteConfig config,
-      List<CompactionOperation> operations) {
+  public List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig config,
+      List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> pendingCompactionWorkloads) {
     return operations;
   }
 }
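All strategies are wired in through HoodieCompactionConfig, using the same builder calls the tests below rely on. A sketch — the 500 MB budget is an arbitrary illustrative value:

```java
import com.uber.hoodie.config.HoodieCompactionConfig;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.io.compact.strategy.LogFileSizeBasedCompactionStrategy;

public class StrategyConfigExample {

  public static HoodieWriteConfig buildConfig(String basePath) {
    // Same builder pattern the tests in this PR use; picks the size-based strategy
    // and bounds each compaction run to roughly 500 MB of read + write IO.
    return HoodieWriteConfig.newBuilder().withPath(basePath)
        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
            .withCompactionStrategy(new LogFileSizeBasedCompactionStrategy())
            .withTargetIOPerCompactionInMB(500)
            .build())
        .build();
  }
}
```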
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java
index 25ba78421..b05ba602f 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java
@@ -19,6 +19,7 @@ package com.uber.hoodie.table;
 import com.google.common.collect.Maps;
 import com.google.common.hash.Hashing;
 import com.uber.hoodie.WriteStatus;
+import com.uber.hoodie.avro.model.HoodieCompactionPlan;
 import com.uber.hoodie.common.HoodieCleanStat;
 import com.uber.hoodie.common.HoodieRollbackStat;
 import com.uber.hoodie.common.model.HoodieCommitMetadata;
@@ -161,11 +162,22 @@ public class HoodieCopyOnWriteTable extends Hoodi
     return true;
   }
 
+  @Override
+  public HoodieCompactionPlan scheduleCompaction(JavaSparkContext jsc, String commitTime) {
+    throw new HoodieNotSupportedException("Compaction is not supported from a CopyOnWrite table");
+  }
+
   @Override
   public JavaRDD<WriteStatus> compact(JavaSparkContext jsc, String commitTime) {
     throw new HoodieNotSupportedException("Compaction is not supported from a CopyOnWrite table");
   }
 
+  @Override
+  public JavaRDD<WriteStatus> compact(JavaSparkContext jsc, String compactionInstantTime,
+      HoodieCompactionPlan compactionPlan) {
+    throw new HoodieNotSupportedException("Compaction is not supported from a CopyOnWrite table");
+  }
+
   public Iterator<List<WriteStatus>> handleUpdate(String commitTime, String fileLoc,
       Iterator<HoodieRecord<T>> recordItr) throws IOException {
     // these are updates
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java
index 74e83e92c..ef2ed7f0e 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java
@@ -19,6 +19,7 @@ package com.uber.hoodie.table;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.uber.hoodie.WriteStatus;
+import com.uber.hoodie.avro.model.HoodieCompactionPlan;
 import com.uber.hoodie.common.HoodieRollbackStat;
 import com.uber.hoodie.common.model.FileSlice;
 import com.uber.hoodie.common.model.HoodieCommitMetadata;
@@ -126,7 +127,7 @@ public class HoodieMergeOnReadTable extends
   }
 
   @Override
-  public JavaRDD<WriteStatus> compact(JavaSparkContext jsc, String compactionCommitTime) {
+  public HoodieCompactionPlan scheduleCompaction(JavaSparkContext jsc, String instantTime) {
     logger.info("Checking if compaction needs to be run on " + config.getBasePath());
     Optional<HoodieInstant> lastCompaction = getActiveTimeline().getCommitTimeline()
         .filterCompletedInstants().lastInstant();
@@ -141,10 +142,20 @@ public class HoodieMergeOnReadTable extends
       logger.info("Not running compaction as only " + deltaCommitsSinceLastCompaction
           + " delta commits were found since last compaction " + deltaCommitsSinceTs + ". Waiting for "
           + config.getInlineCompactDeltaCommitMax());
-      return jsc.emptyRDD();
+      return new HoodieCompactionPlan();
     }
 
     logger.info("Compacting merge on read table " + config.getBasePath());
+    HoodieRealtimeTableCompactor compactor = new HoodieRealtimeTableCompactor();
+    try {
+      return compactor.generateCompactionPlan(jsc, this, config, instantTime);
+    } catch (IOException e) {
+      throw new HoodieCompactionException("Could not schedule compaction " + config.getBasePath(), e);
+    }
+  }
+
+  @Override
+  public JavaRDD<WriteStatus> compact(JavaSparkContext jsc, String compactionCommitTime) {
     HoodieRealtimeTableCompactor compactor = new HoodieRealtimeTableCompactor();
     try {
       return compactor.compact(jsc, config, this, compactionCommitTime);
@@ -153,6 +164,17 @@ public class HoodieMergeOnReadTable extends
     }
   }
 
+  @Override
+  public JavaRDD<WriteStatus> compact(JavaSparkContext jsc, String compactionInstantTime,
+      HoodieCompactionPlan compactionPlan) {
+    HoodieRealtimeTableCompactor compactor = new HoodieRealtimeTableCompactor();
+    try {
+      return compactor.compact(jsc, compactionPlan, this, config, compactionInstantTime);
+    } catch (IOException e) {
+      throw new HoodieCompactionException("Could not compact " + config.getBasePath(), e);
+    }
+  }
+
   @Override
   public List<HoodieRollbackStat> rollback(JavaSparkContext jsc, List<String> commits)
       throws IOException {
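Putting the two table-level APIs together: scheduleCompaction produces the plan (only meaningful on a MERGE_ON_READ table; COPY_ON_WRITE throws) and the new compact overload executes it. A sketch of a caller, assuming the table and instant time come from the write client:

```java
import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.avro.model.HoodieCompactionPlan;
import com.uber.hoodie.table.HoodieTable;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class TableCompactionExample {

  // table, jsc and instantTime are assumed to be provided by the write client.
  public static JavaRDD<WriteStatus> scheduleAndCompact(HoodieTable table, JavaSparkContext jsc,
      String instantTime) {
    // Phase 1: schedule. On MERGE_ON_READ this delegates to
    // HoodieRealtimeTableCompactor.generateCompactionPlan.
    HoodieCompactionPlan plan = table.scheduleCompaction(jsc, instantTime);
    if (plan.getOperations() == null || plan.getOperations().isEmpty()) {
      return jsc.emptyRDD(); // nothing to do, or not enough delta commits yet
    }
    // Phase 2: execute the stored plan at the same instant time.
    return table.compact(jsc, instantTime, plan);
  }
}
```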
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java
index 762313318..3e83ca1a3 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java
@@ -17,6 +17,7 @@
 package com.uber.hoodie.table;
 
 import com.uber.hoodie.WriteStatus;
+import com.uber.hoodie.avro.model.HoodieCompactionPlan;
 import com.uber.hoodie.avro.model.HoodieSavepointMetadata;
 import com.uber.hoodie.common.HoodieCleanStat;
 import com.uber.hoodie.common.HoodieRollbackStat;
@@ -211,12 +212,32 @@ public abstract class HoodieTable implements Seri
   public abstract Iterator<List<WriteStatus>> handleInsertPartition(String commitTime,
       Integer partition, Iterator<HoodieRecord<T>> recordIterator, Partitioner partitioner);
 
+  /**
+   * Schedule compaction for the instant time
+   *
+   * @param jsc Spark Context
+   * @param instantTime Instant Time for scheduling compaction
+   * @return Compaction Plan scheduled for this instant
+   */
+  public abstract HoodieCompactionPlan scheduleCompaction(JavaSparkContext jsc, String instantTime);
+
   /**
    * Run Compaction on the table. Compaction arranges the data so that it is optimized for data
    * access
+   *
+   * @deprecated Will be replaced with newer APIs
    */
+  @Deprecated
   public abstract JavaRDD<WriteStatus> compact(JavaSparkContext jsc, String commitTime);
 
+  /**
+   * Run Compaction on the table. Compaction arranges the data so that it is optimized for data access
+   *
+   * @param jsc Spark Context
+   * @param compactionInstantTime Instant Time
+   * @param compactionPlan Compaction Plan
+   */
+  public abstract JavaRDD<WriteStatus> compact(JavaSparkContext jsc, String compactionInstantTime,
+      HoodieCompactionPlan compactionPlan);
+
   /**
    * Clean partition paths according to cleaning policy and returns the number of files cleaned.
    */
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/io/strategy/TestHoodieCompactionStrategy.java b/hoodie-client/src/test/java/com/uber/hoodie/io/strategy/TestHoodieCompactionStrategy.java
index cb53fdd8d..44a3e6537 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/io/strategy/TestHoodieCompactionStrategy.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/io/strategy/TestHoodieCompactionStrategy.java
@@ -20,19 +20,24 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import com.beust.jcommander.internal.Lists;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
+import com.uber.hoodie.avro.model.HoodieCompactionOperation;
+import com.uber.hoodie.common.model.HoodieDataFile;
+import com.uber.hoodie.common.model.HoodieLogFile;
 import com.uber.hoodie.config.HoodieCompactionConfig;
 import com.uber.hoodie.config.HoodieWriteConfig;
-import com.uber.hoodie.io.compact.CompactionOperation;
 import com.uber.hoodie.io.compact.strategy.BoundedIOCompactionStrategy;
 import com.uber.hoodie.io.compact.strategy.DayBasedCompactionStrategy;
 import com.uber.hoodie.io.compact.strategy.LogFileSizeBasedCompactionStrategy;
 import com.uber.hoodie.io.compact.strategy.UnBoundedCompactionStrategy;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Random;
 import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.Pair;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -51,8 +56,8 @@ public class TestHoodieCompactionStrategy {
     UnBoundedCompactionStrategy strategy = new UnBoundedCompactionStrategy();
     HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig(
         HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).build()).build();
-    List<CompactionOperation> operations = createCompactionOperations(writeConfig, sizesMap);
-    List<CompactionOperation> returned = strategy.orderAndFilter(writeConfig, operations);
+    List<HoodieCompactionOperation> operations = createCompactionOperations(writeConfig, sizesMap);
+    List<HoodieCompactionOperation> returned = strategy.orderAndFilter(writeConfig, operations, new ArrayList<>());
     assertEquals("UnBounded should not re-order or filter", operations, returned);
   }
 
@@ -67,14 +72,14 @@ public class TestHoodieCompactionStrategy {
     HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig(
         HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).withTargetIOPerCompactionInMB(400).build())
         .build();
-    List<CompactionOperation> operations = createCompactionOperations(writeConfig, sizesMap);
-    List<CompactionOperation> returned = strategy.orderAndFilter(writeConfig, operations);
+    List<HoodieCompactionOperation> operations = createCompactionOperations(writeConfig, sizesMap);
+    List<HoodieCompactionOperation> returned = strategy.orderAndFilter(writeConfig, operations, new ArrayList<>());
     assertTrue("BoundedIOCompaction should have resulted in fewer compactions",
         returned.size() < operations.size());
     assertEquals("BoundedIOCompaction should have resulted in 2 compactions being chosen", 2,
         returned.size());
     // Total size of all the log files
     Long returnedSize = returned.stream().map(s -> s.getMetrics().get(BoundedIOCompactionStrategy.TOTAL_IO_MB))
-        .map(s -> (Long) s).reduce((size1, size2) -> size1 + size2).orElse(0L);
+        .map(s -> s.longValue()).reduce((size1, size2) -> size1 + size2).orElse(0L);
     assertEquals("Should choose the first 2 compactions which should result in a total IO of 610 MB", 610,
         (long) returnedSize);
   }
@@ -90,15 +95,15 @@ public class TestHoodieCompactionStrategy {
     HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig(
         HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).withTargetIOPerCompactionInMB(400).build())
         .build();
-    List<CompactionOperation> operations = createCompactionOperations(writeConfig, sizesMap);
-    List<CompactionOperation> returned = strategy.orderAndFilter(writeConfig, operations);
+    List<HoodieCompactionOperation> operations = createCompactionOperations(writeConfig, sizesMap);
+    List<HoodieCompactionOperation> returned = strategy.orderAndFilter(writeConfig, operations, new ArrayList<>());
     assertTrue("LogFileSizeBasedCompactionStrategy should have resulted in fewer compactions",
         returned.size() < operations.size());
     assertEquals("LogFileSizeBasedCompactionStrategy should have resulted in 1 compaction", 1,
         returned.size());
     // Total size of all the log files
     Long returnedSize = returned.stream().map(s -> s.getMetrics().get(BoundedIOCompactionStrategy.TOTAL_IO_MB))
-        .map(s -> (Long) s).reduce((size1, size2) -> size1 + size2).orElse(0L);
+        .map(s -> s.longValue()).reduce((size1, size2) -> size1 + size2).orElse(0L);
     assertEquals("Should choose the first compaction which should result in a total IO of 1204 MB", 1204,
         (long) returnedSize);
   }
 
@@ -110,25 +115,20 @@ public class TestHoodieCompactionStrategy {
     sizesMap.put(110 * MB, Lists.newArrayList());
     sizesMap.put(100 * MB, Lists.newArrayList(MB));
     sizesMap.put(90 * MB, Lists.newArrayList(1024 * MB));
-    sizesMap.put(90 * MB, Lists.newArrayList(1024 * MB));
-    sizesMap.put(90 * MB, Lists.newArrayList(1024 * MB));
-    sizesMap.put(90 * MB, Lists.newArrayList(1024 * MB));
+
+    Map<Long, String> keyToPartitionMap = new ImmutableMap.Builder<Long, String>().put(120 * MB, partitionPaths[2])
+        .put(110 * MB, partitionPaths[2])
+        .put(100 * MB, partitionPaths[1])
+        .put(90 * MB, partitionPaths[0])
+        .build();
     DayBasedCompactionStrategy strategy = new DayBasedCompactionStrategy();
     HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig(
         HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy)
             .withTargetPartitionsPerDayBasedCompaction(1)
             .build()).build();
-
-    List<CompactionOperation> operations = Lists.newArrayList(sizesMap.size());
-    int partitionPathIndex = 0;
-    for (Map.Entry<Long, List<Long>> entry : sizesMap.entrySet()) {
-      operations.add(new CompactionOperation(Optional.of(TestHoodieDataFile.newDataFile(entry.getKey())),
-          partitionPaths[(partitionPathIndex % (partitionPaths.length - 1))],
-          entry.getValue().stream().map(TestHoodieLogFile::newLogFile).collect(Collectors.toList()), writeConfig));
-      partitionPathIndex++;
-    }
-    List<CompactionOperation> returned = strategy.orderAndFilter(writeConfig, operations);
+    List<HoodieCompactionOperation> operations = createCompactionOperations(writeConfig, sizesMap, keyToPartitionMap);
+    List<HoodieCompactionOperation> returned = strategy.orderAndFilter(writeConfig, operations, new ArrayList<>());
     assertTrue("DayBasedCompactionStrategy should have resulted in fewer compactions",
         returned.size() < operations.size());
@@ -141,13 +141,28 @@ public class TestHoodieCompactionStrategy {
     assertTrue("DayBasedCompactionStrategy should sort partitions in descending order", comparision >= 0);
   }
 
-  private List<CompactionOperation> createCompactionOperations(HoodieWriteConfig config,
+  private List<HoodieCompactionOperation> createCompactionOperations(HoodieWriteConfig config,
       Map<Long, List<Long>> sizesMap) {
-    List<CompactionOperation> operations = Lists.newArrayList(sizesMap.size());
+    Map<Long, String> keyToPartitionMap = sizesMap.entrySet().stream().map(e ->
+        Pair.of(e.getKey(), partitionPaths[new Random().nextInt(partitionPaths.length - 1)]))
+        .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+    return createCompactionOperations(config, sizesMap, keyToPartitionMap);
+  }
+
+  private List<HoodieCompactionOperation> createCompactionOperations(HoodieWriteConfig config,
+      Map<Long, List<Long>> sizesMap, Map<Long, String> keyToPartitionMap) {
+    List<HoodieCompactionOperation> operations = Lists.newArrayList(sizesMap.size());
+
     sizesMap.forEach((k, v) -> {
-      operations.add(new CompactionOperation(Optional.of(TestHoodieDataFile.newDataFile(k)),
-          partitionPaths[new Random().nextInt(partitionPaths.length - 1)],
-          v.stream().map(TestHoodieLogFile::newLogFile).collect(Collectors.toList()), config));
+      HoodieDataFile df = TestHoodieDataFile.newDataFile(k);
+      String partitionPath = keyToPartitionMap.get(k);
+      List<HoodieLogFile> logFiles = v.stream().map(TestHoodieLogFile::newLogFile).collect(Collectors.toList());
+      operations.add(new HoodieCompactionOperation(df.getCommitTime(),
+          logFiles.stream().map(s -> s.getPath().toString()).collect(Collectors.toList()),
+          df.getPath(),
+          df.getFileId(),
+          partitionPath,
+          config.getCompactionStrategy().captureMetrics(config, Optional.of(df), partitionPath, logFiles)));
     });
     return operations;
   }
diff --git a/hoodie-common/src/main/avro/HoodieCompactionOperation.avsc b/hoodie-common/src/main/avro/HoodieCompactionOperation.avsc
new file mode 100644
index 000000000..8400fa53a
--- /dev/null
+++ b/hoodie-common/src/main/avro/HoodieCompactionOperation.avsc
@@ -0,0 +1,66 @@
+{
+   "namespace":"com.uber.hoodie.avro.model",
+   "type":"record",
+   "name":"HoodieCompactionPlan",
+   "fields":[
+      {
+         "name":"compactorId",
+         "type":["null","string"]
+      },
+      {
+         "name":"operations",
+         "type":["null", {
+            "type":"array",
+            "items":{
+               "name":"HoodieCompactionOperation",
+               "type":"record",
+               "fields":[
+                  {
+                     "name":"baseInstantTime",
+                     "type":["null","string"]
+                  },
+                  {
+                     "name":"deltaFilePaths",
+                     "type":["null", {
+                        "type":"array",
+                        "items":"string"
+                     }],
+                     "default": null
+                  },
+                  {
+                     "name":"dataFilePath",
+                     "type":["null","string"],
+                     "default": null
+                  },
+                  {
+                     "name":"fileId",
+                     "type":["null","string"]
+                  },
+                  {
+                     "name":"partitionPath",
+                     "type":["null","string"],
+                     "default": null
+                  },
+                  {
+                     "name":"metrics",
+                     "type":["null", {
+                        "type":"map",
+                        "values":"double"
+                     }],
+                     "default": null
+                  }
+               ]
+            }
+         }],
+         "default": null
+      },
+      {
+         "name":"extraMetadata",
+         "type":["null", {
+            "type":"map",
+            "values":"string"
+         }],
+         "default": null
+      }
+   ]
+}
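The schema above compiles (via the Avro code generator) into HoodieCompactionPlan and HoodieCompactionOperation with the usual generated builders. A sketch of assembling a plan by hand — all literal values (instant time, file id, paths, metric) are made up for illustration:

```java
import com.uber.hoodie.avro.model.HoodieCompactionOperation;
import com.uber.hoodie.avro.model.HoodieCompactionPlan;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class CompactionPlanRecordExample {

  public static HoodieCompactionPlan buildSamplePlan() {
    Map<String, Double> metrics = new HashMap<>();
    metrics.put("TOTAL_IO_MB", 120.0); // metrics map is string -> double per the schema

    HoodieCompactionOperation op = HoodieCompactionOperation.newBuilder()
        .setBaseInstantTime("20180316000000")            // illustrative instant time
        .setFileId("file-0001")                          // illustrative file id
        .setPartitionPath("2018/03/16")
        .setDataFilePath("2018/03/16/file-0001.parquet")
        .setDeltaFilePaths(Arrays.asList("2018/03/16/.file-0001_20180316.log.1"))
        .setMetrics(metrics)
        .build();

    return HoodieCompactionPlan.newBuilder()
        .setCompactorId("default")                       // HoodieCompactionConfig.DEFAULT_COMPACTOR_ID
        .setOperations(Collections.singletonList(op))
        .build();
  }
}
```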
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/CompactionOperation.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/CompactionOperation.java
similarity index 65%
rename from hoodie-client/src/main/java/com/uber/hoodie/io/compact/CompactionOperation.java
rename to hoodie-common/src/main/java/com/uber/hoodie/common/model/CompactionOperation.java
index 3d5ffc40c..9f23c4a12 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/CompactionOperation.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/CompactionOperation.java
@@ -14,14 +14,13 @@
  * limitations under the License.
  */
 
-package com.uber.hoodie.io.compact;
+package com.uber.hoodie.common.model;
 
-import com.uber.hoodie.common.model.HoodieDataFile;
-import com.uber.hoodie.common.model.HoodieLogFile;
+import com.uber.hoodie.avro.model.HoodieCompactionOperation;
 import com.uber.hoodie.common.util.FSUtils;
-import com.uber.hoodie.config.HoodieWriteConfig;
-import com.uber.hoodie.io.compact.strategy.CompactionStrategy;
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -31,17 +30,16 @@ import java.util.stream.Collectors;
  * Encapsulates all the needed information about a compaction and makes a decision whether this
  * compaction is effective or not
  *
- * @see CompactionStrategy
  */
 public class CompactionOperation implements Serializable {
 
+  private String baseInstantTime;
   private Optional<String> dataFileCommitTime;
-  private Optional<Long> dataFileSize;
   private List<String> deltaFilePaths;
   private Optional<String> dataFilePath;
   private String fileId;
   private String partitionPath;
-  private Map<String, Object> metrics;
+  private Map<String, Double> metrics;
 
   //Only for serialization/de-serialization
   @Deprecated
@@ -49,34 +47,34 @@ public class CompactionOperation implements Serializable {
   }
 
   public CompactionOperation(Optional<HoodieDataFile> dataFile, String partitionPath,
-      List<HoodieLogFile> logFiles, HoodieWriteConfig writeConfig) {
+      List<HoodieLogFile> logFiles, Map<String, Double> metrics) {
     if (dataFile.isPresent()) {
+      this.baseInstantTime = dataFile.get().getCommitTime();
       this.dataFilePath = Optional.of(dataFile.get().getPath());
       this.fileId = dataFile.get().getFileId();
       this.dataFileCommitTime = Optional.of(dataFile.get().getCommitTime());
-      this.dataFileSize = Optional.of(dataFile.get().getFileSize());
     } else {
       assert logFiles.size() > 0;
       this.dataFilePath = Optional.empty();
+      this.baseInstantTime = FSUtils.getBaseCommitTimeFromLogPath(logFiles.get(0).getPath());
       this.fileId = FSUtils.getFileIdFromLogPath(logFiles.get(0).getPath());
       this.dataFileCommitTime = Optional.empty();
-      this.dataFileSize = Optional.empty();
     }
+
     this.partitionPath = partitionPath;
     this.deltaFilePaths = logFiles.stream().map(s -> s.getPath().toString())
         .collect(Collectors.toList());
-    this.metrics = writeConfig.getCompactionStrategy()
-        .captureMetrics(writeConfig, dataFile, partitionPath, logFiles);
+    this.metrics = metrics;
+  }
+
+  public String getBaseInstantTime() {
+    return baseInstantTime;
   }
 
   public Optional<String> getDataFileCommitTime() {
     return dataFileCommitTime;
   }
 
-  public Optional<Long> getDataFileSize() {
-    return dataFileSize;
-  }
-
   public List<String> getDeltaFilePaths() {
     return deltaFilePaths;
   }
@@ -93,7 +91,23 @@ public class CompactionOperation implements Serializable {
     return partitionPath;
   }
 
-  public Map<String, Object> getMetrics() {
+  public Map<String, Double> getMetrics() {
     return metrics;
   }
+
+  /**
+   * Convert Avro generated Compaction operation to POJO for Spark RDD operation
+   *
+   * @param operation Hoodie Compaction Operation
+   * @return CompactionOperation POJO
+   */
+  public static CompactionOperation convertFromAvroRecordInstance(HoodieCompactionOperation operation) {
+    CompactionOperation op = new CompactionOperation();
+    op.baseInstantTime = operation.getBaseInstantTime();
+    op.dataFilePath = Optional.ofNullable(operation.getDataFilePath());
+    op.deltaFilePaths = new ArrayList<>(operation.getDeltaFilePaths());
+    op.fileId = operation.getFileId();
+    op.metrics = new HashMap<>(operation.getMetrics());
+    op.partitionPath = operation.getPartitionPath();
+    return op;
+  }
 }
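Because the Avro-generated classes are not Serializable, this POJO is what travels through Spark executors, and it is converted back for storage. A sketch of the round trip using the two conversion helpers from this PR:

```java
import com.uber.hoodie.avro.model.HoodieCompactionOperation;
import com.uber.hoodie.common.model.CompactionOperation;
import com.uber.hoodie.common.util.CompactionUtils;

public class OperationConversionExample {

  // avroOp is assumed to come out of a deserialized HoodieCompactionPlan.
  public static HoodieCompactionOperation roundTrip(HoodieCompactionOperation avroOp) {
    // Avro -> POJO at the Spark boundary...
    CompactionOperation pojo = CompactionOperation.convertFromAvroRecordInstance(avroOp);
    // ...and POJO -> Avro again when the result needs to be stored in meta files.
    return CompactionUtils.buildHoodieCompactionOperation(pojo);
  }
}
```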
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/AvroUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/AvroUtils.java
index 633960d0a..a28857ff3 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/AvroUtils.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/AvroUtils.java
@@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.uber.hoodie.avro.model.HoodieCleanMetadata;
 import com.uber.hoodie.avro.model.HoodieCleanPartitionMetadata;
+import com.uber.hoodie.avro.model.HoodieCompactionPlan;
 import com.uber.hoodie.avro.model.HoodieRollbackMetadata;
 import com.uber.hoodie.avro.model.HoodieRollbackPartitionMetadata;
 import com.uber.hoodie.avro.model.HoodieSavepointMetadata;
@@ -142,6 +143,10 @@ public class AvroUtils {
         partitionMetadataBuilder.build());
   }
 
+  public static Optional<byte[]> serializeCompactionWorkload(HoodieCompactionPlan compactionWorkload)
+      throws IOException {
+    return serializeAvroMetadata(compactionWorkload, HoodieCompactionPlan.class);
+  }
 
   public static Optional<byte[]> serializeCleanMetadata(HoodieCleanMetadata metadata)
       throws IOException {
@@ -169,6 +174,11 @@ public class AvroUtils {
     return Optional.of(baos.toByteArray());
   }
 
+  public static HoodieCompactionPlan deserializeHoodieCompactionPlan(byte[] bytes)
+      throws IOException {
+    return deserializeAvroMetadata(bytes, HoodieCompactionPlan.class);
+  }
+
   public static HoodieCleanMetadata deserializeHoodieCleanMetadata(byte[] bytes)
       throws IOException {
     return deserializeAvroMetadata(bytes, HoodieCleanMetadata.class);
@@ -188,5 +198,4 @@ public class AvroUtils {
         .checkArgument(fileReader.hasNext(), "Could not deserialize metadata of type " + clazz);
     return fileReader.next();
   }
-
 }
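With these two helpers, a plan can round-trip through the same Avro container format used for clean and rollback metadata. A sketch — in the real flow the bytes would be written to and read from a meta file in between:

```java
import com.uber.hoodie.avro.model.HoodieCompactionPlan;
import com.uber.hoodie.common.util.AvroUtils;
import java.io.IOException;
import java.util.Optional;

public class PlanSerdeExample {

  public static HoodieCompactionPlan writeAndReadBack(HoodieCompactionPlan plan) throws IOException {
    // Serialize the plan to Avro bytes.
    Optional<byte[]> bytes = AvroUtils.serializeCompactionWorkload(plan);
    // Deserialize them back into a plan instance.
    return AvroUtils.deserializeHoodieCompactionPlan(bytes.get());
  }
}
```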
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/CompactionUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/CompactionUtils.java
new file mode 100644
index 000000000..45091da85
--- /dev/null
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/CompactionUtils.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.common.util;
+
+import com.uber.hoodie.avro.model.HoodieCompactionOperation;
+import com.uber.hoodie.avro.model.HoodieCompactionPlan;
+import com.uber.hoodie.common.model.CompactionOperation;
+import com.uber.hoodie.common.model.FileSlice;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javafx.util.Pair;
+
+/**
+ * Helper class to generate compaction workload from FileGroup/FileSlice abstraction
+ */
+public class CompactionUtils {
+
+  /**
+   * Generate compaction operation from file-slice
+   *
+   * @param partitionPath Partition path
+   * @param fileSlice File Slice
+   * @param metricsCaptureFunction Metrics Capture function
+   * @return Compaction Operation
+   */
+  public static HoodieCompactionOperation buildFromFileSlice(String partitionPath, FileSlice fileSlice,
+      Optional<Function<Pair<String, FileSlice>, Map<String, Double>>> metricsCaptureFunction) {
+    HoodieCompactionOperation.Builder builder = HoodieCompactionOperation.newBuilder();
+    builder.setPartitionPath(partitionPath);
+    builder.setFileId(fileSlice.getFileId());
+    builder.setBaseInstantTime(fileSlice.getBaseInstantTime());
+    builder.setDeltaFilePaths(fileSlice.getLogFiles().map(lf -> lf.getPath().toString()).collect(Collectors.toList()));
+    if (fileSlice.getDataFile().isPresent()) {
+      builder.setDataFilePath(fileSlice.getDataFile().get().getPath());
+    }
+
+    if (metricsCaptureFunction.isPresent()) {
+      builder.setMetrics(metricsCaptureFunction.get().apply(new Pair<>(partitionPath, fileSlice)));
+    }
+    return builder.build();
+  }
+
+  /**
+   * Generate compaction workload from file-slices
+   *
+   * @param compactorId Compactor Id to set
+   * @param partitionFileSlicePairs list of partition file-slice pairs
+   * @param extraMetadata Extra Metadata
+   * @param metricsCaptureFunction Metrics Capture function
+   * @return Compaction Plan
+   */
+  public static HoodieCompactionPlan buildFromFileSlices(String compactorId,
+      List<Pair<String, FileSlice>> partitionFileSlicePairs,
+      Optional<Map<String, String>> extraMetadata,
+      Optional<Function<Pair<String, FileSlice>, Map<String, Double>>> metricsCaptureFunction) {
+    HoodieCompactionPlan.Builder builder = HoodieCompactionPlan.newBuilder();
+    builder.setCompactorId(compactorId);
+    extraMetadata.ifPresent(m -> builder.setExtraMetadata(m));
+    builder.setOperations(partitionFileSlicePairs.stream().map(pfPair ->
+        buildFromFileSlice(pfPair.getKey(), pfPair.getValue(), metricsCaptureFunction)).collect(Collectors.toList()));
+    return builder.build();
+  }
+
+  /**
+   * Build Avro generated Compaction operation payload from compaction operation POJO for serialization
+   */
+  public static HoodieCompactionOperation buildHoodieCompactionOperation(CompactionOperation op) {
+    return HoodieCompactionOperation.newBuilder().setFileId(op.getFileId())
+        .setBaseInstantTime(op.getBaseInstantTime())
+        .setPartitionPath(op.getPartitionPath())
+        .setDataFilePath(op.getDataFilePath().isPresent() ? op.getDataFilePath().get() : null)
+        .setDeltaFilePaths(op.getDeltaFilePaths())
+        .setMetrics(op.getMetrics()).build();
+  }
+
+  /**
+   * Build Compaction operation payload from Avro version for using in Spark executors
+   *
+   * @param hc HoodieCompactionOperation
+   */
+  public static CompactionOperation buildCompactionOperation(HoodieCompactionOperation hc) {
+    return CompactionOperation.convertFromAvroRecordInstance(hc);
+  }
+}
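A sketch of using buildFromFileSlices to turn file slices collected from the file system view into a plan; here extra metadata and metrics capture are simply skipped, so those fields stay null in the resulting plan:

```java
import com.uber.hoodie.avro.model.HoodieCompactionPlan;
import com.uber.hoodie.common.model.FileSlice;
import com.uber.hoodie.common.util.CompactionUtils;
import java.util.List;
import java.util.Optional;
import javafx.util.Pair;

public class CompactionUtilsExample {

  // partitionFileSlicePairs is assumed to be collected from the table's file system view.
  public static HoodieCompactionPlan planFromSlices(List<Pair<String, FileSlice>> partitionFileSlicePairs) {
    return CompactionUtils.buildFromFileSlices(
        "default",            // compactor id, matching DEFAULT_COMPACTOR_ID
        partitionFileSlicePairs,
        Optional.empty(),     // no extra metadata
        Optional.empty());    // skip metrics capture
  }
}
```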