From 9a1afbcceb4faab9da2146e43cf921cb66832832 Mon Sep 17 00:00:00 2001 From: v-zhangjc9 Date: Tue, 28 May 2024 11:00:30 +0800 Subject: [PATCH] =?UTF-8?q?fix(hudi-query):=20=E4=BF=AE=E5=A4=8D=E6=97=B6?= =?UTF-8?q?=E9=97=B4=E7=BA=BFcompaction=E6=93=8D=E4=BD=9C=E5=AE=8C?= =?UTF-8?q?=E6=88=90=E7=8A=B6=E6=80=81=E6=97=A0=E6=B3=95=E8=AF=86=E5=88=AB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/hudi/utils/HoodieUtils.java | 20 ++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/utils/HoodieUtils.java b/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/utils/HoodieUtils.java index e3070fd..2524288 100644 --- a/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/utils/HoodieUtils.java +++ b/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/utils/HoodieUtils.java @@ -1,8 +1,10 @@ package com.lanyuanxiaoyao.service.hudi.utils; +import cn.hutool.core.util.ReUtil; import cn.hutool.core.util.StrUtil; import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiInstant; import java.io.IOException; +import java.util.Arrays; import java.util.Comparator; import java.util.List; import java.util.Set; @@ -66,10 +68,10 @@ public class HoodieUtils { MutableList instants = Lists.mutable.empty(); if (active) { - instants.addAllIterable(activeInstants(client.getActiveTimeline())); + instants.addAllIterable(activeInstants(client)); } if (archive) { - instants.addAllIterable(archiveInstants(client.getArchivedTimeline())); + instants.addAllIterable(archiveInstants(client)); } MutableMap fileModifiedTimeMap = Lists.immutable.of(fileSystem.listStatus(metadataPath)) @@ -97,16 +99,23 @@ public class HoodieUtils { ); } - private static ImmutableList activeInstants(HoodieActiveTimeline timeline) { + private static ImmutableList activeInstants(HoodieTableMetaClient client) throws IOException { + HoodieActiveTimeline timeline = client.getActiveTimeline(); Set committedTimestamps = timeline.getCommitsTimeline() .filterCompletedInstants() .getInstants() .map(HoodieInstant::getTimestamp) .collect(Collectors.toSet()); + Set compactionCommittedTimestamps = Arrays.stream(client.getFs().listStatus(new Path(client.getMetaPath()))) + .filter(status -> status.getPath().toString().endsWith(HoodieTimeline.REQUESTED_COMPACTION_EXTENSION)) + .map(status -> status.getPath().getName()) + .map(name -> ReUtil.get("^(\\d+)\\..+", name, 1)) + .filter(committedTimestamps::contains) + .collect(Collectors.toSet()); List instants = timeline.getInstants() .map(instant -> convert(instant, "active")) .map(instant -> { - if (HoodieTimeline.COMPACTION_ACTION.equals(instant.getAction()) && committedTimestamps.contains(instant.getTimestamp())) { + if (compactionCommittedTimestamps.contains(instant.getTimestamp())) { return new HudiInstant( HoodieTimeline.COMPACTION_ACTION, HoodieInstant.State.COMPLETED.name(), @@ -122,7 +131,8 @@ public class HoodieUtils { return Lists.immutable.ofAll(instants); } - private static ImmutableList archiveInstants(HoodieArchivedTimeline timeline) { + private static ImmutableList archiveInstants(HoodieTableMetaClient client) { + HoodieArchivedTimeline timeline = client.getArchivedTimeline(); MutableList instants = Lists.mutable.ofAll(timeline.getInstants().collect(Collectors.toList())); instants.forEach(instant -> logger.info(instant.toString())); MutableListMultimap stateMap = instants.groupBy(HoodieInstant::getState).collectValues(HoodieInstant::getTimestamp);