[HUDI-1266] Add unit test for validating replacecommit rollback (#2418)
This commit is contained in:
@@ -18,23 +18,26 @@
|
||||
|
||||
package org.apache.hudi.table.action.rollback;
|
||||
|
||||
import org.apache.hudi.client.HoodieWriteResult;
|
||||
import org.apache.hudi.client.SparkRDDWriteClient;
|
||||
import org.apache.hudi.client.WriteStatus;
|
||||
import org.apache.hudi.common.model.FileSlice;
|
||||
import org.apache.hudi.common.model.HoodieFileGroup;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.model.HoodieTableType;
|
||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
import org.apache.hudi.common.table.view.SyncableFileSystemView;
|
||||
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.table.HoodieTable;
|
||||
import org.apache.hudi.testutils.Assertions;
|
||||
import org.apache.hudi.testutils.HoodieClientTestBase;
|
||||
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
|
||||
@@ -96,4 +99,61 @@ public class HoodieClientRollbackTestBase extends HoodieClientTestBase {
|
||||
assertEquals(1, secondPartitionCommit2FileSlices.size());
|
||||
}
|
||||
}
|
||||
|
||||
protected void insertOverwriteCommitDataWithTwoPartitions(List<FileSlice> firstPartitionCommit2FileSlices,
|
||||
List<FileSlice> secondPartitionCommit2FileSlices,
|
||||
HoodieWriteConfig cfg,
|
||||
boolean commitSecondInsertOverwrite) throws IOException {
|
||||
//just generate two partitions
|
||||
dataGen = new HoodieTestDataGenerator(new String[]{DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH});
|
||||
HoodieTestDataGenerator.writePartitionMetadata(fs, new String[]{DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH}, basePath);
|
||||
SparkRDDWriteClient client = getHoodieWriteClient(cfg);
|
||||
/**
|
||||
* Write 1 (upsert)
|
||||
*/
|
||||
String newCommitTime = "001";
|
||||
List<HoodieRecord> records = dataGen.generateInsertsContainsAllPartitions(newCommitTime, 2);
|
||||
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
|
||||
client.startCommitWithTime(newCommitTime);
|
||||
JavaRDD<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime);
|
||||
Assertions.assertNoWriteErrors(statuses.collect());
|
||||
client.commit(newCommitTime, statuses);
|
||||
|
||||
// get fileIds written
|
||||
HoodieTable table = this.getHoodieTable(metaClient, cfg);
|
||||
SyncableFileSystemView fsView = getFileSystemViewWithUnCommittedSlices(table.getMetaClient());
|
||||
List<HoodieFileGroup> firstPartitionCommit1FileGroups = fsView.getAllFileGroups(DEFAULT_FIRST_PARTITION_PATH).collect(Collectors.toList());
|
||||
assertEquals(1, firstPartitionCommit1FileGroups.size());
|
||||
Set<String> partition1Commit1FileIds = firstPartitionCommit1FileGroups.get(0).getAllFileSlices().map(FileSlice::getFileId).collect(Collectors.toSet());
|
||||
List<HoodieFileGroup> secondPartitionCommit1FileGroups = fsView.getAllFileGroups(DEFAULT_SECOND_PARTITION_PATH).collect(Collectors.toList());
|
||||
assertEquals(1, secondPartitionCommit1FileGroups.size());
|
||||
Set<String> partition2Commit1FileIds = secondPartitionCommit1FileGroups.get(0).getAllFileSlices().map(FileSlice::getFileId).collect(Collectors.toSet());
|
||||
|
||||
/**
|
||||
* Write 2 (one insert_overwrite)
|
||||
*/
|
||||
String commitActionType = HoodieTimeline.REPLACE_COMMIT_ACTION;
|
||||
newCommitTime = "002";
|
||||
records = dataGen.generateInsertsContainsAllPartitions(newCommitTime, 2);
|
||||
writeRecords = jsc.parallelize(records, 1);
|
||||
client.startCommitWithTime(newCommitTime, commitActionType);
|
||||
HoodieWriteResult result = client.insertOverwrite(writeRecords, newCommitTime);
|
||||
statuses = result.getWriteStatuses();
|
||||
Assertions.assertNoWriteErrors(statuses.collect());
|
||||
if (commitSecondInsertOverwrite) {
|
||||
client.commit(newCommitTime, statuses, Option.empty(), commitActionType, result.getPartitionToReplaceFileIds());
|
||||
}
|
||||
metaClient.reloadActiveTimeline();
|
||||
// get new fileIds written as part of insert_overwrite
|
||||
fsView = getFileSystemViewWithUnCommittedSlices(metaClient);
|
||||
List<HoodieFileGroup> firstPartitionCommit2FileGroups = fsView.getAllFileGroups(DEFAULT_FIRST_PARTITION_PATH)
|
||||
.filter(fg -> !partition1Commit1FileIds.contains(fg.getFileGroupId().getFileId())).collect(Collectors.toList());
|
||||
firstPartitionCommit2FileSlices.addAll(firstPartitionCommit2FileGroups.get(0).getAllFileSlices().collect(Collectors.toList()));
|
||||
List<HoodieFileGroup> secondPartitionCommit2FileGroups = fsView.getAllFileGroups(DEFAULT_SECOND_PARTITION_PATH)
|
||||
.filter(fg -> !partition2Commit1FileIds.contains(fg.getFileGroupId().getFileId())).collect(Collectors.toList());
|
||||
secondPartitionCommit2FileSlices.addAll(secondPartitionCommit2FileGroups.get(0).getAllFileSlices().collect(Collectors.toList()));
|
||||
|
||||
assertEquals(1, firstPartitionCommit2FileSlices.size());
|
||||
assertEquals(1, secondPartitionCommit2FileSlices.size());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -28,7 +28,6 @@ import org.apache.hudi.common.testutils.HoodieTestTable;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.table.HoodieTable;
|
||||
import org.apache.hudi.table.MarkerFiles;
|
||||
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
@@ -125,6 +124,19 @@ public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackT
|
||||
assertFalse(testTable.baseFileExists(p2, "002", "id22"));
|
||||
}
|
||||
|
||||
// Verify that rollback works with replacecommit
|
||||
@ParameterizedTest
|
||||
@ValueSource(booleans = {true, false})
|
||||
public void testCopyOnWriteRollbackWithReplaceCommits(boolean isUsingMarkers) throws IOException {
|
||||
//1. prepare data and assert data result
|
||||
List<FileSlice> firstPartitionCommit2FileSlices = new ArrayList<>();
|
||||
List<FileSlice> secondPartitionCommit2FileSlices = new ArrayList<>();
|
||||
HoodieWriteConfig cfg = getConfigBuilder().withRollbackUsingMarkers(isUsingMarkers).withAutoCommit(false).build();
|
||||
this.insertOverwriteCommitDataWithTwoPartitions(firstPartitionCommit2FileSlices, secondPartitionCommit2FileSlices, cfg, !isUsingMarkers);
|
||||
HoodieTable table = this.getHoodieTable(metaClient, cfg);
|
||||
performRollbackAndValidate(isUsingMarkers, cfg, table, firstPartitionCommit2FileSlices, secondPartitionCommit2FileSlices);
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ValueSource(booleans = {true, false})
|
||||
public void testCopyOnWriteRollbackActionExecutor(boolean isUsingMarkers) throws IOException {
|
||||
@@ -133,8 +145,14 @@ public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackT
|
||||
List<FileSlice> secondPartitionCommit2FileSlices = new ArrayList<>();
|
||||
HoodieWriteConfig cfg = getConfigBuilder().withRollbackUsingMarkers(isUsingMarkers).withAutoCommit(false).build();
|
||||
this.twoUpsertCommitDataWithTwoPartitions(firstPartitionCommit2FileSlices, secondPartitionCommit2FileSlices, cfg, !isUsingMarkers);
|
||||
metaClient.reloadActiveTimeline();
|
||||
HoodieTable table = this.getHoodieTable(metaClient, cfg);
|
||||
|
||||
performRollbackAndValidate(isUsingMarkers, cfg, table, firstPartitionCommit2FileSlices, secondPartitionCommit2FileSlices);
|
||||
}
|
||||
|
||||
private void performRollbackAndValidate(boolean isUsingMarkers, HoodieWriteConfig cfg, HoodieTable table,
|
||||
List<FileSlice> firstPartitionCommit2FileSlices,
|
||||
List<FileSlice> secondPartitionCommit2FileSlices) throws IOException {
|
||||
//2. rollback
|
||||
HoodieInstant commitInstant;
|
||||
if (isUsingMarkers) {
|
||||
|
||||
Reference in New Issue
Block a user