1
0

[HUDI-3406] Rollback incorrectly relying on FS listing instead of Com… (#4957)

* [HUDI-3406] Rollback incorrectly relying on FS listing instead of Commit Metadata

* [HUDI-3406] Rollback incorrectly relying on FS listing instead of Commit Metadata

* [HUDI-3406] Rollback incorrectly relying on FS listing instead of Commit Metadata

* fix comments

* fix comments

* fix comments
This commit is contained in:
ForwardXu
2022-04-01 10:01:41 +08:00
committed by GitHub
parent a048e940fd
commit 98b4e9796e
9 changed files with 351 additions and 368 deletions

View File

@@ -21,7 +21,6 @@ package org.apache.hudi.client.utils;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
@@ -34,6 +33,7 @@ import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieRollingStatMetadata;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
@@ -146,6 +146,19 @@ public class MetadataConversionUtils {
return Option.of(TimelineMetadataUtils.deserializeRequestedReplaceMetadata(requestedContent.get()));
}
/**
 * Loads and deserializes the commit metadata of {@code hoodieInstant}.
 *
 * <p>The instant details are read through the completed-commits view of the active timeline. Instants whose
 * action is {@link HoodieTimeline#REPLACE_COMMIT_ACTION} are deserialized as
 * {@link HoodieReplaceCommitMetadata}; every other action is deserialized as {@link HoodieCommitMetadata}.
 *
 * @param metaClient    meta client whose active timeline is consulted
 * @param hoodieInstant instant whose metadata should be read
 * @return the deserialized metadata wrapped in an {@link Option}
 * @throws IOException if the metadata bytes cannot be deserialized
 */
public static Option<HoodieCommitMetadata> getHoodieCommitMetadata(HoodieTableMetaClient metaClient, HoodieInstant hoodieInstant) throws IOException {
  HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
  HoodieTimeline completedCommits = activeTimeline.getCommitsTimeline().filterCompletedInstants();

  boolean isReplaceCommit = hoodieInstant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION);
  // NOTE(review): get() is called without a presence check, so absent instant details surface as an
  // exception from Option rather than as an empty result - confirm this is what callers expect.
  byte[] serializedMetadata = completedCommits.getInstantDetails(hoodieInstant).get();

  if (!isReplaceCommit) {
    return Option.of(HoodieCommitMetadata.fromBytes(serializedMetadata, HoodieCommitMetadata.class));
  }
  return Option.of(HoodieReplaceCommitMetadata.fromBytes(serializedMetadata, HoodieReplaceCommitMetadata.class));
}
public static org.apache.hudi.avro.model.HoodieCommitMetadata convertCommitMetadata(
HoodieCommitMetadata hoodieCommitMetadata) {
ObjectMapper mapper = new ObjectMapper();
@@ -160,4 +173,4 @@ public class MetadataConversionUtils {
avroMetaData.getExtraMetadata().put(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY, "");
return avroMetaData;
}
}
}

View File

@@ -20,7 +20,6 @@ package org.apache.hudi.table.action.rollback;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hudi.avro.model.HoodieRollbackRequest;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -214,8 +213,4 @@ public class BaseRollbackHelper implements Serializable {
String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
return header;
}
/**
 * A {@link PathFilter} that is also {@link Serializable}. The interface declares no members of its own;
 * it only combines the two super-interfaces into a single type.
 */
public interface SerializablePathFilter extends PathFilter, Serializable {
}
}

View File

@@ -1,150 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.table.action.rollback;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hudi.avro.model.HoodieRollbackRequest;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.apache.hudi.table.action.rollback.BaseRollbackHelper.EMPTY_STRING;
/**
 * Builds the {@link HoodieRollbackRequest}s of a rollback plan by listing the files under each partition path.
 *
 * <p>Nothing is deleted here: the helper only collects which files should be deleted, or which log file needs
 * a rollback block, for the instant being rolled back.
 */
public class ListingBasedRollbackHelper implements Serializable {
  private static final Logger LOG = LogManager.getLogger(ListingBasedRollbackHelper.class);

  private final HoodieTableMetaClient metaClient;
  private final HoodieWriteConfig config;

  public ListingBasedRollbackHelper(HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
    this.metaClient = metaClient;
    this.config = config;
  }

  /**
   * Collects info for Rollback plan.
   *
   * @param context           instance of {@link HoodieEngineContext} to use.
   * @param instantToRollback {@link HoodieInstant} that is being rolled back.
   * @param rollbackRequests  List of {@link ListingBasedRollbackRequest} to be converted.
   * @return one {@link HoodieRollbackRequest} per input request.
   */
  public List<HoodieRollbackRequest> getRollbackRequestsForRollbackPlan(HoodieEngineContext context, HoodieInstant instantToRollback, List<ListingBasedRollbackRequest> rollbackRequests) {
    // Parallelism is capped by the configured rollback parallelism and is never below 1.
    int sparkPartitions = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1);
    context.setJobStatus(this.getClass().getSimpleName(), "Creating Rollback Plan");
    return getListingBasedRollbackRequests(context, instantToRollback, rollbackRequests, sparkPartitions);
  }

  /**
   * Converts every {@link ListingBasedRollbackRequest} into a {@link HoodieRollbackRequest}.
   *
   * <p>Delete-type requests list the partition directory and record the matching files (scheme stripped);
   * append-type requests record the log file taken from the write stat together with its written bytes.
   *
   * @param context           instance of {@link HoodieEngineContext} to use.
   * @param instantToRollback {@link HoodieInstant} that is being rolled back.
   * @param rollbackRequests  List of {@link ListingBasedRollbackRequest} to be operated on.
   * @param numPartitions     parallelism handed to {@code context.map}.
   * @return the rollback requests making up the plan.
   */
  private List<HoodieRollbackRequest> getListingBasedRollbackRequests(HoodieEngineContext context, HoodieInstant instantToRollback,
                                                                      List<ListingBasedRollbackRequest> rollbackRequests, int numPartitions) {
    return context.map(rollbackRequests, rollbackRequest -> {
      switch (rollbackRequest.getType()) {
        case DELETE_DATA_FILES_ONLY: {
          final FileStatus[] filesToDeletedStatus = getBaseFilesToBeDeleted(metaClient, config, instantToRollback.getTimestamp(),
              rollbackRequest.getPartitionPath(), metaClient.getFs());
          return new HoodieRollbackRequest(rollbackRequest.getPartitionPath(),
              EMPTY_STRING, EMPTY_STRING, stripScheme(filesToDeletedStatus), Collections.emptyMap());
        }
        case DELETE_DATA_AND_LOG_FILES: {
          final FileStatus[] filesToDeletedStatus = getBaseAndLogFilesToBeDeleted(instantToRollback.getTimestamp(),
              rollbackRequest.getPartitionPath(), metaClient.getFs());
          return new HoodieRollbackRequest(rollbackRequest.getPartitionPath(),
              EMPTY_STRING, EMPTY_STRING, stripScheme(filesToDeletedStatus), Collections.emptyMap());
        }
        case APPEND_ROLLBACK_BLOCK: {
          String fileId = rollbackRequest.getFileId().get();
          String latestBaseInstant = rollbackRequest.getLatestBaseInstant().get();
          HoodieWriteStat writeStat = rollbackRequest.getWriteStat().get();

          Path fullLogFilePath = FSUtils.getPartitionPath(config.getBasePath(), writeStat.getPath());
          Map<String, Long> logFilesWithBlocksToRollback =
              Collections.singletonMap(fullLogFilePath.toString(), writeStat.getTotalWriteBytes());

          return new HoodieRollbackRequest(rollbackRequest.getPartitionPath(), fileId, latestBaseInstant,
              Collections.emptyList(), logFilesWithBlocksToRollback);
        }
        default:
          throw new IllegalStateException("Unknown Rollback action " + rollbackRequest);
      }
    }, numPartitions);
  }

  /**
   * Renders each file status as its path string with everything up to and including the first ':' removed.
   */
  private static List<String> stripScheme(FileStatus[] fileStatuses) {
    return Arrays.stream(fileStatuses).map(fileStatus -> {
      String fileToBeDeleted = fileStatus.getPath().toString();
      // strip scheme; indexOf returns -1 when there is no ':', which leaves the string untouched
      return fileToBeDeleted.substring(fileToBeDeleted.indexOf(":") + 1);
    }).collect(Collectors.toList());
  }

  /**
   * Lists the files of {@code partitionPath} whose path contains the base file extension and whose commit time,
   * parsed from the file name, equals {@code commit}.
   */
  private FileStatus[] getBaseFilesToBeDeleted(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
                                               String commit, String partitionPath, FileSystem fs) throws IOException {
    LOG.info("Collecting files to be cleaned/rolledback up for path " + partitionPath + " and commit " + commit);
    String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
    PathFilter filter = (path) -> {
      if (path.toString().contains(basefileExtension)) {
        String fileCommitTime = FSUtils.getCommitTime(path.getName());
        return commit.equals(fileCommitTime);
      }
      return false;
    };
    return fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
  }

  /**
   * Lists the base files (path ends with the base file extension) and log files of {@code partitionPath} that
   * belong to {@code commit}; for log files the base commit time of the log path is compared.
   */
  private FileStatus[] getBaseAndLogFilesToBeDeleted(String commit, String partitionPath, FileSystem fs) throws IOException {
    String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
    BaseRollbackHelper.SerializablePathFilter filter = (path) -> {
      if (path.toString().endsWith(basefileExtension)) {
        String fileCommitTime = FSUtils.getCommitTime(path.getName());
        return commit.equals(fileCommitTime);
      } else if (FSUtils.isLogFile(path)) {
        // Since the baseCommitTime is the only commit for new log files, it's okay here
        String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path);
        return commit.equals(fileCommitTime);
      }
      return false;
    };
    return fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
  }
}

View File

@@ -18,19 +18,42 @@
package org.apache.hudi.table.action.rollback;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hudi.avro.model.HoodieRollbackRequest;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import java.io.IOException;
import java.util.List;
import static org.apache.hudi.client.utils.MetadataConversionUtils.getHoodieCommitMetadata;
import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
import static org.apache.hudi.table.action.rollback.BaseRollbackHelper.EMPTY_STRING;
/**
* Listing based rollback strategy to fetch list of {@link HoodieRollbackRequest}s.
@@ -39,12 +62,15 @@ public class ListingBasedRollbackStrategy implements BaseRollbackPlanActionExecu
private static final Logger LOG = LogManager.getLogger(ListingBasedRollbackStrategy.class);
protected final HoodieTable table;
protected final HoodieEngineContext context;
protected final HoodieTable<?, ?, ?, ?> table;
protected final transient HoodieEngineContext context;
protected final HoodieWriteConfig config;
protected final String instantTime;
public ListingBasedRollbackStrategy(HoodieTable table,
public ListingBasedRollbackStrategy(HoodieTable<?, ?, ?, ?> table,
HoodieEngineContext context,
HoodieWriteConfig config,
String instantTime) {
@@ -57,20 +83,260 @@ public class ListingBasedRollbackStrategy implements BaseRollbackPlanActionExecu
@Override
public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant instantToRollback) {
try {
List<ListingBasedRollbackRequest> rollbackRequests = null;
if (table.getMetaClient().getTableType() == HoodieTableType.COPY_ON_WRITE) {
rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(context,
table.getMetaClient().getBasePath());
} else {
rollbackRequests = RollbackUtils
.generateRollbackRequestsUsingFileListingMOR(instantToRollback, table, context);
}
List<HoodieRollbackRequest> listingBasedRollbackRequests = new ListingBasedRollbackHelper(table.getMetaClient(), config)
.getRollbackRequestsForRollbackPlan(context, instantToRollback, rollbackRequests);
return listingBasedRollbackRequests;
} catch (IOException e) {
HoodieTableMetaClient metaClient = table.getMetaClient();
List<String> partitionPaths =
FSUtils.getAllPartitionPaths(context, table.getMetaClient().getBasePath(), false, false);
int numPartitions = Math.max(Math.min(partitionPaths.size(), config.getRollbackParallelism()), 1);
context.setJobStatus(this.getClass().getSimpleName(), "Creating Listing Rollback Plan");
HoodieTableType tableType = table.getMetaClient().getTableType();
String baseFileExtension = getBaseFileExtension(metaClient);
Option<HoodieCommitMetadata> commitMetadataOptional = getHoodieCommitMetadata(metaClient, instantToRollback);
Boolean isCommitMetadataCompleted = checkCommitMetadataCompleted(instantToRollback, commitMetadataOptional);
return context.flatMap(partitionPaths, partitionPath -> {
List<HoodieRollbackRequest> hoodieRollbackRequests = new ArrayList<>(partitionPaths.size());
FileStatus[] filesToDelete =
fetchFilesFromInstant(instantToRollback, partitionPath, metaClient.getBasePath(), baseFileExtension,
metaClient.getFs(), commitMetadataOptional, isCommitMetadataCompleted);
if (HoodieTableType.COPY_ON_WRITE == tableType) {
hoodieRollbackRequests.add(getHoodieRollbackRequest(partitionPath, filesToDelete));
} else if (HoodieTableType.MERGE_ON_READ == tableType) {
String commit = instantToRollback.getTimestamp();
HoodieActiveTimeline activeTimeline = table.getMetaClient().reloadActiveTimeline();
switch (instantToRollback.getAction()) {
case HoodieTimeline.COMMIT_ACTION:
case HoodieTimeline.REPLACE_COMMIT_ACTION:
hoodieRollbackRequests.add(getHoodieRollbackRequest(partitionPath, filesToDelete));
break;
case HoodieTimeline.COMPACTION_ACTION:
// If there is no delta commit present after the current commit (if compaction), no action, else we
// need to make sure that a compaction commit rollback also deletes any log files written as part of the
// succeeding deltacommit.
boolean higherDeltaCommits =
!activeTimeline.getDeltaCommitTimeline().filterCompletedInstants().findInstantsAfter(commit, 1)
.empty();
if (higherDeltaCommits) {
// Rollback of a compaction action with no higher deltacommit means that the compaction is scheduled
// and has not yet finished. In this scenario we should delete only the newly created base files
// and not corresponding base commit log files created with this as baseCommit since updates would
// have been written to the log files.
hoodieRollbackRequests.add(getHoodieRollbackRequest(partitionPath,
listFilesToBeDeleted(instantToRollback.getTimestamp(), baseFileExtension, partitionPath,
metaClient.getFs())));
} else {
// No deltacommits present after this compaction commit (inflight or requested). In this case, we
// can also delete any log files that were created with this compaction commit as base
// commit.
hoodieRollbackRequests.add(getHoodieRollbackRequest(partitionPath, filesToDelete));
}
break;
case HoodieTimeline.DELTA_COMMIT_ACTION:
// --------------------------------------------------------------------------------------------------
// (A) The following cases are possible if index.canIndexLogFiles and/or index.isGlobal
// --------------------------------------------------------------------------------------------------
// (A.1) Failed first commit - Inserts were written to log files and HoodieWriteStat has no entries. In
// this scenario we would want to delete these log files.
// (A.2) Failed recurring commit - Inserts/Updates written to log files. In this scenario,
// HoodieWriteStat will have the baseCommitTime for the first log file written, add rollback blocks.
// (A.3) Rollback triggered for first commit - Inserts were written to the log files but the commit is
// being reverted. In this scenario, HoodieWriteStat will be `null` for the attribute prevCommitTime
// and hence will end up deleting these log files. This is done so there are no orphan log files
// lying around.
// (A.4) Rollback triggered for recurring commits - Inserts/Updates are being rolled back, the actions
// taken in this scenario is a combination of (A.2) and (A.3)
// ---------------------------------------------------------------------------------------------------
// (B) The following cases are possible if !index.canIndexLogFiles and/or !index.isGlobal
// ---------------------------------------------------------------------------------------------------
// (B.1) Failed first commit - Inserts were written to base files and HoodieWriteStat has no entries.
// In this scenario, we delete all the base files written for the failed commit.
// (B.2) Failed recurring commits - Inserts were written to base files and updates to log files. In
// this scenario, perform (A.1) and for updates written to log files, write rollback blocks.
// (B.3) Rollback triggered for first commit - Same as (B.1)
// (B.4) Rollback triggered for recurring commits - Same as (B.2) plus we need to delete the log files
// as well if the base file gets deleted.
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
table.getMetaClient().getCommitTimeline().getInstantDetails(instantToRollback).get(),
HoodieCommitMetadata.class);
// In case all data was inserts and the commit failed, delete the file belonging to that commit
// We do not know fileIds for inserts (first inserts are either log files or base files),
// delete all files for the corresponding failed commit, if present (same as COW)
hoodieRollbackRequests.add(getHoodieRollbackRequest(partitionPath, filesToDelete));
// append rollback blocks for updates and inserts as A.2 and B.2
if (commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) {
hoodieRollbackRequests.addAll(
getRollbackRequestToAppend(partitionPath, instantToRollback, commitMetadata, table));
}
break;
default:
throw new HoodieRollbackException("Unknown listing type, during rollback of " + instantToRollback);
}
} else {
throw new HoodieRollbackException(
String.format("Unsupported table type: %s, during listing rollback of %s", tableType, instantToRollback));
}
return hoodieRollbackRequests.stream();
}, numPartitions);
} catch (Exception e) {
LOG.error("Generating rollback requests failed for " + instantToRollback.getTimestamp(), e);
throw new HoodieRollbackException("Generating rollback requests failed for " + instantToRollback.getTimestamp(), e);
}
}
/**
 * Returns the file extension of the base file format configured in the table config of {@code metaClient}.
 */
private String getBaseFileExtension(HoodieTableMetaClient metaClient) {
return metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
}
/**
 * Creates a delete-only rollback request for one partition: file id and latest base instant are left empty,
 * the files to delete are the scheme-less paths of {@code filesToDeletedStatus}, and no log blocks are listed.
 */
@NotNull
private HoodieRollbackRequest getHoodieRollbackRequest(String partitionPath, FileStatus[] filesToDeletedStatus) {
  return new HoodieRollbackRequest(
      partitionPath,
      EMPTY_STRING,
      EMPTY_STRING,
      getFilesToBeDeleted(filesToDeletedStatus),
      Collections.emptyMap());
}
/**
 * Converts file statuses into path strings with everything up to and including the first ':' removed.
 */
@NotNull
private List<String> getFilesToBeDeleted(FileStatus[] dataFilesToDeletedStatus) {
  List<String> schemeLessPaths = new ArrayList<>();
  for (FileStatus status : dataFilesToDeletedStatus) {
    String qualifiedPath = status.getPath().toString();
    // strip scheme E.g: file:/var/folders
    int schemeSeparator = qualifiedPath.indexOf(":");
    schemeLessPaths.add(qualifiedPath.substring(schemeSeparator + 1));
  }
  return schemeLessPaths;
}
/**
 * Lists the files of a partition whose path contains {@code basefileExtension} and whose commit time, parsed
 * from the file name, equals {@code commit}. The partition directory is resolved against the configured base path.
 */
private FileStatus[] listFilesToBeDeleted(String commit, String basefileExtension, String partitionPath,
                                          FileSystem fs) throws IOException {
  LOG.info("Collecting files to be cleaned/rolledback up for path " + partitionPath + " and commit " + commit);
  PathFilter matchesCommit = candidate ->
      candidate.toString().contains(basefileExtension)
          && commit.equals(FSUtils.getCommitTime(candidate.getName()));
  Path partitionDir = FSUtils.getPartitionPath(config.getBasePath(), partitionPath);
  return fs.listStatus(partitionDir, matchesCommit);
}
/**
 * Resolves the files written by {@code instantToRollback} in one partition, either from the commit metadata
 * (when {@code isCommitMetadataCompleted} is true) or by listing the partition directory.
 */
private FileStatus[] fetchFilesFromInstant(HoodieInstant instantToRollback, String partitionPath, String basePath,
                                           String baseFileExtension, HoodieWrapperFileSystem fs,
                                           Option<HoodieCommitMetadata> commitMetadataOptional,
                                           Boolean isCommitMetadataCompleted) throws IOException {
  if (!isCommitMetadataCompleted) {
    // Metadata is not usable for this instant: fall back to a directory listing.
    return fetchFilesFromListFiles(instantToRollback, partitionPath, basePath, baseFileExtension, fs);
  }
  HoodieCommitMetadata commitMetadata = commitMetadataOptional.get();
  return fetchFilesFromCommitMetadata(instantToRollback, partitionPath, basePath, commitMetadata,
      baseFileExtension, fs);
}
/**
 * Resolves the files of one partition that the commit metadata attributes to {@code instantToRollback}.
 *
 * <p>Paths recorded in the metadata that no longer exist are dropped before listing. Listing a missing path
 * raises {@link java.io.FileNotFoundException}, which would make the rollback impossible to re-plan once some
 * of its files are already gone (e.g. after an earlier, partially executed rollback).
 *
 * @param instantToRollback instant being rolled back
 * @param partitionPath     partition to inspect
 * @param basePath          table base path used to build full file paths
 * @param commitMetadata    metadata of the instant being rolled back
 * @param baseFileExtension extension of the table's base files
 * @param fs                file system used for the existence checks and the listing
 * @return statuses of the still-existing files that belong to the instant
 * @throws IOException if listing the remaining paths fails
 */
private FileStatus[] fetchFilesFromCommitMetadata(HoodieInstant instantToRollback, String partitionPath,
                                                  String basePath, HoodieCommitMetadata commitMetadata,
                                                  String baseFileExtension, HoodieWrapperFileSystem fs)
    throws IOException {
  SerializablePathFilter pathFilter = getSerializablePathFilter(baseFileExtension, instantToRollback.getTimestamp());
  Path[] filePaths = getFilesFromCommitMetadata(basePath, commitMetadata, partitionPath);

  List<Path> pathsToList = new ArrayList<>(filePaths.length);
  for (Path filePath : filePaths) {
    boolean keep;
    try {
      keep = fs.exists(filePath);
    } catch (IOException e) {
      // Existence is unknown: keep the path so a file to be rolled back is never silently skipped.
      LOG.error("Exists check failed for " + filePath, e);
      keep = true;
    }
    if (keep) {
      pathsToList.add(filePath);
    }
  }
  return fs.listStatus(pathsToList.toArray(new Path[0]), pathFilter);
}
/**
 * Resolves the files of one partition that belong to {@code instantToRollback} by listing the partition
 * directory and keeping only the entries accepted by the instant's path filter.
 */
private FileStatus[] fetchFilesFromListFiles(HoodieInstant instantToRollback, String partitionPath, String basePath,
                                             String baseFileExtension, HoodieWrapperFileSystem fs)
    throws IOException {
  String commitTime = instantToRollback.getTimestamp();
  SerializablePathFilter belongsToInstant = getSerializablePathFilter(baseFileExtension, commitTime);
  Path[] partitionDirs = listFilesToBeDeleted(basePath, partitionPath);
  return fs.listStatus(partitionDirs, belongsToInstant);
}
/**
 * Tells whether the commit metadata can be used to find the files of the instant: the metadata must be
 * present, the instant must be completed, and the recorded operation type must not be
 * {@link WriteOperationType#UNKNOWN}.
 */
private Boolean checkCommitMetadataCompleted(HoodieInstant instantToRollback,
                                             Option<HoodieCommitMetadata> commitMetadataOptional) {
  if (!commitMetadataOptional.isPresent()) {
    return false;
  }
  if (!instantToRollback.isCompleted()) {
    return false;
  }
  boolean operationUnknown = WriteOperationType.UNKNOWN.equals(commitMetadataOptional.get().getOperationType());
  return !operationUnknown;
}
/**
 * Returns a single-element array holding the partition directory resolved from the base path.
 */
private static Path[] listFilesToBeDeleted(String basePath, String partitionPath) {
  Path partitionDir = FSUtils.getPartitionPath(basePath, partitionPath);
  Path[] pathsToList = new Path[1];
  pathsToList[0] = partitionDir;
  return pathsToList;
}
/**
 * Converts the full paths that the commit metadata reports for one partition into {@link Path} objects,
 * preserving their order.
 */
private static Path[] getFilesFromCommitMetadata(String basePath, HoodieCommitMetadata commitMetadata, String partitionPath) {
  List<String> fullPaths = commitMetadata.getFullPathsByPartitionPath(basePath, partitionPath);
  Path[] filePaths = new Path[fullPaths.size()];
  int next = 0;
  for (String fullPath : fullPaths) {
    filePaths[next++] = new Path(fullPath);
  }
  return filePaths;
}
/**
 * Builds a filter accepting base files (path ends with {@code basefileExtension}) whose commit time equals
 * {@code commit}, and log files whose base commit time equals {@code commit}. Everything else is rejected.
 */
@NotNull
private static SerializablePathFilter getSerializablePathFilter(String basefileExtension, String commit) {
  return candidate -> {
    final String fileCommitTime;
    if (candidate.toString().endsWith(basefileExtension)) {
      fileCommitTime = FSUtils.getCommitTime(candidate.getName());
    } else if (FSUtils.isLogFile(candidate)) {
      // Since the baseCommitTime is the only commit for new log files, it's okay here
      fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(candidate);
    } else {
      return false;
    }
    return commit.equals(fileCommitTime);
  };
}
/**
 * Builds the rollback requests that append a rollback command block to log files written by a delta commit.
 *
 * <p>Only write stats of {@code partitionPath} are considered that have a real previous commit (neither
 * {@code null} nor {@link HoodieWriteStat#NULL_COMMIT}), whose file id has a latest file slice on or before
 * the instant, and whose file slice base instant is strictly older than the instant being rolled back.
 *
 * @param partitionPath   partition whose write stats are inspected; must be a key of the metadata's
 *                        partition-to-write-stats map
 * @param rollbackInstant delta commit instant being rolled back
 * @param commitMetadata  commit metadata of {@code rollbackInstant}
 * @param table           table used to look up the latest file slices
 * @return one request per qualifying write stat, each naming the log file and its written bytes
 * @throws IllegalArgumentException if {@code rollbackInstant} is not a delta commit, or a file slice base
 *                                  instant is newer than the instant being rolled back
 */
public static List<HoodieRollbackRequest> getRollbackRequestToAppend(String partitionPath, HoodieInstant rollbackInstant,
                                                                     HoodieCommitMetadata commitMetadata, HoodieTable<?, ?, ?, ?> table) {
  List<HoodieRollbackRequest> hoodieRollbackRequests = new ArrayList<>();
  checkArgument(rollbackInstant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION));

  // wStat.getPrevCommit() might not give the right commit time in the following
  // scenario : If a compaction was scheduled, the new commitTime associated with the requested compaction will be
  // used to write the new log files. In this case, the commit time for the log file is the compaction requested time.
  // But the index (global) might store the baseCommit of the base and not the requested, hence get the
  // baseCommit always by listing the file slice
  // With multi writers, rollbacks could be lazy. and so we need to use getLatestFileSlicesBeforeOrOn() instead of getLatestFileSlices()
  Map<String, FileSlice> latestFileSlices = table.getSliceView()
      .getLatestFileSlicesBeforeOrOn(partitionPath, rollbackInstant.getTimestamp(), true)
      .collect(Collectors.toMap(FileSlice::getFileId, Function.identity()));

  List<HoodieWriteStat> hoodieWriteStats = commitMetadata.getPartitionToWriteStats().get(partitionPath)
      .stream()
      .filter(writeStat -> {
        // Filter out stats without prevCommit since they are all inserts. The null check has to come
        // before any comparison on prevCommit, otherwise a missing prevCommit raises an NPE here.
        String prevCommit = writeStat == null ? null : writeStat.getPrevCommit();
        boolean validForRollback = prevCommit != null
            && !HoodieWriteStat.NULL_COMMIT.equals(prevCommit)
            && latestFileSlices.containsKey(writeStat.getFileId());

        if (!validForRollback) {
          return false;
        }

        FileSlice latestFileSlice = latestFileSlices.get(writeStat.getFileId());

        // For sanity, the file-slice base-instant must be on or before the instant being rolled back.
        // NOTE(review): the message below says "less than" although the check fails when the base-instant
        // is greater; kept unchanged here - confirm the intended wording.
        checkArgument(
            HoodieTimeline.compareTimestamps(latestFileSlice.getBaseInstantTime(),
                HoodieTimeline.LESSER_THAN_OR_EQUALS, rollbackInstant.getTimestamp()),
            "Log-file base-instant could not be less than the instant being rolled back");

        // Command block "rolling back" the preceding block {@link HoodieCommandBlockTypeEnum#ROLLBACK_PREVIOUS_BLOCK}
        // w/in the latest file-slice is appended iff base-instant of the log-file is _strictly_ less
        // than the instant of the Delta Commit being rolled back. Otherwise, log-file will be cleaned up
        // in a different branch of the flow.
        return HoodieTimeline.compareTimestamps(latestFileSlice.getBaseInstantTime(), HoodieTimeline.LESSER_THAN, rollbackInstant.getTimestamp());
      })
      .collect(Collectors.toList());

  for (HoodieWriteStat writeStat : hoodieWriteStats) {
    FileSlice latestFileSlice = latestFileSlices.get(writeStat.getFileId());
    String fileId = writeStat.getFileId();
    String latestBaseInstant = latestFileSlice.getBaseInstantTime();

    Path fullLogFilePath = FSUtils.getPartitionPath(table.getConfig().getBasePath(), writeStat.getPath());

    Map<String, Long> logFilesWithBlocksToRollback =
        Collections.singletonMap(fullLogFilePath.toString(), writeStat.getTotalWriteBytes());

    hoodieRollbackRequests.add(new HoodieRollbackRequest(partitionPath, fileId, latestBaseInstant,
        Collections.emptyList(), logFilesWithBlocksToRollback));
  }

  return hoodieRollbackRequests;
}
}

View File

@@ -21,21 +21,13 @@ package org.apache.hudi.table.action.rollback;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hudi.avro.model.HoodieRollbackPlan;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -44,9 +36,6 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
@@ -102,160 +91,4 @@ public class RollbackUtils {
return new HoodieRollbackStat(stat1.getPartitionPath(), successDeleteFiles, failedDeleteFiles, commandBlocksCount);
}
/**
 * Creates one "delete data and log files" rollback request for every partition path of a COW table,
 * without performing any rollback itself.
 *
 * @param engineContext instance of {@link HoodieEngineContext} to use.
 * @param basePath base path of interest.
 * @return {@link List} of {@link ListingBasedRollbackRequest}s, one per partition path.
 */
public static List<ListingBasedRollbackRequest> generateRollbackRequestsByListingCOW(HoodieEngineContext engineContext, String basePath) {
  List<String> partitionPaths = FSUtils.getAllPartitionPaths(engineContext, basePath, false, false);
  List<ListingBasedRollbackRequest> requests = new ArrayList<>();
  for (String partitionPath : partitionPaths) {
    requests.add(ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath));
  }
  return requests;
}
/**
* Generate all rollback requests that we need to perform for rolling back this action without actually performing rolling back for MOR table type.
*
* @param instantToRollback Instant to Rollback
* @param table instance of {@link HoodieTable} to use.
* @param context instance of {@link HoodieEngineContext} to use.
* @return list of rollback requests
*/
public static List<ListingBasedRollbackRequest> generateRollbackRequestsUsingFileListingMOR(HoodieInstant instantToRollback, HoodieTable table, HoodieEngineContext context) throws IOException {
String commit = instantToRollback.getTimestamp();
HoodieWriteConfig config = table.getConfig();
List<String> partitions = FSUtils.getAllPartitionPaths(context, table.getMetaClient().getBasePath(), false, false);
if (partitions.isEmpty()) {
return new ArrayList<>();
}
int sparkPartitions = Math.max(Math.min(partitions.size(), config.getRollbackParallelism()), 1);
context.setJobStatus(RollbackUtils.class.getSimpleName(), "Generate all rollback requests");
return context.flatMap(partitions, partitionPath -> {
HoodieActiveTimeline activeTimeline = table.getMetaClient().reloadActiveTimeline();
List<ListingBasedRollbackRequest> partitionRollbackRequests = new ArrayList<>();
switch (instantToRollback.getAction()) {
case HoodieTimeline.COMMIT_ACTION:
case HoodieTimeline.REPLACE_COMMIT_ACTION:
LOG.info("Rolling back commit action.");
partitionRollbackRequests.add(
ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath));
break;
case HoodieTimeline.COMPACTION_ACTION:
// If there is no delta commit present after the current commit (if compaction), no action, else we
// need to make sure that a compaction commit rollback also deletes any log files written as part of the
// succeeding deltacommit.
boolean higherDeltaCommits =
!activeTimeline.getDeltaCommitTimeline().filterCompletedInstants().findInstantsAfter(commit, 1).empty();
if (higherDeltaCommits) {
// Rollback of a compaction action with no higher deltacommit means that the compaction is scheduled
// and has not yet finished. In this scenario we should delete only the newly created base files
// and not corresponding base commit log files created with this as baseCommit since updates would
// have been written to the log files.
LOG.info("Rolling back compaction. There are higher delta commits. So only deleting data files");
partitionRollbackRequests.add(
ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataFilesOnlyAction(partitionPath));
} else {
// No deltacommits present after this compaction commit (inflight or requested). In this case, we
// can also delete any log files that were created with this compaction commit as base
// commit.
LOG.info("Rolling back compaction plan. There are NO higher delta commits. So deleting both data and"
+ " log files");
partitionRollbackRequests.add(
ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath));
}
break;
case HoodieTimeline.DELTA_COMMIT_ACTION:
// --------------------------------------------------------------------------------------------------
// (A) The following cases are possible if index.canIndexLogFiles and/or index.isGlobal
// --------------------------------------------------------------------------------------------------
// (A.1) Failed first commit - Inserts were written to log files and HoodieWriteStat has no entries. In
// this scenario we would want to delete these log files.
// (A.2) Failed recurring commit - Inserts/Updates written to log files. In this scenario,
// HoodieWriteStat will have the baseCommitTime for the first log file written, add rollback blocks.
// (A.3) Rollback triggered for first commit - Inserts were written to the log files but the commit is
// being reverted. In this scenario, HoodieWriteStat will be `null` for the attribute prevCommitTime and
// hence will end up deleting these log files. This is done so there are no orphan log files
// lying around.
// (A.4) Rollback triggered for recurring commits - Inserts/Updates are being rolled back, the actions
// taken in this scenario is a combination of (A.2) and (A.3)
// ---------------------------------------------------------------------------------------------------
// (B) The following cases are possible if !index.canIndexLogFiles and/or !index.isGlobal
// ---------------------------------------------------------------------------------------------------
// (B.1) Failed first commit - Inserts were written to base files and HoodieWriteStat has no entries.
// In this scenario, we delete all the base files written for the failed commit.
// (B.2) Failed recurring commits - Inserts were written to base files and updates to log files. In
// this scenario, perform (A.1) and for updates written to log files, write rollback blocks.
// (B.3) Rollback triggered for first commit - Same as (B.1)
// (B.4) Rollback triggered for recurring commits - Same as (B.2) plus we need to delete the log files
// as well if the base file gets deleted.
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
table.getMetaClient().getCommitTimeline().getInstantDetails(instantToRollback).get(),
HoodieCommitMetadata.class);
// In case all data was inserts and the commit failed, delete the file belonging to that commit
// We do not know fileIds for inserts (first inserts are either log files or base files),
// delete all files for the corresponding failed commit, if present (same as COW)
partitionRollbackRequests.add(
ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath));
// append rollback blocks for updates and inserts as A.2 and B.2
if (commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) {
partitionRollbackRequests
.addAll(generateAppendRollbackBlocksAction(partitionPath, instantToRollback, commitMetadata, table));
}
break;
default:
break;
}
return partitionRollbackRequests.stream();
}, Math.min(partitions.size(), sparkPartitions)).stream().filter(Objects::nonNull).collect(Collectors.toList());
}
/**
 * Builds the "append rollback block" requests for a single partition of a delta commit that is being rolled back.
 *
 * <p>Only write stats that carry a real previous commit (i.e. updates, not pure inserts) and whose file id
 * is present among the latest file slices on or before the rollback instant are considered. Of those, a
 * request is produced only when the base instant of the latest file slice is strictly less than the
 * instant being rolled back.
 *
 * @param partitionPath   partition whose write stats are inspected
 * @param rollbackInstant instant being rolled back; its action must be the delta-commit action
 * @param commitMetadata  commit metadata of the instant; expected to hold write stats for {@code partitionPath}
 * @param table           table used to look up the latest file slices of the partition
 * @return the rollback requests that append a rollback block, possibly empty
 * @throws IllegalArgumentException presumably raised by {@code checkArgument} when the instant is not a
 *                                  delta commit or a file slice's base instant is greater than the
 *                                  rollback instant (helper is declared outside this block — confirm)
 */
private static List<ListingBasedRollbackRequest> generateAppendRollbackBlocksAction(String partitionPath, HoodieInstant rollbackInstant,
    HoodieCommitMetadata commitMetadata, HoodieTable table) {
  checkArgument(rollbackInstant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION));

  // wStat.getPrevCommit() might not give the right commit time in the following
  // scenario : If a compaction was scheduled, the new commitTime associated with the requested compaction will be
  // used to write the new log files. In this case, the commit time for the log file is the compaction requested time.
  // But the index (global) might store the baseCommit of the base and not the requested, hence get the
  // baseCommit always by listing the file slice
  // With multi writers, rollbacks could be lazy. and so we need to use getLatestFileSlicesBeforeOrOn() instead of getLatestFileSlices()
  Map<String, FileSlice> latestFileSlices = table.getSliceView()
      .getLatestFileSlicesBeforeOrOn(partitionPath, rollbackInstant.getTimestamp(), true)
      .collect(Collectors.toMap(FileSlice::getFileId, Function.identity()));

  return commitMetadata.getPartitionToWriteStats().get(partitionPath)
      .stream()
      .filter(writeStat -> {
        // Filter out stats without prevCommit since they are all inserts.
        // The null check must come before equals(): a stat with a null prevCommit has to be
        // filtered out here, not blow up with a NullPointerException.
        boolean validForRollback = (writeStat != null)
            && (writeStat.getPrevCommit() != null)
            && (!writeStat.getPrevCommit().equals(HoodieWriteStat.NULL_COMMIT))
            && latestFileSlices.containsKey(writeStat.getFileId());
        if (!validForRollback) {
          return false;
        }

        FileSlice latestFileSlice = latestFileSlices.get(writeStat.getFileId());

        // For sanity, log-file base-instant time can never be greater than the instant we are rolling back
        checkArgument(
            HoodieTimeline.compareTimestamps(latestFileSlice.getBaseInstantTime(),
                HoodieTimeline.LESSER_THAN_OR_EQUALS, rollbackInstant.getTimestamp()),
            "Log-file base-instant could not be greater than the instant being rolled back");

        // Command block "rolling back" the preceding block {@link HoodieCommandBlockTypeEnum#ROLLBACK_PREVIOUS_BLOCK}
        // w/in the latest file-slice is appended iff base-instant of the log-file is _strictly_ less
        // than the instant of the Delta Commit being rolled back. Otherwise, log-file will be cleaned up
        // in a different branch of the flow.
        return HoodieTimeline.compareTimestamps(latestFileSlice.getBaseInstantTime(), HoodieTimeline.LESSER_THAN, rollbackInstant.getTimestamp());
      })
      .map(writeStat -> {
        FileSlice latestFileSlice = latestFileSlices.get(writeStat.getFileId());
        return ListingBasedRollbackRequest.createRollbackRequestWithAppendRollbackBlockAction(partitionPath,
            writeStat.getFileId(), latestFileSlice.getBaseInstantTime(), writeStat);
      })
      .collect(Collectors.toList());
}
}

View File

@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.rollback;
import org.apache.hadoop.fs.PathFilter;
import java.io.Serializable;
/**
 * A {@link PathFilter} that is also {@link Serializable}.
 *
 * <p>Declares no members of its own; it only combines the two super-interfaces into a single type.
 * NOTE(review): presumably exists so that path filters can be captured in serialized closures —
 * confirm against the call sites.
 */
public interface SerializablePathFilter extends PathFilter, Serializable {
}

View File

@@ -18,14 +18,14 @@
package org.apache.hudi.table.upgrade;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieRollbackRequest;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -35,15 +35,10 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.rollback.BaseRollbackHelper;
import org.apache.hudi.table.action.rollback.ListingBasedRollbackHelper;
import org.apache.hudi.table.action.rollback.ListingBasedRollbackRequest;
import org.apache.hudi.table.action.rollback.RollbackUtils;
import org.apache.hudi.table.action.rollback.ListingBasedRollbackStrategy;
import org.apache.hudi.table.marker.WriteMarkers;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -100,14 +95,7 @@ public class ZeroToOneUpgradeHandler implements UpgradeHandler {
writeMarkers.quietDeleteMarkerDir(context, parallelism);
// generate rollback stats
List<ListingBasedRollbackRequest> rollbackRequests;
if (table.getMetaClient().getTableType() == HoodieTableType.COPY_ON_WRITE) {
rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(context, table.getMetaClient().getBasePath());
} else {
rollbackRequests = RollbackUtils.generateRollbackRequestsUsingFileListingMOR(commitInstantOpt.get(), table, context);
}
List<HoodieRollbackStat> rollbackStats = getListBasedRollBackStats(table.getMetaClient(), table.getConfig(),
context, commitInstantOpt, rollbackRequests);
List<HoodieRollbackStat> rollbackStats = getListBasedRollBackStats(table, context, commitInstantOpt);
// recreate markers adhering to marker based rollback
for (HoodieRollbackStat rollbackStat : rollbackStats) {
@@ -126,12 +114,12 @@ public class ZeroToOneUpgradeHandler implements UpgradeHandler {
}
}
List<HoodieRollbackStat> getListBasedRollBackStats(
HoodieTableMetaClient metaClient, HoodieWriteConfig config, HoodieEngineContext context,
Option<HoodieInstant> commitInstantOpt, List<ListingBasedRollbackRequest> rollbackRequests) {
List<HoodieRollbackRequest> hoodieRollbackRequests = new ListingBasedRollbackHelper(metaClient, config)
.getRollbackRequestsForRollbackPlan(context, commitInstantOpt.get(), rollbackRequests);
return new BaseRollbackHelper(metaClient, config).collectRollbackStats(context, commitInstantOpt.get(), hoodieRollbackRequests);
/**
 * Produces rollback stats for the given instant using the listing-based rollback strategy.
 *
 * <p>The rollback requests are first obtained from a {@link ListingBasedRollbackStrategy} built for the
 * instant's timestamp, and are then handed to a {@link BaseRollbackHelper} which collects the stats.
 *
 * @param table            table supplying the write config and meta client
 * @param context          engine context passed through to the strategy and the helper
 * @param commitInstantOpt instant to compute rollback stats for; {@code get()} is called on it unconditionally
 * @return the rollback stats collected for the instant
 */
List<HoodieRollbackStat> getListBasedRollBackStats(HoodieTable<?, ?, ?, ?> table, HoodieEngineContext context, Option<HoodieInstant> commitInstantOpt) {
  // Step 1: plan the rollback requests via the listing-based strategy.
  ListingBasedRollbackStrategy listingStrategy =
      new ListingBasedRollbackStrategy(table, context, table.getConfig(), commitInstantOpt.get().getTimestamp());
  List<HoodieRollbackRequest> plannedRequests = listingStrategy.getRollbackRequests(commitInstantOpt.get());

  // Step 2: turn the planned requests into rollback stats.
  BaseRollbackHelper statsCollector = new BaseRollbackHelper(table.getMetaClient(), table.getConfig());
  return statsCollector.collectRollbackStats(context, commitInstantOpt.get(), plannedRequests);
}
/**
@@ -143,7 +131,7 @@ public class ZeroToOneUpgradeHandler implements UpgradeHandler {
* @param table {@link HoodieTable} instance to use
* @return the marker file name thus curated.
*/
private static String getFileNameForMarkerFromLogFile(String logFilePath, HoodieTable table) {
private static String getFileNameForMarkerFromLogFile(String logFilePath, HoodieTable<?, ?, ?, ?> table) {
Path logPath = new Path(table.getMetaClient().getBasePath(), logFilePath);
String fileId = FSUtils.getFileIdFromLogPath(logPath);
String baseInstant = FSUtils.getBaseCommitTimeFromLogPath(logPath);

View File

@@ -45,7 +45,6 @@ import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.testutils.MetadataMergeWriteStatus;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
@@ -125,8 +124,8 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT
for (Map.Entry<String, HoodieRollbackPartitionMetadata> entry : rollbackMetadata.entrySet()) {
HoodieRollbackPartitionMetadata meta = entry.getValue();
assertTrue(meta.getFailedDeleteFiles() == null || meta.getFailedDeleteFiles().size() == 0);
assertTrue(meta.getSuccessDeleteFiles() == null || meta.getSuccessDeleteFiles().size() == 0);
assertEquals(0, meta.getFailedDeleteFiles().size());
assertEquals(0, meta.getSuccessDeleteFiles().size());
}
//4. assert file group after rollback, and compare to the rollbackstat

View File

@@ -137,6 +137,19 @@ public class HoodieCommitMetadata implements Serializable {
return fullPaths;
}
/**
 * Returns the distinct full paths of the files recorded in the write stats of one partition.
 *
 * <p>Write stats whose file id is {@code null} are ignored. When no write stats are registered for the
 * partition, an empty list is returned.
 *
 * @param basePath      base path against which each write stat's path is resolved
 * @param partitionPath partition whose write stats are looked up
 * @return a new list holding each resolved path once
 */
public List<String> getFullPathsByPartitionPath(String basePath, String partitionPath) {
  List<HoodieWriteStat> partitionStats = getPartitionToWriteStats().get(partitionPath);
  if (partitionStats == null) {
    return new ArrayList<>();
  }

  // A set is used so that a path reported by several write stats is only returned once.
  HashSet<String> uniquePaths = new HashSet<>();
  for (HoodieWriteStat writeStat : partitionStats) {
    if (writeStat.getFileId() == null) {
      continue;
    }
    uniquePaths.add(FSUtils.getPartitionPath(basePath, writeStat.getPath()).toString());
  }
  return new ArrayList<>(uniquePaths);
}
public Map<HoodieFileGroupId, String> getFileGroupIdAndFullPaths(String basePath) {
Map<HoodieFileGroupId, String> fileGroupIdToFullPaths = new HashMap<>();
for (Map.Entry<String, List<HoodieWriteStat>> entry : getPartitionToWriteStats().entrySet()) {