diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java
index 97c5e5c68..d1584aac0 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java
@@ -108,7 +108,6 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
       + ".partitions";
   // 500GB of target IO per compaction (both read and write)
   public static final String DEFAULT_TARGET_PARTITIONS_PER_DAYBASED_COMPACTION = String.valueOf(10);
-  public static final String DEFAULT_COMPACTOR_ID = "default";
 
   private HoodieCompactionConfig(Properties props) {
     super(props);
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java
index b314e8eb3..90a24b6d9 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java
@@ -41,7 +41,6 @@ import com.uber.hoodie.io.compact.strategy.CompactionStrategy;
 import com.uber.hoodie.table.HoodieCopyOnWriteTable;
 import com.uber.hoodie.table.HoodieTable;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
@@ -49,6 +48,7 @@ import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 import org.apache.avro.Schema;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -95,7 +95,7 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
     HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc);
     List<CompactionOperation> operations = compactionPlan.getOperations().stream()
         .map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
-    log.info("Compactor " + compactionPlan.getCompactorId() + " running, Compacting " + operations + " files");
+    log.info("Compactor compacting " + operations + " files");
     return jsc.parallelize(operations, operations.size())
         .map(s -> compact(table, metaClient, config, s, compactionCommitTime))
         .flatMap(writeStatusesItr -> writeStatusesItr.iterator());
@@ -217,7 +217,7 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
     // TODO: In subsequent PRs, pending Compaction plans will be wired in. Strategy can look at pending compaction
     // plans to schedule next compaction plan
     HoodieCompactionPlan compactionPlan = config.getCompactionStrategy().generateCompactionPlan(config, operations,
-        new ArrayList<>());
+        CompactionUtils.getAllPendingCompactionPlans(metaClient).stream().map(Pair::getValue).collect(toList()));
     if (compactionPlan.getOperations().isEmpty()) {
       log.warn("After filtering, Nothing to compact for " + metaClient.getBasePath());
     }
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/CompactionStrategy.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/CompactionStrategy.java
index 2b5070dba..fa5db2527 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/CompactionStrategy.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/CompactionStrategy.java
@@ -22,7 +22,6 @@ import com.uber.hoodie.avro.model.HoodieCompactionPlan;
 import com.uber.hoodie.common.model.HoodieDataFile;
 import com.uber.hoodie.common.model.HoodieLogFile;
 import com.uber.hoodie.common.util.FSUtils;
-import com.uber.hoodie.config.HoodieCompactionConfig;
 import com.uber.hoodie.config.HoodieWriteConfig;
 import java.io.Serializable;
 import java.util.List;
@@ -92,7 +91,7 @@ public abstract class CompactionStrategy implements Serializable {
   public HoodieCompactionPlan generateCompactionPlan(HoodieWriteConfig writeConfig,
       List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> pendingCompactionPlans) {
     // Strategy implementation can overload this method to set specific compactor-id
-    return HoodieCompactionPlan.newBuilder().setCompactorId(HoodieCompactionConfig.DEFAULT_COMPACTOR_ID)
+    return HoodieCompactionPlan.newBuilder()
         .setOperations(orderAndFilter(writeConfig, operations, pendingCompactionPlans))
         .build();
   }
diff --git a/hoodie-common/src/main/avro/HoodieCompactionOperation.avsc b/hoodie-common/src/main/avro/HoodieCompactionOperation.avsc
index 8400fa53a..450b89898 100644
--- a/hoodie-common/src/main/avro/HoodieCompactionOperation.avsc
+++ b/hoodie-common/src/main/avro/HoodieCompactionOperation.avsc
@@ -3,11 +3,7 @@
    "type":"record",
    "name":"HoodieCompactionPlan",
    "fields":[
-     {
-        "name":"compactorId",
-        "type":["null","string"]
-     },
-     {
+     {
        "name":"operations",
        "type":["null", {
           "type":"array",
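Note on the strategy wiring: now that `generateCompactionPlan` receives the pending plans, a strategy can exclude file-ids that are already being compacted. A minimal sketch of such an override, assuming `orderAndFilter` is the overridable hook and its parameter types match the call-site above (the exact declaration is not shown in this diff):

```java
// Illustrative sketch only, not part of this patch.
// Requires java.util.Set and java.util.stream.Collectors.
@Override
protected List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig writeConfig,
    List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> pendingCompactionPlans) {
  // Collect file-ids that already appear in a pending compaction plan
  Set<String> pendingFileIds = pendingCompactionPlans.stream()
      .filter(plan -> plan.getOperations() != null)
      .flatMap(plan -> plan.getOperations().stream())
      .map(HoodieCompactionOperation::getFileId)
      .collect(Collectors.toSet());
  // Never schedule a file-id twice; a file-group may have at most one pending compaction
  return operations.stream()
      .filter(op -> !pendingFileIds.contains(op.getFileId()))
      .collect(Collectors.toList());
}
```

This keeps the invariant that `CompactionUtils.getAllPendingCompactionOperations` (below) enforces defensively: a single file-id must never be targeted by more than one pending compaction.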
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java
index fe6e326a0..e1ef48a35 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java
@@ -16,6 +16,7 @@
 
 package com.uber.hoodie.common.table.view;
 
+import com.google.common.collect.ImmutableMap;
 import com.uber.hoodie.common.model.FileSlice;
 import com.uber.hoodie.common.model.HoodieDataFile;
 import com.uber.hoodie.common.model.HoodieFileGroup;
@@ -23,6 +24,8 @@ import com.uber.hoodie.common.model.HoodieLogFile;
 import com.uber.hoodie.common.table.HoodieTableMetaClient;
 import com.uber.hoodie.common.table.HoodieTimeline;
 import com.uber.hoodie.common.table.TableFileSystemView;
+import com.uber.hoodie.common.table.timeline.HoodieInstant;
+import com.uber.hoodie.common.util.CompactionUtils;
 import com.uber.hoodie.common.util.FSUtils;
 import com.uber.hoodie.exception.HoodieIOException;
 import java.io.IOException;
@@ -41,6 +44,8 @@ import java.util.stream.Stream;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 
 /**
  * Common abstract implementation for multiple TableFileSystemView Implementations. 2 possible
@@ -55,6 +60,8 @@ public class HoodieTableFileSystemView implements TableFileSystemView,
     TableFileSystemView.ReadOptimizedView,
     TableFileSystemView.RealtimeView, Serializable {
 
+  private static Logger log = LogManager.getLogger(HoodieTableFileSystemView.class);
+
   protected HoodieTableMetaClient metaClient;
   // This is the commits that will be visible for all views extending this view
   protected HoodieTimeline visibleActiveTimeline;
@@ -78,10 +85,15 @@ public class HoodieTableFileSystemView implements TableFileSystemView,
     this.visibleActiveTimeline = visibleActiveTimeline;
     this.fileGroupMap = new HashMap<>();
     this.partitionToFileGroupsMap = new HashMap<>();
-    //TODO: vb Will be implemented in next PR
-    this.fileIdToPendingCompactionInstantTime = new HashMap<>();
-  }
+    // Build fileId to Pending Compaction Instants
+    List<HoodieInstant> pendingCompactionInstants =
+        metaClient.getActiveTimeline().filterPendingCompactionTimeline().getInstants().collect(Collectors.toList());
+    this.fileIdToPendingCompactionInstantTime = ImmutableMap.copyOf(
+        CompactionUtils.getAllPendingCompactionOperations(metaClient).entrySet().stream().map(entry -> {
+          return Pair.of(entry.getKey(), entry.getValue().getKey());
+        }).collect(Collectors.toMap(Pair::getKey, Pair::getValue)));
+  }
 
   /**
    * Create a file system view, as of the given timeline, with the provided file statuses.
   */
@@ -404,15 +416,4 @@ public class HoodieTableFileSystemView implements TableFileSystemView,
           "Failed to list data files in partition " + partitionPathStr, e);
     }
   }
-
-  /**
-   * Used by tests to add pending compaction entries TODO: This method is temporary and should go away in subsequent
-   * Async Compaction PR
-   *
-   * @param fileId File Id
-   * @param compactionInstantTime Compaction Instant Time
-   */
-  protected void addPendingCompactionFileId(String fileId, String compactionInstantTime) {
-    fileIdToPendingCompactionInstantTime.put(fileId, compactionInstantTime);
-  }
 }
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/AvroUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/AvroUtils.java
index a28857ff3..1cde27f31 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/AvroUtils.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/AvroUtils.java
@@ -174,7 +174,7 @@ public class AvroUtils {
     return Optional.of(baos.toByteArray());
   }
 
-  public static HoodieCompactionPlan deserializeHoodieCompactionPlan(byte[] bytes)
+  public static HoodieCompactionPlan deserializeCompactionPlan(byte[] bytes)
       throws IOException {
     return deserializeAvroMetadata(bytes, HoodieCompactionPlan.class);
   }
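The rename pairs with the existing serializer at every call-site. A quick round-trip sketch (mine, not patch code; it assumes `serializeCompactionWorkload` returns an `Optional<byte[]>`, as its use with `saveToRequested`/`saveToInflight` in the tests below implies, and `partitionFileSlicePairs` built as in those tests):

```java
// Build a plan, serialize it as timeline metadata, and read it back.
HoodieCompactionPlan plan = CompactionUtils.buildFromFileSlices(
    partitionFileSlicePairs, Optional.empty(), Optional.empty());
Optional<byte[]> bytes = AvroUtils.serializeCompactionWorkload(plan);
HoodieCompactionPlan readBack = AvroUtils.deserializeCompactionPlan(bytes.get());
// readBack.getOperations() should equal plan.getOperations()
```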
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/CompactionUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/CompactionUtils.java
index 45091da85..cadc9d7bd 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/CompactionUtils.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/CompactionUtils.java
@@ -20,15 +20,21 @@ import com.uber.hoodie.avro.model.HoodieCompactionOperation;
 import com.uber.hoodie.avro.model.HoodieCompactionPlan;
 import com.uber.hoodie.common.model.CompactionOperation;
 import com.uber.hoodie.common.model.FileSlice;
+import com.uber.hoodie.common.table.HoodieTableMetaClient;
+import com.uber.hoodie.common.table.timeline.HoodieInstant;
+import com.uber.hoodie.exception.HoodieException;
+import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.function.Function;
 import java.util.stream.Collectors;
-import javafx.util.Pair;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.tuple.Pair;
 
 /**
- * Helper class to generate compaction workload from FileGroup/FileSlice abstraction
+ * Helper class to generate compaction plan from FileGroup/FileSlice abstraction
  */
 public class CompactionUtils {
 
@@ -41,7 +47,7 @@ public class CompactionUtils {
   * @return Compaction Operation
   */
  public static HoodieCompactionOperation buildFromFileSlice(String partitionPath, FileSlice fileSlice,
-      Optional<Function<Pair<String, FileSlice>, Map<String, Double>>> metricsCaptureFunction) {
+      Optional<Function<Pair<String, FileSlice>, Map<String, Double>>> metricsCaptureFunction) {
    HoodieCompactionOperation.Builder builder = HoodieCompactionOperation.newBuilder();
    builder.setPartitionPath(partitionPath);
    builder.setFileId(fileSlice.getFileId());
@@ -52,25 +58,23 @@ public class CompactionUtils {
    }
 
    if (metricsCaptureFunction.isPresent()) {
-      builder.setMetrics(metricsCaptureFunction.get().apply(new Pair(partitionPath, fileSlice)));
+      builder.setMetrics(metricsCaptureFunction.get().apply(Pair.of(partitionPath, fileSlice)));
    }
    return builder.build();
  }
 
  /**
-   * Generate compaction workload from file-slices
+   * Generate compaction plan from file-slices
    *
-   * @param compactorId Compactor Id to set
    * @param partitionFileSlicePairs list of partition file-slice pairs
    * @param extraMetadata Extra Metadata
    * @param metricsCaptureFunction Metrics Capture function
    */
-  public static HoodieCompactionPlan buildFromFileSlices(String compactorId,
+  public static HoodieCompactionPlan buildFromFileSlices(
      List<Pair<String, FileSlice>> partitionFileSlicePairs,
      Optional<Map<String, String>> extraMetadata,
-      Optional<Function<Pair<String, FileSlice>, Map<String, Double>>> metricsCaptureFunction) {
+      Optional<Function<Pair<String, FileSlice>, Map<String, Double>>> metricsCaptureFunction) {
    HoodieCompactionPlan.Builder builder = HoodieCompactionPlan.newBuilder();
-    builder.setCompactorId(compactorId);
    extraMetadata.ifPresent(m -> builder.setExtraMetadata(m));
    builder.setOperations(partitionFileSlicePairs.stream().map(pfPair ->
        buildFromFileSlice(pfPair.getKey(), pfPair.getValue(), metricsCaptureFunction)).collect(Collectors.toList()));
@@ -97,4 +101,58 @@ public class CompactionUtils {
  public static CompactionOperation buildCompactionOperation(HoodieCompactionOperation hc) {
    return CompactionOperation.convertFromAvroRecordInstance(hc);
  }
+
+  /**
+   * Get all pending compaction plans along with their instants
+   *
+   * @param metaClient Hoodie Meta Client
+   */
+  public static List<Pair<HoodieInstant, HoodieCompactionPlan>> getAllPendingCompactionPlans(
+      HoodieTableMetaClient metaClient) {
+    List<HoodieInstant> pendingCompactionInstants =
+        metaClient.getActiveTimeline().filterPendingCompactionTimeline().getInstants().collect(Collectors.toList());
+    return pendingCompactionInstants.stream().map(instant -> {
+      try {
+        HoodieCompactionPlan compactionPlan = AvroUtils.deserializeCompactionPlan(
+            metaClient.getActiveTimeline().getInstantDetails(instant).get());
+        return Pair.of(instant, compactionPlan);
+      } catch (IOException e) {
+        throw new HoodieException(e);
+      }
+    }).collect(Collectors.toList());
+  }
+
+  /**
+   * Get all file-ids with pending compaction operations and their target compaction instant time
+   *
+   * @param metaClient Hoodie Table Meta Client
+   */
+  public static Map<String, Pair<String, HoodieCompactionOperation>> getAllPendingCompactionOperations(
+      HoodieTableMetaClient metaClient) {
+    List<Pair<HoodieInstant, HoodieCompactionPlan>> pendingCompactionPlanWithInstants =
+        getAllPendingCompactionPlans(metaClient);
+
+    Map<String, Pair<String, HoodieCompactionOperation>> fileIdToPendingCompactionWithInstantMap = new HashMap<>();
+    pendingCompactionPlanWithInstants.stream().flatMap(instantPlanPair -> {
+      HoodieInstant instant = instantPlanPair.getKey();
+      HoodieCompactionPlan compactionPlan = instantPlanPair.getValue();
+      List<HoodieCompactionOperation> ops = compactionPlan.getOperations();
+      if (null != ops) {
+        return ops.stream().map(op -> {
+          return Pair.of(op.getFileId(), Pair.of(instant.getTimestamp(), op));
+        });
+      } else {
+        return Stream.empty();
+      }
+    }).forEach(pair -> {
+      // Defensive check to ensure a single file-id does not have more than one pending compaction
+      if (fileIdToPendingCompactionWithInstantMap.containsKey(pair.getKey())) {
+        String msg = "Hoodie File Id (" + pair.getKey() + ") has more than 1 pending compaction. Instants: "
+            + pair.getValue() + ", " + fileIdToPendingCompactionWithInstantMap.get(pair.getKey());
+        throw new IllegalStateException(msg);
+      }
+      fileIdToPendingCompactionWithInstantMap.put(pair.getKey(), pair.getValue());
+    });
+    return fileIdToPendingCompactionWithInstantMap;
+  }
 }
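The file-id to (instant-time, operation) map gives callers a cheap membership test. For example (a sketch of mine using only the APIs added above; `metaClient` and `fileId` are assumed to be in scope), a scheduling path could refuse to target a file-group that already has a pending compaction:

```java
// Guard against double-scheduling a file-group for compaction.
Map<String, Pair<String, HoodieCompactionOperation>> pending =
    CompactionUtils.getAllPendingCompactionOperations(metaClient);
if (pending.containsKey(fileId)) {
  // Left of the pair is the compaction instant already targeting this file-id
  String instantTime = pending.get(fileId).getKey();
  throw new IllegalStateException(
      "File " + fileId + " already has a pending compaction at instant " + instantTime);
}
```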
diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemViewTest.java b/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemViewTest.java
index 757a3caae..713b6b802 100644
--- a/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemViewTest.java
+++ b/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemViewTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import com.uber.hoodie.avro.model.HoodieCompactionPlan;
 import com.uber.hoodie.common.model.FileSlice;
 import com.uber.hoodie.common.model.HoodieDataFile;
 import com.uber.hoodie.common.model.HoodieFileGroup;
@@ -33,9 +34,12 @@ import com.uber.hoodie.common.table.TableFileSystemView;
 import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
 import com.uber.hoodie.common.table.timeline.HoodieInstant;
 import com.uber.hoodie.common.table.timeline.HoodieInstant.State;
+import com.uber.hoodie.common.util.AvroUtils;
+import com.uber.hoodie.common.util.CompactionUtils;
 import com.uber.hoodie.common.util.FSUtils;
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -44,6 +48,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.junit.Before;
@@ -234,19 +239,26 @@ public class HoodieTableFileSystemViewTest {
     commitTimeline.saveAsComplete(deltaInstant2, Optional.empty());
     commitTimeline.saveAsComplete(deltaInstant3, Optional.empty());
 
-    // Fake delta-ingestion after compaction-requested
+    refreshFsView(null);
+    List<FileSlice> fileSlices = rtView.getLatestFileSlices(partitionPath).collect(Collectors.toList());
     String compactionRequestedTime = "4";
     String compactDataFileName = FSUtils.makeDataFileName(compactionRequestedTime, 1, fileId);
+    List<Pair<String, FileSlice>> partitionFileSlicesPairs = new ArrayList<>();
+    partitionFileSlicesPairs.add(Pair.of(partitionPath, fileSlices.get(0)));
+    HoodieCompactionPlan compactionPlan = CompactionUtils.buildFromFileSlices(partitionFileSlicesPairs,
+        Optional.empty(), Optional.empty());
     HoodieInstant compactionInstant = null;
     if (isCompactionInFlight) {
       // Create a Data-file but this should be skipped by view
       new File(basePath + "/" + partitionPath + "/" + compactDataFileName).createNewFile();
       compactionInstant = new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionRequestedTime);
-      commitTimeline.saveToInflight(compactionInstant, Optional.empty());
+      commitTimeline.saveToInflight(compactionInstant, AvroUtils.serializeCompactionWorkload(compactionPlan));
     } else {
       compactionInstant = new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, compactionRequestedTime);
-      commitTimeline.saveToRequested(compactionInstant, Optional.empty());
+      commitTimeline.saveToRequested(compactionInstant, AvroUtils.serializeCompactionWorkload(compactionPlan));
     }
+
+    // Fake delta-ingestion after compaction-requested
     String deltaInstantTime4 = "5";
     String deltaInstantTime5 = "6";
     List<String> allInstantTimes = Arrays.asList(instantTime1, deltaInstantTime1, deltaInstantTime2,
@@ -260,7 +272,6 @@ public class HoodieTableFileSystemViewTest {
     commitTimeline.saveAsComplete(deltaInstant4, Optional.empty());
     commitTimeline.saveAsComplete(deltaInstant5, Optional.empty());
     refreshFsView(null);
-    fsView.addPendingCompactionFileId(fileId, compactionRequestedTime);
 
     List<HoodieDataFile> dataFiles = roView.getAllDataFiles(partitionPath).collect(Collectors.toList());
     if (skipCreatingDataFile) {
@@ -367,7 +378,6 @@ public class HoodieTableFileSystemViewTest {
     commitTimeline.saveToInflight(new HoodieInstant(State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION,
         inflightDeltaInstantTime), Optional.empty());
     refreshFsView(null);
-    fsView.addPendingCompactionFileId(fileId, compactionRequestedTime);
 
     List<FileSlice> allRawFileSlices = getAllRawFileSlices(partitionPath).collect(Collectors.toList());
     dataFiles = allRawFileSlices.stream().flatMap(slice -> {
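The test change is the payoff of the whole patch: a real serialized plan is persisted with the requested/inflight compaction instant, so the view discovers pending compactions from the timeline itself and the `addPendingCompactionFileId` test hook can go away. Roughly, the behavior under test is (my paraphrase of the assertions that follow in this test class, using its `refreshFsView`/`roView` helpers; not new patch code):

```java
// With a compaction inflight at instant "4", the read-optimized view must not
// surface the compactor's partially written data file after a refresh.
refreshFsView(null);
List<HoodieDataFile> dataFiles = roView.getAllDataFiles(partitionPath).collect(Collectors.toList());
// Expectation: no entry in dataFiles carries commit time "4"
// (compactDataFileName, created by the inflight compactor, is skipped).
```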
diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestCompactionUtils.java b/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestCompactionUtils.java
new file mode 100644
index 000000000..9faaca3e5
--- /dev/null
+++ b/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestCompactionUtils.java
@@ -0,0 +1,345 @@
+/*
+ * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.common.util;
+
+import static com.uber.hoodie.common.model.HoodieTestUtils.DEFAULT_PARTITION_PATHS;
+import static com.uber.hoodie.common.table.HoodieTimeline.COMPACTION_ACTION;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.uber.hoodie.avro.model.HoodieCompactionOperation;
+import com.uber.hoodie.avro.model.HoodieCompactionPlan;
+import com.uber.hoodie.common.model.FileSlice;
+import com.uber.hoodie.common.model.HoodieDataFile;
+import com.uber.hoodie.common.model.HoodieLogFile;
+import com.uber.hoodie.common.model.HoodieTestUtils;
+import com.uber.hoodie.common.table.HoodieTableMetaClient;
+import com.uber.hoodie.common.table.timeline.HoodieInstant;
+import com.uber.hoodie.common.table.timeline.HoodieInstant.State;
+import com.uber.hoodie.exception.HoodieIOException;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestCompactionUtils {
+
+  private static final Map<String, Double> metrics =
+      new ImmutableMap.Builder<String, Double>()
+          .put("key1", 1.0)
+          .put("key2", 3.0).build();
+  @Rule
+  public TemporaryFolder tmpFolder = new TemporaryFolder();
+  private HoodieTableMetaClient metaClient;
+  private String basePath;
+  private Function<Pair<String, FileSlice>, Map<String, Double>> metricsCaptureFn = (partitionFileSlice) -> metrics;
+
+  @Before
+  public void init() throws IOException {
+    metaClient = HoodieTestUtils.init(tmpFolder.getRoot().getAbsolutePath());
+    basePath = metaClient.getBasePath();
+  }
+
+  @Test
+  public void testBuildFromFileSlice() {
+    // Empty File-Slice with no data and log files
+    FileSlice emptyFileSlice = new FileSlice("000", "empty1");
+    HoodieCompactionOperation op = CompactionUtils.buildFromFileSlice(
+        DEFAULT_PARTITION_PATHS[0], emptyFileSlice, Optional.of(metricsCaptureFn));
+    testFileSliceCompactionOpEquality(emptyFileSlice, op, DEFAULT_PARTITION_PATHS[0]);
+
+    // File Slice with data-file but no log files
+    FileSlice noLogFileSlice = new FileSlice("000", "noLog1");
+    noLogFileSlice.setDataFile(new TestHoodieDataFile("/tmp/noLog.parquet"));
+    op = CompactionUtils.buildFromFileSlice(
+        DEFAULT_PARTITION_PATHS[0], noLogFileSlice, Optional.of(metricsCaptureFn));
+    testFileSliceCompactionOpEquality(noLogFileSlice, op, DEFAULT_PARTITION_PATHS[0]);
+
+    // File Slice with no data-file but log files present
+    FileSlice noDataFileSlice = new FileSlice("000", "noData1");
+    noDataFileSlice.addLogFile(new HoodieLogFile(new Path(
+        FSUtils.makeLogFileName("noData1", ".log", "000", 1))));
+    noDataFileSlice.addLogFile(new HoodieLogFile(new Path(
+        FSUtils.makeLogFileName("noData1", ".log", "000", 2))));
+    op = CompactionUtils.buildFromFileSlice(
+        DEFAULT_PARTITION_PATHS[0], noDataFileSlice, Optional.of(metricsCaptureFn));
+    testFileSliceCompactionOpEquality(noDataFileSlice, op, DEFAULT_PARTITION_PATHS[0]);
+
+    // File Slice with data-file and log files present
+    FileSlice fileSlice = new FileSlice("000", "noData1");
+    fileSlice.setDataFile(new TestHoodieDataFile("/tmp/noLog.parquet"));
+    fileSlice.addLogFile(new HoodieLogFile(new Path(
+        FSUtils.makeLogFileName("noData1", ".log", "000", 1))));
+    fileSlice.addLogFile(new HoodieLogFile(new Path(
+        FSUtils.makeLogFileName("noData1", ".log", "000", 2))));
+    op = CompactionUtils.buildFromFileSlice(
+        DEFAULT_PARTITION_PATHS[0], fileSlice, Optional.of(metricsCaptureFn));
+    testFileSliceCompactionOpEquality(fileSlice, op, DEFAULT_PARTITION_PATHS[0]);
+  }
+
+  /**
+   * Generate input for compaction plan tests
+   */
+  private Pair<List<Pair<String, FileSlice>>, HoodieCompactionPlan> buildCompactionPlan() {
+    FileSlice emptyFileSlice = new FileSlice("000", "empty1");
+    FileSlice fileSlice = new FileSlice("000", "noData1");
+    fileSlice.setDataFile(new TestHoodieDataFile("/tmp/noLog.parquet"));
+    fileSlice.addLogFile(new HoodieLogFile(new Path(
+        FSUtils.makeLogFileName("noData1", ".log", "000", 1))));
+    fileSlice.addLogFile(new HoodieLogFile(new Path(
+        FSUtils.makeLogFileName("noData1", ".log", "000", 2))));
+    FileSlice noLogFileSlice = new FileSlice("000", "noLog1");
+    noLogFileSlice.setDataFile(new TestHoodieDataFile("/tmp/noLog.parquet"));
+    FileSlice noDataFileSlice = new FileSlice("000", "noData1");
+    noDataFileSlice.addLogFile(new HoodieLogFile(new Path(
+        FSUtils.makeLogFileName("noData1", ".log", "000", 1))));
+    noDataFileSlice.addLogFile(new HoodieLogFile(new Path(
+        FSUtils.makeLogFileName("noData1", ".log", "000", 2))));
+    List<FileSlice> fileSliceList = Arrays.asList(emptyFileSlice, noDataFileSlice, fileSlice, noLogFileSlice);
+    List<Pair<String, FileSlice>> input = fileSliceList.stream().map(f -> Pair.of(DEFAULT_PARTITION_PATHS[0], f))
+        .collect(Collectors.toList());
+    return Pair.of(input, CompactionUtils.buildFromFileSlices(input, Optional.empty(), Optional.of(metricsCaptureFn)));
+  }
+
+  @Test
+  public void testBuildFromFileSlices() {
+    Pair<List<Pair<String, FileSlice>>, HoodieCompactionPlan> inputAndPlan = buildCompactionPlan();
+    testFileSlicesCompactionPlanEquality(inputAndPlan.getKey(), inputAndPlan.getValue());
+  }
+
+  @Test
+  public void testCompactionTransformation() {
+    // check HoodieCompactionOperation <=> CompactionOperation transformation function
+    Pair<List<Pair<String, FileSlice>>, HoodieCompactionPlan> inputAndPlan = buildCompactionPlan();
+    HoodieCompactionPlan plan = inputAndPlan.getRight();
+    List<HoodieCompactionOperation> originalOps = plan.getOperations();
+    List<HoodieCompactionOperation> regeneratedOps =
+        originalOps.stream().map(op -> {
+          // Convert to CompactionOperation
+          return CompactionUtils.buildCompactionOperation(op);
+        }).map(op2 -> {
+          // Convert back to HoodieCompactionOperation and check for equality
+          return CompactionUtils.buildHoodieCompactionOperation(op2);
+        }).collect(Collectors.toList());
+    Assert.assertTrue("Transformation did get tested", originalOps.size() > 0);
+    Assert.assertEquals("All fields set correctly in transformations", originalOps, regeneratedOps);
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testGetAllPendingCompactionOperationsWithDupFileId() throws IOException {
+    // Case where there are duplicate fileIds in compaction requests
+    HoodieCompactionPlan plan1 = createCompactionPlan("000", 10);
+    HoodieCompactionPlan plan2 = createCompactionPlan("001", 10);
+    scheduleCompaction("000", plan1);
+    scheduleCompaction("001", plan2);
+    // schedule the same plan again so that there will be duplicates
+    scheduleCompaction("003", plan1);
+    metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
+    Map<String, Pair<String, HoodieCompactionOperation>> res =
+        CompactionUtils.getAllPendingCompactionOperations(metaClient);
+  }
+
+  @Test
+  public void testGetAllPendingCompactionOperations() throws IOException {
+    // Case where there are 4 compaction requests where 1 is empty.
+    testGetAllPendingCompactionOperations(false, 10, 10, 10, 0);
+  }
+
+  @Test
+  public void testGetAllPendingInflightCompactionOperations() throws IOException {
+    // Case where there are 4 compaction requests where 1 is empty. All of them are marked inflight
+    testGetAllPendingCompactionOperations(true, 10, 10, 10, 0);
+  }
+
+  @Test
+  public void testGetAllPendingCompactionOperationsForEmptyCompactions() throws IOException {
+    // Case where there are 4 compaction requests and all are empty.
+    testGetAllPendingCompactionOperations(false, 0, 0, 0, 0);
+  }
+
+  private void testGetAllPendingCompactionOperations(boolean inflight, int numEntriesInPlan1, int numEntriesInPlan2,
+      int numEntriesInPlan3, int numEntriesInPlan4) throws IOException {
+    HoodieCompactionPlan plan1 = createCompactionPlan("000", numEntriesInPlan1);
+    HoodieCompactionPlan plan2 = createCompactionPlan("001", numEntriesInPlan2);
+    HoodieCompactionPlan plan3 = createCompactionPlan("002", numEntriesInPlan3);
+    HoodieCompactionPlan plan4 = createCompactionPlan("003", numEntriesInPlan4);
+
+    if (inflight) {
+      scheduleInflightCompaction("000", plan1);
+      scheduleInflightCompaction("001", plan2);
+      scheduleInflightCompaction("002", plan3);
+      scheduleInflightCompaction("003", plan4);
+    } else {
+      scheduleCompaction("000", plan1);
+      scheduleCompaction("001", plan2);
+      scheduleCompaction("002", plan3);
+      scheduleCompaction("003", plan4);
+    }
+
+    List<Integer> expectedNumEntries =
+        Arrays.asList(numEntriesInPlan1, numEntriesInPlan2, numEntriesInPlan3, numEntriesInPlan4);
+    List<HoodieCompactionPlan> plans = new ImmutableList.Builder<HoodieCompactionPlan>()
+        .add(plan1, plan2, plan3, plan4).build();
+    IntStream.range(0, 4).boxed().forEach(idx -> {
+      if (expectedNumEntries.get(idx) > 0) {
+        Assert.assertEquals("check if plan " + idx + " has exp entries",
+            expectedNumEntries.get(idx).longValue(), plans.get(idx).getOperations().size());
+      } else {
+        Assert.assertNull("Plan " + idx + " has null ops", plans.get(idx).getOperations());
+      }
+    });
+
+    metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
+    Map<String, Pair<String, HoodieCompactionOperation>> pendingCompactionMap =
+        CompactionUtils.getAllPendingCompactionOperations(metaClient);
+
+    Map<String, Pair<String, HoodieCompactionOperation>> expPendingCompactionMap =
+        generateExpectedCompactionOperations(Arrays.asList(plan1, plan2, plan3, plan4));
+
+    // Ensure all the pending compaction operations match the expected map
+    Assert.assertEquals(expPendingCompactionMap, pendingCompactionMap);
+  }
+
+  private Map<String, Pair<String, HoodieCompactionOperation>> generateExpectedCompactionOperations(
+      List<HoodieCompactionPlan> plans) {
+    return plans.stream()
+        .flatMap(plan -> {
+          if (plan.getOperations() != null) {
+            return plan.getOperations().stream().map(op -> Pair.of(op.getFileId(),
+                Pair.of(op.getBaseInstantTime(), op)));
+          }
+          return Stream.empty();
+        }).collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+  }
+
+  private void scheduleCompaction(String instantTime, HoodieCompactionPlan compactionPlan) throws IOException {
+    metaClient.getActiveTimeline().saveToRequested(new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, instantTime),
+        AvroUtils.serializeCompactionWorkload(compactionPlan));
+  }
+
+  private void scheduleInflightCompaction(String instantTime, HoodieCompactionPlan compactionPlan) throws IOException {
+    metaClient.getActiveTimeline().saveToInflight(new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION, instantTime),
+        AvroUtils.serializeCompactionWorkload(compactionPlan));
+  }
+
+  private HoodieCompactionPlan createCompactionPlan(String instantId, int numFileIds) {
+    List<HoodieCompactionOperation> ops = IntStream.range(0, numFileIds).boxed().map(idx -> {
+      try {
+        String fileId =
+            HoodieTestUtils.createNewDataFile(basePath, DEFAULT_PARTITION_PATHS[0], instantId);
+        HoodieTestUtils.createNewLogFile(metaClient.getFs(), basePath, DEFAULT_PARTITION_PATHS[0],
+            instantId, fileId, Optional.of(1));
+        HoodieTestUtils.createNewLogFile(metaClient.getFs(), basePath, DEFAULT_PARTITION_PATHS[0],
+            instantId, fileId, Optional.of(2));
+        FileSlice slice = new FileSlice(instantId, fileId);
+        slice.setDataFile(new TestHoodieDataFile(HoodieTestUtils.createDataFile(basePath, DEFAULT_PARTITION_PATHS[0],
+            instantId, fileId)));
+        String logFilePath1 = HoodieTestUtils.getLogFilePath(basePath, DEFAULT_PARTITION_PATHS[0], instantId, fileId,
+            Optional.of(1));
+        String logFilePath2 = HoodieTestUtils.getLogFilePath(basePath, DEFAULT_PARTITION_PATHS[0], instantId, fileId,
+            Optional.of(2));
+        slice.addLogFile(new HoodieLogFile(new Path(logFilePath1)));
+        slice.addLogFile(new HoodieLogFile(new Path(logFilePath2)));
+        return CompactionUtils.buildFromFileSlice(DEFAULT_PARTITION_PATHS[0], slice, Optional.empty());
+      } catch (IOException e) {
+        throw new HoodieIOException(e.getMessage(), e);
+      }
+    }).collect(Collectors.toList());
+    return new HoodieCompactionPlan(ops.isEmpty() ? null : ops, new HashMap<>());
+  }
+
+  /**
+   * Validates if generated compaction plan matches with input file-slices
+   *
+   * @param input File Slices with partition-path
+   * @param plan Compaction Plan
+   */
+  private void testFileSlicesCompactionPlanEquality(List<Pair<String, FileSlice>> input,
+      HoodieCompactionPlan plan) {
+    Assert.assertEquals("All file-slices present", input.size(), plan.getOperations().size());
+    IntStream.range(0, input.size()).boxed().forEach(idx ->
+        testFileSliceCompactionOpEquality(input.get(idx).getValue(), plan.getOperations().get(idx),
+            input.get(idx).getKey()));
+  }
+
+  /**
+   * Validates if generated compaction operation matches with input file slice and partition path
+   *
+   * @param slice File Slice
+   * @param op HoodieCompactionOperation
+   * @param expPartitionPath Partition path
+   */
+  private void testFileSliceCompactionOpEquality(FileSlice slice, HoodieCompactionOperation op,
+      String expPartitionPath) {
+    Assert.assertEquals("Partition path is correct", expPartitionPath, op.getPartitionPath());
+    Assert.assertEquals("Same base-instant", slice.getBaseInstantTime(), op.getBaseInstantTime());
+    Assert.assertEquals("Same file-id", slice.getFileId(), op.getFileId());
+    if (slice.getDataFile().isPresent()) {
+      Assert.assertEquals("Same data-file", slice.getDataFile().get().getPath(), op.getDataFilePath());
+    }
+    List<String> paths = slice.getLogFiles().map(l -> l.getPath().toString()).collect(Collectors.toList());
+    IntStream.range(0, paths.size()).boxed().forEach(idx -> {
+      Assert.assertEquals("Log File Index " + idx, paths.get(idx), op.getDeltaFilePaths().get(idx));
+    });
+    Assert.assertEquals("Metrics set", metrics, op.getMetrics());
+  }
+
+
+  private static class TestHoodieDataFile extends HoodieDataFile {
+
+    private final String path;
+
+    public TestHoodieDataFile(String path) {
+      super(null);
+      this.path = path;
+    }
+
+    @Override
+    public String getPath() {
+      return path;
+    }
+
+    @Override
+    public String getFileId() {
+      return UUID.randomUUID().toString();
+    }
+
+    @Override
+    public String getCommitTime() {
+      return "100";
+    }
+
+    @Override
+    public long getFileSize() {
+      return 0;
+    }
+  }
+}
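For readers following the new flow end to end, the pieces above compose like this. A minimal sketch of mine using only APIs that appear in this patch; `partitionFileSlicePairs`, `metaClient`, and `basePath` are assumed to be in scope, and instant time "004" is made up:

```java
// 1. Build a plan from the file-slices chosen by the strategy
HoodieCompactionPlan plan = CompactionUtils.buildFromFileSlices(
    partitionFileSlicePairs, Optional.empty(), Optional.empty());

// 2. Persist it on the timeline as a REQUESTED compaction instant
metaClient.getActiveTimeline().saveToRequested(
    new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "004"),
    AvroUtils.serializeCompactionWorkload(plan));

// 3. Any reader (e.g. HoodieTableFileSystemView's constructor) can now rediscover
//    the pending work straight from the timeline, with no test-only hooks
metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
Map<String, Pair<String, HoodieCompactionOperation>> pending =
    CompactionUtils.getAllPendingCompactionOperations(metaClient);
```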