Implement Savepoints and required metadata timeline (#86)
- Introduce avro to save clean metadata with details about the last commit that was retained - Save rollback metadata in the meta timeline - Create savepoint metadata and add API to createSavepoint, deleteSavepoint and rollbackToSavepoint - Savepointed commit should not be rolledback or cleaned or archived - introduce cli commands to show, create and rollback to savepoints - Write unit tests to test savepoints and rollbackToSavepoints
This commit is contained in:
@@ -17,6 +17,14 @@
|
||||
package com.uber.hoodie;
|
||||
|
||||
import com.codahale.metrics.Timer;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.uber.hoodie.avro.model.HoodieCleanMetadata;
|
||||
import com.uber.hoodie.avro.model.HoodieRollbackMetadata;
|
||||
import com.uber.hoodie.avro.model.HoodieSavepointMetadata;
|
||||
import com.uber.hoodie.common.HoodieCleanStat;
|
||||
import com.uber.hoodie.common.HoodieRollbackStat;
|
||||
import com.uber.hoodie.common.model.HoodieCommitMetadata;
|
||||
import com.uber.hoodie.common.model.HoodieKey;
|
||||
import com.uber.hoodie.common.model.HoodieRecord;
|
||||
@@ -27,12 +35,15 @@ import com.uber.hoodie.common.table.HoodieTableMetaClient;
|
||||
import com.uber.hoodie.common.table.HoodieTimeline;
|
||||
import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
|
||||
import com.uber.hoodie.common.table.timeline.HoodieInstant;
|
||||
import com.uber.hoodie.common.util.AvroUtils;
|
||||
import com.uber.hoodie.common.util.FSUtils;
|
||||
import com.uber.hoodie.config.HoodieWriteConfig;
|
||||
import com.uber.hoodie.exception.HoodieCommitException;
|
||||
import com.uber.hoodie.exception.HoodieException;
|
||||
import com.uber.hoodie.exception.HoodieIOException;
|
||||
import com.uber.hoodie.exception.HoodieInsertException;
|
||||
import com.uber.hoodie.exception.HoodieRollbackException;
|
||||
import com.uber.hoodie.exception.HoodieSavepointException;
|
||||
import com.uber.hoodie.exception.HoodieUpsertException;
|
||||
import com.uber.hoodie.func.BulkInsertMapFunction;
|
||||
import com.uber.hoodie.index.HoodieIndex;
|
||||
@@ -62,14 +73,19 @@ import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.text.ParseException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.apache.spark.util.AccumulatorV2;
|
||||
import org.apache.spark.util.LongAccumulator;
|
||||
import scala.Option;
|
||||
import scala.Tuple2;
|
||||
|
||||
@@ -359,7 +375,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
|
||||
// We cannot have unbounded commit files. Archive commits if we have to archive
|
||||
archiveLog.archiveIfRequired();
|
||||
// Call clean to cleanup if there is anything to cleanup after the commit,
|
||||
clean();
|
||||
clean(commitTime);
|
||||
if (writeContext != null) {
|
||||
long durationInMs = metrics.getDurationInMs(writeContext.stop());
|
||||
metrics.updateCommitMetrics(
|
||||
@@ -379,6 +395,143 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Savepoint the latest commit. The data files and commit files for that commit will never be rolledback,
|
||||
* cleaned or archived. This gives an option to rollback the state to the savepoint anytime.
|
||||
* Savepoint needs to be manually created and deleted.
|
||||
*
|
||||
* Savepoint should be on a commit that is not cleaned.
|
||||
*
|
||||
* @param savePointMetadata - metadata about the savepoint
|
||||
* @return true if the savepoint was created successfully
|
||||
*/
|
||||
public boolean savepoint(HoodieSavepointMetadata savePointMetadata) {
|
||||
HoodieTable<T> table = HoodieTable
|
||||
.getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config);
|
||||
if (table.getCompletedCommitTimeline().empty()) {
|
||||
throw new HoodieSavepointException("Could not savepoint. Commit timeline is empty");
|
||||
}
|
||||
|
||||
String latestCommit = table.getCompletedCommitTimeline().lastInstant().get().getTimestamp();
|
||||
logger.info("Savepointing latest commit " + latestCommit);
|
||||
return savepoint(latestCommit, savePointMetadata);
|
||||
}
|
||||
|
||||
/**
|
||||
* Savepoint a specific commit. The data files and commit files for that commit will never be rolledback,
|
||||
* cleaned or archived. This gives an option to rollback the state to the savepoint anytime.
|
||||
* Savepoint needs to be manually created and deleted.
|
||||
*
|
||||
* Savepoint should be on a commit that is not cleaned.
|
||||
*
|
||||
* @param savePointMetadata - metadata about the savepoint
|
||||
* @return true if the savepoint was created successfully
|
||||
*/
|
||||
public boolean savepoint(String commitTime, HoodieSavepointMetadata savePointMetadata) {
|
||||
HoodieTable<T> table = HoodieTable
|
||||
.getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config);
|
||||
Optional<HoodieInstant> cleanInstant = table.getCompletedCleanTimeline().lastInstant();
|
||||
|
||||
HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commitTime);
|
||||
if(!table.getCompletedCommitTimeline().containsInstant(commitInstant)) {
|
||||
throw new HoodieSavepointException("Could not savepoint non-existing commit " + commitInstant);
|
||||
}
|
||||
|
||||
try {
|
||||
// Check the last commit that was not cleaned and check if savepoint time is > that commit
|
||||
String lastCommitRetained;
|
||||
if (cleanInstant.isPresent()) {
|
||||
HoodieCleanMetadata cleanMetadata = AvroUtils.deserializeHoodieCleanMetadata(
|
||||
table.getActiveTimeline().getInstantDetails(cleanInstant.get()).get());
|
||||
lastCommitRetained = cleanMetadata.getEarliestCommitToRetain();
|
||||
} else {
|
||||
lastCommitRetained =
|
||||
table.getCompletedCommitTimeline().firstInstant().get().getTimestamp();
|
||||
}
|
||||
|
||||
// Cannot allow savepoint time on a commit that could have been cleaned
|
||||
Preconditions.checkArgument(table.getActiveTimeline()
|
||||
.compareTimestamps(commitTime, lastCommitRetained, HoodieTimeline.GREATER_OR_EQUAL),
|
||||
"Could not savepoint commit " + commitTime + " as this is beyond the lookup window "
|
||||
+ lastCommitRetained);
|
||||
|
||||
// Nothing to save in the savepoint
|
||||
table.getActiveTimeline().saveAsComplete(
|
||||
new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, commitTime),
|
||||
AvroUtils.serializeSavepointMetadata(savePointMetadata));
|
||||
logger.info("Savepoint " + commitTime + " created");
|
||||
return true;
|
||||
} catch (IOException e) {
|
||||
throw new HoodieSavepointException("Failed to savepoint " + commitTime, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete a savepoint that was created. Once the savepoint is deleted, the commit can be rolledback
|
||||
* and cleaner may clean up data files.
|
||||
*
|
||||
* @param savepointTime - delete the savepoint
|
||||
* @return true if the savepoint was deleted successfully
|
||||
*/
|
||||
public void deleteSavepoint(String savepointTime) {
|
||||
HoodieTable<T> table = HoodieTable
|
||||
.getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config);
|
||||
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
|
||||
|
||||
HoodieInstant savePoint =
|
||||
new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, savepointTime);
|
||||
boolean isSavepointPresent =
|
||||
table.getCompletedSavepointTimeline().containsInstant(savePoint);
|
||||
if (!isSavepointPresent) {
|
||||
logger.warn("No savepoint present " + savepointTime);
|
||||
return;
|
||||
}
|
||||
|
||||
activeTimeline.revertToInflight(savePoint);
|
||||
activeTimeline.deleteInflight(
|
||||
new HoodieInstant(true, HoodieTimeline.SAVEPOINT_ACTION, savepointTime));
|
||||
logger.info("Savepoint " + savepointTime + " deleted");
|
||||
}
|
||||
|
||||
/**
|
||||
* Rollback the state to the savepoint.
|
||||
* WARNING: This rollsback recent commits and deleted data files. Queries accessing the files
|
||||
* will mostly fail. This should be done during a downtime.
|
||||
*
|
||||
* @param savepointTime - savepoint time to rollback to
|
||||
* @return true if the savepoint was rollecback to successfully
|
||||
*/
|
||||
public boolean rollbackToSavepoint(String savepointTime) {
|
||||
HoodieTable<T> table = HoodieTable
|
||||
.getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config);
|
||||
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
|
||||
HoodieTimeline commitTimeline = table.getCompletedCommitTimeline();
|
||||
|
||||
HoodieInstant savePoint =
|
||||
new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, savepointTime);
|
||||
boolean isSavepointPresent =
|
||||
table.getCompletedSavepointTimeline().containsInstant(savePoint);
|
||||
if (!isSavepointPresent) {
|
||||
throw new HoodieRollbackException("No savepoint for commitTime " + savepointTime);
|
||||
}
|
||||
|
||||
List<String> commitsToRollback =
|
||||
commitTimeline.findInstantsAfter(savepointTime, Integer.MAX_VALUE).getInstants()
|
||||
.map(HoodieInstant::getTimestamp).collect(Collectors.toList());
|
||||
logger.info("Rolling back commits " + commitsToRollback);
|
||||
|
||||
rollback(commitsToRollback);
|
||||
|
||||
// Make sure the rollback was successful
|
||||
Optional<HoodieInstant> lastInstant =
|
||||
activeTimeline.reload().getCommitTimeline().filterCompletedInstants().lastInstant();
|
||||
Preconditions.checkArgument(lastInstant.isPresent());
|
||||
Preconditions.checkArgument(lastInstant.get().getTimestamp().equals(savepointTime),
|
||||
savepointTime + "is not the last commit after rolling back " + commitsToRollback
|
||||
+ ", last commit was " + lastInstant.get().getTimestamp());
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Rollback the (inflight/committed) record changes with the given commit time.
|
||||
* Three steps:
|
||||
@@ -388,81 +541,135 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
|
||||
* (4) Finally delete .commit or .inflight file,
|
||||
*/
|
||||
public boolean rollback(final String commitTime) throws HoodieRollbackException {
|
||||
rollback(Lists.newArrayList(commitTime));
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
private void rollback(List<String> commits) {
|
||||
if(commits.isEmpty()) {
|
||||
logger.info("List of commits to rollback is empty");
|
||||
return;
|
||||
}
|
||||
|
||||
final Timer.Context context = metrics.getRollbackCtx();
|
||||
String startRollbackTime = HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date());
|
||||
|
||||
// Create a Hoodie table which encapsulated the commits and files visible
|
||||
HoodieTable<T> table = HoodieTable
|
||||
.getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config);
|
||||
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
|
||||
HoodieTimeline inflightTimeline = activeTimeline.getCommitTimeline().filterInflights();
|
||||
HoodieTimeline commitTimeline = activeTimeline.getCommitTimeline().filterCompletedInstants();
|
||||
HoodieTimeline inflightTimeline = table.getInflightCommitTimeline();
|
||||
HoodieTimeline commitTimeline = table.getCompletedCommitTimeline();
|
||||
|
||||
// Check if any of the commits is a savepoint - do not allow rollback on those commits
|
||||
List<String> savepoints =
|
||||
table.getCompletedSavepointTimeline().getInstants().map(HoodieInstant::getTimestamp)
|
||||
.collect(Collectors.toList());
|
||||
commits.forEach(s -> {
|
||||
if (savepoints.contains(s)) {
|
||||
throw new HoodieRollbackException(
|
||||
"Could not rollback a savepointed commit. Delete savepoint first before rolling back"
|
||||
+ s);
|
||||
}
|
||||
});
|
||||
|
||||
try {
|
||||
if (commitTimeline.lastInstant().isPresent()
|
||||
&& !commitTimeline.findInstantsAfter(commitTime, Integer.MAX_VALUE).empty()) {
|
||||
throw new HoodieRollbackException("Found commits after time :" + commitTime +
|
||||
if (commitTimeline.empty() && inflightTimeline.empty()) {
|
||||
// nothing to rollback
|
||||
logger.info("No commits to rollback " + commits);
|
||||
}
|
||||
|
||||
// Make sure only the last n commits are being rolled back
|
||||
// If there is a commit in-between or after that is not rolled back, then abort
|
||||
String lastCommit = commits.get(commits.size() - 1);
|
||||
if (!commitTimeline.empty() && !commitTimeline
|
||||
.findInstantsAfter(lastCommit, Integer.MAX_VALUE).empty()) {
|
||||
throw new HoodieRollbackException("Found commits after time :" + lastCommit +
|
||||
", please rollback greater commits first");
|
||||
}
|
||||
|
||||
List<String> inflights = inflightTimeline.getInstants().map(HoodieInstant::getTimestamp)
|
||||
.collect(Collectors.toList());
|
||||
if (!inflights.isEmpty() && inflights.indexOf(commitTime) != inflights.size() - 1) {
|
||||
throw new HoodieRollbackException(
|
||||
"Found in-flight commits after time :" + commitTime +
|
||||
", please rollback greater commits first");
|
||||
if (!inflights.isEmpty() && inflights.indexOf(lastCommit) != inflights.size() - 1) {
|
||||
throw new HoodieRollbackException(
|
||||
"Found in-flight commits after time :" + lastCommit +
|
||||
", please rollback greater commits first");
|
||||
}
|
||||
|
||||
if (inflights.contains(commitTime) || (commitTimeline.lastInstant().isPresent()
|
||||
&& commitTimeline.lastInstant().get().getTimestamp().equals(commitTime))) {
|
||||
// 1. Atomically unpublish this commit
|
||||
if(!inflights.contains(commitTime)) {
|
||||
// This is completed commit, first revert it to inflight to unpublish data
|
||||
activeTimeline.revertToInflight(
|
||||
new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commitTime));
|
||||
}
|
||||
// 2. Revert the index changes
|
||||
logger.info("Clean out index changes at time: " + commitTime);
|
||||
if (!index.rollbackCommit(commitTime)) {
|
||||
throw new HoodieRollbackException(
|
||||
"Clean out index changes failed, for time :" + commitTime);
|
||||
}
|
||||
// Atomically unpublish all the commits
|
||||
commits.stream().filter(s -> !inflights.contains(s))
|
||||
.map(s -> new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, s))
|
||||
.forEach(activeTimeline::revertToInflight);
|
||||
logger.info("Unpublished " + commits);
|
||||
|
||||
// 3. Delete the new generated parquet files
|
||||
logger.info("Clean out all parquet files generated at time: " + commitTime);
|
||||
final Accumulator<Integer> numFilesDeletedAccu = jsc.accumulator(0);
|
||||
jsc.parallelize(
|
||||
FSUtils.getAllPartitionPaths(fs, table.getMetaClient().getBasePath()))
|
||||
.foreach((VoidFunction<String>) partitionPath -> {
|
||||
// Scan all partitions files with this commit time
|
||||
FileSystem fs1 = FSUtils.getFs();
|
||||
FileStatus[] toBeDeleted =
|
||||
fs1.listStatus(new Path(config.getBasePath(), partitionPath),
|
||||
path -> {
|
||||
return commitTime
|
||||
.equals(FSUtils.getCommitTime(path.getName()));
|
||||
});
|
||||
for (FileStatus file : toBeDeleted) {
|
||||
boolean success = fs1.delete(file.getPath(), false);
|
||||
logger.info("Delete file " + file.getPath() + "\t" + success);
|
||||
if (success) {
|
||||
numFilesDeletedAccu.add(1);
|
||||
// cleanup index entries
|
||||
commits.stream().forEach(s -> {
|
||||
if (!index.rollbackCommit(s)) {
|
||||
throw new HoodieRollbackException(
|
||||
"Clean out index changes failed, for time :" + s);
|
||||
}
|
||||
});
|
||||
logger.info("Index rolled back for commits " + commits);
|
||||
|
||||
// delete all the data files for all these commits
|
||||
logger.info("Clean out all parquet files generated for commits: " + commits);
|
||||
final LongAccumulator numFilesDeletedCounter = jsc.sc().longAccumulator();
|
||||
List<HoodieRollbackStat> stats = jsc.parallelize(
|
||||
FSUtils.getAllPartitionPaths(fs, table.getMetaClient().getBasePath()))
|
||||
.map((Function<String, HoodieRollbackStat>) partitionPath -> {
|
||||
// Scan all partitions files with this commit time
|
||||
logger.info("Cleaning path " + partitionPath);
|
||||
FileSystem fs1 = FSUtils.getFs();
|
||||
FileStatus[] toBeDeleted =
|
||||
fs1.listStatus(new Path(config.getBasePath(), partitionPath), path -> {
|
||||
if(!path.toString().contains(".parquet")) {
|
||||
return false;
|
||||
}
|
||||
String fileCommitTime = FSUtils.getCommitTime(path.getName());
|
||||
return commits.contains(fileCommitTime);
|
||||
});
|
||||
Map<FileStatus, Boolean> results = Maps.newHashMap();
|
||||
for (FileStatus file : toBeDeleted) {
|
||||
boolean success = fs1.delete(file.getPath(), false);
|
||||
results.put(file, success);
|
||||
logger.info("Delete file " + file.getPath() + "\t" + success);
|
||||
if (success) {
|
||||
numFilesDeletedCounter.add(1);
|
||||
}
|
||||
});
|
||||
// 4. Remove commit
|
||||
logger.info("Clean out metadata files at time: " + commitTime);
|
||||
activeTimeline.deleteInflight(
|
||||
new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, commitTime));
|
||||
}
|
||||
return HoodieRollbackStat.newBuilder().withPartitionPath(partitionPath)
|
||||
.withDeletedFileResults(results).build();
|
||||
}).collect();
|
||||
|
||||
if (context != null) {
|
||||
long durationInMs = metrics.getDurationInMs(context.stop());
|
||||
int numFilesDeleted = numFilesDeletedAccu.value();
|
||||
metrics.updateRollbackMetrics(durationInMs, numFilesDeleted);
|
||||
}
|
||||
// Remove the rolled back inflight commits
|
||||
commits.stream().map(s -> new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, s))
|
||||
.forEach(activeTimeline::deleteInflight);
|
||||
logger.info("Deleted inflight commits " + commits);
|
||||
|
||||
Optional<Long> durationInMs = Optional.empty();
|
||||
if (context != null) {
|
||||
durationInMs = Optional.of(metrics.getDurationInMs(context.stop()));
|
||||
Long numFilesDeleted = numFilesDeletedCounter.value();
|
||||
metrics.updateRollbackMetrics(durationInMs.get(), numFilesDeleted);
|
||||
}
|
||||
HoodieRollbackMetadata rollbackMetadata =
|
||||
AvroUtils.convertRollbackMetadata(startRollbackTime, durationInMs, commits, stats);
|
||||
table.getActiveTimeline().saveAsComplete(
|
||||
new HoodieInstant(false, HoodieTimeline.ROLLBACK_ACTION, startRollbackTime),
|
||||
AvroUtils.serializeRollbackMetadata(rollbackMetadata));
|
||||
logger.info("Commits " + commits + " rollback is complete");
|
||||
|
||||
if (!table.getActiveTimeline().getCleanerTimeline().empty()) {
|
||||
logger.info("Cleaning up older rollback meta files");
|
||||
// Cleanup of older cleaner meta files
|
||||
// TODO - make the commit archival generic and archive rollback metadata
|
||||
FSUtils.deleteOlderRollbackMetaFiles(fs, table.getMetaClient().getMetaPath(),
|
||||
table.getActiveTimeline().getRollbackTimeline().getInstants());
|
||||
}
|
||||
return true;
|
||||
} catch (IOException e) {
|
||||
throw new HoodieRollbackException("Failed to rollback " +
|
||||
config.getBasePath() + " at commit time" + commitTime, e);
|
||||
config.getBasePath() + " commits " + commits, e);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -476,37 +683,58 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
|
||||
/**
|
||||
* Clean up any stale/old files/data lying around (either on file storage or index storage)
|
||||
*/
|
||||
private void clean() throws HoodieIOException {
|
||||
private void clean(String startCleanTime) throws HoodieIOException {
|
||||
try {
|
||||
logger.info("Cleaner started");
|
||||
final Timer.Context context = metrics.getCleanCtx();
|
||||
|
||||
// Create a Hoodie table which encapsulated the commits and files visible
|
||||
HoodieTable<T> table = HoodieTable
|
||||
.getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config);
|
||||
|
||||
List<String> partitionsToClean = FSUtils.getAllPartitionPaths(fs, table.getMetaClient().getBasePath());
|
||||
List<String> partitionsToClean =
|
||||
FSUtils.getAllPartitionPaths(fs, table.getMetaClient().getBasePath());
|
||||
// shuffle to distribute cleaning work across partitions evenly
|
||||
Collections.shuffle(partitionsToClean);
|
||||
logger.info("Partitions to clean up : " + partitionsToClean + ", with policy " + config.getCleanerPolicy());
|
||||
if(partitionsToClean.isEmpty()) {
|
||||
logger.info("Partitions to clean up : " + partitionsToClean + ", with policy " + config
|
||||
.getCleanerPolicy());
|
||||
if (partitionsToClean.isEmpty()) {
|
||||
logger.info("Nothing to clean here mom. It is already clean");
|
||||
return;
|
||||
}
|
||||
|
||||
int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism());
|
||||
int numFilesDeleted = jsc.parallelize(partitionsToClean, cleanerParallelism)
|
||||
.map((Function<String, Integer>) partitionPathToClean -> {
|
||||
List<HoodieCleanStat> cleanStats = jsc.parallelize(partitionsToClean, cleanerParallelism)
|
||||
.map((Function<String, HoodieCleanStat>) partitionPathToClean -> {
|
||||
HoodieCleaner cleaner = new HoodieCleaner(table, config);
|
||||
return cleaner.clean(partitionPathToClean);
|
||||
})
|
||||
.reduce((Function2<Integer, Integer, Integer>) (v1, v2) -> v1 + v2);
|
||||
.collect();
|
||||
|
||||
logger.info("Cleaned " + numFilesDeleted + " files");
|
||||
// Emit metrics (duration, numFilesDeleted) if needed
|
||||
Optional<Long> durationInMs = Optional.empty();
|
||||
if (context != null) {
|
||||
long durationInMs = metrics.getDurationInMs(context.stop());
|
||||
logger.info("cleanerElaspsedTime (Minutes): " + durationInMs / (1000 * 60));
|
||||
metrics.updateCleanMetrics(durationInMs, numFilesDeleted);
|
||||
durationInMs = Optional.of(metrics.getDurationInMs(context.stop()));
|
||||
logger.info("cleanerElaspsedTime (Minutes): " + durationInMs.get() / (1000 * 60));
|
||||
}
|
||||
|
||||
// Create the metadata and save it
|
||||
HoodieCleanMetadata metadata =
|
||||
AvroUtils.convertCleanMetadata(startCleanTime, durationInMs, cleanStats);
|
||||
logger.info("Cleaned " + metadata.getTotalFilesDeleted() + " files");
|
||||
metrics.updateCleanMetrics(durationInMs.orElseGet(() -> -1L),
|
||||
metadata.getTotalFilesDeleted());
|
||||
|
||||
table.getActiveTimeline().saveAsComplete(
|
||||
new HoodieInstant(false, HoodieTimeline.CLEAN_ACTION, startCleanTime),
|
||||
AvroUtils.serializeCleanMetadata(metadata));
|
||||
logger.info("Marked clean started on " + startCleanTime + " as complete");
|
||||
|
||||
if (!table.getActiveTimeline().getCleanerTimeline().empty()) {
|
||||
// Cleanup of older cleaner meta files
|
||||
// TODO - make the commit archival generic and archive clean metadata
|
||||
FSUtils.deleteOlderCleanMetaFiles(fs, table.getMetaClient().getMetaPath(),
|
||||
table.getActiveTimeline().getCleanerTimeline().getInstants());
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new HoodieIOException("Failed to clean up after commit", e);
|
||||
|
||||
Reference in New Issue
Block a user