perf(hudi-query): 优化hudi时间线的扫描速度 (optimize Hudi timeline scan speed)

This commit is contained in:
v-zhangjc9
2024-05-22 13:11:50 +08:00
parent 2b7b7f838c
commit bff18280f3
7 changed files with 261 additions and 82 deletions

View File

@@ -1,3 +1,11 @@
# WARNING(review): this request embeds basic-auth credentials and targets a concrete host in plain text — rotate the credentials and load them from environment variables instead of committing them.
GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/queue/queue/clear?name=compaction-queue-pre
Connection: Keep-Alive
User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10)
Cookie: JSESSIONID=8516C92140B5118AF9AA61025D0F8C93
Accept-Encoding: br,deflate,gzip,x-gzip
###
GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/service_scheduler/schedule/all
Connection: Keep-Alive
User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10)
@@ -448,11 +456,3 @@ Accept-Encoding: br,deflate,gzip,x-gzip
###
GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/queue/queue/clear?name=compaction-queue-pre
Connection: Keep-Alive
User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9)
Cookie: JSESSIONID=9AB8D98C10FACE15EA1CB758D79F8877
Accept-Encoding: br,deflate,gzip,x-gzip
###

View File

@@ -1,6 +1,10 @@
package com.lanyuanxiaoyao.service.command.pro.commands;
import cn.hutool.core.util.StrUtil;
import com.lanyuanxiaoyao.service.common.entity.TableMeta;
import com.lanyuanxiaoyao.service.configuration.ExecutorProvider;
import com.lanyuanxiaoyao.service.forest.service.HudiService;
import com.lanyuanxiaoyao.service.forest.service.InfoService;
import java.io.IOException;
import java.time.Instant;
import java.util.List;
@@ -50,10 +54,19 @@ import org.springframework.shell.standard.ShellOption;
* @author lanyuanxiaoyao
* @date 2024-03-19
*/
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@ShellComponent("Hudi相关操作")
public class HudiCommand {
private static final Logger logger = LoggerFactory.getLogger(HudiCommand.class);
private final InfoService infoService;
private final HudiService hudiService;
/**
 * Constructor injection of the two backing services.
 *
 * @param infoService table-metadata lookup service
 * @param hudiService remote Hudi timeline service client
 */
public HudiCommand(InfoService infoService, HudiService hudiService) {
    this.hudiService = hudiService;
    this.infoService = infoService;
}
public static void time(String name, Runnable runnable) {
logger.info(name);
long startTime = Instant.now().toEpochMilli();
@@ -158,6 +171,41 @@ public class HudiCommand {
time("reader 3", counter -> reader3(counter, configuration, root));
}
/**
 * Count the files directly under each selected table's {@code .hoodie} metadata
 * directory (in parallel), logging any path whose name contains "INVALID".
 *
 * @param alias optional table alias filter; blank selects every table
 * @throws IOException if the Hadoop FileSystem cannot be obtained
 */
@ShellMethod("Count meta files")
public void countMetaFiles(@ShellOption(help = "alias", defaultValue = "") String alias) throws IOException {
    LongAdder count = new LongAdder();
    // try-with-resources: the original leaked the FileSystem when the parallel listing threw.
    // NOTE(review): FileSystem.get returns a process-wide cached instance; closing it also
    // closes other users. Acceptable in a one-shot shell command — confirm nothing else
    // holds the same instance in this process.
    try (FileSystem fileSystem = FileSystem.get(new Configuration())) {
        infoService
            .tableMetaList()
            .select(meta -> StrUtil.isBlank(alias) || StrUtil.equals(meta.getAlias(), alias))
            .collect(TableMeta::getHudi)
            .collect(TableMeta.HudiMeta::getTargetHdfsPath)
            .asParallel(ExecutorProvider.EXECUTORS_5, 1)
            .forEach(hdfs -> {
                Path root = new Path(hdfs, ".hoodie");
                try {
                    FileStatus[] statuses = fileSystem.listStatus(root);
                    for (FileStatus status : statuses) {
                        if (status.isFile()) {
                            count.increment();
                        }
                        if (StrUtil.containsIgnoreCase(status.getPath().toString(), "INVALID")) {
                            logger.info("{}", status.getPath().toString());
                        }
                    }
                } catch (IOException e) {
                    // Best-effort per table: one unlistable path must not abort the scan.
                    logger.warn("List file error", e);
                }
            });
        logger.info("Count: {}", count.longValue());
    }
}
/**
 * Print every active-timeline instant of the table rooted at the given HDFS path.
 *
 * @param hdfs table base path on HDFS
 */
@ShellMethod("Get timeline instants")
public void timelineInstant(@ShellOption(help = "root hdfs path") String hdfs) {
    hudiService
        .timelineHdfsAllActive(hdfs)
        .collect(Object::toString)
        .each(logger::info);
}
/**
 * Timed task that reports progress into a shared {@link LongAdder} counter.
 * NOTE(review): intentionally shadows {@link java.lang.Runnable}; qualify the JDK
 * type explicitly wherever both are needed in this file.
 */
@FunctionalInterface
public interface Runnable {
    void run(LongAdder counter);
}

View File

@@ -1,5 +1,6 @@
package com.lanyuanxiaoyao.service.configuration.entity.hudi;
import cn.hutool.core.util.StrUtil;
import java.util.Comparator;
import org.eclipse.collections.api.factory.Maps;
import org.eclipse.collections.api.map.ImmutableMap;
@@ -18,11 +19,6 @@ public class HudiInstant implements Comparable<HudiInstant> {
Comparator.comparing(HudiInstant::getTimestamp)
.thenComparing(ACTION_COMPARATOR)
.thenComparing(HudiInstant::getState);
private static String getComparableAction(String action) {
return COMPARABLE_ACTIONS.getOrDefault(action, action);
}
private String action;
// REQUESTED, INFLIGHT, COMPLETED, INVALID
private String state;
@@ -43,24 +39,49 @@ public class HudiInstant implements Comparable<HudiInstant> {
this.type = type;
}
/**
 * Full-field constructor.
 *
 * @param action    timeline action (e.g. commit, compaction)
 * @param state     instant state name
 * @param timestamp instant timestamp string
 * @param fileName  backing metadata file name
 * @param fileTime  modification time of the backing file, millis
 * @param type      timeline origin of the instant
 */
public HudiInstant(String action, String state, String timestamp, String fileName, Long fileTime, String type) {
    this.type = type;
    this.fileTime = fileTime;
    this.fileName = fileName;
    this.timestamp = timestamp;
    this.state = state;
    this.action = action;
}
/**
 * Normalize an action to its comparable alias; actions without a mapping
 * compare as themselves.
 */
private static String getComparableAction(String action) {
    String mapped = COMPARABLE_ACTIONS.get(action);
    return mapped != null ? mapped : action;
}
// Plain JavaBean accessors for the instant's fields.
public String getAction() {
return action;
}
public void setAction(String action) {
this.action = action;
}
// State is one of REQUESTED, INFLIGHT, COMPLETED, INVALID (see field comment).
public String getState() {
return state;
}
public void setState(String state) {
this.state = state;
}
public String getTimestamp() {
return timestamp;
}
public void setTimestamp(String timestamp) {
this.timestamp = timestamp;
}
public String getFileName() {
return fileName;
}
public String getType() {
return type;
public void setFileName(String fileName) {
this.fileName = fileName;
}
public Long getFileTime() {
@@ -71,15 +92,17 @@ public class HudiInstant implements Comparable<HudiInstant> {
this.fileTime = fileTime;
}
// Timeline origin of this instant: "active" or "archive" (set by the converter).
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
/**
 * CSV-style rendering: action,state,timestamp,fileName,fileTime,type.
 * (The diff rendering had merged the old field-by-field body with this
 * return, leaving two return statements; only the new form is kept.)
 */
@Override
public String toString() {
    return StrUtil.format("{},{},{},{},{},{}", action, state, timestamp, fileName, fileTime, type);
}
@Override

View File

@@ -2,7 +2,6 @@ package com.lanyuanxiaoyao.service.forest.service;
import com.dtflys.forest.annotation.BaseRequest;
import com.dtflys.forest.annotation.Body;
import com.dtflys.forest.annotation.BodyType;
import com.dtflys.forest.annotation.Get;
import com.dtflys.forest.annotation.Post;
import com.dtflys.forest.annotation.Query;
@@ -31,6 +30,24 @@ public interface HudiService {
// Paged timeline query for a raw HDFS base path; queryMap carries paging/filter params.
@Get("/timeline/list_hdfs")
PageResponse<HudiInstant> timelineHdfsList(@Query Map<String, Object> queryMap);
// Active-timeline instants for a table resolved by Flink job id + alias.
@Get("/timeline/all?active=true&archive=false")
ImmutableList<HudiInstant> timelineAllActive(@Query("flink_job_id") Long flinkJobId, @Query("alias") String alias);
// Active-timeline instants for a raw HDFS base path.
@Get("/timeline/all_hdfs?active=true&archive=false")
ImmutableList<HudiInstant> timelineHdfsAllActive(@Query("hdfs") String hdfs);
// Archived-timeline instants for a table resolved by Flink job id + alias.
@Get("/timeline/all?active=false&archive=true")
ImmutableList<HudiInstant> timelineAllArchive(@Query("flink_job_id") Long flinkJobId, @Query("alias") String alias);
// Archived-timeline instants for a raw HDFS base path.
@Get("/timeline/all_hdfs?active=false&archive=true")
ImmutableList<HudiInstant> timelineHdfsAllArchive(@Query("hdfs") String hdfs);
// Generic form: caller chooses which timelines (active/archive) to include.
@Get("/timeline/all")
ImmutableList<HudiInstant> timelineAll(@Query("flink_job_id") Long flinkJobId, @Query("alias") String alias, @Query("active") Boolean active, @Query("archive") Boolean archive);
// Generic form for a raw HDFS base path.
@Get("/timeline/all_hdfs")
ImmutableList<HudiInstant> timelineHdfsAll(@Query("hdfs") String hdfs, @Query("active") Boolean active, @Query("archive") Boolean archive);
// Deserialized compaction plan for one instant of the given table.
@Get("/timeline/read_compaction_plan")
HudiCompactionPlan readCompactionPlan(@Query("flink_job_id") Long flinkJobId, @Query("alias") String alias, @Query("instant") String instant);

View File

@@ -82,6 +82,25 @@ public class TimelineController {
);
}
/**
 * Full (unpaged) timeline for the table resolved from {@code flink_job_id} + {@code alias}.
 * {@code active}/{@code archive} select which timelines to include (default: active only).
 */
@GetMapping("all")
public ImmutableList<HudiInstant> allInstants(
@RequestParam("flink_job_id") Long flinkJobId,
@RequestParam("alias") String alias,
@RequestParam(value = "active", defaultValue = "true") Boolean active,
@RequestParam(value = "archive", defaultValue = "false") Boolean archive
) throws IOException {
return timelineService.timeline(flinkJobId, alias, active, archive);
}
/**
 * Full (unpaged) timeline for a table addressed directly by its HDFS base path.
 * {@code active}/{@code archive} select which timelines to include (default: active only).
 */
@GetMapping("all_hdfs")
public ImmutableList<HudiInstant> allInstants(
@RequestParam("hdfs") String hdfs,
@RequestParam(value = "active", defaultValue = "true") Boolean active,
@RequestParam(value = "archive", defaultValue = "false") Boolean archive
) throws IOException {
return timelineService.timeline(hdfs, active, archive);
}
@GetMapping("read_compaction_plan")
public HudiCompactionPlan readCompactionPlan(
@RequestParam("flink_job_id") Long flinkJobId,

View File

@@ -30,7 +30,6 @@ import org.apache.hudi.table.action.rollback.RollbackUtils;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.factory.Maps;
import org.eclipse.collections.api.list.ImmutableList;
import org.eclipse.collections.api.list.MutableList;
import org.eclipse.collections.api.map.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -100,33 +99,11 @@ public class TimelineService {
) throws IOException {
Configuration configuration = new Configuration();
FileSystem fileSystem = FileSystem.get(configuration);
if (!fileSystem.exists(new Path(hdfs))) {
throw new IOException(StrUtil.format("Path {} is not exists", hdfs));
}
HoodieTableMetaClient client = HoodieTableMetaClient.builder()
.setConf(configuration)
.setBasePath(hdfs)
.build();
MutableList<HudiInstant> instants = Lists.mutable.empty();
if (ObjectUtil.isEmpty(filterType)) {
filterType = Lists.immutable.of(INSTANT_TYPE_ARCHIVE, INSTANT_TYPE_ACTIVE);
}
if (filterType.contains(INSTANT_TYPE_ARCHIVE)) {
HoodieUtils.getAllInstants(client, HoodieTableMetaClient::getArchivedTimeline)
.collect(instant -> covert(INSTANT_TYPE_ARCHIVE, instant))
.forEach(instants::add);
}
if (filterType.contains(INSTANT_TYPE_ACTIVE)) {
HoodieUtils.getAllInstants(client, HoodieTableMetaClient::getActiveTimeline)
.collect(instant -> covert(INSTANT_TYPE_ACTIVE, instant))
.forEach(instants::add);
}
ImmutableList<HudiInstant> hudiInstants = instants
.toSortedList(HudiInstant::compareTo)
.select(instant -> ObjectUtil.isEmpty(filterAction) || filterAction.contains(instant.getAction()))
.select(instant -> ObjectUtil.isEmpty(filterState) || filterState.contains(instant.getState()))
.toSortedList(ComparatorUtil.stringComparator(order, direction, TIMELINE_SORT_MAP))
.toImmutable();
ImmutableList<HudiInstant> hudiInstants = timeline(order, direction, hdfs, filterType, filterAction, filterState);
ImmutableList<HudiInstant> result = hudiInstants
.drop(Math.max(page - 1, 0) * count)
.take(count)
@@ -148,6 +125,47 @@ public class TimelineService {
return new PageResponse<>(result.toList(), hudiInstants.size());
}
/**
 * Load, filter and sort the full timeline for a table base path.
 *
 * @param order        sort key understood by {@code TIMELINE_SORT_MAP}
 * @param direction    sort direction
 * @param hdfs         table base path on HDFS
 * @param filterType   instant origins to include; empty means active + archive
 * @param filterAction actions to keep; empty keeps all
 * @param filterState  states to keep; empty keeps all
 * @throws IOException if the path does not exist or listing fails
 */
public ImmutableList<HudiInstant> timeline(
    String order,
    String direction,
    String hdfs,
    ImmutableList<String> filterType,
    ImmutableList<String> filterAction,
    ImmutableList<String> filterState
) throws IOException {
    Configuration configuration = new Configuration();
    Path basePath = new Path(hdfs);
    // Validate the path BEFORE building the meta client: the builder throws an
    // opaque Hudi exception on a missing path, whereas this IOException names it.
    FileSystem fileSystem = basePath.getFileSystem(configuration);
    if (!fileSystem.exists(basePath)) {
        throw new IOException(StrUtil.format("Path {} is not exists", hdfs));
    }
    HoodieTableMetaClient client = HoodieTableMetaClient.builder()
        .setConf(configuration)
        .setBasePath(hdfs)
        .build();
    if (ObjectUtil.isEmpty(filterType)) {
        filterType = Lists.immutable.of(INSTANT_TYPE_ARCHIVE, INSTANT_TYPE_ACTIVE);
    }
    ImmutableList<HudiInstant> instants = HoodieUtils.getAllInstants(client, filterType.contains(INSTANT_TYPE_ACTIVE), filterType.contains(INSTANT_TYPE_ARCHIVE));
    // The first (natural-order) sort only affects tie ordering under the
    // user-selected comparator applied afterwards (stable sort).
    return instants
        .toSortedList(HudiInstant::compareTo)
        .select(instant -> ObjectUtil.isEmpty(filterAction) || filterAction.contains(instant.getAction()))
        .select(instant -> ObjectUtil.isEmpty(filterState) || filterState.contains(instant.getState()))
        .toSortedList(ComparatorUtil.stringComparator(order, direction, TIMELINE_SORT_MAP))
        .toImmutable();
}
/**
 * Timeline for a table identified by Flink job id + alias: resolve the table's
 * HDFS base path from its metadata, then delegate to the path-based overload.
 */
public ImmutableList<HudiInstant> timeline(Long flinkJobId, String alias, Boolean active, Boolean archive) throws IOException {
    String hdfsPath = infoService.tableMetaDetail(flinkJobId, alias).getHudi().getTargetHdfsPath();
    return timeline(hdfsPath, active, archive);
}
/**
 * All instants (active and/or archived) for a table rooted at {@code hdfs}.
 *
 * @throws IOException if the path does not exist or listing fails
 */
public ImmutableList<HudiInstant> timeline(String hdfs, Boolean active, Boolean archive) throws IOException {
    Configuration configuration = new Configuration();
    Path basePath = new Path(hdfs);
    // Fail fast with a clear message, consistent with the filtered timeline() overload;
    // otherwise the meta-client builder throws an opaque exception on a missing path.
    if (!basePath.getFileSystem(configuration).exists(basePath)) {
        throw new IOException(StrUtil.format("Path {} is not exists", hdfs));
    }
    HoodieTableMetaClient client = HoodieTableMetaClient.builder()
        .setConf(configuration)
        .setBasePath(hdfs)
        .build();
    return HoodieUtils.getAllInstants(client, active, archive);
}
@Cacheable(value = "read-compaction-plan", sync = true)
@Retryable(Throwable.class)
public HudiCompactionPlan readCompactionPlan(Long flinkJobId, String alias, String instant) throws IOException {
@@ -255,19 +273,8 @@ public class TimelineService {
.setConf(new Configuration())
.setBasePath(meta.getHudi().getTargetHdfsPath())
.build();
return HoodieUtils.getAllInstants(client, HoodieTableMetaClient::getActiveTimeline)
return HoodieUtils.getAllActiveInstants(client)
.select(instant -> StrUtil.equals(instant.getAction(), HoodieTimeline.COMPACTION_ACTION))
.reject(instant -> ObjectUtil.equals(instant.getState(), HoodieInstant.State.COMPLETED))
.collect(instant -> covert(INSTANT_TYPE_ACTIVE, instant));
}
private HudiInstant covert(String type, HoodieInstant instant) {
return new HudiInstant(
instant.getAction(),
instant.getState().name(),
instant.getTimestamp(),
instant.getFileName(),
type
);
.reject(instant -> StrUtil.equals(instant.getState(), HoodieInstant.State.COMPLETED.name()));
}
}

View File

@@ -1,24 +1,28 @@
package com.lanyuanxiaoyao.service.hudi.utils;
import cn.hutool.core.util.ReUtil;
import cn.hutool.core.util.StrUtil;
import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiInstant;
import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.org.apache.avro.Schema;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.list.ImmutableList;
import org.eclipse.collections.api.list.MutableList;
import org.eclipse.collections.api.map.MutableMap;
import org.eclipse.collections.api.multimap.list.MutableListMultimap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,37 +48,98 @@ public class HoodieUtils {
return schemaUtil.getTableAvroSchema(true);
}
public static ImmutableList<HoodieInstant> getAllInstants(HoodieTableMetaClient client, Function<HoodieTableMetaClient, HoodieDefaultTimeline> getTimeline) throws IOException {
public static ImmutableList<HudiInstant> getAllActiveInstants(HoodieTableMetaClient client) throws IOException {
return getAllInstants(client, true, false);
}
public static ImmutableList<HudiInstant> getAllArchiveInstants(HoodieTableMetaClient client) throws IOException {
return getAllInstants(client, false, true);
}
public static ImmutableList<HudiInstant> getAllInstants(HoodieTableMetaClient client) throws IOException {
return getAllInstants(client, true, true);
}
public static ImmutableList<HudiInstant> getAllInstants(HoodieTableMetaClient client, Boolean active, Boolean archive) throws IOException {
FileSystem fileSystem = client.getRawFs();
// 直接使用 toString 方法得到的值是被缓存的
String hdfs = client.getBasePathV2().toUri().toString();
Path metadataPath = new Path(hdfs + "/.hoodie");
return getAllInstants(getTimeline.apply(client), fileSystem, metadataPath)
.toSortedList(HoodieInstant::compareTo)
Path metadataPath = new Path(client.getBasePathV2().toString(), HoodieTableMetaClient.METAFOLDER_NAME);
MutableList<HudiInstant> instants = Lists.mutable.empty();
if (active) {
instants.addAllIterable(activeInstants(client.getActiveTimeline()));
}
if (archive) {
instants.addAllIterable(archiveInstants(client.getArchivedTimeline()));
}
MutableMap<String, Long> fileModifiedTimeMap = Lists.immutable.of(fileSystem.listStatus(metadataPath))
.toMap(status -> status.getPath().toString(), FileStatus::getModificationTime);
return instants
.collect(instant -> {
String instantPath = StrUtil.format("{}/{}", client.getMetaPath(), instant.getFileName());
instant.setFileTime(fileModifiedTimeMap.getOrDefault(instantPath, 0L));
return instant;
})
.sortThis(Comparator
.<HudiInstant>comparingLong(instant -> Long.parseLong(instant.getTimestamp()))
.thenComparingLong(HudiInstant::getFileTime))
.toImmutable();
}
private static ImmutableList<HoodieInstant> getAllInstants(HoodieDefaultTimeline timeline, FileSystem fileSystem, Path metadataPath) throws IOException {
private static HudiInstant convert(HoodieInstant instant, String type) {
return new HudiInstant(
instant.getAction(),
instant.getState().name(),
instant.getTimestamp(),
instant.getFileName(),
0L,
type
);
}
private static ImmutableList<HudiInstant> activeInstants(HoodieActiveTimeline timeline) {
Set<String> committedTimestamps = timeline.getCommitsTimeline()
.filterCompletedInstants()
.getInstants()
.map(HoodieInstant::getTimestamp)
.collect(Collectors.toSet());
List<String> compactionRequestedTimestamps = Arrays.stream(fileSystem.listStatus(metadataPath))
.filter(status -> status.getPath().toString().endsWith(HoodieTimeline.REQUESTED_COMPACTION_EXTENSION))
.map(status -> status.getPath().getName())
.map(name -> ReUtil.get("^(\\d+)\\..+", name, 1))
.filter(committedTimestamps::contains)
.collect(Collectors.toList());
return Lists.immutable.ofAll(timeline.getInstants()
List<HudiInstant> instants = timeline.getInstants()
.map(instant -> convert(instant, "active"))
.map(instant -> {
if (compactionRequestedTimestamps.contains(instant.getTimestamp())) {
return new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMPACTION_ACTION, instant.getTimestamp());
if (HoodieTimeline.COMPACTION_ACTION.equals(instant.getAction()) && committedTimestamps.contains(instant.getTimestamp())) {
return new HudiInstant(
HoodieTimeline.COMPACTION_ACTION,
HoodieInstant.State.COMPLETED.name(),
instant.getTimestamp(),
instant.getFileName(),
instant.getFileTime(),
instant.getType()
);
}
return instant;
})
.sorted(Comparator.comparingLong(i -> Long.parseLong(i.getTimestamp())))
.collect(Collectors.toList()));
.collect(Collectors.toList());
return Lists.immutable.ofAll(instants);
}
/**
 * Collapse the archived timeline's per-state entries into one instant per
 * timestamp: keep the REQUESTED entry and promote its state to the most
 * advanced one observed (INVALID &gt; COMPLETED &gt; INFLIGHT).
 */
private static ImmutableList<HudiInstant> archiveInstants(HoodieArchivedTimeline timeline) {
    MutableList<HoodieInstant> instants = Lists.mutable.ofAll(timeline.getInstants().collect(Collectors.toList()));
    // Was unconditional INFO logging of every archived instant — floods the log and
    // slows the scan on large timelines; keep it only at DEBUG.
    if (logger.isDebugEnabled()) {
        instants.forEach(instant -> logger.debug(instant.toString()));
    }
    MutableListMultimap<HoodieInstant.State, String> stateMap = instants.groupBy(HoodieInstant::getState).collectValues(HoodieInstant::getTimestamp);
    return instants
        .select(instant -> HoodieInstant.State.REQUESTED.equals(instant.getState()))
        .collect(instant -> convert(instant, "archive"))
        .collect(instant -> {
            if (stateMap.containsKeyAndValue(HoodieInstant.State.INVALID, instant.getTimestamp())) {
                instant.setState(HoodieInstant.State.INVALID.name());
            } else if (stateMap.containsKeyAndValue(HoodieInstant.State.COMPLETED, instant.getTimestamp())) {
                instant.setState(HoodieInstant.State.COMPLETED.name());
            } else if (stateMap.containsKeyAndValue(HoodieInstant.State.INFLIGHT, instant.getTimestamp())) {
                instant.setState(HoodieInstant.State.INFLIGHT.name());
            }
            return instant;
        })
        .toImmutable();
}
}