fix(hudi-query): 修复时间线compaction操作完成状态无法识别

This commit is contained in:
v-zhangjc9
2024-05-28 11:00:30 +08:00
parent e257044ff6
commit 9a1afbcceb

View File

@@ -1,8 +1,10 @@
package com.lanyuanxiaoyao.service.hudi.utils; package com.lanyuanxiaoyao.service.hudi.utils;
import cn.hutool.core.util.ReUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiInstant; import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiInstant;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator; import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
@@ -66,10 +68,10 @@ public class HoodieUtils {
MutableList<HudiInstant> instants = Lists.mutable.empty(); MutableList<HudiInstant> instants = Lists.mutable.empty();
if (active) { if (active) {
instants.addAllIterable(activeInstants(client.getActiveTimeline())); instants.addAllIterable(activeInstants(client));
} }
if (archive) { if (archive) {
instants.addAllIterable(archiveInstants(client.getArchivedTimeline())); instants.addAllIterable(archiveInstants(client));
} }
MutableMap<String, Long> fileModifiedTimeMap = Lists.immutable.of(fileSystem.listStatus(metadataPath)) MutableMap<String, Long> fileModifiedTimeMap = Lists.immutable.of(fileSystem.listStatus(metadataPath))
@@ -97,16 +99,23 @@ public class HoodieUtils {
); );
} }
private static ImmutableList<HudiInstant> activeInstants(HoodieActiveTimeline timeline) { private static ImmutableList<HudiInstant> activeInstants(HoodieTableMetaClient client) throws IOException {
HoodieActiveTimeline timeline = client.getActiveTimeline();
Set<String> committedTimestamps = timeline.getCommitsTimeline() Set<String> committedTimestamps = timeline.getCommitsTimeline()
.filterCompletedInstants() .filterCompletedInstants()
.getInstants() .getInstants()
.map(HoodieInstant::getTimestamp) .map(HoodieInstant::getTimestamp)
.collect(Collectors.toSet()); .collect(Collectors.toSet());
Set<String> compactionCommittedTimestamps = Arrays.stream(client.getFs().listStatus(new Path(client.getMetaPath())))
.filter(status -> status.getPath().toString().endsWith(HoodieTimeline.REQUESTED_COMPACTION_EXTENSION))
.map(status -> status.getPath().getName())
.map(name -> ReUtil.get("^(\\d+)\\..+", name, 1))
.filter(committedTimestamps::contains)
.collect(Collectors.toSet());
List<HudiInstant> instants = timeline.getInstants() List<HudiInstant> instants = timeline.getInstants()
.map(instant -> convert(instant, "active")) .map(instant -> convert(instant, "active"))
.map(instant -> { .map(instant -> {
if (HoodieTimeline.COMPACTION_ACTION.equals(instant.getAction()) && committedTimestamps.contains(instant.getTimestamp())) { if (compactionCommittedTimestamps.contains(instant.getTimestamp())) {
return new HudiInstant( return new HudiInstant(
HoodieTimeline.COMPACTION_ACTION, HoodieTimeline.COMPACTION_ACTION,
HoodieInstant.State.COMPLETED.name(), HoodieInstant.State.COMPLETED.name(),
@@ -122,7 +131,8 @@ public class HoodieUtils {
return Lists.immutable.ofAll(instants); return Lists.immutable.ofAll(instants);
} }
private static ImmutableList<HudiInstant> archiveInstants(HoodieArchivedTimeline timeline) { private static ImmutableList<HudiInstant> archiveInstants(HoodieTableMetaClient client) {
HoodieArchivedTimeline timeline = client.getArchivedTimeline();
MutableList<HoodieInstant> instants = Lists.mutable.ofAll(timeline.getInstants().collect(Collectors.toList())); MutableList<HoodieInstant> instants = Lists.mutable.ofAll(timeline.getInstants().collect(Collectors.toList()));
instants.forEach(instant -> logger.info(instant.toString())); instants.forEach(instant -> logger.info(instant.toString()));
MutableListMultimap<HoodieInstant.State, String> stateMap = instants.groupBy(HoodieInstant::getState).collectValues(HoodieInstant::getTimestamp); MutableListMultimap<HoodieInstant.State, String> stateMap = instants.groupBy(HoodieInstant::getState).collectValues(HoodieInstant::getTimestamp);