From 1ce0b74e2dd8bc863f015c3bc8b5413a2a2449fd Mon Sep 17 00:00:00 2001 From: lanyuanxiaoyao Date: Wed, 28 Jun 2023 16:53:03 +0800 Subject: [PATCH] =?UTF-8?q?feature(hudi-query):=20Hudi=20=E6=97=B6?= =?UTF-8?q?=E9=97=B4=E7=BA=BF=E6=9F=A5=E8=AF=A2=E5=A2=9E=E5=8A=A0=20instan?= =?UTF-8?q?t=20=E6=96=87=E4=BB=B6=E5=88=9B=E5=BB=BA=E6=97=B6=E9=97=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Hudi 时间线查询增加分页参数下推,增加性能 --- .../configuration/entity/PageResponse.java | 16 ++-- .../entity/hudi/HudiInstant.java | 8 ++ .../configuration}/utils/ComparatorUtil.java | 2 +- .../configuration}/utils/DatetimeUtil.java | 2 +- .../service/forest/service/HudiService.java | 5 +- .../hudi/controller/TimelineController.java | 44 +++++++++-- .../service/hudi/service/TimelineService.java | 75 +++++++++++++++++-- .../web/controller/HudiController.java | 24 +++--- .../web/controller/YarnController.java | 2 +- .../web/entity/CompactionMetricsVO.java | 2 +- .../service/web/entity/SyncStateVO.java | 2 +- .../service/web/entity/YarnApplicationVO.java | 2 +- .../service/web/entity/ZookeeperNodeVO.java | 2 +- web/components/common.js | 13 +++- 14 files changed, 154 insertions(+), 45 deletions(-) rename {service-web/src/main/java/com/lanyuanxiaoyao/service/web => service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration}/utils/ComparatorUtil.java (96%) rename {service-web/src/main/java/com/lanyuanxiaoyao/service/web => service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration}/utils/DatetimeUtil.java (96%) diff --git a/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/PageResponse.java b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/PageResponse.java index 5e0b4e9..2849186 100644 --- a/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/PageResponse.java +++ b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/PageResponse.java @@ -12,21 +12,21 @@ import java.util.Map; * @date 2023-04-26 */ public class PageResponse { - private List data = new ArrayList<>(0); + private Iterable data = new ArrayList<>(0); private Long total = 0L; private Map metadata = new HashMap<>(); public PageResponse() { } - public PageResponse(List data) { + public PageResponse(Iterable data, Long total) { this.data = data; - this.total = (long) data.size(); + this.total = total; } - public PageResponse(List data, Long total) { - this(data); - this.total = total; + public PageResponse(Iterable data, Integer total) { + this.data = data; + this.total = total.longValue(); } public PageResponse(List data, Long total, Map metadata) { @@ -34,11 +34,11 @@ public class PageResponse { this.metadata = metadata; } - public List getData() { + public Iterable getData() { return data; } - public void setData(List data) { + public void setData(Iterable data) { this.data = data; } diff --git a/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/hudi/HudiInstant.java b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/hudi/HudiInstant.java index 6560ab1..a564d61 100644 --- a/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/hudi/HudiInstant.java +++ b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/hudi/HudiInstant.java @@ -28,6 +28,7 @@ public class HudiInstant implements Comparable { private String state; private String timestamp; private String fileName; + private Long fileTime = 0L; // active or archive private String type; @@ -62,6 +63,13 @@ public class HudiInstant implements Comparable { return type; } + public Long getFileTime() { + return fileTime; + } + + public void setFileTime(Long fileTime) { + this.fileTime = fileTime; + } @Override public String toString() { diff --git a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/utils/ComparatorUtil.java b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/utils/ComparatorUtil.java similarity index 96% rename from service-web/src/main/java/com/lanyuanxiaoyao/service/web/utils/ComparatorUtil.java rename to service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/utils/ComparatorUtil.java index fdb7309..d9458ca 100644 --- a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/utils/ComparatorUtil.java +++ b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/utils/ComparatorUtil.java @@ -1,4 +1,4 @@ -package com.lanyuanxiaoyao.service.web.utils; +package com.lanyuanxiaoyao.service.configuration.utils; import cn.hutool.core.util.StrUtil; import java.util.Comparator; diff --git a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/utils/DatetimeUtil.java b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/utils/DatetimeUtil.java similarity index 96% rename from service-web/src/main/java/com/lanyuanxiaoyao/service/web/utils/DatetimeUtil.java rename to service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/utils/DatetimeUtil.java index d024f48..b70b2b5 100644 --- a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/utils/DatetimeUtil.java +++ b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/utils/DatetimeUtil.java @@ -1,4 +1,4 @@ -package com.lanyuanxiaoyao.service.web.utils; +package com.lanyuanxiaoyao.service.configuration.utils; import cn.hutool.core.util.ObjectUtil; import java.util.function.Supplier; diff --git a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/HudiService.java b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/HudiService.java index 6d1e3cf..503feb5 100644 --- a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/HudiService.java +++ b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/HudiService.java @@ -3,6 +3,7 @@ package com.lanyuanxiaoyao.service.forest.service; import com.dtflys.forest.annotation.BaseRequest; import com.dtflys.forest.annotation.Get; import com.dtflys.forest.annotation.Query; +import com.lanyuanxiaoyao.service.configuration.entity.PageResponse; import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiCompactionPlan; import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiInstant; import java.util.Map; @@ -17,10 +18,10 @@ import org.eclipse.collections.api.list.ImmutableList; @BaseRequest(baseURL = "http://service-hudi-query") public interface HudiService { @Get("/timeline/list") - ImmutableList timelineList(@Query Map queryMap); + PageResponse timelineList(@Query Map queryMap); @Get("/timeline/list_hdfs") - ImmutableList timelineHdfsList(@Query Map queryMap); + PageResponse timelineHdfsList(@Query Map queryMap); @Get("/timeline/list_pending_compaction") ImmutableList timelinePendingCompactionList(@Query("flink_job_id") Long flinkJobId, @Query("alias") String alias); diff --git a/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/controller/TimelineController.java b/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/controller/TimelineController.java index 6ce37b4..2e2dcf5 100644 --- a/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/controller/TimelineController.java +++ b/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/controller/TimelineController.java @@ -1,5 +1,6 @@ package com.lanyuanxiaoyao.service.hudi.controller; +import com.lanyuanxiaoyao.service.configuration.entity.PageResponse; import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiInstant; import com.lanyuanxiaoyao.service.hudi.service.TimelineService; import java.io.IOException; @@ -31,20 +32,51 @@ public class TimelineController { } @GetMapping("list") - public ImmutableList allInstants( + public PageResponse allInstants( + @RequestParam(value = "page", defaultValue = "1") Integer page, + @RequestParam(value = "count", defaultValue = "10") Integer count, + @RequestParam(value = "order", required = false) String order, + @RequestParam(value = "direction", required = false) String direction, @RequestParam("flink_job_id") Long flinkJobId, @RequestParam("alias") String alias, - @RequestParam(value = "filter_type", required = false) List filterType + @RequestParam(value = "filter_type", required = false) List filterType, + @RequestParam(value = "filter_action", required = false) List filterAction, + @RequestParam(value = "filter_state", required = false) List filterState ) throws IOException { - return timelineService.timeline(flinkJobId, alias, Lists.immutable.ofAll(filterType)); + return timelineService.timeline( + page, + count, + order, + direction, + flinkJobId, + alias, + Lists.immutable.ofAll(filterType), + Lists.immutable.ofAll(filterAction), + Lists.immutable.ofAll(filterState) + ); } @GetMapping("list_hdfs") - public ImmutableList allInstants( + public PageResponse allInstants( + @RequestParam(value = "page", defaultValue = "1") Integer page, + @RequestParam(value = "count", defaultValue = "10") Integer count, + @RequestParam(value = "order", required = false) String order, + @RequestParam(value = "direction", required = false) String direction, @RequestParam("hdfs") String hdfs, - @RequestParam(value = "filter_type", required = false) List filterType + @RequestParam(value = "filter_type", required = false) List filterType, + @RequestParam(value = "filter_action", required = false) List filterAction, + @RequestParam(value = "filter_state", required = false) List filterState ) throws IOException { - return timelineService.timeline(hdfs, Lists.immutable.ofAll(filterType)); + return timelineService.timeline( + page, + count, + order, + direction, + hdfs, + Lists.immutable.ofAll(filterType), + Lists.immutable.ofAll(filterAction), + Lists.immutable.ofAll(filterState) + ); } @GetMapping("list_pending_compaction") diff --git a/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/service/TimelineService.java b/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/service/TimelineService.java index c6dcd3c..1325cda 100644 --- a/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/service/TimelineService.java +++ b/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/service/TimelineService.java @@ -3,17 +3,26 @@ package com.lanyuanxiaoyao.service.hudi.service; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import com.eshore.odcp.hudi.connector.entity.TableMeta; +import com.lanyuanxiaoyao.service.configuration.ExecutorProvider; +import com.lanyuanxiaoyao.service.configuration.entity.PageResponse; import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiInstant; +import com.lanyuanxiaoyao.service.configuration.utils.ComparatorUtil; import com.lanyuanxiaoyao.service.forest.service.InfoService; import com.lanyuanxiaoyao.service.hudi.utils.HoodieUtils; import java.io.IOException; +import java.util.function.Function; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.eclipse.collections.api.factory.Lists; +import org.eclipse.collections.api.factory.Maps; import org.eclipse.collections.api.list.ImmutableList; import org.eclipse.collections.api.list.MutableList; +import org.eclipse.collections.api.map.ImmutableMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.cache.annotation.Cacheable; @@ -31,6 +40,10 @@ public class TimelineService { private static final Logger logger = LoggerFactory.getLogger(TimelineService.class); private static final String INSTANT_TYPE_ACTIVE = "active"; private static final String INSTANT_TYPE_ARCHIVE = "archive"; + private static final ImmutableMap> TIMELINE_SORT_MAP = Maps.immutable.of( + "timestamp", + HudiInstant::getTimestamp + ); private final InfoService infoService; @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") @@ -40,16 +53,46 @@ public class TimelineService { @Cacheable(value = "timeline", sync = true, key = "#flinkJobId.toString()+#alias") @Retryable(Throwable.class) - public ImmutableList timeline(Long flinkJobId, String alias, ImmutableList filterType) throws IOException { + public PageResponse timeline( + Integer page, + Integer count, + String order, + String direction, + Long flinkJobId, + String alias, + ImmutableList filterType, + ImmutableList filterAction, + ImmutableList filterState + ) throws IOException { TableMeta meta = infoService.tableMetaDetail(flinkJobId, alias); - return timeline(meta.getHudi().getTargetHdfsPath(), filterType); + return timeline( + page, + count, + order, + direction, + meta.getHudi().getTargetHdfsPath(), + filterType, + filterAction, + filterState + ); } @Cacheable(value = "timeline", sync = true, key = "#hdfs") @Retryable(Throwable.class) - public ImmutableList timeline(String hdfs, ImmutableList filterType) throws IOException { + public PageResponse timeline( + Integer page, + Integer count, + String order, + String direction, + String hdfs, + ImmutableList filterType, + ImmutableList filterAction, + ImmutableList filterState + ) throws IOException { + Configuration configuration = new Configuration(); + FileSystem fileSystem = FileSystem.get(configuration); HoodieTableMetaClient client = HoodieTableMetaClient.builder() - .setConf(new Configuration()) + .setConf(configuration) .setBasePath(hdfs) .build(); MutableList instants = Lists.mutable.empty(); @@ -66,9 +109,31 @@ public class TimelineService { .collect(instant -> covert(INSTANT_TYPE_ACTIVE, instant)) .forEach(instants::add); } - return instants + ImmutableList hudiInstants = instants .toSortedList(HudiInstant::compareTo) + .select(instant -> ObjectUtil.isEmpty(filterAction) || filterAction.contains(instant.getAction())) + .select(instant -> ObjectUtil.isEmpty(filterState) || filterState.contains(instant.getState())) + .toSortedList(ComparatorUtil.stringComparator(order, direction, TIMELINE_SORT_MAP)) .toImmutable(); + ImmutableList result = hudiInstants + .drop(Math.max(page - 1, 0) * count) + .take(count) + .asParallel(ExecutorProvider.EXECUTORS, 1) + .collect(instant -> { + Path instantPath = new Path(StrUtil.format("{}/{}", client.getMetaPath(), instant.getFileName())); + try { + if (fileSystem.exists(instantPath)) { + FileStatus status = fileSystem.getFileStatus(instantPath); + instant.setFileTime(status.getModificationTime()); + } + } catch (IOException e) { + logger.error("IOError", e); + } + return instant; + }) + .toList() + .toImmutable(); + return new PageResponse<>(result.toList(), hudiInstants.size()); } @Cacheable(value = "pending_compaction_timeline", sync = true, key = "#flinkJobId.toString()+#alias") diff --git a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/HudiController.java b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/HudiController.java index 2b83c92..a0d8a6b 100644 --- a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/HudiController.java +++ b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/HudiController.java @@ -2,9 +2,10 @@ package com.lanyuanxiaoyao.service.web.controller; import cn.hutool.core.util.ObjectUtil; import com.lanyuanxiaoyao.service.configuration.entity.AmisResponse; +import com.lanyuanxiaoyao.service.configuration.entity.PageResponse; import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiInstant; import com.lanyuanxiaoyao.service.forest.service.HudiService; -import com.lanyuanxiaoyao.service.web.utils.ComparatorUtil; +import com.lanyuanxiaoyao.service.configuration.utils.ComparatorUtil; import java.util.List; import java.util.function.Function; import org.eclipse.collections.api.factory.Maps; @@ -53,20 +54,17 @@ public class HudiController extends BaseController { @RequestParam(value = "filter_action", required = false) List filterAction, @RequestParam(value = "filter_state", required = false) List filterState ) { - MutableMap queryMap = Maps.mutable.empty(); - queryMap.put("flink_job_id", flinkJobId); - queryMap.put("alias", alias); + MutableMap queryMap = buildQueryMap(page, count, order, direction, flinkJobId.toString(), alias); if (ObjectUtil.isNotEmpty(filterType)) { queryMap.put("filter_type", filterType); } - ImmutableList hudiInstants = hudiService.timelineList(queryMap) - .select(instant -> ObjectUtil.isEmpty(filterAction) || filterAction.contains(instant.getAction())) - .select(instant -> ObjectUtil.isEmpty(filterState) || filterState.contains(instant.getState())) - .toSortedList(ComparatorUtil.stringComparator(order, direction, TIMELINE_SORT_MAP)) - .toImmutable(); - ImmutableList result = hudiInstants - .drop(Math.max(page - 1, 0) * count) - .take(count); - return responseCrudData(result, hudiInstants.size()); + if (ObjectUtil.isNotEmpty(filterAction)) { + queryMap.put("filter_action", filterAction); + } + if (ObjectUtil.isNotEmpty(filterState)) { + queryMap.put("filter_state", filterState); + } + PageResponse response = hudiService.timelineList(queryMap); + return responseCrudData(response.getData(), response.getTotal()); } } diff --git a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/YarnController.java b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/YarnController.java index 61ca866..676f5f5 100644 --- a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/YarnController.java +++ b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/YarnController.java @@ -14,7 +14,7 @@ import com.lanyuanxiaoyao.service.forest.service.FlinkService; import com.lanyuanxiaoyao.service.forest.service.YarnService; import com.lanyuanxiaoyao.service.web.entity.YarnApplicationVO; import com.lanyuanxiaoyao.service.web.entity.YarnClusterVO; -import com.lanyuanxiaoyao.service.web.utils.ComparatorUtil; +import com.lanyuanxiaoyao.service.configuration.utils.ComparatorUtil; import java.util.Comparator; import java.util.List; import java.util.Optional; diff --git a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/entity/CompactionMetricsVO.java b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/entity/CompactionMetricsVO.java index ae0a25a..0fb598b 100644 --- a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/entity/CompactionMetricsVO.java +++ b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/entity/CompactionMetricsVO.java @@ -2,7 +2,7 @@ package com.lanyuanxiaoyao.service.web.entity; import cn.hutool.core.util.ObjectUtil; import com.lanyuanxiaoyao.service.configuration.entity.info.CompactionMetrics; -import com.lanyuanxiaoyao.service.web.utils.DatetimeUtil; +import com.lanyuanxiaoyao.service.configuration.utils.DatetimeUtil; import java.time.Instant; /** diff --git a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/entity/SyncStateVO.java b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/entity/SyncStateVO.java index 0504a1b..eff6eab 100644 --- a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/entity/SyncStateVO.java +++ b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/entity/SyncStateVO.java @@ -2,7 +2,7 @@ package com.lanyuanxiaoyao.service.web.entity; import com.eshore.odcp.hudi.connector.entity.SyncState; import com.fasterxml.jackson.annotation.JsonIgnore; -import com.lanyuanxiaoyao.service.web.utils.DatetimeUtil; +import com.lanyuanxiaoyao.service.configuration.utils.DatetimeUtil; import java.time.Instant; /** diff --git a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/entity/YarnApplicationVO.java b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/entity/YarnApplicationVO.java index e0c2fe0..ac11d13 100644 --- a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/entity/YarnApplicationVO.java +++ b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/entity/YarnApplicationVO.java @@ -5,7 +5,7 @@ import cn.hutool.core.util.ReUtil; import com.eshore.odcp.hudi.connector.utils.NameHelper; import com.fasterxml.jackson.annotation.JsonIgnore; import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnApplication; -import com.lanyuanxiaoyao.service.web.utils.DatetimeUtil; +import com.lanyuanxiaoyao.service.configuration.utils.DatetimeUtil; import java.time.Instant; /** diff --git a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/entity/ZookeeperNodeVO.java b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/entity/ZookeeperNodeVO.java index 3837c58..417c913 100644 --- a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/entity/ZookeeperNodeVO.java +++ b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/entity/ZookeeperNodeVO.java @@ -4,7 +4,7 @@ import cn.hutool.core.util.ObjectUtil; import com.eshore.odcp.hudi.connector.entity.RunMeta; import com.fasterxml.jackson.annotation.JsonIgnore; import com.lanyuanxiaoyao.service.configuration.entity.zookeeper.ZookeeperNode; -import com.lanyuanxiaoyao.service.web.utils.DatetimeUtil; +import com.lanyuanxiaoyao.service.configuration.utils.DatetimeUtil; import java.time.Instant; import java.util.Optional; diff --git a/web/components/common.js b/web/components/common.js index 6502a50..f2aab4d 100644 --- a/web/components/common.js +++ b/web/components/common.js @@ -906,13 +906,10 @@ function tableMetaDialog() { direction: '${orderDir|default:undefined}', flink_job_id: '${flinkJobId|default:undefined}', alias: '${tableMeta.alias|default:undefined}', - filter_type: '${type|default:undefined}', + filter_type: "${type|default:active}", filter_action: '${action|default:undefined}', filter_state: '${state|default:undefined}', }, - defaultParams: { - filter_type: 'active', - }, }, ...crudCommonOptions(), perPage: 15, @@ -945,6 +942,14 @@ function tableMetaDialog() { ...mappingField('state', hudiTimelineStateMapping), filterable: filterableField(hudiTimelineStateMapping, true), }, + { + name: 'fileTime', + label: ' 文件时间', + width: 150, + align: 'center', + type: 'tpl', + tpl: "${DATETOSTR(DATE(fileTime), 'YYYY-MM-DD HH:mm:ss')}", + }, { name: 'fileName', label: '文件名',