diff --git a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/HudiService.java b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/HudiService.java new file mode 100644 index 0000000..bda2712 --- /dev/null +++ b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/HudiService.java @@ -0,0 +1,23 @@ +package com.lanyuanxiaoyao.service.forest.service; + +import com.dtflys.forest.annotation.BaseRequest; +import com.dtflys.forest.annotation.Get; +import com.dtflys.forest.annotation.Query; +import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiInstant; +import java.util.Map; +import org.eclipse.collections.api.list.ImmutableList; + +/** + * Hudi 操作 + * + * @author lanyuanxiaoyao + * @date 2023-05-01 + */ +@BaseRequest(baseURL = "http://service-hudi-query") +public interface HudiService { + @Get("/timeline/list") + ImmutableList timelineList(@Query Map queryMap); + + @Get("/timeline/list_hdfs") + ImmutableList timelineHdfsList(@Query Map queryMap); +} diff --git a/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/controller/TimelineController.java b/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/controller/TimelineController.java index 96d6869..3c50929 100644 --- a/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/controller/TimelineController.java +++ b/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/controller/TimelineController.java @@ -3,6 +3,8 @@ package com.lanyuanxiaoyao.service.hudi.controller; import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiInstant; import com.lanyuanxiaoyao.service.hudi.service.TimelineService; import java.io.IOException; +import java.util.List; +import org.eclipse.collections.api.factory.Lists; import org.eclipse.collections.api.list.ImmutableList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,12 +31,34 @@ public class TimelineController { } @GetMapping("list") - public ImmutableList allInstants(@RequestParam("flink_job_id") Long flinkJobId, @RequestParam("alias") String alias) throws IOException { - return timelineService.timeline(flinkJobId, alias); + public ImmutableList allInstants( + @RequestParam("flink_job_id") Long flinkJobId, + @RequestParam("alias") String alias, + @RequestParam(value = "filter_type", required = false) List filterType, + @RequestParam(value = "filter_action", required = false) List filterAction, + @RequestParam(value = "filter_state", required = false) List filterState + ) throws IOException { + return timelineService.timeline( + flinkJobId, + alias, + Lists.immutable.ofAll(filterType), + Lists.immutable.ofAll(filterAction), + Lists.immutable.ofAll(filterState) + ); } @GetMapping("list_hdfs") - public ImmutableList allInstants(@RequestParam("hdfs") String hdfs) throws IOException { - return timelineService.timeline(hdfs); + public ImmutableList allInstants( + @RequestParam("hdfs") String hdfs, + @RequestParam(value = "filter_type", required = false) List filterType, + @RequestParam(value = "filter_action", required = false) List filterAction, + @RequestParam(value = "filter_state", required = false) List filterState + ) throws IOException { + return timelineService.timeline( + hdfs, + Lists.immutable.ofAll(filterType), + Lists.immutable.ofAll(filterAction), + Lists.immutable.ofAll(filterState) + ); } } diff --git a/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/service/TimelineService.java b/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/service/TimelineService.java index c8c80ec..1caefb1 100644 --- a/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/service/TimelineService.java +++ b/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/service/TimelineService.java @@ -1,5 +1,6 @@ package com.lanyuanxiaoyao.service.hudi.service; +import cn.hutool.core.util.ObjectUtil; import com.eshore.odcp.hudi.connector.entity.TableMeta; import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiInstant; import com.lanyuanxiaoyao.service.forest.service.InfoService; @@ -8,7 +9,9 @@ import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.eclipse.collections.api.factory.Lists; import org.eclipse.collections.api.list.ImmutableList; +import org.eclipse.collections.api.list.MutableList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.cache.annotation.Cacheable; @@ -34,23 +37,40 @@ public class TimelineService { @Cacheable(value = "timeline", sync = true, key = "#flinkJobId.toString()+#alias") @Retryable(Throwable.class) - public ImmutableList timeline(Long flinkJobId, String alias) throws IOException { + public ImmutableList timeline(Long flinkJobId, String alias, ImmutableList filterType, ImmutableList filterAction, ImmutableList filterState) throws IOException { TableMeta meta = infoService.tableMetaDetail(flinkJobId, alias); - return timeline(meta.getHudi().getTargetHdfsPath()); + return timeline(meta.getHudi().getTargetHdfsPath(), filterType, filterAction, filterState); } + private static final String INSTANT_TYPE_ACTIVE = "active"; + private static final String INSTANT_TYPE_ARCHIVE = "archive"; + @Cacheable(value = "timeline", sync = true, key = "#hdfs") @Retryable(Throwable.class) - public ImmutableList timeline(String hdfs) throws IOException { + public ImmutableList timeline(String hdfs, ImmutableList filterType, ImmutableList filterAction, ImmutableList filterState) throws IOException { HoodieTableMetaClient client = HoodieTableMetaClient.builder() .setConf(new Configuration()) .setBasePath(hdfs) .build(); - ImmutableList activeInstants = HoodieUtils.getAllInstants(client, HoodieTableMetaClient::getActiveTimeline) - .collect(instant -> covert("active", instant)); - ImmutableList archiveInstants = HoodieUtils.getAllInstants(client, HoodieTableMetaClient::getArchivedTimeline) - .collect(instant -> covert("archive", instant)); - return activeInstants.newWithAll(archiveInstants) + MutableList instants = Lists.mutable.empty(); + if (ObjectUtil.isEmpty(filterType)) { + filterType = Lists.immutable.of(INSTANT_TYPE_ARCHIVE, INSTANT_TYPE_ACTIVE); + } + if (filterType.contains(INSTANT_TYPE_ARCHIVE)) { + HoodieUtils.getAllInstants(client, HoodieTableMetaClient::getArchivedTimeline) + .collect(instant -> covert(INSTANT_TYPE_ARCHIVE, instant)) + .select(instant -> ObjectUtil.isEmpty(filterAction) || filterAction.contains(instant.getAction())) + .select(instant -> ObjectUtil.isEmpty(filterState) || filterState.contains(instant.getState())) + .forEach(instants::add); + } + if (filterType.contains(INSTANT_TYPE_ACTIVE)) { + HoodieUtils.getAllInstants(client, HoodieTableMetaClient::getActiveTimeline) + .collect(instant -> covert(INSTANT_TYPE_ACTIVE, instant)) + .select(instant -> ObjectUtil.isEmpty(filterAction) || filterAction.contains(instant.getAction())) + .select(instant -> ObjectUtil.isEmpty(filterState) || filterState.contains(instant.getState())) + .forEach(instants::add); + } + return instants .toSortedList(HudiInstant::compareTo) .toImmutable(); } diff --git a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/HudiController.java b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/HudiController.java new file mode 100644 index 0000000..9bd9b26 --- /dev/null +++ b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/HudiController.java @@ -0,0 +1,56 @@ +package com.lanyuanxiaoyao.service.web.controller; + +import cn.hutool.core.util.ObjectUtil; +import com.lanyuanxiaoyao.service.configuration.entity.AmisResponse; +import com.lanyuanxiaoyao.service.forest.service.HudiService; +import java.util.List; +import org.eclipse.collections.api.factory.Maps; +import org.eclipse.collections.api.map.MutableMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +/** + * Hudi 接口 + * + * @author lanyuanxiaoyao + * @date 2023-05-01 + */ +@RestController +@RequestMapping("hudi") +public class HudiController extends BaseController { + private static final Logger logger = LoggerFactory.getLogger(HudiController.class); + + private final HudiService hudiService; + + @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") + public HudiController(HudiService hudiService) { + this.hudiService = hudiService; + } + + @GetMapping("/timeline/list") + public AmisResponse timeline( + @RequestParam("flink_job_id") Long flinkJobId, + @RequestParam("alias") String alias, + @RequestParam(value = "filter_type", required = false) List filterType, + @RequestParam(value = "filter_action", required = false) List filterAction, + @RequestParam(value = "filter_state", required = false) List filterState + ) { + MutableMap queryMap = Maps.mutable.empty(); + queryMap.put("flink_job_id", flinkJobId); + queryMap.put("alias", alias); + if (ObjectUtil.isNotEmpty(filterType)) { + queryMap.put("filter_type", filterType); + } + if (ObjectUtil.isNotEmpty(filterAction)) { + queryMap.put("filter_action", filterAction); + } + if (ObjectUtil.isNotEmpty(filterAction)) { + queryMap.put("filter_state", filterState); + } + return responseCrudData(hudiService.timelineList(queryMap)); + } +} diff --git a/service-web/src/main/resources/static/components/common.js b/service-web/src/main/resources/static/components/common.js index a87267a..5697370 100644 --- a/service-web/src/main/resources/static/components/common.js +++ b/service-web/src/main/resources/static/components/common.js @@ -659,3 +659,49 @@ function publishTypeMapping(field) { }, } } + +function hudiTimelineActionMapping(field) { + return { + type: 'mapping', + value: `\${${field}}`, + map: { + 'commit': "Commit", + 'deltacommit': "Delta Commit", + 'clean': "Clean", + 'rollback': "Rollback", + 'savepoint': "Savepoint", + 'replacecommit': "Replace Commit", + 'compaction': "Compaction", + 'restore': "Restore", + 'indexing': "Indexing", + 'schemacommit': "Schema Commit", + '*': `\${${field}}` + }, + } +} + +function hudiTimelineStateMapping(field) { + return { + type: 'mapping', + value: `\${${field}}`, + map: { + 'REQUESTED': "已提交", + 'INFLIGHT': "操作中", + 'COMPLETED': "已完成", + 'INVALID': "错误", + '*': `\${${field}}` + }, + } +} + +function hudiTimelineTypeMapping(field) { + return { + type: 'mapping', + value: `\${${field}}`, + map: { + 'active': "活跃", + 'archive': "归档", + '*': `\${${field}}` + }, + } +} diff --git a/service-web/src/main/resources/static/components/table-tab.js b/service-web/src/main/resources/static/components/table-tab.js index 6a89dad..4450559 100644 --- a/service-web/src/main/resources/static/components/table-tab.js +++ b/service-web/src/main/resources/static/components/table-tab.js @@ -68,7 +68,7 @@ function tableTab() { filterDefaultVisible: true, stopAutoRefreshWhenModalIsOpen: true, resizable: false, - perPage: 10, + perPage: 20, headerToolbar: [ "reload", 'filter-toggler', @@ -255,6 +255,55 @@ function tableTab() { actionType: 'dialog', dialog: simpleYarnDialog('compaction', '压缩详情') }, + { + label: '时间线', + type: 'action', + level: 'link', + actionType: 'dialog', + dialog: { + title: 'Hudi 表时间线', + actions: [], + size: 'lg', + body: { + type: 'crud', + api: { + method: 'get', + url: '${base}/hudi/timeline/list', + data: { + flink_job_id: '${flinkJobId}', + alias: '${tableMeta.alias}', + filter_type: 'active' + }, + }, + syncLocation: false, + columns: [ + { + name: 'timestamp', + label: '时间点', + }, + { + name: 'action', + label: '类型', + ...hudiTimelineActionMapping('action'), + }, + { + name: 'state', + label: ' 状态', + ...hudiTimelineStateMapping('state'), + }, + { + name: 'fileName', + label: '文件名', + }, + { + name: 'type', + label: '来源', + ...hudiTimelineTypeMapping('type'), + }, + ], + } + } + }, { label: '队列', type: 'action',