[HUDI-2354] Fix TimelineServer error because of replacecommit archive (#3536)
* bug fixed * done * done * travis fix * code reviewed * code review * done * code reviewed Co-authored-by: yuezhang <yuezhang@freewheel.tv>
This commit is contained in:
@@ -44,6 +44,7 @@ import org.apache.hudi.exception.HoodieIOException;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
import java.util.AbstractMap;
|
||||
@@ -220,6 +221,17 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
|
||||
// get replace instant mapping for each partition, fileId
|
||||
return replaceMetadata.getPartitionToReplaceFileIds().entrySet().stream().flatMap(entry -> entry.getValue().stream().map(e ->
|
||||
new AbstractMap.SimpleEntry<>(new HoodieFileGroupId(entry.getKey(), e), instant)));
|
||||
} catch (HoodieIOException ex) {
|
||||
|
||||
if (ex.getIOException() instanceof FileNotFoundException) {
|
||||
// Replace instant could be deleted by archive and FileNotFoundException could be threw during getInstantDetails function
|
||||
// So that we need to catch the FileNotFoundException here and continue
|
||||
LOG.warn(ex.getMessage());
|
||||
return Stream.empty();
|
||||
} else {
|
||||
throw ex;
|
||||
}
|
||||
|
||||
} catch (IOException e) {
|
||||
throw new HoodieIOException("error reading commit metadata for " + instant);
|
||||
}
|
||||
@@ -1013,7 +1025,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
|
||||
|
||||
/**
|
||||
* Default implementation for fetching latest base-file.
|
||||
*
|
||||
*
|
||||
* @param partitionPath Partition path
|
||||
* @param fileId File Id
|
||||
* @return base File if present
|
||||
@@ -1025,7 +1037,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
|
||||
|
||||
/**
|
||||
* Default implementation for fetching file-slice.
|
||||
*
|
||||
*
|
||||
* @param partitionPath Partition path
|
||||
* @param fileId File Id
|
||||
* @return File Slice if present
|
||||
@@ -1059,7 +1071,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
|
||||
|
||||
return HoodieTimeline.compareTimestamps(instant, GREATER_THAN, hoodieInstantOption.get().getTimestamp());
|
||||
}
|
||||
|
||||
|
||||
private boolean isFileGroupReplacedBeforeOrOn(HoodieFileGroupId fileGroupId, String instant) {
|
||||
Option<HoodieInstant> hoodieInstantOption = getReplaceInstant(fileGroupId);
|
||||
if (!hoodieInstantOption.isPresent()) {
|
||||
@@ -1108,7 +1120,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
|
||||
|
||||
/**
|
||||
* Return Only Commits and Compaction timeline for building file-groups.
|
||||
*
|
||||
*
|
||||
* @return {@code HoodieTimeline}
|
||||
*/
|
||||
public HoodieTimeline getVisibleCommitsAndCompactionTimeline() {
|
||||
|
||||
@@ -81,6 +81,7 @@ import java.util.UUID;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
@@ -141,6 +142,46 @@ public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness {
|
||||
testViewForFileSlicesWithNoBaseFile(1, 0, "");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCloseHoodieTableFileSystemView() throws Exception {
|
||||
String instantTime1 = "1";
|
||||
String instantTime2 = "2";
|
||||
String clusteringInstantTime3 = "3";
|
||||
String clusteringInstantTime4 = "4";
|
||||
|
||||
// prepare metadata
|
||||
HoodieActiveTimeline commitTimeline = metaClient.getActiveTimeline();
|
||||
Map<String, List<String>> partitionToReplaceFileIds = new HashMap<>();
|
||||
List<String> replacedFileIds = new ArrayList<>();
|
||||
replacedFileIds.add("fake_file_id");
|
||||
partitionToReplaceFileIds.put("fake_partition_path", replacedFileIds);
|
||||
|
||||
// prepare Instants
|
||||
HoodieInstant instant1 = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, instantTime1);
|
||||
HoodieInstant instant2 = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, instantTime2);
|
||||
HoodieInstant clusteringInstant3 = new HoodieInstant(true, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringInstantTime3);
|
||||
HoodieInstant clusteringInstant4 = new HoodieInstant(true, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringInstantTime4);
|
||||
HoodieCommitMetadata commitMetadata =
|
||||
CommitUtils.buildMetadata(Collections.emptyList(), partitionToReplaceFileIds, Option.empty(), WriteOperationType.CLUSTER, "", HoodieTimeline.REPLACE_COMMIT_ACTION);
|
||||
|
||||
saveAsComplete(commitTimeline, instant1, Option.empty());
|
||||
saveAsComplete(commitTimeline, instant2, Option.empty());
|
||||
saveAsComplete(commitTimeline, clusteringInstant3, Option.empty());
|
||||
saveAsComplete(commitTimeline, clusteringInstant4, Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
|
||||
|
||||
refreshFsView();
|
||||
|
||||
// Now create a scenario where archiving deleted replace commits (requested,inflight and replacecommit)
|
||||
boolean deleteReplaceCommit = new File(this.basePath + "/.hoodie/" + clusteringInstantTime3 + ".replacecommit").delete();
|
||||
boolean deleteReplaceCommitRequested = new File(this.basePath + "/.hoodie/" + clusteringInstantTime3 + ".replacecommit.requested").delete();
|
||||
boolean deleteReplaceCommitInflight = new File(this.basePath + "/.hoodie/" + clusteringInstantTime3 + ".replacecommit.inflight").delete();
|
||||
|
||||
// confirm deleted
|
||||
assertTrue(deleteReplaceCommit && deleteReplaceCommitInflight && deleteReplaceCommitRequested);
|
||||
assertDoesNotThrow(() -> fsView.close());
|
||||
|
||||
}
|
||||
|
||||
protected void testViewForFileSlicesWithNoBaseFile(int expNumTotalFileSlices, int expNumTotalDataFiles,
|
||||
String partitionPath) throws Exception {
|
||||
Paths.get(basePath, partitionPath).toFile().mkdirs();
|
||||
|
||||
Reference in New Issue
Block a user