1
0

Ensure Cleaner and Archiver do not delete file-slices and workload marked for compaction

This commit is contained in:
Balaji Varadarajan
2018-05-31 14:16:19 -07:00
committed by vinoth chandar
parent 0a0451a765
commit 9b78523d62
10 changed files with 666 additions and 76 deletions

View File

@@ -36,6 +36,7 @@ import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.TableFileSystemView;
import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.table.timeline.HoodieInstant.State;
import com.uber.hoodie.common.util.AvroUtils;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.config.HoodieCompactionConfig;
@@ -65,6 +66,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.LogManager;
@@ -91,6 +93,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
private final transient FileSystem fs;
private final transient JavaSparkContext jsc;
private final HoodieWriteConfig config;
private final boolean rollbackInFlight;
private final transient HoodieMetrics metrics;
private final transient HoodieIndex<T> index;
private transient Timer.Context writeContext = null;
@@ -122,10 +125,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
this.config = clientConfig;
this.index = index;
this.metrics = new HoodieMetrics(config, config.getTableName());
if (rollbackInFlight) {
rollbackInflightCommits();
}
this.rollbackInFlight = rollbackInFlight;
}
public static SparkConf registerClasses(SparkConf conf) {
@@ -681,6 +681,42 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
logger.info("Savepoint " + savepointTime + " deleted");
}
/**
 * Delete a compaction request that is pending (in REQUESTED or INFLIGHT state).
 *
 * NOTE - This is an Admin operation. With async compaction, this is expected to be called only
 * after both the async compactor and ingestion have been shut down. Otherwise, the async
 * compactor could fail with errors.
 *
 * @param compactionTime instant time of the pending compaction to be deleted
 * @throws IllegalArgumentException if no pending compaction exists at compactionTime
 */
private void deletePendingCompaction(String compactionTime) {
  HoodieTable<T> table = HoodieTable.getHoodieTable(
      new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc);
  HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
  HoodieInstant compactionRequestedInstant =
      new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, compactionTime);
  HoodieInstant compactionInflightInstant =
      new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionTime);
  // Reuse the already-fetched active timeline rather than re-reading it for each state check
  HoodieTimeline pendingCompactionTimeline = activeTimeline.filterPendingCompactionTimeline();
  boolean isCompactionInstantInRequestedState =
      pendingCompactionTimeline.containsInstant(compactionRequestedInstant);
  boolean isCompactionInstantInInflightState =
      pendingCompactionTimeline.containsInstant(compactionInflightInstant);
  if (isCompactionInstantInRequestedState) {
    activeTimeline.deleteCompactionRequested(compactionRequestedInstant);
  } else if (isCompactionInstantInInflightState) {
    // An inflight compaction must first be reverted to requested state before it can be deleted
    activeTimeline.revertCompactionInflightToRequested(compactionInflightInstant);
    activeTimeline.deleteCompactionRequested(compactionRequestedInstant);
  } else {
    logger.error("No compaction present " + compactionTime);
    throw new IllegalArgumentException("No compaction present " + compactionTime);
  }
  logger.info("Compaction " + compactionTime + " deleted");
}
/**
Rollback the state to the savepoint. WARNING: This rolls back recent commits and deletes data
files. Queries accessing the files will mostly fail. This should be done during a downtime.
@@ -692,6 +728,11 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
HoodieTable<T> table = HoodieTable.getHoodieTable(
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc);
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
// Rollback to savepoint is expected to be a manual operation and no concurrent ingestion or compaction is expected
// to be running. Rollback to savepoint also removes any pending compaction actions that are generated after
// savepoint time. Allowing pending compaction to be retained is not safe as those workload could be referencing
// file-slices that will be rolled-back as part of this operation
HoodieTimeline commitTimeline = table.getMetaClient().getCommitsAndCompactionTimeline();
HoodieInstant savePoint = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION,
@@ -740,8 +781,10 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
// Create a Hoodie table which encapsulated the commits and files visible
HoodieTable<T> table = HoodieTable.getHoodieTable(
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc);
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
HoodieTimeline inflightTimeline = table.getInflightCommitTimeline();
Set<String> pendingCompactions =
table.getActiveTimeline().filterPendingCompactionTimeline().getInstants()
.map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
HoodieTimeline inflightCommitTimeline = table.getInflightCommitTimeline();
HoodieTimeline commitTimeline = table.getCompletedCommitTimeline();
// Check if any of the commits is a savepoint - do not allow rollback on those commits
@@ -755,37 +798,45 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
}
});
List<String> pendingCompactionToRollback =
commits.stream().filter(c -> pendingCompactions.contains(c)).collect(Collectors.toList());
List<String> commitsToRollback =
commits.stream().filter(c -> !pendingCompactions.contains(c)).collect(Collectors.toList());
try {
if (commitTimeline.empty() && inflightTimeline.empty()) {
if (commitTimeline.empty() && inflightCommitTimeline.empty()) {
// nothing to rollback
logger.info("No commits to rollback " + commits);
logger.info("No commits to rollback " + commitsToRollback);
}
// Make sure only the last n commits are being rolled back
// If there is a commit in-between or after that is not rolled back, then abort
String lastCommit = commits.get(commits.size() - 1);
String lastCommit = commitsToRollback.get(commitsToRollback.size() - 1);
if (!commitTimeline.empty() && !commitTimeline
.findInstantsAfter(lastCommit, Integer.MAX_VALUE).empty()) {
throw new HoodieRollbackException(
"Found commits after time :" + lastCommit + ", please rollback greater commits first");
}
List<String> inflights = inflightTimeline.getInstants().map(HoodieInstant::getTimestamp)
List<String> inflights = inflightCommitTimeline.getInstants().map(HoodieInstant::getTimestamp)
.collect(Collectors.toList());
if (!inflights.isEmpty() && inflights.indexOf(lastCommit) != inflights.size() - 1) {
throw new HoodieRollbackException("Found in-flight commits after time :" + lastCommit
+ ", please rollback greater commits first");
}
List<HoodieRollbackStat> stats = table.rollback(jsc, commits);
// Remove interleaving pending compactions before rolling back commits
pendingCompactionToRollback.stream().forEach(this::deletePendingCompaction);
List<HoodieRollbackStat> stats = table.rollback(jsc, commitsToRollback);
// cleanup index entries
commits.stream().forEach(s -> {
commitsToRollback.stream().forEach(s -> {
if (!index.rollbackCommit(s)) {
throw new HoodieRollbackException("Rollback index changes failed, for time :" + s);
}
});
logger.info("Index rolled back for commits " + commits);
logger.info("Index rolled back for commits " + commitsToRollback);
Optional<Long> durationInMs = Optional.empty();
if (context != null) {
@@ -795,11 +846,11 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
metrics.updateRollbackMetrics(durationInMs.get(), numFilesDeleted);
}
HoodieRollbackMetadata rollbackMetadata = AvroUtils
.convertRollbackMetadata(startRollbackTime, durationInMs, commits, stats);
.convertRollbackMetadata(startRollbackTime, durationInMs, commitsToRollback, stats);
table.getActiveTimeline().saveAsComplete(
new HoodieInstant(true, HoodieTimeline.ROLLBACK_ACTION, startRollbackTime),
AvroUtils.serializeRollbackMetadata(rollbackMetadata));
logger.info("Commits " + commits + " rollback is complete");
logger.info("Commits " + commitsToRollback + " rollback is complete");
if (!table.getActiveTimeline().getCleanerTimeline().empty()) {
logger.info("Cleaning up older rollback meta files");
@@ -810,7 +861,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
}
} catch (IOException e) {
throw new HoodieRollbackException(
"Failed to rollback " + config.getBasePath() + " commits " + commits, e);
"Failed to rollback " + config.getBasePath() + " commits " + commitsToRollback, e);
}
}
@@ -890,6 +941,10 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
}
public void startCommitWithTime(String commitTime) {
if (rollbackInFlight) {
// Only rollback inflight commit/delta-commits. Do not touch compaction commits
rollbackInflightCommits();
}
logger.info("Generate a new commit time " + commitTime);
HoodieTable<T> table = HoodieTable.getHoodieTable(
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc);
@@ -1061,6 +1116,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
}
private HoodieTable getTableAndInitCtx() {
// Create a Hoodie table which encapsulated the commits and files visible
HoodieTable table = HoodieTable.getHoodieTable(
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc);

View File

@@ -16,6 +16,7 @@
package com.uber.hoodie.io;
import com.uber.hoodie.common.model.CompactionOperation;
import com.uber.hoodie.common.model.FileSlice;
import com.uber.hoodie.common.model.HoodieCleaningPolicy;
import com.uber.hoodie.common.model.HoodieDataFile;
@@ -25,14 +26,17 @@ import com.uber.hoodie.common.model.HoodieTableType;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.TableFileSystemView;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.table.view.HoodieTableFileSystemView;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.table.HoodieTable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -48,6 +52,7 @@ public class HoodieCleanHelper<T extends HoodieRecordPayload<T>> {
private final TableFileSystemView fileSystemView;
private final HoodieTimeline commitTimeline;
private final Map<String, CompactionOperation> fileIdToPendingCompactionOperations;
private HoodieTable<T> hoodieTable;
private HoodieWriteConfig config;
@@ -56,9 +61,12 @@ public class HoodieCleanHelper<T extends HoodieRecordPayload<T>> {
this.fileSystemView = hoodieTable.getCompletedFileSystemView();
this.commitTimeline = hoodieTable.getCompletedCommitTimeline();
this.config = config;
this.fileIdToPendingCompactionOperations =
((HoodieTableFileSystemView)hoodieTable.getRTFileSystemView()).getFileIdToPendingCompaction().entrySet()
.stream().map(entry -> Pair.of(entry.getKey(), entry.getValue().getValue()))
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
}
/**
* Selects the older versions of files for cleaning, such that it bounds the number of versions of
* each file. This policy is useful, if you are simply interested in querying the table, and you
@@ -81,8 +89,8 @@ public class HoodieCleanHelper<T extends HoodieRecordPayload<T>> {
while (fileSliceIterator.hasNext() && keepVersions > 0) {
// Skip this most recent version
FileSlice nextSlice = fileSliceIterator.next();
HoodieDataFile dataFile = nextSlice.getDataFile().get();
if (savepointedFiles.contains(dataFile.getFileName())) {
Optional<HoodieDataFile> dataFile = nextSlice.getDataFile();
if (dataFile.isPresent() && savepointedFiles.contains(dataFile.get().getFileName())) {
// do not clean up a savepoint data file
continue;
}
@@ -91,12 +99,16 @@ public class HoodieCleanHelper<T extends HoodieRecordPayload<T>> {
// Delete the remaining files
while (fileSliceIterator.hasNext()) {
FileSlice nextSlice = fileSliceIterator.next();
HoodieDataFile dataFile = nextSlice.getDataFile().get();
deletePaths.add(dataFile.getFileStatus().getPath().toString());
if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
// If merge on read, then clean the log files for the commits as well
deletePaths.addAll(nextSlice.getLogFiles().map(file -> file.getPath().toString())
.collect(Collectors.toList()));
if (!isFileSliceNeededForPendingCompaction(nextSlice)) {
if (nextSlice.getDataFile().isPresent()) {
HoodieDataFile dataFile = nextSlice.getDataFile().get();
deletePaths.add(dataFile.getFileStatus().getPath().toString());
}
if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
// If merge on read, then clean the log files for the commits as well
deletePaths.addAll(nextSlice.getLogFiles().map(file -> file.getPath().toString())
.collect(Collectors.toList()));
}
}
}
}
@@ -133,17 +145,21 @@ public class HoodieCleanHelper<T extends HoodieRecordPayload<T>> {
.collect(Collectors.toList());
for (HoodieFileGroup fileGroup : fileGroups) {
List<FileSlice> fileSliceList = fileGroup.getAllFileSlices().collect(Collectors.toList());
HoodieDataFile dataFile = fileSliceList.get(0).getDataFile().get();
String lastVersion = dataFile.getCommitTime();
if (fileSliceList.isEmpty()) {
continue;
}
String lastVersion = fileSliceList.get(0).getBaseInstantTime();
String lastVersionBeforeEarliestCommitToRetain = getLatestVersionBeforeCommit(fileSliceList,
earliestCommitToRetain);
// Ensure there are more than 1 version of the file (we only clean old files from updates)
// i.e always spare the last commit.
for (FileSlice aSlice : fileSliceList) {
HoodieDataFile aFile = aSlice.getDataFile().get();
String fileCommitTime = aFile.getCommitTime();
if (savepointedFiles.contains(aFile.getFileName())) {
Optional<HoodieDataFile> aFile = aSlice.getDataFile();
String fileCommitTime = aSlice.getBaseInstantTime();
if (aFile.isPresent() && savepointedFiles.contains(aFile.get().getFileName())) {
// do not clean up a savepoint data file
continue;
}
@@ -159,11 +175,14 @@ public class HoodieCleanHelper<T extends HoodieRecordPayload<T>> {
}
// Always keep the last commit
if (HoodieTimeline
if (!isFileSliceNeededForPendingCompaction(aSlice)
&& HoodieTimeline
.compareTimestamps(earliestCommitToRetain.getTimestamp(), fileCommitTime,
HoodieTimeline.GREATER)) {
// this is a commit, that should be cleaned.
deletePaths.add(aFile.getFileStatus().getPath().toString());
if (aFile.isPresent()) {
deletePaths.add(aFile.get().getFileStatus().getPath().toString());
}
if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
// If merge on read, then clean the log files for the commits as well
deletePaths.addAll(aSlice.getLogFiles().map(file -> file.getPath().toString())
@@ -183,7 +202,7 @@ public class HoodieCleanHelper<T extends HoodieRecordPayload<T>> {
private String getLatestVersionBeforeCommit(List<FileSlice> fileSliceList,
HoodieInstant commitTime) {
for (FileSlice file : fileSliceList) {
String fileCommitTime = file.getDataFile().get().getCommitTime();
String fileCommitTime = file.getBaseInstantTime();
if (HoodieTimeline
.compareTimestamps(commitTime.getTimestamp(), fileCommitTime, HoodieTimeline.GREATER)) {
// fileList is sorted on the reverse, so the first commit we find <= commitTime is the
@@ -226,4 +245,19 @@ public class HoodieCleanHelper<T extends HoodieRecordPayload<T>> {
}
return earliestCommitToRetain;
}
}
/**
 * Determine if a file slice needs to be preserved because a pending compaction references it.
 *
 * @param fileSlice File Slice
 * @return true if file slice needs to be preserved, false otherwise.
 */
private boolean isFileSliceNeededForPendingCompaction(FileSlice fileSlice) {
  CompactionOperation operation = fileIdToPendingCompactionOperations.get(fileSlice.getFileId());
  if (operation == null) {
    // No pending compaction touches this file-group, so the slice is not protected
    return false;
  }
  // If the file slice's base instant time is newer than or the same as the operation's, keep it
  return HoodieTimeline.compareTimestamps(fileSlice.getBaseInstantTime(),
      operation.getBaseInstantTime(), HoodieTimeline.GREATER_OR_EQUAL);
}
}

View File

@@ -16,6 +16,10 @@
package com.uber.hoodie.io;
import static com.uber.hoodie.common.table.HoodieTimeline.COMMIT_ACTION;
import static com.uber.hoodie.common.table.HoodieTimeline.DELTA_COMMIT_ACTION;
import static com.uber.hoodie.common.table.HoodieTimeline.LESSER_OR_EQUAL;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
@@ -32,6 +36,7 @@ import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.log.HoodieLogFormat;
import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
import com.uber.hoodie.common.table.log.block.HoodieLogBlock;
import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
import com.uber.hoodie.common.table.timeline.HoodieArchivedTimeline;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.util.AvroUtils;
@@ -42,6 +47,7 @@ import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.table.HoodieTable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -100,7 +106,7 @@ public class HoodieCommitArchiveLog {
/**
* Check if commits need to be archived. If yes, archive commits.
*/
public boolean archiveIfRequired(final JavaSparkContext jsc) {
public boolean archiveIfRequired(final JavaSparkContext jsc) throws IOException {
try {
List<HoodieInstant> instantsToArchive = getInstantsToArchive(jsc).collect(Collectors.toList());
boolean success = true;
@@ -144,23 +150,34 @@ public class HoodieCommitArchiveLog {
//TODO (na) : Add a way to return actions associated with a timeline and then merge/unify
// with logic above to avoid Stream.concats
HoodieTimeline commitTimeline = table.getCompletedCommitTimeline();
Optional<HoodieInstant> oldestPendingCompactionInstant =
table.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();
// We cannot have any holes in the commit timeline. We cannot archive any commits which are
// made after the first savepoint present.
Optional<HoodieInstant> firstSavepoint = table.getCompletedSavepointTimeline().firstInstant();
if (!commitTimeline.empty() && commitTimeline.countInstants() > maxCommitsToKeep) {
// Actually do the commits
instants = Stream.concat(instants, commitTimeline.getInstants().filter(s -> {
// if no savepoint present, then dont filter
return !(firstSavepoint.isPresent() && HoodieTimeline
.compareTimestamps(firstSavepoint.get().getTimestamp(), s.getTimestamp(),
HoodieTimeline.LESSER_OR_EQUAL));
}).limit(commitTimeline.countInstants() - minCommitsToKeep));
instants = Stream.concat(instants, commitTimeline.getInstants()
.filter(s -> {
// if no savepoint present, then dont filter
return !(firstSavepoint.isPresent() && HoodieTimeline
.compareTimestamps(firstSavepoint.get().getTimestamp(), s.getTimestamp(),
HoodieTimeline.LESSER_OR_EQUAL));
})
.filter(s -> {
// Ensure commits >= oldest pending compaction commit is retained
return oldestPendingCompactionInstant.map(instant -> {
return HoodieTimeline.compareTimestamps(instant.getTimestamp(), s.getTimestamp(), HoodieTimeline.GREATER);
}).orElse(true);
})
.limit(commitTimeline.countInstants() - minCommitsToKeep));
}
return instants;
}
private boolean deleteArchivedInstants(List<HoodieInstant> archivedInstants) {
private boolean deleteArchivedInstants(List<HoodieInstant> archivedInstants) throws IOException {
log.info("Deleting instants " + archivedInstants);
boolean success = true;
for (HoodieInstant archivedInstant : archivedInstants) {
@@ -174,6 +191,48 @@ public class HoodieCommitArchiveLog {
throw new HoodieIOException("Failed to delete archived instant " + archivedInstant, e);
}
}
// Remove older meta-data from auxiliary path too
Optional<HoodieInstant> latestCommitted =
archivedInstants.stream()
.filter(i -> {
return i.isCompleted()
&& (i.getAction().equals(COMMIT_ACTION) || (i.getAction().equals(DELTA_COMMIT_ACTION)));
})
.sorted(Comparator.comparing(HoodieInstant::getTimestamp).reversed()).findFirst();
if (latestCommitted.isPresent()) {
success &= deleteAllInstantsOlderorEqualsInAuxMetaFolder(latestCommitted.get());
}
return success;
}
/**
 * Remove from the auxiliary meta folder every instant file whose timestamp is older than or
 * equal to the given threshold instant.
 *
 * @param thresholdInstant Hoodie Instant serving as the inclusive upper bound
 * @return success if all eligible file deleted successfully
 * @throws IOException in case of error
 */
private boolean deleteAllInstantsOlderorEqualsInAuxMetaFolder(HoodieInstant thresholdInstant)
    throws IOException {
  Path auxiliaryFolder = new Path(metaClient.getMetaAuxiliaryPath());
  List<HoodieInstant> allInstants = HoodieTableMetaClient.scanHoodieInstantsFromFileSystem(
      metaClient.getFs(), auxiliaryFolder, HoodieActiveTimeline.VALID_EXTENSIONS_IN_ACTIVE_TIMELINE);
  boolean success = true;
  for (HoodieInstant candidate : allInstants) {
    // Skip instants strictly newer than the threshold
    if (!HoodieTimeline.compareTimestamps(candidate.getTimestamp(),
        thresholdInstant.getTimestamp(), LESSER_OR_EQUAL)) {
      continue;
    }
    log.info("Deleting instant " + candidate + " in auxiliary meta path " + metaClient.getMetaAuxiliaryPath());
    Path metaFile = new Path(metaClient.getMetaAuxiliaryPath(), candidate.getFileName());
    if (metaClient.getFs().exists(metaFile)) {
      success &= metaClient.getFs().delete(metaFile, false);
      log.info("Deleted instant file in auxiliary metapath : " + metaFile);
    }
  }
  return success;
}
@@ -212,7 +271,7 @@ public class HoodieCommitArchiveLog {
archivedMetaWrapper.setActionType(ActionType.clean.name());
break;
}
case HoodieTimeline.COMMIT_ACTION: {
case COMMIT_ACTION: {
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
.fromBytes(commitTimeline.getInstantDetails(hoodieInstant).get());
archivedMetaWrapper.setHoodieCommitMetadata(commitMetadataConverter(commitMetadata));

View File

@@ -19,12 +19,16 @@ package com.uber.hoodie;
import static com.uber.hoodie.common.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
import static com.uber.hoodie.common.HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
import static com.uber.hoodie.common.HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH;
import static com.uber.hoodie.common.table.HoodieTimeline.COMPACTION_ACTION;
import static com.uber.hoodie.common.table.HoodieTimeline.GREATER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import com.google.common.collect.Iterables;
import com.uber.hoodie.avro.model.HoodieCompactionPlan;
import com.uber.hoodie.common.HoodieCleanStat;
import com.uber.hoodie.common.model.FileSlice;
import com.uber.hoodie.common.model.HoodieCleaningPolicy;
import com.uber.hoodie.common.model.HoodieCommitMetadata;
import com.uber.hoodie.common.model.HoodieDataFile;
@@ -38,12 +42,16 @@ import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.TableFileSystemView;
import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.table.timeline.HoodieInstant.State;
import com.uber.hoodie.common.util.AvroUtils;
import com.uber.hoodie.common.util.CompactionUtils;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.config.HoodieCompactionConfig;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.index.HoodieIndex;
import com.uber.hoodie.table.HoodieTable;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -53,7 +61,10 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -61,6 +72,7 @@ import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerTaskEnd;
import org.apache.spark.util.AccumulatorV2;
import org.junit.Assert;
import org.junit.Test;
import scala.Option;
import scala.collection.Iterator;
@@ -82,7 +94,7 @@ public class TestCleaner extends TestHoodieClientBase {
* @param insertFn Insertion API for testing
* @throws Exception in case of error
*/
private void insertFirstBigBatchForClientCleanerTest(
private String insertFirstBigBatchForClientCleanerTest(
HoodieWriteConfig cfg,
HoodieWriteClient client,
Function2<List<HoodieRecord>, String, Integer> recordGenFunction,
@@ -118,6 +130,7 @@ public class TestCleaner extends TestHoodieClientBase {
HoodieIndex index = HoodieIndex.createIndex(cfg, jsc);
List<HoodieRecord> taggedRecords = index.tagLocation(jsc.parallelize(records, 1), jsc, table).collect();
checkTaggedRecords(taggedRecords, newCommitTime);
return newCommitTime;
}
/**
@@ -185,21 +198,51 @@ public class TestCleaner extends TestHoodieClientBase {
insertFirstBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn);
Map<String, String> selectedFileIdForCompaction = new HashMap<>();
Map<String, FileSlice> compactionFileIdToLatestFileSlice = new HashMap<>();
HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
HoodieTable table = HoodieTable.getHoodieTable(metadata, getConfig(), jsc);
for (String partitionPath : dataGen.getPartitionPaths()) {
TableFileSystemView fsView = table.getFileSystemView();
Optional<Boolean> added = fsView.getAllFileGroups(partitionPath).findFirst()
.map(fg -> {
selectedFileIdForCompaction.put(fg.getId(), partitionPath);
fg.getLatestFileSlice().map(fs -> compactionFileIdToLatestFileSlice.put(fg.getId(), fs));
return true;
});
if (added.isPresent()) {
// Select only one file-group for compaction
break;
}
}
// Create workload with selected file-slices
List<Pair<String, FileSlice>> partitionFileSlicePairs = compactionFileIdToLatestFileSlice.entrySet().stream()
.map(e -> Pair.of(selectedFileIdForCompaction.get(e.getKey()), e.getValue())).collect(Collectors.toList());
HoodieCompactionPlan compactionPlan =
CompactionUtils.buildFromFileSlices(partitionFileSlicePairs, Optional.empty(), Optional.empty());
List<String> instantTimes = HoodieTestUtils.monotonicIncreasingCommitTimestamps(9, 1);
String compactionTime = instantTimes.get(0);
table.getActiveTimeline().saveToCompactionRequested(
new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, compactionTime),
AvroUtils.serializeCompactionPlan(compactionPlan));
instantTimes = instantTimes.subList(1, instantTimes.size());
// Keep doing some writes and clean inline. Make sure we have expected number of files
// remaining.
HoodieTestUtils.monotonicIncreasingCommitTimestamps(8, 1).stream().forEach(newCommitTime -> {
for (String newInstantTime : instantTimes) {
try {
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = recordUpsertGenWrappedFunction.apply(newCommitTime, 100);
client.startCommitWithTime(newInstantTime);
List<HoodieRecord> records = recordUpsertGenWrappedFunction.apply(newInstantTime, 100);
List<WriteStatus> statuses =
upsertFn.apply(client, jsc.parallelize(records, 1), newCommitTime).collect();
upsertFn.apply(client, jsc.parallelize(records, 1), newInstantTime).collect();
// Verify there are no errors
assertNoWriteErrors(statuses);
HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
HoodieTable table = HoodieTable.getHoodieTable(metadata, getConfig(), jsc);
HoodieTimeline timeline = metadata.getCommitsTimeline();
metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
table = HoodieTable.getHoodieTable(metadata, getConfig(), jsc);
HoodieTimeline timeline = table.getMetaClient().getCommitsTimeline();
TableFileSystemView fsView = table.getFileSystemView();
// Need to ensure the following
@@ -221,25 +264,39 @@ public class TestCleaner extends TestHoodieClientBase {
List<HoodieFileGroup> fileGroups = fsView.getAllFileGroups(partitionPath).collect(Collectors.toList());
for (HoodieFileGroup fileGroup : fileGroups) {
// No file has no more than max versions
String fileId = fileGroup.getId();
List<HoodieDataFile> dataFiles = fileGroup.getAllDataFiles().collect(Collectors.toList());
if (selectedFileIdForCompaction.containsKey(fileGroup.getId())) {
// Ensure latest file-slice selected for compaction is retained
String oldestCommitRetained =
fileGroup.getAllDataFiles().map(HoodieDataFile::getCommitTime).sorted().findFirst().get();
assertTrue("fileId " + fileId + " has more than " + maxVersions + " versions",
dataFiles.size() <= maxVersions);
Optional<HoodieDataFile> dataFileForCompactionPresent =
fileGroup.getAllDataFiles().filter(df -> {
return compactionFileIdToLatestFileSlice.get(fileGroup.getId())
.getBaseInstantTime().equals(df.getCommitTime());
}).findAny();
Assert.assertTrue("Data File selected for compaction is retained",
dataFileForCompactionPresent.isPresent());
} else {
// file has no more than max versions
String fileId = fileGroup.getId();
List<HoodieDataFile> dataFiles = fileGroup.getAllDataFiles().collect(Collectors.toList());
// Each file, has the latest N versions (i.e cleaning gets rid of older versions)
List<String> commitedVersions = new ArrayList<>(fileIdToVersions.get(fileId));
for (int i = 0; i < dataFiles.size(); i++) {
assertEquals("File " + fileId + " does not have latest versions on commits" + commitedVersions,
Iterables.get(dataFiles, i).getCommitTime(), commitedVersions.get(commitedVersions.size() - 1 - i));
assertTrue("fileId " + fileId + " has more than " + maxVersions + " versions",
dataFiles.size() <= maxVersions);
// Each file, has the latest N versions (i.e cleaning gets rid of older versions)
List<String> commitedVersions = new ArrayList<>(fileIdToVersions.get(fileId));
for (int i = 0; i < dataFiles.size(); i++) {
assertEquals("File " + fileId + " does not have latest versions on commits" + commitedVersions,
Iterables.get(dataFiles, i).getCommitTime(), commitedVersions.get(commitedVersions.size() - 1 - i));
}
}
}
}
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
});
}
}
/**
@@ -679,6 +736,168 @@ public class TestCleaner extends TestHoodieClientBase {
stageOneShuffleReadTaskRecordsCountMap.values().stream().filter(a -> a > 10 && a < 100).count() == 3);
}
/**
 * Test Keep Latest Commits cleaning policy when there are pending compactions
 */
@Test
public void testKeepLatestCommitsWithPendingCompactions() throws IOException {
  HoodieCompactionConfig compactionConfig = HoodieCompactionConfig.newBuilder()
      .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
      .retainCommits(2)
      .build();
  HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
      .withPath(basePath)
      .withAssumeDatePartitioning(true)
      .withCompactionConfig(compactionConfig)
      .build();
  // Expected deletions per file-group:
  // . FileId Parquet Logs Total Retained Commits
  // FileId7 5 10 15 009, 011
  // FileId6 5 10 15 009
  // FileId5 3 6 9 005
  // FileId4 2 4 6 003
  // FileId3 1 2 3 001
  // FileId2 0 0 0 000
  // FileId1 0 0 0 000
  testPendingCompactions(config, 48, 18);
}
/**
 * Test Keep Latest Versions cleaning policy when there are pending compactions
 */
@Test
public void testKeepLatestVersionsWithPendingCompactions() throws IOException {
  HoodieCompactionConfig compactionConfig = HoodieCompactionConfig.newBuilder()
      .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS)
      .retainFileVersions(2)
      .build();
  HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
      .withPath(basePath)
      .withAssumeDatePartitioning(true)
      .withCompactionConfig(compactionConfig)
      .build();
  // Expected deletions per file-group:
  // . FileId Parquet Logs Total Retained Commits
  // FileId7 5 10 15 009, 011
  // FileId6 4 8 12 007, 009
  // FileId5 2 4 6 003 005
  // FileId4 1 2 3 001, 003
  // FileId3 0 0 0 000, 001
  // FileId2 0 0 0 000
  // FileId1 0 0 0 000
  testPendingCompactions(config, 36, 9);
}
/**
 * Common test method for validating pending compactions: file-slices that a pending compaction
 * depends on (and delta-commits written on top of them) must survive cleaning, while older
 * versions remain eligible for deletion.
 *
 * @param config Hoodie Write Config
 * @param expNumFilesDeleted Number of files deleted
 * @param expNumFilesUnderCompactionDeleted Number of deleted files that belong to file-groups
 *        with a pending compaction (older versions only)
 */
public void testPendingCompactions(HoodieWriteConfig config, int expNumFilesDeleted,
    int expNumFilesUnderCompactionDeleted) throws IOException {
  HoodieTableMetaClient metaClient = HoodieTestUtils.initTableType(jsc.hadoopConfiguration(), basePath,
      HoodieTableType.MERGE_ON_READ);
  // Completed commit instants; compaction instants below interleave between them
  String[] instants = new String[]{"000", "001", "003", "005", "007", "009", "011", "013"};
  String[] compactionInstants = new String[]{"002", "004", "006", "008", "010"};
  // file-id -> compaction instant it is pending under
  Map<String, String> expFileIdToPendingCompaction = new HashMap<>();
  // file-id -> latest commit instant written before its compaction request
  Map<String, String> fileIdToLatestInstantBeforeCompaction = new HashMap<>();
  // compaction instant -> file-slices included in that compaction plan
  Map<String, List<FileSlice>> compactionInstantsToFileSlices = new HashMap<>();
  for (String instant : instants) {
    HoodieTestUtils.createCommitFiles(basePath, instant);
  }
  // Generate 7 file-groups. First one has only one slice and no pending compaction. File Slices (2 - 5) has
  // multiple versions with pending compaction. File Slices (6 - 7) have multiple file-slices but not under
  // compactions
  // FileIds 2-5 will be under compaction
  int maxNumFileIds = 7;
  String[] fileIds = new String[]
      {"fileId1", "fileId2", "fileId3", "fileId4", "fileId5", "fileId6", "fileId7"};
  int maxNumFileIdsForCompaction = 4;
  for (int i = 0; i < maxNumFileIds; i++) {
    // Base slice for every file-group: one data file plus two log files at the first instant
    final String fileId = HoodieTestUtils.createDataFile(basePath, DEFAULT_FIRST_PARTITION_PATH, instants[0],
        fileIds[i]);
    HoodieTestUtils.createNewLogFile(fs, basePath, DEFAULT_FIRST_PARTITION_PATH, instants[0],
        fileId, Optional.empty());
    HoodieTestUtils.createNewLogFile(fs, basePath, DEFAULT_FIRST_PARTITION_PATH, instants[0],
        fileId, Optional.of(2));
    fileIdToLatestInstantBeforeCompaction.put(fileId, instants[0]);
    for (int j = 1; j <= i; j++) {
      if (j == i && j <= maxNumFileIdsForCompaction) {
        // This file-group's latest slice becomes the base of a pending compaction
        expFileIdToPendingCompaction.put(fileId, compactionInstants[j]);
        HoodieTable table = HoodieTable.getHoodieTable(
            new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config,
            jsc);
        // NOTE: lambda parameter fs shadows the FileSystem field here
        FileSlice slice = table.getRTFileSystemView().getLatestFileSlices(DEFAULT_FIRST_PARTITION_PATH)
            .filter(fs -> fs.getFileId().equals(fileId)).findFirst().get();
        List<FileSlice> slices = new ArrayList<>();
        if (compactionInstantsToFileSlices.containsKey(compactionInstants[j])) {
          slices = compactionInstantsToFileSlices.get(compactionInstants[j]);
        }
        slices.add(slice);
        compactionInstantsToFileSlices.put(compactionInstants[j], slices);
        // Add log-files to simulate delta-commits after pending compaction
        HoodieTestUtils.createNewLogFile(fs, basePath, DEFAULT_FIRST_PARTITION_PATH, compactionInstants[j],
            fileId, Optional.empty());
        HoodieTestUtils.createNewLogFile(fs, basePath, DEFAULT_FIRST_PARTITION_PATH, compactionInstants[j],
            fileId, Optional.of(2));
      } else {
        // Regular new file-slice (data file + two log files) at this commit instant
        HoodieTestUtils.createDataFile(basePath, DEFAULT_FIRST_PARTITION_PATH, instants[j], fileId);
        HoodieTestUtils.createNewLogFile(fs, basePath, DEFAULT_FIRST_PARTITION_PATH, instants[j], fileId,
            Optional.empty());
        HoodieTestUtils.createNewLogFile(fs, basePath, DEFAULT_FIRST_PARTITION_PATH, instants[j], fileId,
            Optional.of(2));
        fileIdToLatestInstantBeforeCompaction.put(fileId, instants[j]);
      }
    }
  }
  // Setup pending compaction plans
  for (String instant : compactionInstants) {
    List<FileSlice> fileSliceList = compactionInstantsToFileSlices.get(instant);
    if (null != fileSliceList) {
      HoodieTestUtils.createCompactionRequest(metaClient, instant,
          fileSliceList.stream().map(fs -> Pair.of(DEFAULT_FIRST_PARTITION_PATH, fs)).collect(Collectors.toList()));
    }
  }
  // Clean now
  HoodieTable table = HoodieTable.getHoodieTable(
      new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config,
      jsc);
  List<HoodieCleanStat> hoodieCleanStats = table.clean(jsc);
  // Test for safety: every file-slice that a pending compaction is based on must still be intact
  // (data-file present, both log-files present) after cleaning
  final HoodieTable hoodieTable = HoodieTable.getHoodieTable(
      new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config,
      jsc);
  expFileIdToPendingCompaction.entrySet().stream().forEach(entry -> {
    String fileId = entry.getKey();
    String baseInstantForCompaction = fileIdToLatestInstantBeforeCompaction.get(fileId);
    Optional<FileSlice> fileSliceForCompaction =
        hoodieTable.getRTFileSystemView().getLatestFileSlicesBeforeOrOn(DEFAULT_FIRST_PARTITION_PATH,
            baseInstantForCompaction).filter(fs -> fs.getFileId().equals(fileId)).findFirst();
    Assert.assertTrue("Base Instant for Compaction must be preserved", fileSliceForCompaction.isPresent());
    Assert.assertTrue("FileSlice has data-file", fileSliceForCompaction.get().getDataFile().isPresent());
    Assert.assertEquals("FileSlice has log-files", 2,
        fileSliceForCompaction.get().getLogFiles().count());
  });
  // Test for progress (Did we clean some files ?)
  long numFilesUnderCompactionDeleted =
      hoodieCleanStats.stream().flatMap(cleanStat -> {
        return convertPathToFileIdWithCommitTime(metaClient, cleanStat.getDeletePathPatterns()).map(
            fileIdWithCommitTime -> {
              if (expFileIdToPendingCompaction.containsKey(fileIdWithCommitTime.getKey())) {
                // Only versions strictly older than the compaction base instant may be deleted
                Assert.assertTrue("Deleted instant time must be less than pending compaction",
                    HoodieTimeline.compareTimestamps(
                        fileIdToLatestInstantBeforeCompaction.get(fileIdWithCommitTime.getKey()),
                        fileIdWithCommitTime.getValue(), GREATER));
                return true;
              }
              return false;
            });
      }).filter(x -> x).count();
  long numDeleted = hoodieCleanStats.stream()
      .flatMap(cleanStat -> cleanStat.getDeletePathPatterns().stream()).count();
  // Tighter check for regression
  Assert.assertEquals("Correct number of files deleted", expNumFilesDeleted, numDeleted);
  Assert.assertEquals("Correct number of files under compaction deleted",
      expNumFilesUnderCompactionDeleted, numFilesUnderCompactionDeleted);
}
/**
* Utility method to create temporary data files
*
@@ -703,4 +922,23 @@ public class TestCleaner extends TestHoodieClientBase {
private int getTotalTempFiles() throws IOException {
  // Count entries directly under the table's temp metadata folder.
  Path tempFolder = new Path(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME);
  return fs.listStatus(tempFolder).length;
}
/**
 * Map each cleaned path to a (fileId, commitTime) pair, covering both parquet (RO)
 * and log (RT) file paths.
 */
private Stream<Pair<String, String>> convertPathToFileIdWithCommitTime(
    final HoodieTableMetaClient metaClient, List<String> paths) {
  String roExtension = metaClient.getTableConfig().getROFileFormat().getFileExtension();
  String rtExtension = metaClient.getTableConfig().getRTFileFormat().getFileExtension();
  // Parquet files: parse file-id and commit time out of the file name
  Stream<Pair<String, String>> roFileStream = paths.stream()
      .filter(path -> path.contains(roExtension))
      .map(fullPath -> {
        String fileName = Paths.get(fullPath).getFileName().toString();
        return Pair.of(FSUtils.getFileId(fileName), FSUtils.getCommitTime(fileName));
      });
  // Log files: parse file-id and base commit time from the log path
  Stream<Pair<String, String>> rtFileStream = paths.stream()
      .filter(path -> path.contains(rtExtension))
      .map(path -> Pair.of(FSUtils.getFileIdFromLogPath(new Path(path)),
          FSUtils.getBaseCommitTimeFromLogPath(new Path(path))));
  return Stream.concat(roFileStream, rtFileStream);
}
}

View File

@@ -303,7 +303,7 @@ public class TestClientRollback extends TestHoodieClientBase {
&& HoodieTestUtils.doesDataFileExist(basePath, "2016/05/06", commitTime1, file13));
// Turn auto rollback on
new HoodieWriteClient(jsc, config, true);
new HoodieWriteClient(jsc, config, true).startCommit();
assertTrue(HoodieTestUtils.doesCommitExist(basePath, commitTime1));
assertFalse(HoodieTestUtils.doesInflightExist(basePath, commitTime2));
assertFalse(HoodieTestUtils.doesInflightExist(basePath, commitTime3));

View File

@@ -138,6 +138,15 @@ public class HoodieTestDataGenerator {
}
}
/**
 * Create an empty "compaction requested" marker file under the table's metadata folder.
 *
 * @param basePath      Base path of the hoodie table
 * @param commitTime    Instant time for which compaction is being requested
 * @param configuration Hadoop configuration used to resolve the file-system
 * @throws IOException if the marker file cannot be created
 */
public static void createCompactionRequestedFile(String basePath, String commitTime, Configuration configuration)
    throws IOException {
  Path commitFile = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
      + HoodieTimeline.makeRequestedCompactionFileName(commitTime));
  FileSystem fs = FSUtils.getFs(basePath, configuration);
  // try-with-resources guarantees the stream is closed even if an error occurs after creation
  try (FSDataOutputStream ignored = fs.create(commitFile, true)) {
    // Marker file only - no content written
  }
}
public static void createCompactionAuxiliaryMetadata(String basePath, HoodieInstant instant,
Configuration configuration) throws IOException {
Path commitFile = new Path(

View File

@@ -16,7 +16,9 @@
package com.uber.hoodie.io;
import static com.uber.hoodie.common.table.HoodieTimeline.COMPACTION_ACTION;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import com.google.common.collect.Sets;
@@ -30,7 +32,9 @@ import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.log.HoodieLogFormat;
import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.table.timeline.HoodieInstant.State;
import com.uber.hoodie.config.HoodieCompactionConfig;
import com.uber.hoodie.config.HoodieWriteConfig;
import java.io.File;
@@ -133,11 +137,47 @@ public class TestHoodieCommitArchiveLog {
HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 4).build())
.forTable("test-trip-table").build();
HoodieTestUtils.init(hadoopConf, basePath);
// Requested Compaction
HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath,
new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, "100"), dfs.getConf());
// Inflight Compaction
HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath,
new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION, "100"), dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "100", dfs.getConf());
// Requested Compaction
HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath,
new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, "101"), dfs.getConf());
// Inflight Compaction
HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath,
new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION, "101"), dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "101", dfs.getConf());
// Requested Compaction
HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath,
new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, "102"), dfs.getConf());
// Inflight Compaction
HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath,
new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION, "102"), dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "102", dfs.getConf());
// Requested Compaction
HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath,
new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, "103"), dfs.getConf());
// Inflight Compaction
HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath,
new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION, "103"), dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "103", dfs.getConf());
// Requested Compaction
HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath,
new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, "104"), dfs.getConf());
// Inflight Compaction
HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath,
new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION, "104"), dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "104", dfs.getConf());
// Requested Compaction
HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath,
new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, "105"), dfs.getConf());
// Inflight Compaction
HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath,
new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION, "105"), dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "105", dfs.getConf());
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(dfs.getConf(), basePath);
@@ -172,6 +212,37 @@ public class TestHoodieCommitArchiveLog {
timeline = metaClient.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants();
originalCommits.removeAll(timeline.getInstants().collect(Collectors.toList()));
// Check compaction instants
List<HoodieInstant> instants =
HoodieTableMetaClient.scanHoodieInstantsFromFileSystem(metaClient.getFs(),
new Path(metaClient.getMetaAuxiliaryPath()),
HoodieActiveTimeline.VALID_EXTENSIONS_IN_ACTIVE_TIMELINE);
assertEquals("Should delete all compaction instants < 104", 4, instants.size());
assertFalse("Requested Compaction must be absent for 100", instants.contains(
new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, "100")));
assertFalse("Inflight Compaction must be absent for 100", instants.contains(
new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION, "100")));
assertFalse("Requested Compaction must be absent for 101", instants.contains(
new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, "101")));
assertFalse("Inflight Compaction must be absent for 101", instants.contains(
new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION, "101")));
assertFalse("Requested Compaction must be absent for 102", instants.contains(
new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, "102")));
assertFalse("Inflight Compaction must be absent for 102", instants.contains(
new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION, "102")));
assertFalse("Requested Compaction must be absent for 103", instants.contains(
new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, "103")));
assertFalse("Inflight Compaction must be absent for 103", instants.contains(
new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION, "103")));
assertTrue("Requested Compaction must be present for 104", instants.contains(
new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, "104")));
assertTrue("Inflight Compaction must be present for 104", instants.contains(
new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION, "104")));
assertTrue("Requested Compaction must be present for 105", instants.contains(
new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, "105")));
assertTrue("Inflight Compaction must be present for 105", instants.contains(
new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION, "105")));
//read the file
HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(dfs,
new HoodieLogFile(new Path(basePath + "/.hoodie/.commits_.archive.1")),
@@ -210,9 +281,33 @@ public class TestHoodieCommitArchiveLog {
HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 5).build()).build();
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(dfs.getConf(), basePath);
HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
// Requested Compaction
HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath,
new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, "100"), dfs.getConf());
// Inflight Compaction
HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath,
new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION, "100"), dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "100", dfs.getConf());
// Requested Compaction
HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath,
new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, "101"), dfs.getConf());
// Inflight Compaction
HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath,
new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION, "101"), dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "101", dfs.getConf());
// Requested Compaction
HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath,
new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, "102"), dfs.getConf());
// Inflight Compaction
HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath,
new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION, "102"), dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "102", dfs.getConf());
// Requested Compaction
HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath,
new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, "103"), dfs.getConf());
// Inflight Compaction
HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath,
new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION, "103"), dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "103", dfs.getConf());
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
@@ -221,6 +316,28 @@ public class TestHoodieCommitArchiveLog {
assertTrue(result);
timeline = metaClient.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants();
assertEquals("Should not archive commits when maxCommitsToKeep is 5", 4, timeline.countInstants());
List<HoodieInstant> instants =
HoodieTableMetaClient.scanHoodieInstantsFromFileSystem(metaClient.getFs(),
new Path(metaClient.getMetaAuxiliaryPath()),
HoodieActiveTimeline.VALID_EXTENSIONS_IN_ACTIVE_TIMELINE);
assertEquals("Should not delete any aux compaction files when maxCommitsToKeep is 5", 8, instants.size());
assertTrue("Requested Compaction must be present for 100", instants.contains(
new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, "100")));
assertTrue("Inflight Compaction must be present for 100", instants.contains(
new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION, "100")));
assertTrue("Requested Compaction must be present for 101", instants.contains(
new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, "101")));
assertTrue("Inflight Compaction must be present for 101", instants.contains(
new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION, "101")));
assertTrue("Requested Compaction must be present for 102", instants.contains(
new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, "102")));
assertTrue("Inflight Compaction must be present for 102", instants.contains(
new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION, "102")));
assertTrue("Requested Compaction must be present for 103", instants.contains(
new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, "103")));
assertTrue("Inflight Compaction must be present for 103", instants.contains(
new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION, "103")));
}
@Test
@@ -281,6 +398,53 @@ public class TestHoodieCommitArchiveLog {
timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "103")));
}
@Test
public void testArchiveCommitCompactionNoHole() throws IOException {
  HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath)
      .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
      .forTable("test-trip-table").withCompactionConfig(
          HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 5).build()).build();
  HoodieTableMetaClient metaClient = new HoodieTableMetaClient(dfs.getConf(), basePath);
  HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
  // 6 commits plus 2 requested compactions => 8 instants on the commits-and-compaction timeline
  HoodieTestDataGenerator.createCommitFile(basePath, "100", dfs.getConf());
  HoodieTestDataGenerator.createCompactionRequestedFile(basePath, "101", dfs.getConf());
  HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath,
      new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, "101"), dfs.getConf());
  HoodieTestDataGenerator.createCommitFile(basePath, "102", dfs.getConf());
  HoodieTestDataGenerator.createCommitFile(basePath, "103", dfs.getConf());
  HoodieTestDataGenerator.createCompactionRequestedFile(basePath, "104", dfs.getConf());
  HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath,
      new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, "104"), dfs.getConf());
  HoodieTestDataGenerator.createCommitFile(basePath, "105", dfs.getConf());
  HoodieTestDataGenerator.createCommitFile(basePath, "106", dfs.getConf());
  HoodieTestDataGenerator.createCommitFile(basePath, "107", dfs.getConf());
  HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsAndCompactionTimeline();
  // Fix: message previously said "Loaded 6 commits" while the assertion expects 8 instants
  assertEquals("Loaded 8 instants and the count should match", 8, timeline.countInstants());
  boolean result = archiveLog.archiveIfRequired(jsc);
  assertTrue(result);
  timeline = metaClient.getActiveTimeline().reload().getCommitsAndCompactionTimeline();
  // Only instants strictly before the oldest pending compaction (101) may be archived
  assertFalse("Instants before oldest pending compaction can be removed",
      timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "100")));
  assertEquals(
      "Since we have a pending compaction at 101, we should never archive any commit "
          + "after 101 (we only archive 100)", 7, timeline.countInstants());
  assertTrue("Requested Compaction must still be present",
      timeline.containsInstant(new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "101")));
  assertTrue("Instants greater than oldest pending compaction must be present",
      timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "102")));
  assertTrue("Instants greater than oldest pending compaction must be present",
      timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "103")));
  assertTrue("Instants greater than oldest pending compaction must be present",
      timeline.containsInstant(new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "104")));
  assertTrue("Instants greater than oldest pending compaction must be present",
      timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "105")));
  assertTrue("Instants greater than oldest pending compaction must be present",
      timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "106")));
  assertTrue("Instants greater than oldest pending compaction must be present",
      timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "107")));
}
private void verifyInflightInstants(HoodieTableMetaClient metaClient, int expectedTotalInstants) {
HoodieTimeline timeline = metaClient.getActiveTimeline().reload()
.getTimelineOfActions(Sets.newHashSet(HoodieTimeline.CLEAN_ACTION)).filterInflights();

View File

@@ -106,7 +106,7 @@ public class CompactionOperation implements Serializable {
op.dataFilePath = Optional.ofNullable(operation.getDataFilePath());
op.deltaFilePaths = new ArrayList<>(operation.getDeltaFilePaths());
op.fileId = operation.getFileId();
op.metrics = new HashMap<>(operation.getMetrics());
op.metrics = operation.getMetrics() == null ? new HashMap<>() : new HashMap<>(operation.getMetrics());
op.partitionPath = operation.getPartitionPath();
return op;
}

View File

@@ -17,6 +17,7 @@
package com.uber.hoodie.common.table.view;
import com.google.common.collect.ImmutableMap;
import com.uber.hoodie.common.model.CompactionOperation;
import com.uber.hoodie.common.model.FileSlice;
import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.model.HoodieFileGroup;
@@ -74,7 +75,7 @@ public class HoodieTableFileSystemView implements TableFileSystemView,
/**
* File Id to pending compaction instant time
*/
private final Map<String, String> fileIdToPendingCompactionInstantTime;
private final Map<String, Pair<String, CompactionOperation>> fileIdToPendingCompaction;
/**
* Create a file system view, as of the given timeline
@@ -89,9 +90,10 @@ public class HoodieTableFileSystemView implements TableFileSystemView,
// Build fileId to Pending Compaction Instants
List<HoodieInstant> pendingCompactionInstants =
metaClient.getActiveTimeline().filterPendingCompactionTimeline().getInstants().collect(Collectors.toList());
this.fileIdToPendingCompactionInstantTime = ImmutableMap.copyOf(
this.fileIdToPendingCompaction = ImmutableMap.copyOf(
CompactionUtils.getAllPendingCompactionOperations(metaClient).entrySet().stream().map(entry -> {
return Pair.of(entry.getKey(), entry.getValue().getKey());
return Pair.of(entry.getKey(), Pair.of(entry.getValue().getKey(),
CompactionOperation.convertFromAvroRecordInstance(entry.getValue().getValue())));
}).collect(Collectors.toMap(Pair::getKey, Pair::getValue)));
}
@@ -155,10 +157,10 @@ public class HoodieTableFileSystemView implements TableFileSystemView,
if (logFiles.containsKey(pair)) {
logFiles.get(pair).forEach(logFile -> group.addLogFile(logFile));
}
if (fileIdToPendingCompactionInstantTime.containsKey(fileId)) {
if (fileIdToPendingCompaction.containsKey(fileId)) {
// If there is no delta-commit after compaction request, this step would ensure a new file-slice appears
// so that any new ingestion uses the correct base-instant
group.addNewFileSliceAtInstant(fileIdToPendingCompactionInstantTime.get(fileId));
group.addNewFileSliceAtInstant(fileIdToPendingCompaction.get(fileId).getKey());
}
fileGroups.add(group);
});
@@ -196,8 +198,9 @@ public class HoodieTableFileSystemView implements TableFileSystemView,
* @param dataFile Data File
*/
private boolean isDataFileDueToPendingCompaction(HoodieDataFile dataFile) {
String compactionInstantTime = fileIdToPendingCompactionInstantTime.get(dataFile.getFileId());
if ((null != compactionInstantTime) && dataFile.getCommitTime().equals(compactionInstantTime)) {
Pair<String, CompactionOperation> compactionWithInstantTime = fileIdToPendingCompaction.get(dataFile.getFileId());
if ((null != compactionWithInstantTime) && (null != compactionWithInstantTime.getLeft())
&& dataFile.getCommitTime().equals(compactionWithInstantTime.getKey())) {
return true;
}
return false;
@@ -277,7 +280,7 @@ public class HoodieTableFileSystemView implements TableFileSystemView,
FileSlice fileSlice = fileGroup.getLatestFileSlice().get();
// if the file-group is under compaction, pick the latest before compaction instant time.
if (isFileSliceAfterPendingCompaction(fileSlice)) {
String compactionInstantTime = fileIdToPendingCompactionInstantTime.get(fileSlice.getFileId());
String compactionInstantTime = fileIdToPendingCompaction.get(fileSlice.getFileId()).getLeft();
return fileGroup.getLatestFileSliceBefore(compactionInstantTime);
}
return Optional.of(fileSlice);
@@ -292,8 +295,9 @@ public class HoodieTableFileSystemView implements TableFileSystemView,
* @return
*/
private boolean isFileSliceAfterPendingCompaction(FileSlice fileSlice) {
String compactionInstantTime = fileIdToPendingCompactionInstantTime.get(fileSlice.getFileId());
if ((null != compactionInstantTime) && fileSlice.getBaseInstantTime().equals(compactionInstantTime)) {
Pair<String, CompactionOperation> compactionWithInstantTime = fileIdToPendingCompaction.get(fileSlice.getFileId());
if ((null != compactionWithInstantTime)
&& fileSlice.getBaseInstantTime().equals(compactionWithInstantTime.getKey())) {
return true;
}
return false;
@@ -352,8 +356,8 @@ public class HoodieTableFileSystemView implements TableFileSystemView,
*/
private FileSlice getMergedFileSlice(HoodieFileGroup fileGroup, FileSlice fileSlice) {
// if the file-group is under construction, pick the latest before compaction instant time.
if (fileIdToPendingCompactionInstantTime.containsKey(fileSlice.getFileId())) {
String compactionInstantTime = fileIdToPendingCompactionInstantTime.get(fileSlice.getFileId());
if (fileIdToPendingCompaction.containsKey(fileSlice.getFileId())) {
String compactionInstantTime = fileIdToPendingCompaction.get(fileSlice.getFileId()).getKey();
if (fileSlice.getBaseInstantTime().equals(compactionInstantTime)) {
Optional<FileSlice> prevFileSlice = fileGroup.getLatestFileSliceBefore(compactionInstantTime);
if (prevFileSlice.isPresent()) {
@@ -416,4 +420,8 @@ public class HoodieTableFileSystemView implements TableFileSystemView,
"Failed to list data files in partition " + partitionPathStr, e);
}
}
/**
 * @return mapping from file-id to the (compaction instant time, compaction operation) pair
 *         of its pending compaction, if any
 */
public Map<String, Pair<String, CompactionOperation>> getFileIdToPendingCompaction() {
  return fileIdToPendingCompaction;
}
}

View File

@@ -26,6 +26,7 @@ import com.esotericsoftware.kryo.serializers.JavaSerializer;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.uber.hoodie.avro.model.HoodieCleanMetadata;
import com.uber.hoodie.avro.model.HoodieCompactionPlan;
import com.uber.hoodie.common.HoodieCleanStat;
import com.uber.hoodie.common.model.HoodieWriteStat.RuntimeStats;
import com.uber.hoodie.common.table.HoodieTableConfig;
@@ -36,7 +37,10 @@ import com.uber.hoodie.common.table.log.HoodieLogFormat.Writer;
import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
import com.uber.hoodie.common.table.log.block.HoodieLogBlock;
import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.table.timeline.HoodieInstant.State;
import com.uber.hoodie.common.util.AvroUtils;
import com.uber.hoodie.common.util.CompactionUtils;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.HoodieAvroUtils;
import java.io.ByteArrayInputStream;
@@ -61,6 +65,7 @@ import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -113,6 +118,14 @@ public class HoodieTestUtils {
}
}
/**
 * Create empty completed delta-commit marker files under the metadata folder,
 * one per supplied instant time.
 */
public static final void createDeltaCommitFiles(String basePath, String... commitTimes) throws IOException {
  for (String commitTime : commitTimes) {
    String deltaFileName = HoodieTimeline.makeDeltaFileName(commitTime);
    File deltaCommitFile =
        new File(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + deltaFileName);
    deltaCommitFile.createNewFile();
  }
}
public static final void createInflightCommitFiles(String basePath, String... commitTimes) throws IOException {
for (String commitTime : commitTimes) {
new File(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline.makeInflightCommitFileName(
@@ -177,6 +190,15 @@ public class HoodieTestUtils {
}
}
/**
 * Schedule a compaction by building a compaction plan from the given (partition, file-slice)
 * pairs and persisting it as a REQUESTED compaction instant on the active timeline.
 *
 * @param metaClient    Meta client for the table under test
 * @param instant       Instant time of the requested compaction
 * @param fileSliceList Partition-path / file-slice pairs to include in the plan
 */
public static final void createCompactionRequest(HoodieTableMetaClient metaClient, String instant,
    List<Pair<String, FileSlice>> fileSliceList) throws IOException {
  HoodieCompactionPlan plan = CompactionUtils.buildFromFileSlices(fileSliceList, Optional.empty(), Optional.empty());
  HoodieInstant compactionInstant =
      new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, instant);
  metaClient.getActiveTimeline().saveToCompactionRequested(compactionInstant,
      AvroUtils.serializeCompactionPlan(plan));
}
public static final String getDataFilePath(String basePath, String partitionPath, String commitTime, String fileID)
throws IOException {
return basePath + "/" + partitionPath + "/" + FSUtils