From 0230d40b74ee0bf99ff1d3d06ad1a7db26bef017 Mon Sep 17 00:00:00 2001 From: Manoj Govindassamy Date: Fri, 19 Nov 2021 17:02:57 -0800 Subject: [PATCH] [HUDI-2796] Metadata table support for Restore action to first commit (#4039) - Adding support for the metadata table to restore to first commit and take proper action for the bootstrap on subequent commits. --- .../HoodieBackedTableMetadataWriter.java | 67 +++++++++++++++---- .../hudi/table/action/BaseActionExecutor.java | 2 +- ...stHoodieSparkMergeOnReadTableRollback.java | 8 ++- 3 files changed, 59 insertions(+), 18 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index 6886573c1..24b34244e 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -353,21 +353,11 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta return false; } - boolean isRollbackAction = false; - List rollbackedTimestamps = Collections.emptyList(); - if (actionMetadata.isPresent() && actionMetadata.get() instanceof HoodieRollbackMetadata) { - isRollbackAction = true; - List rollbackedInstants = - ((HoodieRollbackMetadata) actionMetadata.get()).getInstantsRollback(); - rollbackedTimestamps = rollbackedInstants.stream().map(instant -> { - return instant.getCommitTime().toString(); - }).collect(Collectors.toList()); - } - + // Detect the commit gaps if any from the data and the metadata active timeline if (dataMetaClient.getActiveTimeline().getAllCommitsTimeline().isBeforeTimelineStarts( latestMetadataInstant.get().getTimestamp()) - && (!isRollbackAction || !rollbackedTimestamps.contains(latestMetadataInstantTimestamp))) { - LOG.warn("Metadata Table will need to be re-bootstrapped as un-synced instants have been archived." + && !isCommitRevertedByInFlightAction(actionMetadata, latestMetadataInstantTimestamp)) { + LOG.error("Metadata Table will need to be re-bootstrapped as un-synced instants have been archived." + " latestMetadataInstant=" + latestMetadataInstant.get().getTimestamp() + ", latestDataInstant=" + dataMetaClient.getActiveTimeline().firstInstant().get().getTimestamp()); return true; @@ -376,10 +366,59 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta return false; } + /** + * Is the latest commit instant reverted by the in-flight instant action? + * + * @param actionMetadata - In-flight instant action metadata + * @param latestMetadataInstantTimestamp - Metadata table latest instant timestamp + * @param - ActionMetadata type + * @return True if the latest instant action is reverted by the action + */ + private boolean isCommitRevertedByInFlightAction(Option actionMetadata, + final String latestMetadataInstantTimestamp) { + if (!actionMetadata.isPresent()) { + return false; + } + + final String INSTANT_ACTION = (actionMetadata.get() instanceof HoodieRollbackMetadata + ? HoodieTimeline.ROLLBACK_ACTION + : (actionMetadata.get() instanceof HoodieRestoreMetadata ? HoodieTimeline.RESTORE_ACTION : "")); + + List affectedInstantTimestamps; + switch (INSTANT_ACTION) { + case HoodieTimeline.ROLLBACK_ACTION: + List rollbackedInstants = + ((HoodieRollbackMetadata) actionMetadata.get()).getInstantsRollback(); + affectedInstantTimestamps = rollbackedInstants.stream().map(instant -> { + return instant.getCommitTime().toString(); + }).collect(Collectors.toList()); + + if (affectedInstantTimestamps.contains(latestMetadataInstantTimestamp)) { + return true; + } + break; + case HoodieTimeline.RESTORE_ACTION: + List restoredInstants = + ((HoodieRestoreMetadata) actionMetadata.get()).getRestoreInstantInfo(); + affectedInstantTimestamps = restoredInstants.stream().map(instant -> { + return instant.getCommitTime().toString(); + }).collect(Collectors.toList()); + + if (affectedInstantTimestamps.contains(latestMetadataInstantTimestamp)) { + return true; + } + break; + default: + return false; + } + + return false; + } + /** * Initialize the Metadata Table by listing files and partitions from the file system. * - * @param dataMetaClient {@code HoodieTableMetaClient} for the dataset. + * @param dataMetaClient {@code HoodieTableMetaClient} for the dataset. * @param inflightInstantTimestamp */ private boolean bootstrapFromFilesystem(HoodieEngineContext engineContext, HoodieTableMetaClient dataMetaClient, diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java index a22479b6b..d4c920b20 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java @@ -81,6 +81,6 @@ public abstract class BaseActionExecutor w.update(metadata, instantTime)); + table.getMetadataWriter(Option.of(metadata)).ifPresent(w -> w.update(metadata, instantTime)); } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java index 6bbb0f655..1e68a9458 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java @@ -139,7 +139,8 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction @ParameterizedTest @ValueSource(booleans = {true, false}) void testRollbackWithDeltaAndCompactionCommit(boolean rollbackUsingMarkers) throws Exception { - HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(false, rollbackUsingMarkers, HoodieIndex.IndexType.SIMPLE); + HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(false, rollbackUsingMarkers, HoodieIndex.IndexType.SIMPLE) + .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build()); addConfigsForPopulateMetaFields(cfgBuilder, true); HoodieWriteConfig cfg = cfgBuilder.build(); @@ -294,7 +295,8 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction @Test void testMultiRollbackWithDeltaAndCompactionCommit() throws Exception { boolean populateMetaFields = true; - HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(false).withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()); + HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(false) + .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build()); addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields); HoodieWriteConfig cfg = cfgBuilder.build(); @@ -344,7 +346,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction newCommitTime = "002"; // WriteClient with custom config (disable small file handling) HoodieWriteConfig smallFileWriteConfig = getHoodieWriteConfigWithSmallFileHandlingOffBuilder(populateMetaFields) - .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()).build(); + .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build()).build(); try (SparkRDDWriteClient nClient = getHoodieWriteClient(smallFileWriteConfig)) { nClient.startCommitWithTime(newCommitTime);