1
0

[HUDI-2432] Adding restore.requested instant and restore plan for restore action (#4605)

- This adds a restore plan and serializes it to the restore.requested meta file in the timeline. This also introduces separate schedule and execution phases for restore, which were not present before.
This commit is contained in:
Sivabalan Narayanan
2022-02-10 08:06:23 -05:00
committed by GitHub
parent 0ababcfaa7
commit e7ec3a82dc
16 changed files with 309 additions and 25 deletions

View File

@@ -23,6 +23,7 @@ import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRestorePlan;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieRollbackPlan;
import org.apache.hudi.callback.HoodieWriteCommitCallback;
@@ -690,16 +691,21 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
Timer.Context timerContext = metrics.getRollbackCtx();
try {
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf, config.isMetadataTableEnabled());
HoodieRestoreMetadata restoreMetadata = table.restore(context, restoreInstantTime, instantTime);
if (timerContext != null) {
final long durationInMs = metrics.getDurationInMs(timerContext.stop());
final long totalFilesDeleted = restoreMetadata.getHoodieRestoreMetadata().values().stream()
.flatMap(Collection::stream)
.mapToLong(HoodieRollbackMetadata::getTotalFilesDeleted)
.sum();
metrics.updateRollbackMetrics(durationInMs, totalFilesDeleted);
Option<HoodieRestorePlan> restorePlanOption = table.scheduleRestore(context, restoreInstantTime, instantTime);
if (restorePlanOption.isPresent()) {
HoodieRestoreMetadata restoreMetadata = table.restore(context, restoreInstantTime, instantTime);
if (timerContext != null) {
final long durationInMs = metrics.getDurationInMs(timerContext.stop());
final long totalFilesDeleted = restoreMetadata.getHoodieRestoreMetadata().values().stream()
.flatMap(Collection::stream)
.mapToLong(HoodieRollbackMetadata::getTotalFilesDeleted)
.sum();
metrics.updateRollbackMetrics(durationInMs, totalFilesDeleted);
}
return restoreMetadata;
} else {
throw new HoodieRestoreException("Failed to restore " + config.getBasePath() + " to commit " + instantTime);
}
return restoreMetadata;
} catch (Exception e) {
throw new HoodieRestoreException("Failed to restore to " + instantTime, e);
}

View File

@@ -23,4 +23,8 @@ public class HoodieRestoreException extends HoodieException {
/**
* Creates the exception with a message and an underlying cause, both forwarded
* unchanged to the {@link HoodieException} constructor.
*
* @param msg message describing the failure
* @param e the throwable recorded as the cause
*/
public HoodieRestoreException(String msg, Throwable e) {
super(msg, e);
}
/**
* Creates the exception with a message only (no cause), forwarded unchanged
* to the {@link HoodieException} constructor.
*
* @param msg message describing the failure
*/
public HoodieRestoreException(String msg) {
super(msg);
}
}

View File

@@ -29,6 +29,7 @@ import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRestorePlan;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieRollbackPlan;
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
@@ -346,6 +347,13 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
return getActiveTimeline().getRollbackTimeline();
}
/**
* Get restore timeline.
*
* @return whatever {@code getRestoreTimeline()} returns on this table's active timeline
*/
public HoodieTimeline getRestoreTimeline() {
return getActiveTimeline().getRestoreTimeline();
}
/**
* Get only the completed (no-inflights) savepoint timeline.
*/
@@ -497,6 +505,13 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
String restoreInstantTime,
String instantToRestore);
/**
* Schedules Restore for the table to the given instant.
*
* @param context engine context handed to the implementation
* @param restoreInstantTime presumably the instant time of the restore action itself
*                           (not the target commit) - TODO confirm against implementations
* @param instantToRestore presumably the instant the table should be restored to - TODO confirm
* @return the restore plan wrapped in an {@link Option}; NOTE(review): whether an empty
*         Option can be returned depends on the concrete implementation
*/
public abstract Option<HoodieRestorePlan> scheduleRestore(HoodieEngineContext context,
String restoreInstantTime,
String instantToRestore);
/**
* Rollback failed compactions. Inflight rollbacks for compactions revert the .inflight file
* to the .requested file.

View File

@@ -18,7 +18,9 @@
package org.apache.hudi.table.action.restore;
import org.apache.hudi.avro.model.HoodieInstantInfo;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRestorePlan;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -29,14 +31,18 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieRestoreException;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.BaseActionExecutor;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -65,29 +71,53 @@ public abstract class BaseRestoreActionExecutor<T extends HoodieRecordPayload, I
HoodieTimer restoreTimer = new HoodieTimer();
restoreTimer.startTimer();
// Get all the commits on the timeline after the provided commit time
List<HoodieInstant> instantsToRollback = table.getActiveTimeline().getWriteTimeline()
.getReverseOrderedInstants()
.filter(instant -> HoodieActiveTimeline.GREATER_THAN.test(instant.getTimestamp(), restoreInstantTime))
.collect(Collectors.toList());
Map<String, List<HoodieRollbackMetadata>> instantToMetadata = new HashMap<>();
table.getActiveTimeline().createNewInstant(new HoodieInstant(true, HoodieTimeline.RESTORE_ACTION, instantTime));
instantsToRollback.forEach(instant -> {
instantToMetadata.put(instant.getTimestamp(), Collections.singletonList(rollbackInstant(instant)));
LOG.info("Deleted instant " + instant);
});
Option<HoodieInstant> restoreInstant = table.getRestoreTimeline()
.filterInflightsAndRequested()
.filter(instant -> instant.getTimestamp().equals(instantTime))
.firstInstant();
if (!restoreInstant.isPresent()) {
throw new HoodieRollbackException("No pending restore instants found to execute restore");
}
try {
List<HoodieInstant> instantsToRollback = getInstantsToRollback(restoreInstant.get());
ValidationUtils.checkArgument(restoreInstant.get().getState().equals(HoodieInstant.State.REQUESTED)
|| restoreInstant.get().getState().equals(HoodieInstant.State.INFLIGHT));
Map<String, List<HoodieRollbackMetadata>> instantToMetadata = new HashMap<>();
if (restoreInstant.get().isRequested()) {
table.getActiveTimeline().transitionRestoreRequestedToInflight(restoreInstant.get());
}
instantsToRollback.forEach(instant -> {
instantToMetadata.put(instant.getTimestamp(), Collections.singletonList(rollbackInstant(instant)));
LOG.info("Deleted instant " + instant);
});
return finishRestore(instantToMetadata,
instantsToRollback,
restoreTimer.endTimer()
);
} catch (IOException io) {
throw new HoodieRollbackException("unable to rollback instants " + instantsToRollback, io);
throw new HoodieRestoreException("unable to Restore instant " + restoreInstant.get(), io);
}
}
/**
 * Resolves the instants listed in the restore plan against the current write timeline.
 *
 * <p>The plan is loaded through {@code RestoreUtils.getRestorePlan}. Each planned entry is
 * looked up in the write timeline by commit time and action. Entries that are no longer on
 * the timeline are logged and left out of the result.
 *
 * @param restoreInstant the restore instant whose plan should be read
 * @return the planned instants that are still present on the write timeline, in plan order
 * @throws IOException if the restore plan cannot be read
 */
private List<HoodieInstant> getInstantsToRollback(HoodieInstant restoreInstant) throws IOException {
  HoodieRestorePlan plan = RestoreUtils.getRestorePlan(table.getMetaClient(), restoreInstant);
  List<HoodieInstant> stillPresent = new ArrayList<>();
  for (HoodieInstantInfo planned : plan.getInstantsToRollback()) {
    // A previous restore attempt may have crashed part-way through; commits it already rolled
    // back are gone from the timeline and must not be rolled back a second time.
    Option<HoodieInstant> match = table.getActiveTimeline().getWriteTimeline()
        .filter(candidate -> candidate.getTimestamp().equals(planned.getCommitTime())
            && candidate.getAction().equals(planned.getAction()))
        .firstInstant();
    if (!match.isPresent()) {
      LOG.warn("Ignoring already rolledback instant " + planned.toString());
      continue;
    }
    stillPresent.add(match.get());
  }
  return stillPresent;
}
/**
* Engine-specific hook invoked once per instant selected for rollback during a restore.
*
* @param rollbackInstant the instant to roll back
* @return the rollback metadata produced for that instant; the caller records it under
*         the instant's timestamp
*/
protected abstract HoodieRollbackMetadata rollbackInstant(HoodieInstant rollbackInstant);
private HoodieRestoreMetadata finishRestore(Map<String, List<HoodieRollbackMetadata>> instantToMetadata,
@@ -99,7 +129,7 @@ public abstract class BaseRestoreActionExecutor<T extends HoodieRecordPayload, I
writeToMetadata(restoreMetadata);
table.getActiveTimeline().saveAsComplete(new HoodieInstant(true, HoodieTimeline.RESTORE_ACTION, instantTime),
TimelineMetadataUtils.serializeRestoreMetadata(restoreMetadata));
// get all rollbacks instants after restore instant time and delete them.
// get all pending rollbacks instants after restore instant time and delete them.
// if not, rollbacks will be considered not completed and might hinder metadata table compaction.
List<HoodieInstant> instantsToRollback = table.getActiveTimeline().getRollbackTimeline()
.getReverseOrderedInstants()
@@ -115,6 +145,7 @@ public abstract class BaseRestoreActionExecutor<T extends HoodieRecordPayload, I
/**
* Update metadata table if available. Any update to metadata table happens within data table lock.
*
* @param restoreMetadata instance of {@link HoodieRestoreMetadata} to be applied to metadata.
*/
private void writeToMetadata(HoodieRestoreMetadata restoreMetadata) {

View File

@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.table.action.restore;
import org.apache.hudi.avro.model.HoodieRestorePlan;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import java.io.IOException;
/**
 * Static helpers for reading restore plans from the timeline.
 */
public class RestoreUtils {

  /**
   * Reads and deserializes the restore plan that belongs to a restore instant.
   *
   * <p>The given instant is first mapped to its requested form via
   * {@code HoodieTimeline.getRollbackRequestedInstant}. The plan bytes are then fetched with
   * {@code readRestoreInfoAsBytes} on the active timeline and decoded as a
   * {@link HoodieRestorePlan}.
   *
   * @param metaClient     Hoodie Table Meta Client
   * @param restoreInstant Instant referring to restore action
   * @return Restore plan corresponding to the restore instant
   * @throws IOException if the plan cannot be read or deserialized
   */
  public static HoodieRestorePlan getRestorePlan(HoodieTableMetaClient metaClient, HoodieInstant restoreInstant)
      throws IOException {
    HoodieInstant requestedInstant = HoodieTimeline.getRollbackRequestedInstant(restoreInstant);
    byte[] serializedPlan = metaClient.getActiveTimeline().readRestoreInfoAsBytes(requestedInstant).get();
    return TimelineMetadataUtils.deserializeAvroMetadata(serializedPlan, HoodieRestorePlan.class);
  }
}

View File

@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.table.action.rollback;
import org.apache.hudi.avro.model.HoodieInstantInfo;
import org.apache.hudi.avro.model.HoodieRestorePlan;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.BaseActionExecutor;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
/**
 * Plans a restore action and adds a restore.requested meta file to the timeline.
 *
 * <p>The plan lists every write-timeline instant whose timestamp is strictly greater than
 * the instant being restored to, newest first.
 */
public class RestorePlanActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseActionExecutor<T, I, K, O, Option<HoodieRestorePlan>> {

  private static final Logger LOG = LogManager.getLogger(RestorePlanActionExecutor.class);

  public static final Integer RESTORE_PLAN_VERSION_1 = 1;
  public static final Integer LATEST_RESTORE_PLAN_VERSION = RESTORE_PLAN_VERSION_1;

  // Exclusive lower bound: instants with a timestamp greater than this one go into the plan.
  private final String restoreInstantTime;

  /**
   * @param context            engine context passed through to the base executor
   * @param config             write config passed through to the base executor
   * @param table              table whose timeline is inspected and written to
   * @param instantTime        instant time under which the restore.requested file is created
   * @param restoreInstantTime instant the table is restored to; later instants are planned for rollback
   */
  public RestorePlanActionExecutor(HoodieEngineContext context,
                                   HoodieWriteConfig config,
                                   HoodieTable<T, I, K, O> table,
                                   String instantTime,
                                   String restoreInstantTime) {
    super(context, config, table, instantTime);
    this.restoreInstantTime = restoreInstantTime;
  }

  /**
   * Builds the restore plan, saves it as the requested restore instant and reloads the
   * active timeline.
   *
   * @return the plan that was written, always present on success
   * @throws HoodieIOException if the requested file cannot be saved
   */
  @Override
  public Option<HoodieRestorePlan> execute() {
    final HoodieInstant requestedRestore =
        new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.RESTORE_ACTION, instantTime);
    try {
      HoodieRestorePlan plan = new HoodieRestorePlan(findInstantsAfterRestorePoint(), LATEST_RESTORE_PLAN_VERSION);
      table.getActiveTimeline().saveToRestoreRequested(requestedRestore, TimelineMetadataUtils.serializeRestorePlan(plan));
      // Reload so the newly written requested instant is part of the meta client's timeline.
      table.getMetaClient().reloadActiveTimeline();
      LOG.info("Requesting Restore with instant time " + requestedRestore);
      return Option.of(plan);
    } catch (IOException e) {
      LOG.error("Got exception when saving restore requested file", e);
      throw new HoodieIOException(e.getMessage(), e);
    }
  }

  /**
   * Collects, in reverse timeline order, all write-timeline instants after the restore point.
   */
  private List<HoodieInstantInfo> findInstantsAfterRestorePoint() {
    return table.getActiveTimeline().getWriteTimeline()
        .getReverseOrderedInstants()
        .filter(candidate -> HoodieActiveTimeline.GREATER_THAN.test(candidate.getTimestamp(), restoreInstantTime))
        .map(candidate -> new HoodieInstantInfo(candidate.getTimestamp(), candidate.getAction()))
        .collect(Collectors.toList());
  }
}