1
0

[HUDI-3604] Adjust the order of timeline changes in rollbacks (#5114)

This commit is contained in:
Y Ethan Guo
2022-03-26 22:37:44 -07:00
committed by GitHub
parent 4d940bbf8a
commit 484b3407e0
5 changed files with 44 additions and 30 deletions

View File

@@ -32,11 +32,12 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.testutils.HoodieTestTable; import org.apache.hudi.common.testutils.HoodieMetadataTestTable;
import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Tag;
@@ -80,7 +81,12 @@ public class TestRollbacksCommand extends CLIFunctionalTestHarness {
put(DEFAULT_THIRD_PARTITION_PATH, "file-3"); put(DEFAULT_THIRD_PARTITION_PATH, "file-3");
} }
}; };
HoodieTestTable.of(metaClient)
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(tablePath)
.withRollbackUsingMarkers(false)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();
HoodieMetadataTestTable.of(metaClient, SparkHoodieBackedTableMetadataWriter.create(
metaClient.getHadoopConf(), config, context))
.withPartitionMetaFiles(DEFAULT_PARTITION_PATHS) .withPartitionMetaFiles(DEFAULT_PARTITION_PATHS)
.addCommit("100") .addCommit("100")
.withBaseFilesInPartitions(partitionAndFileId) .withBaseFilesInPartitions(partitionAndFileId)
@@ -88,11 +94,8 @@ public class TestRollbacksCommand extends CLIFunctionalTestHarness {
.withBaseFilesInPartitions(partitionAndFileId) .withBaseFilesInPartitions(partitionAndFileId)
.addInflightCommit("102") .addInflightCommit("102")
.withBaseFilesInPartitions(partitionAndFileId); .withBaseFilesInPartitions(partitionAndFileId);
// generate two rollback
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(tablePath)
.withRollbackUsingMarkers(false)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();
// generate two rollback
try (BaseHoodieWriteClient client = new SparkRDDWriteClient(context(), config)) { try (BaseHoodieWriteClient client = new SparkRDDWriteClient(context(), config)) {
// Rollback inflight commit3 and commit2 // Rollback inflight commit3 and commit2
client.rollback("102"); client.rollback("102");

View File

@@ -61,13 +61,15 @@ public abstract class BaseRollbackActionExecutor<T extends HoodieRecordPayload,
private final TransactionManager txnManager; private final TransactionManager txnManager;
private final boolean skipLocking; private final boolean skipLocking;
protected HoodieInstant resolvedInstant;
public BaseRollbackActionExecutor(HoodieEngineContext context, public BaseRollbackActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config, HoodieWriteConfig config,
HoodieTable<T, I, K, O> table, HoodieTable<T, I, K, O> table,
String instantTime, String instantTime,
HoodieInstant instantToRollback, HoodieInstant instantToRollback,
boolean deleteInstants, boolean deleteInstants,
boolean skipLocking) { boolean skipLocking) {
this(context, config, table, instantTime, instantToRollback, deleteInstants, this(context, config, table, instantTime, instantToRollback, deleteInstants,
false, config.shouldRollbackUsingMarkers(), skipLocking); false, config.shouldRollbackUsingMarkers(), skipLocking);
} }
@@ -83,6 +85,7 @@ public abstract class BaseRollbackActionExecutor<T extends HoodieRecordPayload,
boolean skipLocking) { boolean skipLocking) {
super(context, config, table, instantTime); super(context, config, table, instantTime);
this.instantToRollback = instantToRollback; this.instantToRollback = instantToRollback;
this.resolvedInstant = instantToRollback;
this.deleteInstants = deleteInstants; this.deleteInstants = deleteInstants;
this.skipTimelinePublish = skipTimelinePublish; this.skipTimelinePublish = skipTimelinePublish;
this.useMarkerBasedStrategy = useMarkerBasedStrategy; this.useMarkerBasedStrategy = useMarkerBasedStrategy;
@@ -118,9 +121,7 @@ public abstract class BaseRollbackActionExecutor<T extends HoodieRecordPayload,
Option.of(rollbackTimer.endTimer()), Option.of(rollbackTimer.endTimer()),
Collections.singletonList(instantToRollback), Collections.singletonList(instantToRollback),
stats); stats);
if (!skipTimelinePublish) { finishRollback(inflightInstant, rollbackMetadata);
finishRollback(inflightInstant, rollbackMetadata);
}
// Finally, remove the markers post rollback. // Finally, remove the markers post rollback.
WriteMarkersFactory.get(config.getMarkersType(), table, instantToRollback.getTimestamp()) WriteMarkersFactory.get(config.getMarkersType(), table, instantToRollback.getTimestamp())
@@ -237,18 +238,32 @@ public abstract class BaseRollbackActionExecutor<T extends HoodieRecordPayload,
} }
protected void finishRollback(HoodieInstant inflightInstant, HoodieRollbackMetadata rollbackMetadata) throws HoodieIOException { protected void finishRollback(HoodieInstant inflightInstant, HoodieRollbackMetadata rollbackMetadata) throws HoodieIOException {
boolean enableLocking = (!skipLocking && !skipTimelinePublish);
try { try {
if (!skipLocking) { if (enableLocking) {
this.txnManager.beginTransaction(Option.empty(), Option.empty()); this.txnManager.beginTransaction(Option.empty(), Option.empty());
} }
writeTableMetadata(rollbackMetadata);
table.getActiveTimeline().transitionRollbackInflightToComplete(inflightInstant, // If publish the rollback to the timeline, we first write the rollback metadata
TimelineMetadataUtils.serializeRollbackMetadata(rollbackMetadata)); // to metadata table
LOG.info("Rollback of Commits " + rollbackMetadata.getCommitsRollback() + " is complete"); if (!skipTimelinePublish) {
writeTableMetadata(rollbackMetadata);
}
// Then we delete the inflight instant in the data table timeline if enabled
deleteInflightAndRequestedInstant(deleteInstants, table.getActiveTimeline(), resolvedInstant);
// If publish the rollback to the timeline, we finally transition the inflight rollback
// to complete in the data table timeline
if (!skipTimelinePublish) {
table.getActiveTimeline().transitionRollbackInflightToComplete(inflightInstant,
TimelineMetadataUtils.serializeRollbackMetadata(rollbackMetadata));
LOG.info("Rollback of Commits " + rollbackMetadata.getCommitsRollback() + " is complete");
}
} catch (IOException e) { } catch (IOException e) {
throw new HoodieIOException("Error executing rollback at instant " + instantTime, e); throw new HoodieIOException("Error executing rollback at instant " + instantTime, e);
} finally { } finally {
if (!skipLocking) { if (enableLocking) {
this.txnManager.endTransaction(Option.empty()); this.txnManager.endTransaction(Option.empty());
} }
} }

View File

@@ -67,7 +67,6 @@ public class CopyOnWriteRollbackActionExecutor<T extends HoodieRecordPayload, I,
List<HoodieRollbackStat> stats = new ArrayList<>(); List<HoodieRollbackStat> stats = new ArrayList<>();
HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
HoodieInstant resolvedInstant = instantToRollback;
if (instantToRollback.isCompleted()) { if (instantToRollback.isCompleted()) {
LOG.info("Unpublishing instant " + instantToRollback); LOG.info("Unpublishing instant " + instantToRollback);
@@ -86,8 +85,6 @@ public class CopyOnWriteRollbackActionExecutor<T extends HoodieRecordPayload, I,
dropBootstrapIndexIfNeeded(instantToRollback); dropBootstrapIndexIfNeeded(instantToRollback);
// Delete Inflight instant if enabled
deleteInflightAndRequestedInstant(deleteInstants, activeTimeline, resolvedInstant);
LOG.info("Time(in ms) taken to finish rollback " + rollbackTimer.endTimer()); LOG.info("Time(in ms) taken to finish rollback " + rollbackTimer.endTimer());
return stats; return stats;
} }

View File

@@ -67,7 +67,6 @@ public class MergeOnReadRollbackActionExecutor<T extends HoodieRecordPayload, I,
LOG.info("Rolling back instant " + instantToRollback); LOG.info("Rolling back instant " + instantToRollback);
HoodieInstant resolvedInstant = instantToRollback;
// Atomically un-publish all non-inflight commits // Atomically un-publish all non-inflight commits
if (instantToRollback.isCompleted()) { if (instantToRollback.isCompleted()) {
LOG.info("Un-publishing instant " + instantToRollback + ", deleteInstants=" + deleteInstants); LOG.info("Un-publishing instant " + instantToRollback + ", deleteInstants=" + deleteInstants);
@@ -93,8 +92,6 @@ public class MergeOnReadRollbackActionExecutor<T extends HoodieRecordPayload, I,
dropBootstrapIndexIfNeeded(resolvedInstant); dropBootstrapIndexIfNeeded(resolvedInstant);
// Delete Inflight instants if enabled
deleteInflightAndRequestedInstant(deleteInstants, table.getActiveTimeline(), resolvedInstant);
LOG.info("Time(in ms) taken to finish rollback " + rollbackTimer.endTimer()); LOG.info("Time(in ms) taken to finish rollback " + rollbackTimer.endTimer());
return allRollbackStats; return allRollbackStats;
} }

View File

@@ -83,10 +83,10 @@ public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackT
HoodieWriteConfig writeConfig = getConfigBuilder().withRollbackUsingMarkers(false).build(); HoodieWriteConfig writeConfig = getConfigBuilder().withRollbackUsingMarkers(false).build();
HoodieTable table = this.getHoodieTable(metaClient, writeConfig); HoodieTable table = this.getHoodieTable(metaClient, writeConfig);
HoodieInstant needRollBackInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "002"); HoodieInstant needRollBackInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "002");
String rollbackInstant = "003";
// execute CopyOnWriteRollbackActionExecutor with filelisting mode // execute CopyOnWriteRollbackActionExecutor with filelisting mode
BaseRollbackPlanActionExecutor copyOnWriteRollbackPlanActionExecutor = BaseRollbackPlanActionExecutor copyOnWriteRollbackPlanActionExecutor =
new BaseRollbackPlanActionExecutor(context, table.getConfig(), table, "003", needRollBackInstant, false, new BaseRollbackPlanActionExecutor(context, table.getConfig(), table, rollbackInstant, needRollBackInstant, false,
table.getConfig().shouldRollbackUsingMarkers()); table.getConfig().shouldRollbackUsingMarkers());
HoodieRollbackPlan rollbackPlan = (HoodieRollbackPlan) copyOnWriteRollbackPlanActionExecutor.execute().get(); HoodieRollbackPlan rollbackPlan = (HoodieRollbackPlan) copyOnWriteRollbackPlanActionExecutor.execute().get();
CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(context, table.getConfig(), table, "003", needRollBackInstant, true, CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(context, table.getConfig(), table, "003", needRollBackInstant, true,
@@ -125,7 +125,9 @@ public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackT
assertTrue(testTable.commitExists("001")); assertTrue(testTable.commitExists("001"));
assertTrue(testTable.baseFileExists(p1, "001", "id11")); assertTrue(testTable.baseFileExists(p1, "001", "id11"));
assertTrue(testTable.baseFileExists(p2, "001", "id12")); assertTrue(testTable.baseFileExists(p2, "001", "id12"));
assertFalse(testTable.inflightCommitExists("002")); // Note that executeRollback() does not delete inflight instant files
// The deletion is done in finishRollback() called by runRollback()
assertTrue(testTable.inflightCommitExists("002"));
assertFalse(testTable.commitExists("002")); assertFalse(testTable.commitExists("002"));
assertFalse(testTable.baseFileExists(p1, "002", "id21")); assertFalse(testTable.baseFileExists(p1, "002", "id21"));
assertFalse(testTable.baseFileExists(p2, "002", "id22")); assertFalse(testTable.baseFileExists(p2, "002", "id22"));