diff --git a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/TableController.java b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/TableController.java index 6f9de3a..9eaadc0 100644 --- a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/TableController.java +++ b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/TableController.java @@ -11,9 +11,11 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.lanyuanxiaoyao.service.configuration.ExecutorProvider; import com.lanyuanxiaoyao.service.configuration.entity.AmisResponse; import com.lanyuanxiaoyao.service.configuration.entity.PageResponse; +import com.lanyuanxiaoyao.service.configuration.entity.info.CompactionMetrics; import com.lanyuanxiaoyao.service.configuration.entity.info.JobIdAndAlias; import com.lanyuanxiaoyao.service.forest.service.InfoService; import com.lanyuanxiaoyao.service.forest.service.ZookeeperService; +import com.lanyuanxiaoyao.service.web.entity.CompactionMetricsVO; import com.lanyuanxiaoyao.service.web.entity.FlinkJobVO; import com.lanyuanxiaoyao.service.web.entity.SyncStateVO; import com.lanyuanxiaoyao.service.web.entity.TableVO; @@ -158,4 +160,26 @@ public class TableController extends BaseController { .withData("flinkJob", new FlinkJobVO(flinkJobFuture.get())) .withData("tableMeta", tableMetaFuture.get()); } + + @GetMapping("list_compaction_metrics") + public AmisResponse listCompactionMetrics( + @RequestParam(value = "page", defaultValue = "1") Integer page, + @RequestParam(value = "count", defaultValue = "10") Integer count, + @RequestParam(value = "order", required = false) String order, + @RequestParam(value = "direction", required = false) String direction, + @RequestParam(value = "search_flink_job_id") String searchFlinkJobId, + @RequestParam(value = "search_alias") String searchAlias, + @RequestParam(value = "filter_completes", required = false) List filterCompletes + ) throws Exception { + if (ObjectUtil.isNull(searchFlinkJobId) || ObjectUtil.isNull(searchAlias)) { + throw new Exception("flink job id or alias is null"); + } + MutableMap queryMap = buildQueryMap(page, count, order, direction, searchFlinkJobId, searchAlias); + if (ObjectUtil.isNotEmpty(filterCompletes)) { + queryMap.put("filter_completes", filterCompletes); + } + PageResponse pageResponse = infoService.compactionMetrics(queryMap); + ImmutableList vos = Lists.immutable.ofAll(pageResponse.getData()).collect(CompactionMetricsVO::new); + return responseCrudData(vos, pageResponse.getTotal()); + } } diff --git a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/entity/CompactionMetricsVO.java b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/entity/CompactionMetricsVO.java new file mode 100644 index 0000000..78e4fb4 --- /dev/null +++ b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/entity/CompactionMetricsVO.java @@ -0,0 +1,84 @@ +package com.lanyuanxiaoyao.service.web.entity; + +import cn.hutool.core.util.ObjectUtil; +import com.lanyuanxiaoyao.service.configuration.entity.info.CompactionMetrics; +import com.lanyuanxiaoyao.service.web.utils.DatetimeUtil; +import java.time.Instant; + +/** + * @author lanyuanxiaoyao + * @date 2023-06-14 + */ +public class CompactionMetricsVO { + private final String flinkJobId; + private final String alias; + private final String applicationId; + private final String cluster; + private final String compactionPlanInstant; + private final Boolean complete; + private final Long startedTime; + private final Long finishedTime; + + + private String startedTimeFromNow; + private String finishedTimeFromNow; + + public CompactionMetricsVO(CompactionMetrics metrics) { + this.flinkJobId = metrics.getFlinkJobId().toString(); + this.alias = metrics.getAlias(); + this.applicationId = metrics.getApplicationId(); + this.cluster = metrics.getCluster(); + this.compactionPlanInstant = metrics.getCompactionPlanInstant(); + this.complete = metrics.getComplete(); + this.startedTime = metrics.getStartedTime(); + this.finishedTime = metrics.getFinishedTime(); + + long now = Instant.now().toEpochMilli(); + if (ObjectUtil.isNotNull(metrics.getStartedTime()) && metrics.getStartedTime() != 0) { + startedTimeFromNow = DatetimeUtil.fromNow(now, metrics.getStartedTime()); + } + if (ObjectUtil.isNotNull(metrics.getFinishedTime()) && metrics.getFinishedTime() != 0) { + finishedTimeFromNow = DatetimeUtil.fromNow(now, metrics.getFinishedTime()); + } + } + + public String getFlinkJobId() { + return flinkJobId; + } + + public String getAlias() { + return alias; + } + + public String getApplicationId() { + return applicationId; + } + + public String getCluster() { + return cluster; + } + + public String getCompactionPlanInstant() { + return compactionPlanInstant; + } + + public Boolean getComplete() { + return complete; + } + + public Long getStartedTime() { + return startedTime; + } + + public Long getFinishedTime() { + return finishedTime; + } + + public String getStartedTimeFromNow() { + return startedTimeFromNow; + } + + public String getFinishedTimeFromNow() { + return finishedTimeFromNow; + } +} diff --git a/service-web/src/test/java/RandomColorTest.java b/service-web/src/test/java/RandomColorTest.java new file mode 100644 index 0000000..ef642ce --- /dev/null +++ b/service-web/src/test/java/RandomColorTest.java @@ -0,0 +1,14 @@ +import cn.hutool.core.util.RandomUtil; +import java.awt.Color; + +/** + * 随机颜色 + * + * @author lanyuanxiaoyao + * @date 2023-06-14 + */ +public class RandomColorTest { + public static void main(String[] args) { + System.out.println(Integer.toHexString(RandomUtil.randomInt(256))); + } +} diff --git a/test/test.http b/test/test.http index 3932f9a..0c7e2a5 100644 --- a/test/test.http +++ b/test/test.http @@ -36,3 +36,9 @@ Content-Type: application/json "251351dc-36db-4fe3-97d2-9667d7a89559", "2b7f29f6-97b5-4363-98a7-a28eaca7d930" ] + +### Info +GET http://{{username}}:{{password}}@132.122.116.146:25638/info/compaction_metrics?flink_job_id=1542097996099055616&alias=acct_acct_item_fs&filter_completes=false + +### Info +GET http://{{username}}:{{password}}@132.122.116.150:16883/table/list_compaction_metrics?search_flink_job_id=1542097996099055616&search_alias=acct_acct_item_fs \ No newline at end of file diff --git a/web/components/common.js b/web/components/common.js index 29a0997..6c627a8 100644 --- a/web/components/common.js +++ b/web/components/common.js @@ -751,6 +751,87 @@ function tableMetaDialog() { actionType: 'dialog', dialog: simpleYarnDialog('b1', '压缩详情', 'compactionJobName') }, + { + label: '历史压缩', + type: 'action', + icon: 'fa fa-list', + actionType: 'dialog', + dialog: { + title: 'Hudi 表时间线', + actions: [], + size: 'lg', + body: { + type: 'crud', + api: { + method: 'get', + url: '${base}/table/list_compaction_metrics', + data: { + page: '${page|default:undefined}', + count: '${perPage|default:undefined}', + order: '${orderBy|default:update_time}', + direction: '${orderDir|default:DESC}', + search_flink_job_id: '${flinkJobId|default:undefined}', + search_alias: '${tableMeta.alias|default:undefined}', + filter_completes: '${complete|default:undefined}', + }, + defaultParams: { + filter_type: 'active', + }, + }, + ...crudCommonOptions(), + perPage: 15, + headerToolbar: [ + "reload", + paginationCommonOptions(), + ], + footerToolbar: [ + paginationCommonOptions(), + ], + columns: [ + { + name: 'compactionPlanInstant', + label: '压缩时间点', + width: 160, + ...copyField('compactionPlanInstant') + }, + { + name: 'cluster', + label: '集群', + width: 65, + align: 'center', + type: 'tpl', + tpl: '${cluster}' + }, + { + name: 'applicationId', + label: '应用', + ...copyField('applicationId') + }, + { + name: 'complete', + label: '状态', + ...mappingField('complete', compactionMetricsStateMapping), + filterable: filterableField(compactionMetricsStateMapping, false), + }, + { + name: 'startedTime', + label: '启动时间', + ...timeAndFrom('startedTime', 'startedTimeFromNow'), + sortable: true, + canAccessSuperData: false, + }, + { + name: 'finishedTime', + label: '停止时间', + ...timeAndFrom('finishedTime', 'finishedTimeFromNow'), + sortable: true, + align: 'center', + canAccessSuperData: false, + }, + ], + } + } + }, { type: 'button', label: '时间线', @@ -1416,6 +1497,11 @@ let versionUpdateStateMapping = [ mappingItem('未跨天', 'false', 'label-danger'), ] +let compactionMetricsStateMapping = [ + mappingItem('成功', 'true', 'label-success'), + mappingItem('失败', 'false', 'label-danger'), +] + function mappingField(field, mapping) { let mapData = { '*': `\${${field}}`,