1
0

[HUDI-3721] Delete MDT if necessary when trigger rollback to savepoint (#5173)

Co-authored-by: yuezhang <yuezhang@freewheel.tv>
This commit is contained in:
YueZhang
2022-03-31 11:26:37 +08:00
committed by GitHub
parent 2c4554fada
commit 2dbb273d26
9 changed files with 106 additions and 24 deletions

View File

@@ -22,6 +22,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hudi.cli.HoodieCLI; import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.cli.commands.TableCommand; import org.apache.hudi.cli.commands.TableCommand;
import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest; import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -29,6 +31,9 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.springframework.shell.core.CommandResult; import org.springframework.shell.core.CommandResult;
@@ -118,6 +123,54 @@ public class ITTestSavepointsCommand extends AbstractShellIntegrationTest {
new HoodieInstant(HoodieInstant.State.COMPLETED, "commit", "103"))); new HoodieInstant(HoodieInstant.State.COMPLETED, "commit", "103")));
} }
/**
 * Test case of command 'savepoint rollback' with metadata table bootstrap.
 */
@Test
public void testRollbackToSavepointWithMetadataTableEnable() throws IOException {
// create commits 101..104 so there is history to roll back over
for (int instant = 101; instant < 105; instant++) {
HoodieTestDataGenerator.createCommitFile(tablePath, String.valueOf(instant), jsc.hadoopConfiguration());
}
// place a single savepoint on instant 102
final String savepoint = "102";
HoodieTestDataGenerator.createSavepointFile(tablePath, savepoint, jsc.hadoopConfiguration());
// wipe the metadata table first so it can be re-bootstrapped from scratch
String tableBasePath = metaClient.getBasePath();
Path mdtBasePath = new Path(HoodieTableMetadata.getMetadataTableBasePath(tableBasePath));
metaClient.getFs().delete(mdtBasePath, true);
// re-bootstrap the metadata table at the latest instant (104)
HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
.withPath(HoodieCLI.basePath)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build())
.build();
SparkHoodieBackedTableMetadataWriter.create(HoodieCLI.conf, config, new HoodieSparkEngineContext(jsc));
assertTrue(HoodieCLI.fs.exists(mdtBasePath));
// run the CLI rollback-to-savepoint command
CommandResult commandResult = getShell().executeCommand(
String.format("savepoint rollback --savepoint %s --sparkMaster %s", savepoint, "local"));
assertAll("Command run failed",
() -> assertTrue(commandResult.isSuccess()),
() -> assertEquals(
String.format("Savepoint \"%s\" rolled back", savepoint), commandResult.getResult().toString()));
// exactly one restore instant should have been produced
HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
assertEquals(1, activeTimeline.getRestoreTimeline().countInstants());
// commits 103 and 104 must have been rolled back
assertFalse(activeTimeline.getCommitTimeline().containsInstant(
new HoodieInstant(HoodieInstant.State.COMPLETED, "commit", "103")));
assertFalse(activeTimeline.getCommitTimeline().containsInstant(
new HoodieInstant(HoodieInstant.State.COMPLETED, "commit", "104")));
}
/** /**
* Test case of command 'savepoint delete'. * Test case of command 'savepoint delete'.
*/ */

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.client; package org.apache.hudi.client;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.async.AsyncArchiveService; import org.apache.hudi.async.AsyncArchiveService;
import org.apache.hudi.async.AsyncCleanerService; import org.apache.hudi.async.AsyncCleanerService;
import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieCleanMetadata;
@@ -66,6 +67,7 @@ import org.apache.hudi.exception.HoodieRestoreException;
import org.apache.hudi.exception.HoodieRollbackException; import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.exception.HoodieSavepointException; import org.apache.hudi.exception.HoodieSavepointException;
import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataWriter; import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metrics.HoodieMetrics; import org.apache.hudi.metrics.HoodieMetrics;
import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.table.BulkInsertPartitioner;
@@ -643,9 +645,30 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
* @return true if the savepoint was restored to successfully * @return true if the savepoint was restored to successfully
*/ */
public void restoreToSavepoint(String savepointTime) { public void restoreToSavepoint(String savepointTime) {
HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UNKNOWN, Option.empty()); boolean initialMetadataTableIfNecessary = config.isMetadataTableEnabled();
if (initialMetadataTableIfNecessary) {
try {
// Delete the metadata table directly when a user triggers savepoint rollback, if the MDT exists and the savepoint instant is before the start of the MDT timeline
String metadataTableBasePathStr = HoodieTableMetadata.getMetadataTableBasePath(config.getBasePath());
HoodieTableMetaClient mdtClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePathStr).build();
// Same as HoodieTableMetadataUtil#processRollbackMetadata
HoodieInstant syncedInstant = new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, savepointTime);
// The instant required to sync the rollback to the MDT has been archived, so syncing to the MDT would fail
// So that we need to delete the whole MDT here.
if (mdtClient.getCommitsTimeline().isBeforeTimelineStarts(syncedInstant.getTimestamp())) {
mdtClient.getFs().delete(new Path(metadataTableBasePathStr), true);
// rollbackToSavepoint action will try to bootstrap MDT at first but sync to MDT will fail at the current scenario.
// so that we need to disable metadata initialized here.
initialMetadataTableIfNecessary = false;
}
} catch (Exception e) {
// Metadata directory does not exist
}
}
HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UNKNOWN, Option.empty(), initialMetadataTableIfNecessary);
SavepointHelpers.validateSavepointPresence(table, savepointTime); SavepointHelpers.validateSavepointPresence(table, savepointTime);
restoreToInstant(savepointTime); restoreToInstant(savepointTime, initialMetadataTableIfNecessary);
SavepointHelpers.validateSavepointRestore(table, savepointTime); SavepointHelpers.validateSavepointRestore(table, savepointTime);
} }
@@ -659,7 +682,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
/** /**
* @Deprecated * @Deprecated
* Rollback the inflight record changes with the given commit time. This * Rollback the inflight record changes with the given commit time. This
* will be removed in future in favor of {@link BaseHoodieWriteClient#restoreToInstant(String)} * will be removed in future in favor of {@link BaseHoodieWriteClient#restoreToInstant(String, boolean)}
* *
* @param commitInstantTime Instant time of the commit * @param commitInstantTime Instant time of the commit
* @param pendingRollbackInfo pending rollback instant and plan if rollback failed from previous attempt. * @param pendingRollbackInfo pending rollback instant and plan if rollback failed from previous attempt.
@@ -717,12 +740,12 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
* *
* @param instantTime Instant time to which restoration is requested * @param instantTime Instant time to which restoration is requested
*/ */
public HoodieRestoreMetadata restoreToInstant(final String instantTime) throws HoodieRestoreException { public HoodieRestoreMetadata restoreToInstant(final String instantTime, boolean initialMetadataTableIfNecessary) throws HoodieRestoreException {
LOG.info("Begin restore to instant " + instantTime); LOG.info("Begin restore to instant " + instantTime);
final String restoreInstantTime = HoodieActiveTimeline.createNewInstantTime(); final String restoreInstantTime = HoodieActiveTimeline.createNewInstantTime();
Timer.Context timerContext = metrics.getRollbackCtx(); Timer.Context timerContext = metrics.getRollbackCtx();
try { try {
HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UNKNOWN, Option.empty()); HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UNKNOWN, Option.empty(), initialMetadataTableIfNecessary);
Option<HoodieRestorePlan> restorePlanOption = table.scheduleRestore(context, restoreInstantTime, instantTime); Option<HoodieRestorePlan> restorePlanOption = table.scheduleRestore(context, restoreInstantTime, instantTime);
if (restorePlanOption.isPresent()) { if (restorePlanOption.isPresent()) {
HoodieRestoreMetadata restoreMetadata = table.restore(context, restoreInstantTime, instantTime); HoodieRestoreMetadata restoreMetadata = table.restore(context, restoreInstantTime, instantTime);
@@ -1288,14 +1311,14 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
* @param instantTime current inflight instant time * @param instantTime current inflight instant time
* @return instantiated {@link HoodieTable} * @return instantiated {@link HoodieTable}
*/ */
protected abstract HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime); protected abstract HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime, boolean initialMetadataTableIfNecessary);
/** /**
* Instantiates and initializes instance of {@link HoodieTable}, performing crucial bootstrapping * Instantiates and initializes instance of {@link HoodieTable}, performing crucial bootstrapping
* operations such as: * operations such as:
* *
* NOTE: This method is engine-agnostic and SHOULD NOT be overloaded, please check on * NOTE: This method is engine-agnostic and SHOULD NOT be overloaded, please check on
* {@link #doInitTable(HoodieTableMetaClient, Option<String>)} instead * {@link #doInitTable(HoodieTableMetaClient, Option, boolean)} instead
* *
* <ul> * <ul>
* <li>Checking whether upgrade/downgrade is required</li> * <li>Checking whether upgrade/downgrade is required</li>
@@ -1303,7 +1326,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
* <li>Initializing metrics contexts</li> * <li>Initializing metrics contexts</li>
* </ul> * </ul>
*/ */
protected final HoodieTable initTable(WriteOperationType operationType, Option<String> instantTime) { protected final HoodieTable initTable(WriteOperationType operationType, Option<String> instantTime, boolean initialMetadataTableIfNecessary) {
HoodieTableMetaClient metaClient = createMetaClient(true); HoodieTableMetaClient metaClient = createMetaClient(true);
// Setup write schemas for deletes // Setup write schemas for deletes
if (operationType == WriteOperationType.DELETE) { if (operationType == WriteOperationType.DELETE) {
@@ -1315,7 +1338,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
this.txnManager.beginTransaction(); this.txnManager.beginTransaction();
try { try {
tryUpgrade(metaClient, instantTime); tryUpgrade(metaClient, instantTime);
table = doInitTable(metaClient, instantTime); table = doInitTable(metaClient, instantTime, initialMetadataTableIfNecessary);
} finally { } finally {
this.txnManager.endTransaction(); this.txnManager.endTransaction();
} }
@@ -1348,6 +1371,10 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
return table; return table;
} }
protected final HoodieTable initTable(WriteOperationType operationType, Option<String> instantTime) {
return initTable(operationType, instantTime, config.isMetadataTableEnabled());
}
/** /**
* Sets write schema from last instant since deletes may not have schema set in the config. * Sets write schema from last instant since deletes may not have schema set in the config.
*/ */

View File

@@ -398,7 +398,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
} }
@Override @Override
protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime) { protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime, boolean initialMetadataTableIfNecessary) {
// Create a Hoodie table which encapsulated the commits and files visible // Create a Hoodie table which encapsulated the commits and files visible
return getHoodieTable(); return getHoodieTable();
} }

View File

@@ -233,7 +233,7 @@ public class HoodieJavaWriteClient<T extends HoodieRecordPayload> extends
} }
@Override @Override
protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime) { protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime, boolean initialMetadataTableIfNecessary) {
// new JavaUpgradeDowngrade(metaClient, config, context).run(metaClient, HoodieTableVersion.current(), config, context, instantTime); // new JavaUpgradeDowngrade(metaClient, config, context).run(metaClient, HoodieTableVersion.current(), config, context, instantTime);
// Create a Hoodie table which encapsulated the commits and files visible // Create a Hoodie table which encapsulated the commits and files visible

View File

@@ -425,11 +425,13 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
} }
@Override @Override
protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime) { protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime, boolean initialMetadataTableIfNecessary) {
// Initialize Metadata Table to make sure it's bootstrapped _before_ the operation, if (initialMetadataTableIfNecessary) {
// if it didn't exist before // Initialize Metadata Table to make sure it's bootstrapped _before_ the operation,
// See https://issues.apache.org/jira/browse/HUDI-3343 for more details // if it didn't exist before
initializeMetadataTable(instantTime); // See https://issues.apache.org/jira/browse/HUDI-3343 for more details
initializeMetadataTable(instantTime);
}
// Create a Hoodie table which encapsulated the commits and files visible // Create a Hoodie table which encapsulated the commits and files visible
return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient, config.isMetadataTableEnabled()); return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient, config.isMetadataTableEnabled());

View File

@@ -291,7 +291,7 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
} }
// Rollback to the original schema // Rollback to the original schema
client.restoreToInstant("004"); client.restoreToInstant("004", hoodieWriteConfig.isMetadataTableEnabled());
checkLatestDeltaCommit("004"); checkLatestDeltaCommit("004");
// Updates with original schema are now allowed // Updates with original schema are now allowed
@@ -432,7 +432,7 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
// Revert to the older commit and ensure that the original schema can now // Revert to the older commit and ensure that the original schema can now
// be used for inserts and inserts. // be used for inserts and inserts.
client.restoreToInstant("003"); client.restoreToInstant("003", hoodieWriteConfig.isMetadataTableEnabled());
curTimeline = metaClient.reloadActiveTimeline().getCommitTimeline().filterCompletedInstants(); curTimeline = metaClient.reloadActiveTimeline().getCommitTimeline().filterCompletedInstants();
assertTrue(curTimeline.lastInstant().get().getTimestamp().equals("003")); assertTrue(curTimeline.lastInstant().get().getTimestamp().equals("003"));
checkReadRecords("000", numRecords); checkReadRecords("000", numRecords);

View File

@@ -1208,7 +1208,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
validateMetadata(client); validateMetadata(client);
// Restore // Restore
client.restoreToInstant("0000006"); client.restoreToInstant("0000006", writeConfig.isMetadataTableEnabled());
validateMetadata(client); validateMetadata(client);
} }
} }

View File

@@ -585,7 +585,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
client.savepoint("004", "user1","comment1"); client.savepoint("004", "user1","comment1");
client.restoreToInstant("004"); client.restoreToInstant("004", config.isMetadataTableEnabled());
assertFalse(metaClient.reloadActiveTimeline().getRollbackTimeline().lastInstant().isPresent()); assertFalse(metaClient.reloadActiveTimeline().getRollbackTimeline().lastInstant().isPresent());

View File

@@ -150,7 +150,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
// NOTE: First writer will have Metadata table DISABLED // NOTE: First writer will have Metadata table DISABLED
HoodieWriteConfig.Builder cfgBuilder = HoodieWriteConfig.Builder cfgBuilder =
getConfigBuilder(false, rollbackUsingMarkers, HoodieIndex.IndexType.SIMPLE); getConfigBuilder(false, rollbackUsingMarkers, HoodieIndex.IndexType.SIMPLE);
addConfigsForPopulateMetaFields(cfgBuilder, true); addConfigsForPopulateMetaFields(cfgBuilder, true);
HoodieWriteConfig cfg = cfgBuilder.build(); HoodieWriteConfig cfg = cfgBuilder.build();
@@ -480,7 +480,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
copyOfRecords.clear(); copyOfRecords.clear();
// Rollback latest commit first // Rollback latest commit first
client.restoreToInstant("000"); client.restoreToInstant("000", cfg.isMetadataTableEnabled());
metaClient = HoodieTableMetaClient.reload(metaClient); metaClient = HoodieTableMetaClient.reload(metaClient);
allFiles = listAllBaseFilesInPath(hoodieTable); allFiles = listAllBaseFilesInPath(hoodieTable);
@@ -530,7 +530,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
if (!restoreAfterCompaction) { if (!restoreAfterCompaction) {
// restore to 002 and validate records. // restore to 002 and validate records.
client.restoreToInstant("002"); client.restoreToInstant("002", cfg.isMetadataTableEnabled());
validateRecords(cfg, metaClient, updates1); validateRecords(cfg, metaClient, updates1);
} else { } else {
// trigger compaction and then trigger couple of upserts followed by restore. // trigger compaction and then trigger couple of upserts followed by restore.
@@ -546,7 +546,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
validateRecords(cfg, metaClient, updates5); validateRecords(cfg, metaClient, updates5);
// restore to 003 and validate records. // restore to 003 and validate records.
client.restoreToInstant("003"); client.restoreToInstant("003", cfg.isMetadataTableEnabled());
validateRecords(cfg, metaClient, updates2); validateRecords(cfg, metaClient, updates2);
} }
} }