fix(hudi-query): 修复获取待压缩列表会拖慢性能
This commit is contained in:
@@ -13,7 +13,9 @@ import com.lanyuanxiaoyao.service.configuration.utils.ComparatorUtil;
|
|||||||
import com.lanyuanxiaoyao.service.forest.service.InfoService;
|
import com.lanyuanxiaoyao.service.forest.service.InfoService;
|
||||||
import com.lanyuanxiaoyao.service.hudi.utils.HoodieUtils;
|
import com.lanyuanxiaoyao.service.hudi.utils.HoodieUtils;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
@@ -273,8 +275,15 @@ public class TimelineService {
|
|||||||
.setConf(new Configuration())
|
.setConf(new Configuration())
|
||||||
.setBasePath(meta.getHudi().getTargetHdfsPath())
|
.setBasePath(meta.getHudi().getTargetHdfsPath())
|
||||||
.build();
|
.build();
|
||||||
return HoodieUtils.getAllActiveInstants(client)
|
// 使用默认的方式取待压缩列表,不然获取压缩完成状态需要再扫描一次时间线目录,会降低性能
|
||||||
.select(instant -> StrUtil.equals(instant.getAction(), HoodieTimeline.COMPACTION_ACTION))
|
List<HudiInstant> list = client.getActiveTimeline()
|
||||||
.reject(instant -> StrUtil.equals(instant.getState(), HoodieInstant.State.COMPLETED.name()));
|
.filterPendingCompactionTimeline()
|
||||||
|
.getInstants()
|
||||||
|
.map(instant -> HoodieUtils.convert(instant, "active"))
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
if (ObjectUtil.isNotEmpty(list)) {
|
||||||
|
return Lists.immutable.ofAll(list);
|
||||||
|
}
|
||||||
|
return Lists.immutable.empty();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -88,7 +88,7 @@ public class HoodieUtils {
|
|||||||
.toImmutable();
|
.toImmutable();
|
||||||
}
|
}
|
||||||
|
|
||||||
private static HudiInstant convert(HoodieInstant instant, String type) {
|
public static HudiInstant convert(HoodieInstant instant, String type) {
|
||||||
return new HudiInstant(
|
return new HudiInstant(
|
||||||
instant.getAction(),
|
instant.getAction(),
|
||||||
instant.getState().name(),
|
instant.getState().name(),
|
||||||
@@ -99,7 +99,7 @@ public class HoodieUtils {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static ImmutableList<HudiInstant> activeInstants(HoodieTableMetaClient client) throws IOException {
|
public static ImmutableList<HudiInstant> activeInstants(HoodieTableMetaClient client) throws IOException {
|
||||||
HoodieActiveTimeline timeline = client.getActiveTimeline();
|
HoodieActiveTimeline timeline = client.getActiveTimeline();
|
||||||
Set<String> committedTimestamps = timeline.getCommitsTimeline()
|
Set<String> committedTimestamps = timeline.getCommitsTimeline()
|
||||||
.filterCompletedInstants()
|
.filterCompletedInstants()
|
||||||
@@ -131,7 +131,7 @@ public class HoodieUtils {
|
|||||||
return Lists.immutable.ofAll(instants);
|
return Lists.immutable.ofAll(instants);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static ImmutableList<HudiInstant> archiveInstants(HoodieTableMetaClient client) {
|
public static ImmutableList<HudiInstant> archiveInstants(HoodieTableMetaClient client) {
|
||||||
HoodieArchivedTimeline timeline = client.getArchivedTimeline();
|
HoodieArchivedTimeline timeline = client.getArchivedTimeline();
|
||||||
MutableList<HoodieInstant> instants = Lists.mutable.ofAll(timeline.getInstants().collect(Collectors.toList()));
|
MutableList<HoodieInstant> instants = Lists.mutable.ofAll(timeline.getInstants().collect(Collectors.toList()));
|
||||||
instants.forEach(instant -> logger.info(instant.toString()));
|
instants.forEach(instant -> logger.info(instant.toString()));
|
||||||
|
|||||||
Reference in New Issue
Block a user