1
0

[HUDI-2395] Metadata tests rewrite (#3695)

- Added commit metadata infrastructure to the test table so that the entire metadata can be exercised through the test table itself. These tests do not care about the actual contents of data files, and hence all metadata code paths should be testable using the test table alone.

Co-authored-by: Sivabalan Narayanan <n.siva.b@gmail.com>
This commit is contained in:
Sagar Sumit
2021-09-24 01:10:11 +05:30
committed by GitHub
parent 5515a0d319
commit eeafd24f4c
7 changed files with 1197 additions and 776 deletions

View File

@@ -167,6 +167,9 @@ public class FSUtils {
}
/**
 * Extracts the commit time embedded in a hoodie file name.
 * Log file names carry the commit time as the second underscore-separated
 * token, base file names as the third; any trailing extension is stripped.
 */
public static String getCommitTime(String fullFileName) {
  int commitTimeTokenIdx = isLogFile(new Path(fullFileName)) ? 1 : 2;
  return fullFileName.split("_")[commitTimeTokenIdx].split("\\.")[0];
}

View File

@@ -50,6 +50,7 @@ import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hudi.common.model.HoodieFileFormat.HOODIE_LOG;
import static org.apache.hudi.common.table.timeline.HoodieActiveTimeline.COMMIT_FORMATTER;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -157,6 +158,9 @@ public class TestFSUtils extends HoodieCommonTestHarness {
String fileName = UUID.randomUUID().toString();
String fullFileName = FSUtils.makeDataFileName(instantTime, TEST_WRITE_TOKEN, fileName);
assertEquals(instantTime, FSUtils.getCommitTime(fullFileName));
// test log file name
fullFileName = FSUtils.makeLogFileName(fileName, HOODIE_LOG.getFileExtension(), instantTime, 1, TEST_WRITE_TOKEN);
assertEquals(instantTime, FSUtils.getCommitTime(fullFileName));
}
@Test

View File

@@ -39,6 +39,8 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.io.RandomAccessFile;
@@ -48,8 +50,11 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileTime;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serializeCleanMetadata;
import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serializeCleanerPlan;
@@ -59,6 +64,8 @@ import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serial
public class FileCreateUtils {
private static final Logger LOG = LogManager.getLogger(FileCreateUtils.class);
private static final String WRITE_TOKEN = "1-0-1";
private static final String BASE_FILE_EXTENSION = HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().getFileExtension();
@@ -216,6 +223,10 @@ public class FileCreateUtils {
createAuxiliaryMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_COMPACTION_EXTENSION);
}
/**
 * Creates an inflight compaction meta file under the auxiliary folder for the given instant.
 *
 * Fix: the original passed {@code REQUESTED_COMPACTION_EXTENSION}, producing a second
 * requested-compaction file instead of an inflight one; use the INFLIGHT extension.
 */
public static void createInflightCompaction(String basePath, String instantTime) throws IOException {
  createAuxiliaryMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_COMPACTION_EXTENSION);
}
public static void createPartitionMetaFile(String basePath, String partitionPath) throws IOException {
Path parentPath = Paths.get(basePath, partitionPath);
Files.createDirectories(parentPath);
@@ -307,6 +318,13 @@ public class FileCreateUtils {
.endsWith(String.format("%s.%s", HoodieTableMetaClient.MARKER_EXTN, ioType))).count();
}
/**
 * Lists the immediate children of {@code basePath}, excluding the hoodie metafolder.
 * Returns an empty list when the base path does not exist.
 *
 * Fix: {@code Files.list} returns a stream backed by an open directory handle that
 * must be closed; the original leaked it. Wrapped in try-with-resources (fully
 * qualified to avoid a new import).
 */
public static List<Path> getPartitionPaths(Path basePath) throws IOException {
  if (Files.notExists(basePath)) {
    return Collections.emptyList();
  }
  try (java.util.stream.Stream<Path> entries = Files.list(basePath)) {
    return entries
        .filter(entry -> !entry.getFileName().toString().equals(HoodieTableMetaClient.METAFOLDER_NAME))
        .collect(Collectors.toList());
  }
}
/**
* Find total basefiles for passed in paths.
*/

View File

@@ -75,7 +75,11 @@ public class FileSystemTestUtils {
}
/**
 * Recursively lists all files under {@code path}.
 *
 * Fix: dropped the leftover unused {@code RemoteIterator} local; this method now
 * simply delegates to {@link #listFiles} with recursion enabled.
 */
public static List<FileStatus> listRecursive(FileSystem fs, Path path) throws IOException {
  return listFiles(fs, path, true);
}
public static List<FileStatus> listFiles(FileSystem fs, Path path, boolean recursive) throws IOException {
RemoteIterator<LocatedFileStatus> itr = fs.listFiles(path, recursive);
List<FileStatus> statuses = new ArrayList<>();
while (itr.hasNext()) {
statuses.add(itr.next());

View File

@@ -19,52 +19,79 @@
package org.apache.hudi.common.testutils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieActionInstant;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieInstantInfo;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieRollbackPartitionMetadata;
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.avro.model.HoodieSavepointPartitionMetadata;
import org.apache.hudi.common.HoodieCleanStat;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV2MigrationHandler;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import static java.time.temporal.ChronoUnit.SECONDS;
import static org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ;
import static org.apache.hudi.common.model.WriteOperationType.CLUSTER;
import static org.apache.hudi.common.model.WriteOperationType.COMPACT;
import static org.apache.hudi.common.model.WriteOperationType.UPSERT;
import static org.apache.hudi.common.table.timeline.HoodieActiveTimeline.COMMIT_FORMATTER;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.CLEAN_ACTION;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
import static org.apache.hudi.common.testutils.FileCreateUtils.baseFileName;
import static org.apache.hudi.common.testutils.FileCreateUtils.createCleanFile;
import static org.apache.hudi.common.testutils.FileCreateUtils.createCommit;
import static org.apache.hudi.common.testutils.FileCreateUtils.createDeltaCommit;
import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightCleanFile;
import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightCommit;
import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightCompaction;
import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightDeltaCommit;
import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightReplaceCommit;
import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightRollbackFile;
@@ -77,9 +104,19 @@ import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedDe
import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedReplaceCommit;
import static org.apache.hudi.common.testutils.FileCreateUtils.createRollbackFile;
import static org.apache.hudi.common.testutils.FileCreateUtils.logFileName;
import static org.apache.hudi.common.util.CleanerUtils.convertCleanMetadata;
import static org.apache.hudi.common.util.CollectionUtils.createImmutableMap;
import static org.apache.hudi.common.util.CommitUtils.buildMetadata;
import static org.apache.hudi.common.util.CommitUtils.getCommitActionType;
import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
public class HoodieTestTable {
private static final Logger LOG = LogManager.getLogger(HoodieTestTable.class);
private static final Random RANDOM = new Random();
// Shared mutable description of files written per commit; reset by of().
// NOTE(review): static, so concurrently-used test tables would interfere — assumes single-threaded tests.
private static HoodieTestTableState testTableState;
// Instants started but never completed; used to hide their files from partition listings.
private final List<String> inflightCommits = new ArrayList<>();
protected final String basePath;
protected final FileSystem fs;
protected HoodieTableMetaClient metaClient;
@@ -94,6 +131,7 @@ public class HoodieTestTable {
}
// Factory: builds a test table over the meta client's base path and raw filesystem.
// NOTE(review): resets the shared static testTableState on every call, so instances
// are not independent of each other.
public static HoodieTestTable of(HoodieTableMetaClient metaClient) {
testTableState = HoodieTestTableState.of();
return new HoodieTestTable(metaClient.getBasePath(), metaClient.getRawFs(), metaClient);
}
@@ -130,6 +168,7 @@ public class HoodieTestTable {
// Creates requested + inflight commit meta files for the instant and records it
// as inflight so listAllFilesInPartition can filter out its files.
public HoodieTestTable addInflightCommit(String instantTime) throws Exception {
createRequestedCommit(basePath, instantTime);
createInflightCommit(basePath, instantTime);
inflightCommits.add(instantTime);
currentInstantTime = instantTime;
metaClient = HoodieTableMetaClient.reload(metaClient);
return this;
@@ -144,6 +183,28 @@ public class HoodieTestTable {
return this;
}
// Convenience overload: derives the action type from the operation and table type,
// with no replaced file ids and no bootstrap.
public HoodieCommitMetadata createCommitMetadata(WriteOperationType operationType, String commitTime, HoodieTestTableState testTableState) {
String actionType = getCommitActionType(operationType, metaClient.getTableType());
return createCommitMetadata(operationType, commitTime, Collections.emptyMap(), testTableState, false, actionType);
}
// Convenience overload: same as above but allows marking the commit as a bootstrap write.
public HoodieCommitMetadata createCommitMetadata(WriteOperationType operationType, String commitTime,
HoodieTestTableState testTableState, boolean bootstrap) {
String actionType = getCommitActionType(operationType, metaClient.getTableType());
return createCommitMetadata(operationType, commitTime, Collections.emptyMap(), testTableState, bootstrap, actionType);
}
// Builds HoodieCommitMetadata for the commit: base-file write stats always, plus
// log-file write stats for UPSERTs on MOR tables; attaches a fixed test extra-metadata entry.
public HoodieCommitMetadata createCommitMetadata(WriteOperationType operationType, String commitTime,
Map<String, List<String>> partitionToReplaceFileIds,
HoodieTestTableState testTableState, boolean bootstrap, String action) {
List<HoodieWriteStat> writeStats = generateHoodieWriteStatForPartition(testTableState.getPartitionToBaseFileInfoMap(commitTime), commitTime, bootstrap);
if (MERGE_ON_READ.equals(metaClient.getTableType()) && UPSERT.equals(operationType)) {
writeStats.addAll(generateHoodieWriteStatForPartitionLogFiles(testTableState.getPartitionToLogFileInfoMap(commitTime), commitTime, bootstrap));
}
Map<String, String> extraMetadata = createImmutableMap("test", "test");
return buildMetadata(writeStats, partitionToReplaceFileIds, Option.of(extraMetadata), operationType, EMPTY_STRING, action);
}
public HoodieTestTable addCommit(String instantTime, HoodieCommitMetadata metadata) throws Exception {
createRequestedCommit(basePath, instantTime);
createInflightCommit(basePath, instantTime);
@@ -153,6 +214,14 @@ public class HoodieTestTable {
return this;
}
// Completes a previously-inflight commit: writes the completed commit file with the
// given metadata and stops tracking the instant as inflight.
public HoodieTestTable moveInflightCommitToComplete(String instantTime, HoodieCommitMetadata metadata) throws IOException {
createCommit(basePath, instantTime, metadata);
inflightCommits.remove(instantTime);
currentInstantTime = instantTime;
metaClient = HoodieTableMetaClient.reload(metaClient);
return this;
}
public HoodieTestTable addDeltaCommit(String instantTime) throws Exception {
createRequestedDeltaCommit(basePath, instantTime);
createInflightDeltaCommit(basePath, instantTime);
@@ -199,6 +268,31 @@ public class HoodieTestTable {
return this;
}
// Adds a minimal clean instant: an empty cleaner plan plus a single clean stat for
// a randomly chosen default partition with no files actually deleted.
public HoodieTestTable addClean(String instantTime) throws IOException {
HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new HoodieActionInstant(EMPTY_STRING, EMPTY_STRING, EMPTY_STRING), EMPTY_STRING, new HashMap<>(),
CleanPlanV2MigrationHandler.VERSION, new HashMap<>());
HoodieCleanStat cleanStats = new HoodieCleanStat(
HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS,
HoodieTestUtils.DEFAULT_PARTITION_PATHS[RANDOM.nextInt(HoodieTestUtils.DEFAULT_PARTITION_PATHS.length)],
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
instantTime);
HoodieCleanMetadata cleanMetadata = convertCleanMetadata(instantTime, Option.of(0L), Collections.singletonList(cleanStats));
return HoodieTestTable.of(metaClient).addClean(instantTime, cleanerPlan, cleanMetadata);
}
// Builds a (cleaner plan, clean metadata) pair from the files the test-table state
// recorded as cleaned for this commit; one clean stat per partition.
public Pair<HoodieCleanerPlan, HoodieCleanMetadata> getHoodieCleanMetadata(String commitTime, HoodieTestTableState testTableState) {
HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new HoodieActionInstant(commitTime, CLEAN_ACTION, EMPTY_STRING), EMPTY_STRING, new HashMap<>(),
CleanPlanV2MigrationHandler.VERSION, new HashMap<>());
List<HoodieCleanStat> cleanStats = new ArrayList<>();
for (Map.Entry<String, List<String>> entry : testTableState.getPartitionToFileIdMapForCleaner(commitTime).entrySet()) {
cleanStats.add(new HoodieCleanStat(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS,
entry.getKey(), entry.getValue(), entry.getValue(), Collections.emptyList(), commitTime));
}
return Pair.of(cleanerPlan, convertCleanMetadata(commitTime, Option.of(0L), cleanStats));
}
public HoodieTestTable addInflightRollback(String instantTime) throws IOException {
createInflightRollbackFile(basePath, instantTime);
currentInstantTime = instantTime;
@@ -214,6 +308,61 @@ public class HoodieTestTable {
return this;
}
// Builds rollback metadata for the given instant: per partition, the supplied files
// are recorded as successfully deleted, plus one synthetic rollback log file with a
// random size (100-599 bytes).
public HoodieRollbackMetadata getRollbackMetadata(String instantTimeToDelete, Map<String, List<String>> partitionToFilesMeta) throws Exception {
HoodieRollbackMetadata rollbackMetadata = new HoodieRollbackMetadata();
rollbackMetadata.setCommitsRollback(Collections.singletonList(instantTimeToDelete));
rollbackMetadata.setStartRollbackTime(instantTimeToDelete);
Map<String, HoodieRollbackPartitionMetadata> partitionMetadataMap = new HashMap<>();
for (Map.Entry<String, List<String>> entry : partitionToFilesMeta.entrySet()) {
HoodieRollbackPartitionMetadata rollbackPartitionMetadata = new HoodieRollbackPartitionMetadata();
rollbackPartitionMetadata.setPartitionPath(entry.getKey());
rollbackPartitionMetadata.setSuccessDeleteFiles(entry.getValue());
rollbackPartitionMetadata.setFailedDeleteFiles(new ArrayList<>());
rollbackPartitionMetadata.setWrittenLogFiles(getWrittenLogFiles(instantTimeToDelete, entry));
rollbackPartitionMetadata.setRollbackLogFiles(createImmutableMap(logFileName(instantTimeToDelete, UUID.randomUUID().toString(), 0), (long) (100 + RANDOM.nextInt(500))));
partitionMetadataMap.put(entry.getKey(), rollbackPartitionMetadata);
}
rollbackMetadata.setPartitionMetadata(partitionMetadataMap);
rollbackMetadata.setInstantsRollback(Collections.singletonList(new HoodieInstantInfo(instantTimeToDelete, HoodieTimeline.ROLLBACK_ACTION)));
return rollbackMetadata;
}
/**
 * Returns a map of log file name to file size for the log files in this partition that
 * the test-table state recorded for {@code instant} (i.e. the log files expected to be
 * rolled back). Non-log files in the entry are ignored; unknown log files are skipped.
 */
private Map<String, Long> getWrittenLogFiles(String instant, Map.Entry<String, List<String>> entry) {
Map<String, Long> writtenLogFiles = new HashMap<>();
for (String fileName : entry.getValue()) {
if (FSUtils.isLogFile(new Path(fileName))) {
if (testTableState.getPartitionToLogFileInfoMap(instant) != null
&& testTableState.getPartitionToLogFileInfoMap(instant).containsKey(entry.getKey())) {
List<Pair<String, Integer[]>> fileInfos = testTableState.getPartitionToLogFileInfoMap(instant).get(entry.getKey());
for (Pair<String, Integer[]> fileInfo : fileInfos) {
// fileInfo value: [0] = log file version, [1] = file size in bytes.
if (fileName.equals(logFileName(instant, fileInfo.getLeft(), fileInfo.getRight()[0]))) {
writtenLogFiles.put(fileName, Long.valueOf(fileInfo.getRight()[1]));
}
}
}
}
}
return writtenLogFiles;
}
/**
 * Builds savepoint metadata marking the supplied files of each partition as
 * savepointed at the given instant, attributed to the "test" user.
 */
public HoodieSavepointMetadata getSavepointMetadata(String instant, Map<String, List<String>> partitionToFilesMeta) {
  Map<String, HoodieSavepointPartitionMetadata> partitionMetadataMap = new HashMap<>();
  partitionToFilesMeta.forEach((partition, files) -> {
    HoodieSavepointPartitionMetadata partitionMetadata = new HoodieSavepointPartitionMetadata();
    partitionMetadata.setPartitionPath(partition);
    partitionMetadata.setSavepointDataFile(files);
    partitionMetadataMap.put(partition, partitionMetadata);
  });
  HoodieSavepointMetadata savepointMetadata = new HoodieSavepointMetadata();
  savepointMetadata.setSavepointedAt(Long.valueOf(instant));
  savepointMetadata.setPartitionMetadata(partitionMetadataMap);
  savepointMetadata.setSavepointedBy("test");
  return savepointMetadata;
}
public HoodieTestTable addRequestedCompaction(String instantTime) throws IOException {
createRequestedCompaction(basePath, instantTime);
currentInstantTime = instantTime;
@@ -235,6 +384,13 @@ public class HoodieTestTable {
return addRequestedCompaction(instantTime, plan);
}
// Simulates a completed compaction: writes requested + inflight compaction meta files,
// then records the completed commit with the given metadata.
// NOTE(review): delegates through HoodieTestTable.of(...), which resets the shared
// static testTableState — confirm that is intended here.
public HoodieTestTable addCompaction(String instantTime, HoodieCommitMetadata commitMetadata) throws Exception {
createRequestedCompaction(basePath, instantTime);
createInflightCompaction(basePath, instantTime);
return HoodieTestTable.of(metaClient)
.addCommit(instantTime, commitMetadata);
}
public HoodieTestTable forCommit(String instantTime) {
currentInstantTime = instantTime;
return this;
@@ -311,6 +467,13 @@ public class HoodieTestTable {
return this;
}
/**
 * Materializes one base file per (fileId, length) pair in the given partition
 * at the current instant time.
 */
public HoodieTestTable withBaseFilesInPartition(String partition, List<Pair<String, Integer>> fileInfos) throws Exception {
  for (Pair<String, Integer> baseFileInfo : fileInfos) {
    String fileId = baseFileInfo.getKey();
    Integer fileLength = baseFileInfo.getValue();
    FileCreateUtils.createBaseFile(basePath, partition, currentInstantTime, fileId, fileLength);
  }
  return this;
}
public String getFileIdWithLogFile(String partitionPath) throws Exception {
String fileId = UUID.randomUUID().toString();
withLogFile(partitionPath, fileId);
@@ -328,6 +491,13 @@ public class HoodieTestTable {
return this;
}
/**
 * Materializes one log file per (fileId, [version, length]) pair in the given
 * partition at the current instant time.
 */
public HoodieTestTable withLogFilesInPartition(String partition, List<Pair<String, Integer[]>> fileInfos) throws Exception {
  for (Pair<String, Integer[]> logFileInfo : fileInfos) {
    Integer[] versionAndLength = logFileInfo.getValue();
    FileCreateUtils.createLogFile(basePath, partition, currentInstantTime, logFileInfo.getKey(), versionAndLength[0], versionAndLength[1]);
  }
  return this;
}
public boolean inflightCommitExists(String instantTime) {
try {
return fs.exists(getInflightCommitFilePath(instantTime));
@@ -388,6 +558,11 @@ public class HoodieTestTable {
return new Path(Paths.get(basePath, partition).toUri());
}
// Lists top-level partition directories under the base path (the hoodie metafolder
// is excluded by FileCreateUtils.getPartitionPaths).
// NOTE(review): Paths.get(basePath, TEMPFOLDER_NAME).getParent().getParent() appears
// intended to resolve back to basePath itself — confirm TEMPFOLDER_NAME is a
// two-component relative path, otherwise this lands on the wrong directory.
public List<java.nio.file.Path> getAllPartitionPaths() throws IOException {
java.nio.file.Path basePathPath = Paths.get(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME).getParent().getParent();
return FileCreateUtils.getPartitionPaths(basePathPath);
}
// Hadoop Path to the base file for this fileId in the partition at the current instant.
public Path getBaseFilePath(String partition, String fileId) {
return new Path(Paths.get(basePath, partition, getBaseFileNameById(fileId)).toUri());
}
@@ -396,6 +571,24 @@ public class HoodieTestTable {
return baseFileName(currentInstantTime, fileId);
}
// Hadoop Path to the log file for this fileId/version at the current instant.
// NOTE(review): built via toString() while getBaseFilePath uses toUri() — confirm the
// inconsistency is harmless for the filesystems used in tests.
public Path getLogFilePath(String partition, String fileId, int version) {
return new Path(Paths.get(basePath, partition, getLogFileNameById(fileId, version)).toString());
}
// Log file name for this fileId/version at the current instant time.
public String getLogFileNameById(String fileId, int version) {
return logFileName(currentInstantTime, fileId, version);
}
/**
 * Returns the names of the {@code count} oldest files (by modification time) in the
 * given partition.
 *
 * Fix: clamps {@code count} to the number of files present, so asking for more files
 * than exist returns all of them instead of throwing IndexOutOfBoundsException.
 */
public List<String> getEarliestFilesInPartition(String partition, int count) throws IOException {
  List<FileStatus> fileStatuses = Arrays.asList(listAllFilesInPartition(partition));
  fileStatuses.sort(Comparator.comparing(FileStatus::getModificationTime));
  return fileStatuses.subList(0, Math.min(count, fileStatuses.size())).stream()
      .map(entry -> entry.getPath().getName()).collect(Collectors.toList());
}
// Instants currently tracked as inflight.
// NOTE(review): returns the internal mutable list directly; callers can mutate state.
public List<String> inflightCommits() {
return this.inflightCommits;
}
// Lists all base files using the table's default base-file extension.
public FileStatus[] listAllBaseFiles() throws IOException {
return listAllBaseFiles(HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().getFileExtension());
}
@@ -421,16 +614,356 @@ public class HoodieTestTable {
}
/**
 * Lists all files in the partition, excluding the partition metafile and any files
 * belonging to instants still tracked as inflight.
 *
 * Fix: removed the leftover unconditional return from the pre-rewrite version that
 * preceded the filtered listing (dead code as rendered), and simplified the filter's
 * flag variable into direct returns.
 */
public FileStatus[] listAllFilesInPartition(String partitionPath) throws IOException {
  return FileSystemTestUtils.listRecursive(fs, new Path(Paths.get(basePath, partitionPath).toString())).stream()
      .filter(entry -> {
        String fileName = entry.getPath().getName();
        if (fileName.equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)) {
          return false;
        }
        // Hide files written by commits that never completed.
        return inflightCommits.stream().noneMatch(fileName::contains);
      }).toArray(FileStatus[]::new);
}
// Recursively lists all files under the table's temp (marker) folder.
public FileStatus[] listAllFilesInTempFolder() throws IOException {
return FileSystemTestUtils.listRecursive(fs, new Path(Paths.get(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME).toString())).toArray(new FileStatus[0]);
}
// Deletes the named files from the partition; missing names are silently ignored
// (only files that both exist and appear in filesToDelete are removed).
// NOTE(review): deletion goes through java.nio.file.Files against local paths —
// assumes the table lives on the local filesystem.
public void deleteFilesInPartition(String partitionPath, List<String> filesToDelete) throws IOException {
FileStatus[] allFiles = listAllFilesInPartition(partitionPath);
Arrays.stream(allFiles).filter(entry -> filesToDelete.contains(entry.getPath().getName())).forEach(entry -> {
try {
Files.delete(Paths.get(basePath, partitionPath, entry.getPath().getName()));
} catch (IOException e) {
// Surface as unchecked so it propagates out of the lambda.
throw new HoodieTestTableException(e);
}
});
}
// Simulates rolling back commitTimeToRollback at commitTime: deletes the files that
// commit wrote and records a rollback instant with matching metadata.
// Throws IllegalArgumentException when the instant to roll back is not completed in the timeline.
public HoodieTestTable doRollback(String commitTimeToRollback, String commitTime) throws Exception {
Option<HoodieCommitMetadata> commitMetadata = getMetadataForInstant(commitTimeToRollback);
if (!commitMetadata.isPresent()) {
throw new IllegalArgumentException("Instant to rollback not present in timeline: " + commitTimeToRollback);
}
Map<String, List<String>> partitionFiles = getPartitionFiles(commitMetadata.get());
HoodieRollbackMetadata rollbackMetadata = getRollbackMetadata(commitTimeToRollback, partitionFiles);
for (Map.Entry<String, List<String>> entry : partitionFiles.entrySet()) {
deleteFilesInPartition(entry.getKey(), entry.getValue());
}
return addRollback(commitTime, rollbackMetadata);
}
// Simulates a clustering (replace) commit: assigns each replaced fileId a random
// length (100-599 bytes), builds replace-commit metadata from those write stats,
// and records the replace commit.
public HoodieTestTable doCluster(String commitTime, Map<String, List<String>> partitionToReplaceFileIds) throws Exception {
Map<String, List<Pair<String, Integer>>> partitionToReplaceFileIdsWithLength = new HashMap<>();
for (Map.Entry<String, List<String>> entry : partitionToReplaceFileIds.entrySet()) {
String partition = entry.getKey();
partitionToReplaceFileIdsWithLength.put(entry.getKey(), new ArrayList<>());
for (String fileId : entry.getValue()) {
int length = 100 + RANDOM.nextInt(500);
partitionToReplaceFileIdsWithLength.get(partition).add(Pair.of(fileId, length));
}
}
List<HoodieWriteStat> writeStats = generateHoodieWriteStatForPartition(partitionToReplaceFileIdsWithLength, commitTime, false);
HoodieReplaceCommitMetadata replaceMetadata =
(HoodieReplaceCommitMetadata) buildMetadata(writeStats, partitionToReplaceFileIds, Option.empty(), CLUSTER, EMPTY_STRING, REPLACE_COMMIT_ACTION);
return addReplaceCommit(commitTime, Option.empty(), Option.empty(), replaceMetadata);
}
// Simulates a clean at commitTime: deletes the N earliest files of each partition
// (per partitionFileCountsToDelete), records the deletions in a fresh test-table
// state, and writes a clean instant whose metadata reflects them.
public HoodieCleanMetadata doClean(String commitTime, Map<String, Integer> partitionFileCountsToDelete) throws IOException {
Map<String, List<String>> partitionFilesToDelete = new HashMap<>();
for (Map.Entry<String, Integer> entry : partitionFileCountsToDelete.entrySet()) {
partitionFilesToDelete.put(entry.getKey(), getEarliestFilesInPartition(entry.getKey(), entry.getValue()));
}
// Local state shadows the static field on purpose: only cleaner bookkeeping here.
HoodieTestTableState testTableState = new HoodieTestTableState();
for (Map.Entry<String, List<String>> entry : partitionFilesToDelete.entrySet()) {
testTableState = testTableState.createTestTableStateForCleaner(commitTime, entry.getKey(), entry.getValue());
deleteFilesInPartition(entry.getKey(), entry.getValue());
}
Pair<HoodieCleanerPlan, HoodieCleanMetadata> cleanerMeta = getHoodieCleanMetadata(commitTime, testTableState);
addClean(commitTime, cleanerMeta.getKey(), cleanerMeta.getValue());
return cleanerMeta.getValue();
}
// Cleans, per partition, as many files as the given commits wrote there (commits
// missing from the timeline are skipped), then delegates to doClean.
public HoodieCleanMetadata doCleanBasedOnCommits(String cleanCommitTime, List<String> commitsToClean) throws IOException {
Map<String, Integer> partitionFileCountsToDelete = new HashMap<>();
for (String commitTime : commitsToClean) {
Option<HoodieCommitMetadata> commitMetadata = getMetadataForInstant(commitTime);
if (commitMetadata.isPresent()) {
Map<String, List<String>> partitionFiles = getPartitionFiles(commitMetadata.get());
for (String partition : partitionFiles.keySet()) {
partitionFileCountsToDelete.put(partition, partitionFiles.get(partition).size() + partitionFileCountsToDelete.getOrDefault(partition, 0));
}
}
}
return doClean(cleanCommitTime, partitionFileCountsToDelete);
}
/**
 * Simulates a savepoint for the given commit: builds savepoint metadata from the
 * files that commit wrote.
 *
 * Fix: error message said "Instant to rollback" — a copy-paste from doRollback; it
 * now names the savepoint operation.
 * NOTE(review): this deletes the savepointed files from their partitions — confirm
 * that is the intended test semantics.
 */
public HoodieSavepointMetadata doSavepoint(String commitTime) throws IOException {
  Option<HoodieCommitMetadata> commitMetadata = getMetadataForInstant(commitTime);
  if (!commitMetadata.isPresent()) {
    throw new IllegalArgumentException("Instant to savepoint not present in timeline: " + commitTime);
  }
  Map<String, List<String>> partitionFiles = getPartitionFiles(commitMetadata.get());
  HoodieSavepointMetadata savepointMetadata = getSavepointMetadata(commitTime, partitionFiles);
  for (Map.Entry<String, List<String>> entry : partitionFiles.entrySet()) {
    deleteFilesInPartition(entry.getKey(), entry.getValue());
  }
  return savepointMetadata;
}
// Simulates a compaction at commitTime: generates one base file per partition (empty
// partition list means the unpartitioned ""), materializes those files, and records
// requested/inflight/completed compaction meta files.
public HoodieTestTable doCompaction(String commitTime, List<String> partitions) throws Exception {
this.currentInstantTime = commitTime;
if (partitions.isEmpty()) {
partitions = Collections.singletonList(EMPTY_STRING);
}
HoodieTestTableState testTableState = getTestTableStateWithPartitionFileInfo(COMPACT, metaClient.getTableType(), commitTime, partitions, 1);
HoodieCommitMetadata commitMetadata = createCommitMetadata(COMPACT, commitTime, testTableState);
for (String partition : partitions) {
this.withBaseFilesInPartition(partition, testTableState.getPartitionToBaseFileInfoMap(commitTime).get(partition));
}
return addCompaction(commitTime, commitMetadata);
}
// Convenience overloads for doWriteOperation; all funnel into the full variant below
// with defaults: no new partitions, bootstrap=false, createInflightCommit=false.
public HoodieCommitMetadata doWriteOperation(String commitTime, WriteOperationType operationType,
List<String> partitions, int filesPerPartition) throws Exception {
return doWriteOperation(commitTime, operationType, Collections.emptyList(), partitions, filesPerPartition, false);
}
public HoodieCommitMetadata doWriteOperation(String commitTime, WriteOperationType operationType,
List<String> newPartitionsToAdd, List<String> partitions,
int filesPerPartition) throws Exception {
return doWriteOperation(commitTime, operationType, newPartitionsToAdd, partitions, filesPerPartition, false);
}
public HoodieCommitMetadata doWriteOperation(String commitTime, WriteOperationType operationType,
List<String> newPartitionsToAdd, List<String> partitions,
int filesPerPartition, boolean bootstrap) throws Exception {
return doWriteOperation(commitTime, operationType, newPartitionsToAdd, partitions, filesPerPartition, bootstrap, false);
}
public HoodieCommitMetadata doWriteOperation(String commitTime, WriteOperationType operationType,
List<String> partitions, int filesPerPartition, boolean bootstrap) throws Exception {
return doWriteOperation(commitTime, operationType, Collections.emptyList(), partitions, filesPerPartition, bootstrap, false);
}
// Simulates a full write operation: generates per-partition file info (base files,
// plus log files for UPSERTs on MOR tables), builds the commit metadata, creates
// partition meta files for any new partitions, records the commit either as inflight
// or completed, and finally materializes the generated files on disk.
public HoodieCommitMetadata doWriteOperation(String commitTime, WriteOperationType operationType,
List<String> newPartitionsToAdd, List<String> partitions,
int filesPerPartition, boolean bootstrap, boolean createInflightCommit) throws Exception {
if (partitions.isEmpty()) {
// Empty list means the unpartitioned table layout.
partitions = Collections.singletonList(EMPTY_STRING);
}
HoodieTestTableState testTableState = getTestTableStateWithPartitionFileInfo(operationType, metaClient.getTableType(), commitTime, partitions, filesPerPartition);
HoodieCommitMetadata commitMetadata = createCommitMetadata(operationType, commitTime, testTableState, bootstrap);
for (String str : newPartitionsToAdd) {
this.withPartitionMetaFiles(str);
}
if (createInflightCommit) {
this.addInflightCommit(commitTime);
} else {
this.addCommit(commitTime, commitMetadata);
}
for (String partition : partitions) {
this.withBaseFilesInPartition(partition, testTableState.getPartitionToBaseFileInfoMap(commitTime).get(partition));
if (MERGE_ON_READ.equals(metaClient.getTableType()) && UPSERT.equals(operationType)) {
this.withLogFilesInPartition(partition, testTableState.getPartitionToLogFileInfoMap(commitTime).get(partition));
}
}
return commitMetadata;
}
/**
 * Reads the commit metadata for a completed instant from the active timeline.
 * Returns Option.empty() when the instant is not a completed commit; throws
 * IllegalArgumentException for unsupported actions and HoodieIOException when the
 * instant details cannot be deserialized.
 *
 * Fix: exception message "Unknown instant action" was concatenated to the action
 * name with no separator; added ": ".
 */
private Option<HoodieCommitMetadata> getMetadataForInstant(String instantTime) {
  Option<HoodieInstant> hoodieInstant = metaClient.getActiveTimeline().getCommitsTimeline()
      .filterCompletedInstants().filter(i -> i.getTimestamp().equals(instantTime)).firstInstant();
  try {
    if (hoodieInstant.isPresent()) {
      switch (hoodieInstant.get().getAction()) {
        case HoodieTimeline.REPLACE_COMMIT_ACTION:
          // Replace commits carry their own metadata subtype.
          HoodieReplaceCommitMetadata replaceCommitMetadata = HoodieReplaceCommitMetadata
              .fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstant.get()).get(), HoodieReplaceCommitMetadata.class);
          return Option.of(replaceCommitMetadata);
        case HoodieTimeline.DELTA_COMMIT_ACTION:
        case HoodieTimeline.COMMIT_ACTION:
          HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
              .fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstant.get()).get(), HoodieCommitMetadata.class);
          return Option.of(commitMetadata);
        default:
          throw new IllegalArgumentException("Unknown instant action: " + hoodieInstant.get().getAction());
      }
    } else {
      return Option.empty();
    }
  } catch (IOException io) {
    throw new HoodieIOException("Unable to read metadata for instant " + hoodieInstant.get(), io);
  }
}
/**
 * Maps each partition in the commit metadata to the fileIds of its write stats.
 * A partition with no write stats still gets an (empty) entry.
 */
private static Map<String, List<String>> getPartitionFiles(HoodieCommitMetadata commitMetadata) {
  Map<String, List<String>> partitionFilesToDelete = new HashMap<>();
  commitMetadata.getPartitionToWriteStats().forEach((partition, writeStats) -> {
    List<String> fileIds = new ArrayList<>();
    writeStats.forEach(writeStat -> fileIds.add(writeStat.getFileId()));
    partitionFilesToDelete.put(partition, fileIds);
  });
  return partitionFilesToDelete;
}
// Populates the shared static testTableState with random file info (100-599 bytes
// each) for every partition: base+log files for UPSERTs on MOR tables, base files
// only otherwise. Mutates and returns the static state.
private static HoodieTestTableState getTestTableStateWithPartitionFileInfo(WriteOperationType operationType, HoodieTableType tableType, String commitTime,
List<String> partitions, int filesPerPartition) {
for (String partition : partitions) {
Stream<Integer> fileLengths = IntStream.range(0, filesPerPartition).map(i -> 100 + RANDOM.nextInt(500)).boxed();
if (MERGE_ON_READ.equals(tableType) && UPSERT.equals(operationType)) {
// Log files start at version 0 alongside each base file.
List<Pair<Integer, Integer>> fileVersionAndLength = fileLengths.map(len -> Pair.of(0, len)).collect(Collectors.toList());
testTableState = testTableState.createTestTableStateForBaseAndLogFiles(commitTime, partition, fileVersionAndLength);
} else {
testTableState = testTableState.createTestTableStateForBaseFilesOnly(commitTime, partition, fileLengths.collect(Collectors.toList()));
}
}
return testTableState;
}
private static List<HoodieWriteStat> generateHoodieWriteStatForPartition(Map<String, List<Pair<String, Integer>>> partitionToFileIdMap,
String commitTime, boolean bootstrap) {
List<HoodieWriteStat> writeStats = new ArrayList<>();
for (Map.Entry<String, List<Pair<String, Integer>>> entry : partitionToFileIdMap.entrySet()) {
String partition = entry.getKey();
for (Pair<String, Integer> fileIdInfo : entry.getValue()) {
HoodieWriteStat writeStat = new HoodieWriteStat();
String fileName = bootstrap ? fileIdInfo.getKey() :
FileCreateUtils.baseFileName(commitTime, fileIdInfo.getKey());
writeStat.setFileId(fileName);
writeStat.setPartitionPath(partition);
writeStat.setPath(partition + "/" + fileName);
writeStat.setTotalWriteBytes(fileIdInfo.getValue());
writeStats.add(writeStat);
}
}
return writeStats;
}
/**
* Returns the write stats for log files in the partition. Since log file has version associated with it, the {@param partitionToFileIdMap}
* contains list of Pair<String, Integer[]> where the Integer[] array has both file version and file size.
*/
private static List<HoodieWriteStat> generateHoodieWriteStatForPartitionLogFiles(Map<String, List<Pair<String, Integer[]>>> partitionToFileIdMap, String commitTime, boolean bootstrap) {
List<HoodieWriteStat> writeStats = new ArrayList<>();
if (partitionToFileIdMap == null) {
return writeStats;
}
for (Map.Entry<String, List<Pair<String, Integer[]>>> entry : partitionToFileIdMap.entrySet()) {
String partition = entry.getKey();
for (Pair<String, Integer[]> fileIdInfo : entry.getValue()) {
HoodieWriteStat writeStat = new HoodieWriteStat();
String fileName = bootstrap ? fileIdInfo.getKey() :
FileCreateUtils.logFileName(commitTime, fileIdInfo.getKey(), fileIdInfo.getValue()[0]);
writeStat.setFileId(fileName);
writeStat.setPartitionPath(partition);
writeStat.setPath(partition + "/" + fileName);
writeStat.setTotalWriteBytes(fileIdInfo.getValue()[1]);
writeStats.add(writeStat);
}
}
return writeStats;
}
public static class HoodieTestTableException extends RuntimeException {
public HoodieTestTableException(Throwable t) {
super(t);
}
}
static class HoodieTestTableState {
/**
* Map<commitTime, Map<partitionPath, List<filesToDelete>>>
* Used in building CLEAN metadata.
*/
Map<String, Map<String, List<String>>> commitsToPartitionToFileIdForCleaner = new HashMap<>();
/**
* Map<commitTime, Map<partitionPath, List<Pair<fileName, fileLength>>>>
* Used to build commit metadata for base files for several write operations.
*/
Map<String, Map<String, List<Pair<String, Integer>>>> commitsToPartitionToBaseFileInfoStats = new HashMap<>();
/**
* Map<commitTime, Map<partitionPath, List<Pair<fileName, [fileVersion, fileLength]>>>>
* Used to build commit metadata for log files for several write operations.
*/
Map<String, Map<String, List<Pair<String, Integer[]>>>> commitsToPartitionToLogFileInfoStats = new HashMap<>();
HoodieTestTableState() {
}
static HoodieTestTableState of() {
return new HoodieTestTableState();
}
HoodieTestTableState createTestTableStateForCleaner(String commitTime, String partitionPath, List<String> filesToClean) {
if (!commitsToPartitionToFileIdForCleaner.containsKey(commitTime)) {
commitsToPartitionToFileIdForCleaner.put(commitTime, new HashMap<>());
}
if (!this.commitsToPartitionToFileIdForCleaner.get(commitTime).containsKey(partitionPath)) {
this.commitsToPartitionToFileIdForCleaner.get(commitTime).put(partitionPath, new ArrayList<>());
}
this.commitsToPartitionToFileIdForCleaner.get(commitTime).get(partitionPath).addAll(filesToClean);
return this;
}
Map<String, List<String>> getPartitionToFileIdMapForCleaner(String commitTime) {
return this.commitsToPartitionToFileIdForCleaner.get(commitTime);
}
HoodieTestTableState createTestTableStateForBaseFilesOnly(String commitTime, String partitionPath, List<Integer> lengths) {
if (!commitsToPartitionToBaseFileInfoStats.containsKey(commitTime)) {
commitsToPartitionToBaseFileInfoStats.put(commitTime, new HashMap<>());
}
if (!this.commitsToPartitionToBaseFileInfoStats.get(commitTime).containsKey(partitionPath)) {
this.commitsToPartitionToBaseFileInfoStats.get(commitTime).put(partitionPath, new ArrayList<>());
}
List<Pair<String, Integer>> fileInfos = new ArrayList<>();
for (int length : lengths) {
fileInfos.add(Pair.of(UUID.randomUUID().toString(), length));
}
this.commitsToPartitionToBaseFileInfoStats.get(commitTime).get(partitionPath).addAll(fileInfos);
return this;
}
HoodieTestTableState createTestTableStateForBaseAndLogFiles(String commitTime, String partitionPath, List<Pair<Integer, Integer>> versionsAndLengths) {
if (!commitsToPartitionToBaseFileInfoStats.containsKey(commitTime)) {
createTestTableStateForBaseFilesOnly(commitTime, partitionPath, versionsAndLengths.stream().map(Pair::getRight).collect(Collectors.toList()));
}
if (!this.commitsToPartitionToBaseFileInfoStats.get(commitTime).containsKey(partitionPath)) {
createTestTableStateForBaseFilesOnly(commitTime, partitionPath, versionsAndLengths.stream().map(Pair::getRight).collect(Collectors.toList()));
}
if (!commitsToPartitionToLogFileInfoStats.containsKey(commitTime)) {
commitsToPartitionToLogFileInfoStats.put(commitTime, new HashMap<>());
}
if (!this.commitsToPartitionToLogFileInfoStats.get(commitTime).containsKey(partitionPath)) {
this.commitsToPartitionToLogFileInfoStats.get(commitTime).put(partitionPath, new ArrayList<>());
}
List<Pair<String, Integer[]>> fileInfos = new ArrayList<>();
for (int i = 0; i < versionsAndLengths.size(); i++) {
Pair<Integer, Integer> versionAndLength = versionsAndLengths.get(i);
String fileId = FSUtils.getFileId(commitsToPartitionToBaseFileInfoStats.get(commitTime).get(partitionPath).get(i).getLeft());
fileInfos.add(Pair.of(fileId, new Integer[] {versionAndLength.getLeft(), versionAndLength.getRight()}));
}
this.commitsToPartitionToLogFileInfoStats.get(commitTime).get(partitionPath).addAll(fileInfos);
return this;
}
Map<String, List<Pair<String, Integer>>> getPartitionToBaseFileInfoMap(String commitTime) {
return this.commitsToPartitionToBaseFileInfoStats.get(commitTime);
}
Map<String, List<Pair<String, Integer[]>>> getPartitionToLogFileInfoMap(String commitTime) {
return this.commitsToPartitionToLogFileInfoStats.get(commitTime);
}
}
}