From 2d5ac551955e8c7be7be60edc5aaf17d2ed7f650 Mon Sep 17 00:00:00 2001 From: zhangyue19921010 <69956021+zhangyue19921010@users.noreply.github.com> Date: Thu, 16 Sep 2021 07:00:04 +0800 Subject: [PATCH] [HUDI-2355][Bug]Archive service executed after cleaner finished. (#3545) Co-authored-by: yuezhang --- .../client/AbstractHoodieWriteClient.java | 2 +- .../functional/TestHoodieDeltaStreamer.java | 84 +++++++++++++++++++ 2 files changed, 85 insertions(+), 1 deletion(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java index dfb2fc8fc..9650ddaeb 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java @@ -434,10 +434,10 @@ public abstract class AbstractHoodieWriteClient { + TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs); + return true; + }); + + TestHelpers.assertAtLeastNCommits(6, tableBasePath, dfs); + TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs); + + // Step 2 : Get the first replacecommit and extract the corresponding replaced file IDs. + HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(dfs.getConf()).setBasePath(tableBasePath).build(); + HoodieTimeline replacedTimeline = meta.reloadActiveTimeline().getCompletedReplaceTimeline(); + Option firstReplaceHoodieInstant = replacedTimeline.nthFromLastInstant(1); + assertTrue(firstReplaceHoodieInstant.isPresent()); + + Option firstReplaceHoodieInstantDetails = replacedTimeline.getInstantDetails(firstReplaceHoodieInstant.get()); + HoodieReplaceCommitMetadata firstReplaceMetadata = HoodieReplaceCommitMetadata.fromBytes(firstReplaceHoodieInstantDetails.get(), HoodieReplaceCommitMetadata.class); + Map> partitionToReplaceFileIds = firstReplaceMetadata.getPartitionToReplaceFileIds(); + String partitionName = null; + List replacedFileIDs = null; + for (Map.Entry entry : partitionToReplaceFileIds.entrySet()) { + partitionName = String.valueOf(entry.getKey()); + replacedFileIDs = (List) entry.getValue(); + } + + assertNotNull(partitionName); + assertNotNull(replacedFileIDs); + + // Step 3 : Based to replacedFileIDs , get the corresponding complete path. + ArrayList replacedFilePaths = new ArrayList<>(); + Path partitionPath = new Path(meta.getBasePath(), partitionName); + RemoteIterator hoodieFiles = meta.getFs().listFiles(partitionPath, true); + while (hoodieFiles.hasNext()) { + LocatedFileStatus f = hoodieFiles.next(); + String file = f.getPath().toUri().toString(); + for (Object replacedFileID : replacedFileIDs) { + if (file.contains(String.valueOf(replacedFileID))) { + replacedFilePaths.add(file); + } + } + } + + assertFalse(replacedFilePaths.isEmpty()); + + // Step 4 : Insert 1 record and trigger sync/async cleaner and archive. + List configs = getAsyncServicesConfigs(1, "true", "true", "2", "", ""); + configs.add(String.format("%s=%s", HoodieCompactionConfig.CLEANER_POLICY.key(), "KEEP_LATEST_COMMITS")); + configs.add(String.format("%s=%s", HoodieCompactionConfig.CLEANER_COMMITS_RETAINED.key(), "1")); + configs.add(String.format("%s=%s", HoodieCompactionConfig.MIN_COMMITS_TO_KEEP.key(), "2")); + configs.add(String.format("%s=%s", HoodieCompactionConfig.MAX_COMMITS_TO_KEEP.key(), "3")); + configs.add(String.format("%s=%s", HoodieCompactionConfig.ASYNC_CLEAN, asyncClean)); + cfg.configs = configs; + cfg.continuousMode = false; + ds = new HoodieDeltaStreamer(cfg, jsc); + ds.sync(); + + // Step 5 : Make sure that firstReplaceHoodieInstant is archived. + long count = meta.reloadActiveTimeline().getCompletedReplaceTimeline().getInstants().filter(instant -> firstReplaceHoodieInstant.get().equals(instant)).count(); + assertEquals(0, count); + + // Step 6 : All the replaced files in firstReplaceHoodieInstant should be deleted through sync/async cleaner. + for (String replacedFilePath : replacedFilePaths) { + assertFalse(meta.getFs().exists(new Path(replacedFilePath))); + } + } + private List getAsyncServicesConfigs(int totalRecords, String autoClean, String inlineCluster, String inlineClusterMaxCommit, String asyncCluster, String asyncClusterMaxCommit) { List configs = new ArrayList<>();