[HUDI-4291] Fix flaky TestCleanPlanExecutor#testKeepLatestFileVersions (#5930)
This commit is contained in:
@@ -47,7 +47,6 @@ import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.Arguments;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
import org.junit.jupiter.params.provider.ValueSource;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Files;
|
||||
@@ -86,9 +85,7 @@ public class TestCleanPlanExecutor extends TestCleaner {
|
||||
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2)
|
||||
.withCleaningTriggerStrategy("invalid_strategy").build())
|
||||
.build();
|
||||
Exception e = assertThrows(IllegalArgumentException.class, () -> {
|
||||
runCleaner(config, true);
|
||||
}, "should fail when invalid trigger strategy is provided!");
|
||||
Exception e = assertThrows(IllegalArgumentException.class, () -> runCleaner(config, true), "should fail when invalid trigger strategy is provided!");
|
||||
assertTrue(e.getMessage().contains("No enum constant org.apache.hudi.table.action.clean.CleaningTriggerStrategy.invalid_strategy"));
|
||||
}
|
||||
|
||||
@@ -272,36 +269,30 @@ public class TestCleanPlanExecutor extends TestCleaner {
|
||||
/**
|
||||
* Test Hudi COW Table Cleaner - Keep the latest file versions policy.
|
||||
*/
|
||||
@ParameterizedTest
|
||||
@ValueSource(booleans = {false, true})
|
||||
public void testKeepLatestFileVersions(Boolean enableBootstrapSourceClean) throws Exception {
|
||||
@Test
|
||||
public void testKeepLatestFileVersions() throws Exception {
|
||||
HoodieWriteConfig config =
|
||||
HoodieWriteConfig.newBuilder().withPath(basePath)
|
||||
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
|
||||
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
|
||||
.withCleanBootstrapBaseFileEnabled(enableBootstrapSourceClean)
|
||||
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build())
|
||||
.build();
|
||||
HoodieWriteConfig.newBuilder().withPath(basePath)
|
||||
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
|
||||
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
|
||||
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build())
|
||||
.build();
|
||||
|
||||
HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context);
|
||||
HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient, metadataWriter);
|
||||
|
||||
final String p0 = "2020/01/01";
|
||||
final String p1 = "2020/01/02";
|
||||
final Map<String, List<BootstrapFileMapping>> bootstrapMapping = enableBootstrapSourceClean
|
||||
? generateBootstrapIndexAndSourceData(p0, p1) : null;
|
||||
|
||||
// make 1 commit, with 1 file per partition
|
||||
final String file1P0C0 = enableBootstrapSourceClean ? bootstrapMapping.get(p0).get(0).getFileId()
|
||||
: UUID.randomUUID().toString();
|
||||
final String file1P1C0 = enableBootstrapSourceClean ? bootstrapMapping.get(p1).get(0).getFileId()
|
||||
: UUID.randomUUID().toString();
|
||||
final String file1P0C0 = UUID.randomUUID().toString();
|
||||
final String file1P1C0 = UUID.randomUUID().toString();
|
||||
|
||||
Map<String, List<Pair<String, Integer>>> c1PartitionToFilesNameLengthMap = new HashMap<>();
|
||||
c1PartitionToFilesNameLengthMap.put(p0, Collections.singletonList(Pair.of(file1P0C0, 100)));
|
||||
c1PartitionToFilesNameLengthMap.put(p1, Collections.singletonList(Pair.of(file1P1C0, 200)));
|
||||
testTable.doWriteOperation("00000000000001", WriteOperationType.INSERT, Arrays.asList(p0, p1),
|
||||
c1PartitionToFilesNameLengthMap, false, false);
|
||||
c1PartitionToFilesNameLengthMap, false, false);
|
||||
|
||||
List<HoodieCleanStat> hoodieCleanStatsOne = runCleanerWithInstantFormat(config, true);
|
||||
assertEquals(0, hoodieCleanStatsOne.size(), "Must not clean any files");
|
||||
@@ -315,58 +306,135 @@ public class TestCleanPlanExecutor extends TestCleaner {
|
||||
c2PartitionToFilesNameLengthMap.put(p0, Arrays.asList(Pair.of(file1P0C0, 101), Pair.of(file2P0C1, 100)));
|
||||
c2PartitionToFilesNameLengthMap.put(p1, Arrays.asList(Pair.of(file1P1C0, 201), Pair.of(file2P1C1, 200)));
|
||||
testTable.doWriteOperation("00000000000002", WriteOperationType.UPSERT, Collections.emptyList(),
|
||||
c2PartitionToFilesNameLengthMap, false, false);
|
||||
c2PartitionToFilesNameLengthMap, false, false);
|
||||
|
||||
// enableBootstrapSourceClean would delete the bootstrap base file at the same time
|
||||
List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(config, 1, true);
|
||||
HoodieCleanStat cleanStat = getCleanStat(hoodieCleanStatsTwo, p0);
|
||||
assertEquals(enableBootstrapSourceClean ? 2 : 1, cleanStat.getSuccessDeleteFiles().size()
|
||||
+ (cleanStat.getSuccessDeleteBootstrapBaseFiles() == null ? 0
|
||||
: cleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean at least 1 file");
|
||||
|
||||
if (enableBootstrapSourceClean) {
|
||||
HoodieFileStatus fstatus =
|
||||
bootstrapMapping.get(p0).get(0).getBootstrapFileStatus();
|
||||
// This ensures full path is recorded in metadata.
|
||||
assertTrue(cleanStat.getSuccessDeleteBootstrapBaseFiles().contains(fstatus.getPath().getUri()),
|
||||
"Successful delete files were " + cleanStat.getSuccessDeleteBootstrapBaseFiles()
|
||||
+ " but did not contain " + fstatus.getPath().getUri());
|
||||
assertFalse(Files.exists(Paths.get(bootstrapMapping.get(
|
||||
p0).get(0).getBootstrapFileStatus().getPath().getUri())));
|
||||
}
|
||||
assertEquals(1, cleanStat.getSuccessDeleteFiles().size()
|
||||
+ (cleanStat.getSuccessDeleteBootstrapBaseFiles() == null ? 0
|
||||
: cleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean at least 1 file");
|
||||
|
||||
cleanStat = getCleanStat(hoodieCleanStatsTwo, p1);
|
||||
assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1));
|
||||
assertTrue(testTable.baseFileExists(p1, "00000000000002", file2P1C1));
|
||||
assertFalse(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
|
||||
assertFalse(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
|
||||
assertEquals(enableBootstrapSourceClean ? 2 : 1, cleanStat.getSuccessDeleteFiles().size()
|
||||
+ (cleanStat.getSuccessDeleteBootstrapBaseFiles() == null ? 0
|
||||
: cleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean at least 1 file");
|
||||
|
||||
if (enableBootstrapSourceClean) {
|
||||
HoodieFileStatus fstatus =
|
||||
bootstrapMapping.get(p1).get(0).getBootstrapFileStatus();
|
||||
// This ensures full path is recorded in metadata.
|
||||
assertTrue(cleanStat.getSuccessDeleteBootstrapBaseFiles().contains(fstatus.getPath().getUri()),
|
||||
"Successful delete files were " + cleanStat.getSuccessDeleteBootstrapBaseFiles()
|
||||
+ " but did not contain " + fstatus.getPath().getUri());
|
||||
assertFalse(Files.exists(Paths.get(bootstrapMapping.get(
|
||||
p1).get(0).getBootstrapFileStatus().getPath().getUri())));
|
||||
}
|
||||
assertEquals(1, cleanStat.getSuccessDeleteFiles().size()
|
||||
+ (cleanStat.getSuccessDeleteBootstrapBaseFiles() == null ? 0
|
||||
: cleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean at least 1 file");
|
||||
|
||||
// make next commit, with 2 updates to existing files, and 1 insert
|
||||
final String file3P0C2 = UUID.randomUUID().toString();
|
||||
Map<String, List<Pair<String, Integer>>> c3PartitionToFilesNameLengthMap = new HashMap<>();
|
||||
c3PartitionToFilesNameLengthMap.put(p0, Arrays.asList(Pair.of(file1P0C0, 102), Pair.of(file2P0C1, 101),
|
||||
Pair.of(file3P0C2, 100)));
|
||||
Pair.of(file3P0C2, 100)));
|
||||
testTable.doWriteOperation("00000000000003", WriteOperationType.UPSERT, Collections.emptyList(),
|
||||
c3PartitionToFilesNameLengthMap, false, false);
|
||||
c3PartitionToFilesNameLengthMap, false, false);
|
||||
|
||||
List<HoodieCleanStat> hoodieCleanStatsThree = runCleaner(config, 3, true);
|
||||
assertEquals(2,
|
||||
getCleanStat(hoodieCleanStatsThree, p0)
|
||||
.getSuccessDeleteFiles().size(), "Must clean two files");
|
||||
getCleanStat(hoodieCleanStatsThree, p0)
|
||||
.getSuccessDeleteFiles().size(), "Must clean two files");
|
||||
assertFalse(testTable.baseFileExists(p0, "00000000000002", file1P0C0));
|
||||
assertFalse(testTable.baseFileExists(p0, "00000000000002", file2P0C1));
|
||||
assertTrue(testTable.baseFileExists(p0, "00000000000003", file3P0C2));
|
||||
|
||||
// No cleaning on partially written file, with no commit.
|
||||
testTable.forCommit("00000000000004").withBaseFilesInPartition(p0, file3P0C2);
|
||||
|
||||
List<HoodieCleanStat> hoodieCleanStatsFour = runCleaner(config);
|
||||
assertEquals(0, hoodieCleanStatsFour.size(), "Must not clean any files");
|
||||
assertTrue(testTable.baseFileExists(p0, "00000000000003", file3P0C2));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testKeepLatestFileVersionsWithBootstrapFileClean() throws Exception {
|
||||
HoodieWriteConfig config =
|
||||
HoodieWriteConfig.newBuilder().withPath(basePath)
|
||||
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
|
||||
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
|
||||
.withCleanBootstrapBaseFileEnabled(true)
|
||||
.withCleanerParallelism(1)
|
||||
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build())
|
||||
.build();
|
||||
|
||||
HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context);
|
||||
HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient, metadataWriter);
|
||||
|
||||
final String p0 = "2020/01/01";
|
||||
final String p1 = "2020/01/02";
|
||||
final Map<String, List<BootstrapFileMapping>> bootstrapMapping = generateBootstrapIndexAndSourceData(p0, p1);
|
||||
|
||||
// make 1 commit, with 1 file per partition
|
||||
final String file1P0C0 = bootstrapMapping.get(p0).get(0).getFileId();
|
||||
final String file1P1C0 = bootstrapMapping.get(p1).get(0).getFileId();
|
||||
|
||||
Map<String, List<Pair<String, Integer>>> c1PartitionToFilesNameLengthMap = new HashMap<>();
|
||||
c1PartitionToFilesNameLengthMap.put(p0, Collections.singletonList(Pair.of(file1P0C0, 100)));
|
||||
c1PartitionToFilesNameLengthMap.put(p1, Collections.singletonList(Pair.of(file1P1C0, 200)));
|
||||
testTable.doWriteOperation("00000000000001", WriteOperationType.INSERT, Arrays.asList(p0, p1),
|
||||
c1PartitionToFilesNameLengthMap, false, false);
|
||||
|
||||
List<HoodieCleanStat> hoodieCleanStatsOne = runCleanerWithInstantFormat(config, true);
|
||||
assertEquals(0, hoodieCleanStatsOne.size(), "Must not clean any files");
|
||||
assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
|
||||
assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
|
||||
|
||||
// make next commit, with 1 insert & 1 update per partition
|
||||
final String file2P0C1 = UUID.randomUUID().toString();
|
||||
final String file2P1C1 = UUID.randomUUID().toString();
|
||||
Map<String, List<Pair<String, Integer>>> c2PartitionToFilesNameLengthMap = new HashMap<>();
|
||||
c2PartitionToFilesNameLengthMap.put(p0, Arrays.asList(Pair.of(file1P0C0, 101), Pair.of(file2P0C1, 100)));
|
||||
c2PartitionToFilesNameLengthMap.put(p1, Arrays.asList(Pair.of(file1P1C0, 201), Pair.of(file2P1C1, 200)));
|
||||
testTable.doWriteOperation("00000000000002", WriteOperationType.UPSERT, Collections.emptyList(),
|
||||
c2PartitionToFilesNameLengthMap, false, false);
|
||||
|
||||
// should delete the bootstrap base file at the same time
|
||||
List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(config, 1, true);
|
||||
HoodieCleanStat cleanStat = getCleanStat(hoodieCleanStatsTwo, p0);
|
||||
assertEquals(2, cleanStat.getSuccessDeleteFiles().size()
|
||||
+ (cleanStat.getSuccessDeleteBootstrapBaseFiles() == null ? 0
|
||||
: cleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean at least 1 file");
|
||||
|
||||
HoodieFileStatus fstatus =
|
||||
bootstrapMapping.get(p0).get(0).getBootstrapFileStatus();
|
||||
// This ensures full path is recorded in metadata.
|
||||
assertTrue(cleanStat.getSuccessDeleteBootstrapBaseFiles().contains(fstatus.getPath().getUri()),
|
||||
"Successful delete files were " + cleanStat.getSuccessDeleteBootstrapBaseFiles()
|
||||
+ " but did not contain " + fstatus.getPath().getUri());
|
||||
assertFalse(Files.exists(Paths.get(bootstrapMapping.get(
|
||||
p0).get(0).getBootstrapFileStatus().getPath().getUri())));
|
||||
|
||||
cleanStat = getCleanStat(hoodieCleanStatsTwo, p1);
|
||||
assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1));
|
||||
assertTrue(testTable.baseFileExists(p1, "00000000000002", file2P1C1));
|
||||
assertFalse(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
|
||||
assertFalse(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
|
||||
assertEquals(2, cleanStat.getSuccessDeleteFiles().size()
|
||||
+ (cleanStat.getSuccessDeleteBootstrapBaseFiles() == null ? 0
|
||||
: cleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean at least 1 file");
|
||||
|
||||
fstatus = bootstrapMapping.get(p1).get(0).getBootstrapFileStatus();
|
||||
// This ensures full path is recorded in metadata.
|
||||
assertTrue(cleanStat.getSuccessDeleteBootstrapBaseFiles().contains(fstatus.getPath().getUri()),
|
||||
"Successful delete files were " + cleanStat.getSuccessDeleteBootstrapBaseFiles()
|
||||
+ " but did not contain " + fstatus.getPath().getUri());
|
||||
assertFalse(Files.exists(Paths.get(bootstrapMapping.get(
|
||||
p1).get(0).getBootstrapFileStatus().getPath().getUri())));
|
||||
|
||||
// make next commit, with 2 updates to existing files, and 1 insert
|
||||
final String file3P0C2 = UUID.randomUUID().toString();
|
||||
Map<String, List<Pair<String, Integer>>> c3PartitionToFilesNameLengthMap = new HashMap<>();
|
||||
c3PartitionToFilesNameLengthMap.put(p0, Arrays.asList(Pair.of(file1P0C0, 102), Pair.of(file2P0C1, 101),
|
||||
Pair.of(file3P0C2, 100)));
|
||||
testTable.doWriteOperation("00000000000003", WriteOperationType.UPSERT, Collections.emptyList(),
|
||||
c3PartitionToFilesNameLengthMap, false, false);
|
||||
|
||||
List<HoodieCleanStat> hoodieCleanStatsThree = runCleaner(config, 3, true);
|
||||
assertEquals(2,
|
||||
getCleanStat(hoodieCleanStatsThree, p0)
|
||||
.getSuccessDeleteFiles().size(), "Must clean two files");
|
||||
assertFalse(testTable.baseFileExists(p0, "00000000000002", file1P0C0));
|
||||
assertFalse(testTable.baseFileExists(p0, "00000000000002", file2P0C1));
|
||||
assertTrue(testTable.baseFileExists(p0, "00000000000003", file3P0C2));
|
||||
|
||||
Reference in New Issue
Block a user