From 11bc1fe6f498850d2c496151741813001d3850a3 Mon Sep 17 00:00:00 2001 From: steven zhang Date: Sun, 13 Dec 2020 22:28:53 +0800 Subject: [PATCH] [HUDI-1428] Clean old fileslice is invalid (#2292) Co-authored-by: zhang wen Co-authored-by: zhang wen --- .../hudi/table/action/clean/CleanPlanner.java | 2 +- .../org/apache/hudi/table/TestCleaner.java | 44 +++++++++++++++++++ 2 files changed, 45 insertions(+), 1 deletion(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java index 405fc81b7..4f9b2a24c 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java @@ -80,7 +80,7 @@ public class CleanPlanner implements Ser public CleanPlanner(HoodieTable hoodieTable, HoodieWriteConfig config) { this.hoodieTable = hoodieTable; this.fileSystemView = hoodieTable.getHoodieView(); - this.commitTimeline = hoodieTable.getCompletedCommitTimeline(); + this.commitTimeline = hoodieTable.getCompletedCommitsTimeline(); this.config = config; this.fgIdToPendingCompactionOperations = ((SyncableFileSystemView) hoodieTable.getSliceView()).getPendingCompactionOperations() diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java index 00f1ea00e..69c6f98c6 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java @@ -644,6 +644,50 @@ public class TestCleaner extends HoodieClientTestBase { assertTrue(testTable.logFileExists(p0, "001", file1P0, 3)); } + /** + * Test HoodieTable.clean() Cleaning by commit logic for MOR table with Log files. + */ + @Test + public void testKeepLatestCommitsMOR() throws Exception { + + HoodieWriteConfig config = + HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true) + .withCompactionConfig(HoodieCompactionConfig.newBuilder() + .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(1).build()) + .build(); + + HoodieTableMetaClient metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ); + HoodieTestTable testTable = HoodieTestTable.of(metaClient); + String p0 = "2020/01/01"; + + // Make 3 files, one base file and 2 log files associated with base file + String file1P0 = testTable.addDeltaCommit("000").getFileIdsWithBaseFilesInPartitions(p0).get(p0); + testTable.forDeltaCommit("000") + .withLogFile(p0, file1P0, 1) + .withLogFile(p0, file1P0, 2); + + // Make 2 files, one base file and 1 log files associated with base file + testTable.addDeltaCommit("001") + .withBaseFilesInPartition(p0, file1P0) + .withLogFile(p0, file1P0, 3); + + // Make 2 files, one base file and 1 log files associated with base file + testTable.addDeltaCommit("002") + .withBaseFilesInPartition(p0, file1P0) + .withLogFile(p0, file1P0, 4); + + List hoodieCleanStats = runCleaner(config); + assertEquals(3, + getCleanStat(hoodieCleanStats, p0).getSuccessDeleteFiles() + .size(), "Must clean three files, one parquet and 2 log files"); + assertFalse(testTable.baseFileExists(p0, "000", file1P0)); + assertFalse(testTable.logFilesExist(p0, "000", file1P0, 1, 2)); + assertTrue(testTable.baseFileExists(p0, "001", file1P0)); + assertTrue(testTable.logFileExists(p0, "001", file1P0, 3)); + assertTrue(testTable.baseFileExists(p0, "002", file1P0)); + assertTrue(testTable.logFileExists(p0, "002", file1P0, 4)); + } + @Test public void testCleanMetadataUpgradeDowngrade() { String instantTime = "000";