[HUDI-2753] Ensure list based rollback strategy is used for restore (#3983)
This commit is contained in:
committed by
GitHub
parent
cbcbec4d38
commit
04eb5fdc65
@@ -599,7 +599,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
|
|||||||
if (commitInstantOpt.isPresent()) {
|
if (commitInstantOpt.isPresent()) {
|
||||||
LOG.info("Scheduling Rollback at instant time :" + rollbackInstantTime);
|
LOG.info("Scheduling Rollback at instant time :" + rollbackInstantTime);
|
||||||
Option<HoodieRollbackPlan> rollbackPlanOption = table.scheduleRollback(context, rollbackInstantTime,
|
Option<HoodieRollbackPlan> rollbackPlanOption = table.scheduleRollback(context, rollbackInstantTime,
|
||||||
commitInstantOpt.get(), false);
|
commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers());
|
||||||
if (rollbackPlanOption.isPresent()) {
|
if (rollbackPlanOption.isPresent()) {
|
||||||
// execute rollback
|
// execute rollback
|
||||||
HoodieRollbackMetadata rollbackMetadata = table.rollback(context, rollbackInstantTime, commitInstantOpt.get(), true,
|
HoodieRollbackMetadata rollbackMetadata = table.rollback(context, rollbackInstantTime, commitInstantOpt.get(), true,
|
||||||
@@ -1024,7 +1024,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
|
|||||||
|
|
||||||
protected void rollbackInflightClustering(HoodieInstant inflightInstant, HoodieTable<T, I, K, O> table) {
|
protected void rollbackInflightClustering(HoodieInstant inflightInstant, HoodieTable<T, I, K, O> table) {
|
||||||
String commitTime = HoodieActiveTimeline.createNewInstantTime();
|
String commitTime = HoodieActiveTimeline.createNewInstantTime();
|
||||||
table.scheduleRollback(context, commitTime, inflightInstant, false);
|
table.scheduleRollback(context, commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers());
|
||||||
table.rollback(context, commitTime, inflightInstant, false, false);
|
table.rollback(context, commitTime, inflightInstant, false, false);
|
||||||
table.getActiveTimeline().revertReplaceCommitInflightToRequested(inflightInstant);
|
table.getActiveTimeline().revertReplaceCommitInflightToRequested(inflightInstant);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -442,12 +442,13 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
|
|||||||
* @param context HoodieEngineContext
|
* @param context HoodieEngineContext
|
||||||
* @param instantTime Instant Time for scheduling rollback
|
* @param instantTime Instant Time for scheduling rollback
|
||||||
* @param instantToRollback instant to be rolled back
|
* @param instantToRollback instant to be rolled back
|
||||||
|
* @param shouldRollbackUsingMarkers uses marker based rollback strategy when set to true. uses list based rollback when false.
|
||||||
* @return HoodieRollbackPlan containing info on rollback.
|
* @return HoodieRollbackPlan containing info on rollback.
|
||||||
*/
|
*/
|
||||||
public abstract Option<HoodieRollbackPlan> scheduleRollback(HoodieEngineContext context,
|
public abstract Option<HoodieRollbackPlan> scheduleRollback(HoodieEngineContext context,
|
||||||
String instantTime,
|
String instantTime,
|
||||||
HoodieInstant instantToRollback,
|
HoodieInstant instantToRollback,
|
||||||
boolean skipTimelinePublish);
|
boolean skipTimelinePublish, boolean shouldRollbackUsingMarkers);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Rollback the (inflight/committed) record changes with the given commit time.
|
* Rollback the (inflight/committed) record changes with the given commit time.
|
||||||
@@ -490,7 +491,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
|
|||||||
*/
|
*/
|
||||||
public void rollbackInflightCompaction(HoodieInstant inflightInstant) {
|
public void rollbackInflightCompaction(HoodieInstant inflightInstant) {
|
||||||
String commitTime = HoodieActiveTimeline.createNewInstantTime();
|
String commitTime = HoodieActiveTimeline.createNewInstantTime();
|
||||||
scheduleRollback(context, commitTime, inflightInstant, false);
|
scheduleRollback(context, commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers());
|
||||||
rollback(context, commitTime, inflightInstant, false, false);
|
rollback(context, commitTime, inflightInstant, false, false);
|
||||||
getActiveTimeline().revertCompactionInflightToRequested(inflightInstant);
|
getActiveTimeline().revertCompactionInflightToRequested(inflightInstant);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -48,7 +48,7 @@ public class CopyOnWriteRestoreActionExecutor<T extends HoodieRecordPayload, I,
|
|||||||
}
|
}
|
||||||
table.getMetaClient().reloadActiveTimeline();
|
table.getMetaClient().reloadActiveTimeline();
|
||||||
String newInstantTime = HoodieActiveTimeline.createNewInstantTime();
|
String newInstantTime = HoodieActiveTimeline.createNewInstantTime();
|
||||||
table.scheduleRollback(context, newInstantTime, instantToRollback, false);
|
table.scheduleRollback(context, newInstantTime, instantToRollback, false, false);
|
||||||
table.getMetaClient().reloadActiveTimeline();
|
table.getMetaClient().reloadActiveTimeline();
|
||||||
CopyOnWriteRollbackActionExecutor rollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(
|
CopyOnWriteRollbackActionExecutor rollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(
|
||||||
context,
|
context,
|
||||||
|
|||||||
@@ -52,7 +52,7 @@ public class MergeOnReadRestoreActionExecutor<T extends HoodieRecordPayload, I,
|
|||||||
}
|
}
|
||||||
table.getMetaClient().reloadActiveTimeline();
|
table.getMetaClient().reloadActiveTimeline();
|
||||||
String instantTime = HoodieActiveTimeline.createNewInstantTime();
|
String instantTime = HoodieActiveTimeline.createNewInstantTime();
|
||||||
table.scheduleRollback(context, instantTime, instantToRollback, false);
|
table.scheduleRollback(context, instantTime, instantToRollback, false, false);
|
||||||
table.getMetaClient().reloadActiveTimeline();
|
table.getMetaClient().reloadActiveTimeline();
|
||||||
MergeOnReadRollbackActionExecutor rollbackActionExecutor = new MergeOnReadRollbackActionExecutor(
|
MergeOnReadRollbackActionExecutor rollbackActionExecutor = new MergeOnReadRollbackActionExecutor(
|
||||||
context,
|
context,
|
||||||
|
|||||||
@@ -50,6 +50,7 @@ public class BaseRollbackPlanActionExecutor<T extends HoodieRecordPayload, I, K,
|
|||||||
|
|
||||||
protected final HoodieInstant instantToRollback;
|
protected final HoodieInstant instantToRollback;
|
||||||
private final boolean skipTimelinePublish;
|
private final boolean skipTimelinePublish;
|
||||||
|
private final boolean shouldRollbackUsingMarkers;
|
||||||
|
|
||||||
public static final Integer ROLLBACK_PLAN_VERSION_1 = 1;
|
public static final Integer ROLLBACK_PLAN_VERSION_1 = 1;
|
||||||
public static final Integer LATEST_ROLLBACK_PLAN_VERSION = ROLLBACK_PLAN_VERSION_1;
|
public static final Integer LATEST_ROLLBACK_PLAN_VERSION = ROLLBACK_PLAN_VERSION_1;
|
||||||
@@ -59,10 +60,12 @@ public class BaseRollbackPlanActionExecutor<T extends HoodieRecordPayload, I, K,
|
|||||||
HoodieTable<T, I, K, O> table,
|
HoodieTable<T, I, K, O> table,
|
||||||
String instantTime,
|
String instantTime,
|
||||||
HoodieInstant instantToRollback,
|
HoodieInstant instantToRollback,
|
||||||
boolean skipTimelinePublish) {
|
boolean skipTimelinePublish,
|
||||||
|
boolean shouldRollbackUsingMarkers) {
|
||||||
super(context, config, table, instantTime);
|
super(context, config, table, instantTime);
|
||||||
this.instantToRollback = instantToRollback;
|
this.instantToRollback = instantToRollback;
|
||||||
this.skipTimelinePublish = skipTimelinePublish;
|
this.skipTimelinePublish = skipTimelinePublish;
|
||||||
|
this.shouldRollbackUsingMarkers = shouldRollbackUsingMarkers;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -84,7 +87,7 @@ public class BaseRollbackPlanActionExecutor<T extends HoodieRecordPayload, I, K,
|
|||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
private BaseRollbackPlanActionExecutor.RollbackStrategy getRollbackStrategy() {
|
private BaseRollbackPlanActionExecutor.RollbackStrategy getRollbackStrategy() {
|
||||||
if (config.shouldRollbackUsingMarkers()) {
|
if (shouldRollbackUsingMarkers) {
|
||||||
return new MarkerBasedRollbackStrategy(table, context, config, instantTime);
|
return new MarkerBasedRollbackStrategy(table, context, config, instantTime);
|
||||||
} else {
|
} else {
|
||||||
return new ListingBasedRollbackStrategy(table, context, config, instantTime);
|
return new ListingBasedRollbackStrategy(table, context, config, instantTime);
|
||||||
|
|||||||
@@ -316,8 +316,9 @@ public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload>
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Option<HoodieRollbackPlan> scheduleRollback(HoodieEngineContext context, String instantTime, HoodieInstant instantToRollback,
|
public Option<HoodieRollbackPlan> scheduleRollback(HoodieEngineContext context, String instantTime, HoodieInstant instantToRollback,
|
||||||
boolean skipTimelinePublish) {
|
boolean skipTimelinePublish, boolean shouldRollbackUsingMarkers) {
|
||||||
return new BaseRollbackPlanActionExecutor(context, config, this, instantTime, instantToRollback, skipTimelinePublish).execute();
|
return new BaseRollbackPlanActionExecutor(context, config, this, instantTime, instantToRollback, skipTimelinePublish,
|
||||||
|
shouldRollbackUsingMarkers).execute();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@@ -120,8 +120,9 @@ public class HoodieFlinkMergeOnReadTable<T extends HoodieRecordPayload>
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Option<HoodieRollbackPlan> scheduleRollback(HoodieEngineContext context, String instantTime, HoodieInstant instantToRollback,
|
public Option<HoodieRollbackPlan> scheduleRollback(HoodieEngineContext context, String instantTime, HoodieInstant instantToRollback,
|
||||||
boolean skipTimelinePublish) {
|
boolean skipTimelinePublish, boolean shouldRollbackUsingMarkers) {
|
||||||
return new BaseRollbackPlanActionExecutor(context, config, this, instantTime, instantToRollback, skipTimelinePublish).execute();
|
return new BaseRollbackPlanActionExecutor(context, config, this, instantTime, instantToRollback, skipTimelinePublish,
|
||||||
|
shouldRollbackUsingMarkers).execute();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@@ -193,8 +193,9 @@ public class HoodieJavaCopyOnWriteTable<T extends HoodieRecordPayload> extends H
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Option<HoodieRollbackPlan> scheduleRollback(HoodieEngineContext context, String instantTime, HoodieInstant instantToRollback,
|
public Option<HoodieRollbackPlan> scheduleRollback(HoodieEngineContext context, String instantTime, HoodieInstant instantToRollback,
|
||||||
boolean skipTimelinePublish) {
|
boolean skipTimelinePublish, boolean shouldRollbackUsingMarkers) {
|
||||||
return new BaseRollbackPlanActionExecutor(context, config, this, instantTime, instantToRollback, skipTimelinePublish).execute();
|
return new BaseRollbackPlanActionExecutor(context, config, this, instantTime, instantToRollback, skipTimelinePublish,
|
||||||
|
shouldRollbackUsingMarkers).execute();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@@ -230,8 +230,9 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload>
|
|||||||
@Override
|
@Override
|
||||||
public Option<HoodieRollbackPlan> scheduleRollback(HoodieEngineContext context,
|
public Option<HoodieRollbackPlan> scheduleRollback(HoodieEngineContext context,
|
||||||
String instantTime,
|
String instantTime,
|
||||||
HoodieInstant instantToRollback, boolean skipTimelinePublish) {
|
HoodieInstant instantToRollback, boolean skipTimelinePublish, boolean shouldRollbackUsingMarkers) {
|
||||||
return new BaseRollbackPlanActionExecutor<>(context, config, this, instantTime, instantToRollback, skipTimelinePublish).execute();
|
return new BaseRollbackPlanActionExecutor<>(context, config, this, instantTime, instantToRollback, skipTimelinePublish,
|
||||||
|
shouldRollbackUsingMarkers).execute();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@@ -156,8 +156,9 @@ public class HoodieSparkMergeOnReadTable<T extends HoodieRecordPayload> extends
|
|||||||
@Override
|
@Override
|
||||||
public Option<HoodieRollbackPlan> scheduleRollback(HoodieEngineContext context,
|
public Option<HoodieRollbackPlan> scheduleRollback(HoodieEngineContext context,
|
||||||
String instantTime,
|
String instantTime,
|
||||||
HoodieInstant instantToRollback, boolean skipTimelinePublish) {
|
HoodieInstant instantToRollback, boolean skipTimelinePublish, boolean shouldRollbackUsingMarkers) {
|
||||||
return new BaseRollbackPlanActionExecutor<>(context, config, this, instantTime, instantToRollback, skipTimelinePublish).execute();
|
return new BaseRollbackPlanActionExecutor<>(context, config, this, instantTime, instantToRollback, skipTimelinePublish,
|
||||||
|
shouldRollbackUsingMarkers).execute();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@@ -495,7 +495,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
|
|||||||
@ParameterizedTest
|
@ParameterizedTest
|
||||||
@MethodSource("populateMetaFieldsParams")
|
@MethodSource("populateMetaFieldsParams")
|
||||||
public void testUpserts(boolean populateMetaFields) throws Exception {
|
public void testUpserts(boolean populateMetaFields) throws Exception {
|
||||||
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder();
|
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder().withRollbackUsingMarkers(true);
|
||||||
addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
|
addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
|
||||||
testUpsertsInternal(cfgBuilder.build(), SparkRDDWriteClient::upsert, false);
|
testUpsertsInternal(cfgBuilder.build(), SparkRDDWriteClient::upsert, false);
|
||||||
}
|
}
|
||||||
@@ -506,7 +506,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
|
|||||||
@ParameterizedTest
|
@ParameterizedTest
|
||||||
@MethodSource("populateMetaFieldsParams")
|
@MethodSource("populateMetaFieldsParams")
|
||||||
public void testUpsertsPrepped(boolean populateMetaFields) throws Exception {
|
public void testUpsertsPrepped(boolean populateMetaFields) throws Exception {
|
||||||
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder();
|
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder().withRollbackUsingMarkers(true);
|
||||||
addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
|
addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
|
||||||
testUpsertsInternal(cfgBuilder.build(), SparkRDDWriteClient::upsertPreppedRecords, true);
|
testUpsertsInternal(cfgBuilder.build(), SparkRDDWriteClient::upsertPreppedRecords, true);
|
||||||
}
|
}
|
||||||
@@ -523,6 +523,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
|
|||||||
throws Exception {
|
throws Exception {
|
||||||
// Force using older timeline layout
|
// Force using older timeline layout
|
||||||
HoodieWriteConfig hoodieWriteConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
|
HoodieWriteConfig hoodieWriteConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
|
||||||
|
.withRollbackUsingMarkers(true)
|
||||||
.withProps(config.getProps()).withTimelineLayoutVersion(
|
.withProps(config.getProps()).withTimelineLayoutVersion(
|
||||||
VERSION_0).build();
|
VERSION_0).build();
|
||||||
|
|
||||||
|
|||||||
@@ -1307,7 +1307,7 @@ public class TestCleaner extends HoodieClientTestBase {
|
|||||||
new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "000"), Option.empty());
|
new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "000"), Option.empty());
|
||||||
metaClient.reloadActiveTimeline();
|
metaClient.reloadActiveTimeline();
|
||||||
HoodieInstant rollbackInstant = new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "000");
|
HoodieInstant rollbackInstant = new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "000");
|
||||||
table.scheduleRollback(context, "001", rollbackInstant, false);
|
table.scheduleRollback(context, "001", rollbackInstant, false, config.shouldRollbackUsingMarkers());
|
||||||
table.rollback(context, "001", rollbackInstant, true, false);
|
table.rollback(context, "001", rollbackInstant, true, false);
|
||||||
final int numTempFilesAfter = testTable.listAllFilesInTempFolder().length;
|
final int numTempFilesAfter = testTable.listAllFilesInTempFolder().length;
|
||||||
assertEquals(0, numTempFilesAfter, "All temp files are deleted.");
|
assertEquals(0, numTempFilesAfter, "All temp files are deleted.");
|
||||||
|
|||||||
@@ -86,7 +86,8 @@ public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackT
|
|||||||
|
|
||||||
// execute CopyOnWriteRollbackActionExecutor with filelisting mode
|
// execute CopyOnWriteRollbackActionExecutor with filelisting mode
|
||||||
BaseRollbackPlanActionExecutor copyOnWriteRollbackPlanActionExecutor =
|
BaseRollbackPlanActionExecutor copyOnWriteRollbackPlanActionExecutor =
|
||||||
new BaseRollbackPlanActionExecutor(context, table.getConfig(), table, "003", needRollBackInstant, false);
|
new BaseRollbackPlanActionExecutor(context, table.getConfig(), table, "003", needRollBackInstant, false,
|
||||||
|
table.getConfig().shouldRollbackUsingMarkers());
|
||||||
HoodieRollbackPlan rollbackPlan = (HoodieRollbackPlan) copyOnWriteRollbackPlanActionExecutor.execute().get();
|
HoodieRollbackPlan rollbackPlan = (HoodieRollbackPlan) copyOnWriteRollbackPlanActionExecutor.execute().get();
|
||||||
CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(context, table.getConfig(), table, "003", needRollBackInstant, true,
|
CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(context, table.getConfig(), table, "003", needRollBackInstant, true,
|
||||||
false);
|
false);
|
||||||
@@ -168,7 +169,8 @@ public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackT
|
|||||||
}
|
}
|
||||||
|
|
||||||
BaseRollbackPlanActionExecutor copyOnWriteRollbackPlanActionExecutor =
|
BaseRollbackPlanActionExecutor copyOnWriteRollbackPlanActionExecutor =
|
||||||
new BaseRollbackPlanActionExecutor(context, table.getConfig(), table, "003", commitInstant, false);
|
new BaseRollbackPlanActionExecutor(context, table.getConfig(), table, "003", commitInstant, false,
|
||||||
|
table.getConfig().shouldRollbackUsingMarkers());
|
||||||
HoodieRollbackPlan hoodieRollbackPlan = (HoodieRollbackPlan) copyOnWriteRollbackPlanActionExecutor.execute().get();
|
HoodieRollbackPlan hoodieRollbackPlan = (HoodieRollbackPlan) copyOnWriteRollbackPlanActionExecutor.execute().get();
|
||||||
CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(context, cfg, table, "003", commitInstant, false,
|
CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(context, cfg, table, "003", commitInstant, false,
|
||||||
false);
|
false);
|
||||||
|
|||||||
@@ -91,7 +91,8 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT
|
|||||||
//2. rollback
|
//2. rollback
|
||||||
HoodieInstant rollBackInstant = new HoodieInstant(isUsingMarkers, HoodieTimeline.DELTA_COMMIT_ACTION, "002");
|
HoodieInstant rollBackInstant = new HoodieInstant(isUsingMarkers, HoodieTimeline.DELTA_COMMIT_ACTION, "002");
|
||||||
BaseRollbackPlanActionExecutor mergeOnReadRollbackPlanActionExecutor =
|
BaseRollbackPlanActionExecutor mergeOnReadRollbackPlanActionExecutor =
|
||||||
new BaseRollbackPlanActionExecutor(context, cfg, table, "003", rollBackInstant, false);
|
new BaseRollbackPlanActionExecutor(context, cfg, table, "003", rollBackInstant, false,
|
||||||
|
cfg.shouldRollbackUsingMarkers());
|
||||||
mergeOnReadRollbackPlanActionExecutor.execute().get();
|
mergeOnReadRollbackPlanActionExecutor.execute().get();
|
||||||
MergeOnReadRollbackActionExecutor mergeOnReadRollbackActionExecutor = new MergeOnReadRollbackActionExecutor(
|
MergeOnReadRollbackActionExecutor mergeOnReadRollbackActionExecutor = new MergeOnReadRollbackActionExecutor(
|
||||||
context,
|
context,
|
||||||
|
|||||||
Reference in New Issue
Block a user