From f91e9e63e136ea558bc5c5fdfd97641dcbbc6813 Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Mon, 11 Apr 2022 21:02:43 -0700 Subject: [PATCH] [HUDI-3799] Fixing not deleting empty instants w/o archiving (#5261) --- .../hudi/client/HoodieTimelineArchiver.java | 23 ++++++----- .../client/utils/MetadataConversionUtils.java | 40 +++++++++++++++++++ .../hudi/io/TestHoodieTimelineArchiver.java | 20 +++++++++- .../org/apache/hudi/table/TestCleaner.java | 2 +- .../testutils/HoodieClientTestHarness.java | 10 +++-- .../common/testutils/FileCreateUtils.java | 4 ++ .../common/testutils/HoodieTestTable.java | 11 ++--- 7 files changed, 88 insertions(+), 22 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java index 12d00bf61..190a5fe1c 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java @@ -588,19 +588,16 @@ public class HoodieTimelineArchiver { List records = new ArrayList<>(); for (HoodieInstant hoodieInstant : instants) { try { - if (table.getActiveTimeline().isEmpty(hoodieInstant) - && ( - hoodieInstant.getAction().equals(HoodieTimeline.CLEAN_ACTION) - || (hoodieInstant.getAction().equals(HoodieTimeline.ROLLBACK_ACTION) && hoodieInstant.isCompleted()) - ) - ) { - table.getActiveTimeline().deleteEmptyInstantIfExists(hoodieInstant); + deleteAnyLeftOverMarkers(context, hoodieInstant); + // in local FS and HDFS, there could be empty completed instants due to crash. + if (table.getActiveTimeline().isEmpty(hoodieInstant) && hoodieInstant.isCompleted()) { + // lets add an entry to the archival, even if not for the plan. + records.add(createAvroRecordFromEmptyInstant(hoodieInstant)); } else { - deleteAnyLeftOverMarkers(context, hoodieInstant); records.add(convertToAvroRecord(hoodieInstant)); - if (records.size() >= this.config.getCommitArchivalBatchSize()) { - writeToFile(wrapperSchema, records); - } + } + if (records.size() >= this.config.getCommitArchivalBatchSize()) { + writeToFile(wrapperSchema, records); } } catch (Exception e) { LOG.error("Failed to archive commits, .commit file: " + hoodieInstant.getFileName(), e); @@ -637,4 +634,8 @@ public class HoodieTimelineArchiver { throws IOException { return MetadataConversionUtils.createMetaWrapper(hoodieInstant, metaClient); } + + private IndexedRecord createAvroRecordFromEmptyInstant(HoodieInstant hoodieInstant) throws IOException { + return MetadataConversionUtils.createMetaWrapperForEmptyInstant(hoodieInstant); + } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java index d588a9c5d..342de74a1 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java @@ -125,6 +125,46 @@ public class MetadataConversionUtils { return archivedMetaWrapper; } + public static HoodieArchivedMetaEntry createMetaWrapperForEmptyInstant(HoodieInstant hoodieInstant) throws IOException { + HoodieArchivedMetaEntry archivedMetaWrapper = new HoodieArchivedMetaEntry(); + archivedMetaWrapper.setCommitTime(hoodieInstant.getTimestamp()); + archivedMetaWrapper.setActionState(hoodieInstant.getState().name()); + switch (hoodieInstant.getAction()) { + case HoodieTimeline.CLEAN_ACTION: { + archivedMetaWrapper.setActionType(ActionType.clean.name()); + break; + } + case HoodieTimeline.COMMIT_ACTION: { + archivedMetaWrapper.setActionType(ActionType.commit.name()); + break; + } + case HoodieTimeline.DELTA_COMMIT_ACTION: { + archivedMetaWrapper.setActionType(ActionType.deltacommit.name()); + break; + } + case HoodieTimeline.REPLACE_COMMIT_ACTION: { + archivedMetaWrapper.setActionType(ActionType.replacecommit.name()); + break; + } + case HoodieTimeline.ROLLBACK_ACTION: { + archivedMetaWrapper.setActionType(ActionType.rollback.name()); + break; + } + case HoodieTimeline.SAVEPOINT_ACTION: { + archivedMetaWrapper.setActionType(ActionType.savepoint.name()); + break; + } + case HoodieTimeline.COMPACTION_ACTION: { + archivedMetaWrapper.setActionType(ActionType.compaction.name()); + break; + } + default: { + throw new UnsupportedOperationException("Action not fully supported yet"); + } + } + return archivedMetaWrapper; + } + public static Option getInflightReplaceMetadata(HoodieTableMetaClient metaClient, HoodieInstant instant) throws IOException { Option inflightContent = metaClient.getActiveTimeline().getInstantDetails(instant); if (!inflightContent.isPresent() || inflightContent.get().length == 0) { diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java index 445780384..d412052c2 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java @@ -889,12 +889,19 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness { metaClient = HoodieTableMetaClient.reload(metaClient); int startInstant = 1; + List expectedArchivedInstants = new ArrayList<>(); for (int i = 0; i < maxInstantsToKeep + 1; i++, startInstant++) { - createCleanMetadata(startInstant + "", false, isEmpty || i % 2 == 0); + createCleanMetadata(startInstant + "", false, false, isEmpty || i % 2 == 0); + expectedArchivedInstants.add(new HoodieInstant(State.REQUESTED, HoodieTimeline.CLEAN_ACTION, startInstant + "")); + expectedArchivedInstants.add(new HoodieInstant(State.INFLIGHT, HoodieTimeline.CLEAN_ACTION, startInstant + "")); + expectedArchivedInstants.add(new HoodieInstant(State.COMPLETED, HoodieTimeline.CLEAN_ACTION, startInstant + "")); } for (int i = 0; i < maxInstantsToKeep + 1; i++, startInstant += 2) { createCommitAndRollbackFile(startInstant + 1 + "", startInstant + "", false, isEmpty || i % 2 == 0); + expectedArchivedInstants.add(new HoodieInstant(State.REQUESTED, HoodieTimeline.ROLLBACK_ACTION, startInstant + "")); + expectedArchivedInstants.add(new HoodieInstant(State.INFLIGHT, HoodieTimeline.ROLLBACK_ACTION, startInstant + "")); + expectedArchivedInstants.add(new HoodieInstant(State.COMPLETED, HoodieTimeline.ROLLBACK_ACTION, startInstant + "")); } if (enableMetadataTable) { @@ -916,6 +923,14 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness { assertTrue(actionInstantMap.containsKey("rollback"), "Rollback Action key must be preset"); assertEquals(minInstantsToKeep, actionInstantMap.get("rollback").size(), "Should have min instant"); + + // verify all expected instants are part of archived timeline + metaClient.getArchivedTimeline().loadCompletedInstantDetailsInMemory(); + HoodieInstant firstInstant = metaClient.reloadActiveTimeline().firstInstant().get(); + expectedArchivedInstants = expectedArchivedInstants.stream() + .filter(entry -> HoodieTimeline.compareTimestamps(entry.getTimestamp(), HoodieTimeline.LESSER_THAN, firstInstant.getTimestamp() + )).collect(Collectors.toList()); + expectedArchivedInstants.forEach(entry -> assertTrue(metaClient.getArchivedTimeline().containsInstant(entry))); } @ParameterizedTest @@ -1271,7 +1286,8 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness { private List getArchivedInstants(HoodieInstant instant) { List instants = new ArrayList<>(); - if (instant.getAction() == HoodieTimeline.COMMIT_ACTION || instant.getAction() == HoodieTimeline.DELTA_COMMIT_ACTION || instant.getAction() == HoodieTimeline.CLEAN_ACTION) { + if (instant.getAction().equals(HoodieTimeline.COMMIT_ACTION) || instant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION) + || instant.getAction().equals(HoodieTimeline.CLEAN_ACTION) || instant.getAction().equals(HoodieTimeline.ROLLBACK_ACTION)) { instants.add(new HoodieInstant(State.REQUESTED, instant.getAction(), instant.getTimestamp())); } instants.add(new HoodieInstant(State.INFLIGHT, instant.getAction(), instant.getTimestamp())); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java index 7e774c32c..b8545b0f6 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java @@ -744,7 +744,7 @@ public class TestCleaner extends HoodieClientTestBase { for (int i = 0; i < cleanCount; i++, startInstant++) { String commitTime = makeNewCommitTime(startInstant, "%09d"); - createCleanMetadata(commitTime + "", false, true); + createEmptyCleanMetadata(commitTime + "", false); } int instantClean = startInstant; diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java index 1b41769ec..4504c552c 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java @@ -696,10 +696,14 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im } public HoodieInstant createCleanMetadata(String instantTime, boolean inflightOnly) throws IOException { - return createCleanMetadata(instantTime, inflightOnly, false); + return createCleanMetadata(instantTime, inflightOnly, false, false); } - public HoodieInstant createCleanMetadata(String instantTime, boolean inflightOnly, boolean isEmpty) throws IOException { + public HoodieInstant createEmptyCleanMetadata(String instantTime, boolean inflightOnly) throws IOException { + return createCleanMetadata(instantTime, inflightOnly, true, true); + } + + public HoodieInstant createCleanMetadata(String instantTime, boolean inflightOnly, boolean isEmptyForAll, boolean isEmptyCompleted) throws IOException { HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new HoodieActionInstant("", "", ""), "", new HashMap<>(), CleanPlanV2MigrationHandler.VERSION, new HashMap<>(), new ArrayList<>()); if (inflightOnly) { @@ -713,7 +717,7 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im Collections.emptyList(), instantTime); HoodieCleanMetadata cleanMetadata = convertCleanMetadata(instantTime, Option.of(0L), Collections.singletonList(cleanStats)); - HoodieTestTable.of(metaClient).addClean(instantTime, cleanerPlan, cleanMetadata, isEmpty); + HoodieTestTable.of(metaClient).addClean(instantTime, cleanerPlan, cleanMetadata, isEmptyForAll, isEmptyCompleted); } return new HoodieInstant(inflightOnly, "clean", instantTime); } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java index 27dd9df5e..d5def16f5 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java @@ -248,6 +248,10 @@ public class FileCreateUtils { createMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_ROLLBACK_EXTENSION, content); } + public static void createRequestedRollbackFile(String basePath, String instantTime) throws IOException { + createMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_ROLLBACK_EXTENSION); + } + public static void createInflightRollbackFile(String basePath, String instantTime) throws IOException { createMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_ROLLBACK_EXTENSION); } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java index f0aae0a69..1f748f143 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java @@ -282,13 +282,13 @@ public class HoodieTestTable { } public HoodieTestTable addClean(String instantTime, HoodieCleanerPlan cleanerPlan, HoodieCleanMetadata metadata) throws IOException { - return addClean(instantTime, cleanerPlan, metadata, false); + return addClean(instantTime, cleanerPlan, metadata, false, false); } - public HoodieTestTable addClean(String instantTime, HoodieCleanerPlan cleanerPlan, HoodieCleanMetadata metadata, boolean isEmpty) throws IOException { - createRequestedCleanFile(basePath, instantTime, cleanerPlan, isEmpty); - createInflightCleanFile(basePath, instantTime, cleanerPlan, isEmpty); - createCleanFile(basePath, instantTime, metadata, isEmpty); + public HoodieTestTable addClean(String instantTime, HoodieCleanerPlan cleanerPlan, HoodieCleanMetadata metadata, boolean isEmptyForAll, boolean isEmptyCompleted) throws IOException { + createRequestedCleanFile(basePath, instantTime, cleanerPlan, isEmptyForAll); + createInflightCleanFile(basePath, instantTime, cleanerPlan, isEmptyForAll); + createCleanFile(basePath, instantTime, metadata, isEmptyCompleted); currentInstantTime = instantTime; return this; } @@ -335,6 +335,7 @@ public class HoodieTestTable { } public HoodieTestTable addRollback(String instantTime, HoodieRollbackMetadata rollbackMetadata, boolean isEmpty) throws IOException { + createRequestedRollbackFile(basePath, instantTime); createInflightRollbackFile(basePath, instantTime); createRollbackFile(basePath, instantTime, rollbackMetadata, isEmpty); currentInstantTime = instantTime;