[HUDI-3732] Fixing rollback validation (#5157)
* Fixing rollback validation * Adding tests
This commit is contained in:
committed by
GitHub
parent
80011df995
commit
73a21092f8
@@ -136,6 +136,9 @@ public abstract class BaseRestoreActionExecutor<T extends HoodieRecordPayload, I
|
|||||||
.filter(instant -> HoodieActiveTimeline.GREATER_THAN.test(instant.getTimestamp(), restoreInstantTime))
|
.filter(instant -> HoodieActiveTimeline.GREATER_THAN.test(instant.getTimestamp(), restoreInstantTime))
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
instantsToRollback.forEach(entry -> {
|
instantsToRollback.forEach(entry -> {
|
||||||
|
if (entry.isCompleted()) {
|
||||||
|
table.getActiveTimeline().deleteCompletedRollback(entry);
|
||||||
|
}
|
||||||
table.getActiveTimeline().deletePending(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.ROLLBACK_ACTION, entry.getTimestamp()));
|
table.getActiveTimeline().deletePending(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.ROLLBACK_ACTION, entry.getTimestamp()));
|
||||||
table.getActiveTimeline().deletePending(new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.ROLLBACK_ACTION, entry.getTimestamp()));
|
table.getActiveTimeline().deletePending(new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.ROLLBACK_ACTION, entry.getTimestamp()));
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -186,7 +186,12 @@ public abstract class BaseRollbackActionExecutor<T extends HoodieRecordPayload,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
List<String> inflights = inflightAndRequestedCommitTimeline.getInstants().map(HoodieInstant::getTimestamp)
|
List<String> inflights = inflightAndRequestedCommitTimeline.getInstants().filter(instant -> {
|
||||||
|
if (!instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return !ClusteringUtils.isPendingClusteringInstant(table.getMetaClient(), instant);
|
||||||
|
}).map(HoodieInstant::getTimestamp)
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
if ((instantTimeToRollback != null) && !inflights.isEmpty()
|
if ((instantTimeToRollback != null) && !inflights.isEmpty()
|
||||||
&& (inflights.indexOf(instantTimeToRollback) != inflights.size() - 1)) {
|
&& (inflights.indexOf(instantTimeToRollback) != inflights.size() - 1)) {
|
||||||
|
|||||||
@@ -1399,6 +1399,41 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
|
|||||||
testInsertAndClustering(clusteringConfig, populateMetaFields, true, false, SqlQueryEqualityPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, "");
|
testInsertAndClustering(clusteringConfig, populateMetaFields, true, false, SqlQueryEqualityPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, "");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRolblackOfRegularCommitWithPendingReplaceCommitInTimeline() throws Exception {
|
||||||
|
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
|
||||||
|
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true)
|
||||||
|
.withPreserveHoodieCommitMetadata(true).build();
|
||||||
|
// trigger clustering, but do not complete
|
||||||
|
testInsertAndClustering(clusteringConfig, true, false, false, SqlQueryEqualityPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, "");
|
||||||
|
|
||||||
|
// trigger another partial commit, followed by valid commit. rollback of partial commit should succeed.
|
||||||
|
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder().withAutoCommit(false);
|
||||||
|
SparkRDDWriteClient client = getHoodieWriteClient(cfgBuilder.build());
|
||||||
|
String commitTime1 = HoodieActiveTimeline.createNewInstantTime();
|
||||||
|
List<HoodieRecord> records1 = dataGen.generateInserts(commitTime1, 200);
|
||||||
|
client.startCommitWithTime(commitTime1);
|
||||||
|
JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(records1, 2);
|
||||||
|
JavaRDD<WriteStatus> statuses = client.upsert(insertRecordsRDD1, commitTime1);
|
||||||
|
List<WriteStatus> statusList = statuses.collect();
|
||||||
|
assertNoWriteErrors(statusList);
|
||||||
|
|
||||||
|
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
|
||||||
|
assertEquals(2, metaClient.getActiveTimeline().getCommitsTimeline().filterInflightsAndRequested().countInstants());
|
||||||
|
|
||||||
|
// trigger another commit. this should rollback latest partial commit.
|
||||||
|
records1 = dataGen.generateInserts(commitTime1, 200);
|
||||||
|
client.startCommitWithTime(commitTime1);
|
||||||
|
insertRecordsRDD1 = jsc.parallelize(records1, 2);
|
||||||
|
statuses = client.upsert(insertRecordsRDD1, commitTime1);
|
||||||
|
statusList = statuses.collect();
|
||||||
|
assertNoWriteErrors(statusList);
|
||||||
|
client.commit(commitTime1, statuses);
|
||||||
|
metaClient.reloadActiveTimeline();
|
||||||
|
// rollback should have succeeded. Essentially, the pending clustering should not hinder the rollback of regular commits.
|
||||||
|
assertEquals(1, metaClient.getActiveTimeline().getCommitsTimeline().filterInflightsAndRequested().countInstants());
|
||||||
|
}
|
||||||
|
|
||||||
@ParameterizedTest
|
@ParameterizedTest
|
||||||
@ValueSource(booleans = {true, false})
|
@ValueSource(booleans = {true, false})
|
||||||
public void testInlineScheduleClustering(boolean scheduleInlineClustering) throws IOException {
|
public void testInlineScheduleClustering(boolean scheduleInlineClustering) throws IOException {
|
||||||
|
|||||||
@@ -201,6 +201,11 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
|
|||||||
deleteInstantFile(instant);
|
deleteInstantFile(instant);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void deleteCompletedRollback(HoodieInstant instant) {
|
||||||
|
ValidationUtils.checkArgument(instant.isCompleted());
|
||||||
|
deleteInstantFile(instant);
|
||||||
|
}
|
||||||
|
|
||||||
public static void deleteInstantFile(FileSystem fs, String metaPath, HoodieInstant instant) {
|
public static void deleteInstantFile(FileSystem fs, String metaPath, HoodieInstant instant) {
|
||||||
try {
|
try {
|
||||||
fs.delete(new Path(metaPath, instant.getFileName()), false);
|
fs.delete(new Path(metaPath, instant.getFileName()), false);
|
||||||
|
|||||||
Reference in New Issue
Block a user