diff --git a/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/utils/HoodieUtils.java b/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/utils/HoodieUtils.java index e3070fd..2524288 100644 --- a/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/utils/HoodieUtils.java +++ b/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/utils/HoodieUtils.java @@ -1,8 +1,10 @@ package com.lanyuanxiaoyao.service.hudi.utils; +import cn.hutool.core.util.ReUtil; import cn.hutool.core.util.StrUtil; import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiInstant; import java.io.IOException; +import java.util.Arrays; import java.util.Comparator; import java.util.List; import java.util.Set; @@ -66,10 +68,10 @@ public class HoodieUtils { MutableList instants = Lists.mutable.empty(); if (active) { - instants.addAllIterable(activeInstants(client.getActiveTimeline())); + instants.addAllIterable(activeInstants(client)); } if (archive) { - instants.addAllIterable(archiveInstants(client.getArchivedTimeline())); + instants.addAllIterable(archiveInstants(client)); } MutableMap fileModifiedTimeMap = Lists.immutable.of(fileSystem.listStatus(metadataPath)) @@ -97,16 +99,23 @@ public class HoodieUtils { ); } - private static ImmutableList activeInstants(HoodieActiveTimeline timeline) { + private static ImmutableList activeInstants(HoodieTableMetaClient client) throws IOException { + HoodieActiveTimeline timeline = client.getActiveTimeline(); Set committedTimestamps = timeline.getCommitsTimeline() .filterCompletedInstants() .getInstants() .map(HoodieInstant::getTimestamp) .collect(Collectors.toSet()); + Set compactionCommittedTimestamps = Arrays.stream(client.getFs().listStatus(new Path(client.getMetaPath()))) + .filter(status -> status.getPath().toString().endsWith(HoodieTimeline.REQUESTED_COMPACTION_EXTENSION)) + .map(status -> status.getPath().getName()) + .map(name -> ReUtil.get("^(\\d+)\\..+", name, 1)) + .filter(committedTimestamps::contains) + .collect(Collectors.toSet()); List instants = timeline.getInstants() .map(instant -> convert(instant, "active")) .map(instant -> { - if (HoodieTimeline.COMPACTION_ACTION.equals(instant.getAction()) && committedTimestamps.contains(instant.getTimestamp())) { + if (compactionCommittedTimestamps.contains(instant.getTimestamp())) { return new HudiInstant( HoodieTimeline.COMPACTION_ACTION, HoodieInstant.State.COMPLETED.name(), @@ -122,7 +131,8 @@ public class HoodieUtils { return Lists.immutable.ofAll(instants); } - private static ImmutableList archiveInstants(HoodieArchivedTimeline timeline) { + private static ImmutableList archiveInstants(HoodieTableMetaClient client) { + HoodieArchivedTimeline timeline = client.getArchivedTimeline(); MutableList instants = Lists.mutable.ofAll(timeline.getInstants().collect(Collectors.toList())); instants.forEach(instant -> logger.info(instant.toString())); MutableListMultimap stateMap = instants.groupBy(HoodieInstant::getState).collectValues(HoodieInstant::getTimestamp);