1
0

[HUDI-169] Speed up rolling back of instants (#968)

This commit is contained in:
Balaji Varadarajan
2019-10-24 19:34:00 -07:00
committed by vinoth chandar
parent d8be818ac9
commit c23da694cc
5 changed files with 507 additions and 248 deletions

View File

@@ -51,6 +51,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
private static final String INSERT_PARALLELISM = "hoodie.insert.shuffle.parallelism";
private static final String BULKINSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism";
private static final String UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism";
private static final String DEFAULT_ROLLBACK_PARALLELISM = "100";
private static final String ROLLBACK_PARALLELISM = "hoodie.rollback.parallelism";
private static final String WRITE_BUFFER_LIMIT_BYTES = "hoodie.write.buffer.limit.bytes";
private static final String DEFAULT_WRITE_BUFFER_LIMIT_BYTES = String.valueOf(4 * 1024 * 1024);
private static final String COMBINE_BEFORE_INSERT_PROP = "hoodie.combine.before.insert";
@@ -141,6 +143,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return Integer.parseInt(props.getProperty(UPSERT_PARALLELISM));
}
/**
 * Parallelism to use when rolling back an instant (number of Spark partitions used to
 * scan/delete files across table partitions).
 *
 * @return configured {@code hoodie.rollback.parallelism}, or the declared default (100) when unset
 */
public int getRollbackParallelism() {
  // Fall back to DEFAULT_ROLLBACK_PARALLELISM so callers that build a config without going through
  // Builder#build() (which seeds defaults) don't hit a NumberFormatException on a null property.
  return Integer.parseInt(props.getProperty(ROLLBACK_PARALLELISM, DEFAULT_ROLLBACK_PARALLELISM));
}
/**
 * Size limit (in bytes) of the in-memory write buffer; defaults to 4MB when unset.
 */
public int getWriteBufferLimitBytes() {
  String limit = props.getProperty(WRITE_BUFFER_LIMIT_BYTES, DEFAULT_WRITE_BUFFER_LIMIT_BYTES);
  return Integer.parseInt(limit);
}
@@ -562,6 +569,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return this;
}
/**
 * Sets the parallelism used when rolling back instants.
 *
 * @param rollbackParallelism number of Spark partitions to use for rollback work
 * @return this builder, for chaining
 */
public Builder withRollbackParallelism(int rollbackParallelism) {
  // All write-config values are stored as strings in the backing Properties map.
  props.setProperty(ROLLBACK_PARALLELISM, Integer.toString(rollbackParallelism));
  return this;
}
public Builder withWriteBufferLimitBytes(int writeBufferLimit) {
props.setProperty(WRITE_BUFFER_LIMIT_BYTES, String.valueOf(writeBufferLimit));
return this;
@@ -651,6 +663,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
setDefaultOnCondition(props, !props.containsKey(BULKINSERT_PARALLELISM), BULKINSERT_PARALLELISM,
DEFAULT_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(UPSERT_PARALLELISM), UPSERT_PARALLELISM, DEFAULT_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(ROLLBACK_PARALLELISM), ROLLBACK_PARALLELISM, DEFAULT_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(COMBINE_BEFORE_INSERT_PROP), COMBINE_BEFORE_INSERT_PROP,
DEFAULT_COMBINE_BEFORE_INSERT);
setDefaultOnCondition(props, !props.containsKey(COMBINE_BEFORE_UPSERT_PROP), COMBINE_BEFORE_UPSERT_PROP,

View File

@@ -32,10 +32,8 @@ import java.util.Set;
import java.util.stream.Collectors;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hudi.WriteStatus;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.HoodieCleanStat;
@@ -74,7 +72,6 @@ import org.apache.parquet.hadoop.ParquetReader;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import scala.Tuple2;
@@ -294,45 +291,6 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
}
}
/**
 * Deletes every file under {@code partitionPath} accepted by {@code filter}, recording each
 * delete outcome into {@code results}. Used while rolling back a set of commits.
 *
 * @param results map from file status to delete success flag (mutated in place)
 * @param partitionPath relative partition path under the table base path
 * @param filter selects which files in the partition are to be removed
 * @return the same {@code results} map, for chaining
 * @throws IOException if listing or deleting files fails
 */
protected Map<FileStatus, Boolean> deleteCleanedFiles(Map<FileStatus, Boolean> results, String partitionPath,
    PathFilter filter) throws IOException {
  logger.info("Cleaning path " + partitionPath);
  FileSystem fileSystem = getMetaClient().getFs();
  Path fullPartitionPath = FSUtils.getPartitionPath(config.getBasePath(), partitionPath);
  for (FileStatus candidate : fileSystem.listStatus(fullPartitionPath, filter)) {
    // Non-recursive delete: the filter only matches data files, never directories.
    boolean deleted = fileSystem.delete(candidate.getPath(), false);
    results.put(candidate, deleted);
    logger.info("Delete file " + candidate.getPath() + "\t" + deleted);
  }
  return results;
}
/**
 * Deletes all parquet files under {@code partitionPath} that were written by the given commit,
 * recording each delete outcome into {@code results}. Used while rolling back a set of commits.
 *
 * @param results map from file status to delete success flag (mutated in place)
 * @param commit commit time whose parquet files should be removed
 * @param partitionPath relative partition path under the table base path
 * @return the same {@code results} map, for chaining
 * @throws IOException if listing or deleting files fails
 */
protected Map<FileStatus, Boolean> deleteCleanedFiles(Map<FileStatus, Boolean> results, String commit,
String partitionPath) throws IOException {
logger.info("Cleaning path " + partitionPath);
FileSystem fs = getMetaClient().getFs();
// Match only parquet files whose encoded commit time equals the commit being rolled back.
// NOTE(review): contains(".parquet") is a substring check rather than an extension check —
// presumably fine for hoodie file naming, but verify against FSUtils naming conventions.
PathFilter filter = (path) -> {
if (path.toString().contains(".parquet")) {
String fileCommitTime = FSUtils.getCommitTime(path.getName());
return commit.equals(fileCommitTime);
}
return false;
};
FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
for (FileStatus file : toBeDeleted) {
// Non-recursive delete: these are data files, not directories.
boolean success = fs.delete(file.getPath(), false);
results.put(file, success);
logger.info("Delete file " + file.getPath() + "\t" + success);
}
return results;
}
@Override
public List<HoodieRollbackStat> rollback(JavaSparkContext jsc, String commit, boolean deleteInstants)
throws IOException {
@@ -342,30 +300,38 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
this.getInflightCommitTimeline().getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
// Atomically unpublish the commits
if (!inflights.contains(commit)) {
logger.info("Unpublishing " + commit);
activeTimeline.revertToInflight(new HoodieInstant(false, actionType, commit));
}
logger.info("Unpublished " + commit);
HoodieInstant instantToRollback = new HoodieInstant(false, actionType, commit);
Long startTime = System.currentTimeMillis();
// delete all the data files for this commit
logger.info("Clean out all parquet files generated for commit: " + commit);
List<RollbackRequest> rollbackRequests = generateRollbackRequests(instantToRollback);
//TODO: We need to persist this as rollback workload and use it in case of partial failures
List<HoodieRollbackStat> stats =
jsc.parallelize(FSUtils.getAllPartitionPaths(metaClient.getFs(), getMetaClient().getBasePath(),
config.shouldAssumeDatePartitioning())).map((Function<String, HoodieRollbackStat>) partitionPath -> {
// Scan all partitions files with this commit time
final Map<FileStatus, Boolean> filesToDeletedStatus = new HashMap<>();
deleteCleanedFiles(filesToDeletedStatus, commit, partitionPath);
return HoodieRollbackStat.newBuilder().withPartitionPath(partitionPath)
.withDeletedFileResults(filesToDeletedStatus).build();
}).collect();
new RollbackExecutor(metaClient, config).performRollback(jsc, instantToRollback, rollbackRequests);
// Delete Inflight instant if enabled
deleteInflightInstant(deleteInstants, activeTimeline, new HoodieInstant(true, actionType, commit));
logger.info("Time(in ms) taken to finish rollback " + (System.currentTimeMillis() - startTime));
return stats;
}
/**
 * Builds one delete-data-and-log-files rollback request per table partition for the given instant.
 *
 * @param instantToRollback instant being rolled back
 * @return one rollback request per partition path
 * @throws IOException if partition paths cannot be listed
 */
private List<RollbackRequest> generateRollbackRequests(HoodieInstant instantToRollback)
    throws IOException {
  List<String> partitionPaths = FSUtils.getAllPartitionPaths(this.metaClient.getFs(),
      this.getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning());
  return partitionPaths.stream()
      .map(partitionPath ->
          RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath, instantToRollback))
      .collect(Collectors.toList());
}
/**
* Delete Inflight instant if enabled
*
*
* @param deleteInstant Enable Deletion of Inflight instant
* @param activeTimeline Hoodie active timeline
* @param instantToBeDeleted Instant to be deleted

View File

@@ -19,22 +19,17 @@
package org.apache.hudi.table;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hudi.WriteStatus;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.HoodieRollbackStat;
@@ -47,11 +42,6 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.SyncableFileSystemView;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
import org.apache.hudi.common.table.log.block.HoodieCommandBlock.HoodieCommandBlockTypeEnum;
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.FSUtils;
@@ -59,10 +49,8 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCompactionException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.func.MergeOnReadLazyInsertIterable;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.io.HoodieAppendHandle;
import org.apache.hudi.io.compact.HoodieRealtimeTableCompactor;
import org.apache.log4j.LogManager;
@@ -70,7 +58,6 @@ import org.apache.log4j.Logger;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
/**
* Implementation of a more real-time read-optimized Hoodie Table where
@@ -180,7 +167,6 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
@Override
public List<HoodieRollbackStat> rollback(JavaSparkContext jsc, String commit, boolean deleteInstants)
throws IOException {
// At the moment, MOR table type does not support bulk nested rollbacks. Nested rollbacks is an experimental
// feature that is expensive. To perform nested rollbacks, initiate multiple requests of client.rollback
// (commitToRollback).
@@ -198,140 +184,121 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
}
logger.info("Unpublished " + commit);
Long startTime = System.currentTimeMillis();
List<RollbackRequest> rollbackRequests = generateRollbackRequests(jsc, instantToRollback);
//TODO: We need to persist this as rollback workload and use it in case of partial failures
List<HoodieRollbackStat> allRollbackStats =
jsc.parallelize(FSUtils.getAllPartitionPaths(this.metaClient.getFs(), this.getMetaClient().getBasePath(),
config.shouldAssumeDatePartitioning())).map((Function<String, HoodieRollbackStat>) partitionPath -> {
HoodieActiveTimeline activeTimeline = this.getActiveTimeline().reload();
HoodieRollbackStat hoodieRollbackStats = null;
// Need to put the path filter here since Filter is not serializable
// PathFilter to get all parquet files and log files that need to be deleted
PathFilter filter = (path) -> {
if (path.toString().contains(".parquet")) {
String fileCommitTime = FSUtils.getCommitTime(path.getName());
return commit.equals(fileCommitTime);
} else if (path.toString().contains(".log")) {
// Since the baseCommitTime is the only commit for new log files, it's okay here
String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path);
return commit.equals(fileCommitTime);
}
return false;
};
final Map<FileStatus, Boolean> filesToDeletedStatus = new HashMap<>();
switch (instantToRollback.getAction()) {
case HoodieTimeline.COMMIT_ACTION:
try {
// Rollback of a commit should delete the newly created parquet files along with any log
// files created with this as baseCommit. This is required to support multi-rollbacks in a MOR
// table.
super.deleteCleanedFiles(filesToDeletedStatus, partitionPath, filter);
hoodieRollbackStats = HoodieRollbackStat.newBuilder().withPartitionPath(partitionPath)
.withDeletedFileResults(filesToDeletedStatus).build();
break;
} catch (IOException io) {
throw new UncheckedIOException("Failed to rollback for commit " + commit, io);
}
case HoodieTimeline.COMPACTION_ACTION:
try {
// If there is no delta commit present after the current commit (if compaction), no action, else we
// need to make sure that a compaction commit rollback also deletes any log files written as part of
// the
// succeeding deltacommit.
boolean higherDeltaCommits = !activeTimeline.getDeltaCommitTimeline().filterCompletedInstants()
.findInstantsAfter(commit, 1).empty();
if (higherDeltaCommits) {
// Rollback of a compaction action with no higher deltacommit means that the compaction is
// scheduled
// and has not yet finished. In this scenario we should delete only the newly created parquet
// files
// and not corresponding base commit log files created with this as baseCommit since updates would
// have been written to the log files.
super.deleteCleanedFiles(filesToDeletedStatus, commit, partitionPath);
hoodieRollbackStats = HoodieRollbackStat.newBuilder().withPartitionPath(partitionPath)
.withDeletedFileResults(filesToDeletedStatus).build();
} else {
// No deltacommits present after this compaction commit (inflight or requested). In this case, we
// can also delete any log files that were created with this compaction commit as base
// commit.
super.deleteCleanedFiles(filesToDeletedStatus, partitionPath, filter);
hoodieRollbackStats = HoodieRollbackStat.newBuilder().withPartitionPath(partitionPath)
.withDeletedFileResults(filesToDeletedStatus).build();
}
break;
} catch (IOException io) {
throw new UncheckedIOException("Failed to rollback for commit " + commit, io);
}
case HoodieTimeline.DELTA_COMMIT_ACTION:
// --------------------------------------------------------------------------------------------------
// (A) The following cases are possible if index.canIndexLogFiles and/or index.isGlobal
// --------------------------------------------------------------------------------------------------
// (A.1) Failed first commit - Inserts were written to log files and HoodieWriteStat has no entries.
// In
// this scenario we would want to delete these log files.
// (A.2) Failed recurring commit - Inserts/Updates written to log files. In this scenario,
// HoodieWriteStat will have the baseCommitTime for the first log file written, add rollback blocks.
// (A.3) Rollback triggered for first commit - Inserts were written to the log files but the commit is
// being reverted. In this scenario, HoodieWriteStat will be `null` for the attribute prevCommitTime
// and
// and hence will end up deleting these log files. This is done so there are no orphan log files
// lying around.
// (A.4) Rollback triggered for recurring commits - Inserts/Updates are being rolled back, the actions
// taken in this scenario is a combination of (A.2) and (A.3)
// ---------------------------------------------------------------------------------------------------
// (B) The following cases are possible if !index.canIndexLogFiles and/or !index.isGlobal
// ---------------------------------------------------------------------------------------------------
// (B.1) Failed first commit - Inserts were written to parquet files and HoodieWriteStat has no
// entries.
// In this scenario, we delete all the parquet files written for the failed commit.
// (B.2) Failed recurring commits - Inserts were written to parquet files and updates to log files. In
// this scenario, perform (A.1) and for updates written to log files, write rollback blocks.
// (B.3) Rollback triggered for first commit - Same as (B.1)
// (B.4) Rollback triggered for recurring commits - Same as (B.2) plus we need to delete the log files
// as well if the base parquet file gets deleted.
try {
HoodieCommitMetadata commitMetadata =
HoodieCommitMetadata.fromBytes(
metaClient.getCommitTimeline().getInstantDetails(new HoodieInstant(true,
instantToRollback.getAction(), instantToRollback.getTimestamp())).get(),
HoodieCommitMetadata.class);
// read commit file and (either append delete blocks or delete file)
Map<FileStatus, Long> filesToNumBlocksRollback = new HashMap<>();
// In case all data was inserts and the commit failed, delete the file belonging to that commit
// We do not know fileIds for inserts (first inserts are either log files or parquet files),
// delete all files for the corresponding failed commit, if present (same as COW)
super.deleteCleanedFiles(filesToDeletedStatus, partitionPath, filter);
final Set<String> deletedFiles = filesToDeletedStatus.entrySet().stream().map(entry -> {
Path filePath = entry.getKey().getPath();
return FSUtils.getFileIdFromFilePath(filePath);
}).collect(Collectors.toSet());
// append rollback blocks for updates
if (commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) {
hoodieRollbackStats = rollback(index, partitionPath, commit, commitMetadata, filesToDeletedStatus,
filesToNumBlocksRollback, deletedFiles);
}
break;
} catch (IOException io) {
throw new UncheckedIOException("Failed to rollback for commit " + commit, io);
}
default:
break;
}
return hoodieRollbackStats;
}).filter(Objects::nonNull).collect();
new RollbackExecutor(metaClient, config).performRollback(jsc, instantToRollback, rollbackRequests);
// Delete Inflight instants if enabled
deleteInflightInstant(deleteInstants, this.getActiveTimeline(),
new HoodieInstant(true, instantToRollback.getAction(), instantToRollback.getTimestamp()));
deleteInflightInstant(deleteInstants, this.getActiveTimeline(), new HoodieInstant(true, instantToRollback
.getAction(), instantToRollback.getTimestamp()));
logger.debug("Time(in ms) taken to finish rollback " + (System.currentTimeMillis() - startTime));
logger.info("Time(in ms) taken to finish rollback " + (System.currentTimeMillis() - startTime));
return allRollbackStats;
}
/**
 * Generates all rollback requests needed to roll back the given instant — one batch of requests per
 * partition — without actually performing the rollback.
 *
 * @param jsc JavaSparkContext used to parallelize the per-partition planning
 * @param instantToRollback instant to roll back (commit, compaction or delta-commit)
 * @return list of rollback requests covering every affected partition
 * @throws IOException when partition paths cannot be listed
 */
private List<RollbackRequest> generateRollbackRequests(JavaSparkContext jsc, HoodieInstant instantToRollback)
    throws IOException {
  String commit = instantToRollback.getTimestamp();
  List<String> partitions = FSUtils.getAllPartitionPaths(this.metaClient.getFs(), this.getMetaClient().getBasePath(),
      config.shouldAssumeDatePartitioning());
  // Clamp parallelism to [1, partitions.size()]. Passing the clamped value directly avoids handing
  // Spark 0 slices for an empty table (the old extra Math.min(partitions.size(), sparkPartitions)
  // could yield 0, which parallelize rejects).
  int sparkPartitions = Math.max(Math.min(partitions.size(), config.getRollbackParallelism()), 1);
  return jsc.parallelize(partitions, sparkPartitions)
      .flatMap(partitionPath -> {
        HoodieActiveTimeline activeTimeline = this.getActiveTimeline().reload();
        List<RollbackRequest> partitionRollbackRequests = new ArrayList<>();
        switch (instantToRollback.getAction()) {
          case HoodieTimeline.COMMIT_ACTION:
            // Rollback of a commit deletes the newly created parquet files along with any log files
            // created with this commit as their base commit.
            logger.info("Rolling back commit action.");
            partitionRollbackRequests.add(RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(
                partitionPath, instantToRollback));
            break;
          case HoodieTimeline.COMPACTION_ACTION:
            // If there is no delta commit present after the current commit (if compaction), no action, else we
            // need to make sure that a compaction commit rollback also deletes any log files written as part of the
            // succeeding deltacommit.
            boolean higherDeltaCommits = !activeTimeline.getDeltaCommitTimeline()
                .filterCompletedInstants().findInstantsAfter(commit, 1).empty();
            if (higherDeltaCommits) {
              // Rollback of a compaction action with a higher deltacommit means that the compaction is scheduled
              // and has not yet finished. In this scenario we should delete only the newly created parquet files
              // and not corresponding base commit log files created with this as baseCommit since updates would
              // have been written to the log files.
              logger.info("Rolling back compaction. There are higher delta commits. So only deleting data files");
              partitionRollbackRequests.add(RollbackRequest.createRollbackRequestWithDeleteDataFilesOnlyAction(
                  partitionPath, instantToRollback));
            } else {
              // No deltacommits present after this compaction commit (inflight or requested). In this case, we
              // can also delete any log files that were created with this compaction commit as base
              // commit.
              logger.info("Rolling back compaction plan. There are NO higher delta commits. So deleting both data and"
                  + " log files");
              partitionRollbackRequests.add(
                  RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath,
                      instantToRollback));
            }
            break;
          case HoodieTimeline.DELTA_COMMIT_ACTION:
            // --------------------------------------------------------------------------------------------------
            // (A) The following cases are possible if index.canIndexLogFiles and/or index.isGlobal
            // --------------------------------------------------------------------------------------------------
            // (A.1) Failed first commit - Inserts were written to log files and HoodieWriteStat has no entries. In
            // this scenario we would want to delete these log files.
            // (A.2) Failed recurring commit - Inserts/Updates written to log files. In this scenario,
            // HoodieWriteStat will have the baseCommitTime for the first log file written, add rollback blocks.
            // (A.3) Rollback triggered for first commit - Inserts were written to the log files but the commit is
            // being reverted. In this scenario, HoodieWriteStat will be `null` for the attribute prevCommitTime
            // and hence will end up deleting these log files. This is done so there are no orphan log files
            // lying around.
            // (A.4) Rollback triggered for recurring commits - Inserts/Updates are being rolled back, the actions
            // taken in this scenario is a combination of (A.2) and (A.3)
            // ---------------------------------------------------------------------------------------------------
            // (B) The following cases are possible if !index.canIndexLogFiles and/or !index.isGlobal
            // ---------------------------------------------------------------------------------------------------
            // (B.1) Failed first commit - Inserts were written to parquet files and HoodieWriteStat has no entries.
            // In this scenario, we delete all the parquet files written for the failed commit.
            // (B.2) Failed recurring commits - Inserts were written to parquet files and updates to log files. In
            // this scenario, perform (A.1) and for updates written to log files, write rollback blocks.
            // (B.3) Rollback triggered for first commit - Same as (B.1)
            // (B.4) Rollback triggered for recurring commits - Same as (B.2) plus we need to delete the log files
            // as well if the base parquet file gets deleted.
            try {
              HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
                  metaClient.getCommitTimeline().getInstantDetails(
                      new HoodieInstant(true, instantToRollback.getAction(), instantToRollback.getTimestamp()))
                      .get(), HoodieCommitMetadata.class);
              // In case all data was inserts and the commit failed, delete the file belonging to that commit
              // We do not know fileIds for inserts (first inserts are either log files or parquet files),
              // delete all files for the corresponding failed commit, if present (same as COW)
              partitionRollbackRequests.add(RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(
                  partitionPath, instantToRollback));
              // append rollback blocks for updates
              if (commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) {
                partitionRollbackRequests
                    .addAll(generateAppendRollbackBlocksAction(partitionPath, instantToRollback, commitMetadata));
              }
              break;
            } catch (IOException io) {
              throw new UncheckedIOException("Failed to collect rollback actions for commit " + commit, io);
            }
          default:
            break;
        }
        return partitionRollbackRequests.iterator();
      }).filter(Objects::nonNull).collect();
}
@Override
public void finalizeWrite(JavaSparkContext jsc, String instantTs, List<HoodieWriteStat> stats)
throws HoodieIOException {
@@ -450,19 +417,10 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
}
}
/**
 * Builds the command-block header used when appending a rollback block to a log file.
 *
 * @param commit the commit whose log blocks are being rolled back
 * @return header metadata for a ROLLBACK_PREVIOUS_BLOCK command block
 */
private Map<HeaderMetadataType, String> generateHeader(String commit) {
  Map<HeaderMetadataType, String> header = new HashMap<>();
  // Latest completed instant on the timeline at the moment the rollback block is written.
  header.put(HeaderMetadataType.INSTANT_TIME, metaClient.getActiveTimeline().lastInstant().get().getTimestamp());
  // The instant being rolled back.
  header.put(HeaderMetadataType.TARGET_INSTANT_TIME, commit);
  header.put(HeaderMetadataType.COMMAND_BLOCK_TYPE,
      String.valueOf(HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
  return header;
}
private List<RollbackRequest> generateAppendRollbackBlocksAction(String partitionPath, HoodieInstant rollbackInstant,
HoodieCommitMetadata commitMetadata) {
Preconditions.checkArgument(rollbackInstant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION));
private HoodieRollbackStat rollback(HoodieIndex hoodieIndex, String partitionPath, String commit,
HoodieCommitMetadata commitMetadata, final Map<FileStatus, Boolean> filesToDeletedStatus,
Map<FileStatus, Long> filesToNumBlocksRollback, Set<String> deletedFiles) {
// wStat.getPrevCommit() might not give the right commit time in the following
// scenario : If a compaction was scheduled, the new commitTime associated with the requested compaction will be
// used to write the new log files. In this case, the commit time for the log file is the compaction requested time.
@@ -470,47 +428,27 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
// baseCommit always by listing the file slice
Map<String, String> fileIdToBaseCommitTimeForLogMap = this.getRTFileSystemView().getLatestFileSlices(partitionPath)
.collect(Collectors.toMap(FileSlice::getFileId, FileSlice::getBaseInstantTime));
commitMetadata.getPartitionToWriteStats().get(partitionPath).stream().filter(wStat -> {
// Filter out stats without prevCommit since they are all inserts
return wStat != null && wStat.getPrevCommit() != HoodieWriteStat.NULL_COMMIT && wStat.getPrevCommit() != null
&& !deletedFiles.contains(wStat.getFileId());
}).forEach(wStat -> {
Writer writer = null;
String baseCommitTime = fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId());
if (null != baseCommitTime) {
boolean success = false;
try {
writer = HoodieLogFormat.newWriterBuilder()
.onParentPath(FSUtils.getPartitionPath(this.getMetaClient().getBasePath(), partitionPath))
.withFileId(wStat.getFileId()).overBaseCommit(baseCommitTime).withFs(this.metaClient.getFs())
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
// generate metadata
Map<HeaderMetadataType, String> header = generateHeader(commit);
// if update belongs to an existing log file
writer = writer.appendBlock(new HoodieCommandBlock(header));
success = true;
} catch (IOException | InterruptedException io) {
throw new HoodieRollbackException("Failed to rollback for commit " + commit, io);
} finally {
try {
if (writer != null) {
writer.close();
}
if (success) {
// This step is intentionally done after writer is closed. Guarantees that
// getFileStatus would reflect correct stats and FileNotFoundException is not thrown in
// cloud-storage : HUDI-168
filesToNumBlocksRollback.put(this.getMetaClient().getFs().getFileStatus(writer.getLogFile().getPath()),
1L);
}
} catch (IOException io) {
throw new UncheckedIOException(io);
}
}
}
});
return HoodieRollbackStat.newBuilder().withPartitionPath(partitionPath).withDeletedFileResults(filesToDeletedStatus)
.withRollbackBlockAppendResults(filesToNumBlocksRollback).build();
}
return commitMetadata.getPartitionToWriteStats().get(partitionPath).stream()
.filter(wStat -> {
// Filter out stats without prevCommit since they are all inserts
boolean validForRollback = (wStat != null) && (wStat.getPrevCommit() != HoodieWriteStat.NULL_COMMIT)
&& (wStat.getPrevCommit() != null) && fileIdToBaseCommitTimeForLogMap.containsKey(wStat.getFileId());
if (validForRollback) {
// For sanity, log instant time can never be less than base-commit on which we are rolling back
Preconditions.checkArgument(HoodieTimeline.compareTimestamps(fileIdToBaseCommitTimeForLogMap.get(
wStat.getFileId()), rollbackInstant.getTimestamp(), HoodieTimeline.LESSER_OR_EQUAL));
}
return validForRollback && HoodieTimeline.compareTimestamps(fileIdToBaseCommitTimeForLogMap.get(
// Base Ts should be strictly less. If equal (for inserts-to-logs), the caller employs another option
// to delete and we should not step on it
wStat.getFileId()), rollbackInstant.getTimestamp(), HoodieTimeline.LESSER);
}).map(wStat -> {
String baseCommitTime = fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId());
return RollbackRequest.createRollbackRequestWithAppendRollbackBlockAction(partitionPath, wStat.getFileId(),
baseCommitTime, rollbackInstant);
}).collect(Collectors.toList());
}
}

View File

@@ -0,0 +1,233 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
import org.apache.hudi.common.table.log.block.HoodieCommandBlock.HoodieCommandBlockTypeEnum;
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
/**
* Performs Rollback of Hoodie Tables
*/
public class RollbackExecutor implements Serializable {
private static Logger logger = LogManager.getLogger(RollbackExecutor.class);
private final HoodieTableMetaClient metaClient;
private final HoodieWriteConfig config;
/**
 * @param metaClient meta client of the hoodie table whose instants are rolled back
 * @param config write config supplying rollback parallelism and base path
 */
public RollbackExecutor(HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
this.metaClient = metaClient;
this.config = config;
}
/**
* Performs all rollback actions that we have collected in parallel.
*/
public List<HoodieRollbackStat> performRollback(JavaSparkContext jsc,
HoodieInstant instantToRollback, List<RollbackRequest> rollbackRequests) {
SerializablePathFilter filter = (path) -> {
if (path.toString().contains(".parquet")) {
String fileCommitTime = FSUtils.getCommitTime(path.getName());
return instantToRollback.getTimestamp().equals(fileCommitTime);
} else if (path.toString().contains(".log")) {
// Since the baseCommitTime is the only commit for new log files, it's okay here
String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path);
return instantToRollback.getTimestamp().equals(fileCommitTime);
}
return false;
};
int sparkPartitions = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1);
return jsc.parallelize(rollbackRequests, sparkPartitions).mapToPair(rollbackRequest -> {
final Map<FileStatus, Boolean> filesToDeletedStatus = new HashMap<>();
switch (rollbackRequest.getRollbackAction()) {
case DELETE_DATA_FILES_ONLY: {
deleteCleanedFiles(metaClient, config, filesToDeletedStatus, instantToRollback.getTimestamp(),
rollbackRequest.getPartitionPath());
return new Tuple2<String, HoodieRollbackStat>(rollbackRequest.getPartitionPath(),
HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
.withDeletedFileResults(filesToDeletedStatus).build());
}
case DELETE_DATA_AND_LOG_FILES: {
deleteCleanedFiles(metaClient, config, filesToDeletedStatus, rollbackRequest.getPartitionPath(), filter);
return new Tuple2<String, HoodieRollbackStat>(rollbackRequest.getPartitionPath(),
HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
.withDeletedFileResults(filesToDeletedStatus).build());
}
case APPEND_ROLLBACK_BLOCK: {
Writer writer = null;
boolean success = false;
try {
writer = HoodieLogFormat.newWriterBuilder().onParentPath(
FSUtils.getPartitionPath(metaClient.getBasePath(), rollbackRequest.getPartitionPath()))
.withFileId(rollbackRequest.getFileId().get())
.overBaseCommit(rollbackRequest.getLatestBaseInstant().get())
.withFs(metaClient.getFs())
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
// generate metadata
Map<HeaderMetadataType, String> header = generateHeader(instantToRollback.getTimestamp());
// if update belongs to an existing log file
writer = writer.appendBlock(new HoodieCommandBlock(header));
success = true;
} catch (IOException | InterruptedException io) {
throw new HoodieRollbackException(
"Failed to rollback for instant " + instantToRollback, io);
} finally {
try {
if (writer != null) {
writer.close();
}
} catch (IOException io) {
throw new UncheckedIOException(io);
}
}
// This step is intentionally done after writer is closed. Guarantees that
// getFileStatus would reflect correct stats and FileNotFoundException is not thrown in
// cloud-storage : HUDI-168
Map<FileStatus, Long> filesToNumBlocksRollback = new HashMap<>();
filesToNumBlocksRollback.put(metaClient.getFs()
.getFileStatus(writer.getLogFile().getPath()), 1L);
return new Tuple2<String, HoodieRollbackStat>(rollbackRequest.getPartitionPath(),
HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
.withRollbackBlockAppendResults(filesToNumBlocksRollback).build());
}
default:
throw new IllegalStateException("Unknown Rollback action " + rollbackRequest);
}
}).reduceByKey(this::mergeRollbackStat).map(Tuple2::_2).collect();
}
/**
* Helper to merge 2 rollback-stats for a given partition
*
* @param stat1 HoodieRollbackStat
* @param stat2 HoodieRollbackStat
* @return Merged HoodieRollbackStat
*/
private HoodieRollbackStat mergeRollbackStat(HoodieRollbackStat stat1, HoodieRollbackStat stat2) {
Preconditions.checkArgument(stat1.getPartitionPath().equals(stat2.getPartitionPath()));
final List<String> successDeleteFiles = new ArrayList<>();
final List<String> failedDeleteFiles = new ArrayList<>();
final Map<FileStatus, Long> commandBlocksCount = new HashMap<>();
if (stat1.getSuccessDeleteFiles() != null) {
successDeleteFiles.addAll(stat1.getSuccessDeleteFiles());
}
if (stat2.getSuccessDeleteFiles() != null) {
successDeleteFiles.addAll(stat2.getSuccessDeleteFiles());
}
if (stat1.getFailedDeleteFiles() != null) {
failedDeleteFiles.addAll(stat1.getFailedDeleteFiles());
}
if (stat2.getFailedDeleteFiles() != null) {
failedDeleteFiles.addAll(stat2.getFailedDeleteFiles());
}
if (stat1.getCommandBlocksCount() != null) {
commandBlocksCount.putAll(stat1.getCommandBlocksCount());
}
if (stat2.getCommandBlocksCount() != null) {
commandBlocksCount.putAll(stat2.getCommandBlocksCount());
}
return new HoodieRollbackStat(stat1.getPartitionPath(), successDeleteFiles, failedDeleteFiles, commandBlocksCount);
}
/**
* Common method used for cleaning out parquet files under a partition path during rollback of a set of commits
*/
private Map<FileStatus, Boolean> deleteCleanedFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
Map<FileStatus, Boolean> results, String partitionPath,
PathFilter filter) throws IOException {
logger.info("Cleaning path " + partitionPath);
FileSystem fs = metaClient.getFs();
FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
for (FileStatus file : toBeDeleted) {
boolean success = fs.delete(file.getPath(), false);
results.put(file, success);
logger.info("Delete file " + file.getPath() + "\t" + success);
}
return results;
}
/**
* Common method used for cleaning out parquet files under a partition path during rollback of a set of commits
*/
private Map<FileStatus, Boolean> deleteCleanedFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
Map<FileStatus, Boolean> results, String commit, String partitionPath) throws IOException {
logger.info("Cleaning path " + partitionPath);
FileSystem fs = metaClient.getFs();
PathFilter filter = (path) -> {
if (path.toString().contains(".parquet")) {
String fileCommitTime = FSUtils.getCommitTime(path.getName());
return commit.equals(fileCommitTime);
}
return false;
};
FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
for (FileStatus file : toBeDeleted) {
boolean success = fs.delete(file.getPath(), false);
results.put(file, success);
logger.info("Delete file " + file.getPath() + "\t" + success);
}
return results;
}
private Map<HeaderMetadataType, String> generateHeader(String commit) {
// generate metadata
Map<HeaderMetadataType, String> header = Maps.newHashMap();
header.put(HeaderMetadataType.INSTANT_TIME, metaClient.getActiveTimeline().lastInstant().get().getTimestamp());
header.put(HeaderMetadataType.TARGET_INSTANT_TIME, commit);
header.put(HeaderMetadataType.COMMAND_BLOCK_TYPE,
String.valueOf(HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
return header;
}
public interface SerializablePathFilter extends PathFilter, Serializable {
}
}

View File

@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
/**
 * A single unit of rollback work: describes what must be undone for one partition of a Hoodie
 * table when an instant is rolled back.
 */
public class RollbackRequest {

  /**
   * The kinds of rollback work that can be requested.
   */
  public enum RollbackAction {
    DELETE_DATA_FILES_ONLY,
    DELETE_DATA_AND_LOG_FILES,
    APPEND_ROLLBACK_BLOCK
  }

  /** Partition path that needs to be rolled back. */
  private final String partitionPath;

  /** Instant being rolled back. */
  private final HoodieInstant rollbackInstant;

  /** File id to append a rollback block to; populated only by the append-block factory. */
  private final Option<String> fileId;

  /** Latest base instant of that file; populated only by the append-block factory. */
  private final Option<String> latestBaseInstant;

  /** The rollback action to perform. */
  private final RollbackAction rollbackAction;

  public RollbackRequest(String partitionPath, HoodieInstant rollbackInstant,
      Option<String> fileId, Option<String> latestBaseInstant, RollbackAction rollbackAction) {
    this.rollbackAction = rollbackAction;
    this.partitionPath = partitionPath;
    this.rollbackInstant = rollbackInstant;
    this.fileId = fileId;
    this.latestBaseInstant = latestBaseInstant;
  }

  /** Builds a request that deletes only the data files of a partition. */
  public static RollbackRequest createRollbackRequestWithDeleteDataFilesOnlyAction(String partitionPath,
      HoodieInstant rollbackInstant) {
    return new RollbackRequest(partitionPath, rollbackInstant, Option.empty(), Option.empty(),
        RollbackAction.DELETE_DATA_FILES_ONLY);
  }

  /** Builds a request that deletes both the data and the log files of a partition. */
  public static RollbackRequest createRollbackRequestWithDeleteDataAndLogFilesAction(String partitionPath,
      HoodieInstant rollbackInstant) {
    return new RollbackRequest(partitionPath, rollbackInstant, Option.empty(), Option.empty(),
        RollbackAction.DELETE_DATA_AND_LOG_FILES);
  }

  /** Builds a request that appends a rollback command block to a log file. */
  public static RollbackRequest createRollbackRequestWithAppendRollbackBlockAction(String partitionPath, String fileId,
      String baseInstant, HoodieInstant rollbackInstant) {
    return new RollbackRequest(partitionPath, rollbackInstant, Option.of(fileId), Option.of(baseInstant),
        RollbackAction.APPEND_ROLLBACK_BLOCK);
  }

  public String getPartitionPath() {
    return partitionPath;
  }

  public HoodieInstant getRollbackInstant() {
    return rollbackInstant;
  }

  public Option<String> getFileId() {
    return fileId;
  }

  public Option<String> getLatestBaseInstant() {
    return latestBaseInstant;
  }

  public RollbackAction getRollbackAction() {
    return rollbackAction;
  }
}