From e7ec3a82dc274b8d683d74e59ee7bf35d7827ce0 Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Thu, 10 Feb 2022 08:06:23 -0500 Subject: [PATCH] [HUDI-2432] Adding restore.requested instant and restore plan for restore action (#4605) - This adds a restore plan and serializes it to restore.requested meta file in timeline. This also means that we are introducing schedule and execution phases for restore which was not present before. --- .../hudi/client/BaseHoodieWriteClient.java | 24 ++++-- .../exception/HoodieRestoreException.java | 4 + .../org/apache/hudi/table/HoodieTable.java | 15 ++++ .../restore/BaseRestoreActionExecutor.java | 61 ++++++++++---- .../table/action/restore/RestoreUtils.java | 46 ++++++++++ .../rollback/RestorePlanActionExecutor.java | 84 +++++++++++++++++++ .../table/HoodieFlinkCopyOnWriteTable.java | 6 ++ .../table/HoodieJavaCopyOnWriteTable.java | 7 ++ .../table/HoodieSparkCopyOnWriteTable.java | 7 ++ .../table/HoodieSparkMergeOnReadTable.java | 2 + .../TestHoodieClientOnCopyOnWriteStorage.java | 1 + .../src/main/avro/HoodieRestorePlan.avsc | 37 ++++++++ .../table/timeline/HoodieActiveTimeline.java | 29 ++++++- .../common/table/timeline/HoodieInstant.java | 1 + .../common/table/timeline/HoodieTimeline.java | 5 ++ .../table/timeline/TimelineMetadataUtils.java | 5 ++ 16 files changed, 309 insertions(+), 25 deletions(-) create mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/RestoreUtils.java create mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RestorePlanActionExecutor.java create mode 100644 hudi-common/src/main/avro/HoodieRestorePlan.avsc diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index 271f8a378..2414a9fb7 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -23,6 +23,7 @@ import org.apache.hudi.avro.model.HoodieCleanerPlan; import org.apache.hudi.avro.model.HoodieClusteringPlan; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.avro.model.HoodieRestoreMetadata; +import org.apache.hudi.avro.model.HoodieRestorePlan; import org.apache.hudi.avro.model.HoodieRollbackMetadata; import org.apache.hudi.avro.model.HoodieRollbackPlan; import org.apache.hudi.callback.HoodieWriteCommitCallback; @@ -690,16 +691,21 @@ public abstract class BaseHoodieWriteClient table = createTable(config, hadoopConf, config.isMetadataTableEnabled()); - HoodieRestoreMetadata restoreMetadata = table.restore(context, restoreInstantTime, instantTime); - if (timerContext != null) { - final long durationInMs = metrics.getDurationInMs(timerContext.stop()); - final long totalFilesDeleted = restoreMetadata.getHoodieRestoreMetadata().values().stream() - .flatMap(Collection::stream) - .mapToLong(HoodieRollbackMetadata::getTotalFilesDeleted) - .sum(); - metrics.updateRollbackMetrics(durationInMs, totalFilesDeleted); + Option restorePlanOption = table.scheduleRestore(context, restoreInstantTime, instantTime); + if (restorePlanOption.isPresent()) { + HoodieRestoreMetadata restoreMetadata = table.restore(context, restoreInstantTime, instantTime); + if (timerContext != null) { + final long durationInMs = metrics.getDurationInMs(timerContext.stop()); + final long totalFilesDeleted = restoreMetadata.getHoodieRestoreMetadata().values().stream() + .flatMap(Collection::stream) + .mapToLong(HoodieRollbackMetadata::getTotalFilesDeleted) + .sum(); + metrics.updateRollbackMetrics(durationInMs, totalFilesDeleted); + } + return restoreMetadata; + } else { + throw new HoodieRestoreException("Failed to restore " + config.getBasePath() + " to commit " + instantTime); } - return restoreMetadata; } catch (Exception e) { throw new HoodieRestoreException("Failed to restore to " + instantTime, e); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieRestoreException.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieRestoreException.java index c6c9076f5..baad53aba 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieRestoreException.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieRestoreException.java @@ -23,4 +23,8 @@ public class HoodieRestoreException extends HoodieException { public HoodieRestoreException(String msg, Throwable e) { super(msg, e); } + + public HoodieRestoreException(String msg) { + super(msg); + } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java index f19b6aa86..639467fcd 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -29,6 +29,7 @@ import org.apache.hudi.avro.model.HoodieCleanerPlan; import org.apache.hudi.avro.model.HoodieClusteringPlan; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.avro.model.HoodieRestoreMetadata; +import org.apache.hudi.avro.model.HoodieRestorePlan; import org.apache.hudi.avro.model.HoodieRollbackMetadata; import org.apache.hudi.avro.model.HoodieRollbackPlan; import org.apache.hudi.avro.model.HoodieSavepointMetadata; @@ -346,6 +347,13 @@ public abstract class HoodieTable implem return getActiveTimeline().getRollbackTimeline(); } + /** + * Get restore timeline. + */ + public HoodieTimeline getRestoreTimeline() { + return getActiveTimeline().getRestoreTimeline(); + } + /** * Get only the completed (no-inflights) savepoint timeline. */ @@ -497,6 +505,13 @@ public abstract class HoodieTable implem String restoreInstantTime, String instantToRestore); + /** + * Schedules Restore for the table to the given instant. + */ + public abstract Option scheduleRestore(HoodieEngineContext context, + String restoreInstantTime, + String instantToRestore); + /** * Rollback failed compactions. Inflight rollbacks for compactions revert the .inflight file * to the .requested file. diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java index 58247bb8e..9025623e8 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java @@ -18,7 +18,9 @@ package org.apache.hudi.table.action.restore; +import org.apache.hudi.avro.model.HoodieInstantInfo; import org.apache.hudi.avro.model.HoodieRestoreMetadata; +import org.apache.hudi.avro.model.HoodieRestorePlan; import org.apache.hudi.avro.model.HoodieRollbackMetadata; import org.apache.hudi.client.transaction.TransactionManager; import org.apache.hudi.common.engine.HoodieEngineContext; @@ -29,14 +31,18 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieRestoreException; import org.apache.hudi.exception.HoodieRollbackException; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.BaseActionExecutor; + import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -65,29 +71,53 @@ public abstract class BaseRestoreActionExecutor instantsToRollback = table.getActiveTimeline().getWriteTimeline() - .getReverseOrderedInstants() - .filter(instant -> HoodieActiveTimeline.GREATER_THAN.test(instant.getTimestamp(), restoreInstantTime)) - .collect(Collectors.toList()); - - Map> instantToMetadata = new HashMap<>(); - table.getActiveTimeline().createNewInstant(new HoodieInstant(true, HoodieTimeline.RESTORE_ACTION, instantTime)); - instantsToRollback.forEach(instant -> { - instantToMetadata.put(instant.getTimestamp(), Collections.singletonList(rollbackInstant(instant))); - LOG.info("Deleted instant " + instant); - }); - + Option restoreInstant = table.getRestoreTimeline() + .filterInflightsAndRequested() + .filter(instant -> instant.getTimestamp().equals(instantTime)) + .firstInstant(); + if (!restoreInstant.isPresent()) { + throw new HoodieRollbackException("No pending restore instants found to execute restore"); + } try { + List instantsToRollback = getInstantsToRollback(restoreInstant.get()); + ValidationUtils.checkArgument(restoreInstant.get().getState().equals(HoodieInstant.State.REQUESTED) + || restoreInstant.get().getState().equals(HoodieInstant.State.INFLIGHT)); + Map> instantToMetadata = new HashMap<>(); + if (restoreInstant.get().isRequested()) { + table.getActiveTimeline().transitionRestoreRequestedToInflight(restoreInstant.get()); + } + + instantsToRollback.forEach(instant -> { + instantToMetadata.put(instant.getTimestamp(), Collections.singletonList(rollbackInstant(instant))); + LOG.info("Deleted instant " + instant); + }); + return finishRestore(instantToMetadata, instantsToRollback, restoreTimer.endTimer() ); } catch (IOException io) { - throw new HoodieRollbackException("unable to rollback instants " + instantsToRollback, io); + throw new HoodieRestoreException("unable to Restore instant " + restoreInstant.get(), io); } } + private List getInstantsToRollback(HoodieInstant restoreInstant) throws IOException { + List instantsToRollback = new ArrayList<>(); + HoodieRestorePlan restorePlan = RestoreUtils.getRestorePlan(table.getMetaClient(), restoreInstant); + for (HoodieInstantInfo instantInfo : restorePlan.getInstantsToRollback()) { + // If restore crashed mid-way, there are chances that some commits are already rolled back, + // but some are not. so, we can ignore those commits which are fully rolledback in previous attempt if any. + Option rollbackInstantOpt = table.getActiveTimeline().getWriteTimeline() + .filter(instant -> instant.getTimestamp().equals(instantInfo.getCommitTime()) && instant.getAction().equals(instantInfo.getAction())).firstInstant(); + if (rollbackInstantOpt.isPresent()) { + instantsToRollback.add(rollbackInstantOpt.get()); + } else { + LOG.warn("Ignoring already rolledback instant " + instantInfo.toString()); + } + } + return instantsToRollback; + } + protected abstract HoodieRollbackMetadata rollbackInstant(HoodieInstant rollbackInstant); private HoodieRestoreMetadata finishRestore(Map> instantToMetadata, @@ -99,7 +129,7 @@ public abstract class BaseRestoreActionExecutor instantsToRollback = table.getActiveTimeline().getRollbackTimeline() .getReverseOrderedInstants() @@ -115,6 +145,7 @@ public abstract class BaseRestoreActionExecutor extends BaseActionExecutor> { + + + private static final Logger LOG = LogManager.getLogger(RestorePlanActionExecutor.class); + + public static final Integer RESTORE_PLAN_VERSION_1 = 1; + public static final Integer LATEST_RESTORE_PLAN_VERSION = RESTORE_PLAN_VERSION_1; + private final String restoreInstantTime; + + public RestorePlanActionExecutor(HoodieEngineContext context, + HoodieWriteConfig config, + HoodieTable table, + String instantTime, + String restoreInstantTime) { + super(context, config, table, instantTime); + this.restoreInstantTime = restoreInstantTime; + } + + @Override + public Option execute() { + final HoodieInstant restoreInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.RESTORE_ACTION, instantTime); + try { + // Get all the commits on the timeline after the provided commit time + List instantsToRollback = table.getActiveTimeline().getWriteTimeline() + .getReverseOrderedInstants() + .filter(instant -> HoodieActiveTimeline.GREATER_THAN.test(instant.getTimestamp(), restoreInstantTime)).map(entry -> new HoodieInstantInfo(entry.getTimestamp(), entry.getAction())) + .collect(Collectors.toList()); + + HoodieRestorePlan restorePlan = new HoodieRestorePlan(instantsToRollback, LATEST_RESTORE_PLAN_VERSION); + table.getActiveTimeline().saveToRestoreRequested(restoreInstant, TimelineMetadataUtils.serializeRestorePlan(restorePlan)); + table.getMetaClient().reloadActiveTimeline(); + LOG.info("Requesting Restore with instant time " + restoreInstant); + return Option.of(restorePlan); + } catch (IOException e) { + LOG.error("Got exception when saving restore requested file", e); + throw new HoodieIOException(e.getMessage(), e); + } + } +} diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java index 9aceffe44..7e41ab150 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java @@ -23,6 +23,7 @@ import org.apache.hudi.avro.model.HoodieCleanerPlan; import org.apache.hudi.avro.model.HoodieClusteringPlan; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.avro.model.HoodieRestoreMetadata; +import org.apache.hudi.avro.model.HoodieRestorePlan; import org.apache.hudi.avro.model.HoodieRollbackMetadata; import org.apache.hudi.avro.model.HoodieRollbackPlan; import org.apache.hudi.avro.model.HoodieSavepointMetadata; @@ -342,6 +343,11 @@ public class HoodieFlinkCopyOnWriteTable throw new HoodieNotSupportedException("Savepoint is not supported yet"); } + @Override + public Option scheduleRestore(HoodieEngineContext context, String restoreInstantTime, String instantToRestore) { + throw new HoodieNotSupportedException("Restore is not supported yet"); + } + @Override public HoodieRestoreMetadata restore(HoodieEngineContext context, String restoreInstantTime, String instantToRestore) { throw new HoodieNotSupportedException("Savepoint and restore is not supported yet"); diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java index 62a6980d5..f8590e9bd 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java @@ -23,6 +23,7 @@ import org.apache.hudi.avro.model.HoodieCleanerPlan; import org.apache.hudi.avro.model.HoodieClusteringPlan; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.avro.model.HoodieRestoreMetadata; +import org.apache.hudi.avro.model.HoodieRestorePlan; import org.apache.hudi.avro.model.HoodieRollbackMetadata; import org.apache.hudi.avro.model.HoodieRollbackPlan; import org.apache.hudi.avro.model.HoodieSavepointMetadata; @@ -63,6 +64,7 @@ import org.apache.hudi.table.action.commit.JavaUpsertPreppedCommitActionExecutor import org.apache.hudi.table.action.restore.CopyOnWriteRestoreActionExecutor; import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor; import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor; +import org.apache.hudi.table.action.rollback.RestorePlanActionExecutor; import org.apache.hudi.table.action.savepoint.SavepointActionExecutor; import org.slf4j.Logger; @@ -247,6 +249,11 @@ public class HoodieJavaCopyOnWriteTable context, config, this, instantToSavepoint, user, comment).execute(); } + @Override + public Option scheduleRestore(HoodieEngineContext context, String restoreInstantTime, String instantToRestore) { + return new RestorePlanActionExecutor(context, config, this, restoreInstantTime, instantToRestore).execute(); + } + @Override public HoodieRestoreMetadata restore(HoodieEngineContext context, String restoreInstantTime, diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java index 12c0483bf..6a3305575 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java @@ -25,6 +25,7 @@ import org.apache.hudi.avro.model.HoodieCleanerPlan; import org.apache.hudi.avro.model.HoodieClusteringPlan; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.avro.model.HoodieRestoreMetadata; +import org.apache.hudi.avro.model.HoodieRestorePlan; import org.apache.hudi.avro.model.HoodieRollbackMetadata; import org.apache.hudi.avro.model.HoodieRollbackPlan; import org.apache.hudi.avro.model.HoodieSavepointMetadata; @@ -74,6 +75,7 @@ import org.apache.hudi.table.action.commit.SparkUpsertPreppedCommitActionExecuto import org.apache.hudi.table.action.restore.CopyOnWriteRestoreActionExecutor; import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor; import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor; +import org.apache.hudi.table.action.rollback.RestorePlanActionExecutor; import org.apache.hudi.table.action.savepoint.SavepointActionExecutor; import org.apache.avro.Schema; @@ -258,6 +260,7 @@ public class HoodieSparkCopyOnWriteTable @Override public void rollbackBootstrap(HoodieEngineContext context, String instantTime) { + new RestorePlanActionExecutor<>(context, config, this, instantTime, HoodieTimeline.INIT_INSTANT_TS).execute(); new CopyOnWriteRestoreActionExecutor(context, config, this, instantTime, HoodieTimeline.INIT_INSTANT_TS).execute(); } @@ -353,4 +356,8 @@ public class HoodieSparkCopyOnWriteTable return new CopyOnWriteRestoreActionExecutor(context, config, this, restoreInstantTime, instantToRestore).execute(); } + @Override + public Option scheduleRestore(HoodieEngineContext context, String restoreInstantTime, String instantToRestore) { + return new RestorePlanActionExecutor(context, config, this, restoreInstantTime, instantToRestore).execute(); + } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java index 75af5d0f6..334efa7fc 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java @@ -52,6 +52,7 @@ import org.apache.hudi.table.action.deltacommit.SparkUpsertPreppedDeltaCommitAct import org.apache.hudi.table.action.restore.MergeOnReadRestoreActionExecutor; import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor; import org.apache.hudi.table.action.rollback.MergeOnReadRollbackActionExecutor; +import org.apache.hudi.table.action.rollback.RestorePlanActionExecutor; import org.apache.spark.api.java.JavaRDD; @@ -150,6 +151,7 @@ public class HoodieSparkMergeOnReadTable extends @Override public void rollbackBootstrap(HoodieEngineContext context, String instantTime) { + new RestorePlanActionExecutor<>(context, config, this, instantTime, HoodieTimeline.INIT_INSTANT_TS).execute(); new MergeOnReadRestoreActionExecutor(context, config, this, instantTime, HoodieTimeline.INIT_INSTANT_TS).execute(); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java index 7334f4cb6..a1d7569a1 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java @@ -576,6 +576,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { HoodieWriteConfig newConfig = getConfigBuilder().withProps(config.getProps()).withTimelineLayoutVersion( TimelineLayoutVersion.CURR_VERSION).build(); client = getHoodieWriteClient(newConfig); + client.restoreToInstant("004"); assertFalse(metaClient.reloadActiveTimeline().getRollbackTimeline().lastInstant().isPresent()); diff --git a/hudi-common/src/main/avro/HoodieRestorePlan.avsc b/hudi-common/src/main/avro/HoodieRestorePlan.avsc new file mode 100644 index 000000000..1ad9e6a4b --- /dev/null +++ b/hudi-common/src/main/avro/HoodieRestorePlan.avsc @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +{ + "namespace":"org.apache.hudi.avro.model", + "type":"record", + "name":"HoodieRestorePlan", + "fields":[ + { + "name": "instantsToRollback", + "default": [], + "type": { + "type": "array", + "default": null, + "items": "HoodieInstantInfo" + } + }, + { + "name":"version", + "type":["int", "null"], + "default": 1 + }] +} \ No newline at end of file diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java index c7473bd7d..1fa3845bf 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java @@ -70,7 +70,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { SAVEPOINT_EXTENSION, INFLIGHT_SAVEPOINT_EXTENSION, CLEAN_EXTENSION, REQUESTED_CLEAN_EXTENSION, INFLIGHT_CLEAN_EXTENSION, INFLIGHT_COMPACTION_EXTENSION, REQUESTED_COMPACTION_EXTENSION, - INFLIGHT_RESTORE_EXTENSION, RESTORE_EXTENSION, + REQUESTED_RESTORE_EXTENSION, INFLIGHT_RESTORE_EXTENSION, RESTORE_EXTENSION, ROLLBACK_EXTENSION, REQUESTED_ROLLBACK_EXTENSION, INFLIGHT_ROLLBACK_EXTENSION, REQUESTED_REPLACE_COMMIT_EXTENSION, INFLIGHT_REPLACE_COMMIT_EXTENSION, REPLACE_COMMIT_EXTENSION)); private static final Logger LOG = LogManager.getLogger(HoodieActiveTimeline.class); @@ -289,6 +289,11 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { return readDataFromPath(new Path(metaClient.getMetaPath(), instant.getFileName())); } + public Option readRestoreInfoAsBytes(HoodieInstant instant) { + // Rollback metadata are always stored only in timeline .hoodie + return readDataFromPath(new Path(metaClient.getMetaPath(), instant.getFileName())); + } + //----------------------------------------------------------------- // BEGIN - COMPACTION RELATED META-DATA MANAGEMENT. //----------------------------------------------------------------- @@ -429,6 +434,21 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { return inflight; } + /** + * Transition Restore State from requested to inflight. + * + * @param requestedInstant requested instant + * @return commit instant + */ + public HoodieInstant transitionRestoreRequestedToInflight(HoodieInstant requestedInstant) { + ValidationUtils.checkArgument(requestedInstant.getAction().equals(HoodieTimeline.RESTORE_ACTION), "Transition to inflight requested for a restore instant with diff action " + + requestedInstant.toString()); + ValidationUtils.checkArgument(requestedInstant.isRequested(), "Transition to inflight requested for an instant not in requested state " + requestedInstant.toString()); + HoodieInstant inflight = new HoodieInstant(State.INFLIGHT, RESTORE_ACTION, requestedInstant.getTimestamp()); + transitionState(requestedInstant, inflight, Option.empty()); + return inflight; + } + /** * Transition replace requested file to replace inflight. * @@ -599,6 +619,13 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { createFileInMetaPath(instant.getFileName(), content, false); } + public void saveToRestoreRequested(HoodieInstant instant, Option content) { + ValidationUtils.checkArgument(instant.getAction().equals(HoodieTimeline.RESTORE_ACTION)); + ValidationUtils.checkArgument(instant.getState().equals(State.REQUESTED)); + // Plan is stored in meta path + createFileInMetaPath(instant.getFileName(), content, false); + } + private void createFileInMetaPath(String filename, Option content, boolean allowOverwrite) { Path fullPath = new Path(metaClient.getMetaPath(), filename); if (allowOverwrite || metaClient.getTimelineLayoutVersion().isNullVersion()) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java index a8df62c64..9cd088312 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java @@ -166,6 +166,7 @@ public class HoodieInstant implements Serializable, Comparable { } } else if (HoodieTimeline.RESTORE_ACTION.equals(action)) { return isInflight() ? HoodieTimeline.makeInflightRestoreFileName(timestamp) + : isRequested() ? HoodieTimeline.makeRequestedRestoreFileName(timestamp) : HoodieTimeline.makeRestoreFileName(timestamp); } else if (HoodieTimeline.REPLACE_COMMIT_ACTION.equals(action)) { return isInflight() ? HoodieTimeline.makeInflightReplaceFileName(timestamp) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java index 6ea44a830..25b9c2ec6 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java @@ -78,6 +78,7 @@ public interface HoodieTimeline extends Serializable { String REQUESTED_COMPACTION_SUFFIX = StringUtils.join(COMPACTION_ACTION, REQUESTED_EXTENSION); String REQUESTED_COMPACTION_EXTENSION = StringUtils.join(".", REQUESTED_COMPACTION_SUFFIX); String INFLIGHT_COMPACTION_EXTENSION = StringUtils.join(".", COMPACTION_ACTION, INFLIGHT_EXTENSION); + String REQUESTED_RESTORE_EXTENSION = "." + RESTORE_ACTION + REQUESTED_EXTENSION; String INFLIGHT_RESTORE_EXTENSION = "." + RESTORE_ACTION + INFLIGHT_EXTENSION; String RESTORE_EXTENSION = "." + RESTORE_ACTION; String INFLIGHT_REPLACE_COMMIT_EXTENSION = "." + REPLACE_COMMIT_ACTION + INFLIGHT_EXTENSION; @@ -386,6 +387,10 @@ public interface HoodieTimeline extends Serializable { return StringUtils.join(instant, HoodieTimeline.REQUESTED_ROLLBACK_EXTENSION); } + static String makeRequestedRestoreFileName(String instant) { + return StringUtils.join(instant, HoodieTimeline.REQUESTED_RESTORE_EXTENSION); + } + static String makeInflightRollbackFileName(String instant) { return StringUtils.join(instant, HoodieTimeline.INFLIGHT_ROLLBACK_EXTENSION); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java index 723c594ff..70a23f1b4 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java @@ -26,6 +26,7 @@ import org.apache.hudi.avro.model.HoodieInstantInfo; import org.apache.hudi.avro.model.HoodieReplaceCommitMetadata; import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata; import org.apache.hudi.avro.model.HoodieRestoreMetadata; +import org.apache.hudi.avro.model.HoodieRestorePlan; import org.apache.hudi.avro.model.HoodieRollbackMetadata; import org.apache.hudi.avro.model.HoodieRollbackPartitionMetadata; import org.apache.hudi.avro.model.HoodieRollbackPlan; @@ -112,6 +113,10 @@ public class TimelineMetadataUtils { return serializeAvroMetadata(rollbackPlan, HoodieRollbackPlan.class); } + public static Option serializeRestorePlan(HoodieRestorePlan restorePlan) throws IOException { + return serializeAvroMetadata(restorePlan, HoodieRestorePlan.class); + } + public static Option serializeCleanMetadata(HoodieCleanMetadata metadata) throws IOException { return serializeAvroMetadata(metadata, HoodieCleanMetadata.class); }