[HUDI-995] Migrate HoodieTestUtils APIs to HoodieTestTable (#2167)
Remove APIs in `HoodieTestUtils` - `createCommitFiles` - `createDataFile` - `createNewLogFile` - `createCompactionRequest` Migrated usages in `TestCleaner#testPendingCompactions`. Also improved some API names in `HoodieTestTable`.
This commit is contained in:
@@ -51,7 +51,6 @@ import org.apache.hudi.common.table.timeline.versioning.clean.CleanMetadataMigra
|
||||
import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanMigrator;
|
||||
import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV1MigrationHandler;
|
||||
import org.apache.hudi.common.table.view.TableFileSystemView;
|
||||
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
|
||||
import org.apache.hudi.common.testutils.HoodieTestTable;
|
||||
import org.apache.hudi.common.testutils.HoodieTestUtils;
|
||||
import org.apache.hudi.common.util.CleanerUtils;
|
||||
@@ -155,7 +154,7 @@ public class TestCleaner extends HoodieClientTestBase {
|
||||
assertTrue(table.getCompletedCleanTimeline().empty());
|
||||
|
||||
HoodieIndex index = SparkHoodieIndex.createIndex(cfg);
|
||||
List<HoodieRecord> taggedRecords = ((JavaRDD<HoodieRecord>)index.tagLocation(jsc.parallelize(records, 1), context, table)).collect();
|
||||
List<HoodieRecord> taggedRecords = ((JavaRDD<HoodieRecord>) index.tagLocation(jsc.parallelize(records, 1), context, table)).collect();
|
||||
checkTaggedRecords(taggedRecords, newCommitTime);
|
||||
}
|
||||
|
||||
@@ -550,7 +549,7 @@ public class TestCleaner extends HoodieClientTestBase {
|
||||
Map<String, String> partitionAndFileId002 = testTable.addCommit("00000000000002")
|
||||
.withBaseFilesInPartition(p0, file1P0C0)
|
||||
.withBaseFilesInPartition(p1, file1P1C0)
|
||||
.withBaseFilesInPartitions(p0, p1);
|
||||
.getFileIdsWithBaseFilesInPartitions(p0, p1);
|
||||
|
||||
List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(config, 1);
|
||||
// enableBootstrapSourceClean would delete the bootstrap base file as the same time
|
||||
@@ -592,7 +591,7 @@ public class TestCleaner extends HoodieClientTestBase {
|
||||
// make next commit, with 2 updates to existing files, and 1 insert
|
||||
String file3P0C2 = testTable.addCommit("00000000000003")
|
||||
.withBaseFilesInPartition(p0, file1P0C0, file2P0C1)
|
||||
.withBaseFilesInPartitions(p0).get(p0);
|
||||
.getFileIdsWithBaseFilesInPartitions(p0).get(p0);
|
||||
List<HoodieCleanStat> hoodieCleanStatsThree = runCleaner(config, 3);
|
||||
assertEquals(2,
|
||||
getCleanStat(hoodieCleanStatsThree, p0)
|
||||
@@ -625,7 +624,7 @@ public class TestCleaner extends HoodieClientTestBase {
|
||||
String p0 = "2020/01/01";
|
||||
|
||||
// Make 3 files, one base file and 2 log files associated with base file
|
||||
String file1P0 = testTable.addDeltaCommit("000").withBaseFilesInPartitions(p0).get(p0);
|
||||
String file1P0 = testTable.addDeltaCommit("000").getFileIdsWithBaseFilesInPartitions(p0).get(p0);
|
||||
testTable.forDeltaCommit("000")
|
||||
.withLogFile(p0, file1P0, 1)
|
||||
.withLogFile(p0, file1P0, 2);
|
||||
@@ -865,7 +864,7 @@ public class TestCleaner extends HoodieClientTestBase {
|
||||
assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
|
||||
|
||||
// make next commit, with 1 insert & 1 update per partition
|
||||
Map<String, String> partitionAndFileId002 = testTable.addInflightCommit("00000000000002").withBaseFilesInPartitions(p0, p1);
|
||||
Map<String, String> partitionAndFileId002 = testTable.addInflightCommit("00000000000002").getFileIdsWithBaseFilesInPartitions(p0, p1);
|
||||
String file2P0C1 = partitionAndFileId002.get(p0);
|
||||
String file2P1C1 = partitionAndFileId002.get(p1);
|
||||
testTable.forCommit("00000000000002").withBaseFilesInPartition(p0, file1P0C0).withBaseFilesInPartition(p1, file1P1C0);
|
||||
@@ -889,7 +888,7 @@ public class TestCleaner extends HoodieClientTestBase {
|
||||
String file3P0C2 = testTable.addInflightCommit("00000000000003")
|
||||
.withBaseFilesInPartition(p0, file1P0C0)
|
||||
.withBaseFilesInPartition(p0, file2P0C1)
|
||||
.withBaseFilesInPartitions(p0).get(p0);
|
||||
.getFileIdsWithBaseFilesInPartitions(p0).get(p0);
|
||||
commitMetadata = generateCommitMetadata(CollectionUtils
|
||||
.createImmutableMap(p0,
|
||||
CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file3P0C2)));
|
||||
@@ -906,7 +905,7 @@ public class TestCleaner extends HoodieClientTestBase {
|
||||
String file4P0C3 = testTable.addInflightCommit("00000000000004")
|
||||
.withBaseFilesInPartition(p0, file1P0C0)
|
||||
.withBaseFilesInPartition(p0, file2P0C1)
|
||||
.withBaseFilesInPartitions(p0).get(p0);
|
||||
.getFileIdsWithBaseFilesInPartitions(p0).get(p0);
|
||||
commitMetadata = generateCommitMetadata(CollectionUtils.createImmutableMap(
|
||||
p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file4P0C3)));
|
||||
metaClient.getActiveTimeline().saveAsComplete(
|
||||
@@ -1021,7 +1020,7 @@ public class TestCleaner extends HoodieClientTestBase {
|
||||
* Test Keep Latest Commits when there are pending compactions.
|
||||
*/
|
||||
@Test
|
||||
public void testKeepLatestCommitsWithPendingCompactions() throws IOException {
|
||||
public void testKeepLatestCommitsWithPendingCompactions() throws Exception {
|
||||
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true)
|
||||
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
|
||||
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build())
|
||||
@@ -1043,7 +1042,7 @@ public class TestCleaner extends HoodieClientTestBase {
|
||||
*/
|
||||
@ParameterizedTest
|
||||
@ValueSource(booleans = {false, true})
|
||||
public void testKeepLatestVersionsWithPendingCompactions(boolean retryFailure) throws IOException {
|
||||
public void testKeepLatestVersionsWithPendingCompactions(boolean retryFailure) throws Exception {
|
||||
HoodieWriteConfig config =
|
||||
HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true)
|
||||
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
|
||||
@@ -1098,73 +1097,82 @@ public class TestCleaner extends HoodieClientTestBase {
|
||||
* @param expNumFilesDeleted Number of files deleted
|
||||
*/
|
||||
private void testPendingCompactions(HoodieWriteConfig config, int expNumFilesDeleted,
|
||||
int expNumFilesUnderCompactionDeleted, boolean retryFailure) throws IOException {
|
||||
int expNumFilesUnderCompactionDeleted, boolean retryFailure) throws Exception {
|
||||
HoodieTableMetaClient metaClient =
|
||||
HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
|
||||
String[] instants = new String[] {"000", "001", "003", "005", "007", "009", "011", "013"};
|
||||
String[] compactionInstants = new String[] {"002", "004", "006", "008", "010"};
|
||||
Map<String, String> expFileIdToPendingCompaction = new HashMap<>();
|
||||
Map<String, String> fileIdToLatestInstantBeforeCompaction = new HashMap<>();
|
||||
Map<String, List<FileSlice>> compactionInstantsToFileSlices = new HashMap<>();
|
||||
|
||||
for (String instant : instants) {
|
||||
HoodieTestUtils.createCommitFiles(basePath, instant);
|
||||
}
|
||||
final String partition = "2016/03/15";
|
||||
Map<String, String> expFileIdToPendingCompaction = new HashMap<String, String>() {
|
||||
{
|
||||
put("fileId2", "004");
|
||||
put("fileId3", "006");
|
||||
put("fileId4", "008");
|
||||
put("fileId5", "010");
|
||||
}
|
||||
};
|
||||
Map<String, String> fileIdToLatestInstantBeforeCompaction = new HashMap<String, String>() {
|
||||
{
|
||||
put("fileId1", "000");
|
||||
put("fileId2", "000");
|
||||
put("fileId3", "001");
|
||||
put("fileId4", "003");
|
||||
put("fileId5", "005");
|
||||
put("fileId6", "009");
|
||||
put("fileId7", "011");
|
||||
}
|
||||
};
|
||||
|
||||
// Generate 7 file-groups. First one has only one slice and no pending compaction. File Slices (2 - 5) has
|
||||
// multiple versions with pending compaction. File Slices (6 - 7) have multiple file-slices but not under
|
||||
// compactions
|
||||
// FileIds 2-5 will be under compaction
|
||||
int maxNumFileIds = 7;
|
||||
String[] fileIds = new String[] {"fileId1", "fileId2", "fileId3", "fileId4", "fileId5", "fileId6", "fileId7"};
|
||||
int maxNumFileIdsForCompaction = 4;
|
||||
for (int i = 0; i < maxNumFileIds; i++) {
|
||||
final String fileId = HoodieTestUtils.createDataFile(basePath,
|
||||
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, instants[0], fileIds[i]);
|
||||
HoodieTestUtils.createNewLogFile(fs, basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, instants[0],
|
||||
fileId, Option.empty());
|
||||
HoodieTestUtils.createNewLogFile(fs, basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, instants[0],
|
||||
fileId, Option.of(2));
|
||||
fileIdToLatestInstantBeforeCompaction.put(fileId, instants[0]);
|
||||
for (int j = 1; j <= i; j++) {
|
||||
if (j == i && j <= maxNumFileIdsForCompaction) {
|
||||
expFileIdToPendingCompaction.put(fileId, compactionInstants[j]);
|
||||
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||
HoodieTable table = HoodieSparkTable.create(config, context, metaClient);
|
||||
FileSlice slice =
|
||||
table.getSliceView().getLatestFileSlices(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
|
||||
.filter(fs -> fs.getFileId().equals(fileId)).findFirst().get();
|
||||
List<FileSlice> slices = new ArrayList<>();
|
||||
if (compactionInstantsToFileSlices.containsKey(compactionInstants[j])) {
|
||||
slices = compactionInstantsToFileSlices.get(compactionInstants[j]);
|
||||
}
|
||||
slices.add(slice);
|
||||
compactionInstantsToFileSlices.put(compactionInstants[j], slices);
|
||||
// Add log-files to simulate delta-commits after pending compaction
|
||||
HoodieTestUtils.createNewLogFile(fs, basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
|
||||
compactionInstants[j], fileId, Option.empty());
|
||||
HoodieTestUtils.createNewLogFile(fs, basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
|
||||
compactionInstants[j], fileId, Option.of(2));
|
||||
} else {
|
||||
HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, instants[j],
|
||||
fileId);
|
||||
HoodieTestUtils.createNewLogFile(fs, basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
|
||||
instants[j], fileId, Option.empty());
|
||||
HoodieTestUtils.createNewLogFile(fs, basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
|
||||
instants[j], fileId, Option.of(2));
|
||||
fileIdToLatestInstantBeforeCompaction.put(fileId, instants[j]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Setup pending compaction plans
|
||||
for (String instant : compactionInstants) {
|
||||
List<FileSlice> fileSliceList = compactionInstantsToFileSlices.get(instant);
|
||||
if (null != fileSliceList) {
|
||||
HoodieTestUtils.createCompactionRequest(metaClient, instant, fileSliceList.stream()
|
||||
.map(fs -> Pair.of(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fs)).collect(Collectors.toList()));
|
||||
}
|
||||
}
|
||||
HoodieTestTable.of(metaClient)
|
||||
.addCommit("000")
|
||||
.withBaseFilesInPartition(partition, "fileId1", "fileId2", "fileId3", "fileId4", "fileId5", "fileId6", "fileId7")
|
||||
.withLogFile(partition, "fileId1", 1, 2)
|
||||
.withLogFile(partition, "fileId2", 1, 2)
|
||||
.withLogFile(partition, "fileId3", 1, 2)
|
||||
.withLogFile(partition, "fileId4", 1, 2)
|
||||
.withLogFile(partition, "fileId5", 1, 2)
|
||||
.withLogFile(partition, "fileId6", 1, 2)
|
||||
.withLogFile(partition, "fileId7", 1, 2)
|
||||
.addCommit("001")
|
||||
.withBaseFilesInPartition(partition, "fileId3", "fileId4", "fileId5", "fileId6", "fileId7")
|
||||
.withLogFile(partition, "fileId3", 1, 2)
|
||||
.withLogFile(partition, "fileId4", 1, 2)
|
||||
.withLogFile(partition, "fileId5", 1, 2)
|
||||
.withLogFile(partition, "fileId6", 1, 2)
|
||||
.withLogFile(partition, "fileId7", 1, 2)
|
||||
.addCommit("003")
|
||||
.withBaseFilesInPartition(partition, "fileId4", "fileId5", "fileId6", "fileId7")
|
||||
.withLogFile(partition, "fileId4", 1, 2)
|
||||
.withLogFile(partition, "fileId5", 1, 2)
|
||||
.withLogFile(partition, "fileId6", 1, 2)
|
||||
.withLogFile(partition, "fileId7", 1, 2)
|
||||
.addRequestedCompaction("004", new FileSlice(partition, "000", "fileId2"))
|
||||
.withLogFile(partition, "fileId2", 1, 2)
|
||||
.addCommit("005")
|
||||
.withBaseFilesInPartition(partition, "fileId5", "fileId6", "fileId7")
|
||||
.withLogFile(partition, "fileId5", 1, 2)
|
||||
.withLogFile(partition, "fileId6", 1, 2)
|
||||
.withLogFile(partition, "fileId7", 1, 2)
|
||||
.addRequestedCompaction("006", new FileSlice(partition, "001", "fileId3"))
|
||||
.withLogFile(partition, "fileId3", 1, 2)
|
||||
.addCommit("007")
|
||||
.withBaseFilesInPartition(partition, "fileId6", "fileId7")
|
||||
.withLogFile(partition, "fileId6", 1, 2)
|
||||
.withLogFile(partition, "fileId7", 1, 2)
|
||||
.addRequestedCompaction("008", new FileSlice(partition, "003", "fileId4"))
|
||||
.withLogFile(partition, "fileId4", 1, 2)
|
||||
.addCommit("009")
|
||||
.withBaseFilesInPartition(partition, "fileId6", "fileId7")
|
||||
.withLogFile(partition, "fileId6", 1, 2)
|
||||
.withLogFile(partition, "fileId7", 1, 2)
|
||||
.addRequestedCompaction("010", new FileSlice(partition, "005", "fileId5"))
|
||||
.withLogFile(partition, "fileId5", 1, 2)
|
||||
.addCommit("011")
|
||||
.withBaseFilesInPartition(partition, "fileId7")
|
||||
.withLogFile(partition, "fileId7", 1, 2)
|
||||
.addCommit("013");
|
||||
|
||||
// Clean now
|
||||
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||
@@ -1177,7 +1185,7 @@ public class TestCleaner extends HoodieClientTestBase {
|
||||
expFileIdToPendingCompaction.forEach((fileId, value) -> {
|
||||
String baseInstantForCompaction = fileIdToLatestInstantBeforeCompaction.get(fileId);
|
||||
Option<FileSlice> fileSliceForCompaction = Option.fromJavaOptional(hoodieTable.getSliceView()
|
||||
.getLatestFileSlicesBeforeOrOn(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, baseInstantForCompaction,
|
||||
.getLatestFileSlicesBeforeOrOn(partition, baseInstantForCompaction,
|
||||
true)
|
||||
.filter(fs -> fs.getFileId().equals(fileId)).findFirst());
|
||||
assertTrue(fileSliceForCompaction.isPresent(), "Base Instant for Compaction must be preserved");
|
||||
|
||||
@@ -59,10 +59,10 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase {
|
||||
// given: wrote some base files and corresponding markers
|
||||
HoodieTestTable testTable = HoodieTestTable.of(metaClient);
|
||||
String f0 = testTable.addRequestedCommit("000")
|
||||
.withBaseFilesInPartitions("partA").get("partA");
|
||||
.getFileIdsWithBaseFilesInPartitions("partA").get("partA");
|
||||
String f1 = testTable.addCommit("001")
|
||||
.withBaseFilesInPartition("partA", f0)
|
||||
.withBaseFilesInPartitions("partB").get("partB");
|
||||
.getFileIdsWithBaseFilesInPartitions("partB").get("partB");
|
||||
String f2 = "f2";
|
||||
testTable.forCommit("001")
|
||||
.withMarkerFile("partA", f0, IOType.MERGE)
|
||||
@@ -90,10 +90,10 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase {
|
||||
// given: wrote some base + log files and corresponding markers
|
||||
HoodieTestTable testTable = HoodieTestTable.of(metaClient);
|
||||
String f2 = testTable.addRequestedDeltaCommit("000")
|
||||
.withBaseFilesInPartitions("partA").get("partA");
|
||||
.getFileIdsWithBaseFilesInPartitions("partA").get("partA");
|
||||
String f1 = testTable.addDeltaCommit("001")
|
||||
.withLogFile("partA", f2)
|
||||
.withBaseFilesInPartitions("partB").get("partB");
|
||||
.getFileIdsWithBaseFilesInPartitions("partB").get("partB");
|
||||
String f3 = "f3";
|
||||
String f4 = "f4";
|
||||
testTable.forDeltaCommit("001")
|
||||
|
||||
Reference in New Issue
Block a user