From 06240417e9393de223c9cf6e1b66858d0417768e Mon Sep 17 00:00:00 2001 From: zhangyue19921010 <69956021+zhangyue19921010@users.noreply.github.com> Date: Sat, 11 Sep 2021 12:26:04 +0800 Subject: [PATCH] [HUDI-2354] Fix TimelineServer error because of replacecommit archive (#3536) * bug fixed * done * done * travis fix * code reviewed * code review * done * code reviewed Co-authored-by: yuezhang --- .../view/AbstractTableFileSystemView.java | 20 +++++++-- .../view/TestHoodieTableFileSystemView.java | 41 +++++++++++++++++++ 2 files changed, 57 insertions(+), 4 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java index c9d4d95a4..01122fdc5 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java @@ -44,6 +44,7 @@ import org.apache.hudi.exception.HoodieIOException; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.Serializable; import java.util.AbstractMap; @@ -220,6 +221,17 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV // get replace instant mapping for each partition, fileId return replaceMetadata.getPartitionToReplaceFileIds().entrySet().stream().flatMap(entry -> entry.getValue().stream().map(e -> new AbstractMap.SimpleEntry<>(new HoodieFileGroupId(entry.getKey(), e), instant))); + } catch (HoodieIOException ex) { + + if (ex.getIOException() instanceof FileNotFoundException) { + // Replace instant could be deleted by archive and FileNotFoundException could be threw during getInstantDetails function + // So that we need to catch the FileNotFoundException here and continue + LOG.warn(ex.getMessage()); + return Stream.empty(); + } else { + throw ex; + } + } catch (IOException e) { throw new HoodieIOException("error reading commit metadata for " + instant); } @@ -1013,7 +1025,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV /** * Default implementation for fetching latest base-file. - * + * * @param partitionPath Partition path * @param fileId File Id * @return base File if present @@ -1025,7 +1037,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV /** * Default implementation for fetching file-slice. - * + * * @param partitionPath Partition path * @param fileId File Id * @return File Slice if present @@ -1059,7 +1071,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV return HoodieTimeline.compareTimestamps(instant, GREATER_THAN, hoodieInstantOption.get().getTimestamp()); } - + private boolean isFileGroupReplacedBeforeOrOn(HoodieFileGroupId fileGroupId, String instant) { Option hoodieInstantOption = getReplaceInstant(fileGroupId); if (!hoodieInstantOption.isPresent()) { @@ -1108,7 +1120,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV /** * Return Only Commits and Compaction timeline for building file-groups. - * + * * @return {@code HoodieTimeline} */ public HoodieTimeline getVisibleCommitsAndCompactionTimeline() { diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java index 37a7c91c8..dee15e22d 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java @@ -81,6 +81,7 @@ import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -141,6 +142,46 @@ public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness { testViewForFileSlicesWithNoBaseFile(1, 0, ""); } + @Test + public void testCloseHoodieTableFileSystemView() throws Exception { + String instantTime1 = "1"; + String instantTime2 = "2"; + String clusteringInstantTime3 = "3"; + String clusteringInstantTime4 = "4"; + + // prepare metadata + HoodieActiveTimeline commitTimeline = metaClient.getActiveTimeline(); + Map> partitionToReplaceFileIds = new HashMap<>(); + List replacedFileIds = new ArrayList<>(); + replacedFileIds.add("fake_file_id"); + partitionToReplaceFileIds.put("fake_partition_path", replacedFileIds); + + // prepare Instants + HoodieInstant instant1 = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, instantTime1); + HoodieInstant instant2 = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, instantTime2); + HoodieInstant clusteringInstant3 = new HoodieInstant(true, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringInstantTime3); + HoodieInstant clusteringInstant4 = new HoodieInstant(true, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringInstantTime4); + HoodieCommitMetadata commitMetadata = + CommitUtils.buildMetadata(Collections.emptyList(), partitionToReplaceFileIds, Option.empty(), WriteOperationType.CLUSTER, "", HoodieTimeline.REPLACE_COMMIT_ACTION); + + saveAsComplete(commitTimeline, instant1, Option.empty()); + saveAsComplete(commitTimeline, instant2, Option.empty()); + saveAsComplete(commitTimeline, clusteringInstant3, Option.empty()); + saveAsComplete(commitTimeline, clusteringInstant4, Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); + + refreshFsView(); + + // Now create a scenario where archiving deleted replace commits (requested,inflight and replacecommit) + boolean deleteReplaceCommit = new File(this.basePath + "/.hoodie/" + clusteringInstantTime3 + ".replacecommit").delete(); + boolean deleteReplaceCommitRequested = new File(this.basePath + "/.hoodie/" + clusteringInstantTime3 + ".replacecommit.requested").delete(); + boolean deleteReplaceCommitInflight = new File(this.basePath + "/.hoodie/" + clusteringInstantTime3 + ".replacecommit.inflight").delete(); + + // confirm deleted + assertTrue(deleteReplaceCommit && deleteReplaceCommitInflight && deleteReplaceCommitRequested); + assertDoesNotThrow(() -> fsView.close()); + + } + protected void testViewForFileSlicesWithNoBaseFile(int expNumTotalFileSlices, int expNumTotalDataFiles, String partitionPath) throws Exception { Paths.get(basePath, partitionPath).toFile().mkdirs();