1
0

[HUDI-2401] Load archived instants for flink streaming reader (#3610)

This commit is contained in:
Danny Chan
2021-09-08 10:43:54 +08:00
committed by GitHub
parent ea59a7ff5f
commit cf3a2ead32

View File

@@ -352,10 +352,16 @@ public class StreamReadMonitoringFunction
 // 1. the start commit is 'earliest';
 // 2. the start instant is archived.
 HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline();
-if (!metaClient.getArchivedTimeline().empty()) {
-  Stream<HoodieInstant> instantStream = archivedTimeline.getCommitsTimeline().filterCompletedInstants().getInstants();
+HoodieTimeline archivedCompleteTimeline = archivedTimeline.getCommitsTimeline().filterCompletedInstants();
+if (!archivedCompleteTimeline.empty()) {
+  final String endTs = archivedCompleteTimeline.lastInstant().get().getTimestamp();
+  Stream<HoodieInstant> instantStream = archivedCompleteTimeline.getInstants();
   if (instantRange != null) {
+    archivedTimeline.loadInstantDetailsInMemory(instantRange.getStartInstant(), endTs);
     instantStream = instantStream.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN_OR_EQUALS, instantRange.getStartInstant()));
+  } else {
+    final String startTs = archivedCompleteTimeline.firstInstant().get().getTimestamp();
+    archivedTimeline.loadInstantDetailsInMemory(startTs, endTs);
   }
   return instantStream
       .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, archivedTimeline)).collect(Collectors.toList());