Track fileIds with pending compaction in FileSystemView to provide correct API semantics
committed by vinoth chandar
parent 1b61f04e05
commit 9d99942564
@@ -108,7 +108,6 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
+ ".partitions";
// 500GB of target IO per compaction (both read and write)
public static final String DEFAULT_TARGET_PARTITIONS_PER_DAYBASED_COMPACTION = String.valueOf(10);
public static final String DEFAULT_COMPACTOR_ID = "default";

private HoodieCompactionConfig(Properties props) {
super(props);

@@ -41,7 +41,6 @@ import com.uber.hoodie.io.compact.strategy.CompactionStrategy;
import com.uber.hoodie.table.HoodieCopyOnWriteTable;
import com.uber.hoodie.table.HoodieTable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
@@ -49,6 +48,7 @@ import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.avro.Schema;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -95,7 +95,7 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc);
List<CompactionOperation> operations = compactionPlan.getOperations().stream()
.map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
log.info("Compactor " + compactionPlan.getCompactorId() + " running, Compacting " + operations + " files");
log.info("Compactor compacting " + operations + " files");
return jsc.parallelize(operations, operations.size())
.map(s -> compact(table, metaClient, config, s, compactionCommitTime))
.flatMap(writeStatusesItr -> writeStatusesItr.iterator());
@@ -217,7 +217,7 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
// TODO: In subsequent PRs, pending Compaction plans will be wired in. Strategy can look at pending compaction
// plans to schedule next compaction plan
HoodieCompactionPlan compactionPlan = config.getCompactionStrategy().generateCompactionPlan(config, operations,
new ArrayList<>());
CompactionUtils.getAllPendingCompactionPlans(metaClient).stream().map(Pair::getValue).collect(toList()));
if (compactionPlan.getOperations().isEmpty()) {
log.warn("After filtering, Nothing to compact for " + metaClient.getBasePath());
}

@@ -22,7 +22,6 @@ import com.uber.hoodie.avro.model.HoodieCompactionPlan;
import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.config.HoodieCompactionConfig;
import com.uber.hoodie.config.HoodieWriteConfig;
import java.io.Serializable;
import java.util.List;
@@ -92,7 +91,7 @@ public abstract class CompactionStrategy implements Serializable {
public HoodieCompactionPlan generateCompactionPlan(HoodieWriteConfig writeConfig,
List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> pendingCompactionPlans) {
// Strategy implementation can overload this method to set specific compactor-id
return HoodieCompactionPlan.newBuilder().setCompactorId(HoodieCompactionConfig.DEFAULT_COMPACTOR_ID)
return HoodieCompactionPlan.newBuilder()
.setOperations(orderAndFilter(writeConfig, operations, pendingCompactionPlans))
.build();
}

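With pending plans now passed into generateCompactionPlan, orderAndFilter receives them as its third argument. As an illustrative sketch only (not part of this commit), a strategy subclass could use that argument to skip file groups that already have a compaction pending; the orderAndFilter signature is assumed from the call above, and java.util.Set / Collectors are assumed to be imported:

    // Sketch: drop operations whose fileId already appears in a pending compaction plan,
    // so no file group is scheduled for compaction twice. Signature assumed, not from this commit.
    @Override
    public List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig writeConfig,
        List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> pendingCompactionPlans) {
      Set<String> pendingFileIds = pendingCompactionPlans.stream()
          .filter(plan -> plan.getOperations() != null)
          .flatMap(plan -> plan.getOperations().stream())
          .map(HoodieCompactionOperation::getFileId)
          .collect(Collectors.toSet());
      return operations.stream()
          .filter(op -> !pendingFileIds.contains(op.getFileId()))
          .collect(Collectors.toList());
    }
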
@@ -3,11 +3,7 @@
"type":"record",
"name":"HoodieCompactionPlan",
"fields":[
{
"name":"compactorId",
"type":["null","string"]
},
{
{
"name":"operations",
"type":["null", {
"type":"array",

@@ -16,6 +16,7 @@

package com.uber.hoodie.common.table.view;

import com.google.common.collect.ImmutableMap;
import com.uber.hoodie.common.model.FileSlice;
import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.model.HoodieFileGroup;
@@ -23,6 +24,8 @@ import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.TableFileSystemView;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.util.CompactionUtils;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.exception.HoodieIOException;
import java.io.IOException;
@@ -41,6 +44,8 @@ import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

/**
* Common abstract implementation for multiple TableFileSystemView Implementations. 2 possible
@@ -55,6 +60,8 @@ public class HoodieTableFileSystemView implements TableFileSystemView,
TableFileSystemView.ReadOptimizedView,
TableFileSystemView.RealtimeView, Serializable {

private static Logger log = LogManager.getLogger(HoodieTableFileSystemView.class);

protected HoodieTableMetaClient metaClient;
// This is the commits that will be visible for all views extending this view
protected HoodieTimeline visibleActiveTimeline;
@@ -78,10 +85,15 @@ public class HoodieTableFileSystemView implements TableFileSystemView,
this.visibleActiveTimeline = visibleActiveTimeline;
this.fileGroupMap = new HashMap<>();
this.partitionToFileGroupsMap = new HashMap<>();
//TODO: vb Will be implemented in next PR
this.fileIdToPendingCompactionInstantTime = new HashMap<>();
}

// Build fileId to Pending Compaction Instants
List<HoodieInstant> pendingCompactionInstants =
metaClient.getActiveTimeline().filterPendingCompactionTimeline().getInstants().collect(Collectors.toList());
this.fileIdToPendingCompactionInstantTime = ImmutableMap.copyOf(
CompactionUtils.getAllPendingCompactionOperations(metaClient).entrySet().stream().map(entry -> {
return Pair.of(entry.getKey(), entry.getValue().getKey());
}).collect(Collectors.toMap(Pair::getKey, Pair::getValue)));
}

/**
* Create a file system view, as of the given timeline, with the provided file statuses.
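A minimal sketch (not from this commit) of how the new fileId-to-pending-compaction-instant map can back the corrected view semantics: a slice whose base instant equals the pending compaction instant for its file id was produced by a compaction that has not committed yet, so the view should keep serving the pre-compaction slice. The helper name below is hypothetical:

    // Hypothetical helper: true if this slice was written by a compaction that is still pending.
    private boolean isFromPendingCompaction(FileSlice slice) {
      String pendingInstant = fileIdToPendingCompactionInstantTime.get(slice.getFileId());
      return pendingInstant != null && pendingInstant.equals(slice.getBaseInstantTime());
    }
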
@@ -404,4 +416,15 @@ public class HoodieTableFileSystemView implements TableFileSystemView,
"Failed to list data files in partition " + partitionPathStr, e);
}
}

/**
* Used by tests to add pending compaction entries TODO: This method is temporary and should go away in subsequent
* Async Compaction PR
*
* @param fileId File Id
* @param compactionInstantTime Compaction Instant Time
*/
protected void addPendingCompactionFileId(String fileId, String compactionInstantTime) {
fileIdToPendingCompactionInstantTime.put(fileId, compactionInstantTime);
}
}

@@ -174,7 +174,7 @@ public class AvroUtils {
return Optional.of(baos.toByteArray());
}

public static HoodieCompactionPlan deserializeHoodieCompactionPlan(byte[] bytes)
public static HoodieCompactionPlan deserializeCompactionPlan(byte[] bytes)
throws IOException {
return deserializeAvroMetadata(bytes, HoodieCompactionPlan.class);
}

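The rename makes the deserializer symmetric with serializeCompactionWorkload, which the tests below use. A rough round-trip sketch, assuming serializeCompactionWorkload returns Optional<byte[]> (as its use with saveToRequested below suggests), that both calls can throw IOException, and that partitionFileSlicePairs already exists:

    // Sketch only: serialize a plan and read it back with the renamed helper.
    HoodieCompactionPlan plan = CompactionUtils.buildFromFileSlices(
        partitionFileSlicePairs, Optional.empty(), Optional.empty());
    Optional<byte[]> bytes = AvroUtils.serializeCompactionWorkload(plan);
    HoodieCompactionPlan readBack = AvroUtils.deserializeCompactionPlan(bytes.get());
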
@@ -20,15 +20,21 @@ import com.uber.hoodie.avro.model.HoodieCompactionOperation;
import com.uber.hoodie.avro.model.HoodieCompactionPlan;
import com.uber.hoodie.common.model.CompactionOperation;
import com.uber.hoodie.common.model.FileSlice;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.exception.HoodieException;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import javafx.util.Pair;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Pair;

/**
* Helper class to generate compaction workload from FileGroup/FileSlice abstraction
* Helper class to generate compaction plan from FileGroup/FileSlice abstraction
*/
public class CompactionUtils {

@@ -41,7 +47,7 @@ public class CompactionUtils {
* @return Compaction Operation
*/
public static HoodieCompactionOperation buildFromFileSlice(String partitionPath, FileSlice fileSlice,
Optional<Function<Pair<String, FileSlice>, Map<String, Long>>> metricsCaptureFunction) {
Optional<Function<Pair<String, FileSlice>, Map<String, Double>>> metricsCaptureFunction) {
HoodieCompactionOperation.Builder builder = HoodieCompactionOperation.newBuilder();
builder.setPartitionPath(partitionPath);
builder.setFileId(fileSlice.getFileId());
@@ -52,25 +58,23 @@ public class CompactionUtils {
}

if (metricsCaptureFunction.isPresent()) {
builder.setMetrics(metricsCaptureFunction.get().apply(new Pair(partitionPath, fileSlice)));
builder.setMetrics(metricsCaptureFunction.get().apply(Pair.of(partitionPath, fileSlice)));
}
return builder.build();
}

/**
* Generate compaction workload from file-slices
* Generate compaction plan from file-slices
*
* @param compactorId Compactor Id to set
* @param partitionFileSlicePairs list of partition file-slice pairs
* @param extraMetadata Extra Metadata
* @param metricsCaptureFunction Metrics Capture function
*/
public static HoodieCompactionPlan buildFromFileSlices(String compactorId,
public static HoodieCompactionPlan buildFromFileSlices(
List<Pair<String, FileSlice>> partitionFileSlicePairs,
Optional<Map<String, String>> extraMetadata,
Optional<Function<Pair<String, FileSlice>, Map<String, Long>>> metricsCaptureFunction) {
Optional<Function<Pair<String, FileSlice>, Map<String, Double>>> metricsCaptureFunction) {
HoodieCompactionPlan.Builder builder = HoodieCompactionPlan.newBuilder();
builder.setCompactorId(compactorId);
extraMetadata.ifPresent(m -> builder.setExtraMetadata(m));
builder.setOperations(partitionFileSlicePairs.stream().map(pfPair ->
buildFromFileSlice(pfPair.getKey(), pfPair.getValue(), metricsCaptureFunction)).collect(Collectors.toList()));
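buildFromFileSlices drops the compactorId argument here, and the metrics capture function now produces Map<String, Double> instead of Map<String, Long>. A small usage sketch; the metric keys and values are made up, ImmutableMap is the Guava builder already used in this codebase, and partitionFileSlicePairs is assumed to exist:

    // Sketch: capture per-(partition, fileSlice) metrics as doubles while building a plan.
    Function<Pair<String, FileSlice>, Map<String, Double>> captureFn =
        partitionFileSlice -> ImmutableMap.of("totalLogFiles", 2.0, "totalIoMb", 512.0);
    HoodieCompactionPlan plan = CompactionUtils.buildFromFileSlices(
        partitionFileSlicePairs, Optional.empty(), Optional.of(captureFn));
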
@@ -97,4 +101,58 @@ public class CompactionUtils {
public static CompactionOperation buildCompactionOperation(HoodieCompactionOperation hc) {
return CompactionOperation.convertFromAvroRecordInstance(hc);
}

/**
* Get all pending compaction plans along with their instants
*
* @param metaClient Hoodie Meta Client
*/
public static List<Pair<HoodieInstant, HoodieCompactionPlan>> getAllPendingCompactionPlans(
HoodieTableMetaClient metaClient) {
List<HoodieInstant> pendingCompactionInstants =
metaClient.getActiveTimeline().filterPendingCompactionTimeline().getInstants().collect(Collectors.toList());
return pendingCompactionInstants.stream().map(instant -> {
try {
HoodieCompactionPlan compactionPlan = AvroUtils.deserializeCompactionPlan(
metaClient.getActiveTimeline().getInstantDetails(instant).get());
return Pair.of(instant, compactionPlan);
} catch (IOException e) {
throw new HoodieException(e);
}
}).collect(Collectors.toList());
}

/**
* Get all file-ids with pending Compaction operations and their target compaction instant time
*
* @param metaClient Hoodie Table Meta Client
*/
public static Map<String, Pair<String, HoodieCompactionOperation>> getAllPendingCompactionOperations(
HoodieTableMetaClient metaClient) {
List<Pair<HoodieInstant, HoodieCompactionPlan>> pendingCompactionPlanWithInstants =
getAllPendingCompactionPlans(metaClient);

Map<String, Pair<String, HoodieCompactionOperation>> fileIdToPendingCompactionWithInstantMap = new HashMap<>();
pendingCompactionPlanWithInstants.stream().flatMap(instantPlanPair -> {
HoodieInstant instant = instantPlanPair.getKey();
HoodieCompactionPlan compactionPlan = instantPlanPair.getValue();
List<HoodieCompactionOperation> ops = compactionPlan.getOperations();
if (null != ops) {
return ops.stream().map(op -> {
return Pair.of(op.getFileId(), Pair.of(instant.getTimestamp(), op));
});
} else {
return Stream.empty();
}
}).forEach(pair -> {
// Defensive check to ensure a single fileId does not have more than one pending compaction
if (fileIdToPendingCompactionWithInstantMap.containsKey(pair.getKey())) {
String msg = "Hoodie File Id (" + pair.getKey() + ") has more than 1 pending compactions. Instants: "
+ pair.getValue() + ", " + fileIdToPendingCompactionWithInstantMap.get(pair.getKey());
throw new IllegalStateException(msg);
}
fileIdToPendingCompactionWithInstantMap.put(pair.getKey(), pair.getValue());
});
return fileIdToPendingCompactionWithInstantMap;
}
}

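This is the helper the HoodieTableFileSystemView constructor above consumes. A short sketch of deriving the fileId-to-pending-instant map from it, mirroring what the view now does:

    // Sketch: collapse the pending-operation map into fileId -> instant time of its pending compaction.
    Map<String, Pair<String, HoodieCompactionOperation>> pending =
        CompactionUtils.getAllPendingCompactionOperations(metaClient);
    Map<String, String> fileIdToInstant = pending.entrySet().stream()
        .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getKey()));
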
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.uber.hoodie.avro.model.HoodieCompactionPlan;
import com.uber.hoodie.common.model.FileSlice;
import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.model.HoodieFileGroup;
@@ -33,9 +34,12 @@ import com.uber.hoodie.common.table.TableFileSystemView;
import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.table.timeline.HoodieInstant.State;
import com.uber.hoodie.common.util.AvroUtils;
import com.uber.hoodie.common.util.CompactionUtils;
import com.uber.hoodie.common.util.FSUtils;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -44,6 +48,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.junit.Before;
@@ -234,19 +239,26 @@ public class HoodieTableFileSystemViewTest {
commitTimeline.saveAsComplete(deltaInstant2, Optional.empty());
commitTimeline.saveAsComplete(deltaInstant3, Optional.empty());

// Fake delta-ingestion after compaction-requested
refreshFsView(null);
List<FileSlice> fileSlices = rtView.getLatestFileSlices(partitionPath).collect(Collectors.toList());
String compactionRequestedTime = "4";
String compactDataFileName = FSUtils.makeDataFileName(compactionRequestedTime, 1, fileId);
List<Pair<String, FileSlice>> partitionFileSlicesPairs = new ArrayList<>();
partitionFileSlicesPairs.add(Pair.of(partitionPath, fileSlices.get(0)));
HoodieCompactionPlan compactionPlan = CompactionUtils.buildFromFileSlices(partitionFileSlicesPairs,
Optional.empty(), Optional.empty());
HoodieInstant compactionInstant = null;
if (isCompactionInFlight) {
// Create a Data-file but this should be skipped by view
new File(basePath + "/" + partitionPath + "/" + compactDataFileName).createNewFile();
compactionInstant = new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionRequestedTime);
commitTimeline.saveToInflight(compactionInstant, Optional.empty());
commitTimeline.saveToInflight(compactionInstant, AvroUtils.serializeCompactionWorkload(compactionPlan));
} else {
compactionInstant = new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, compactionRequestedTime);
commitTimeline.saveToRequested(compactionInstant, Optional.empty());
commitTimeline.saveToRequested(compactionInstant, AvroUtils.serializeCompactionWorkload(compactionPlan));
}

// Fake delta-ingestion after compaction-requested
String deltaInstantTime4 = "5";
String deltaInstantTime5 = "6";
List<String> allInstantTimes = Arrays.asList(instantTime1, deltaInstantTime1, deltaInstantTime2,
@@ -260,7 +272,6 @@ public class HoodieTableFileSystemViewTest {
commitTimeline.saveAsComplete(deltaInstant4, Optional.empty());
commitTimeline.saveAsComplete(deltaInstant5, Optional.empty());
refreshFsView(null);
fsView.addPendingCompactionFileId(fileId, compactionRequestedTime);

List<HoodieDataFile> dataFiles = roView.getAllDataFiles(partitionPath).collect(Collectors.toList());
if (skipCreatingDataFile) {
@@ -367,7 +378,6 @@ public class HoodieTableFileSystemViewTest {
commitTimeline.saveToInflight(new HoodieInstant(State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION,
inflightDeltaInstantTime), Optional.empty());
refreshFsView(null);
fsView.addPendingCompactionFileId(fileId, compactionRequestedTime);

List<FileSlice> allRawFileSlices = getAllRawFileSlices(partitionPath).collect(Collectors.toList());
dataFiles = allRawFileSlices.stream().flatMap(slice -> {

@@ -0,0 +1,345 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.uber.hoodie.common.util;

import static com.uber.hoodie.common.model.HoodieTestUtils.DEFAULT_PARTITION_PATHS;
import static com.uber.hoodie.common.table.HoodieTimeline.COMPACTION_ACTION;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.uber.hoodie.avro.model.HoodieCompactionOperation;
import com.uber.hoodie.avro.model.HoodieCompactionPlan;
import com.uber.hoodie.common.model.FileSlice;
import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.model.HoodieTestUtils;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.table.timeline.HoodieInstant.State;
import com.uber.hoodie.exception.HoodieIOException;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.Path;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class TestCompactionUtils {

private static final Map<String, Double> metrics =
new ImmutableMap.Builder<String, Double>()
.put("key1", 1.0)
.put("key2", 3.0).build();
@Rule
public TemporaryFolder tmpFolder = new TemporaryFolder();
private HoodieTableMetaClient metaClient;
private String basePath;
private Function<Pair<String, FileSlice>, Map<String, Double>> metricsCaptureFn = (partitionFileSlice) -> metrics;

@Before
public void init() throws IOException {
metaClient = HoodieTestUtils.init(tmpFolder.getRoot().getAbsolutePath());
basePath = metaClient.getBasePath();
}

@Test
public void testBuildFromFileSlice() {
// Empty File-Slice with no data and log files
FileSlice emptyFileSlice = new FileSlice("000", "empty1");
HoodieCompactionOperation op = CompactionUtils.buildFromFileSlice(
DEFAULT_PARTITION_PATHS[0], emptyFileSlice, Optional.of(metricsCaptureFn));
testFileSliceCompactionOpEquality(emptyFileSlice, op, DEFAULT_PARTITION_PATHS[0]);

// File Slice with data-file but no log files
FileSlice noLogFileSlice = new FileSlice("000", "noLog1");
noLogFileSlice.setDataFile(new TestHoodieDataFile("/tmp/noLog.parquet"));
op = CompactionUtils.buildFromFileSlice(
DEFAULT_PARTITION_PATHS[0], noLogFileSlice, Optional.of(metricsCaptureFn));
testFileSliceCompactionOpEquality(noLogFileSlice, op, DEFAULT_PARTITION_PATHS[0]);

//File Slice with no data-file but log files present
FileSlice noDataFileSlice = new FileSlice("000", "noData1");
noDataFileSlice.addLogFile(new HoodieLogFile(new Path(
FSUtils.makeLogFileName("noData1", ".log", "000", 1))));
noDataFileSlice.addLogFile(new HoodieLogFile(new Path(
FSUtils.makeLogFileName("noData1", ".log", "000", 2))));
op = CompactionUtils.buildFromFileSlice(
DEFAULT_PARTITION_PATHS[0], noDataFileSlice, Optional.of(metricsCaptureFn));
testFileSliceCompactionOpEquality(noDataFileSlice, op, DEFAULT_PARTITION_PATHS[0]);

//File Slice with data-file and log files present
FileSlice fileSlice = new FileSlice("000", "noData1");
fileSlice.setDataFile(new TestHoodieDataFile("/tmp/noLog.parquet"));
fileSlice.addLogFile(new HoodieLogFile(new Path(
FSUtils.makeLogFileName("noData1", ".log", "000", 1))));
fileSlice.addLogFile(new HoodieLogFile(new Path(
FSUtils.makeLogFileName("noData1", ".log", "000", 2))));
op = CompactionUtils.buildFromFileSlice(
DEFAULT_PARTITION_PATHS[0], fileSlice, Optional.of(metricsCaptureFn));
testFileSliceCompactionOpEquality(fileSlice, op, DEFAULT_PARTITION_PATHS[0]);
}

/**
* Generate input for compaction plan tests
*/
private Pair<List<Pair<String, FileSlice>>, HoodieCompactionPlan> buildCompactionPlan() {
FileSlice emptyFileSlice = new FileSlice("000", "empty1");
FileSlice fileSlice = new FileSlice("000", "noData1");
fileSlice.setDataFile(new TestHoodieDataFile("/tmp/noLog.parquet"));
fileSlice.addLogFile(new HoodieLogFile(new Path(
FSUtils.makeLogFileName("noData1", ".log", "000", 1))));
fileSlice.addLogFile(new HoodieLogFile(new Path(
FSUtils.makeLogFileName("noData1", ".log", "000", 2))));
FileSlice noLogFileSlice = new FileSlice("000", "noLog1");
noLogFileSlice.setDataFile(new TestHoodieDataFile("/tmp/noLog.parquet"));
FileSlice noDataFileSlice = new FileSlice("000", "noData1");
noDataFileSlice.addLogFile(new HoodieLogFile(new Path(
FSUtils.makeLogFileName("noData1", ".log", "000", 1))));
noDataFileSlice.addLogFile(new HoodieLogFile(new Path(
FSUtils.makeLogFileName("noData1", ".log", "000", 2))));
List<FileSlice> fileSliceList = Arrays.asList(emptyFileSlice, noDataFileSlice, fileSlice, noLogFileSlice);
List<Pair<String, FileSlice>> input = fileSliceList.stream().map(f -> Pair.of(DEFAULT_PARTITION_PATHS[0], f))
.collect(Collectors.toList());
return Pair.of(input, CompactionUtils.buildFromFileSlices(input, Optional.empty(), Optional.of(metricsCaptureFn)));
}

@Test
public void testBuildFromFileSlices() {
Pair<List<Pair<String, FileSlice>>, HoodieCompactionPlan> inputAndPlan = buildCompactionPlan();
testFileSlicesCompactionPlanEquality(inputAndPlan.getKey(), inputAndPlan.getValue());
}

@Test
public void testCompactionTransformation() {
// check HoodieCompactionOperation <=> CompactionOperation transformation function
Pair<List<Pair<String, FileSlice>>, HoodieCompactionPlan> inputAndPlan = buildCompactionPlan();
HoodieCompactionPlan plan = inputAndPlan.getRight();
List<HoodieCompactionOperation> originalOps = plan.getOperations();
List<HoodieCompactionOperation> regeneratedOps =
originalOps.stream().map(op -> {
// Convert to CompactionOperation
return CompactionUtils.buildCompactionOperation(op);
}).map(op2 -> {
// Convert back to HoodieCompactionOperation and check for equality
return CompactionUtils.buildHoodieCompactionOperation(op2);
}).collect(Collectors.toList());
Assert.assertTrue("Transformation did get tested", originalOps.size() > 0);
Assert.assertEquals("All fields set correctly in transformations", originalOps, regeneratedOps);
}

@Test(expected = IllegalStateException.class)
public void testGetAllPendingCompactionOperationsWithDupFileId() throws IOException {
// Case where there are duplicate fileIds in compaction requests
HoodieCompactionPlan plan1 = createCompactionPlan("000", 10);
HoodieCompactionPlan plan2 = createCompactionPlan("001", 10);
scheduleCompaction("000", plan1);
scheduleCompaction("001", plan2);
// schedule same plan again so that there will be duplicates
scheduleCompaction("003", plan1);
metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
Map<String, Pair<String, HoodieCompactionOperation>> res =
CompactionUtils.getAllPendingCompactionOperations(metaClient);
}

@Test
public void testGetAllPendingCompactionOperations() throws IOException {
// Case where there are 4 compaction requests where 1 is empty.
testGetAllPendingCompactionOperations(false, 10, 10, 10, 0);
}

@Test
public void testGetAllPendingInflightCompactionOperations() throws IOException {
// Case where there are 4 compaction requests where 1 is empty. All of them are marked inflight
testGetAllPendingCompactionOperations(true, 10, 10, 10, 0);
}

@Test
public void testGetAllPendingCompactionOperationsForEmptyCompactions() throws IOException {
// Case where there are 4 compaction requests and all are empty.
testGetAllPendingCompactionOperations(false, 0, 0, 0, 0);
}

private void testGetAllPendingCompactionOperations(boolean inflight, int numEntriesInPlan1, int numEntriesInPlan2,
int numEntriesInPlan3, int numEntriesInPlan4) throws IOException {
HoodieCompactionPlan plan1 = createCompactionPlan("000", numEntriesInPlan1);
HoodieCompactionPlan plan2 = createCompactionPlan("001", numEntriesInPlan2);
HoodieCompactionPlan plan3 = createCompactionPlan("002", numEntriesInPlan3);
HoodieCompactionPlan plan4 = createCompactionPlan("003", numEntriesInPlan4);

if (inflight) {
scheduleInflightCompaction("000", plan1);
scheduleInflightCompaction("001", plan2);
scheduleInflightCompaction("002", plan3);
scheduleInflightCompaction("003", plan4);
} else {
scheduleCompaction("000", plan1);
scheduleCompaction("001", plan2);
scheduleCompaction("002", plan3);
scheduleCompaction("003", plan4);
}

List<Integer> expectedNumEntries =
Arrays.asList(numEntriesInPlan1, numEntriesInPlan2, numEntriesInPlan3, numEntriesInPlan4);
List<HoodieCompactionPlan> plans = new ImmutableList.Builder<HoodieCompactionPlan>()
.add(plan1, plan2, plan3, plan4).build();
IntStream.range(0, 4).boxed().forEach(idx -> {
if (expectedNumEntries.get(idx) > 0) {
Assert.assertEquals("check if plan " + idx + " has exp entries",
expectedNumEntries.get(idx).longValue(), plans.get(idx).getOperations().size());
} else {
Assert.assertNull("Plan " + idx + " has null ops", plans.get(idx).getOperations());
}
});

metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
Map<String, Pair<String, HoodieCompactionOperation>> pendingCompactionMap =
CompactionUtils.getAllPendingCompactionOperations(metaClient);

Map<String, Pair<String, HoodieCompactionOperation>> expPendingCompactionMap =
generateExpectedCompactionOperations(Arrays.asList(plan1, plan2, plan3, plan4));

// Ensure all the pending operations and their instants match the expected map
Assert.assertEquals(expPendingCompactionMap, pendingCompactionMap);
}

private Map<String, Pair<String, HoodieCompactionOperation>> generateExpectedCompactionOperations(
List<HoodieCompactionPlan> plans) {
return plans.stream()
.flatMap(plan -> {
if (plan.getOperations() != null) {
return plan.getOperations().stream().map(op -> Pair.of(op.getFileId(),
Pair.of(op.getBaseInstantTime(), op)));
}
return Stream.empty();
}).collect(Collectors.toMap(Pair::getKey, Pair::getValue));
}

private void scheduleCompaction(String instantTime, HoodieCompactionPlan compactionPlan) throws IOException {
metaClient.getActiveTimeline().saveToRequested(new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, instantTime),
AvroUtils.serializeCompactionWorkload(compactionPlan));
}

private void scheduleInflightCompaction(String instantTime, HoodieCompactionPlan compactionPlan) throws IOException {
metaClient.getActiveTimeline().saveToInflight(new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION, instantTime),
AvroUtils.serializeCompactionWorkload(compactionPlan));
}

private HoodieCompactionPlan createCompactionPlan(String instantId, int numFileIds) {
List<HoodieCompactionOperation> ops = IntStream.range(0, numFileIds).boxed().map(idx -> {
try {
String fileId =
HoodieTestUtils.createNewDataFile(basePath, DEFAULT_PARTITION_PATHS[0], instantId);
HoodieTestUtils.createNewLogFile(metaClient.getFs(), basePath, DEFAULT_PARTITION_PATHS[0],
instantId, fileId, Optional.of(1));
HoodieTestUtils.createNewLogFile(metaClient.getFs(), basePath, DEFAULT_PARTITION_PATHS[0],
instantId, fileId, Optional.of(2));
FileSlice slice = new FileSlice(instantId, fileId);
slice.setDataFile(new TestHoodieDataFile(HoodieTestUtils.createDataFile(basePath, DEFAULT_PARTITION_PATHS[0],
instantId, fileId)));
String logFilePath1 = HoodieTestUtils.getLogFilePath(basePath, DEFAULT_PARTITION_PATHS[0], instantId, fileId,
Optional.of(1));
String logFilePath2 = HoodieTestUtils.getLogFilePath(basePath, DEFAULT_PARTITION_PATHS[0], instantId, fileId,
Optional.of(2));
slice.addLogFile(new HoodieLogFile(new Path(logFilePath1)));
slice.addLogFile(new HoodieLogFile(new Path(logFilePath2)));
return CompactionUtils.buildFromFileSlice(DEFAULT_PARTITION_PATHS[0], slice, Optional.empty());
} catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e);
}
}).collect(Collectors.toList());
return new HoodieCompactionPlan(ops.isEmpty() ? null : ops, new HashMap<>());
}

/**
* Validates if generated compaction plan matches with input file-slices
*
* @param input File Slices with partition-path
* @param plan Compaction Plan
*/
private void testFileSlicesCompactionPlanEquality(List<Pair<String, FileSlice>> input,
HoodieCompactionPlan plan) {
Assert.assertEquals("All file-slices present", input.size(), plan.getOperations().size());
IntStream.range(0, input.size()).boxed().forEach(idx ->
testFileSliceCompactionOpEquality(input.get(idx).getValue(), plan.getOperations().get(idx),
input.get(idx).getKey()));
}

/**
* Validates if generated compaction operation matches with input file slice and partition path
*
* @param slice File Slice
* @param op HoodieCompactionOperation
* @param expPartitionPath Partition path
*/
private void testFileSliceCompactionOpEquality(FileSlice slice, HoodieCompactionOperation op,
String expPartitionPath) {
Assert.assertEquals("Partition path is correct", expPartitionPath, op.getPartitionPath());
Assert.assertEquals("Same base-instant", slice.getBaseInstantTime(), op.getBaseInstantTime());
Assert.assertEquals("Same file-id", slice.getFileId(), op.getFileId());
if (slice.getDataFile().isPresent()) {
Assert.assertEquals("Same data-file", slice.getDataFile().get().getPath(), op.getDataFilePath());
}
List<String> paths = slice.getLogFiles().map(l -> l.getPath().toString()).collect(Collectors.toList());
IntStream.range(0, paths.size()).boxed().forEach(idx -> {
Assert.assertEquals("Log File Index " + idx, paths.get(idx), op.getDeltaFilePaths().get(idx));
});
Assert.assertEquals("Metrics set", metrics, op.getMetrics());
}

private static class TestHoodieDataFile extends HoodieDataFile {

private final String path;

public TestHoodieDataFile(String path) {
super(null);
this.path = path;
}

@Override
public String getPath() {
return path;
}

@Override
public String getFileId() {
return UUID.randomUUID().toString();
}

@Override
public String getCommitTime() {
return "100";
}

@Override
public long getFileSize() {
return 0;
}
}
}