1
0

[HUDI-3735] TestHoodieSparkMergeOnReadTableRollback is flaky (#5874)

This commit is contained in:
xi chaomin
2022-06-24 14:47:36 +08:00
committed by GitHub
parent 6456bd3a51
commit 30ebdc708b

View File

@@ -227,8 +227,8 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
allFiles = listAllBaseFilesInPath(hoodieTable); allFiles = listAllBaseFilesInPath(hoodieTable);
// After rollback, there should be no base file with the failed commit time // After rollback, there should be no base file with the failed commit time
List<String> remainingFiles = Arrays.stream(allFiles).filter(file -> file.getPath().getName() List<String> remainingFiles = Arrays.stream(allFiles).filter(file -> file.getPath().getName()
.contains(commitTime1)).map(fileStatus -> fileStatus.getPath().toString()).collect(Collectors.toList()); .contains("_" + commitTime1)).map(fileStatus -> fileStatus.getPath().toString()).collect(Collectors.toList());
assertEquals(0, remainingFiles.size(), "There files should have been rolled-back " assertEquals(0, remainingFiles.size(), "These files should have been rolled-back "
+ "when rolling back commit " + commitTime1 + " but are still remaining. Files: " + remainingFiles); + "when rolling back commit " + commitTime1 + " but are still remaining. Files: " + remainingFiles);
inputPaths = tableView.getLatestBaseFiles() inputPaths = tableView.getLatestBaseFiles()
.map(baseFile -> new Path(baseFile.getPath()).getParent().toString()) .map(baseFile -> new Path(baseFile.getPath()).getParent().toString())
@@ -240,7 +240,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
/* /*
* Write 3 (inserts + updates - testing successful delta commit) * Write 3 (inserts + updates - testing successful delta commit)
*/ */
final String commitTime2 = "003"; final String commitTime2 = "000000003";
try (SparkRDDWriteClient thirdClient = getHoodieWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff(true));) { try (SparkRDDWriteClient thirdClient = getHoodieWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff(true));) {
thirdClient.startCommitWithTime(commitTime2); thirdClient.startCommitWithTime(commitTime2);
@@ -265,8 +265,10 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
thirdClient.rollback(commitTime2); thirdClient.rollback(commitTime2);
allFiles = listAllBaseFilesInPath(hoodieTable); allFiles = listAllBaseFilesInPath(hoodieTable);
// After rollback, there should be no base file with the failed commit time // After rollback, there should be no base file with the failed commit time
assertEquals(0, Arrays.stream(allFiles) List<String> remainingFiles = Arrays.stream(allFiles).filter(file -> file.getPath().getName()
.filter(file -> file.getPath().getName().contains(commitTime2)).count()); .contains("_" + commitTime2)).map(fileStatus -> fileStatus.getPath().toString()).collect(Collectors.toList());
assertEquals(0, remainingFiles.size(), "These files should have been rolled-back "
+ "when rolling back commit " + commitTime2 + " but are still remaining. Files: " + remainingFiles);
metaClient = HoodieTableMetaClient.reload(metaClient); metaClient = HoodieTableMetaClient.reload(metaClient);
hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient); hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient);
@@ -282,7 +284,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
/* /*
* Write 4 (updates) * Write 4 (updates)
*/ */
newCommitTime = "004"; newCommitTime = "000000004";
thirdClient.startCommitWithTime(newCommitTime); thirdClient.startCommitWithTime(newCommitTime);
writeStatusJavaRDD = thirdClient.upsert(writeRecords, newCommitTime); writeStatusJavaRDD = thirdClient.upsert(writeRecords, newCommitTime);