[HUDI-1428] Clean old fileslice is invalid (#2292)
Co-authored-by: zhang wen <wen.zhang@dmall.com> Co-authored-by: zhang wen <steven@stevendeMac-mini.local>
This commit is contained in:
@@ -80,7 +80,7 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
|
|||||||
public CleanPlanner(HoodieTable<T, I, K, O> hoodieTable, HoodieWriteConfig config) {
|
public CleanPlanner(HoodieTable<T, I, K, O> hoodieTable, HoodieWriteConfig config) {
|
||||||
this.hoodieTable = hoodieTable;
|
this.hoodieTable = hoodieTable;
|
||||||
this.fileSystemView = hoodieTable.getHoodieView();
|
this.fileSystemView = hoodieTable.getHoodieView();
|
||||||
this.commitTimeline = hoodieTable.getCompletedCommitTimeline();
|
this.commitTimeline = hoodieTable.getCompletedCommitsTimeline();
|
||||||
this.config = config;
|
this.config = config;
|
||||||
this.fgIdToPendingCompactionOperations =
|
this.fgIdToPendingCompactionOperations =
|
||||||
((SyncableFileSystemView) hoodieTable.getSliceView()).getPendingCompactionOperations()
|
((SyncableFileSystemView) hoodieTable.getSliceView()).getPendingCompactionOperations()
|
||||||
|
|||||||
@@ -644,6 +644,50 @@ public class TestCleaner extends HoodieClientTestBase {
|
|||||||
assertTrue(testTable.logFileExists(p0, "001", file1P0, 3));
|
assertTrue(testTable.logFileExists(p0, "001", file1P0, 3));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test HoodieTable.clean() Cleaning by commit logic for MOR table with Log files.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testKeepLatestCommitsMOR() throws Exception {
|
||||||
|
|
||||||
|
HoodieWriteConfig config =
|
||||||
|
HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true)
|
||||||
|
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
|
||||||
|
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(1).build())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
HoodieTableMetaClient metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
|
||||||
|
HoodieTestTable testTable = HoodieTestTable.of(metaClient);
|
||||||
|
String p0 = "2020/01/01";
|
||||||
|
|
||||||
|
// Make 3 files, one base file and 2 log files associated with base file
|
||||||
|
String file1P0 = testTable.addDeltaCommit("000").getFileIdsWithBaseFilesInPartitions(p0).get(p0);
|
||||||
|
testTable.forDeltaCommit("000")
|
||||||
|
.withLogFile(p0, file1P0, 1)
|
||||||
|
.withLogFile(p0, file1P0, 2);
|
||||||
|
|
||||||
|
// Make 2 files, one base file and 1 log files associated with base file
|
||||||
|
testTable.addDeltaCommit("001")
|
||||||
|
.withBaseFilesInPartition(p0, file1P0)
|
||||||
|
.withLogFile(p0, file1P0, 3);
|
||||||
|
|
||||||
|
// Make 2 files, one base file and 1 log files associated with base file
|
||||||
|
testTable.addDeltaCommit("002")
|
||||||
|
.withBaseFilesInPartition(p0, file1P0)
|
||||||
|
.withLogFile(p0, file1P0, 4);
|
||||||
|
|
||||||
|
List<HoodieCleanStat> hoodieCleanStats = runCleaner(config);
|
||||||
|
assertEquals(3,
|
||||||
|
getCleanStat(hoodieCleanStats, p0).getSuccessDeleteFiles()
|
||||||
|
.size(), "Must clean three files, one parquet and 2 log files");
|
||||||
|
assertFalse(testTable.baseFileExists(p0, "000", file1P0));
|
||||||
|
assertFalse(testTable.logFilesExist(p0, "000", file1P0, 1, 2));
|
||||||
|
assertTrue(testTable.baseFileExists(p0, "001", file1P0));
|
||||||
|
assertTrue(testTable.logFileExists(p0, "001", file1P0, 3));
|
||||||
|
assertTrue(testTable.baseFileExists(p0, "002", file1P0));
|
||||||
|
assertTrue(testTable.logFileExists(p0, "002", file1P0, 4));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCleanMetadataUpgradeDowngrade() {
|
public void testCleanMetadataUpgradeDowngrade() {
|
||||||
String instantTime = "000";
|
String instantTime = "000";
|
||||||
|
|||||||
Reference in New Issue
Block a user