
[HUDI-2433] Refactor rollback actions in hudi-client module (#3664)

Y Ethan Guo
2021-09-15 15:52:43 -07:00
committed by GitHub
parent 86a7351c39
commit 916f12b7dd
34 changed files with 512 additions and 1462 deletions

View File

@@ -33,30 +33,39 @@ import org.apache.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
public abstract class BaseCopyOnWriteRollbackActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseRollbackActionExecutor<T, I, K, O> {
public class CopyOnWriteRollbackActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseRollbackActionExecutor<T, I, K, O> {
private static final Logger LOG = LogManager.getLogger(BaseCopyOnWriteRollbackActionExecutor.class);
private static final Logger LOG = LogManager.getLogger(CopyOnWriteRollbackActionExecutor.class);
public BaseCopyOnWriteRollbackActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config,
HoodieTable<T, I, K, O> table,
String instantTime,
HoodieInstant commitInstant,
boolean deleteInstants) {
public CopyOnWriteRollbackActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config,
HoodieTable<T, I, K, O> table,
String instantTime,
HoodieInstant commitInstant,
boolean deleteInstants) {
super(context, config, table, instantTime, commitInstant, deleteInstants);
}
public BaseCopyOnWriteRollbackActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config,
HoodieTable<T, I, K, O> table,
String instantTime,
HoodieInstant commitInstant,
boolean deleteInstants,
boolean skipTimelinePublish,
boolean useMarkerBasedStrategy) {
public CopyOnWriteRollbackActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config,
HoodieTable<T, I, K, O> table,
String instantTime,
HoodieInstant commitInstant,
boolean deleteInstants,
boolean skipTimelinePublish,
boolean useMarkerBasedStrategy) {
super(context, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish, useMarkerBasedStrategy);
}
@Override
protected RollbackStrategy getRollbackStrategy() {
if (useMarkerBasedStrategy) {
return new MarkerBasedRollbackStrategy(table, context, config, instantTime);
} else {
return this::executeRollbackUsingFileListing;
}
}
@Override
protected List<HoodieRollbackStat> executeRollback() {
HoodieTimer rollbackTimer = new HoodieTimer();
@@ -88,4 +97,11 @@ public abstract class BaseCopyOnWriteRollbackActionExecutor<T extends HoodieReco
LOG.info("Time(in ms) taken to finish rollback " + rollbackTimer.endTimer());
return stats;
}
@Override
protected List<HoodieRollbackStat> executeRollbackUsingFileListing(HoodieInstant instantToRollback) {
List<ListingBasedRollbackRequest> rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(
context, table.getMetaClient().getBasePath(), config);
return new ListingBasedRollbackHelper(table.getMetaClient(), config).performRollback(context, instantToRollback, rollbackRequests);
}
}
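
The getRollbackStrategy() override above is the core of the refactor: strategy selection that previously lived in each engine-specific subclass now lives in one shared executor. A minimal, self-contained sketch of the pattern in plain Java (signatures simplified; all names here are illustrative, not Hudi APIs):

interface RollbackStrategy {
  java.util.List<String> execute(String instantToRollback); // rollback stats simplified to strings
}

class RollbackStrategySketch {
  private final boolean useMarkerBasedStrategy;

  RollbackStrategySketch(boolean useMarkerBasedStrategy) {
    this.useMarkerBasedStrategy = useMarkerBasedStrategy;
  }

  RollbackStrategy getRollbackStrategy() {
    // Prefer marker-based rollback when markers were written for the instant;
    // otherwise fall back to listing-based rollback via a method reference,
    // mirroring this::executeRollbackUsingFileListing above.
    if (useMarkerBasedStrategy) {
      return instant -> java.util.Collections.singletonList("marker-based rollback of " + instant);
    }
    return this::executeRollbackUsingFileListing;
  }

  java.util.List<String> executeRollbackUsingFileListing(String instantToRollback) {
    return java.util.Collections.singletonList("listing-based rollback of " + instantToRollback);
  }
}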

View File

@@ -0,0 +1,249 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.table.action.rollback;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.PathFilter;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
/**
* Performs Rollback of Hoodie Tables.
*/
public class ListingBasedRollbackHelper implements Serializable {
private static final Logger LOG = LogManager.getLogger(ListingBasedRollbackHelper.class);
private final HoodieTableMetaClient metaClient;
private final HoodieWriteConfig config;
public ListingBasedRollbackHelper(HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
this.metaClient = metaClient;
this.config = config;
}
/**
* Performs all collected rollback actions in parallel.
*/
public List<HoodieRollbackStat> performRollback(HoodieEngineContext context, HoodieInstant instantToRollback,
List<ListingBasedRollbackRequest> rollbackRequests) {
int parallelism = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1);
context.setJobStatus(this.getClass().getSimpleName(), "Perform rollback actions");
return context.mapToPairAndReduceByKey(rollbackRequests,
rollbackRequest -> maybeDeleteAndCollectStats(rollbackRequest, instantToRollback, true),
RollbackUtils::mergeRollbackStat,
parallelism);
}
/**
* Collects info on all files that need to be rolled back.
*/
public List<HoodieRollbackStat> collectRollbackStats(HoodieEngineContext context, HoodieInstant instantToRollback,
List<ListingBasedRollbackRequest> rollbackRequests) {
int parallelism = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1);
context.setJobStatus(this.getClass().getSimpleName(), "Collect rollback stats for upgrade/downgrade");
return context.mapToPairAndReduceByKey(rollbackRequests,
rollbackRequest -> maybeDeleteAndCollectStats(rollbackRequest, instantToRollback, false),
RollbackUtils::mergeRollbackStat,
parallelism);
}
/**
* Either deletes the files of interest and collects stats, or collects stats only.
*
* @param instantToRollback {@link HoodieInstant} of interest for which deletion or collect stats is requested.
* @param doDelete {@code true} if deletion has to be done.
* {@code false} if only stats are to be collected w/o performing any deletes.
* @return stats collected with or w/o actual deletions.
*/
private Pair<String, HoodieRollbackStat> maybeDeleteAndCollectStats(ListingBasedRollbackRequest rollbackRequest,
HoodieInstant instantToRollback,
boolean doDelete) throws IOException {
switch (rollbackRequest.getType()) {
case DELETE_DATA_FILES_ONLY: {
final Map<FileStatus, Boolean> filesToDeletedStatus = deleteBaseFiles(metaClient, config, instantToRollback.getTimestamp(),
rollbackRequest.getPartitionPath(), doDelete);
return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
.withDeletedFileResults(filesToDeletedStatus).build());
}
case DELETE_DATA_AND_LOG_FILES: {
final Map<FileStatus, Boolean> filesToDeletedStatus = deleteBaseAndLogFiles(metaClient, config, instantToRollback.getTimestamp(), rollbackRequest.getPartitionPath(), doDelete);
return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
.withDeletedFileResults(filesToDeletedStatus).build());
}
case APPEND_ROLLBACK_BLOCK: {
String fileId = rollbackRequest.getFileId().get();
String latestBaseInstant = rollbackRequest.getLatestBaseInstant().get();
// collect all log files that are supposed to be deleted with this rollback
Map<FileStatus, Long> writtenLogFileSizeMap = FSUtils.getAllLogFiles(metaClient.getFs(),
FSUtils.getPartitionPath(config.getBasePath(), rollbackRequest.getPartitionPath()),
fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), latestBaseInstant)
.collect(Collectors.toMap(HoodieLogFile::getFileStatus, value -> value.getFileStatus().getLen()));
HoodieLogFormat.Writer writer = null;
try {
writer = HoodieLogFormat.newWriterBuilder()
.onParentPath(FSUtils.getPartitionPath(metaClient.getBasePath(), rollbackRequest.getPartitionPath()))
.withFileId(fileId)
.overBaseCommit(latestBaseInstant)
.withFs(metaClient.getFs())
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
// generate metadata
if (doDelete) {
Map<HeaderMetadataType, String> header = generateHeader(instantToRollback.getTimestamp());
// if update belongs to an existing log file
writer.appendBlock(new HoodieCommandBlock(header));
}
} catch (IOException | InterruptedException io) {
throw new HoodieRollbackException("Failed to rollback for instant " + instantToRollback, io);
} finally {
try {
if (writer != null) {
writer.close();
}
} catch (IOException io) {
throw new HoodieIOException("Error appending rollback block..", io);
}
}
// This step is intentionally done after writer is closed. Guarantees that
// getFileStatus would reflect correct stats and FileNotFoundException is not thrown in
// cloud-storage : HUDI-168
Map<FileStatus, Long> filesToNumBlocksRollback = Collections.singletonMap(
metaClient.getFs().getFileStatus(Objects.requireNonNull(writer).getLogFile().getPath()),
1L
);
return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
.withRollbackBlockAppendResults(filesToNumBlocksRollback)
.withWrittenLogFileSizeMap(writtenLogFileSizeMap).build());
}
default:
throw new IllegalStateException("Unknown Rollback action " + rollbackRequest);
}
}
/**
* Common method used for cleaning out base and log files under a partition path during rollback of a set of commits.
*/
private Map<FileStatus, Boolean> deleteBaseAndLogFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
String commit, String partitionPath, boolean doDelete) throws IOException {
LOG.info("Cleaning path " + partitionPath);
String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
SerializablePathFilter filter = (path) -> {
if (path.toString().endsWith(basefileExtension)) {
String fileCommitTime = FSUtils.getCommitTime(path.getName());
return commit.equals(fileCommitTime);
} else if (FSUtils.isLogFile(path)) {
// Since the baseCommitTime is the only commit for new log files, it's okay here
String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path);
return commit.equals(fileCommitTime);
}
return false;
};
final Map<FileStatus, Boolean> results = new HashMap<>();
FileSystem fs = metaClient.getFs();
FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
for (FileStatus file : toBeDeleted) {
if (doDelete) {
boolean success = fs.delete(file.getPath(), false);
results.put(file, success);
LOG.info("Delete file " + file.getPath() + "\t" + success);
} else {
results.put(file, true);
}
}
return results;
}
/**
* Common method used for cleaning out base files under a partition path during rollback of a set of commits.
*/
private Map<FileStatus, Boolean> deleteBaseFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
String commit, String partitionPath, boolean doDelete) throws IOException {
final Map<FileStatus, Boolean> results = new HashMap<>();
LOG.info("Cleaning path " + partitionPath);
FileSystem fs = metaClient.getFs();
String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
PathFilter filter = (path) -> {
if (path.toString().contains(basefileExtension)) {
String fileCommitTime = FSUtils.getCommitTime(path.getName());
return commit.equals(fileCommitTime);
}
return false;
};
FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
for (FileStatus file : toBeDeleted) {
if (doDelete) {
boolean success = fs.delete(file.getPath(), false);
results.put(file, success);
LOG.info("Delete file " + file.getPath() + "\t" + success);
} else {
results.put(file, true);
}
}
return results;
}
private Map<HeaderMetadataType, String> generateHeader(String commit) {
// generate metadata
Map<HeaderMetadataType, String> header = new HashMap<>(3);
header.put(HeaderMetadataType.INSTANT_TIME, metaClient.getActiveTimeline().lastInstant().get().getTimestamp());
header.put(HeaderMetadataType.TARGET_INSTANT_TIME, commit);
header.put(HeaderMetadataType.COMMAND_BLOCK_TYPE,
String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
return header;
}
public interface SerializablePathFilter extends PathFilter, Serializable {
}
}
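
Both performRollback and collectRollbackStats above clamp parallelism before handing work to the engine context. A tiny sketch of that clamp (the helper name is illustrative, not part of Hudi):

class ParallelismClampSketch {
  // Mirrors Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1):
  // never more tasks than rollback requests, and never fewer than one.
  static int clampParallelism(int numRequests, int configuredParallelism) {
    return Math.max(Math.min(numRequests, configuredParallelism), 1);
  }

  public static void main(String[] args) {
    System.out.println(clampParallelism(0, 100));   // 1   (even an empty request list yields a valid parallelism)
    System.out.println(clampParallelism(5, 100));   // 5   (bounded by the number of requests)
    System.out.println(clampParallelism(500, 100)); // 100 (bounded by the configured rollback parallelism)
  }
}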

View File

@@ -20,34 +20,43 @@ package org.apache.hudi.table.action.rollback;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.marker.MarkerBasedRollbackUtils;
import org.apache.hudi.table.marker.WriteMarkers;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
/**
* Performs rollback using marker files generated during the write.
*/
public abstract class AbstractMarkerBasedRollbackStrategy<T extends HoodieRecordPayload, I, K, O> implements BaseRollbackActionExecutor.RollbackStrategy {
public class MarkerBasedRollbackStrategy implements BaseRollbackActionExecutor.RollbackStrategy {
private static final Logger LOG = LogManager.getLogger(AbstractMarkerBasedRollbackStrategy.class);
private static final Logger LOG = LogManager.getLogger(MarkerBasedRollbackStrategy.class);
protected final HoodieTable<T, I, K, O> table;
protected final HoodieTable<?, ?, ?, ?> table;
protected final transient HoodieEngineContext context;
@@ -57,7 +66,7 @@ public abstract class AbstractMarkerBasedRollbackStrategy<T extends HoodieRecord
protected final String instantTime;
public AbstractMarkerBasedRollbackStrategy(HoodieTable<T, I, K, O> table, HoodieEngineContext context, HoodieWriteConfig config, String instantTime) {
public MarkerBasedRollbackStrategy(HoodieTable<?, ?, ?, ?> table, HoodieEngineContext context, HoodieWriteConfig config, String instantTime) {
this.table = table;
this.context = context;
this.basePath = table.getMetaClient().getBasePath();
@@ -124,8 +133,8 @@ public abstract class AbstractMarkerBasedRollbackStrategy<T extends HoodieRecord
// the information of files appended to is required for metadata sync
Map<FileStatus, Long> filesToNumBlocksRollback = Collections.singletonMap(
table.getMetaClient().getFs().getFileStatus(Objects.requireNonNull(writer).getLogFile().getPath()),
1L);
table.getMetaClient().getFs().getFileStatus(Objects.requireNonNull(writer).getLogFile().getPath()),
1L);
return HoodieRollbackStat.newBuilder()
.withPartitionPath(partitionPath)
@@ -135,13 +144,48 @@ public abstract class AbstractMarkerBasedRollbackStrategy<T extends HoodieRecord
/**
* Returns written log file size map for the respective baseCommitTime to assist in metadata table syncing.
* @param partitionPath partition path of interest
* @param baseCommitTime base commit time of interest
* @param fileId fileId of interest
*
* @param partitionPathStr partition path of interest
* @param baseCommitTime base commit time of interest
* @param fileId fileId of interest
* @return Map<FileStatus, File size>
* @throws IOException
*/
protected Map<FileStatus, Long> getWrittenLogFileSizeMap(String partitionPath, String baseCommitTime, String fileId) throws IOException {
return Collections.EMPTY_MAP;
protected Map<FileStatus, Long> getWrittenLogFileSizeMap(String partitionPathStr, String baseCommitTime, String fileId) throws IOException {
// collect all log files that are supposed to be deleted with this rollback
return FSUtils.getAllLogFiles(table.getMetaClient().getFs(),
FSUtils.getPartitionPath(config.getBasePath(), partitionPathStr), fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseCommitTime)
.collect(Collectors.toMap(HoodieLogFile::getFileStatus, value -> value.getFileStatus().getLen()));
}
@Override
public List<HoodieRollbackStat> execute(HoodieInstant instantToRollback) {
try {
List<String> markerPaths = MarkerBasedRollbackUtils.getAllMarkerPaths(
table, context, instantToRollback.getTimestamp(), config.getRollbackParallelism());
int parallelism = Math.max(Math.min(markerPaths.size(), config.getRollbackParallelism()), 1);
context.setJobStatus(this.getClass().getSimpleName(), "Rolling back using marker files");
return context.mapToPairAndReduceByKey(markerPaths, markerFilePath -> {
String typeStr = markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1);
IOType type = IOType.valueOf(typeStr);
HoodieRollbackStat rollbackStat;
switch (type) {
case MERGE:
rollbackStat = undoMerge(WriteMarkers.stripMarkerSuffix(markerFilePath));
break;
case APPEND:
rollbackStat = undoAppend(WriteMarkers.stripMarkerSuffix(markerFilePath), instantToRollback);
break;
case CREATE:
rollbackStat = undoCreate(WriteMarkers.stripMarkerSuffix(markerFilePath));
break;
default:
throw new HoodieRollbackException("Unknown marker type, during rollback of " + instantToRollback);
}
return new ImmutablePair<>(rollbackStat.getPartitionPath(), rollbackStat);
}, RollbackUtils::mergeRollbackStat, parallelism);
} catch (Exception e) {
throw new HoodieRollbackException("Error rolling back using marker files written for " + instantToRollback, e);
}
}
}
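
execute() above recovers the write operation type from a marker file name by parsing the suffix after the last dot as an IOType. A standalone sketch of that parsing (the enum is trimmed to the three values handled above, and the marker name is illustrative):

class MarkerTypeSketch {
  enum IOType { MERGE, APPEND, CREATE }

  static IOType markerType(String markerFilePath) {
    String typeStr = markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1);
    return IOType.valueOf(typeStr); // an unknown suffix throws IllegalArgumentException
  }

  public static void main(String[] args) {
    // Markers end with .marker.<IOType>, so the suffix identifies the operation:
    System.out.println(markerType("f1-0_20210915.parquet.marker.CREATE")); // CREATE
  }
}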

View File

@@ -7,13 +7,14 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.table.action.rollback;
@@ -24,37 +25,49 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public abstract class BaseMergeOnReadRollbackActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseRollbackActionExecutor<T, I, K, O> {
public class MergeOnReadRollbackActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseRollbackActionExecutor<T, I, K, O> {
private static final Logger LOG = LogManager.getLogger(BaseMergeOnReadRollbackActionExecutor.class);
private static final Logger LOG = LogManager.getLogger(MergeOnReadRollbackActionExecutor.class);
public BaseMergeOnReadRollbackActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config,
HoodieTable<T, I, K, O> table,
String instantTime,
HoodieInstant commitInstant,
boolean deleteInstants) {
public MergeOnReadRollbackActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config,
HoodieTable<T, I, K, O> table,
String instantTime,
HoodieInstant commitInstant,
boolean deleteInstants) {
super(context, config, table, instantTime, commitInstant, deleteInstants);
}
public BaseMergeOnReadRollbackActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config,
HoodieTable<T, I, K, O> table,
String instantTime,
HoodieInstant commitInstant,
boolean deleteInstants,
boolean skipTimelinePublish,
boolean useMarkerBasedStrategy) {
public MergeOnReadRollbackActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config,
HoodieTable<T, I, K, O> table,
String instantTime,
HoodieInstant commitInstant,
boolean deleteInstants,
boolean skipTimelinePublish,
boolean useMarkerBasedStrategy) {
super(context, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish, useMarkerBasedStrategy);
}
@Override
protected RollbackStrategy getRollbackStrategy() {
if (useMarkerBasedStrategy) {
return new MarkerBasedRollbackStrategy(table, context, config, instantTime);
} else {
return this::executeRollbackUsingFileListing;
}
}
@Override
protected List<HoodieRollbackStat> executeRollback() {
HoodieTimer rollbackTimer = new HoodieTimer();
@@ -93,4 +106,15 @@ public abstract class BaseMergeOnReadRollbackActionExecutor<T extends HoodieReco
LOG.info("Time(in ms) taken to finish rollback " + rollbackTimer.endTimer());
return allRollbackStats;
}
@Override
protected List<HoodieRollbackStat> executeRollbackUsingFileListing(HoodieInstant resolvedInstant) {
List<ListingBasedRollbackRequest> rollbackRequests;
try {
rollbackRequests = RollbackUtils.generateRollbackRequestsUsingFileListingMOR(resolvedInstant, table, context);
} catch (IOException e) {
throw new HoodieIOException("Error generating rollback requests by file listing.", e);
}
return new ListingBasedRollbackHelper(table.getMetaClient(), config).performRollback(context, resolvedInstant, rollbackRequests);
}
}
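
After the refactor, the COW and MOR executors differ only in their listing-based fallback: COW generates delete-only rollback requests from the base path, while MOR must also resolve log files (hence the checked IOException it wraps into HoodieIOException). A compact sketch of that split, with types simplified to strings:

abstract class RollbackExecutorSketch {
  // The shared rollback flow lives in the base class...
  final java.util.List<String> rollback(String instant) {
    return executeRollbackUsingFileListing(instant);
  }

  // ...and each table type supplies only its own request generation.
  abstract java.util.List<String> executeRollbackUsingFileListing(String instant);
}

class CowRollbackSketch extends RollbackExecutorSketch {
  @Override
  java.util.List<String> executeRollbackUsingFileListing(String instant) {
    // COW: only base files written at this instant need deleting.
    return java.util.Collections.singletonList("delete base files of " + instant);
  }
}

class MorRollbackSketch extends RollbackExecutorSketch {
  @Override
  java.util.List<String> executeRollbackUsingFileListing(String instant) {
    // MOR: delete base and log files, and append rollback command blocks.
    return java.util.Arrays.asList(
        "delete base and log files of " + instant,
        "append rollback blocks for " + instant);
  }
}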

View File

@@ -23,6 +23,7 @@ import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.engine.EngineProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.function.SerializableBiFunction;
import org.apache.hudi.common.function.SerializableConsumer;
import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.function.SerializablePairFunction;
@@ -32,6 +33,7 @@ import org.apache.flink.api.common.functions.RuntimeContext;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -43,6 +45,7 @@ import static org.apache.hudi.common.function.FunctionWrapper.throwingFlatMapWra
import static org.apache.hudi.common.function.FunctionWrapper.throwingForeachWrapper;
import static org.apache.hudi.common.function.FunctionWrapper.throwingMapToPairWrapper;
import static org.apache.hudi.common.function.FunctionWrapper.throwingMapWrapper;
import static org.apache.hudi.common.function.FunctionWrapper.throwingReduceWrapper;
/**
* A flink engine implementation of HoodieEngineContext.
@@ -74,6 +77,15 @@ public class HoodieFlinkEngineContext extends HoodieEngineContext {
return data.stream().parallel().map(throwingMapWrapper(func)).collect(Collectors.toList());
}
@Override
public <I, K, V> List<V> mapToPairAndReduceByKey(List<I> data, SerializablePairFunction<I, K, V> mapToPairFunc, SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
return data.stream().parallel().map(throwingMapToPairWrapper(mapToPairFunc))
.collect(Collectors.groupingBy(p -> p.getKey())).values().stream()
.map(list -> list.stream().map(e -> e.getValue()).reduce(throwingReduceWrapper(reduceFunc)).orElse(null))
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
@Override
public <I, O> List<O> flatMap(List<I> data, SerializableFunction<I, Stream<O>> func, int parallelism) {
return data.stream().parallel().flatMap(throwingFlatMapWrapper(func)).collect(Collectors.toList());
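
The mapToPairAndReduceByKey method added above maps each input to a key/value pair, groups by key, and reduces each group; the rollback code uses it to merge per-partition HoodieRollbackStats. A self-contained sketch of the same semantics with plain streams (partition-path prefixes stand in for keys):

import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class MapToPairAndReduceByKeySketch {
  public static void main(String[] args) {
    List<String> files = Arrays.asList("2021/09/15/f1", "2021/09/15/f2", "2021/09/16/f3");
    List<String> merged = files.stream()
        .map(f -> new SimpleEntry<>(f.substring(0, f.lastIndexOf('/')), f)) // mapToPairFunc
        .collect(Collectors.groupingBy(Map.Entry::getKey)).values().stream()
        .map(group -> group.stream().map(Map.Entry::getValue)
            .reduce((a, b) -> a + "+" + b).get())                           // reduceFunc
        .collect(Collectors.toList());
    System.out.println(merged); // e.g. [2021/09/15/f1+2021/09/15/f2, 2021/09/16/f3]
  }
}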

View File

@@ -53,7 +53,7 @@ import org.apache.hudi.table.action.commit.FlinkInsertPreppedCommitActionExecuto
import org.apache.hudi.table.action.commit.FlinkMergeHelper;
import org.apache.hudi.table.action.commit.FlinkUpsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.FlinkUpsertPreppedCommitActionExecutor;
import org.apache.hudi.table.action.rollback.FlinkCopyOnWriteRollbackActionExecutor;
import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -305,7 +305,7 @@ public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload> extends
@Override
public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollbackInstantTime, HoodieInstant commitInstant, boolean deleteInstants) {
return new FlinkCopyOnWriteRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute();
return new CopyOnWriteRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute();
}
@Override

View File

@@ -37,7 +37,7 @@ import org.apache.hudi.table.action.commit.delta.FlinkUpsertDeltaCommitActionExe
import org.apache.hudi.table.action.commit.delta.FlinkUpsertPreppedDeltaCommitActionExecutor;
import org.apache.hudi.table.action.compact.BaseScheduleCompactionActionExecutor;
import org.apache.hudi.table.action.compact.FlinkScheduleCompactionActionExecutor;
import org.apache.hudi.table.action.rollback.FlinkMergeOnReadRollbackActionExecutor;
import org.apache.hudi.table.action.rollback.MergeOnReadRollbackActionExecutor;
import java.util.List;
import java.util.Map;
@@ -108,7 +108,7 @@ public class HoodieFlinkMergeOnReadTable<T extends HoodieRecordPayload>
@Override
public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollbackInstantTime, HoodieInstant commitInstant, boolean deleteInstants) {
return new FlinkMergeOnReadRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute();
return new MergeOnReadRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute();
}
}

View File

@@ -1,71 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.rollback;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import java.util.List;
@SuppressWarnings("checkstyle:LineLength")
public class FlinkCopyOnWriteRollbackActionExecutor<T extends HoodieRecordPayload> extends
BaseCopyOnWriteRollbackActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
public FlinkCopyOnWriteRollbackActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config,
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
String instantTime,
HoodieInstant commitInstant,
boolean deleteInstants) {
super(context, config, table, instantTime, commitInstant, deleteInstants);
}
public FlinkCopyOnWriteRollbackActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config,
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
String instantTime,
HoodieInstant commitInstant,
boolean deleteInstants,
boolean skipTimelinePublish,
boolean useMarkerBasedStrategy) {
super(context, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish, useMarkerBasedStrategy);
}
@Override
protected BaseRollbackActionExecutor.RollbackStrategy getRollbackStrategy() {
if (useMarkerBasedStrategy) {
return new FlinkMarkerBasedRollbackStrategy(table, context, config, instantTime);
} else {
return this::executeRollbackUsingFileListing;
}
}
@Override
protected List<HoodieRollbackStat> executeRollbackUsingFileListing(HoodieInstant instantToRollback) {
List<ListingBasedRollbackRequest> rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(
context, table.getMetaClient().getBasePath(), config);
return new ListingBasedRollbackHelper(table.getMetaClient(), config).performRollback(context, instantToRollback, rollbackRequests);
}
}

View File

@@ -1,90 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.rollback;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.marker.MarkerBasedRollbackUtils;
import org.apache.hudi.table.marker.WriteMarkers;
import org.apache.hadoop.fs.FileStatus;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import scala.Tuple2;
@SuppressWarnings("checkstyle:LineLength")
public class FlinkMarkerBasedRollbackStrategy<T extends HoodieRecordPayload> extends AbstractMarkerBasedRollbackStrategy<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
public FlinkMarkerBasedRollbackStrategy(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table, HoodieEngineContext context, HoodieWriteConfig config, String instantTime) {
super(table, context, config, instantTime);
}
@Override
public List<HoodieRollbackStat> execute(HoodieInstant instantToRollback) {
try {
List<String> markerPaths = MarkerBasedRollbackUtils.getAllMarkerPaths(
table, context, instantToRollback.getTimestamp(), config.getRollbackParallelism());
List<HoodieRollbackStat> rollbackStats = context.map(markerPaths, markerFilePath -> {
String typeStr = markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1);
IOType type = IOType.valueOf(typeStr);
switch (type) {
case MERGE:
return undoMerge(WriteMarkers.stripMarkerSuffix(markerFilePath));
case APPEND:
return undoAppend(WriteMarkers.stripMarkerSuffix(markerFilePath), instantToRollback);
case CREATE:
return undoCreate(WriteMarkers.stripMarkerSuffix(markerFilePath));
default:
throw new HoodieRollbackException("Unknown marker type, during rollback of " + instantToRollback);
}
}, 0);
return rollbackStats.stream().map(rollbackStat -> new Tuple2<>(rollbackStat.getPartitionPath(), rollbackStat))
.collect(Collectors.groupingBy(Tuple2::_1))
.values()
.stream()
.map(x -> x.stream().map(y -> y._2).reduce(RollbackUtils::mergeRollbackStat).get())
.collect(Collectors.toList());
} catch (Exception e) {
throw new HoodieRollbackException("Error rolling back using marker files written for " + instantToRollback, e);
}
}
protected Map<FileStatus, Long> getWrittenLogFileSizeMap(String partitionPathStr, String baseCommitTime, String fileId) throws IOException {
// collect all log files that are supposed to be deleted with this rollback
return FSUtils.getAllLogFiles(table.getMetaClient().getFs(),
FSUtils.getPartitionPath(config.getBasePath(), partitionPathStr), fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseCommitTime)
.collect(Collectors.toMap(HoodieLogFile::getFileStatus, value -> value.getFileStatus().getLen()));
}
}

View File

@@ -1,77 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.rollback;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.HoodieTable;
import java.io.IOException;
import java.util.List;
@SuppressWarnings("checkstyle:LineLength")
public class FlinkMergeOnReadRollbackActionExecutor<T extends HoodieRecordPayload> extends
BaseMergeOnReadRollbackActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
public FlinkMergeOnReadRollbackActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config,
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
String instantTime,
HoodieInstant commitInstant,
boolean deleteInstants) {
super(context, config, table, instantTime, commitInstant, deleteInstants);
}
public FlinkMergeOnReadRollbackActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config,
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
String instantTime,
HoodieInstant commitInstant,
boolean deleteInstants,
boolean skipTimelinePublish,
boolean useMarkerBasedStrategy) {
super(context, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish, useMarkerBasedStrategy);
}
@Override
protected RollbackStrategy getRollbackStrategy() {
if (useMarkerBasedStrategy) {
return new FlinkMarkerBasedRollbackStrategy(table, context, config, instantTime);
} else {
return this::executeRollbackUsingFileListing;
}
}
@Override
protected List<HoodieRollbackStat> executeRollbackUsingFileListing(HoodieInstant resolvedInstant) {
List<ListingBasedRollbackRequest> rollbackRequests;
try {
rollbackRequests = RollbackUtils.generateRollbackRequestsUsingFileListingMOR(resolvedInstant, table, context);
} catch (IOException e) {
throw new HoodieIOException("Error generating rollback requests by file listing.", e);
}
return new ListingBasedRollbackHelper(table.getMetaClient(), config).performRollback(context, resolvedInstant, rollbackRequests);
}
}

View File

@@ -1,250 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.rollback;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.PathFilter;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
/**
* Performs Rollback of Hoodie Tables.
*/
public class ListingBasedRollbackHelper implements Serializable {
private static final Logger LOG = LogManager.getLogger(ListingBasedRollbackHelper.class);
private final HoodieTableMetaClient metaClient;
private final HoodieWriteConfig config;
public ListingBasedRollbackHelper(HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
this.metaClient = metaClient;
this.config = config;
}
/**
* Performs all collected rollback actions in parallel.
*/
public List<HoodieRollbackStat> performRollback(HoodieEngineContext context, HoodieInstant instantToRollback, List<ListingBasedRollbackRequest> rollbackRequests) {
Map<String, HoodieRollbackStat> partitionPathRollbackStatsPairs = maybeDeleteAndCollectStats(context, instantToRollback, rollbackRequests, true);
Map<String, List<Pair<String, HoodieRollbackStat>>> collect = partitionPathRollbackStatsPairs.entrySet()
.stream()
.map(x -> Pair.of(x.getKey(), x.getValue())).collect(Collectors.groupingBy(Pair::getLeft));
return collect.values().stream()
.map(pairs -> pairs.stream().map(Pair::getRight).reduce(RollbackUtils::mergeRollbackStat).orElse(null))
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
/**
* Collects info on all files that need to be rolled back.
*/
public List<HoodieRollbackStat> collectRollbackStats(HoodieEngineContext context, HoodieInstant instantToRollback, List<ListingBasedRollbackRequest> rollbackRequests) {
Map<String, HoodieRollbackStat> partitionPathRollbackStatsPairs = maybeDeleteAndCollectStats(context, instantToRollback, rollbackRequests, false);
return new ArrayList<>(partitionPathRollbackStatsPairs.values());
}
/**
* Either deletes the files of interest and collects stats, or collects stats only.
*
* @param context instance of {@link HoodieEngineContext} to use.
* @param instantToRollback {@link HoodieInstant} of interest for which deletion or collect stats is requested.
* @param rollbackRequests List of {@link ListingBasedRollbackRequest} to be operated on.
* @param doDelete {@code true} if deletion has to be done. {@code false} if only stats are to be collected w/o performing any deletes.
* @return stats collected with or w/o actual deletions.
*/
Map<String, HoodieRollbackStat> maybeDeleteAndCollectStats(HoodieEngineContext context,
HoodieInstant instantToRollback,
List<ListingBasedRollbackRequest> rollbackRequests,
boolean doDelete) {
return context.mapToPair(rollbackRequests, rollbackRequest -> {
switch (rollbackRequest.getType()) {
case DELETE_DATA_FILES_ONLY: {
final Map<FileStatus, Boolean> filesToDeletedStatus = deleteBaseFiles(metaClient, config, instantToRollback.getTimestamp(),
rollbackRequest.getPartitionPath(), doDelete);
return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
.withDeletedFileResults(filesToDeletedStatus).build());
}
case DELETE_DATA_AND_LOG_FILES: {
final Map<FileStatus, Boolean> filesToDeletedStatus = deleteBaseAndLogFiles(metaClient, config, instantToRollback.getTimestamp(), rollbackRequest.getPartitionPath(), doDelete);
return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
.withDeletedFileResults(filesToDeletedStatus).build());
}
case APPEND_ROLLBACK_BLOCK: {
String fileId = rollbackRequest.getFileId().get();
String latestBaseInstant = rollbackRequest.getLatestBaseInstant().get();
// collect all log files that are supposed to be deleted with this rollback
Map<FileStatus, Long> writtenLogFileSizeMap = FSUtils.getAllLogFiles(metaClient.getFs(),
FSUtils.getPartitionPath(config.getBasePath(), rollbackRequest.getPartitionPath()),
fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), latestBaseInstant)
.collect(Collectors.toMap(HoodieLogFile::getFileStatus, value -> value.getFileStatus().getLen()));
HoodieLogFormat.Writer writer = null;
try {
writer = HoodieLogFormat.newWriterBuilder()
.onParentPath(FSUtils.getPartitionPath(metaClient.getBasePath(), rollbackRequest.getPartitionPath()))
.withFileId(fileId)
.overBaseCommit(latestBaseInstant)
.withFs(metaClient.getFs())
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
// generate metadata
if (doDelete) {
Map<HoodieLogBlock.HeaderMetadataType, String> header = generateHeader(instantToRollback.getTimestamp());
// if update belongs to an existing log file
writer.appendBlock(new HoodieCommandBlock(header));
}
} catch (IOException | InterruptedException io) {
throw new HoodieRollbackException("Failed to rollback for instant " + instantToRollback, io);
} finally {
try {
if (writer != null) {
writer.close();
}
} catch (IOException io) {
throw new HoodieIOException("Error appending rollback block..", io);
}
}
// This step is intentionally done after writer is closed. Guarantees that
// getFileStatus would reflect correct stats and FileNotFoundException is not thrown in
// cloud-storage : HUDI-168
Map<FileStatus, Long> filesToNumBlocksRollback = Collections.singletonMap(
metaClient.getFs().getFileStatus(Objects.requireNonNull(writer).getLogFile().getPath()),
1L
);
return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
.withRollbackBlockAppendResults(filesToNumBlocksRollback)
.withWrittenLogFileSizeMap(writtenLogFileSizeMap).build());
}
default:
throw new IllegalStateException("Unknown Rollback action " + rollbackRequest);
}
}, 0);
}
/**
* Common method used for cleaning out base and log files under a partition path during rollback of a set of commits.
*/
private Map<FileStatus, Boolean> deleteBaseAndLogFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
String commit, String partitionPath, boolean doDelete) throws IOException {
LOG.info("Cleaning path " + partitionPath);
String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
SerializablePathFilter filter = (path) -> {
if (path.toString().endsWith(basefileExtension)) {
String fileCommitTime = FSUtils.getCommitTime(path.getName());
return commit.equals(fileCommitTime);
} else if (FSUtils.isLogFile(path)) {
// Since the baseCommitTime is the only commit for new log files, it's okay here
String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path);
return commit.equals(fileCommitTime);
}
return false;
};
final Map<FileStatus, Boolean> results = new HashMap<>();
FileSystem fs = metaClient.getFs();
FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
for (FileStatus file : toBeDeleted) {
if (doDelete) {
boolean success = fs.delete(file.getPath(), false);
results.put(file, success);
LOG.info("Delete file " + file.getPath() + "\t" + success);
} else {
results.put(file, true);
}
}
return results;
}
/**
* Common method used for cleaning out base files under a partition path during rollback of a set of commits.
*/
private Map<FileStatus, Boolean> deleteBaseFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
String commit, String partitionPath, boolean doDelete) throws IOException {
final Map<FileStatus, Boolean> results = new HashMap<>();
LOG.info("Cleaning path " + partitionPath);
FileSystem fs = metaClient.getFs();
String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
PathFilter filter = (path) -> {
if (path.toString().contains(basefileExtension)) {
String fileCommitTime = FSUtils.getCommitTime(path.getName());
return commit.equals(fileCommitTime);
}
return false;
};
FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
for (FileStatus file : toBeDeleted) {
if (doDelete) {
boolean success = fs.delete(file.getPath(), false);
results.put(file, success);
LOG.info("Delete file " + file.getPath() + "\t" + success);
} else {
results.put(file, true);
}
}
return results;
}
private Map<HoodieLogBlock.HeaderMetadataType, String> generateHeader(String commit) {
// generate metadata
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>(3);
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, metaClient.getActiveTimeline().lastInstant().get().getTimestamp());
header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, commit);
header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE,
String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
return header;
}
public interface SerializablePathFilter extends PathFilter, Serializable {
}
}

View File

@@ -27,8 +27,8 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.rollback.ListingBasedRollbackHelper;
import org.apache.hudi.table.action.rollback.ListingBasedRollbackRequest;
import org.apache.hudi.table.action.rollback.ListingBasedRollbackHelper;
import java.util.List;

View File

@@ -23,6 +23,7 @@ import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.engine.EngineProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.function.SerializableBiFunction;
import org.apache.hudi.common.function.SerializableConsumer;
import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.function.SerializablePairFunction;
@@ -40,6 +41,7 @@ import static org.apache.hudi.common.function.FunctionWrapper.throwingFlatMapWra
import static org.apache.hudi.common.function.FunctionWrapper.throwingForeachWrapper;
import static org.apache.hudi.common.function.FunctionWrapper.throwingMapToPairWrapper;
import static org.apache.hudi.common.function.FunctionWrapper.throwingMapWrapper;
import static org.apache.hudi.common.function.FunctionWrapper.throwingReduceWrapper;
/**
* A java engine implementation of HoodieEngineContext.
@@ -59,6 +61,14 @@ public class HoodieJavaEngineContext extends HoodieEngineContext {
return data.stream().parallel().map(throwingMapWrapper(func)).collect(toList());
}
@Override
public <I, K, V> List<V> mapToPairAndReduceByKey(List<I> data, SerializablePairFunction<I, K, V> mapToPairFunc, SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
return data.stream().parallel().map(throwingMapToPairWrapper(mapToPairFunc))
.collect(Collectors.groupingBy(p -> p.getKey())).values().stream()
.map(list -> list.stream().map(e -> e.getValue()).reduce(throwingReduceWrapper(reduceFunc)).get())
.collect(Collectors.toList());
}
@Override
public <I, O> List<O> flatMap(List<I> data, SerializableFunction<I, Stream<O>> func, int parallelism) {
return data.stream().parallel().flatMap(throwingFlatMapWrapper(func)).collect(toList());
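
One subtle difference from the Flink implementation earlier in this commit: the Java version reduces each group with .get() on the Optional, while Flink uses .orElse(null) plus a null filter. Both are safe, because Collectors.groupingBy never produces an empty group, so the reduce always yields a present Optional. A tiny sketch of that invariant:

import java.util.Arrays;
import java.util.Optional;

class NonEmptyGroupReduceSketch {
  public static void main(String[] args) {
    // Every group returned by Collectors.groupingBy contains at least one
    // element, so reducing a group's values always produces a present Optional.
    Optional<Integer> sum = Arrays.asList(1, 2, 3).stream().reduce(Integer::sum);
    System.out.println(sum.get()); // 6; get() cannot throw on a non-empty source
  }
}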

View File

@@ -50,7 +50,7 @@ import org.apache.hudi.table.action.commit.JavaInsertPreppedCommitActionExecutor
import org.apache.hudi.table.action.commit.JavaUpsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.JavaUpsertPreppedCommitActionExecutor;
import org.apache.hudi.table.action.restore.JavaCopyOnWriteRestoreActionExecutor;
import org.apache.hudi.table.action.rollback.JavaCopyOnWriteRollbackActionExecutor;
import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
import org.apache.hudi.table.action.savepoint.SavepointActionExecutor;
import java.util.List;
@@ -193,7 +193,7 @@ public class HoodieJavaCopyOnWriteTable<T extends HoodieRecordPayload> extends H
String rollbackInstantTime,
HoodieInstant commitInstant,
boolean deleteInstants) {
return new JavaCopyOnWriteRollbackActionExecutor(
return new CopyOnWriteRollbackActionExecutor(
context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute();
}

View File

@@ -30,7 +30,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.rollback.JavaCopyOnWriteRollbackActionExecutor;
import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
import java.util.List;
@@ -48,7 +48,7 @@ public class JavaCopyOnWriteRestoreActionExecutor<T extends HoodieRecordPayload>
@Override
protected HoodieRollbackMetadata rollbackInstant(HoodieInstant instantToRollback) {
table.getMetaClient().reloadActiveTimeline();
JavaCopyOnWriteRollbackActionExecutor rollbackActionExecutor = new JavaCopyOnWriteRollbackActionExecutor(
CopyOnWriteRollbackActionExecutor rollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(
context,
config,
table,

View File

@@ -1,72 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.rollback;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import java.util.List;
@SuppressWarnings("checkstyle:LineLength")
public class JavaCopyOnWriteRollbackActionExecutor<T extends HoodieRecordPayload> extends
BaseCopyOnWriteRollbackActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
public JavaCopyOnWriteRollbackActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config,
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
String instantTime,
HoodieInstant commitInstant,
boolean deleteInstants) {
super(context, config, table, instantTime, commitInstant, deleteInstants);
}
public JavaCopyOnWriteRollbackActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config,
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
String instantTime,
HoodieInstant commitInstant,
boolean deleteInstants,
boolean skipTimelinePublish,
boolean useMarkerBasedStrategy) {
super(context, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish, useMarkerBasedStrategy);
}
@Override
protected BaseRollbackActionExecutor.RollbackStrategy getRollbackStrategy() {
if (useMarkerBasedStrategy) {
return new JavaMarkerBasedRollbackStrategy(table, context, config, instantTime);
} else {
return this::executeRollbackUsingFileListing;
}
}
@Override
protected List<HoodieRollbackStat> executeRollbackUsingFileListing(HoodieInstant instantToRollback) {
List<ListingBasedRollbackRequest> rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(
context, table.getMetaClient().getBasePath(), config);
return new JavaListingBasedRollbackHelper(table.getMetaClient(), config)
.performRollback(context, instantToRollback, rollbackRequests);
}
}

View File

@@ -1,237 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.rollback;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.PathFilter;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
/**
 * Performs rollback of Hoodie tables.
*/
public class JavaListingBasedRollbackHelper implements Serializable {
private static final Logger LOG = LogManager.getLogger(JavaListingBasedRollbackHelper.class);
private final HoodieTableMetaClient metaClient;
private final HoodieWriteConfig config;
public JavaListingBasedRollbackHelper(HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
this.metaClient = metaClient;
this.config = config;
}
/**
 * Performs, in parallel, all rollback actions that we have collected.
*/
public List<HoodieRollbackStat> performRollback(HoodieEngineContext context, HoodieInstant instantToRollback, List<ListingBasedRollbackRequest> rollbackRequests) {
Map<String, HoodieRollbackStat> partitionPathRollbackStatsPairs = maybeDeleteAndCollectStats(context, instantToRollback, rollbackRequests, true);
Map<String, List<Pair<String, HoodieRollbackStat>>> collect = partitionPathRollbackStatsPairs.entrySet()
.stream()
.map(x -> Pair.of(x.getKey(), x.getValue())).collect(Collectors.groupingBy(Pair::getLeft));
return collect.values().stream()
.map(pairs -> pairs.stream().map(Pair::getRight).reduce(RollbackUtils::mergeRollbackStat).orElse(null))
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
/**
 * Collects all file info that needs to be rolled back.
*/
public List<HoodieRollbackStat> collectRollbackStats(HoodieEngineContext context, HoodieInstant instantToRollback, List<ListingBasedRollbackRequest> rollbackRequests) {
Map<String, HoodieRollbackStat> partitionPathRollbackStatsPairs = maybeDeleteAndCollectStats(context, instantToRollback, rollbackRequests, false);
return new ArrayList<>(partitionPathRollbackStatsPairs.values());
}
/**
 * Maybe deletes the files of interest and collects stats, or collects stats only.
 *
 * @param context instance of {@link HoodieEngineContext} to use.
 * @param instantToRollback {@link HoodieInstant} of interest for which deletion or stats collection is requested.
 * @param rollbackRequests List of {@link ListingBasedRollbackRequest} to be operated on.
 * @param doDelete {@code true} if deletion has to be done. {@code false} if only stats are to be collected without performing any deletes.
 * @return stats collected with or without actual deletions.
*/
Map<String, HoodieRollbackStat> maybeDeleteAndCollectStats(HoodieEngineContext context,
HoodieInstant instantToRollback,
List<ListingBasedRollbackRequest> rollbackRequests,
boolean doDelete) {
return context.mapToPair(rollbackRequests, rollbackRequest -> {
switch (rollbackRequest.getType()) {
case DELETE_DATA_FILES_ONLY: {
final Map<FileStatus, Boolean> filesToDeletedStatus = deleteBaseFiles(metaClient, config, instantToRollback.getTimestamp(),
rollbackRequest.getPartitionPath(), doDelete);
return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
.withDeletedFileResults(filesToDeletedStatus).build());
}
case DELETE_DATA_AND_LOG_FILES: {
final Map<FileStatus, Boolean> filesToDeletedStatus = deleteBaseAndLogFiles(metaClient, config, instantToRollback.getTimestamp(), rollbackRequest.getPartitionPath(), doDelete);
return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
.withDeletedFileResults(filesToDeletedStatus).build());
}
case APPEND_ROLLBACK_BLOCK: {
HoodieLogFormat.Writer writer = null;
try {
writer = HoodieLogFormat.newWriterBuilder()
.onParentPath(FSUtils.getPartitionPath(metaClient.getBasePath(), rollbackRequest.getPartitionPath()))
.withFileId(rollbackRequest.getFileId().get())
.overBaseCommit(rollbackRequest.getLatestBaseInstant().get()).withFs(metaClient.getFs())
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
// generate metadata
if (doDelete) {
Map<HoodieLogBlock.HeaderMetadataType, String> header = generateHeader(instantToRollback.getTimestamp());
// if update belongs to an existing log file
writer.appendBlock(new HoodieCommandBlock(header));
}
} catch (IOException | InterruptedException io) {
throw new HoodieRollbackException("Failed to rollback for instant " + instantToRollback, io);
} finally {
try {
if (writer != null) {
writer.close();
}
} catch (IOException io) {
throw new HoodieIOException("Error appending rollback block..", io);
}
}
// This step is intentionally done after the writer is closed. It guarantees that
// getFileStatus reflects correct stats and that FileNotFoundException is not thrown
// on cloud storage (HUDI-168).
Map<FileStatus, Long> filesToNumBlocksRollback = Collections.singletonMap(
metaClient.getFs().getFileStatus(Objects.requireNonNull(writer).getLogFile().getPath()), 1L
);
return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
.withRollbackBlockAppendResults(filesToNumBlocksRollback).build());
}
default:
throw new IllegalStateException("Unknown Rollback action " + rollbackRequest);
}
}, 0);
}
/**
 * Common method used for cleaning out base and log files under a partition path during rollback of a set of commits.
*/
private Map<FileStatus, Boolean> deleteBaseAndLogFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
String commit, String partitionPath, boolean doDelete) throws IOException {
LOG.info("Cleaning path " + partitionPath);
String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
SerializablePathFilter filter = (path) -> {
if (path.toString().endsWith(basefileExtension)) {
String fileCommitTime = FSUtils.getCommitTime(path.getName());
return commit.equals(fileCommitTime);
} else if (FSUtils.isLogFile(path)) {
// Since the baseCommitTime is the only commit for new log files, it's okay here
String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path);
return commit.equals(fileCommitTime);
}
return false;
};
final Map<FileStatus, Boolean> results = new HashMap<>();
FileSystem fs = metaClient.getFs();
FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
for (FileStatus file : toBeDeleted) {
if (doDelete) {
boolean success = fs.delete(file.getPath(), false);
results.put(file, success);
LOG.info("Delete file " + file.getPath() + "\t" + success);
} else {
results.put(file, true);
}
}
return results;
}
/**
* Common method used for cleaning out base files under a partition path during rollback of a set of commits.
*/
private Map<FileStatus, Boolean> deleteBaseFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
String commit, String partitionPath, boolean doDelete) throws IOException {
final Map<FileStatus, Boolean> results = new HashMap<>();
LOG.info("Cleaning path " + partitionPath);
FileSystem fs = metaClient.getFs();
String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
PathFilter filter = (path) -> {
if (path.toString().contains(basefileExtension)) {
String fileCommitTime = FSUtils.getCommitTime(path.getName());
return commit.equals(fileCommitTime);
}
return false;
};
FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
for (FileStatus file : toBeDeleted) {
if (doDelete) {
boolean success = fs.delete(file.getPath(), false);
results.put(file, success);
LOG.info("Delete file " + file.getPath() + "\t" + success);
} else {
results.put(file, true);
}
}
return results;
}
private Map<HoodieLogBlock.HeaderMetadataType, String> generateHeader(String commit) {
// generate metadata
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>(3);
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, metaClient.getActiveTimeline().lastInstant().get().getTimestamp());
header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, commit);
header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE,
String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
return header;
}
public interface SerializablePathFilter extends PathFilter, Serializable {
}
}
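The doDelete flag threaded through maybeDeleteAndCollectStats lets the same traversal serve both a dry run and the actual rollback. A hedged usage sketch, assuming metaClient, config, context, instantToRollback, and rollbackRequests are in scope:

JavaListingBasedRollbackHelper helper = new JavaListingBasedRollbackHelper(metaClient, config);
// Dry run: collect the stats a rollback would produce, deleting nothing (doDelete=false).
List<HoodieRollbackStat> preview = helper.collectRollbackStats(context, instantToRollback, rollbackRequests);
// Real rollback: same traversal with doDelete=true, so files are actually removed.
List<HoodieRollbackStat> applied = helper.performRollback(context, instantToRollback, rollbackRequests);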

View File

@@ -1,78 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.rollback;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.marker.MarkerBasedRollbackUtils;
import org.apache.hudi.table.marker.WriteMarkers;
import java.util.List;
import java.util.stream.Collectors;
@SuppressWarnings("checkstyle:LineLength")
public class JavaMarkerBasedRollbackStrategy<T extends HoodieRecordPayload> extends AbstractMarkerBasedRollbackStrategy<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
public JavaMarkerBasedRollbackStrategy(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
HoodieEngineContext context,
HoodieWriteConfig config,
String instantTime) {
super(table, context, config, instantTime);
}
@Override
public List<HoodieRollbackStat> execute(HoodieInstant instantToRollback) {
try {
List<String> markerPaths = MarkerBasedRollbackUtils.getAllMarkerPaths(
table, context, instantToRollback.getTimestamp(), config.getRollbackParallelism());
List<HoodieRollbackStat> rollbackStats = context.map(markerPaths, markerFilePath -> {
String typeStr = markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1);
IOType type = IOType.valueOf(typeStr);
switch (type) {
case MERGE:
return undoMerge(WriteMarkers.stripMarkerSuffix(markerFilePath));
case APPEND:
return undoAppend(WriteMarkers.stripMarkerSuffix(markerFilePath), instantToRollback);
case CREATE:
return undoCreate(WriteMarkers.stripMarkerSuffix(markerFilePath));
default:
throw new HoodieRollbackException("Unknown marker type, during rollback of " + instantToRollback);
}
}, 0);
return rollbackStats.stream().map(rollbackStat -> Pair.of(rollbackStat.getPartitionPath(), rollbackStat))
.collect(Collectors.groupingBy(Pair::getKey))
.values()
.stream()
.map(x -> x.stream().map(y -> y.getValue()).reduce(RollbackUtils::mergeRollbackStat).get())
.collect(Collectors.toList());
} catch (Exception e) {
throw new HoodieRollbackException("Error rolling back using marker files written for " + instantToRollback, e);
}
}
}
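The switch in execute keys off the marker file's final extension. An illustrative walk-through of that parsing, using a hypothetical marker path (the exact naming convention is assumed here, not taken from the commit):

String markerFilePath = "2021/09/15/f1-0-1_000_002.parquet.marker.CREATE"; // hypothetical path
String typeStr = markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1); // "CREATE"
IOType type = IOType.valueOf(typeStr); // IOType.CREATE, dispatched to undoCreate(...)
String dataFilePath = WriteMarkers.stripMarkerSuffix(markerFilePath); // file the rollback must undo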

View File

@@ -22,6 +22,7 @@ import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.engine.EngineProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.function.SerializableBiFunction;
import org.apache.hudi.common.function.SerializableConsumer;
import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.function.SerializablePairFunction;
@@ -73,6 +74,14 @@ public class HoodieSparkEngineContext extends HoodieEngineContext {
return javaSparkContext.parallelize(data, parallelism).map(func::apply).collect();
}
@Override
public <I, K, V> List<V> mapToPairAndReduceByKey(List<I> data, SerializablePairFunction<I, K, V> mapToPairFunc, SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
return javaSparkContext.parallelize(data, parallelism).mapToPair(input -> {
Pair<K, V> pair = mapToPairFunc.call(input);
return new Tuple2<>(pair.getLeft(), pair.getRight());
}).reduceByKey(reduceFunc::apply).map(Tuple2::_2).collect();
}
@Override
public <I, O> List<O> flatMap(List<I> data, SerializableFunction<I, Stream<O>> func, int parallelism) {
return javaSparkContext.parallelize(data, parallelism).flatMap(x -> func.apply(x).iterator()).collect();
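The new mapToPairAndReduceByKey primitive moves the pair-and-reduce plumbing behind the engine context, so callers such as the rollback helpers no longer build JavaPairRDDs directly. A hedged sketch of a call site, where toStat is a hypothetical mapping helper and the other names are assumed in scope:

List<HoodieRollbackStat> merged = context.mapToPairAndReduceByKey(
    rollbackRequests,
    request -> Pair.of(request.getPartitionPath(), toStat(request)), // toStat is illustrative only
    RollbackUtils::mergeRollbackStat, // merge stats that share a partition path
    parallelism);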

View File

@@ -65,7 +65,7 @@ import org.apache.hudi.table.action.commit.SparkMergeHelper;
import org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.SparkUpsertPreppedCommitActionExecutor;
import org.apache.hudi.table.action.restore.SparkCopyOnWriteRestoreActionExecutor;
import org.apache.hudi.table.action.rollback.SparkCopyOnWriteRollbackActionExecutor;
import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
import org.apache.hudi.table.action.savepoint.SavepointActionExecutor;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -247,7 +247,7 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload> extends
@Override
public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollbackInstantTime, HoodieInstant commitInstant, boolean deleteInstants) {
return new SparkCopyOnWriteRollbackActionExecutor((HoodieSparkEngineContext) context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute();
return new CopyOnWriteRollbackActionExecutor((HoodieSparkEngineContext) context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute();
}
@Override

View File

@@ -49,7 +49,8 @@ import org.apache.hudi.table.action.deltacommit.SparkUpsertDeltaCommitActionExec
import org.apache.hudi.table.action.deltacommit.SparkUpsertPreppedDeltaCommitActionExecutor;
import org.apache.hudi.table.action.compact.BaseScheduleCompactionActionExecutor;
import org.apache.hudi.table.action.restore.SparkMergeOnReadRestoreActionExecutor;
import org.apache.hudi.table.action.rollback.SparkMergeOnReadRollbackActionExecutor;
import org.apache.hudi.table.action.rollback.MergeOnReadRollbackActionExecutor;
import org.apache.spark.api.java.JavaRDD;
import java.util.List;
@@ -146,7 +147,7 @@ public class HoodieSparkMergeOnReadTable<T extends HoodieRecordPayload> extends
String rollbackInstantTime,
HoodieInstant commitInstant,
boolean deleteInstants) {
return new SparkMergeOnReadRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute();
return new MergeOnReadRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute();
}
@Override

View File

@@ -30,7 +30,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.rollback.SparkCopyOnWriteRollbackActionExecutor;
import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
import org.apache.spark.api.java.JavaRDD;
@@ -49,7 +49,7 @@ public class SparkCopyOnWriteRestoreActionExecutor<T extends HoodieRecordPayload
@Override
protected HoodieRollbackMetadata rollbackInstant(HoodieInstant instantToRollback) {
table.getMetaClient().reloadActiveTimeline();
SparkCopyOnWriteRollbackActionExecutor rollbackActionExecutor = new SparkCopyOnWriteRollbackActionExecutor(
CopyOnWriteRollbackActionExecutor rollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(
(HoodieSparkEngineContext) context,
config,
table,

View File

@@ -29,7 +29,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.rollback.SparkMergeOnReadRollbackActionExecutor;
import org.apache.hudi.table.action.rollback.MergeOnReadRollbackActionExecutor;
import org.apache.spark.api.java.JavaRDD;
@@ -48,7 +48,7 @@ public class SparkMergeOnReadRestoreActionExecutor<T extends HoodieRecordPayload
@Override
protected HoodieRollbackMetadata rollbackInstant(HoodieInstant instantToRollback) {
table.getMetaClient().reloadActiveTimeline();
SparkMergeOnReadRollbackActionExecutor rollbackActionExecutor = new SparkMergeOnReadRollbackActionExecutor(
MergeOnReadRollbackActionExecutor rollbackActionExecutor = new MergeOnReadRollbackActionExecutor(
context,
config,
table,

View File

@@ -1,252 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.rollback;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
import org.apache.hudi.common.table.log.block.HoodieCommandBlock.HoodieCommandBlockTypeEnum;
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.PathFilter;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import scala.Tuple2;
/**
 * Performs rollback of Hoodie tables.
*/
public class ListingBasedRollbackHelper implements Serializable {
private static final Logger LOG = LogManager.getLogger(ListingBasedRollbackHelper.class);
private final HoodieTableMetaClient metaClient;
private final HoodieWriteConfig config;
public ListingBasedRollbackHelper(HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
this.metaClient = metaClient;
this.config = config;
}
/**
 * Performs, in parallel, all rollback actions that we have collected.
*/
public List<HoodieRollbackStat> performRollback(HoodieEngineContext context, HoodieInstant instantToRollback, List<ListingBasedRollbackRequest> rollbackRequests) {
int sparkPartitions = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1);
context.setJobStatus(this.getClass().getSimpleName(), "Perform rollback actions");
JavaPairRDD<String, HoodieRollbackStat> partitionPathRollbackStatsPairRDD = maybeDeleteAndCollectStats(context, instantToRollback, rollbackRequests, sparkPartitions, true);
return partitionPathRollbackStatsPairRDD.reduceByKey(RollbackUtils::mergeRollbackStat).map(Tuple2::_2).collect();
}
/**
 * Collects all file info that needs to be rolled back.
*/
public List<HoodieRollbackStat> collectRollbackStats(HoodieEngineContext context, HoodieInstant instantToRollback, List<ListingBasedRollbackRequest> rollbackRequests) {
int sparkPartitions = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1);
context.setJobStatus(this.getClass().getSimpleName(), "Collect rollback stats for upgrade/downgrade");
JavaPairRDD<String, HoodieRollbackStat> partitionPathRollbackStatsPairRDD = maybeDeleteAndCollectStats(context, instantToRollback, rollbackRequests, sparkPartitions, false);
return partitionPathRollbackStatsPairRDD.map(Tuple2::_2).collect();
}
/**
 * Maybe deletes the files of interest and collects stats, or collects stats only.
 *
 * @param context instance of {@link HoodieEngineContext} to use.
 * @param instantToRollback {@link HoodieInstant} of interest for which deletion or stats collection is requested.
 * @param rollbackRequests List of {@link ListingBasedRollbackRequest} to be operated on.
 * @param sparkPartitions number of Spark partitions to use for parallelism.
 * @param doDelete {@code true} if deletion has to be done. {@code false} if only stats are to be collected without performing any deletes.
 * @return stats collected with or without actual deletions.
*/
JavaPairRDD<String, HoodieRollbackStat> maybeDeleteAndCollectStats(HoodieEngineContext context, HoodieInstant instantToRollback, List<ListingBasedRollbackRequest> rollbackRequests,
int sparkPartitions, boolean doDelete) {
JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
return jsc.parallelize(rollbackRequests, sparkPartitions).mapToPair(rollbackRequest -> {
switch (rollbackRequest.getType()) {
case DELETE_DATA_FILES_ONLY: {
final Map<FileStatus, Boolean> filesToDeletedStatus = deleteBaseFiles(metaClient, config, instantToRollback.getTimestamp(),
rollbackRequest.getPartitionPath(), doDelete);
return new Tuple2<>(rollbackRequest.getPartitionPath(),
HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
.withDeletedFileResults(filesToDeletedStatus).build());
}
case DELETE_DATA_AND_LOG_FILES: {
final Map<FileStatus, Boolean> filesToDeletedStatus = deleteBaseAndLogFiles(metaClient, config, instantToRollback.getTimestamp(), rollbackRequest.getPartitionPath(), doDelete);
return new Tuple2<>(rollbackRequest.getPartitionPath(),
HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
.withDeletedFileResults(filesToDeletedStatus).build());
}
case APPEND_ROLLBACK_BLOCK: {
String fileId = rollbackRequest.getFileId().get();
String latestBaseInstant = rollbackRequest.getLatestBaseInstant().get();
// collect all log files that are supposed to be deleted by this rollback
Map<FileStatus, Long> writtenLogFileSizeMap = FSUtils.getAllLogFiles(metaClient.getFs(),
FSUtils.getPartitionPath(config.getBasePath(), rollbackRequest.getPartitionPath()),
fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), latestBaseInstant)
.collect(Collectors.toMap(HoodieLogFile::getFileStatus, value -> value.getFileStatus().getLen()));
Writer writer = null;
try {
writer = HoodieLogFormat.newWriterBuilder()
.onParentPath(FSUtils.getPartitionPath(metaClient.getBasePath(), rollbackRequest.getPartitionPath()))
.withFileId(fileId)
.overBaseCommit(latestBaseInstant)
.withFs(metaClient.getFs())
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
// generate metadata
if (doDelete) {
Map<HeaderMetadataType, String> header = generateHeader(instantToRollback.getTimestamp());
// if update belongs to an existing log file
writer.appendBlock(new HoodieCommandBlock(header));
}
} catch (IOException | InterruptedException io) {
throw new HoodieRollbackException("Failed to rollback for instant " + instantToRollback, io);
} finally {
try {
if (writer != null) {
writer.close();
}
} catch (IOException io) {
throw new HoodieIOException("Error appending rollback block..", io);
}
}
// This step is intentionally done after the writer is closed. It guarantees that
// getFileStatus reflects correct stats and that FileNotFoundException is not thrown
// on cloud storage (HUDI-168).
Map<FileStatus, Long> filesToNumBlocksRollback = Collections.singletonMap(
metaClient.getFs().getFileStatus(Objects.requireNonNull(writer).getLogFile().getPath()),
1L
);
return new Tuple2<>(rollbackRequest.getPartitionPath(),
HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
.withRollbackBlockAppendResults(filesToNumBlocksRollback)
.withWrittenLogFileSizeMap(writtenLogFileSizeMap).build());
}
default:
throw new IllegalStateException("Unknown Rollback action " + rollbackRequest);
}
});
}
/**
 * Common method used for cleaning out base and log files under a partition path during rollback of a set of commits.
*/
private Map<FileStatus, Boolean> deleteBaseAndLogFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
String commit, String partitionPath, boolean doDelete) throws IOException {
LOG.info("Cleaning path " + partitionPath);
String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
SerializablePathFilter filter = (path) -> {
if (path.toString().endsWith(basefileExtension)) {
String fileCommitTime = FSUtils.getCommitTime(path.getName());
return commit.equals(fileCommitTime);
} else if (FSUtils.isLogFile(path)) {
// Since the baseCommitTime is the only commit for new log files, it's okay here
String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path);
return commit.equals(fileCommitTime);
}
return false;
};
final Map<FileStatus, Boolean> results = new HashMap<>();
FileSystem fs = metaClient.getFs();
FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
for (FileStatus file : toBeDeleted) {
if (doDelete) {
boolean success = fs.delete(file.getPath(), false);
results.put(file, success);
LOG.info("Delete file " + file.getPath() + "\t" + success);
} else {
results.put(file, true);
}
}
return results;
}
/**
* Common method used for cleaning out base files under a partition path during rollback of a set of commits.
*/
private Map<FileStatus, Boolean> deleteBaseFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
String commit, String partitionPath, boolean doDelete) throws IOException {
final Map<FileStatus, Boolean> results = new HashMap<>();
LOG.info("Cleaning path " + partitionPath);
FileSystem fs = metaClient.getFs();
String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
PathFilter filter = (path) -> {
if (path.toString().contains(basefileExtension)) {
String fileCommitTime = FSUtils.getCommitTime(path.getName());
return commit.equals(fileCommitTime);
}
return false;
};
FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
for (FileStatus file : toBeDeleted) {
if (doDelete) {
boolean success = fs.delete(file.getPath(), false);
results.put(file, success);
LOG.info("Delete file " + file.getPath() + "\t" + success);
} else {
results.put(file, true);
}
}
return results;
}
private Map<HeaderMetadataType, String> generateHeader(String commit) {
// generate metadata
Map<HeaderMetadataType, String> header = new HashMap<>(3);
header.put(HeaderMetadataType.INSTANT_TIME, metaClient.getActiveTimeline().lastInstant().get().getTimestamp());
header.put(HeaderMetadataType.TARGET_INSTANT_TIME, commit);
header.put(HeaderMetadataType.COMMAND_BLOCK_TYPE,
String.valueOf(HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
return header;
}
public interface SerializablePathFilter extends PathFilter, Serializable {
}
}
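The HUDI-168 comment above encodes an ordering invariant worth isolating: close the log writer before asking the file system for stats, because object stores may only report the final length once the stream is finalized. A minimal sketch, assuming writer and metaClient are in scope:

writer.close(); // finalize the file with the appended rollback block
FileStatus status = metaClient.getFs().getFileStatus(writer.getLogFile().getPath());
long finalLen = status.getLen(); // reliable only after close(), per HUDI-168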

View File

@@ -1,73 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.rollback;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.spark.api.java.JavaRDD;
import java.util.List;
@SuppressWarnings("checkstyle:LineLength")
public class SparkCopyOnWriteRollbackActionExecutor<T extends HoodieRecordPayload> extends
BaseCopyOnWriteRollbackActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
public SparkCopyOnWriteRollbackActionExecutor(HoodieSparkEngineContext context,
HoodieWriteConfig config,
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
String instantTime,
HoodieInstant commitInstant,
boolean deleteInstants) {
super(context, config, table, instantTime, commitInstant, deleteInstants);
}
public SparkCopyOnWriteRollbackActionExecutor(HoodieSparkEngineContext context,
HoodieWriteConfig config,
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
String instantTime,
HoodieInstant commitInstant,
boolean deleteInstants,
boolean skipTimelinePublish,
boolean useMarkerBasedStrategy) {
super(context, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish, useMarkerBasedStrategy);
}
@Override
protected BaseRollbackActionExecutor.RollbackStrategy getRollbackStrategy() {
if (useMarkerBasedStrategy) {
return new SparkMarkerBasedRollbackStrategy(table, context, config, instantTime);
} else {
return this::executeRollbackUsingFileListing;
}
}
@Override
protected List<HoodieRollbackStat> executeRollbackUsingFileListing(HoodieInstant instantToRollback) {
List<ListingBasedRollbackRequest> rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(context,
table.getMetaClient().getBasePath(), config);
return new ListingBasedRollbackHelper(table.getMetaClient(), config).performRollback(context, instantToRollback, rollbackRequests);
}
}

View File

@@ -1,93 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.rollback;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.marker.MarkerBasedRollbackUtils;
import org.apache.hudi.table.marker.WriteMarkers;
import org.apache.hadoop.fs.FileStatus;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import scala.Tuple2;
@SuppressWarnings("checkstyle:LineLength")
public class SparkMarkerBasedRollbackStrategy<T extends HoodieRecordPayload> extends AbstractMarkerBasedRollbackStrategy<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
public SparkMarkerBasedRollbackStrategy(HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table, HoodieEngineContext context, HoodieWriteConfig config, String instantTime) {
super(table, context, config, instantTime);
}
@Override
public List<HoodieRollbackStat> execute(HoodieInstant instantToRollback) {
JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
try {
List<String> markerPaths = MarkerBasedRollbackUtils.getAllMarkerPaths(
table, context, instantToRollback.getTimestamp(), config.getRollbackParallelism());
int parallelism = Math.max(Math.min(markerPaths.size(), config.getRollbackParallelism()), 1);
jsc.setJobGroup(this.getClass().getSimpleName(), "Rolling back using marker files");
return jsc.parallelize(markerPaths, parallelism)
.map(markerFilePath -> {
String typeStr = markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1);
IOType type = IOType.valueOf(typeStr);
switch (type) {
case MERGE:
return undoMerge(WriteMarkers.stripMarkerSuffix(markerFilePath));
case APPEND:
return undoAppend(WriteMarkers.stripMarkerSuffix(markerFilePath), instantToRollback);
case CREATE:
return undoCreate(WriteMarkers.stripMarkerSuffix(markerFilePath));
default:
throw new HoodieRollbackException("Unknown marker type, during rollback of " + instantToRollback);
}
})
.mapToPair(rollbackStat -> new Tuple2<>(rollbackStat.getPartitionPath(), rollbackStat))
.reduceByKey(RollbackUtils::mergeRollbackStat)
.map(Tuple2::_2).collect();
} catch (Exception e) {
throw new HoodieRollbackException("Error rolling back using marker files written for " + instantToRollback, e);
}
}
protected Map<FileStatus, Long> getWrittenLogFileSizeMap(String partitionPathStr, String baseCommitTime, String fileId) throws IOException {
// collect all log files that are supposed to be deleted by this rollback
return FSUtils.getAllLogFiles(table.getMetaClient().getFs(),
FSUtils.getPartitionPath(config.getBasePath(), partitionPathStr), fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseCommitTime)
.collect(Collectors.toMap(HoodieLogFile::getFileStatus, value -> value.getFileStatus().getLen()));
}
}

View File

@@ -1,82 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.rollback;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.HoodieTable;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
import java.util.List;
@SuppressWarnings("checkstyle:LineLength")
public class SparkMergeOnReadRollbackActionExecutor<T extends HoodieRecordPayload> extends
BaseMergeOnReadRollbackActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
public SparkMergeOnReadRollbackActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config,
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
String instantTime,
HoodieInstant commitInstant,
boolean deleteInstants) {
super(context, config, table, instantTime, commitInstant, deleteInstants);
}
public SparkMergeOnReadRollbackActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config,
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
String instantTime,
HoodieInstant commitInstant,
boolean deleteInstants,
boolean skipTimelinePublish,
boolean useMarkerBasedStrategy) {
super(context, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish, useMarkerBasedStrategy);
}
@Override
protected BaseRollbackActionExecutor.RollbackStrategy getRollbackStrategy() {
if (useMarkerBasedStrategy) {
return new SparkMarkerBasedRollbackStrategy(table, context, config, instantTime);
} else {
return this::executeRollbackUsingFileListing;
}
}
@Override
protected List<HoodieRollbackStat> executeRollbackUsingFileListing(HoodieInstant resolvedInstant) {
List<ListingBasedRollbackRequest> rollbackRequests;
JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
try {
rollbackRequests = RollbackUtils.generateRollbackRequestsUsingFileListingMOR(resolvedInstant, table, context);
} catch (IOException e) {
throw new HoodieIOException("Error generating rollback requests by file listing.", e);
}
return new ListingBasedRollbackHelper(table.getMetaClient(), config).performRollback(context, resolvedInstant, rollbackRequests);
}
}

View File

@@ -83,8 +83,8 @@ public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackT
HoodieInstant needRollBackInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "002");
// execute CopyOnWriteRollbackActionExecutor with filelisting mode
SparkCopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new SparkCopyOnWriteRollbackActionExecutor(context, table.getConfig(), table, "003", needRollBackInstant, true);
assertFalse(copyOnWriteRollbackActionExecutor.getRollbackStrategy() instanceof SparkMarkerBasedRollbackStrategy);
CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(context, table.getConfig(), table, "003", needRollBackInstant, true);
assertFalse(copyOnWriteRollbackActionExecutor.getRollbackStrategy() instanceof MarkerBasedRollbackStrategy);
List<HoodieRollbackStat> hoodieRollbackStats = copyOnWriteRollbackActionExecutor.executeRollback();
// assert hoodieRollbackStats
@@ -162,11 +162,11 @@ public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackT
commitInstant = table.getCompletedCommitTimeline().lastInstant().get();
}
SparkCopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new SparkCopyOnWriteRollbackActionExecutor(context, cfg, table, "003", commitInstant, false);
CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(context, cfg, table, "003", commitInstant, false);
if (!isUsingMarkers) {
assertFalse(copyOnWriteRollbackActionExecutor.getRollbackStrategy() instanceof SparkMarkerBasedRollbackStrategy);
assertFalse(copyOnWriteRollbackActionExecutor.getRollbackStrategy() instanceof MarkerBasedRollbackStrategy);
} else {
assertTrue(copyOnWriteRollbackActionExecutor.getRollbackStrategy() instanceof SparkMarkerBasedRollbackStrategy);
assertTrue(copyOnWriteRollbackActionExecutor.getRollbackStrategy() instanceof MarkerBasedRollbackStrategy);
}
Map<String, HoodieRollbackPartitionMetadata> rollbackMetadata = copyOnWriteRollbackActionExecutor.execute().getPartitionMetadata();

View File

@@ -89,7 +89,7 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT
//2. rollback
HoodieInstant rollBackInstant = new HoodieInstant(isUsingMarkers, HoodieTimeline.DELTA_COMMIT_ACTION, "002");
SparkMergeOnReadRollbackActionExecutor mergeOnReadRollbackActionExecutor = new SparkMergeOnReadRollbackActionExecutor(
MergeOnReadRollbackActionExecutor mergeOnReadRollbackActionExecutor = new MergeOnReadRollbackActionExecutor(
context,
cfg,
table,
@@ -98,9 +98,9 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT
true);
// assert that file-listing mode is used
if (!isUsingMarkers) {
assertFalse(mergeOnReadRollbackActionExecutor.getRollbackStrategy() instanceof SparkMarkerBasedRollbackStrategy);
assertFalse(mergeOnReadRollbackActionExecutor.getRollbackStrategy() instanceof MarkerBasedRollbackStrategy);
} else {
assertTrue(mergeOnReadRollbackActionExecutor.getRollbackStrategy() instanceof SparkMarkerBasedRollbackStrategy);
assertTrue(mergeOnReadRollbackActionExecutor.getRollbackStrategy() instanceof MarkerBasedRollbackStrategy);
}
//3. assert the rollback stat
@@ -145,15 +145,15 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT
public void testFailForCompletedInstants() {
Assertions.assertThrows(IllegalArgumentException.class, () -> {
HoodieInstant rollBackInstant = new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "002");
new SparkMergeOnReadRollbackActionExecutor(
context,
getConfigBuilder().build(),
getHoodieTable(metaClient, getConfigBuilder().build()),
"003",
rollBackInstant,
true,
true,
true);
new MergeOnReadRollbackActionExecutor(
context,
getConfigBuilder().build(),
getHoodieTable(metaClient, getConfigBuilder().build()),
"003",
rollBackInstant,
true,
true,
true);
});
}

View File

@@ -32,7 +32,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.action.rollback.SparkMarkerBasedRollbackStrategy;
import org.apache.hudi.table.action.rollback.MarkerBasedRollbackStrategy;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.hadoop.fs.FileStatus;
@@ -93,7 +93,7 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase {
.withMarkerFile("partA", f2, IOType.CREATE);
// when
List<HoodieRollbackStat> stats = new SparkMarkerBasedRollbackStrategy(HoodieSparkTable.create(getConfig(), context, metaClient), context, getConfig(), "002")
List<HoodieRollbackStat> stats = new MarkerBasedRollbackStrategy(HoodieSparkTable.create(getConfig(), context, metaClient), context, getConfig(), "002")
.execute(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "001"));
// then: ensure files are deleted correctly, non-existent files reported as failed deletes
@@ -176,7 +176,7 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase {
writeStatuses.collect();
// rollback 2nd commit and ensure stats reflect the info.
return new SparkMarkerBasedRollbackStrategy(HoodieSparkTable.create(writeConfig, context, metaClient), context, writeConfig, "003")
return new MarkerBasedRollbackStrategy(HoodieSparkTable.create(writeConfig, context, metaClient), context, writeConfig, "003")
.execute(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, "002"));
}

View File

@@ -19,6 +19,7 @@
package org.apache.hudi.common.engine;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.function.SerializableBiFunction;
import org.apache.hudi.common.function.SerializableConsumer;
import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.function.SerializablePairFunction;
@@ -56,6 +57,9 @@ public abstract class HoodieEngineContext {
public abstract <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism);
public abstract <I, K, V> List<V> mapToPairAndReduceByKey(
List<I> data, SerializablePairFunction<I, K, V> mapToPairFunc, SerializableBiFunction<V, V, V> reduceFunc, int parallelism);
public abstract <I, O> List<O> flatMap(List<I> data, SerializableFunction<I, Stream<O>> func, int parallelism);
public abstract <I> void foreach(List<I> data, SerializableConsumer<I> consumer, int parallelism);

View File

@@ -20,6 +20,7 @@ package org.apache.hudi.common.engine;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.function.SerializableBiFunction;
import org.apache.hudi.common.function.SerializableConsumer;
import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.function.SerializablePairFunction;
@@ -37,6 +38,7 @@ import static org.apache.hudi.common.function.FunctionWrapper.throwingFlatMapWra
import static org.apache.hudi.common.function.FunctionWrapper.throwingForeachWrapper;
import static org.apache.hudi.common.function.FunctionWrapper.throwingMapToPairWrapper;
import static org.apache.hudi.common.function.FunctionWrapper.throwingMapWrapper;
import static org.apache.hudi.common.function.FunctionWrapper.throwingReduceWrapper;
/**
* A java based engine context, use this implementation on the query engine integrations if needed.
@@ -56,6 +58,15 @@ public final class HoodieLocalEngineContext extends HoodieEngineContext {
return data.stream().parallel().map(throwingMapWrapper(func)).collect(toList());
}
@Override
public <I, K, V> List<V> mapToPairAndReduceByKey(
List<I> data, SerializablePairFunction<I, K, V> mapToPairFunc, SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
return data.stream().parallel().map(throwingMapToPairWrapper(mapToPairFunc))
.collect(Collectors.groupingBy(p -> p.getKey())).values().stream()
.map(list -> list.stream().map(e -> e.getValue()).reduce(throwingReduceWrapper(reduceFunc)).get())
.collect(Collectors.toList());
}
@Override
public <I, O> List<O> flatMap(List<I> data, SerializableFunction<I, Stream<O>> func, int parallelism) {
return data.stream().parallel().flatMap(throwingFlatMapWrapper(func)).collect(toList());
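Because the local implementation of mapToPairAndReduceByKey is plain parallel streams, the identical engine-context call runs without a cluster, and the parallelism argument is effectively a hint this implementation ignores. A small hedged example with illustrative values (the single-argument Configuration constructor is assumed):

HoodieEngineContext ctx = new HoodieLocalEngineContext(new Configuration()); // assumed constructor
List<Integer> sums = ctx.mapToPairAndReduceByKey(
    Arrays.asList("a:1", "a:2", "b:3"),
    s -> Pair.of(s.split(":")[0], Integer.parseInt(s.split(":")[1])), // key by prefix
    Integer::sum, // reduce values that share a key
    2); // parallelism hint, unused by the stream-based implementation
// sums contains 3 (for "a") and 3 (for "b"), in unspecified order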

View File

@@ -21,6 +21,7 @@ package org.apache.hudi.common.function;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
@@ -70,4 +71,14 @@ public class FunctionWrapper {
}
};
}
public static <V> BinaryOperator<V> throwingReduceWrapper(SerializableBiFunction<V, V, V> throwingReduceFunction) {
return (v1, v2) -> {
try {
return throwingReduceFunction.apply(v1, v2);
} catch (Exception e) {
throw new HoodieException("Error occurs when executing mapToPair", e);
}
};
}
}
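throwingReduceWrapper mirrors the existing wrappers in this class: it adapts a SerializableBiFunction into a plain java.util.function.BinaryOperator, rethrowing any failure as a HoodieException. An illustrative use against JDK streams:

SerializableBiFunction<Long, Long, Long> sum = (a, b) -> a + b; // serializable reduce function
BinaryOperator<Long> op = FunctionWrapper.throwingReduceWrapper(sum);
Optional<Long> total = Stream.of(1L, 2L, 3L).reduce(op); // Optional[6]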

View File

@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.common.function;
import java.io.Serializable;
/**
* A function that accepts two arguments and produces a result.
*
* @param <T> the type of the first argument to the function
* @param <U> the type of the second argument to the function
* @param <R> the type of the result of the function
*/
@FunctionalInterface
public interface SerializableBiFunction<T, U, R> extends Serializable {
R apply(T t, U u);
}
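A minimal usage sketch: because the interface extends Serializable, a lambda or method reference bound to it can be shipped to executors, which is how the reduce step of mapToPairAndReduceByKey receives its function elsewhere in this commit:

SerializableBiFunction<HoodieRollbackStat, HoodieRollbackStat, HoodieRollbackStat> merge =
    RollbackUtils::mergeRollbackStat; // matches SerializableBiFunction<V, V, V>
HoodieRollbackStat combined = merge.apply(statA, statB); // statA/statB assumed in scope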