1
0

[HUDI-2938] Metadata table util to get latest file slices for reader/writers (#4218)

This commit is contained in:
Manoj Govindassamy
2021-12-11 20:42:36 -08:00
committed by GitHub
parent 15444c951f
commit b22c2c611b
6 changed files with 54 additions and 21 deletions

View File

@@ -697,7 +697,6 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
      return;
    }
    // Trigger compaction with suffixes based on the same instant time. This ensures that any future
    // delta commits synced over will not have an instant time lesser than the last completed instant on the
    // metadata table.

View File

@@ -154,7 +154,7 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
   * The record is tagged with respective file slice's location based on its record key.
   */
  private List<HoodieRecord> prepRecords(List<HoodieRecord> records, String partitionName, int numFileGroups) {
-   List<FileSlice> fileSlices = HoodieTableMetadataUtil.loadPartitionFileGroupsWithLatestFileSlices(metadataMetaClient, partitionName, false);
+   List<FileSlice> fileSlices = HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, partitionName);
    ValidationUtils.checkArgument(fileSlices.size() == numFileGroups, String.format("Invalid number of file groups: found=%d, required=%d", fileSlices.size(), numFileGroups));
    return records.stream().map(r -> {

View File

@@ -169,7 +169,7 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
   * The record is tagged with respective file slice's location based on its record key.
   */
  private JavaRDD<HoodieRecord> prepRecords(JavaRDD<HoodieRecord> recordsRDD, String partitionName, int numFileGroups) {
-   List<FileSlice> fileSlices = HoodieTableMetadataUtil.loadPartitionFileGroupsWithLatestFileSlices(metadataMetaClient, partitionName, false);
+   List<FileSlice> fileSlices = HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, partitionName);
    ValidationUtils.checkArgument(fileSlices.size() == numFileGroups, String.format("Invalid number of file groups: found=%d, required=%d", fileSlices.size(), numFileGroups));
    return recordsRDD.map(r -> {

View File

@@ -424,10 +424,8 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
    });
  }
  /**
   * Tests that virtual key configs are honored in base files after compaction in metadata table.
-  *
   */
  @ParameterizedTest
  @ValueSource(booleans = {true, false})

View File

@@ -245,7 +245,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
    // Metadata is in sync till the latest completed instant on the dataset
    HoodieTimer timer = new HoodieTimer().startTimer();
-   List<FileSlice> latestFileSlices = HoodieTableMetadataUtil.loadPartitionFileGroupsWithLatestFileSlices(metadataMetaClient, partitionName, true);
+   List<FileSlice> latestFileSlices = HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, partitionName);
    if (latestFileSlices.size() == 0) {
      // empty partition
      return Pair.of(null, null);

View File

@@ -338,29 +338,65 @@ public class HoodieTableMetadataUtil {
  }
  /**
-  * Loads the list of file groups for a partition of the Metadata Table with latest file slices.
-  *
-  * The list of file slices returned is sorted in the correct order of file group name.
-  * @param metaClient instance of {@link HoodieTableMetaClient}.
-  * @param partition The name of the partition whose file groups are to be loaded.
-  * @param isReader true if reader code path, false otherwise.
+  * Get the latest file slices for a Metadata Table partition. If the file slice is
+  * because of pending compaction instant, then merge the file slice with the one
+  * just before the compaction instant time. The list of file slices returned is
+  * sorted in the correct order of file group name.
+  *
+  * @param metaClient - Instance of {@link HoodieTableMetaClient}.
+  * @param partition  - The name of the partition whose file groups are to be loaded.
   * @return List of latest file slices for all file groups in a given partition.
   */
- public static List<FileSlice> loadPartitionFileGroupsWithLatestFileSlices(HoodieTableMetaClient metaClient, String partition, boolean isReader) {
-   LOG.info("Loading file groups for metadata table partition " + partition);
-   // If there are no commits on the metadata table then the table's default FileSystemView will not return any file
-   // slices even though we may have initialized them.
+ public static List<FileSlice> getPartitionLatestMergedFileSlices(HoodieTableMetaClient metaClient, String partition) {
+   LOG.info("Loading latest merged file slices for metadata table partition " + partition);
+   return getPartitionFileSlices(metaClient, partition, true);
+ }
+
+ /**
+  * Get the latest file slices for a Metadata Table partition. The list of file slices
+  * returned is sorted in the correct order of file group name.
+  *
+  * @param metaClient - Instance of {@link HoodieTableMetaClient}.
+  * @param partition  - The name of the partition whose file groups are to be loaded.
+  * @return List of latest file slices for all file groups in a given partition.
+  */
+ public static List<FileSlice> getPartitionLatestFileSlices(HoodieTableMetaClient metaClient, String partition) {
+   LOG.info("Loading latest file slices for metadata table partition " + partition);
+   return getPartitionFileSlices(metaClient, partition, false);
+ }
+
+ /**
+  * Get the latest file slices for a given partition.
+  *
+  * @param metaClient      - Instance of {@link HoodieTableMetaClient}.
+  * @param partition       - The name of the partition whose file groups are to be loaded.
+  * @param mergeFileSlices - When enabled, will merge the latest file slices with the last known
+  *                          completed instant. This is useful for readers when there are pending
+  *                          compactions. MergeFileSlices when disabled, will return the latest file
+  *                          slices without any merging, and this is needed for the writers.
+  * @return List of latest file slices for all file groups in a given partition.
+  */
+ private static List<FileSlice> getPartitionFileSlices(HoodieTableMetaClient metaClient, String partition,
+                                                       boolean mergeFileSlices) {
+   // If there are no commits on the metadata table then the table's
+   // default FileSystemView will not return any file slices even
+   // though we may have initialized them.
    HoodieTimeline timeline = metaClient.getActiveTimeline();
    if (timeline.empty()) {
-     final HoodieInstant instant = new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, HoodieActiveTimeline.createNewInstantTime());
+     final HoodieInstant instant = new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION,
+         HoodieActiveTimeline.createNewInstantTime());
      timeline = new HoodieDefaultTimeline(Arrays.asList(instant).stream(), metaClient.getActiveTimeline()::getInstantDetails);
    }
    HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, timeline);
-   Stream<FileSlice> fileSliceStream = isReader ? fsView.getLatestMergedFileSlicesBeforeOrOn(partition, timeline.filterCompletedInstants().lastInstant().get().getTimestamp()) :
-       fsView.getLatestFileSlices(partition);
-   return fileSliceStream.sorted((s1, s2) -> s1.getFileId().compareTo(s2.getFileId()))
-       .collect(Collectors.toList());
+   Stream<FileSlice> fileSliceStream;
+   if (mergeFileSlices) {
+     fileSliceStream = fsView.getLatestMergedFileSlicesBeforeOrOn(
+         partition, timeline.filterCompletedInstants().lastInstant().get().getTimestamp());
+   } else {
+     fileSliceStream = fsView.getLatestFileSlices(partition);
+   }
+   return fileSliceStream.sorted((s1, s2) -> s1.getFileId().compareTo(s2.getFileId())).collect(Collectors.toList());
  }
}