1
0

[HUDI-3194] fix MOR snapshot query during compaction (#4540)

This commit is contained in:
Yuwei XIAO
2022-01-18 06:24:24 +08:00
committed by GitHub
parent 36a9f63e45
commit d36533735f
3 changed files with 130 additions and 5 deletions

View File

@@ -69,7 +69,7 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
private static final Logger LOG = LogManager.getLogger(HoodieRealtimeInputFormatUtils.class);
public static InputSplit[] getRealtimeSplits(Configuration conf, Stream<FileSplit> fileSplits) throws IOException {
public static InputSplit[] getRealtimeSplits(Configuration conf, Stream<FileSplit> fileSplits) {
Map<Path, List<FileSplit>> partitionsToParquetSplits =
fileSplits.collect(Collectors.groupingBy(split -> split.getPath().getParent()));
// TODO(vc): Should we handle also non-hoodie splits here?
@@ -94,8 +94,8 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
HoodieTableMetaClient metaClient = partitionsToMetaClient.get(partitionPath);
if (!fsCache.containsKey(metaClient)) {
HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(conf);
HoodieTableFileSystemView fsView = FileSystemViewManager.createInMemoryFileSystemView(engineContext,
metaClient, HoodieInputFormatUtils.buildMetadataConfig(conf));
HoodieTableFileSystemView fsView = FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(engineContext,
metaClient, HoodieInputFormatUtils.buildMetadataConfig(conf), metaClient.getActiveTimeline());
fsCache.put(metaClient, fsView);
}
HoodieTableFileSystemView fsView = fsCache.get(metaClient);
@@ -103,7 +103,7 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
String relPartitionPath = FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), partitionPath);
// Both commit and delta-commits are included - pick the latest completed one
Option<HoodieInstant> latestCompletedInstant =
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
metaClient.getActiveTimeline().getWriteTimeline().filterCompletedInstants().lastInstant();
Stream<FileSlice> latestFileSlices = latestCompletedInstant
.map(instant -> fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, instant.getTimestamp()))