From 095347cd1a7bddcc9eb89e9e5599b1e1905b6dba Mon Sep 17 00:00:00 2001 From: v-zhangjc9 Date: Tue, 28 May 2024 11:52:42 +0800 Subject: [PATCH] =?UTF-8?q?fix(hudi-query):=20=E4=BF=AE=E5=A4=8D=E8=8E=B7?= =?UTF-8?q?=E5=8F=96=E5=BE=85=E5=8E=8B=E7=BC=A9=E5=88=97=E8=A1=A8=E4=BC=9A?= =?UTF-8?q?=E6=8B=96=E6=85=A2=E6=80=A7=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/hudi/service/TimelineService.java | 15 ++++++++++++--- .../service/hudi/utils/HoodieUtils.java | 6 +++--- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/service/TimelineService.java b/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/service/TimelineService.java index 0d0d8f3..7d99920 100644 --- a/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/service/TimelineService.java +++ b/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/service/TimelineService.java @@ -13,7 +13,9 @@ import com.lanyuanxiaoyao.service.configuration.utils.ComparatorUtil; import com.lanyuanxiaoyao.service.forest.service.InfoService; import com.lanyuanxiaoyao.service.hudi.utils.HoodieUtils; import java.io.IOException; +import java.util.List; import java.util.function.Function; +import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -273,8 +275,15 @@ public class TimelineService { .setConf(new Configuration()) .setBasePath(meta.getHudi().getTargetHdfsPath()) .build(); - return HoodieUtils.getAllActiveInstants(client) - .select(instant -> StrUtil.equals(instant.getAction(), HoodieTimeline.COMPACTION_ACTION)) - .reject(instant -> StrUtil.equals(instant.getState(), HoodieInstant.State.COMPLETED.name())); + // 使用默认的方式取待压缩列表,不然获取压缩完成状态需要再扫描一次时间线目录,会降低性能 + List list = client.getActiveTimeline() + .filterPendingCompactionTimeline() + .getInstants() + .map(instant -> HoodieUtils.convert(instant, "active")) + .collect(Collectors.toList()); + if (ObjectUtil.isNotEmpty(list)) { + return Lists.immutable.ofAll(list); + } + return Lists.immutable.empty(); } } diff --git a/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/utils/HoodieUtils.java b/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/utils/HoodieUtils.java index 2524288..d1f9a9a 100644 --- a/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/utils/HoodieUtils.java +++ b/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/utils/HoodieUtils.java @@ -88,7 +88,7 @@ public class HoodieUtils { .toImmutable(); } - private static HudiInstant convert(HoodieInstant instant, String type) { + public static HudiInstant convert(HoodieInstant instant, String type) { return new HudiInstant( instant.getAction(), instant.getState().name(), @@ -99,7 +99,7 @@ public class HoodieUtils { ); } - private static ImmutableList activeInstants(HoodieTableMetaClient client) throws IOException { + public static ImmutableList activeInstants(HoodieTableMetaClient client) throws IOException { HoodieActiveTimeline timeline = client.getActiveTimeline(); Set committedTimestamps = timeline.getCommitsTimeline() .filterCompletedInstants() @@ -131,7 +131,7 @@ public class HoodieUtils { return Lists.immutable.ofAll(instants); } - private static ImmutableList archiveInstants(HoodieTableMetaClient client) { + public static ImmutableList archiveInstants(HoodieTableMetaClient client) { HoodieArchivedTimeline timeline = client.getArchivedTimeline(); MutableList instants = Lists.mutable.ofAll(timeline.getInstants().collect(Collectors.toList())); instants.forEach(instant -> logger.info(instant.toString()));