1
0

[HUDI-3007] Fix issues in HoodieRepairTool (#4564)

This commit is contained in:
Y Ethan Guo
2022-01-12 09:03:27 -08:00
committed by GitHub
parent 12e95771ee
commit 397795c7d0
7 changed files with 957 additions and 119 deletions

View File

@@ -22,21 +22,22 @@ package org.apache.hudi.utilities;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.metadata.FileSystemBackedTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.table.repair.RepairUtils;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
@@ -48,14 +49,10 @@ import java.io.Serializable;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import scala.Tuple2;
/**
* A tool with spark-submit to repair Hudi table by finding and deleting dangling
* base and log files.
@@ -153,15 +150,15 @@ public class HoodieRepairTool {
// Properties with source, hoodie client, key generator etc.
private TypedProperties props;
// Spark context
private final JavaSparkContext jsc;
private final HoodieEngineContext context;
private final HoodieTableMetaClient metaClient;
private final FileSystemBackedTableMetadata tableMetadata;
private final HoodieTableMetadata tableMetadata;
public HoodieRepairTool(JavaSparkContext jsc, Config cfg) {
if (cfg.propsFilePath != null) {
cfg.propsFilePath = FSUtils.addSchemeIfLocalPath(cfg.propsFilePath).toString();
}
this.jsc = jsc;
this.context = new HoodieSparkEngineContext(jsc);
this.cfg = cfg;
this.props = cfg.propsFilePath == null
? UtilHelpers.buildProperties(cfg.configs)
@@ -170,13 +167,12 @@ public class HoodieRepairTool {
.setConf(jsc.hadoopConfiguration()).setBasePath(cfg.basePath)
.setLoadActiveTimelineOnLoad(true)
.build();
this.tableMetadata = new FileSystemBackedTableMetadata(
new HoodieSparkEngineContext(jsc),
new SerializableConfiguration(jsc.hadoopConfiguration()),
cfg.basePath, cfg.assumeDatePartitioning);
context, context.getHadoopConf(), cfg.basePath, cfg.assumeDatePartitioning);
}
public void run() {
public boolean run() {
Option<String> startingInstantOption = Option.ofNullable(cfg.startingInstantTime);
Option<String> endingInstantOption = Option.ofNullable(cfg.endingInstantTime);
@@ -201,24 +197,22 @@ public class HoodieRepairTool {
+ "not belonging to any commit are going to be DELETED from the table ******");
if (checkBackupPathForRepair() < 0) {
LOG.error("Backup path check failed.");
break;
return false;
}
doRepair(startingInstantOption, endingInstantOption, false);
break;
return doRepair(startingInstantOption, endingInstantOption, false);
case DRY_RUN:
LOG.info(" ****** The repair tool is in DRY_RUN mode, "
+ "only LOOKING FOR dangling data and log files from the table ******");
doRepair(startingInstantOption, endingInstantOption, true);
break;
return doRepair(startingInstantOption, endingInstantOption, true);
case UNDO:
if (checkBackupPathAgainstBasePath() < 0) {
LOG.error("Backup path check failed.");
break;
return false;
}
undoRepair();
break;
return undoRepair();
default:
LOG.info("Unsupported running mode [" + cfg.runningMode + "], quit the job directly");
return false;
}
} catch (IOException e) {
throw new HoodieIOException("Unable to repair table in " + cfg.basePath, e);
@@ -246,69 +240,98 @@ public class HoodieRepairTool {
* Copies the list of files from source base path to destination base path.
* The destination file path (base + relative) should not already exist.
*
* @param jsc {@link JavaSparkContext} instance.
* @param context {@link HoodieEngineContext} instance.
* @param relativeFilePaths A {@link List} of relative file paths for copying.
* @param sourceBasePath Source base path.
* @param destBasePath Destination base path.
* @param parallelism Parallelism.
* @return {@code true} if all successful; {@code false} otherwise.
*/
static boolean copyFiles(
JavaSparkContext jsc, List<String> relativeFilePaths, String sourceBasePath,
String destBasePath, int parallelism) {
SerializableConfiguration conf = new SerializableConfiguration(jsc.hadoopConfiguration());
List<Boolean> allResults = jsc.parallelize(relativeFilePaths, parallelism)
HoodieEngineContext context, List<String> relativeFilePaths, String sourceBasePath,
String destBasePath) {
SerializableConfiguration conf = context.getHadoopConf();
List<Boolean> allResults = context.parallelize(relativeFilePaths)
.mapPartitions(iterator -> {
List<Boolean> results = new ArrayList<>();
FileSystem fs = FSUtils.getFs(destBasePath, conf.get());
iterator.forEachRemaining(filePath -> {
boolean success = false;
Path sourcePath = new Path(sourceBasePath, filePath);
Path destPath = new Path(destBasePath, filePath);
try {
if (!fs.exists(destPath)) {
FileIOUtils.copy(fs, new Path(sourceBasePath, filePath), destPath);
FileIOUtils.copy(fs, sourcePath, destPath);
success = true;
}
} catch (IOException e) {
// Copy Fail
LOG.error(String.format("Copying file fails: source [%s], destination [%s]",
sourcePath, destPath));
} finally {
results.add(success);
}
});
return results.iterator();
})
.collect();
}, true)
.collectAsList();
return allResults.stream().reduce((r1, r2) -> r1 && r2).orElse(false);
}
/**
* Lists all Hoodie files from the table base path.
*
* @param basePathStr Table base path.
* @param conf {@link Configuration} instance.
* @return An array of {@link FileStatus} of all Hoodie files.
* @param context {@link HoodieEngineContext} instance.
* @param basePathStr Table base path.
* @param expectedLevel Expected level in the directory hierarchy to include the file status.
* @param parallelism Parallelism for the file listing.
* @return A list of absolute file paths of all Hoodie files.
* @throws IOException upon errors.
*/
static FileStatus[] listFilesFromBasePath(String basePathStr, Configuration conf) throws IOException {
final Set<String> validFileExtensions = Arrays.stream(HoodieFileFormat.values())
.map(HoodieFileFormat::getFileExtension).collect(Collectors.toCollection(HashSet::new));
final String logFileExtension = HoodieFileFormat.HOODIE_LOG.getFileExtension();
FileSystem fs = FSUtils.getFs(basePathStr, conf);
static List<String> listFilesFromBasePath(
HoodieEngineContext context, String basePathStr, int expectedLevel, int parallelism) {
FileSystem fs = FSUtils.getFs(basePathStr, context.getHadoopConf().get());
Path basePath = new Path(basePathStr);
return FSUtils.getFileStatusAtLevel(
context, fs, basePath, expectedLevel, parallelism).stream()
.filter(fileStatus -> {
if (!fileStatus.isFile()) {
return false;
}
return FSUtils.isDataFile(fileStatus.getPath());
})
.map(fileStatus -> fileStatus.getPath().toString())
.collect(Collectors.toList());
}
try {
return Arrays.stream(fs.listStatus(basePath, path -> {
String extension = FSUtils.getFileExtension(path.getName());
return validFileExtensions.contains(extension) || path.getName().contains(logFileExtension);
})).filter(FileStatus::isFile).toArray(FileStatus[]::new);
} catch (IOException e) {
// return empty FileStatus if partition does not exist already
if (!fs.exists(basePath)) {
return new FileStatus[0];
} else {
throw e;
}
}
/**
* Deletes files from table base path.
*
* @param context {@link HoodieEngineContext} instance.
* @param basePath Base path of the table.
* @param relativeFilePaths A {@link List} of relative file paths for deleting.
*/
static boolean deleteFiles(
HoodieEngineContext context, String basePath, List<String> relativeFilePaths) {
SerializableConfiguration conf = context.getHadoopConf();
return context.parallelize(relativeFilePaths)
.mapPartitions(iterator -> {
FileSystem fs = FSUtils.getFs(basePath, conf.get());
List<Boolean> results = new ArrayList<>();
iterator.forEachRemaining(relativeFilePath -> {
boolean success = false;
try {
success = fs.delete(new Path(basePath, relativeFilePath), false);
} catch (IOException e) {
LOG.warn("Failed to delete file " + relativeFilePath);
} finally {
results.add(success);
}
});
return results.iterator();
}, true)
.collectAsList()
.stream().reduce((a, b) -> a && b)
.orElse(true);
}
/**
@@ -319,15 +342,14 @@ public class HoodieRepairTool {
* @param isDryRun Is dry run.
* @throws IOException upon errors.
*/
void doRepair(
boolean doRepair(
Option<String> startingInstantOption, Option<String> endingInstantOption, boolean isDryRun) throws IOException {
// Scans all partitions to find base and log files in the base path
List<Path> allFilesInPartitions = getBaseAndLogFilePathsFromFileSystem();
// Buckets the files based on instant time
// instant time -> relative paths of base and log files to base path
Map<String, List<String>> instantToFilesMap = RepairUtils.tagInstantsOfBaseAndLogFiles(
metaClient.getBasePath(),
metaClient.getTableConfig().getBaseFileFormat().getFileExtension(), allFilesInPartitions);
metaClient.getBasePath(), allFilesInPartitions);
List<String> instantTimesToRepair = instantToFilesMap.keySet().stream()
.filter(instant -> (!startingInstantOption.isPresent()
|| instant.compareTo(startingInstantOption.get()) >= 0)
@@ -340,30 +362,30 @@ public class HoodieRepairTool {
// This assumes that the archived timeline only has completed instants so this is safe
archivedTimeline.loadCompletedInstantDetailsInMemory();
int parallelism = Math.max(Math.min(instantTimesToRepair.size(), cfg.parallelism), 1);
List<Tuple2<String, List<String>>> instantFilesToRemove =
jsc.parallelize(instantTimesToRepair, parallelism)
.mapToPair(instantToRepair ->
new Tuple2<>(instantToRepair, RepairUtils.findInstantFilesToRemove(instantToRepair,
List<ImmutablePair<String, List<String>>> instantFilesToRemove =
context.parallelize(instantTimesToRepair)
.map(instantToRepair ->
new ImmutablePair<>(instantToRepair, RepairUtils.findInstantFilesToRemove(instantToRepair,
instantToFilesMap.get(instantToRepair), activeTimeline, archivedTimeline)))
.collect();
.collectAsList();
List<Tuple2<String, List<String>>> instantsWithDanglingFiles =
instantFilesToRemove.stream().filter(e -> !e._2.isEmpty()).collect(Collectors.toList());
List<ImmutablePair<String, List<String>>> instantsWithDanglingFiles =
instantFilesToRemove.stream().filter(e -> !e.getValue().isEmpty()).collect(Collectors.toList());
printRepairInfo(instantTimesToRepair, instantsWithDanglingFiles);
if (!isDryRun) {
List<String> relativeFilePathsToDelete =
instantsWithDanglingFiles.stream().flatMap(e -> e._2.stream()).collect(Collectors.toList());
List<String> relativeFilePathsToDelete = instantsWithDanglingFiles.stream()
.flatMap(e -> e.getValue().stream())
.collect(Collectors.toList());
if (relativeFilePathsToDelete.size() > 0) {
parallelism = Math.max(Math.min(relativeFilePathsToDelete.size(), cfg.parallelism), 1);
if (!backupFiles(relativeFilePathsToDelete, parallelism)) {
if (!backupFiles(relativeFilePathsToDelete)) {
LOG.error("Error backing up dangling files. Exiting...");
return;
return false;
}
deleteFiles(relativeFilePathsToDelete, parallelism);
return deleteFiles(context, cfg.basePath, relativeFilePathsToDelete);
}
LOG.info(String.format("Table repair on %s is successful", cfg.basePath));
}
return true;
}
/**
@@ -387,22 +409,38 @@ public class HoodieRepairTool {
*
* @throws IOException upon errors.
*/
void undoRepair() throws IOException {
boolean undoRepair() throws IOException {
FileSystem fs = metaClient.getFs();
String backupPathStr = cfg.backupPath;
Path backupPath = new Path(backupPathStr);
if (!fs.exists(backupPath)) {
LOG.error("Cannot find backup path: " + backupPath);
return;
return false;
}
List<String> relativeFilePaths = Arrays.stream(
listFilesFromBasePath(backupPathStr, jsc.hadoopConfiguration()))
.map(fileStatus ->
FSUtils.getPartitionPath(backupPathStr, fileStatus.getPath().toString()).toString())
List<String> allPartitionPaths = tableMetadata.getAllPartitionPaths();
if (allPartitionPaths.isEmpty()) {
LOG.error("Cannot get one partition path since there is no partition available");
return false;
}
int partitionLevels = getExpectedLevelBasedOnPartitionPath(allPartitionPaths.get(0));
List<String> relativeFilePaths = listFilesFromBasePath(
context, backupPathStr, partitionLevels, cfg.parallelism).stream()
.map(filePath ->
FSUtils.getRelativePartitionPath(new Path(backupPathStr), new Path(filePath)))
.collect(Collectors.toList());
int parallelism = Math.max(Math.min(relativeFilePaths.size(), cfg.parallelism), 1);
restoreFiles(relativeFilePaths, parallelism);
return restoreFiles(relativeFilePaths);
}
int getExpectedLevelBasedOnPartitionPath(String partitionPath) {
if (StringUtils.isNullOrEmpty(partitionPath)) {
return 0;
}
String[] partitionParts = partitionPath.split("/");
return partitionParts.length;
}
/**
@@ -455,48 +493,20 @@ public class HoodieRepairTool {
* Backs up dangling files from table base path to backup path.
*
* @param relativeFilePaths A {@link List} of relative file paths for backup.
* @param parallelism Parallelism for copying.
* @return {@code true} if all successful; {@code false} otherwise.
*/
boolean backupFiles(List<String> relativeFilePaths, int parallelism) {
return copyFiles(jsc, relativeFilePaths, cfg.basePath, cfg.backupPath, parallelism);
boolean backupFiles(List<String> relativeFilePaths) {
return copyFiles(context, relativeFilePaths, cfg.basePath, cfg.backupPath);
}
/**
* Restores dangling files from backup path to table base path.
*
* @param relativeFilePaths A {@link List} of relative file paths for restoring.
* @param parallelism Parallelism for copying.
* @return {@code true} if all successful; {@code false} otherwise.
*/
boolean restoreFiles(List<String> relativeFilePaths, int parallelism) {
return copyFiles(jsc, relativeFilePaths, cfg.backupPath, cfg.basePath, parallelism);
}
/**
* Deletes files from table base path.
*
* @param relativeFilePaths A {@link List} of relative file paths for deleting.
* @param parallelism Parallelism for deleting.
*/
void deleteFiles(List<String> relativeFilePaths, int parallelism) {
jsc.parallelize(relativeFilePaths, parallelism)
.mapPartitions(iterator -> {
FileSystem fs = metaClient.getFs();
List<Boolean> results = new ArrayList<>();
iterator.forEachRemaining(filePath -> {
boolean success = false;
try {
success = fs.delete(new Path(filePath), false);
} catch (IOException e) {
LOG.warn("Failed to delete file " + filePath);
} finally {
results.add(success);
}
});
return results.iterator();
})
.collect();
boolean restoreFiles(List<String> relativeFilePaths) {
return copyFiles(context, relativeFilePaths, cfg.backupPath, cfg.basePath);
}
/**
@@ -506,17 +516,14 @@ public class HoodieRepairTool {
* @param instantsWithDanglingFiles A list of instants with dangling files.
*/
private void printRepairInfo(
List<String> instantTimesToRepair, List<Tuple2<String, List<String>>> instantsWithDanglingFiles) {
List<String> instantTimesToRepair, List<ImmutablePair<String, List<String>>> instantsWithDanglingFiles) {
int numInstantsToRepair = instantsWithDanglingFiles.size();
LOG.warn("Number of instants verified based on the base and log files: "
+ instantTimesToRepair.size());
LOG.warn("Instant timestamps: " + instantTimesToRepair);
LOG.warn("Number of instants to repair: " + numInstantsToRepair);
if (numInstantsToRepair > 0) {
instantsWithDanglingFiles.forEach(e -> {
LOG.warn(" -> Instant " + numInstantsToRepair);
LOG.warn(" ** Removing files: " + e._2);
});
instantsWithDanglingFiles.forEach(e -> LOG.warn(" ** Removing files: " + e.getValue()));
}
}

View File

@@ -0,0 +1,409 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.utilities;
import org.apache.hudi.HoodieTestCommitGenerator;
import org.apache.hudi.client.HoodieReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.testutils.providers.SparkProvider;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.provider.Arguments;
import java.io.IOException;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hudi.HoodieTestCommitGenerator.getBaseFilename;
import static org.apache.hudi.HoodieTestCommitGenerator.getLogFilename;
import static org.apache.hudi.HoodieTestCommitGenerator.initCommitInfoForRepairTests;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestHoodieRepairTool extends HoodieCommonTestHarness implements SparkProvider {
private static final Logger LOG = LogManager.getLogger(TestHoodieRepairTool.class);
// Instant time -> List<Pair<relativePartitionPath, fileId>>
private static final Map<String, List<Pair<String, String>>> BASE_FILE_INFO = new HashMap<>();
private static final Map<String, List<Pair<String, String>>> LOG_FILE_INFO = new HashMap<>();
// Relative paths to base path for dangling files
private static final List<String> DANGLING_DATA_FILE_LIST = new ArrayList<>();
private static transient SparkSession spark;
private static transient SQLContext sqlContext;
private static transient JavaSparkContext jsc;
private static transient HoodieSparkEngineContext context;
// instant time -> partitionPathToFileIdAndNameMap
private final Map<String, Map<String, List<Pair<String, String>>>> instantInfoMap = new HashMap<>();
private final List<String> allFileAbsolutePathList = new ArrayList<>();
private java.nio.file.Path backupTempDir;
@BeforeAll
static void initFileInfo() {
initCommitInfoForRepairTests(BASE_FILE_INFO, LOG_FILE_INFO);
initDanglingDataFileList();
}
@BeforeEach
public void initWithCleanState() throws IOException {
boolean initialized = spark != null;
if (!initialized) {
SparkConf sparkConf = conf();
SparkRDDWriteClient.registerClasses(sparkConf);
HoodieReadClient.addHoodieSupport(sparkConf);
spark = SparkSession.builder().config(sparkConf).getOrCreate();
sqlContext = spark.sqlContext();
jsc = new JavaSparkContext(spark.sparkContext());
context = new HoodieSparkEngineContext(jsc);
}
initPath();
metaClient = HoodieTestUtils.init(basePath, getTableType());
backupTempDir = tempDir.resolve("backup");
cleanUpDanglingDataFilesInFS();
cleanUpBackupTempDir();
HoodieTestCommitGenerator.setupTimelineInFS(
basePath, BASE_FILE_INFO, LOG_FILE_INFO, instantInfoMap);
allFileAbsolutePathList.clear();
allFileAbsolutePathList.addAll(instantInfoMap.entrySet().stream()
.flatMap(e -> e.getValue().entrySet().stream()
.flatMap(partition -> partition.getValue().stream()
.map(fileInfo -> new Path(
new Path(basePath, partition.getKey()), fileInfo.getValue()).toString())
.collect(Collectors.toList())
.stream())
.collect(Collectors.toList())
.stream()
)
.collect(Collectors.toList()));
}
@AfterEach
public void cleanUp() throws IOException {
cleanUpDanglingDataFilesInFS();
cleanUpBackupTempDir();
}
@AfterAll
public static synchronized void resetSpark() {
if (spark != null) {
spark.close();
spark = null;
}
}
private void cleanUpDanglingDataFilesInFS() {
FileSystem fs = metaClient.getFs();
DANGLING_DATA_FILE_LIST.forEach(
relativeFilePath -> {
Path path = new Path(basePath, relativeFilePath);
try {
if (fs.exists(path)) {
fs.delete(path, false);
}
} catch (IOException e) {
throw new HoodieIOException("Unable to delete file: " + path);
}
}
);
}
private void cleanUpBackupTempDir() throws IOException {
FileSystem fs = metaClient.getFs();
fs.delete(new Path(backupTempDir.toAbsolutePath().toString()), true);
}
private static void initDanglingDataFileList() {
DANGLING_DATA_FILE_LIST.add(
new Path("2022/01/01",
getBaseFilename("000", UUID.randomUUID().toString())).toString());
DANGLING_DATA_FILE_LIST.add(
new Path("2022/01/06",
getLogFilename("001", UUID.randomUUID().toString())).toString());
}
private Stream<Arguments> configPathParams() {
Object[][] data = new Object[][] {
{null, basePath, -1}, {basePath + "/backup", basePath, -1},
{"/tmp/backup", basePath, 0}
};
return Stream.of(data).map(Arguments::of);
}
@Test
public void testCheckBackupPathAgainstBasePath() {
configPathParams().forEach(arguments -> {
Object[] args = arguments.get();
String backupPath = (String) args[0];
String basePath = (String) args[1];
int expectedResult = (Integer) args[2];
HoodieRepairTool.Config config = new HoodieRepairTool.Config();
config.backupPath = backupPath;
config.basePath = basePath;
HoodieRepairTool tool = new HoodieRepairTool(jsc, config);
assertEquals(expectedResult, tool.checkBackupPathAgainstBasePath());
});
}
private Stream<Arguments> configPathParamsWithFS() throws IOException {
SecureRandom random = new SecureRandom();
long randomLong = random.nextLong();
String emptyBackupPath = "/tmp/empty_backup_" + randomLong;
FSUtils.createPathIfNotExists(metaClient.getFs(), new Path(emptyBackupPath));
String nonEmptyBackupPath = "/tmp/nonempty_backup_" + randomLong;
FSUtils.createPathIfNotExists(metaClient.getFs(), new Path(nonEmptyBackupPath));
FSUtils.createPathIfNotExists(metaClient.getFs(), new Path(nonEmptyBackupPath, ".hoodie"));
Object[][] data = new Object[][] {
{null, basePath, 0}, {"/tmp/backup", basePath, 0},
{emptyBackupPath, basePath, 0}, {basePath + "/backup", basePath, -1},
{nonEmptyBackupPath, basePath, -1},
};
return Stream.of(data).map(Arguments::of);
}
@Test
public void testCheckBackupPathForRepair() throws IOException {
for (Arguments arguments: configPathParamsWithFS().collect(Collectors.toList())) {
Object[] args = arguments.get();
String backupPath = (String) args[0];
String basePath = (String) args[1];
int expectedResult = (Integer) args[2];
HoodieRepairTool.Config config = new HoodieRepairTool.Config();
config.backupPath = backupPath;
config.basePath = basePath;
HoodieRepairTool tool = new HoodieRepairTool(jsc, config);
assertEquals(expectedResult, tool.checkBackupPathForRepair());
if (backupPath == null) {
// Backup path should be created if not provided
assertNotNull(config.backupPath);
}
}
}
@Test
public void testRepairWithIntactInstants() throws IOException {
testRepairToolWithMode(
Option.empty(), Option.empty(), HoodieRepairTool.Mode.REPAIR.toString(),
backupTempDir.toAbsolutePath().toString(), true,
allFileAbsolutePathList, Collections.emptyList());
}
@Test
public void testRepairWithBrokenInstants() throws IOException {
List<String> tableDanglingFileList = createDanglingDataFilesInFS(basePath);
String backupPath = backupTempDir.toAbsolutePath().toString();
List<String> backupDanglingFileList = DANGLING_DATA_FILE_LIST.stream()
.map(filePath -> new Path(backupPath, filePath).toString())
.collect(Collectors.toList());
List<String> existingFileList = new ArrayList<>(allFileAbsolutePathList);
existingFileList.addAll(backupDanglingFileList);
testRepairToolWithMode(
Option.empty(), Option.empty(), HoodieRepairTool.Mode.REPAIR.toString(),
backupPath, true,
existingFileList, tableDanglingFileList);
}
@Test
public void testRepairWithOneBrokenInstant() throws IOException {
List<String> tableDanglingFileList = createDanglingDataFilesInFS(basePath);
String backupPath = backupTempDir.toAbsolutePath().toString();
List<String> backupDanglingFileList = DANGLING_DATA_FILE_LIST
.subList(1, 2).stream()
.map(filePath -> new Path(backupPath, filePath).toString())
.collect(Collectors.toList());
List<String> existingFileList = new ArrayList<>(allFileAbsolutePathList);
existingFileList.addAll(backupDanglingFileList);
existingFileList.addAll(tableDanglingFileList.subList(0, 1));
testRepairToolWithMode(
Option.of("001"), Option.empty(), HoodieRepairTool.Mode.REPAIR.toString(),
backupPath, true,
existingFileList, tableDanglingFileList.subList(1, 2));
}
@Test
public void testDryRunWithBrokenInstants() throws IOException {
List<String> tableDanglingFileList = createDanglingDataFilesInFS(basePath);
String backupPath = backupTempDir.toAbsolutePath().toString();
List<String> backupDanglingFileList = DANGLING_DATA_FILE_LIST.stream()
.map(filePath -> new Path(backupPath, filePath).toString())
.collect(Collectors.toList());
List<String> existingFileList = new ArrayList<>(allFileAbsolutePathList);
existingFileList.addAll(tableDanglingFileList);
testRepairToolWithMode(
Option.empty(), Option.empty(), HoodieRepairTool.Mode.DRY_RUN.toString(),
backupPath, true,
existingFileList, backupDanglingFileList);
}
@Test
public void testDryRunWithOneBrokenInstant() throws IOException {
List<String> tableDanglingFileList = createDanglingDataFilesInFS(basePath);
String backupPath = backupTempDir.toAbsolutePath().toString();
List<String> backupDanglingFileList = DANGLING_DATA_FILE_LIST.stream()
.map(filePath -> new Path(backupPath, filePath).toString())
.collect(Collectors.toList());
List<String> existingFileList = new ArrayList<>(allFileAbsolutePathList);
existingFileList.addAll(tableDanglingFileList);
testRepairToolWithMode(
Option.of("001"), Option.empty(), HoodieRepairTool.Mode.DRY_RUN.toString(),
backupPath, true,
existingFileList, backupDanglingFileList);
}
@Test
public void testUndoWithNonExistentBackupPath() throws IOException {
String backupPath = backupTempDir.toAbsolutePath().toString();
metaClient.getFs().delete(new Path(backupPath), true);
testRepairToolWithMode(
Option.empty(), Option.empty(), HoodieRepairTool.Mode.UNDO.toString(),
backupPath, false,
allFileAbsolutePathList, Collections.emptyList());
}
@Test
public void testUndoWithExistingBackupPath() throws IOException {
String backupPath = backupTempDir.toAbsolutePath().toString();
List<String> backupDanglingFileList = createDanglingDataFilesInFS(backupPath);
List<String> restoreDanglingFileList = DANGLING_DATA_FILE_LIST.stream()
.map(filePath -> new Path(basePath, filePath).toString())
.collect(Collectors.toList());
List<String> existingFileList = new ArrayList<>(allFileAbsolutePathList);
existingFileList.addAll(backupDanglingFileList);
existingFileList.addAll(restoreDanglingFileList);
verifyFilesInFS(allFileAbsolutePathList, restoreDanglingFileList);
verifyFilesInFS(backupDanglingFileList, Collections.emptyList());
testRepairToolWithMode(
Option.empty(), Option.empty(), HoodieRepairTool.Mode.UNDO.toString(),
backupPath, true,
existingFileList, Collections.emptyList());
// Second run should fail
testRepairToolWithMode(
Option.empty(), Option.empty(), HoodieRepairTool.Mode.UNDO.toString(),
backupPath, false,
existingFileList, Collections.emptyList());
}
private void testRepairToolWithMode(
Option<String> startingInstantOption, Option<String> endingInstantOption,
String runningMode, String backupPath, boolean isRunSuccessful,
List<String> existFilePathList, List<String> nonExistFilePathList) throws IOException {
HoodieRepairTool.Config config = new HoodieRepairTool.Config();
config.backupPath = backupPath;
config.basePath = basePath;
config.assumeDatePartitioning = true;
if (startingInstantOption.isPresent()) {
config.startingInstantTime = startingInstantOption.get();
}
if (endingInstantOption.isPresent()) {
config.endingInstantTime = endingInstantOption.get();
}
config.runningMode = runningMode;
HoodieRepairTool tool = new HoodieRepairTool(jsc, config);
assertEquals(isRunSuccessful, tool.run());
verifyFilesInFS(existFilePathList, nonExistFilePathList);
}
private void verifyFilesInFS(
List<String> existFilePathList, List<String> nonExistFilePathList) throws IOException {
FileSystem fs = metaClient.getFs();
for (String filePath : existFilePathList) {
assertTrue(fs.exists(new Path(filePath)),
String.format("File %s should exist but it's not in the file system", filePath));
}
for (String filePath : nonExistFilePathList) {
assertFalse(fs.exists(new Path(filePath)),
String.format("File %s should not exist but it's in the file system", filePath));
}
}
private List<String> createDanglingDataFilesInFS(String parentPath) {
FileSystem fs = metaClient.getFs();
return DANGLING_DATA_FILE_LIST.stream().map(relativeFilePath -> {
Path path = new Path(parentPath, relativeFilePath);
try {
fs.mkdirs(path.getParent());
if (!fs.exists(path)) {
fs.create(path, false);
}
} catch (IOException e) {
LOG.error("Error creating file: " + path);
}
return path.toString();
})
.collect(Collectors.toList());
}
@Override
public HoodieEngineContext context() {
return context;
}
@Override
public SparkSession spark() {
return spark;
}
@Override
public SQLContext sqlContext() {
return sqlContext;
}
@Override
public JavaSparkContext jsc() {
return jsc;
}
}