From cf3a2ead32f432757668d49a9138f891110aa9a5 Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Wed, 8 Sep 2021 10:43:54 +0800 Subject: [PATCH] [HUDI-2401] Load archived instants for flink streaming reader (#3610) --- .../hudi/source/StreamReadMonitoringFunction.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java index 112dfda54..ec5690341 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java @@ -352,10 +352,16 @@ public class StreamReadMonitoringFunction // 1. the start commit is 'earliest'; // 2. the start instant is archived. HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline(); - if (!metaClient.getArchivedTimeline().empty()) { - Stream instantStream = archivedTimeline.getCommitsTimeline().filterCompletedInstants().getInstants(); + HoodieTimeline archivedCompleteTimeline = archivedTimeline.getCommitsTimeline().filterCompletedInstants(); + if (!archivedCompleteTimeline.empty()) { + final String endTs = archivedCompleteTimeline.lastInstant().get().getTimestamp(); + Stream instantStream = archivedCompleteTimeline.getInstants(); if (instantRange != null) { + archivedTimeline.loadInstantDetailsInMemory(instantRange.getStartInstant(), endTs); instantStream = instantStream.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN_OR_EQUALS, instantRange.getStartInstant())); + } else { + final String startTs = archivedCompleteTimeline.firstInstant().get().getTimestamp(); + archivedTimeline.loadInstantDetailsInMemory(startTs, endTs); } return instantStream .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, archivedTimeline)).collect(Collectors.toList());