feat(web): 增加指标采集进度显示

This commit is contained in:
v-zhangjc9
2024-10-14 10:51:10 +08:00
parent 7efd9129c2
commit fb79468eee
11 changed files with 417 additions and 197 deletions

View File

@@ -0,0 +1,55 @@
package com.lanyuanxiaoyao.service.configuration.entity.monitor;
/**
* 指标运行进度
*
* @author lanyuanxiaoyao
* @date 2024-10-14
*/
public class MetricsProgress {
private String name;
private Boolean running;
private Double progress;
public MetricsProgress() {
}
public MetricsProgress(String name, Boolean running, Double progress) {
this.name = name;
this.running = running;
this.progress = progress;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Double getProgress() {
return progress;
}
public void setProgress(Double progress) {
this.progress = progress;
}
public Boolean getRunning() {
return running;
}
public void setRunning(Boolean running) {
this.running = running;
}
@Override
public String toString() {
return "MetricsProgress{" +
"name='" + name + '\'' +
", running=" + running +
", progress=" + progress +
'}';
}
}

View File

@@ -0,0 +1,18 @@
package com.lanyuanxiaoyao.service.forest.service;
import com.dtflys.forest.annotation.BaseRequest;
import com.dtflys.forest.annotation.Get;
import com.lanyuanxiaoyao.service.configuration.entity.monitor.MetricsProgress;
import org.eclipse.collections.api.list.ImmutableList;
/**
* 监控指标查询
*
* @author lanyuanxiaoyao
* @date 2024-10-14
*/
@BaseRequest(baseURL = "http://service-monitor")
public interface MonitorService {
@Get("/metrics_control/progress")
ImmutableList<MetricsProgress> progress();
}

View File

@@ -0,0 +1,40 @@
package com.lanyuanxiaoyao.service.monitor.controller;
import com.lanyuanxiaoyao.service.configuration.entity.monitor.MetricsProgress;
import com.lanyuanxiaoyao.service.monitor.metric.Metrics;
import java.util.Map;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.list.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* 操作进度
*
* @author lanyuanxiaoyao
* @date 2024-10-14
*/
@RestController
@RequestMapping("metrics_control")
public class MetricsController {
private static final Logger logger = LoggerFactory.getLogger(MetricsController.class);
private final ApplicationContext context;
public MetricsController(ApplicationContext context) {
this.context = context;
}
@GetMapping("progress")
public ImmutableList<MetricsProgress> progress() {
Map<String, Metrics> metricsMap = context.getBeansOfType(Metrics.class);
return Lists.immutable.ofAll(metricsMap.entrySet())
.toImmutableSortedList(Map.Entry.comparingByKey())
.collect(Map.Entry::getValue)
.collect(metrics -> new MetricsProgress(metrics.name(), metrics.running(), metrics.progress()));
}
}

View File

@@ -1,94 +0,0 @@
package com.lanyuanxiaoyao.service.monitor.metric;
import cn.hutool.core.util.StrUtil;
import com.lanyuanxiaoyao.service.common.Constants;
import com.lanyuanxiaoyao.service.configuration.ExecutorProvider;
import com.lanyuanxiaoyao.service.forest.service.HudiService;
import com.lanyuanxiaoyao.service.forest.service.InfoService;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.factory.Maps;
import org.eclipse.collections.api.map.MutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
/**
* Hudi表相关指标
*
* @author lanyuanxiaoyao
* @date 2024-03-05
*/
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Service
public class HudiMetrics extends Metrics {
private static final Logger logger = LoggerFactory.getLogger(HudiMetrics.class);
private final MeterRegistry registry;
private final InfoService infoService;
private final HudiService hudiService;
private final MutableMap<String, AtomicLong> fileCountCacheMap;
private final MutableMap<String, AtomicLong> timelineFileCountCacheMap;
public HudiMetrics(MeterRegistry registry, InfoService infoService, HudiService hudiService) {
this.registry = registry;
this.infoService = infoService;
this.hudiService = hudiService;
fileCountCacheMap = Maps.mutable.empty();
timelineFileCountCacheMap = Maps.mutable.empty();
}
@Scheduled(cron = "0 30 * * * ?")
@Override
public void update() {
infoService.tableMetaList()
.asParallel(ExecutorProvider.EXECUTORS_10, 1)
.reject(meta -> StrUtil.isBlank(meta.getPulsarAddress()))
.forEach(meta -> {
try {
AtomicLong filecountCache = fileCountCacheMap.getIfAbsentPut(
meta.getAlias(),
registry.gauge(
Constants.METRICS_HUDI_TABLE_FILE_COUNT,
Lists.immutable.of(
Tag.of(Constants.METRICS_LABEL_FLINK_JOB_ID, meta.getJob().getId().toString()),
Tag.of(Constants.METRICS_LABEL_ALIAS, meta.getAlias()),
Tag.of(Constants.METRICS_LABEL_SCHEMA, meta.getSchema()),
Tag.of(Constants.METRICS_LABEL_TABLE, meta.getTable())
),
new AtomicLong(0)
)
);
AtomicLong timelineFileCountCache = timelineFileCountCacheMap.getIfAbsentPut(
meta.getAlias(),
registry.gauge(
Constants.METRICS_HUDI_TABLE_TIMELINE_FILE_COUNT,
Lists.immutable.of(
Tag.of(Constants.METRICS_LABEL_FLINK_JOB_ID, meta.getJob().getId().toString()),
Tag.of(Constants.METRICS_LABEL_ALIAS, meta.getAlias()),
Tag.of(Constants.METRICS_LABEL_SCHEMA, meta.getSchema()),
Tag.of(Constants.METRICS_LABEL_TABLE, meta.getTable())
),
new AtomicLong(0)
)
);
String hdfs = meta.getHudi().getTargetHdfsPath();
if (hudiService.existsHudiTable(hdfs)) {
Long count = hudiService.fileCount(hdfs);
filecountCache.set(count);
String timelineHdfs = hdfs + "/.hoodie";
timelineFileCountCache.set(hudiService.fileCount(timelineHdfs));
}
} catch (Exception exception) {
logger.warn("Get file count fail for {}", meta.getAlias(), exception);
}
});
}
}

View File

@@ -0,0 +1,111 @@
package com.lanyuanxiaoyao.service.monitor.metric;
import cn.hutool.core.util.StrUtil;
import com.lanyuanxiaoyao.service.common.Constants;
import com.lanyuanxiaoyao.service.common.entity.TableMeta;
import com.lanyuanxiaoyao.service.configuration.ExecutorProvider;
import com.lanyuanxiaoyao.service.forest.service.HudiService;
import com.lanyuanxiaoyao.service.forest.service.InfoService;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.factory.Maps;
import org.eclipse.collections.api.list.ImmutableList;
import org.eclipse.collections.api.map.MutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import static com.lanyuanxiaoyao.service.common.Constants.MINUTE;
/**
* Hudi表相关指标
*
* @author lanyuanxiaoyao
* @date 2024-03-05
*/
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Service
public class HudiTableFilesCountMetrics extends Metrics {
private static final Logger logger = LoggerFactory.getLogger(HudiTableFilesCountMetrics.class);
private final MeterRegistry registry;
private final InfoService infoService;
private final HudiService hudiService;
private final MutableMap<String, AtomicLong> fileCountCacheMap;
private final MutableMap<String, AtomicLong> timelineFileCountCacheMap;
public HudiTableFilesCountMetrics(MeterRegistry registry, InfoService infoService, HudiService hudiService) {
this.registry = registry;
this.infoService = infoService;
this.hudiService = hudiService;
fileCountCacheMap = Maps.mutable.empty();
timelineFileCountCacheMap = Maps.mutable.empty();
}
@Override
public String name() {
return "Hudi表文件数量监控";
}
@Scheduled(fixedDelay = 30 * MINUTE, initialDelay = MINUTE)
@Override
public void update() {
try {
start();
ImmutableList<TableMeta> metas = infoService.tableMetaList();
setTotal(metas.size());
metas
.asParallel(ExecutorProvider.EXECUTORS_2, 1)
.reject(meta -> StrUtil.isBlank(meta.getPulsarAddress()))
.forEach(meta -> {
try {
AtomicLong filecountCache = fileCountCacheMap.getIfAbsentPut(
meta.getAlias(),
registry.gauge(
Constants.METRICS_HUDI_TABLE_FILE_COUNT,
Lists.immutable.of(
Tag.of(Constants.METRICS_LABEL_FLINK_JOB_ID, meta.getJob().getId().toString()),
Tag.of(Constants.METRICS_LABEL_ALIAS, meta.getAlias()),
Tag.of(Constants.METRICS_LABEL_SCHEMA, meta.getSchema()),
Tag.of(Constants.METRICS_LABEL_TABLE, meta.getTable())
),
new AtomicLong(0)
)
);
AtomicLong timelineFileCountCache = timelineFileCountCacheMap.getIfAbsentPut(
meta.getAlias(),
registry.gauge(
Constants.METRICS_HUDI_TABLE_TIMELINE_FILE_COUNT,
Lists.immutable.of(
Tag.of(Constants.METRICS_LABEL_FLINK_JOB_ID, meta.getJob().getId().toString()),
Tag.of(Constants.METRICS_LABEL_ALIAS, meta.getAlias()),
Tag.of(Constants.METRICS_LABEL_SCHEMA, meta.getSchema()),
Tag.of(Constants.METRICS_LABEL_TABLE, meta.getTable())
),
new AtomicLong(0)
)
);
String hdfs = meta.getHudi().getTargetHdfsPath();
if (hudiService.existsHudiTable(hdfs)) {
Long count = hudiService.fileCount(hdfs);
filecountCache.set(count);
String timelineHdfs = hdfs + "/.hoodie";
timelineFileCountCache.set(hudiService.fileCount(timelineHdfs));
}
} catch (Exception exception) {
logger.warn("Get file count fail for {}", meta.getAlias(), exception);
}
finished();
});
} finally {
reset();
}
}
}

View File

@@ -1,9 +1,56 @@
package com.lanyuanxiaoyao.service.monitor.metric;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/**
* @author lanyuanxiaoyao
* @date 2024-03-05
*/
public abstract class Metrics {
abstract void update();
private final AtomicBoolean running = new AtomicBoolean(false);
private final AtomicLong finished = new AtomicLong(0);
private final AtomicLong total = new AtomicLong(0);
public abstract String name();
public abstract void update();
public boolean running() {
return running.get();
}
public double progress() {
if (total.get() == 0) {
return 0;
} else {
return finished.get() * 1.0 / total.get();
}
}
protected void start() {
running.set(true);
}
protected void stop() {
running.set(false);
}
protected void setTotal(Long total) {
this.total.set(total);
}
protected void setTotal(Integer total) {
this.total.set(total);
}
protected void finished() {
finished.incrementAndGet();
}
protected void reset() {
stop();
setTotal(0);
finished.set(0);
}
}

View File

@@ -0,0 +1,97 @@
package com.lanyuanxiaoyao.service.monitor.metric;
import cn.hutool.core.util.StrUtil;
import com.lanyuanxiaoyao.service.common.Constants;
import com.lanyuanxiaoyao.service.common.entity.TableMeta;
import com.lanyuanxiaoyao.service.common.utils.NameHelper;
import com.lanyuanxiaoyao.service.configuration.ExecutorProvider;
import com.lanyuanxiaoyao.service.configuration.HudiServiceProperties;
import com.lanyuanxiaoyao.service.forest.service.InfoService;
import com.lanyuanxiaoyao.service.forest.service.PulsarService;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.factory.Maps;
import org.eclipse.collections.api.list.ImmutableList;
import org.eclipse.collections.api.map.MutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import static com.lanyuanxiaoyao.service.common.Constants.MINUTE;
/**
* Pulsar
*
* @author lanyuanxiaoyao
* @date 2024-03-05
*/
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Service
public class PulsarBacklogMetrics extends Metrics {
private static final Logger logger = LoggerFactory.getLogger(PulsarBacklogMetrics.class);
private final MeterRegistry registry;
private final InfoService infoService;
private final PulsarService pulsarService;
private final HudiServiceProperties hudiServiceProperties;
private final MutableMap<String, AtomicLong> backlogMap;
public PulsarBacklogMetrics(MeterRegistry registry, InfoService infoService, PulsarService pulsarService, HudiServiceProperties hudiServiceProperties) {
this.registry = registry;
this.infoService = infoService;
this.pulsarService = pulsarService;
this.hudiServiceProperties = hudiServiceProperties;
backlogMap = Maps.mutable.empty();
}
@Override
public String name() {
return "Pulsar backlog监控";
}
@Scheduled(fixedDelay = 30 * MINUTE, initialDelay = MINUTE)
@Override
public void update() {
try {
start();
ImmutableList<TableMeta> metas = infoService.tableMetaList();
setTotal(metas.size());
metas
.asParallel(ExecutorProvider.EXECUTORS_2, 1)
.reject(meta -> StrUtil.isBlank(meta.getPulsarAddress()))
.forEach(meta -> {
try {
AtomicLong backlogCache = backlogMap.getIfAbsentPut(
meta.getAlias(),
registry.gauge(
Constants.METRICS_PULSAR_BACKLOG,
Lists.immutable.of(
Tag.of(Constants.METRICS_LABEL_FLINK_JOB_ID, meta.getJob().getId().toString()),
Tag.of(Constants.METRICS_LABEL_ALIAS, meta.getAlias()),
Tag.of(Constants.METRICS_LABEL_SCHEMA, meta.getSchema()),
Tag.of(Constants.METRICS_LABEL_TABLE, meta.getTable())
),
new AtomicLong(0)
)
);
String name = pulsarService.name(meta.getPulsarAddress());
if (StrUtil.isNotBlank(name)) {
Long backlog = pulsarService.backlog(name, meta.getTopic(), NameHelper.pulsarSubscriptionName(meta.getJob().getId(), meta.getAlias(), hudiServiceProperties.getSignature()));
backlogCache.set(backlog);
infoService.savePulsarBacklog(meta.getJob().getId(), meta.getAlias(), backlog);
}
} catch (Exception exception) {
logger.warn("Update pulsar backlog fail for {}", meta.getAlias(), exception);
}
finished();
});
} finally {
reset();
}
}
}

View File

@@ -1,82 +0,0 @@
package com.lanyuanxiaoyao.service.monitor.metric;
import cn.hutool.core.util.StrUtil;
import com.lanyuanxiaoyao.service.common.Constants;
import com.lanyuanxiaoyao.service.common.utils.NameHelper;
import com.lanyuanxiaoyao.service.configuration.ExecutorProvider;
import com.lanyuanxiaoyao.service.configuration.HudiServiceProperties;
import com.lanyuanxiaoyao.service.forest.service.InfoService;
import com.lanyuanxiaoyao.service.forest.service.PulsarService;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.factory.Maps;
import org.eclipse.collections.api.map.MutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import static com.lanyuanxiaoyao.service.common.Constants.MINUTE;
/**
* Pulsar
*
* @author lanyuanxiaoyao
* @date 2024-03-05
*/
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Service
public class PulsarMetrics extends Metrics {
private static final Logger logger = LoggerFactory.getLogger(PulsarMetrics.class);
private final MeterRegistry registry;
private final InfoService infoService;
private final PulsarService pulsarService;
private final HudiServiceProperties hudiServiceProperties;
private final MutableMap<String, AtomicLong> backlogMap;
public PulsarMetrics(MeterRegistry registry, InfoService infoService, PulsarService pulsarService, HudiServiceProperties hudiServiceProperties) {
this.registry = registry;
this.infoService = infoService;
this.pulsarService = pulsarService;
this.hudiServiceProperties = hudiServiceProperties;
backlogMap = Maps.mutable.empty();
}
@Scheduled(fixedDelay = 30 * MINUTE, initialDelay = MINUTE)
@Override
void update() {
infoService.tableMetaList()
.asParallel(ExecutorProvider.EXECUTORS_2, 1)
.reject(meta -> StrUtil.isBlank(meta.getPulsarAddress()))
.forEach(meta -> {
try {
AtomicLong backlogCache = backlogMap.getIfAbsentPut(
meta.getAlias(),
registry.gauge(
Constants.METRICS_PULSAR_BACKLOG,
Lists.immutable.of(
Tag.of(Constants.METRICS_LABEL_FLINK_JOB_ID, meta.getJob().getId().toString()),
Tag.of(Constants.METRICS_LABEL_ALIAS, meta.getAlias()),
Tag.of(Constants.METRICS_LABEL_SCHEMA, meta.getSchema()),
Tag.of(Constants.METRICS_LABEL_TABLE, meta.getTable())
),
new AtomicLong(0)
)
);
String name = pulsarService.name(meta.getPulsarAddress());
if (StrUtil.isNotBlank(name)) {
Long backlog = pulsarService.backlog(name, meta.getTopic(), NameHelper.pulsarSubscriptionName(meta.getJob().getId(), meta.getAlias(), hudiServiceProperties.getSignature()));
backlogCache.set(backlog);
infoService.savePulsarBacklog(meta.getJob().getId(), meta.getAlias(), backlog);
}
} catch (Exception exception) {
logger.warn("Update pulsar backlog fail for {}", meta.getAlias(), exception);
}
});
}
}

View File

@@ -6,10 +6,16 @@ import com.lanyuanxiaoyao.service.common.entity.FlinkJob;
import com.lanyuanxiaoyao.service.common.utils.NameHelper;
import com.lanyuanxiaoyao.service.configuration.ExecutorProvider;
import com.lanyuanxiaoyao.service.configuration.entity.info.JobIdAndAlias;
import com.lanyuanxiaoyao.service.configuration.entity.monitor.MetricsProgress;
import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnApplication;
import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnRootQueue;
import com.lanyuanxiaoyao.service.configuration.entity.zookeeper.ZookeeperNode;
import com.lanyuanxiaoyao.service.forest.service.*;
import com.lanyuanxiaoyao.service.forest.service.InfoService;
import com.lanyuanxiaoyao.service.forest.service.MonitorService;
import com.lanyuanxiaoyao.service.forest.service.QueueService;
import com.lanyuanxiaoyao.service.forest.service.ScheduleService;
import com.lanyuanxiaoyao.service.forest.service.YarnService;
import com.lanyuanxiaoyao.service.forest.service.ZookeeperService;
import com.lanyuanxiaoyao.service.web.controller.base.AmisCrudResponse;
import com.lanyuanxiaoyao.service.web.controller.base.AmisMapResponse;
import com.lanyuanxiaoyao.service.web.controller.base.AmisResponse;
@@ -48,14 +54,16 @@ public class OverviewController extends BaseController {
private final QueueService queueService;
private final ScheduleService scheduleService;
private final ZookeeperService zookeeperService;
private final MonitorService monitorService;
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
public OverviewController(InfoService infoService, YarnService yarnService, QueueService queueService, ScheduleService scheduleService, ZookeeperService zookeeperService) {
public OverviewController(InfoService infoService, YarnService yarnService, QueueService queueService, ScheduleService scheduleService, ZookeeperService zookeeperService, MonitorService monitorService) {
this.infoService = infoService;
this.yarnService = yarnService;
this.queueService = queueService;
this.scheduleService = scheduleService;
this.zookeeperService = zookeeperService;
this.monitorService = monitorService;
}
@GetMapping("")
@@ -207,4 +215,12 @@ public class OverviewController extends BaseController {
.setData("unRunningTable", unRunningTable.size())
.setData("unRunningTableList", unRunningTable);
}
@GetMapping("monitor_progress")
public AmisMapResponse monitorProgress() {
return AmisCrudResponse.responseCrudData(
monitorService.progress()
.collect(p -> new MetricsProgress(p.getName(), p.getRunning(), p.getProgress()))
);
}
}

View File

@@ -1,6 +1,6 @@
const commonInfo = {
// baseUrl: 'http://132.126.207.130:35690/hudi_services/service_web',
baseUrl: '/hudi_services/service_web',
baseUrl: 'http://132.126.207.130:35690/hudi_services/service_web',
// baseUrl: '/hudi_services/service_web',
clusters: {
// hudi同步运行集群和yarn队列名称
sync: {

View File

@@ -408,26 +408,38 @@ function overviewTab() {
}
]
},
/*{type: 'divider'},
{type: 'divider'},
{
type: 'service',
api: '${base}/overview/schedule_times',
interval: 60000,
silentPolling: true,
body: [
'调度时间点',
type: 'crud',
title: '监控指标运行进度',
api: `\${base}/overview/monitor_progress`,
...crudCommonOptions(),
interval: 2000,
loadDataOnce: true,
columns: [
{
type: 'each',
name: 'items',
className: 'grid',
items: {
type: 'tag',
color: '${color}',
label: '${DATETOSTR(TIMESTAMP(time, \'x\'), \'HH:mm:ss\')}'
name: 'name',
label: '名称',
width: 120,
},
{
name: 'running',
label: '状态',
type: 'mapping',
width: 50,
map: {
'true': '运行中',
'false': '未运行',
}
}
},
{
label: '进度',
type: 'progress',
value: '${ROUND(progress * 100)}',
map: 'bg-primary',
},
]
}*/
}
]
}
}