diff --git a/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/service/TimelineService.java b/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/service/TimelineService.java index 0d0d8f3..7d99920 100644 --- a/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/service/TimelineService.java +++ b/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/service/TimelineService.java @@ -13,7 +13,9 @@ import com.lanyuanxiaoyao.service.configuration.utils.ComparatorUtil; import com.lanyuanxiaoyao.service.forest.service.InfoService; import com.lanyuanxiaoyao.service.hudi.utils.HoodieUtils; import java.io.IOException; +import java.util.List; import java.util.function.Function; +import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -273,8 +275,15 @@ public class TimelineService { .setConf(new Configuration()) .setBasePath(meta.getHudi().getTargetHdfsPath()) .build(); - return HoodieUtils.getAllActiveInstants(client) - .select(instant -> StrUtil.equals(instant.getAction(), HoodieTimeline.COMPACTION_ACTION)) - .reject(instant -> StrUtil.equals(instant.getState(), HoodieInstant.State.COMPLETED.name())); + // 使用默认的方式取待压缩列表,不然获取压缩完成状态需要再扫描一次时间线目录,会降低性能 + List list = client.getActiveTimeline() + .filterPendingCompactionTimeline() + .getInstants() + .map(instant -> HoodieUtils.convert(instant, "active")) + .collect(Collectors.toList()); + if (ObjectUtil.isNotEmpty(list)) { + return Lists.immutable.ofAll(list); + } + return Lists.immutable.empty(); } } diff --git a/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/utils/HoodieUtils.java b/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/utils/HoodieUtils.java index 2524288..d1f9a9a 100644 --- a/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/utils/HoodieUtils.java +++ b/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/utils/HoodieUtils.java @@ -88,7 +88,7 @@ public class HoodieUtils { .toImmutable(); } - private static HudiInstant convert(HoodieInstant instant, String type) { + public static HudiInstant convert(HoodieInstant instant, String type) { return new HudiInstant( instant.getAction(), instant.getState().name(), @@ -99,7 +99,7 @@ public class HoodieUtils { ); } - private static ImmutableList activeInstants(HoodieTableMetaClient client) throws IOException { + public static ImmutableList activeInstants(HoodieTableMetaClient client) throws IOException { HoodieActiveTimeline timeline = client.getActiveTimeline(); Set committedTimestamps = timeline.getCommitsTimeline() .filterCompletedInstants() @@ -131,7 +131,7 @@ public class HoodieUtils { return Lists.immutable.ofAll(instants); } - private static ImmutableList archiveInstants(HoodieTableMetaClient client) { + public static ImmutableList archiveInstants(HoodieTableMetaClient client) { HoodieArchivedTimeline timeline = client.getArchivedTimeline(); MutableList instants = Lists.mutable.ofAll(timeline.getInstants().collect(Collectors.toList())); instants.forEach(instant -> logger.info(instant.toString()));