diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index 8a59e2c11..83ad16d8e 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -697,7 +697,6 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta return; } - // Trigger compaction with suffixes based on the same instant time. This ensures that any future // delta commits synced over will not have an instant time lesser than the last completed instant on the // metadata table. diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java index d11f570a6..204a8fc31 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java @@ -154,7 +154,7 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad * The record is tagged with respective file slice's location based on its record key. 
*/ private List prepRecords(List records, String partitionName, int numFileGroups) { - List fileSlices = HoodieTableMetadataUtil.loadPartitionFileGroupsWithLatestFileSlices(metadataMetaClient, partitionName, false); + List fileSlices = HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, partitionName); ValidationUtils.checkArgument(fileSlices.size() == numFileGroups, String.format("Invalid number of file groups: found=%d, required=%d", fileSlices.size(), numFileGroups)); return records.stream().map(r -> { diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java index b7b5961e4..5aa6917d6 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java @@ -169,7 +169,7 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad * The record is tagged with respective file slice's location based on its record key. 
*/ private JavaRDD prepRecords(JavaRDD recordsRDD, String partitionName, int numFileGroups) { - List fileSlices = HoodieTableMetadataUtil.loadPartitionFileGroupsWithLatestFileSlices(metadataMetaClient, partitionName, false); + List fileSlices = HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, partitionName); ValidationUtils.checkArgument(fileSlices.size() == numFileGroups, String.format("Invalid number of file groups: found=%d, required=%d", fileSlices.size(), numFileGroups)); return recordsRDD.map(r -> { diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index da4a7830d..392d68734 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -424,10 +424,8 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { }); } - /** * Tests that virtual key configs are honored in base files after compaction in metadata table. 
- * */ @ParameterizedTest @ValueSource(booleans = {true, false}) diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java index 58c25a17e..05d9d7349 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java @@ -245,7 +245,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata { // Metadata is in sync till the latest completed instant on the dataset HoodieTimer timer = new HoodieTimer().startTimer(); - List latestFileSlices = HoodieTableMetadataUtil.loadPartitionFileGroupsWithLatestFileSlices(metadataMetaClient, partitionName, true); + List latestFileSlices = HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, partitionName); if (latestFileSlices.size() == 0) { // empty partition return Pair.of(null, null); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index 7817d14e0..58d63a194 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -159,7 +159,7 @@ public class HoodieTableMetadataUtil { * @return a list of metadata table records */ public static List convertMetadataToRecords(HoodieActiveTimeline metadataTableTimeline, - HoodieRestoreMetadata restoreMetadata, String instantTime, Option lastSyncTs) { + HoodieRestoreMetadata restoreMetadata, String instantTime, Option lastSyncTs) { Map> partitionToAppendedFiles = new HashMap<>(); Map> partitionToDeletedFiles = new HashMap<>(); restoreMetadata.getHoodieRestoreMetadata().values().forEach(rms -> { @@ -338,29 +338,65 @@ public class HoodieTableMetadataUtil { } /** - * Loads the list of 
file groups for a partition of the Metadata Table with latest file slices. + * Get the latest file slices for a Metadata Table partition. If the latest file slice is + * due to a pending compaction instant, then merge it with the file slice + * just before the compaction instant time. The list of file slices returned is + * sorted in the correct order of file group name. * - * The list of file slices returned is sorted in the correct order of file group name. - * @param metaClient instance of {@link HoodieTableMetaClient}. - * @param partition The name of the partition whose file groups are to be loaded. - * @param isReader true if reader code path, false otherwise. + * @param metaClient - Instance of {@link HoodieTableMetaClient}. + * @param partition - The name of the partition whose file groups are to be loaded. * @return List of latest file slices for all file groups in a given partition. */ - public static List loadPartitionFileGroupsWithLatestFileSlices(HoodieTableMetaClient metaClient, String partition, boolean isReader) { - LOG.info("Loading file groups for metadata table partition " + partition); + public static List getPartitionLatestMergedFileSlices(HoodieTableMetaClient metaClient, String partition) { + LOG.info("Loading latest merged file slices for metadata table partition " + partition); + return getPartitionFileSlices(metaClient, partition, true); + } - // If there are no commits on the metadata table then the table's default FileSystemView will not return any file - // slices even though we may have initialized them. + /** + * Get the latest file slices for a Metadata Table partition. The list of file slices + * returned is sorted in the correct order of file group name. + * + * @param metaClient - Instance of {@link HoodieTableMetaClient}. + * @param partition - The name of the partition whose file groups are to be loaded. + * @return List of latest file slices for all file groups in a given partition. 
+ */ + public static List getPartitionLatestFileSlices(HoodieTableMetaClient metaClient, String partition) { + LOG.info("Loading latest file slices for metadata table partition " + partition); + return getPartitionFileSlices(metaClient, partition, false); + } + + /** + * Get the latest file slices for a given partition. + * + * @param metaClient - Instance of {@link HoodieTableMetaClient}. + * @param partition - The name of the partition whose file groups are to be loaded. + * @param mergeFileSlices - When enabled, will merge the latest file slices with the last known + * completed instant. This is useful for readers when there are pending + * compactions. When disabled, will return the latest file + * slices without any merging; this is needed for the writers. + * @return List of latest file slices for all file groups in a given partition. + */ + private static List getPartitionFileSlices(HoodieTableMetaClient metaClient, String partition, + boolean mergeFileSlices) { + // If there are no commits on the metadata table then the table's + // default FileSystemView will not return any file slices even + // though we may have initialized them. HoodieTimeline timeline = metaClient.getActiveTimeline(); if (timeline.empty()) { - final HoodieInstant instant = new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, HoodieActiveTimeline.createNewInstantTime()); + final HoodieInstant instant = new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, + HoodieActiveTimeline.createNewInstantTime()); timeline = new HoodieDefaultTimeline(Arrays.asList(instant).stream(), metaClient.getActiveTimeline()::getInstantDetails); } HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, timeline); - Stream fileSliceStream = isReader ? 
fsView.getLatestMergedFileSlicesBeforeOrOn(partition, timeline.filterCompletedInstants().lastInstant().get().getTimestamp()) : - fsView.getLatestFileSlices(partition); - return fileSliceStream.sorted((s1, s2) -> s1.getFileId().compareTo(s2.getFileId())) - .collect(Collectors.toList()); + Stream fileSliceStream; + if (mergeFileSlices) { + fileSliceStream = fsView.getLatestMergedFileSlicesBeforeOrOn( + partition, timeline.filterCompletedInstants().lastInstant().get().getTimestamp()); + } else { + fileSliceStream = fsView.getLatestFileSlices(partition); + } + return fileSliceStream.sorted((s1, s2) -> s1.getFileId().compareTo(s2.getFileId())).collect(Collectors.toList()); } + }