[HUDI-2472] Fix few Cleaner tests with metadata table enabled (#3825)
This commit is contained in:
committed by
GitHub
parent
e355ab52db
commit
b68c5a68f9
@@ -144,8 +144,13 @@ public class FileCreateUtils {
|
||||
createMetaFile(basePath, instantTime, HoodieTimeline.COMMIT_EXTENSION);
|
||||
}
|
||||
|
||||
public static void createCommit(String basePath, String instantTime, HoodieCommitMetadata metadata) throws IOException {
|
||||
createMetaFile(basePath, instantTime, HoodieTimeline.COMMIT_EXTENSION, metadata.toJsonString().getBytes(StandardCharsets.UTF_8));
|
||||
public static void createCommit(String basePath, String instantTime, Option<HoodieCommitMetadata> metadata) throws IOException {
|
||||
if (metadata.isPresent()) {
|
||||
createMetaFile(basePath, instantTime, HoodieTimeline.COMMIT_EXTENSION,
|
||||
metadata.get().toJsonString().getBytes(StandardCharsets.UTF_8));
|
||||
} else {
|
||||
createMetaFile(basePath, instantTime, HoodieTimeline.COMMIT_EXTENSION);
|
||||
}
|
||||
}
|
||||
|
||||
public static void createCommit(String basePath, String instantTime, FileSystem fs) throws IOException {
|
||||
|
||||
@@ -188,14 +188,19 @@ public class HoodieTestTable {
|
||||
}
|
||||
|
||||
public HoodieTestTable addCommit(String instantTime) throws Exception {
|
||||
return addCommit(instantTime, Option.empty());
|
||||
}
|
||||
|
||||
public HoodieTestTable addCommit(String instantTime, Option<HoodieCommitMetadata> metadata) throws Exception {
|
||||
createRequestedCommit(basePath, instantTime);
|
||||
createInflightCommit(basePath, instantTime);
|
||||
createCommit(basePath, instantTime);
|
||||
createCommit(basePath, instantTime, metadata);
|
||||
currentInstantTime = instantTime;
|
||||
return this;
|
||||
}
|
||||
|
||||
public HoodieCommitMetadata createCommitMetadata(WriteOperationType operationType, String commitTime, HoodieTestTableState testTableState) {
|
||||
public HoodieCommitMetadata createCommitMetadata(WriteOperationType operationType, String commitTime,
|
||||
HoodieTestTableState testTableState) {
|
||||
String actionType = getCommitActionType(operationType, metaClient.getTableType());
|
||||
return createCommitMetadata(operationType, commitTime, Collections.emptyMap(), testTableState, false, actionType);
|
||||
}
|
||||
@@ -203,7 +208,8 @@ public class HoodieTestTable {
|
||||
public HoodieCommitMetadata createCommitMetadata(WriteOperationType operationType, String commitTime,
|
||||
HoodieTestTableState testTableState, boolean bootstrap) {
|
||||
String actionType = getCommitActionType(operationType, metaClient.getTableType());
|
||||
return createCommitMetadata(operationType, commitTime, Collections.emptyMap(), testTableState, bootstrap, actionType);
|
||||
return createCommitMetadata(operationType, commitTime, Collections.emptyMap(), testTableState, bootstrap,
|
||||
actionType);
|
||||
}
|
||||
|
||||
public HoodieCommitMetadata createCommitMetadata(WriteOperationType operationType, String commitTime,
|
||||
@@ -217,17 +223,9 @@ public class HoodieTestTable {
|
||||
return buildMetadata(writeStats, partitionToReplaceFileIds, Option.of(extraMetadata), operationType, EMPTY_STRING, action);
|
||||
}
|
||||
|
||||
public HoodieTestTable addCommit(String instantTime, HoodieCommitMetadata metadata) throws Exception {
|
||||
createRequestedCommit(basePath, instantTime);
|
||||
createInflightCommit(basePath, instantTime);
|
||||
createCommit(basePath, instantTime, metadata);
|
||||
currentInstantTime = instantTime;
|
||||
return this;
|
||||
}
|
||||
|
||||
public HoodieTestTable moveInflightCommitToComplete(String instantTime, HoodieCommitMetadata metadata) throws IOException {
|
||||
if (metaClient.getTableType() == HoodieTableType.COPY_ON_WRITE) {
|
||||
createCommit(basePath, instantTime, metadata);
|
||||
createCommit(basePath, instantTime, Option.of(metadata));
|
||||
} else {
|
||||
createDeltaCommit(basePath, instantTime, metadata);
|
||||
}
|
||||
@@ -425,11 +423,11 @@ public class HoodieTestTable {
|
||||
public HoodieTestTable addCompaction(String instantTime, HoodieCommitMetadata commitMetadata) throws Exception {
|
||||
createRequestedCompaction(basePath, instantTime);
|
||||
createInflightCompaction(basePath, instantTime);
|
||||
return addCommit(instantTime, commitMetadata);
|
||||
return addCommit(instantTime, Option.of(commitMetadata));
|
||||
}
|
||||
|
||||
public HoodieTestTable moveInflightCompactionToComplete(String instantTime, HoodieCommitMetadata metadata) throws IOException {
|
||||
createCommit(basePath, instantTime, metadata);
|
||||
createCommit(basePath, instantTime, Option.of(metadata));
|
||||
inflightCommits.remove(instantTime);
|
||||
currentInstantTime = instantTime;
|
||||
return this;
|
||||
@@ -834,16 +832,45 @@ public class HoodieTestTable {
|
||||
|
||||
public HoodieCommitMetadata doWriteOperation(String commitTime, WriteOperationType operationType,
|
||||
List<String> partitions, int filesPerPartition, boolean bootstrap) throws Exception {
|
||||
return doWriteOperation(commitTime, operationType, Collections.emptyList(), partitions, filesPerPartition, bootstrap, false);
|
||||
return doWriteOperation(commitTime, operationType, Collections.emptyList(), partitions, filesPerPartition,
|
||||
bootstrap, false);
|
||||
}
|
||||
|
||||
public HoodieCommitMetadata doWriteOperation(String commitTime, WriteOperationType operationType,
|
||||
List<String> newPartitionsToAdd, List<String> partitions,
|
||||
int filesPerPartition, boolean bootstrap, boolean createInflightCommit) throws Exception {
|
||||
int filesPerPartition, boolean bootstrap,
|
||||
boolean createInflightCommit) throws Exception {
|
||||
if (partitions.isEmpty()) {
|
||||
partitions = Collections.singletonList(EMPTY_STRING);
|
||||
}
|
||||
HoodieTestTableState testTableState = getTestTableStateWithPartitionFileInfo(operationType, metaClient.getTableType(), commitTime, partitions, filesPerPartition);
|
||||
|
||||
Map<String, List<Pair<String, Integer>>> partitionToFilesNameLengthMap = getPartitionFiles(partitions,
|
||||
filesPerPartition);
|
||||
return doWriteOperation(commitTime, operationType, newPartitionsToAdd, partitionToFilesNameLengthMap, bootstrap,
|
||||
createInflightCommit);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add commits to the requested partitions.
|
||||
*
|
||||
* @param commitTime - Commit time for the operation
|
||||
* @param operationType - Operation type
|
||||
* @param newPartitionsToAdd - New partitions to add for the operation
|
||||
* @param partitionToFilesNameLengthMap - Map of partition names to its list of files name and length pair
|
||||
* @param bootstrap - Whether bootstrapping needed for the operation
|
||||
* @param createInflightCommit - Whether in flight commit needed for the operation
|
||||
* @return Commit metadata for the commit operation performed.
|
||||
* @throws Exception if adding the commit or its partition files fails
|
||||
*/
|
||||
public HoodieCommitMetadata doWriteOperation(String commitTime, WriteOperationType operationType,
|
||||
List<String> newPartitionsToAdd,
|
||||
Map<String, List<Pair<String, Integer>>> partitionToFilesNameLengthMap,
|
||||
boolean bootstrap, boolean createInflightCommit) throws Exception {
|
||||
if (partitionToFilesNameLengthMap.isEmpty()) {
|
||||
partitionToFilesNameLengthMap = Collections.singletonMap(EMPTY_STRING, Collections.EMPTY_LIST);
|
||||
}
|
||||
HoodieTestTableState testTableState = getTestTableStateWithPartitionFileInfo(operationType,
|
||||
metaClient.getTableType(), commitTime, partitionToFilesNameLengthMap);
|
||||
HoodieCommitMetadata commitMetadata = createCommitMetadata(operationType, commitTime, testTableState, bootstrap);
|
||||
for (String str : newPartitionsToAdd) {
|
||||
this.withPartitionMetaFiles(str);
|
||||
@@ -856,12 +883,13 @@ public class HoodieTestTable {
|
||||
}
|
||||
} else {
|
||||
if (metaClient.getTableType() == HoodieTableType.COPY_ON_WRITE) {
|
||||
this.addCommit(commitTime, commitMetadata);
|
||||
this.addCommit(commitTime, Option.of(commitMetadata));
|
||||
} else {
|
||||
this.addDeltaCommit(commitTime, commitMetadata);
|
||||
}
|
||||
}
|
||||
for (String partition : partitions) {
|
||||
for (Map.Entry<String, List<Pair<String, Integer>>> entry : partitionToFilesNameLengthMap.entrySet()) {
|
||||
String partition = entry.getKey();
|
||||
this.withBaseFilesInPartition(partition, testTableState.getPartitionToBaseFileInfoMap(commitTime).get(partition));
|
||||
if (MERGE_ON_READ.equals(metaClient.getTableType()) && UPSERT.equals(operationType)) {
|
||||
this.withLogFilesInPartition(partition, testTableState.getPartitionToLogFileInfoMap(commitTime).get(partition));
|
||||
@@ -911,15 +939,69 @@ public class HoodieTestTable {
|
||||
return partitionFilesToDelete;
|
||||
}
|
||||
|
||||
private static HoodieTestTableState getTestTableStateWithPartitionFileInfo(WriteOperationType operationType, HoodieTableType tableType, String commitTime,
|
||||
List<String> partitions, int filesPerPartition) {
|
||||
/**
|
||||
* Generate partition files names and length details.
|
||||
*
|
||||
* @param partitions - List of partition for which file details need to be generated
|
||||
* @param filesPerPartition - File count per partition
|
||||
* @return Map of partition to its collection of files name and length pair
|
||||
*/
|
||||
protected static Map<String, List<Pair<String, Integer>>> getPartitionFiles(List<String> partitions,
|
||||
int filesPerPartition) {
|
||||
Map<String, List<Pair<String, Integer>>> partitionToFilesNameLengthMap = new HashMap<>();
|
||||
for (String partition : partitions) {
|
||||
Stream<Integer> fileLengths = IntStream.range(0, filesPerPartition).map(i -> 100 + RANDOM.nextInt(500)).boxed();
|
||||
List<Pair<String, Integer>> fileNameAndLengthList =
|
||||
fileLengths.map(len -> Pair.of(UUID.randomUUID().toString(), len)).collect(Collectors.toList());
|
||||
partitionToFilesNameLengthMap.put(partition, fileNameAndLengthList);
|
||||
}
|
||||
return partitionToFilesNameLengthMap;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get Test table state for the requested partitions and file count.
|
||||
*
|
||||
* @param operationType - Table write operation type
|
||||
* @param tableType - Hudi table type
|
||||
* @param commitTime - Write commit time
|
||||
* @param partitions - List of partition names
|
||||
* @param filesPerPartition - Total file count per partition
|
||||
* @return Test table state for the requested partitions and file count
|
||||
*/
|
||||
private static HoodieTestTableState getTestTableStateWithPartitionFileInfo(WriteOperationType operationType,
|
||||
HoodieTableType tableType,
|
||||
String commitTime,
|
||||
List<String> partitions,
|
||||
int filesPerPartition) {
|
||||
Map<String, List<Pair<String, Integer>>> partitionToFilesNameLengthMap = getPartitionFiles(partitions,
|
||||
filesPerPartition);
|
||||
return getTestTableStateWithPartitionFileInfo(operationType, tableType, commitTime, partitionToFilesNameLengthMap);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get Test table state for the requested partitions and files.
|
||||
*
|
||||
* @param operationType - Table write operation type
|
||||
* @param tableType - Hudi table type
|
||||
* @param commitTime - Write commit time
|
||||
* @param partitionToFilesNameLengthMap - Map of partition names to its list of files and their lengths
|
||||
* @return Test table state for the requested partitions and files
|
||||
*/
|
||||
private static HoodieTestTableState getTestTableStateWithPartitionFileInfo(WriteOperationType operationType,
|
||||
HoodieTableType tableType,
|
||||
String commitTime,
|
||||
Map<String, List<Pair<String, Integer>>> partitionToFilesNameLengthMap) {
|
||||
for (Map.Entry<String, List<Pair<String, Integer>>> partitionEntry : partitionToFilesNameLengthMap.entrySet()) {
|
||||
String partitionName = partitionEntry.getKey();
|
||||
List<Pair<String, Integer>> fileNameAndLengthList = partitionEntry.getValue();
|
||||
if (MERGE_ON_READ.equals(tableType) && UPSERT.equals(operationType)) {
|
||||
List<Pair<Integer, Integer>> fileVersionAndLength = fileLengths.map(len -> Pair.of(0, len)).collect(Collectors.toList());
|
||||
testTableState = testTableState.createTestTableStateForBaseAndLogFiles(commitTime, partition, fileVersionAndLength);
|
||||
List<Pair<Integer, Integer>> fileVersionAndLength =
|
||||
fileNameAndLengthList.stream().map(nameLengthPair -> Pair.of(0, nameLengthPair.getRight())).collect(Collectors.toList());
|
||||
testTableState = testTableState.createTestTableStateForBaseAndLogFiles(commitTime, partitionName,
|
||||
fileVersionAndLength);
|
||||
} else {
|
||||
testTableState = testTableState.createTestTableStateForBaseFilesOnly(commitTime, partition, fileLengths.collect(Collectors.toList()));
|
||||
testTableState = testTableState.createTestTableStateForBaseFilesOnly(commitTime, partitionName,
|
||||
fileNameAndLengthList);
|
||||
}
|
||||
}
|
||||
return testTableState;
|
||||
@@ -1015,7 +1097,17 @@ public class HoodieTestTable {
|
||||
return this.commitsToPartitionToFileIdForCleaner.get(commitTime);
|
||||
}
|
||||
|
||||
HoodieTestTableState createTestTableStateForBaseFilesOnly(String commitTime, String partitionPath, List<Integer> lengths) {
|
||||
HoodieTestTableState createTestTableStateForBaseFileLengthsOnly(String commitTime, String partitionPath,
|
||||
List<Integer> lengths) {
|
||||
List<Pair<String, Integer>> fileNameLengthList = new ArrayList<>();
|
||||
for (int length : lengths) {
|
||||
fileNameLengthList.add(Pair.of(UUID.randomUUID().toString(), length));
|
||||
}
|
||||
return createTestTableStateForBaseFilesOnly(commitTime, partitionPath, fileNameLengthList);
|
||||
}
|
||||
|
||||
HoodieTestTableState createTestTableStateForBaseFilesOnly(String commitTime, String partitionPath,
|
||||
List<Pair<String, Integer>> fileNameAndLengthList) {
|
||||
if (!commitsToPartitionToBaseFileInfoStats.containsKey(commitTime)) {
|
||||
commitsToPartitionToBaseFileInfoStats.put(commitTime, new HashMap<>());
|
||||
}
|
||||
@@ -1023,20 +1115,19 @@ public class HoodieTestTable {
|
||||
this.commitsToPartitionToBaseFileInfoStats.get(commitTime).put(partitionPath, new ArrayList<>());
|
||||
}
|
||||
|
||||
List<Pair<String, Integer>> fileInfos = new ArrayList<>();
|
||||
for (int length : lengths) {
|
||||
fileInfos.add(Pair.of(UUID.randomUUID().toString(), length));
|
||||
}
|
||||
this.commitsToPartitionToBaseFileInfoStats.get(commitTime).get(partitionPath).addAll(fileInfos);
|
||||
this.commitsToPartitionToBaseFileInfoStats.get(commitTime).get(partitionPath).addAll(fileNameAndLengthList);
|
||||
return this;
|
||||
}
|
||||
|
||||
HoodieTestTableState createTestTableStateForBaseAndLogFiles(String commitTime, String partitionPath, List<Pair<Integer, Integer>> versionsAndLengths) {
|
||||
HoodieTestTableState createTestTableStateForBaseAndLogFiles(String commitTime, String partitionPath,
|
||||
List<Pair<Integer, Integer>> versionsAndLengths) {
|
||||
if (!commitsToPartitionToBaseFileInfoStats.containsKey(commitTime)) {
|
||||
createTestTableStateForBaseFilesOnly(commitTime, partitionPath, versionsAndLengths.stream().map(Pair::getRight).collect(Collectors.toList()));
|
||||
createTestTableStateForBaseFileLengthsOnly(commitTime, partitionPath,
|
||||
versionsAndLengths.stream().map(Pair::getRight).collect(Collectors.toList()));
|
||||
}
|
||||
if (!this.commitsToPartitionToBaseFileInfoStats.get(commitTime).containsKey(partitionPath)) {
|
||||
createTestTableStateForBaseFilesOnly(commitTime, partitionPath, versionsAndLengths.stream().map(Pair::getRight).collect(Collectors.toList()));
|
||||
createTestTableStateForBaseFileLengthsOnly(commitTime, partitionPath,
|
||||
versionsAndLengths.stream().map(Pair::getRight).collect(Collectors.toList()));
|
||||
}
|
||||
if (!commitsToPartitionToLogFileInfoStats.containsKey(commitTime)) {
|
||||
commitsToPartitionToLogFileInfoStats.put(commitTime, new HashMap<>());
|
||||
|
||||
Reference in New Issue
Block a user