[HUDI-2422] Adding rollback plan and rollback requested instant (#3651)
- This patch introduces the rollback plan and the rollback.requested instant. Rollback is now done in two phases: rollback planning and the rollback action. In the planning phase, we prepare the rollback plan and serialize it to rollback.requested. In the rollback action phase, we fetch the details from the plan and delete the files as per the plan. This ensures that the final rollback commit metadata contains all files that were rolled back, even if the rollback failed midway and was retried.
This commit is contained in:
committed by
GitHub
parent
4deaa30c8d
commit
b8dad628e5
@@ -82,6 +82,13 @@ public class HoodieSparkEngineContext extends HoodieEngineContext {
|
||||
}).reduceByKey(reduceFunc::apply).map(Tuple2::_2).collect();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <I, K, V> List<V> reduceByKey(
|
||||
List<Pair<K, V>> data, SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
|
||||
return javaSparkContext.parallelize(data, parallelism).mapToPair(pair -> new Tuple2<K, V>(pair.getLeft(), pair.getRight()))
|
||||
.reduceByKey(reduceFunc::apply).map(Tuple2::_2).collect();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <I, O> List<O> flatMap(List<I> data, SerializableFunction<I, Stream<O>> func, int parallelism) {
|
||||
return javaSparkContext.parallelize(data, parallelism).flatMap(x -> func.apply(x).iterator()).collect();
|
||||
|
||||
@@ -24,6 +24,7 @@ import org.apache.hudi.avro.model.HoodieClusteringPlan;
|
||||
import org.apache.hudi.avro.model.HoodieCompactionPlan;
|
||||
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
|
||||
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
|
||||
import org.apache.hudi.avro.model.HoodieRollbackPlan;
|
||||
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
|
||||
import org.apache.hudi.client.WriteStatus;
|
||||
import org.apache.hudi.client.common.HoodieSparkEngineContext;
|
||||
@@ -65,6 +66,7 @@ import org.apache.hudi.table.action.commit.SparkMergeHelper;
|
||||
import org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor;
|
||||
import org.apache.hudi.table.action.commit.SparkUpsertPreppedCommitActionExecutor;
|
||||
import org.apache.hudi.table.action.restore.SparkCopyOnWriteRestoreActionExecutor;
|
||||
import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor;
|
||||
import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
|
||||
import org.apache.hudi.table.action.savepoint.SavepointActionExecutor;
|
||||
import org.apache.log4j.LogManager;
|
||||
@@ -187,6 +189,13 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload> extends
|
||||
return new SparkCleanPlanActionExecutor<>(context, config,this, instantTime, extraMetadata).execute();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Option<HoodieRollbackPlan> scheduleRollback(HoodieEngineContext context,
|
||||
String instantTime,
|
||||
HoodieInstant instantToRollback, boolean skipTimelinePublish) {
|
||||
return new BaseRollbackPlanActionExecutor<>(context, config, this, instantTime, instantToRollback, skipTimelinePublish).execute();
|
||||
}
|
||||
|
||||
public Iterator<List<WriteStatus>> handleUpdate(String instantTime, String partitionPath, String fileId,
|
||||
Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile oldDataFile) throws IOException {
|
||||
// these are updates
|
||||
|
||||
@@ -21,6 +21,7 @@ package org.apache.hudi.table;
|
||||
import org.apache.hudi.avro.model.HoodieCompactionPlan;
|
||||
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
|
||||
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
|
||||
import org.apache.hudi.avro.model.HoodieRollbackPlan;
|
||||
import org.apache.hudi.client.WriteStatus;
|
||||
import org.apache.hudi.client.common.HoodieSparkEngineContext;
|
||||
import org.apache.hudi.common.engine.HoodieEngineContext;
|
||||
@@ -49,6 +50,7 @@ import org.apache.hudi.table.action.deltacommit.SparkUpsertDeltaCommitActionExec
|
||||
import org.apache.hudi.table.action.deltacommit.SparkUpsertPreppedDeltaCommitActionExecutor;
|
||||
import org.apache.hudi.table.action.compact.BaseScheduleCompactionActionExecutor;
|
||||
import org.apache.hudi.table.action.restore.SparkMergeOnReadRestoreActionExecutor;
|
||||
import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor;
|
||||
import org.apache.hudi.table.action.rollback.MergeOnReadRollbackActionExecutor;
|
||||
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
@@ -142,6 +144,13 @@ public class HoodieSparkMergeOnReadTable<T extends HoodieRecordPayload> extends
|
||||
new SparkMergeOnReadRestoreActionExecutor((HoodieSparkEngineContext) context, config, this, instantTime, HoodieTimeline.INIT_INSTANT_TS).execute();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Option<HoodieRollbackPlan> scheduleRollback(HoodieEngineContext context,
|
||||
String instantTime,
|
||||
HoodieInstant instantToRollback, boolean skipTimelinePublish) {
|
||||
return new BaseRollbackPlanActionExecutor<>(context, config, this, instantTime, instantToRollback, skipTimelinePublish).execute();
|
||||
}
|
||||
|
||||
@Override
|
||||
public HoodieRollbackMetadata rollback(HoodieEngineContext context,
|
||||
String rollbackInstantTime,
|
||||
|
||||
@@ -48,20 +48,23 @@ public class SparkCopyOnWriteRestoreActionExecutor<T extends HoodieRecordPayload
|
||||
|
||||
@Override
|
||||
protected HoodieRollbackMetadata rollbackInstant(HoodieInstant instantToRollback) {
|
||||
if (!instantToRollback.getAction().equals(HoodieTimeline.COMMIT_ACTION)
|
||||
&& !instantToRollback.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
|
||||
throw new HoodieRollbackException("Unsupported action in rollback instant:" + instantToRollback);
|
||||
}
|
||||
table.getMetaClient().reloadActiveTimeline();
|
||||
String instantTime = HoodieActiveTimeline.createNewInstantTime();
|
||||
table.scheduleRollback(context, instantTime, instantToRollback, false);
|
||||
table.getMetaClient().reloadActiveTimeline();
|
||||
CopyOnWriteRollbackActionExecutor rollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(
|
||||
(HoodieSparkEngineContext) context,
|
||||
config,
|
||||
table,
|
||||
HoodieActiveTimeline.createNewInstantTime(),
|
||||
instantTime,
|
||||
instantToRollback,
|
||||
true,
|
||||
true,
|
||||
false);
|
||||
if (!instantToRollback.getAction().equals(HoodieTimeline.COMMIT_ACTION)
|
||||
&& !instantToRollback.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
|
||||
throw new HoodieRollbackException("Unsupported action in rollback instant:" + instantToRollback);
|
||||
}
|
||||
return rollbackActionExecutor.execute();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -47,17 +47,6 @@ public class SparkMergeOnReadRestoreActionExecutor<T extends HoodieRecordPayload
|
||||
|
||||
@Override
|
||||
protected HoodieRollbackMetadata rollbackInstant(HoodieInstant instantToRollback) {
|
||||
table.getMetaClient().reloadActiveTimeline();
|
||||
MergeOnReadRollbackActionExecutor rollbackActionExecutor = new MergeOnReadRollbackActionExecutor(
|
||||
context,
|
||||
config,
|
||||
table,
|
||||
HoodieActiveTimeline.createNewInstantTime(),
|
||||
instantToRollback,
|
||||
true,
|
||||
true,
|
||||
false);
|
||||
|
||||
switch (instantToRollback.getAction()) {
|
||||
case HoodieTimeline.COMMIT_ACTION:
|
||||
case HoodieTimeline.DELTA_COMMIT_ACTION:
|
||||
@@ -66,9 +55,27 @@ public class SparkMergeOnReadRestoreActionExecutor<T extends HoodieRecordPayload
|
||||
// TODO : Get file status and create a rollback stat and file
|
||||
// TODO : Delete the .aux files along with the instant file, okay for now since the archival process will
|
||||
// delete these files when it does not see a corresponding instant file under .hoodie
|
||||
return rollbackActionExecutor.execute();
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("invalid action name " + instantToRollback.getAction());
|
||||
}
|
||||
table.getMetaClient().reloadActiveTimeline();
|
||||
String instantTime = HoodieActiveTimeline.createNewInstantTime();
|
||||
table.scheduleRollback(context, instantTime, instantToRollback, false);
|
||||
table.getMetaClient().reloadActiveTimeline();
|
||||
MergeOnReadRollbackActionExecutor rollbackActionExecutor = new MergeOnReadRollbackActionExecutor(
|
||||
context,
|
||||
config,
|
||||
table,
|
||||
instantTime,
|
||||
instantToRollback,
|
||||
true,
|
||||
true,
|
||||
false);
|
||||
|
||||
// TODO : Get file status and create a rollback stat and file
|
||||
// TODO : Delete the .aux files along with the instant file, okay for now since the archival process will
|
||||
// delete these files when it does not see a corresponding instant file under .hoodie
|
||||
return rollbackActionExecutor.execute();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,6 +18,7 @@
|
||||
|
||||
package org.apache.hudi.table.upgrade;
|
||||
|
||||
import org.apache.hudi.avro.model.HoodieRollbackRequest;
|
||||
import org.apache.hudi.common.HoodieRollbackStat;
|
||||
import org.apache.hudi.common.engine.HoodieEngineContext;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
@@ -26,6 +27,7 @@ import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.table.HoodieSparkTable;
|
||||
import org.apache.hudi.table.HoodieTable;
|
||||
import org.apache.hudi.table.action.rollback.BaseRollbackHelper;
|
||||
import org.apache.hudi.table.action.rollback.ListingBasedRollbackHelper;
|
||||
import org.apache.hudi.table.action.rollback.ListingBasedRollbackRequest;
|
||||
|
||||
@@ -44,7 +46,8 @@ public class ZeroToOneUpgradeHandler extends BaseZeroToOneUpgradeHandler {
|
||||
@Override
|
||||
List<HoodieRollbackStat> getListBasedRollBackStats(HoodieTableMetaClient metaClient, HoodieWriteConfig config, HoodieEngineContext context, Option<HoodieInstant> commitInstantOpt,
|
||||
List<ListingBasedRollbackRequest> rollbackRequests) {
|
||||
return new ListingBasedRollbackHelper(metaClient, config)
|
||||
.collectRollbackStats(context, commitInstantOpt.get(), rollbackRequests);
|
||||
List<HoodieRollbackRequest> hoodieRollbackRequests = new ListingBasedRollbackHelper(metaClient, config)
|
||||
.getRollbackRequestsForRollbackPlan(context, commitInstantOpt.get(), rollbackRequests);
|
||||
return new BaseRollbackHelper(metaClient, config).collectRollbackStats(context, commitInstantOpt.get(), hoodieRollbackRequests);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1282,7 +1282,9 @@ public class TestCleaner extends HoodieClientTestBase {
|
||||
table.getActiveTimeline().transitionRequestedToInflight(
|
||||
new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "000"), Option.empty());
|
||||
metaClient.reloadActiveTimeline();
|
||||
table.rollback(context, "001", new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "000"), true);
|
||||
HoodieInstant rollbackInstant = new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "000");
|
||||
table.scheduleRollback(context, "001", rollbackInstant, false);
|
||||
table.rollback(context, "001", rollbackInstant, true);
|
||||
final int numTempFilesAfter = testTable.listAllFilesInTempFolder().length;
|
||||
assertEquals(0, numTempFilesAfter, "All temp files are deleted.");
|
||||
}
|
||||
|
||||
@@ -19,6 +19,7 @@
|
||||
package org.apache.hudi.table.action.rollback;
|
||||
|
||||
import org.apache.hudi.avro.model.HoodieRollbackPartitionMetadata;
|
||||
import org.apache.hudi.avro.model.HoodieRollbackPlan;
|
||||
import org.apache.hudi.common.HoodieRollbackStat;
|
||||
import org.apache.hudi.common.model.FileSlice;
|
||||
import org.apache.hudi.common.model.HoodieFileGroup;
|
||||
@@ -79,13 +80,16 @@ public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackT
|
||||
.withBaseFilesInPartition(p1, "id21")
|
||||
.withBaseFilesInPartition(p2, "id22");
|
||||
|
||||
HoodieTable table = this.getHoodieTable(metaClient, getConfig());
|
||||
HoodieWriteConfig writeConfig = getConfigBuilder().withRollbackUsingMarkers(false).build();
|
||||
HoodieTable table = this.getHoodieTable(metaClient, writeConfig);
|
||||
HoodieInstant needRollBackInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "002");
|
||||
|
||||
// execute CopyOnWriteRollbackActionExecutor with filelisting mode
|
||||
BaseRollbackPlanActionExecutor copyOnWriteRollbackPlanActionExecutor =
|
||||
new BaseRollbackPlanActionExecutor(context, table.getConfig(), table, "003", needRollBackInstant, false);
|
||||
HoodieRollbackPlan rollbackPlan = (HoodieRollbackPlan) copyOnWriteRollbackPlanActionExecutor.execute().get();
|
||||
CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(context, table.getConfig(), table, "003", needRollBackInstant, true);
|
||||
assertFalse(copyOnWriteRollbackActionExecutor.getRollbackStrategy() instanceof MarkerBasedRollbackStrategy);
|
||||
List<HoodieRollbackStat> hoodieRollbackStats = copyOnWriteRollbackActionExecutor.executeRollback();
|
||||
List<HoodieRollbackStat> hoodieRollbackStats = copyOnWriteRollbackActionExecutor.executeRollback(rollbackPlan);
|
||||
|
||||
// assert hoodieRollbackStats
|
||||
assertEquals(hoodieRollbackStats.size(), 3);
|
||||
@@ -96,14 +100,14 @@ public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackT
|
||||
assertEquals(0, stat.getFailedDeleteFiles().size());
|
||||
assertEquals(Collections.EMPTY_MAP, stat.getCommandBlocksCount());
|
||||
assertEquals(testTable.forCommit("002").getBaseFilePath(p1, "id21").toString(),
|
||||
stat.getSuccessDeleteFiles().get(0));
|
||||
this.fs.getScheme() + ":" + stat.getSuccessDeleteFiles().get(0));
|
||||
break;
|
||||
case p2:
|
||||
assertEquals(1, stat.getSuccessDeleteFiles().size());
|
||||
assertEquals(0, stat.getFailedDeleteFiles().size());
|
||||
assertEquals(Collections.EMPTY_MAP, stat.getCommandBlocksCount());
|
||||
assertEquals(testTable.forCommit("002").getBaseFilePath(p2, "id22").toString(),
|
||||
stat.getSuccessDeleteFiles().get(0));
|
||||
this.fs.getScheme() + ":" + stat.getSuccessDeleteFiles().get(0));
|
||||
break;
|
||||
case p3:
|
||||
assertEquals(0, stat.getSuccessDeleteFiles().size());
|
||||
@@ -150,7 +154,7 @@ public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackT
|
||||
HoodieTable table = this.getHoodieTable(metaClient, cfg);
|
||||
performRollbackAndValidate(isUsingMarkers, cfg, table, firstPartitionCommit2FileSlices, secondPartitionCommit2FileSlices);
|
||||
}
|
||||
|
||||
|
||||
private void performRollbackAndValidate(boolean isUsingMarkers, HoodieWriteConfig cfg, HoodieTable table,
|
||||
List<FileSlice> firstPartitionCommit2FileSlices,
|
||||
List<FileSlice> secondPartitionCommit2FileSlices) throws IOException {
|
||||
@@ -162,12 +166,10 @@ public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackT
|
||||
commitInstant = table.getCompletedCommitTimeline().lastInstant().get();
|
||||
}
|
||||
|
||||
BaseRollbackPlanActionExecutor copyOnWriteRollbackPlanActionExecutor =
|
||||
new BaseRollbackPlanActionExecutor(context, table.getConfig(), table, "003", commitInstant, false);
|
||||
HoodieRollbackPlan hoodieRollbackPlan = (HoodieRollbackPlan) copyOnWriteRollbackPlanActionExecutor.execute().get();
|
||||
CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(context, cfg, table, "003", commitInstant, false);
|
||||
if (!isUsingMarkers) {
|
||||
assertFalse(copyOnWriteRollbackActionExecutor.getRollbackStrategy() instanceof MarkerBasedRollbackStrategy);
|
||||
} else {
|
||||
assertTrue(copyOnWriteRollbackActionExecutor.getRollbackStrategy() instanceof MarkerBasedRollbackStrategy);
|
||||
}
|
||||
Map<String, HoodieRollbackPartitionMetadata> rollbackMetadata = copyOnWriteRollbackActionExecutor.execute().getPartitionMetadata();
|
||||
|
||||
//3. assert the rollback stat
|
||||
@@ -175,9 +177,9 @@ public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackT
|
||||
for (Map.Entry<String, HoodieRollbackPartitionMetadata> entry : rollbackMetadata.entrySet()) {
|
||||
HoodieRollbackPartitionMetadata meta = entry.getValue();
|
||||
assertTrue(meta.getFailedDeleteFiles() == null
|
||||
|| meta.getFailedDeleteFiles().size() == 0);
|
||||
|| meta.getFailedDeleteFiles().size() == 0);
|
||||
assertTrue(meta.getSuccessDeleteFiles() == null
|
||||
|| meta.getSuccessDeleteFiles().size() == 1);
|
||||
|| meta.getSuccessDeleteFiles().size() == 1);
|
||||
}
|
||||
|
||||
//4. assert filegroup after rollback, and compare to the rollbackstat
|
||||
@@ -187,15 +189,11 @@ public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackT
|
||||
List<FileSlice> firstPartitionRollBack1FileSlices = firstPartitionRollBack1FileGroups.get(0).getAllFileSlices().collect(Collectors.toList());
|
||||
assertEquals(1, firstPartitionRollBack1FileSlices.size());
|
||||
|
||||
if (!isUsingMarkers) {
|
||||
firstPartitionCommit2FileSlices.removeAll(firstPartitionRollBack1FileSlices);
|
||||
assertEquals(1, firstPartitionCommit2FileSlices.size());
|
||||
assertEquals(firstPartitionCommit2FileSlices.get(0).getBaseFile().get().getPath(),
|
||||
rollbackMetadata.get(DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles().get(0));
|
||||
} else {
|
||||
assertEquals(firstPartitionCommit2FileSlices.get(0).getBaseFile().get().getPath(),
|
||||
String.format("%s:%s/%s", this.fs.getScheme(), basePath, rollbackMetadata.get(DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles().get(0)));
|
||||
}
|
||||
firstPartitionCommit2FileSlices.removeAll(firstPartitionRollBack1FileSlices);
|
||||
assertEquals(1, firstPartitionCommit2FileSlices.size());
|
||||
assertEquals(firstPartitionCommit2FileSlices.get(0).getBaseFile().get().getPath(),
|
||||
this.fs.getScheme() + ":" + rollbackMetadata.get(DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles().get(0));
|
||||
|
||||
|
||||
// assert the second partition file group and file slice
|
||||
List<HoodieFileGroup> secondPartitionRollBack1FileGroups = table.getFileSystemView().getAllFileGroups(DEFAULT_SECOND_PARTITION_PATH).collect(Collectors.toList());
|
||||
@@ -204,15 +202,10 @@ public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackT
|
||||
assertEquals(1, secondPartitionRollBack1FileSlices.size());
|
||||
|
||||
// assert the second partition rollback file is equals rollBack1SecondPartitionStat
|
||||
if (!isUsingMarkers) {
|
||||
secondPartitionCommit2FileSlices.removeAll(secondPartitionRollBack1FileSlices);
|
||||
assertEquals(1, secondPartitionCommit2FileSlices.size());
|
||||
assertEquals(secondPartitionCommit2FileSlices.get(0).getBaseFile().get().getPath(),
|
||||
rollbackMetadata.get(DEFAULT_SECOND_PARTITION_PATH).getSuccessDeleteFiles().get(0));
|
||||
} else {
|
||||
assertEquals(secondPartitionCommit2FileSlices.get(0).getBaseFile().get().getPath(),
|
||||
String.format("%s:%s/%s", this.fs.getScheme(), basePath, rollbackMetadata.get(DEFAULT_SECOND_PARTITION_PATH).getSuccessDeleteFiles().get(0)));
|
||||
}
|
||||
secondPartitionCommit2FileSlices.removeAll(secondPartitionRollBack1FileSlices);
|
||||
assertEquals(1, secondPartitionCommit2FileSlices.size());
|
||||
assertEquals(secondPartitionCommit2FileSlices.get(0).getBaseFile().get().getPath(),
|
||||
this.fs.getScheme() + ":" + rollbackMetadata.get(DEFAULT_SECOND_PARTITION_PATH).getSuccessDeleteFiles().get(0));
|
||||
|
||||
assertFalse(WriteMarkersFactory.get(cfg.getMarkersType(), table, commitInstant.getTimestamp()).doesMarkerDirExist());
|
||||
}
|
||||
|
||||
@@ -61,7 +61,7 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT
|
||||
initPath();
|
||||
initSparkContexts();
|
||||
//just generate tow partitions
|
||||
dataGen = new HoodieTestDataGenerator(new String[]{DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH});
|
||||
dataGen = new HoodieTestDataGenerator(new String[] {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH});
|
||||
initFileSystem();
|
||||
initMetaClient();
|
||||
}
|
||||
@@ -89,6 +89,9 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT
|
||||
|
||||
//2. rollback
|
||||
HoodieInstant rollBackInstant = new HoodieInstant(isUsingMarkers, HoodieTimeline.DELTA_COMMIT_ACTION, "002");
|
||||
BaseRollbackPlanActionExecutor mergeOnReadRollbackPlanActionExecutor =
|
||||
new BaseRollbackPlanActionExecutor(context, cfg, table, "003", rollBackInstant, false);
|
||||
mergeOnReadRollbackPlanActionExecutor.execute().get();
|
||||
MergeOnReadRollbackActionExecutor mergeOnReadRollbackActionExecutor = new MergeOnReadRollbackActionExecutor(
|
||||
context,
|
||||
cfg,
|
||||
@@ -96,13 +99,6 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT
|
||||
"003",
|
||||
rollBackInstant,
|
||||
true);
|
||||
// assert is filelist mode
|
||||
if (!isUsingMarkers) {
|
||||
assertFalse(mergeOnReadRollbackActionExecutor.getRollbackStrategy() instanceof MarkerBasedRollbackStrategy);
|
||||
} else {
|
||||
assertTrue(mergeOnReadRollbackActionExecutor.getRollbackStrategy() instanceof MarkerBasedRollbackStrategy);
|
||||
}
|
||||
|
||||
//3. assert the rollback stat
|
||||
Map<String, HoodieRollbackPartitionMetadata> rollbackMetadata = mergeOnReadRollbackActionExecutor.execute().getPartitionMetadata();
|
||||
assertEquals(2, rollbackMetadata.size());
|
||||
@@ -145,15 +141,13 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT
|
||||
public void testFailForCompletedInstants() {
|
||||
Assertions.assertThrows(IllegalArgumentException.class, () -> {
|
||||
HoodieInstant rollBackInstant = new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "002");
|
||||
new MergeOnReadRollbackActionExecutor(
|
||||
context,
|
||||
getConfigBuilder().build(),
|
||||
new MergeOnReadRollbackActionExecutor(context, getConfigBuilder().build(),
|
||||
getHoodieTable(metaClient, getConfigBuilder().build()),
|
||||
"003",
|
||||
rollBackInstant,
|
||||
true,
|
||||
true,
|
||||
true);
|
||||
true).execute();
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -18,6 +18,7 @@
|
||||
|
||||
package org.apache.hudi.table.functional;
|
||||
|
||||
import org.apache.hudi.avro.model.HoodieRollbackRequest;
|
||||
import org.apache.hudi.client.SparkRDDWriteClient;
|
||||
import org.apache.hudi.client.WriteStatus;
|
||||
import org.apache.hudi.client.common.HoodieSparkEngineContext;
|
||||
@@ -32,6 +33,8 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
import org.apache.hudi.common.testutils.HoodieTestTable;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.table.HoodieSparkTable;
|
||||
import org.apache.hudi.table.HoodieTable;
|
||||
import org.apache.hudi.table.action.rollback.BaseRollbackHelper;
|
||||
import org.apache.hudi.table.action.rollback.MarkerBasedRollbackStrategy;
|
||||
import org.apache.hudi.testutils.HoodieClientTestBase;
|
||||
|
||||
@@ -93,8 +96,13 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase {
|
||||
.withMarkerFile("partA", f2, IOType.CREATE);
|
||||
|
||||
// when
|
||||
List<HoodieRollbackStat> stats = new MarkerBasedRollbackStrategy(HoodieSparkTable.create(getConfig(), context, metaClient), context, getConfig(), "002")
|
||||
.execute(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "001"));
|
||||
HoodieTable hoodieTable = HoodieSparkTable.create(getConfig(), context, metaClient);
|
||||
List<HoodieRollbackRequest> rollbackRequests = new MarkerBasedRollbackStrategy(hoodieTable, context, getConfig(),
|
||||
"002").getRollbackRequests(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "001"));
|
||||
|
||||
List<HoodieRollbackStat> stats = new BaseRollbackHelper(hoodieTable.getMetaClient(), getConfig()).performRollback(context,
|
||||
new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "001"),
|
||||
rollbackRequests);
|
||||
|
||||
// then: ensure files are deleted correctly, non-existent files reported as failed deletes
|
||||
assertEquals(2, stats.size());
|
||||
@@ -175,9 +183,14 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase {
|
||||
writeStatuses = writeClient.upsert(jsc.parallelize(records, 1), newCommitTime);
|
||||
writeStatuses.collect();
|
||||
|
||||
HoodieTable hoodieTable = HoodieSparkTable.create(getConfig(), context, metaClient);
|
||||
List<HoodieRollbackRequest> rollbackRequests = new MarkerBasedRollbackStrategy(hoodieTable, context, getConfig(),
|
||||
"003").getRollbackRequests(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, "002"));
|
||||
|
||||
// rollback 2nd commit and ensure stats reflect the info.
|
||||
return new MarkerBasedRollbackStrategy(HoodieSparkTable.create(writeConfig, context, metaClient), context, writeConfig, "003")
|
||||
.execute(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, "002"));
|
||||
return new BaseRollbackHelper(hoodieTable.getMetaClient(), getConfig()).performRollback(context,
|
||||
new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, "002"),
|
||||
rollbackRequests);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user