diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java index 61d9d1d03..3039eb3bd 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java @@ -373,23 +373,23 @@ public class TestUpsertPartitioner extends HoodieClientTestBase { .setClusteringPlan(clusteringPlan).setOperationType(WriteOperationType.CLUSTER.name()).build(); FileCreateUtils.createRequestedReplaceCommit(basePath,"002", Option.of(requestedReplaceMetadata)); - // create file slice 002 - FileCreateUtils.createBaseFile(basePath, testPartitionPath, "002", "2", 1); - FileCreateUtils.createCommit(basePath, "002"); + // create file slice 003 + FileCreateUtils.createBaseFile(basePath, testPartitionPath, "003", "3", 1); + FileCreateUtils.createCommit(basePath, "003"); metaClient = HoodieTableMetaClient.reload(metaClient); // generate new data to be ingested HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[] {testPartitionPath}); - List insertRecords = dataGenerator.generateInserts("003", 100); + List insertRecords = dataGenerator.generateInserts("004", 100); WorkloadProfile profile = new WorkloadProfile(buildProfile(jsc.parallelize(insertRecords))); HoodieSparkTable table = HoodieSparkTable.create(config, context, metaClient); // create UpsertPartitioner UpsertPartitioner partitioner = new UpsertPartitioner(profile, context, table, config); - // for now we have file slice1 and file slice2 and file slice1 is contained in pending clustering plan - // So that only file slice2 can be used for ingestion. 
+    // for now we have file slice1 and file slice3 and file slice1 is contained in pending clustering plan
+    // So that only file slice3 can be used for ingestion.
     assertEquals(1, partitioner.smallFiles.size(), "Should have 1 small file to be ingested.");
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
index bfcd6f43b..208d7ef2b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
@@ -380,6 +380,19 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
         && baseFile.getCommitTime().equals(compactionWithInstantTime.get().getKey());
   }
 
+  /**
+   * With async clustering, it is possible to see partial/complete base-files due to inflight clustering. Ignore those
+   * base-files.
+   *
+   * @param baseFile base file to check
+   */
+  protected boolean isBaseFileDueToPendingClustering(HoodieBaseFile baseFile) {
+    List<String> pendingReplaceInstants =
+        metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+
+    return !pendingReplaceInstants.isEmpty() && pendingReplaceInstants.contains(baseFile.getCommitTime());
+  }
+
   /**
    * Returns true if the file-group is under pending-compaction and the file-slice' baseInstant matches compaction
    * Instant.
@@ -492,7 +505,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV .map(fileGroup -> Option.fromJavaOptional(fileGroup.getAllBaseFiles() .filter(baseFile -> HoodieTimeline.compareTimestamps(baseFile.getCommitTime(), HoodieTimeline.LESSER_THAN_OR_EQUALS, maxCommitTime )) - .filter(df -> !isBaseFileDueToPendingCompaction(df)).findFirst())) + .filter(df -> !isBaseFileDueToPendingCompaction(df) && !isBaseFileDueToPendingClustering(df)).findFirst())) .filter(Option::isPresent).map(Option::get) .map(df -> addBootstrapBaseFileIfPresent(new HoodieFileGroupId(partitionPath, df.getFileId()), df)); } finally { @@ -511,7 +524,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV } else { return fetchHoodieFileGroup(partitionPath, fileId).map(fileGroup -> fileGroup.getAllBaseFiles() .filter(baseFile -> HoodieTimeline.compareTimestamps(baseFile.getCommitTime(), HoodieTimeline.EQUALS, - instantTime)).filter(df -> !isBaseFileDueToPendingCompaction(df)).findFirst().orElse(null)) + instantTime)).filter(df -> !isBaseFileDueToPendingCompaction(df) && !isBaseFileDueToPendingClustering(df)).findFirst().orElse(null)) .map(df -> addBootstrapBaseFileIfPresent(new HoodieFileGroupId(partitionPath, fileId), df)); } } finally { @@ -547,7 +560,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV .filter(fileGroup -> !isFileGroupReplacedBeforeAny(fileGroup.getFileGroupId(), commitsToReturn)) .map(fileGroup -> Pair.of(fileGroup.getFileGroupId(), Option.fromJavaOptional( fileGroup.getAllBaseFiles().filter(baseFile -> commitsToReturn.contains(baseFile.getCommitTime()) - && !isBaseFileDueToPendingCompaction(baseFile)).findFirst()))).filter(p -> p.getValue().isPresent()) + && !isBaseFileDueToPendingCompaction(baseFile) && !isBaseFileDueToPendingClustering(baseFile)).findFirst()))).filter(p -> p.getValue().isPresent()) .map(p -> addBootstrapBaseFileIfPresent(p.getKey(), p.getValue().get())); } finally { 
readLock.unlock(); @@ -563,7 +576,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV return fetchAllBaseFiles(partitionPath) .filter(df -> !isFileGroupReplaced(partitionPath, df.getFileId())) .filter(df -> visibleCommitsAndCompactionTimeline.containsOrBeforeTimelineStarts(df.getCommitTime())) - .filter(df -> !isBaseFileDueToPendingCompaction(df)) + .filter(df -> !isBaseFileDueToPendingCompaction(df) && !isBaseFileDueToPendingClustering(df)) .map(df -> addBootstrapBaseFileIfPresent(new HoodieFileGroupId(partitionPath, df.getFileId()), df)); } finally { readLock.unlock(); @@ -953,7 +966,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV protected Option getLatestBaseFile(HoodieFileGroup fileGroup) { return Option - .fromJavaOptional(fileGroup.getAllBaseFiles().filter(df -> !isBaseFileDueToPendingCompaction(df)).findFirst()); + .fromJavaOptional(fileGroup.getAllBaseFiles().filter(df -> !isBaseFileDueToPendingCompaction(df) && !isBaseFileDueToPendingClustering(df)).findFirst()); } /** diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java index 924c6724e..54bc138fc 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java @@ -31,6 +31,7 @@ import org.apache.hudi.common.bootstrap.FileStatusUtils; import org.apache.hudi.common.bootstrap.index.BootstrapIndex.IndexWriter; import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.fs.HoodieWrapperFileSystem; import org.apache.hudi.common.model.BaseFile; import org.apache.hudi.common.model.BootstrapFileMapping; import org.apache.hudi.common.model.CompactionOperation; @@ -41,6 +42,7 
@@ import org.apache.hudi.common.model.HoodieFileGroup; import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; @@ -50,12 +52,15 @@ import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView; import org.apache.hudi.common.table.view.TableFileSystemView.SliceView; import org.apache.hudi.common.testutils.HoodieCommonTestHarness; +import org.apache.hudi.common.testutils.HoodieTestTable; import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.util.ClusteringUtils; import org.apache.hudi.common.util.CommitUtils; import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.ImmutablePair; import org.apache.hudi.common.util.collection.Pair; + import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.junit.jupiter.api.BeforeEach; @@ -1537,6 +1542,234 @@ public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness { assertFalse(fileIds.contains(fileId3)); } + /** + * + * create hoodie table like + * . 
+ * ├── .hoodie + * │   ├── .aux + * │   │   └── .bootstrap + * │   │   ├── .fileids + * │   │   └── .partitions + * │   ├── .temp + * │   ├── 1.commit + * │   ├── 1.commit.requested + * │   ├── 1.inflight + * │   ├── 2.replacecommit + * │   ├── 2.replacecommit.inflight + * │   ├── 2.replacecommit.requested + * │   ├── 3.commit + * │   ├── 3.commit.requested + * │   ├── 3.inflight + * │   ├── archived + * │   └── hoodie.properties + * └── 2020 + * └── 06 + * └── 27 + * ├── 5fe477d2-0150-46d4-833c-1e9cc8da9948_1-0-1_3.parquet + * ├── 7e3208c8-fdec-4254-9682-8fff1e51ee8d_1-0-1_2.parquet + * ├── e04b0e2d-1467-46b2-8ea6-f4fe950965a5_1-0-1_1.parquet + * └── f3936b66-b3db-4fc8-a6d0-b1a7559016e6_1-0-1_1.parquet + * + * First test fsView API with finished clustering: + * 1. getLatestBaseFilesBeforeOrOn + * 2. getBaseFileOn + * 3. getLatestBaseFilesInRange + * 4. getAllBaseFiles + * 5. getLatestBaseFiles + * + * Then remove 2.replacecommit, 1.commit, 1.commit.requested, 1.inflight to simulate + * pending clustering at the earliest position in the active timeline and test these APIs again. + * + * @throws IOException + */ + @Test + public void testHoodieTableFileSystemViewWithPendingClustering() throws IOException { + List latestBaseFilesBeforeOrOn; + Option baseFileOn; + List latestBaseFilesInRange; + List allBaseFiles; + List latestBaseFiles; + List latestBaseFilesPerPartition; + String partitionPath = "2020/06/27"; + new File(basePath + "/" + partitionPath).mkdirs(); + HoodieActiveTimeline commitTimeline = metaClient.getActiveTimeline(); + + // will create 5 fileId in partition. + // fileId1 and fileId2 will be replaced by fileID3 + // fileId4 and fileId5 will be committed after clustering finished. 
+ String fileId1 = UUID.randomUUID().toString(); + String fileId2 = UUID.randomUUID().toString(); + String fileId3 = UUID.randomUUID().toString(); + String fileId4 = UUID.randomUUID().toString(); + String fileId5 = UUID.randomUUID().toString(); + + assertFalse(roView.getLatestBaseFiles(partitionPath) + .anyMatch(dfile -> dfile.getFileId().equals(fileId1) + || dfile.getFileId().equals(fileId2) + || dfile.getFileId().equals(fileId3) + || dfile.getFileId().equals(fileId4) + || dfile.getFileId().equals(fileId5)), + "No commit, should not find any data file"); + + // first insert commit + String commitTime1 = "1"; + String fileName1 = FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, fileId1); + String fileName2 = FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, fileId2); + new File(basePath + "/" + partitionPath + "/" + fileName1).createNewFile(); + new File(basePath + "/" + partitionPath + "/" + fileName2).createNewFile(); + + HoodieInstant instant1 = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, commitTime1); + + // build writeStats + HashMap> partitionToFile1 = new HashMap<>(); + ArrayList files1 = new ArrayList<>(); + files1.add(fileId1); + files1.add(fileId2); + partitionToFile1.put(partitionPath, files1); + List writeStats1 = buildWriteStats(partitionToFile1, commitTime1); + + HoodieCommitMetadata commitMetadata1 = + CommitUtils.buildMetadata(writeStats1, new HashMap<>(), Option.empty(), WriteOperationType.INSERT, "", HoodieTimeline.COMMIT_ACTION); + saveAsComplete(commitTimeline, instant1, Option.of(commitMetadata1.toJsonString().getBytes(StandardCharsets.UTF_8))); + commitTimeline.reload(); + + // replace commit + String commitTime2 = "2"; + String fileName3 = FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, fileId3); + new File(basePath + "/" + partitionPath + "/" + fileName3).createNewFile(); + + HoodieInstant instant2 = new HoodieInstant(true, HoodieTimeline.REPLACE_COMMIT_ACTION, commitTime2); + Map> partitionToReplaceFileIds = 
new HashMap<>(); + List replacedFileIds = new ArrayList<>(); + replacedFileIds.add(fileId1); + replacedFileIds.add(fileId2); + partitionToReplaceFileIds.put(partitionPath, replacedFileIds); + + HashMap> partitionToFile2 = new HashMap<>(); + ArrayList files2 = new ArrayList<>(); + files2.add(fileId3); + partitionToFile2.put(partitionPath, files2); + List writeStats2 = buildWriteStats(partitionToFile2, commitTime2); + + HoodieCommitMetadata commitMetadata2 = + CommitUtils.buildMetadata(writeStats2, partitionToReplaceFileIds, Option.empty(), WriteOperationType.INSERT_OVERWRITE, "", HoodieTimeline.REPLACE_COMMIT_ACTION); + saveAsComplete(commitTimeline, instant2, Option.of(commitMetadata2.toJsonString().getBytes(StandardCharsets.UTF_8))); + + // another insert commit + String commitTime3 = "3"; + String fileName4 = FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, fileId4); + new File(basePath + "/" + partitionPath + "/" + fileName4).createNewFile(); + HoodieInstant instant3 = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, commitTime3); + + // build writeStats + HashMap> partitionToFile3 = new HashMap<>(); + ArrayList files3 = new ArrayList<>(); + files3.add(fileId4); + partitionToFile3.put(partitionPath, files3); + List writeStats3 = buildWriteStats(partitionToFile3, commitTime3); + HoodieCommitMetadata commitMetadata3 = + CommitUtils.buildMetadata(writeStats3, new HashMap<>(), Option.empty(), WriteOperationType.INSERT, "", HoodieTimeline.COMMIT_ACTION); + saveAsComplete(commitTimeline, instant3, Option.of(commitMetadata3.toJsonString().getBytes(StandardCharsets.UTF_8))); + + metaClient.reloadActiveTimeline(); + refreshFsView(); + + ArrayList commits = new ArrayList<>(); + commits.add(commitTime1); + commits.add(commitTime2); + commits.add(commitTime3); + + // do check + latestBaseFilesBeforeOrOn = fsView.getLatestBaseFilesBeforeOrOn(partitionPath, commitTime3).map(HoodieBaseFile::getFileId).collect(Collectors.toList()); + assertEquals(2, 
latestBaseFilesBeforeOrOn.size());
+    assertTrue(latestBaseFilesBeforeOrOn.contains(fileId3));
+    assertTrue(latestBaseFilesBeforeOrOn.contains(fileId4));
+
+    // could see fileId3 because clustering is committed.
+    baseFileOn = fsView.getBaseFileOn(partitionPath, commitTime2, fileId3);
+    assertTrue(baseFileOn.isPresent());
+    assertEquals(fileId3, baseFileOn.get().getFileId());
+
+    latestBaseFilesInRange = fsView.getLatestBaseFilesInRange(commits).map(HoodieBaseFile::getFileId).collect(Collectors.toList());
+    assertEquals(2, latestBaseFilesInRange.size());
+    assertTrue(latestBaseFilesInRange.contains(fileId3));
+    assertTrue(latestBaseFilesInRange.contains(fileId4));
+
+    allBaseFiles = fsView.getAllBaseFiles(partitionPath).map(HoodieBaseFile::getFileId).collect(Collectors.toList());
+    assertEquals(2, allBaseFiles.size());
+    assertTrue(allBaseFiles.contains(fileId3));
+    assertTrue(allBaseFiles.contains(fileId4));
+
+    // could see fileId3 because clustering is committed.
+    latestBaseFiles = fsView.getLatestBaseFiles().map(HoodieBaseFile::getFileId).collect(Collectors.toList());
+    assertEquals(2, latestBaseFiles.size());
+    assertTrue(latestBaseFiles.contains(fileId3));
+    assertTrue(latestBaseFiles.contains(fileId4));
+
+    // could see fileId3 because clustering is committed.
+    latestBaseFilesPerPartition = fsView.getLatestBaseFiles(partitionPath).map(HoodieBaseFile::getFileId).collect(Collectors.toList());
+    assertEquals(2, latestBaseFilesPerPartition.size());
+    assertTrue(latestBaseFilesPerPartition.contains(fileId3));
+    assertTrue(latestBaseFilesPerPartition.contains(fileId4));
+
+    HoodieWrapperFileSystem fs = metaClient.getFs();
+    fs.delete(new Path(basePath + "/.hoodie", "1.commit"), false);
+    fs.delete(new Path(basePath + "/.hoodie", "1.inflight"), false);
+    fs.delete(new Path(basePath + "/.hoodie", "1.commit.requested"), false);
+    fs.delete(new Path(basePath + "/.hoodie", "2.replacecommit"), false);
+
+    metaClient.reloadActiveTimeline();
+    refreshFsView();
+    // do check after deleting some commit files
+    latestBaseFilesBeforeOrOn = fsView.getLatestBaseFilesBeforeOrOn(partitionPath, commitTime3).map(HoodieBaseFile::getFileId).collect(Collectors.toList());
+    assertEquals(3, latestBaseFilesBeforeOrOn.size());
+    assertTrue(latestBaseFilesBeforeOrOn.contains(fileId1));
+    assertTrue(latestBaseFilesBeforeOrOn.contains(fileId2));
+    assertTrue(latestBaseFilesBeforeOrOn.contains(fileId4));
+
+    // couldn't see fileId3 because clustering is not committed.
+    baseFileOn = fsView.getBaseFileOn(partitionPath, commitTime2, fileId3);
+    assertFalse(baseFileOn.isPresent());
+
+    latestBaseFilesInRange = fsView.getLatestBaseFilesInRange(commits).map(HoodieBaseFile::getFileId).collect(Collectors.toList());
+    assertEquals(3, latestBaseFilesInRange.size());
+    assertTrue(latestBaseFilesInRange.contains(fileId1));
+    assertTrue(latestBaseFilesInRange.contains(fileId2));
+    assertTrue(latestBaseFilesInRange.contains(fileId4));
+
+    allBaseFiles = fsView.getAllBaseFiles(partitionPath).map(HoodieBaseFile::getFileId).collect(Collectors.toList());
+    assertEquals(3, allBaseFiles.size());
+    assertTrue(allBaseFiles.contains(fileId1));
+    assertTrue(allBaseFiles.contains(fileId2));
+    assertTrue(allBaseFiles.contains(fileId4));
+
+    // couldn't see fileId3 because clustering is not committed.
+    latestBaseFiles = fsView.getLatestBaseFiles().map(HoodieBaseFile::getFileId).collect(Collectors.toList());
+    assertEquals(3, latestBaseFiles.size());
+    assertTrue(latestBaseFiles.contains(fileId1));
+    assertTrue(latestBaseFiles.contains(fileId2));
+    assertTrue(latestBaseFiles.contains(fileId4));
+
+    // couldn't see fileId3 because clustering is not committed.
+    latestBaseFilesPerPartition = fsView.getLatestBaseFiles(partitionPath).map(HoodieBaseFile::getFileId).collect(Collectors.toList());
+    assertEquals(3, latestBaseFilesPerPartition.size());
+    assertTrue(latestBaseFilesPerPartition.contains(fileId1));
+    assertTrue(latestBaseFilesPerPartition.contains(fileId2));
+    assertTrue(latestBaseFilesPerPartition.contains(fileId4));
+  }
+
+
+  // Generate Hoodie WriteStat For Given Partition
+  private List<HoodieWriteStat> buildWriteStats(HashMap<String, List<String>> partitionToFileIds, String commitTime) {
+    HashMap<String, List<Pair<String, Integer>>> maps = new HashMap<>();
+    for (String partition : partitionToFileIds.keySet()) {
+      List<Pair<String, Integer>> list = partitionToFileIds.get(partition).stream().map(fileId -> new ImmutablePair<>(fileId, 0)).collect(Collectors.toList());
+      maps.put(partition, list);
+    }
+    return HoodieTestTable.generateHoodieWriteStatForPartition(maps, commitTime, false);
+  }
+
   @Override
   protected HoodieTableType getTableType() {
     return HoodieTableType.MERGE_ON_READ;
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
index c55a389e2..f78312217 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
@@ -1044,7 +1044,7 @@ public class HoodieTestTable {
     return testTableState;
   }
 
-  private static List<HoodieWriteStat> generateHoodieWriteStatForPartition(Map<String, List<Pair<String, Integer>>> partitionToFileIdMap,
+  public static List<HoodieWriteStat> generateHoodieWriteStatForPartition(Map<String, List<Pair<String, Integer>>> partitionToFileIdMap,
                                                                            String commitTime, boolean bootstrap) {
     List<HoodieWriteStat> writeStats = new ArrayList<>();
     for (Map.Entry<String, List<Pair<String, Integer>>> entry : partitionToFileIdMap.entrySet()) {