From b68c5a68f9ba21f253256a49aacd9f4aee49e039 Mon Sep 17 00:00:00 2001 From: Manoj Govindassamy Date: Wed, 20 Oct 2021 15:57:00 -0700 Subject: [PATCH] [HUDI-2472] Fix few Cleaner tests with metadata table enabled (#3825) --- ...tFileWritesConflictResolutionStrategy.java | 4 +- .../testutils/HoodieMetadataTestTable.java | 24 ++- .../utils/TestMetadataConversionUtils.java | 4 +- .../org/apache/hudi/table/TestCleaner.java | 59 ++++--- .../common/testutils/FileCreateUtils.java | 9 +- .../common/testutils/HoodieTestTable.java | 157 ++++++++++++++---- 6 files changed, 195 insertions(+), 62 deletions(-) diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java index abe2a9456..afe8e05aa 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java @@ -314,7 +314,7 @@ public class TestSimpleConcurrentFileWritesConflictResolutionStrategy extends Ho commitMetadata.addWriteStat(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, writeStat); commitMetadata.setOperationType(WriteOperationType.INSERT); HoodieTestTable.of(metaClient) - .addCommit(instantTime, commitMetadata) + .addCommit(instantTime, Option.of(commitMetadata)) .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2); } @@ -362,7 +362,7 @@ public class TestSimpleConcurrentFileWritesConflictResolutionStrategy extends Ho writeStat.setFileId("file-1"); commitMetadata.addWriteStat(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, writeStat); HoodieTestTable.of(metaClient) - .addCommit(instantTime, commitMetadata) + .addCommit(instantTime, Option.of(commitMetadata)) .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2); } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java index 801c8463b..bbaf07374 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java @@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.metadata.HoodieTableMetadataWriter; import org.apache.hadoop.fs.FileSystem; @@ -40,7 +41,7 @@ import java.util.Map; */ public class HoodieMetadataTestTable extends HoodieTestTable { - private HoodieTableMetadataWriter writer; + private final HoodieTableMetadataWriter writer; protected HoodieMetadataTestTable(String basePath, FileSystem fs, HoodieTableMetaClient metaClient, HoodieTableMetadataWriter writer) { super(basePath, fs, metaClient); @@ -56,11 +57,25 @@ public class HoodieMetadataTestTable extends HoodieTestTable { return new HoodieMetadataTestTable(metaClient.getBasePath(), metaClient.getRawFs(), metaClient, writer); } + /** + * Add commits to the requested partitions and update metadata table. + * + * @param commitTime - Commit time for the operation + * @param operationType - Operation type + * @param newPartitionsToAdd - New partitions to add for the operation + * @param partitionToFilesNameLengthMap - Map of partition names to its list of files name and length pair + * @param bootstrap - Whether bootstrapping needed for the operation + * @param createInflightCommit - Whether in flight commit needed for the operation + * @return Commit metadata for the commit operation performed. + * @throws Exception + */ @Override public HoodieCommitMetadata doWriteOperation(String commitTime, WriteOperationType operationType, - List newPartitionsToAdd, List partitions, - int filesPerPartition, boolean bootstrap, boolean createInflightCommit) throws Exception { - HoodieCommitMetadata commitMetadata = super.doWriteOperation(commitTime, operationType, newPartitionsToAdd, partitions, filesPerPartition, bootstrap, createInflightCommit); + List newPartitionsToAdd, + Map>> partitionToFilesNameLengthMap, + boolean bootstrap, boolean createInflightCommit) throws Exception { + HoodieCommitMetadata commitMetadata = super.doWriteOperation(commitTime, operationType, newPartitionsToAdd, + partitionToFilesNameLengthMap, bootstrap, createInflightCommit); if (writer != null && !createInflightCommit) { writer.update(commitMetadata, commitTime); } @@ -140,4 +155,5 @@ public class HoodieMetadataTestTable extends HoodieTestTable { } return this; } + } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java index eb2ebd951..c0952bc5a 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java @@ -176,7 +176,7 @@ public class TestMetadataConversionUtils extends HoodieCommonTestHarness { commitMetadata.setOperationType(WriteOperationType.COMPACT); commitMetadata.setCompacted(true); HoodieTestTable.of(metaClient) - .addCommit(instantTime, commitMetadata) + .addCommit(instantTime, Option.of(commitMetadata)) .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2); } @@ -206,7 +206,7 @@ public class TestMetadataConversionUtils extends HoodieCommonTestHarness { commitMetadata.addMetadata("test", "test"); commitMetadata.setOperationType(WriteOperationType.INSERT); HoodieTestTable.of(metaClient) - .addCommit(instantTime, commitMetadata) + .addCommit(instantTime, Option.of(commitMetadata)) .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java index 973200eb4..8714df026 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java @@ -63,6 +63,7 @@ import org.apache.hudi.common.table.timeline.versioning.clean.CleanMetadataMigra import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanMigrator; import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV1MigrationHandler; import org.apache.hudi.common.table.view.TableFileSystemView; +import org.apache.hudi.common.testutils.HoodieMetadataTestTable; import org.apache.hudi.common.testutils.HoodieTestTable; import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.util.CleanerUtils; @@ -76,6 +77,8 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.index.SparkHoodieIndex; +import org.apache.hudi.metadata.HoodieTableMetadataWriter; +import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter; import org.apache.hudi.table.action.clean.CleanPlanner; import org.apache.hudi.testutils.HoodieClientTestBase; import org.apache.log4j.LogManager; @@ -635,29 +638,38 @@ public class TestCleaner extends HoodieClientTestBase { } /** - * Test HoodieTable.clean() Cleaning by versions logic. + * Test Hudi COW Table Cleaner - Keep the latest file versions policy. */ @ParameterizedTest @ValueSource(booleans = {false, true}) public void testKeepLatestFileVersions(Boolean enableBootstrapSourceClean) throws Exception { HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) - .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).enable(false).build()) + .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).enable(true).build()) .withCompactionConfig(HoodieCompactionConfig.newBuilder() .withCleanBootstrapBaseFileEnabled(enableBootstrapSourceClean) .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build()) .build(); - HoodieTestTable testTable = HoodieTestTable.of(metaClient); - String p0 = "2020/01/01"; - String p1 = "2020/01/02"; - Map> bootstrapMapping = enableBootstrapSourceClean ? generateBootstrapIndexAndSourceData(p0, p1) : null; + + HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context); + HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient, metadataWriter); + + final String p0 = "2020/01/01"; + final String p1 = "2020/01/02"; + final Map> bootstrapMapping = enableBootstrapSourceClean + ? generateBootstrapIndexAndSourceData(p0, p1) : null; // make 1 commit, with 1 file per partition - String file1P0C0 = enableBootstrapSourceClean ? bootstrapMapping.get(p0).get(0).getFileId() + final String file1P0C0 = enableBootstrapSourceClean ? bootstrapMapping.get(p0).get(0).getFileId() : UUID.randomUUID().toString(); - String file1P1C0 = enableBootstrapSourceClean ? bootstrapMapping.get(p1).get(0).getFileId() + final String file1P1C0 = enableBootstrapSourceClean ? bootstrapMapping.get(p1).get(0).getFileId() : UUID.randomUUID().toString(); - testTable.addCommit("00000000000001").withBaseFilesInPartition(p0, file1P0C0).withBaseFilesInPartition(p1, file1P1C0); + + Map>> c1PartitionToFilesNameLengthMap = new HashMap<>(); + c1PartitionToFilesNameLengthMap.put(p0, Collections.singletonList(Pair.of(file1P0C0, 100))); + c1PartitionToFilesNameLengthMap.put(p1, Collections.singletonList(Pair.of(file1P1C0, 200))); + testTable.doWriteOperation("00000000000001", WriteOperationType.INSERT, Arrays.asList(p0, p1), + c1PartitionToFilesNameLengthMap, false, false); List hoodieCleanStatsOne = runCleaner(config); assertEquals(0, hoodieCleanStatsOne.size(), "Must not clean any files"); @@ -665,17 +677,21 @@ public class TestCleaner extends HoodieClientTestBase { assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0)); // make next commit, with 1 insert & 1 update per partition - Map partitionAndFileId002 = testTable.addCommit("00000000000002") - .withBaseFilesInPartition(p0, file1P0C0) - .withBaseFilesInPartition(p1, file1P1C0) - .getFileIdsWithBaseFilesInPartitions(p0, p1); + final String file2P0C1 = UUID.randomUUID().toString(); + final String file2P1C1 = UUID.randomUUID().toString(); + Map>> c2PartitionToFilesNameLengthMap = new HashMap<>(); + c2PartitionToFilesNameLengthMap.put(p0, Arrays.asList(Pair.of(file1P0C0, 101), Pair.of(file2P0C1, 100))); + c2PartitionToFilesNameLengthMap.put(p1, Arrays.asList(Pair.of(file1P1C0, 201), Pair.of(file2P1C1, 200))); + testTable.doWriteOperation("00000000000002", WriteOperationType.UPSERT, Collections.emptyList(), + c2PartitionToFilesNameLengthMap, false, false); + // enableBootstrapSourceClean would delete the bootstrap base file at the same time List hoodieCleanStatsTwo = runCleaner(config, 1); - // enableBootstrapSourceClean would delete the bootstrap base file as the same time HoodieCleanStat cleanStat = getCleanStat(hoodieCleanStatsTwo, p0); assertEquals(enableBootstrapSourceClean ? 2 : 1, cleanStat.getSuccessDeleteFiles().size() + (cleanStat.getSuccessDeleteBootstrapBaseFiles() == null ? 0 : cleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean at least 1 file"); + if (enableBootstrapSourceClean) { HoodieFileStatus fstatus = bootstrapMapping.get(p0).get(0).getBootstrapFileStatus(); @@ -686,9 +702,8 @@ public class TestCleaner extends HoodieClientTestBase { assertFalse(Files.exists(Paths.get(bootstrapMapping.get( p0).get(0).getBootstrapFileStatus().getPath().getUri()))); } + cleanStat = getCleanStat(hoodieCleanStatsTwo, p1); - String file2P0C1 = partitionAndFileId002.get(p0); - String file2P1C1 = partitionAndFileId002.get(p1); assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1)); assertTrue(testTable.baseFileExists(p1, "00000000000002", file2P1C1)); assertFalse(testTable.baseFileExists(p0, "00000000000001", file1P0C0)); @@ -696,6 +711,7 @@ public class TestCleaner extends HoodieClientTestBase { assertEquals(enableBootstrapSourceClean ? 2 : 1, cleanStat.getSuccessDeleteFiles().size() + (cleanStat.getSuccessDeleteBootstrapBaseFiles() == null ? 0 : cleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean at least 1 file"); + if (enableBootstrapSourceClean) { HoodieFileStatus fstatus = bootstrapMapping.get(p1).get(0).getBootstrapFileStatus(); @@ -708,9 +724,13 @@ public class TestCleaner extends HoodieClientTestBase { } // make next commit, with 2 updates to existing files, and 1 insert - String file3P0C2 = testTable.addCommit("00000000000003") - .withBaseFilesInPartition(p0, file1P0C0, file2P0C1) - .getFileIdsWithBaseFilesInPartitions(p0).get(p0); + final String file3P0C2 = UUID.randomUUID().toString(); + Map>> c3PartitionToFilesNameLengthMap = new HashMap<>(); + c3PartitionToFilesNameLengthMap.put(p0, Arrays.asList(Pair.of(file1P0C0, 102), Pair.of(file2P0C1, 101), + Pair.of(file3P0C2, 100))); + testTable.doWriteOperation("00000000000003", WriteOperationType.UPSERT, Collections.emptyList(), + c3PartitionToFilesNameLengthMap, false, false); + List hoodieCleanStatsThree = runCleaner(config, 3); assertEquals(2, getCleanStat(hoodieCleanStatsThree, p0) @@ -721,6 +741,7 @@ public class TestCleaner extends HoodieClientTestBase { // No cleaning on partially written file, with no commit. testTable.forCommit("00000000000004").withBaseFilesInPartition(p0, file3P0C2); + List hoodieCleanStatsFour = runCleaner(config); assertEquals(0, hoodieCleanStatsFour.size(), "Must not clean any files"); assertTrue(testTable.baseFileExists(p0, "00000000000003", file3P0C2)); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java index 33a695ed7..491ad32f9 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java @@ -144,8 +144,13 @@ public class FileCreateUtils { createMetaFile(basePath, instantTime, HoodieTimeline.COMMIT_EXTENSION); } - public static void createCommit(String basePath, String instantTime, HoodieCommitMetadata metadata) throws IOException { - createMetaFile(basePath, instantTime, HoodieTimeline.COMMIT_EXTENSION, metadata.toJsonString().getBytes(StandardCharsets.UTF_8)); + public static void createCommit(String basePath, String instantTime, Option metadata) throws IOException { + if (metadata.isPresent()) { + createMetaFile(basePath, instantTime, HoodieTimeline.COMMIT_EXTENSION, + metadata.get().toJsonString().getBytes(StandardCharsets.UTF_8)); + } else { + createMetaFile(basePath, instantTime, HoodieTimeline.COMMIT_EXTENSION); + } } public static void createCommit(String basePath, String instantTime, FileSystem fs) throws IOException { diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java index 2a829b596..2018ae28c 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java @@ -188,14 +188,19 @@ public class HoodieTestTable { } public HoodieTestTable addCommit(String instantTime) throws Exception { + return addCommit(instantTime, Option.empty()); + } + + public HoodieTestTable addCommit(String instantTime, Option metadata) throws Exception { createRequestedCommit(basePath, instantTime); createInflightCommit(basePath, instantTime); - createCommit(basePath, instantTime); + createCommit(basePath, instantTime, metadata); currentInstantTime = instantTime; return this; } - public HoodieCommitMetadata createCommitMetadata(WriteOperationType operationType, String commitTime, HoodieTestTableState testTableState) { + public HoodieCommitMetadata createCommitMetadata(WriteOperationType operationType, String commitTime, + HoodieTestTableState testTableState) { String actionType = getCommitActionType(operationType, metaClient.getTableType()); return createCommitMetadata(operationType, commitTime, Collections.emptyMap(), testTableState, false, actionType); } @@ -203,7 +208,8 @@ public class HoodieTestTable { public HoodieCommitMetadata createCommitMetadata(WriteOperationType operationType, String commitTime, HoodieTestTableState testTableState, boolean bootstrap) { String actionType = getCommitActionType(operationType, metaClient.getTableType()); - return createCommitMetadata(operationType, commitTime, Collections.emptyMap(), testTableState, bootstrap, actionType); + return createCommitMetadata(operationType, commitTime, Collections.emptyMap(), testTableState, bootstrap, + actionType); } public HoodieCommitMetadata createCommitMetadata(WriteOperationType operationType, String commitTime, @@ -217,17 +223,9 @@ public class HoodieTestTable { return buildMetadata(writeStats, partitionToReplaceFileIds, Option.of(extraMetadata), operationType, EMPTY_STRING, action); } - public HoodieTestTable addCommit(String instantTime, HoodieCommitMetadata metadata) throws Exception { - createRequestedCommit(basePath, instantTime); - createInflightCommit(basePath, instantTime); - createCommit(basePath, instantTime, metadata); - currentInstantTime = instantTime; - return this; - } - public HoodieTestTable moveInflightCommitToComplete(String instantTime, HoodieCommitMetadata metadata) throws IOException { if (metaClient.getTableType() == HoodieTableType.COPY_ON_WRITE) { - createCommit(basePath, instantTime, metadata); + createCommit(basePath, instantTime, Option.of(metadata)); } else { createDeltaCommit(basePath, instantTime, metadata); } @@ -425,11 +423,11 @@ public class HoodieTestTable { public HoodieTestTable addCompaction(String instantTime, HoodieCommitMetadata commitMetadata) throws Exception { createRequestedCompaction(basePath, instantTime); createInflightCompaction(basePath, instantTime); - return addCommit(instantTime, commitMetadata); + return addCommit(instantTime, Option.of(commitMetadata)); } public HoodieTestTable moveInflightCompactionToComplete(String instantTime, HoodieCommitMetadata metadata) throws IOException { - createCommit(basePath, instantTime, metadata); + createCommit(basePath, instantTime, Option.of(metadata)); inflightCommits.remove(instantTime); currentInstantTime = instantTime; return this; @@ -834,16 +832,45 @@ public class HoodieTestTable { public HoodieCommitMetadata doWriteOperation(String commitTime, WriteOperationType operationType, List partitions, int filesPerPartition, boolean bootstrap) throws Exception { - return doWriteOperation(commitTime, operationType, Collections.emptyList(), partitions, filesPerPartition, bootstrap, false); + return doWriteOperation(commitTime, operationType, Collections.emptyList(), partitions, filesPerPartition, + bootstrap, false); } public HoodieCommitMetadata doWriteOperation(String commitTime, WriteOperationType operationType, List newPartitionsToAdd, List partitions, - int filesPerPartition, boolean bootstrap, boolean createInflightCommit) throws Exception { + int filesPerPartition, boolean bootstrap, + boolean createInflightCommit) throws Exception { if (partitions.isEmpty()) { partitions = Collections.singletonList(EMPTY_STRING); } - HoodieTestTableState testTableState = getTestTableStateWithPartitionFileInfo(operationType, metaClient.getTableType(), commitTime, partitions, filesPerPartition); + + Map>> partitionToFilesNameLengthMap = getPartitionFiles(partitions, + filesPerPartition); + return doWriteOperation(commitTime, operationType, newPartitionsToAdd, partitionToFilesNameLengthMap, bootstrap, + createInflightCommit); + } + + /** + * Add commits to the requested partitions. + * + * @param commitTime - Commit time for the operation + * @param operationType - Operation type + * @param newPartitionsToAdd - New partitions to add for the operation + * @param partitionToFilesNameLengthMap - Map of partition names to its list of files name and length pair + * @param bootstrap - Whether bootstrapping needed for the operation + * @param createInflightCommit - Whether in flight commit needed for the operation + * @return Commit metadata for the commit operation performed. + * @throws Exception + */ + public HoodieCommitMetadata doWriteOperation(String commitTime, WriteOperationType operationType, + List newPartitionsToAdd, + Map>> partitionToFilesNameLengthMap, + boolean bootstrap, boolean createInflightCommit) throws Exception { + if (partitionToFilesNameLengthMap.isEmpty()) { + partitionToFilesNameLengthMap = Collections.singletonMap(EMPTY_STRING, Collections.EMPTY_LIST); + } + HoodieTestTableState testTableState = getTestTableStateWithPartitionFileInfo(operationType, + metaClient.getTableType(), commitTime, partitionToFilesNameLengthMap); HoodieCommitMetadata commitMetadata = createCommitMetadata(operationType, commitTime, testTableState, bootstrap); for (String str : newPartitionsToAdd) { this.withPartitionMetaFiles(str); @@ -856,12 +883,13 @@ public class HoodieTestTable { } } else { if (metaClient.getTableType() == HoodieTableType.COPY_ON_WRITE) { - this.addCommit(commitTime, commitMetadata); + this.addCommit(commitTime, Option.of(commitMetadata)); } else { this.addDeltaCommit(commitTime, commitMetadata); } } - for (String partition : partitions) { + for (Map.Entry>> entry : partitionToFilesNameLengthMap.entrySet()) { + String partition = entry.getKey(); this.withBaseFilesInPartition(partition, testTableState.getPartitionToBaseFileInfoMap(commitTime).get(partition)); if (MERGE_ON_READ.equals(metaClient.getTableType()) && UPSERT.equals(operationType)) { this.withLogFilesInPartition(partition, testTableState.getPartitionToLogFileInfoMap(commitTime).get(partition)); @@ -911,15 +939,69 @@ public class HoodieTestTable { return partitionFilesToDelete; } - private static HoodieTestTableState getTestTableStateWithPartitionFileInfo(WriteOperationType operationType, HoodieTableType tableType, String commitTime, - List partitions, int filesPerPartition) { + /** + * Generate partition files names and length details. + * + * @param partitions - List of partition for which file details need to be generated + * @param filesPerPartition - File count per partition + * @return Map of partition to its collection of files name and length pair + */ + protected static Map>> getPartitionFiles(List partitions, + int filesPerPartition) { + Map>> partitionToFilesNameLengthMap = new HashMap<>(); for (String partition : partitions) { Stream fileLengths = IntStream.range(0, filesPerPartition).map(i -> 100 + RANDOM.nextInt(500)).boxed(); + List> fileNameAndLengthList = + fileLengths.map(len -> Pair.of(UUID.randomUUID().toString(), len)).collect(Collectors.toList()); + partitionToFilesNameLengthMap.put(partition, fileNameAndLengthList); + } + return partitionToFilesNameLengthMap; + } + + /** + * Get Test table state for the requested partitions and file count. + * + * @param operationType - Table write operation type + * @param tableType - Hudi table type + * @param commitTime - Write commit time + * @param partitions - List of partition names + * @param filesPerPartition - Total file count per partition + * @return Test table state for the requested partitions and file count + */ + private static HoodieTestTableState getTestTableStateWithPartitionFileInfo(WriteOperationType operationType, + HoodieTableType tableType, + String commitTime, + List partitions, + int filesPerPartition) { + Map>> partitionToFilesNameLengthMap = getPartitionFiles(partitions, + filesPerPartition); + return getTestTableStateWithPartitionFileInfo(operationType, tableType, commitTime, partitionToFilesNameLengthMap); + } + + /** + * Get Test table state for the requested partitions and files. + * + * @param operationType - Table write operation type + * @param tableType - Hudi table type + * @param commitTime - Write commit time + * @param partitionToFilesNameLengthMap - Map of partition names to its list of files and their lengths + * @return Test tabke state for the requested partitions and files + */ + private static HoodieTestTableState getTestTableStateWithPartitionFileInfo(WriteOperationType operationType, + HoodieTableType tableType, + String commitTime, + Map>> partitionToFilesNameLengthMap) { + for (Map.Entry>> partitionEntry : partitionToFilesNameLengthMap.entrySet()) { + String partitionName = partitionEntry.getKey(); + List> fileNameAndLengthList = partitionEntry.getValue(); if (MERGE_ON_READ.equals(tableType) && UPSERT.equals(operationType)) { - List> fileVersionAndLength = fileLengths.map(len -> Pair.of(0, len)).collect(Collectors.toList()); - testTableState = testTableState.createTestTableStateForBaseAndLogFiles(commitTime, partition, fileVersionAndLength); + List> fileVersionAndLength = + fileNameAndLengthList.stream().map(nameLengthPair -> Pair.of(0, nameLengthPair.getRight())).collect(Collectors.toList()); + testTableState = testTableState.createTestTableStateForBaseAndLogFiles(commitTime, partitionName, + fileVersionAndLength); } else { - testTableState = testTableState.createTestTableStateForBaseFilesOnly(commitTime, partition, fileLengths.collect(Collectors.toList())); + testTableState = testTableState.createTestTableStateForBaseFilesOnly(commitTime, partitionName, + fileNameAndLengthList); } } return testTableState; @@ -1015,7 +1097,17 @@ public class HoodieTestTable { return this.commitsToPartitionToFileIdForCleaner.get(commitTime); } - HoodieTestTableState createTestTableStateForBaseFilesOnly(String commitTime, String partitionPath, List lengths) { + HoodieTestTableState createTestTableStateForBaseFileLengthsOnly(String commitTime, String partitionPath, + List lengths) { + List> fileNameLengthList = new ArrayList<>(); + for (int length : lengths) { + fileNameLengthList.add(Pair.of(UUID.randomUUID().toString(), length)); + } + return createTestTableStateForBaseFilesOnly(commitTime, partitionPath, fileNameLengthList); + } + + HoodieTestTableState createTestTableStateForBaseFilesOnly(String commitTime, String partitionPath, + List> fileNameAndLengthList) { if (!commitsToPartitionToBaseFileInfoStats.containsKey(commitTime)) { commitsToPartitionToBaseFileInfoStats.put(commitTime, new HashMap<>()); } @@ -1023,20 +1115,19 @@ public class HoodieTestTable { this.commitsToPartitionToBaseFileInfoStats.get(commitTime).put(partitionPath, new ArrayList<>()); } - List> fileInfos = new ArrayList<>(); - for (int length : lengths) { - fileInfos.add(Pair.of(UUID.randomUUID().toString(), length)); - } - this.commitsToPartitionToBaseFileInfoStats.get(commitTime).get(partitionPath).addAll(fileInfos); + this.commitsToPartitionToBaseFileInfoStats.get(commitTime).get(partitionPath).addAll(fileNameAndLengthList); return this; } - HoodieTestTableState createTestTableStateForBaseAndLogFiles(String commitTime, String partitionPath, List> versionsAndLengths) { + HoodieTestTableState createTestTableStateForBaseAndLogFiles(String commitTime, String partitionPath, + List> versionsAndLengths) { if (!commitsToPartitionToBaseFileInfoStats.containsKey(commitTime)) { - createTestTableStateForBaseFilesOnly(commitTime, partitionPath, versionsAndLengths.stream().map(Pair::getRight).collect(Collectors.toList())); + createTestTableStateForBaseFileLengthsOnly(commitTime, partitionPath, + versionsAndLengths.stream().map(Pair::getRight).collect(Collectors.toList())); } if (!this.commitsToPartitionToBaseFileInfoStats.get(commitTime).containsKey(partitionPath)) { - createTestTableStateForBaseFilesOnly(commitTime, partitionPath, versionsAndLengths.stream().map(Pair::getRight).collect(Collectors.toList())); + createTestTableStateForBaseFileLengthsOnly(commitTime, partitionPath, + versionsAndLengths.stream().map(Pair::getRight).collect(Collectors.toList())); } if (!commitsToPartitionToLogFileInfoStats.containsKey(commitTime)) { commitsToPartitionToLogFileInfoStats.put(commitTime, new HashMap<>());