feat(web): 增加指标采集进度显示
This commit is contained in:
@@ -0,0 +1,55 @@
|
|||||||
|
package com.lanyuanxiaoyao.service.configuration.entity.monitor;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 指标运行进度
|
||||||
|
*
|
||||||
|
* @author lanyuanxiaoyao
|
||||||
|
* @date 2024-10-14
|
||||||
|
*/
|
||||||
|
public class MetricsProgress {
|
||||||
|
private String name;
|
||||||
|
private Boolean running;
|
||||||
|
private Double progress;
|
||||||
|
|
||||||
|
public MetricsProgress() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public MetricsProgress(String name, Boolean running, Double progress) {
|
||||||
|
this.name = name;
|
||||||
|
this.running = running;
|
||||||
|
this.progress = progress;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getName() {
|
||||||
|
return name;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setName(String name) {
|
||||||
|
this.name = name;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Double getProgress() {
|
||||||
|
return progress;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setProgress(Double progress) {
|
||||||
|
this.progress = progress;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Boolean getRunning() {
|
||||||
|
return running;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setRunning(Boolean running) {
|
||||||
|
this.running = running;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "MetricsProgress{" +
|
||||||
|
"name='" + name + '\'' +
|
||||||
|
", running=" + running +
|
||||||
|
", progress=" + progress +
|
||||||
|
'}';
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,18 @@
|
|||||||
|
package com.lanyuanxiaoyao.service.forest.service;
|
||||||
|
|
||||||
|
import com.dtflys.forest.annotation.BaseRequest;
|
||||||
|
import com.dtflys.forest.annotation.Get;
|
||||||
|
import com.lanyuanxiaoyao.service.configuration.entity.monitor.MetricsProgress;
|
||||||
|
import org.eclipse.collections.api.list.ImmutableList;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 监控指标查询
|
||||||
|
*
|
||||||
|
* @author lanyuanxiaoyao
|
||||||
|
* @date 2024-10-14
|
||||||
|
*/
|
||||||
|
@BaseRequest(baseURL = "http://service-monitor")
|
||||||
|
public interface MonitorService {
|
||||||
|
@Get("/metrics_control/progress")
|
||||||
|
ImmutableList<MetricsProgress> progress();
|
||||||
|
}
|
||||||
@@ -0,0 +1,40 @@
|
|||||||
|
package com.lanyuanxiaoyao.service.monitor.controller;
|
||||||
|
|
||||||
|
import com.lanyuanxiaoyao.service.configuration.entity.monitor.MetricsProgress;
|
||||||
|
import com.lanyuanxiaoyao.service.monitor.metric.Metrics;
|
||||||
|
import java.util.Map;
|
||||||
|
import org.eclipse.collections.api.factory.Lists;
|
||||||
|
import org.eclipse.collections.api.list.ImmutableList;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.context.ApplicationContext;
|
||||||
|
import org.springframework.web.bind.annotation.GetMapping;
|
||||||
|
import org.springframework.web.bind.annotation.RequestMapping;
|
||||||
|
import org.springframework.web.bind.annotation.RestController;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 操作进度
|
||||||
|
*
|
||||||
|
* @author lanyuanxiaoyao
|
||||||
|
* @date 2024-10-14
|
||||||
|
*/
|
||||||
|
@RestController
|
||||||
|
@RequestMapping("metrics_control")
|
||||||
|
public class MetricsController {
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(MetricsController.class);
|
||||||
|
|
||||||
|
private final ApplicationContext context;
|
||||||
|
|
||||||
|
public MetricsController(ApplicationContext context) {
|
||||||
|
this.context = context;
|
||||||
|
}
|
||||||
|
|
||||||
|
@GetMapping("progress")
|
||||||
|
public ImmutableList<MetricsProgress> progress() {
|
||||||
|
Map<String, Metrics> metricsMap = context.getBeansOfType(Metrics.class);
|
||||||
|
return Lists.immutable.ofAll(metricsMap.entrySet())
|
||||||
|
.toImmutableSortedList(Map.Entry.comparingByKey())
|
||||||
|
.collect(Map.Entry::getValue)
|
||||||
|
.collect(metrics -> new MetricsProgress(metrics.name(), metrics.running(), metrics.progress()));
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,94 +0,0 @@
|
|||||||
package com.lanyuanxiaoyao.service.monitor.metric;
|
|
||||||
|
|
||||||
import cn.hutool.core.util.StrUtil;
|
|
||||||
import com.lanyuanxiaoyao.service.common.Constants;
|
|
||||||
import com.lanyuanxiaoyao.service.configuration.ExecutorProvider;
|
|
||||||
import com.lanyuanxiaoyao.service.forest.service.HudiService;
|
|
||||||
import com.lanyuanxiaoyao.service.forest.service.InfoService;
|
|
||||||
import io.micrometer.core.instrument.MeterRegistry;
|
|
||||||
import io.micrometer.core.instrument.Tag;
|
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
|
||||||
import org.eclipse.collections.api.factory.Lists;
|
|
||||||
import org.eclipse.collections.api.factory.Maps;
|
|
||||||
import org.eclipse.collections.api.map.MutableMap;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
import org.springframework.scheduling.annotation.Scheduled;
|
|
||||||
import org.springframework.stereotype.Service;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Hudi表相关指标
|
|
||||||
*
|
|
||||||
* @author lanyuanxiaoyao
|
|
||||||
* @date 2024-03-05
|
|
||||||
*/
|
|
||||||
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
|
|
||||||
@Service
|
|
||||||
public class HudiMetrics extends Metrics {
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(HudiMetrics.class);
|
|
||||||
|
|
||||||
private final MeterRegistry registry;
|
|
||||||
private final InfoService infoService;
|
|
||||||
private final HudiService hudiService;
|
|
||||||
|
|
||||||
private final MutableMap<String, AtomicLong> fileCountCacheMap;
|
|
||||||
private final MutableMap<String, AtomicLong> timelineFileCountCacheMap;
|
|
||||||
|
|
||||||
public HudiMetrics(MeterRegistry registry, InfoService infoService, HudiService hudiService) {
|
|
||||||
this.registry = registry;
|
|
||||||
this.infoService = infoService;
|
|
||||||
this.hudiService = hudiService;
|
|
||||||
|
|
||||||
fileCountCacheMap = Maps.mutable.empty();
|
|
||||||
timelineFileCountCacheMap = Maps.mutable.empty();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Scheduled(cron = "0 30 * * * ?")
|
|
||||||
@Override
|
|
||||||
public void update() {
|
|
||||||
infoService.tableMetaList()
|
|
||||||
.asParallel(ExecutorProvider.EXECUTORS_10, 1)
|
|
||||||
.reject(meta -> StrUtil.isBlank(meta.getPulsarAddress()))
|
|
||||||
.forEach(meta -> {
|
|
||||||
try {
|
|
||||||
AtomicLong filecountCache = fileCountCacheMap.getIfAbsentPut(
|
|
||||||
meta.getAlias(),
|
|
||||||
registry.gauge(
|
|
||||||
Constants.METRICS_HUDI_TABLE_FILE_COUNT,
|
|
||||||
Lists.immutable.of(
|
|
||||||
Tag.of(Constants.METRICS_LABEL_FLINK_JOB_ID, meta.getJob().getId().toString()),
|
|
||||||
Tag.of(Constants.METRICS_LABEL_ALIAS, meta.getAlias()),
|
|
||||||
Tag.of(Constants.METRICS_LABEL_SCHEMA, meta.getSchema()),
|
|
||||||
Tag.of(Constants.METRICS_LABEL_TABLE, meta.getTable())
|
|
||||||
),
|
|
||||||
new AtomicLong(0)
|
|
||||||
)
|
|
||||||
);
|
|
||||||
AtomicLong timelineFileCountCache = timelineFileCountCacheMap.getIfAbsentPut(
|
|
||||||
meta.getAlias(),
|
|
||||||
registry.gauge(
|
|
||||||
Constants.METRICS_HUDI_TABLE_TIMELINE_FILE_COUNT,
|
|
||||||
Lists.immutable.of(
|
|
||||||
Tag.of(Constants.METRICS_LABEL_FLINK_JOB_ID, meta.getJob().getId().toString()),
|
|
||||||
Tag.of(Constants.METRICS_LABEL_ALIAS, meta.getAlias()),
|
|
||||||
Tag.of(Constants.METRICS_LABEL_SCHEMA, meta.getSchema()),
|
|
||||||
Tag.of(Constants.METRICS_LABEL_TABLE, meta.getTable())
|
|
||||||
),
|
|
||||||
new AtomicLong(0)
|
|
||||||
)
|
|
||||||
);
|
|
||||||
|
|
||||||
String hdfs = meta.getHudi().getTargetHdfsPath();
|
|
||||||
if (hudiService.existsHudiTable(hdfs)) {
|
|
||||||
Long count = hudiService.fileCount(hdfs);
|
|
||||||
filecountCache.set(count);
|
|
||||||
|
|
||||||
String timelineHdfs = hdfs + "/.hoodie";
|
|
||||||
timelineFileCountCache.set(hudiService.fileCount(timelineHdfs));
|
|
||||||
}
|
|
||||||
} catch (Exception exception) {
|
|
||||||
logger.warn("Get file count fail for {}", meta.getAlias(), exception);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -0,0 +1,111 @@
|
|||||||
|
package com.lanyuanxiaoyao.service.monitor.metric;
|
||||||
|
|
||||||
|
import cn.hutool.core.util.StrUtil;
|
||||||
|
import com.lanyuanxiaoyao.service.common.Constants;
|
||||||
|
import com.lanyuanxiaoyao.service.common.entity.TableMeta;
|
||||||
|
import com.lanyuanxiaoyao.service.configuration.ExecutorProvider;
|
||||||
|
import com.lanyuanxiaoyao.service.forest.service.HudiService;
|
||||||
|
import com.lanyuanxiaoyao.service.forest.service.InfoService;
|
||||||
|
import io.micrometer.core.instrument.MeterRegistry;
|
||||||
|
import io.micrometer.core.instrument.Tag;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
import org.eclipse.collections.api.factory.Lists;
|
||||||
|
import org.eclipse.collections.api.factory.Maps;
|
||||||
|
import org.eclipse.collections.api.list.ImmutableList;
|
||||||
|
import org.eclipse.collections.api.map.MutableMap;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.scheduling.annotation.Scheduled;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import static com.lanyuanxiaoyao.service.common.Constants.MINUTE;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Hudi表相关指标
|
||||||
|
*
|
||||||
|
* @author lanyuanxiaoyao
|
||||||
|
* @date 2024-03-05
|
||||||
|
*/
|
||||||
|
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
|
||||||
|
@Service
|
||||||
|
public class HudiTableFilesCountMetrics extends Metrics {
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(HudiTableFilesCountMetrics.class);
|
||||||
|
|
||||||
|
private final MeterRegistry registry;
|
||||||
|
private final InfoService infoService;
|
||||||
|
private final HudiService hudiService;
|
||||||
|
|
||||||
|
private final MutableMap<String, AtomicLong> fileCountCacheMap;
|
||||||
|
private final MutableMap<String, AtomicLong> timelineFileCountCacheMap;
|
||||||
|
|
||||||
|
public HudiTableFilesCountMetrics(MeterRegistry registry, InfoService infoService, HudiService hudiService) {
|
||||||
|
this.registry = registry;
|
||||||
|
this.infoService = infoService;
|
||||||
|
this.hudiService = hudiService;
|
||||||
|
|
||||||
|
fileCountCacheMap = Maps.mutable.empty();
|
||||||
|
timelineFileCountCacheMap = Maps.mutable.empty();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String name() {
|
||||||
|
return "Hudi表文件数量监控";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Scheduled(fixedDelay = 30 * MINUTE, initialDelay = MINUTE)
|
||||||
|
@Override
|
||||||
|
public void update() {
|
||||||
|
try {
|
||||||
|
start();
|
||||||
|
ImmutableList<TableMeta> metas = infoService.tableMetaList();
|
||||||
|
setTotal(metas.size());
|
||||||
|
metas
|
||||||
|
.asParallel(ExecutorProvider.EXECUTORS_2, 1)
|
||||||
|
.reject(meta -> StrUtil.isBlank(meta.getPulsarAddress()))
|
||||||
|
.forEach(meta -> {
|
||||||
|
try {
|
||||||
|
AtomicLong filecountCache = fileCountCacheMap.getIfAbsentPut(
|
||||||
|
meta.getAlias(),
|
||||||
|
registry.gauge(
|
||||||
|
Constants.METRICS_HUDI_TABLE_FILE_COUNT,
|
||||||
|
Lists.immutable.of(
|
||||||
|
Tag.of(Constants.METRICS_LABEL_FLINK_JOB_ID, meta.getJob().getId().toString()),
|
||||||
|
Tag.of(Constants.METRICS_LABEL_ALIAS, meta.getAlias()),
|
||||||
|
Tag.of(Constants.METRICS_LABEL_SCHEMA, meta.getSchema()),
|
||||||
|
Tag.of(Constants.METRICS_LABEL_TABLE, meta.getTable())
|
||||||
|
),
|
||||||
|
new AtomicLong(0)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
AtomicLong timelineFileCountCache = timelineFileCountCacheMap.getIfAbsentPut(
|
||||||
|
meta.getAlias(),
|
||||||
|
registry.gauge(
|
||||||
|
Constants.METRICS_HUDI_TABLE_TIMELINE_FILE_COUNT,
|
||||||
|
Lists.immutable.of(
|
||||||
|
Tag.of(Constants.METRICS_LABEL_FLINK_JOB_ID, meta.getJob().getId().toString()),
|
||||||
|
Tag.of(Constants.METRICS_LABEL_ALIAS, meta.getAlias()),
|
||||||
|
Tag.of(Constants.METRICS_LABEL_SCHEMA, meta.getSchema()),
|
||||||
|
Tag.of(Constants.METRICS_LABEL_TABLE, meta.getTable())
|
||||||
|
),
|
||||||
|
new AtomicLong(0)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
String hdfs = meta.getHudi().getTargetHdfsPath();
|
||||||
|
if (hudiService.existsHudiTable(hdfs)) {
|
||||||
|
Long count = hudiService.fileCount(hdfs);
|
||||||
|
filecountCache.set(count);
|
||||||
|
|
||||||
|
String timelineHdfs = hdfs + "/.hoodie";
|
||||||
|
timelineFileCountCache.set(hudiService.fileCount(timelineHdfs));
|
||||||
|
}
|
||||||
|
} catch (Exception exception) {
|
||||||
|
logger.warn("Get file count fail for {}", meta.getAlias(), exception);
|
||||||
|
}
|
||||||
|
finished();
|
||||||
|
});
|
||||||
|
} finally {
|
||||||
|
reset();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,9 +1,56 @@
|
|||||||
package com.lanyuanxiaoyao.service.monitor.metric;
|
package com.lanyuanxiaoyao.service.monitor.metric;
|
||||||
|
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author lanyuanxiaoyao
|
* @author lanyuanxiaoyao
|
||||||
* @date 2024-03-05
|
* @date 2024-03-05
|
||||||
*/
|
*/
|
||||||
public abstract class Metrics {
|
public abstract class Metrics {
|
||||||
abstract void update();
|
private final AtomicBoolean running = new AtomicBoolean(false);
|
||||||
|
private final AtomicLong finished = new AtomicLong(0);
|
||||||
|
private final AtomicLong total = new AtomicLong(0);
|
||||||
|
|
||||||
|
public abstract String name();
|
||||||
|
|
||||||
|
public abstract void update();
|
||||||
|
|
||||||
|
public boolean running() {
|
||||||
|
return running.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
public double progress() {
|
||||||
|
if (total.get() == 0) {
|
||||||
|
return 0;
|
||||||
|
} else {
|
||||||
|
return finished.get() * 1.0 / total.get();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void start() {
|
||||||
|
running.set(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void stop() {
|
||||||
|
running.set(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void setTotal(Long total) {
|
||||||
|
this.total.set(total);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void setTotal(Integer total) {
|
||||||
|
this.total.set(total);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void finished() {
|
||||||
|
finished.incrementAndGet();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void reset() {
|
||||||
|
stop();
|
||||||
|
setTotal(0);
|
||||||
|
finished.set(0);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,97 @@
|
|||||||
|
package com.lanyuanxiaoyao.service.monitor.metric;
|
||||||
|
|
||||||
|
import cn.hutool.core.util.StrUtil;
|
||||||
|
import com.lanyuanxiaoyao.service.common.Constants;
|
||||||
|
import com.lanyuanxiaoyao.service.common.entity.TableMeta;
|
||||||
|
import com.lanyuanxiaoyao.service.common.utils.NameHelper;
|
||||||
|
import com.lanyuanxiaoyao.service.configuration.ExecutorProvider;
|
||||||
|
import com.lanyuanxiaoyao.service.configuration.HudiServiceProperties;
|
||||||
|
import com.lanyuanxiaoyao.service.forest.service.InfoService;
|
||||||
|
import com.lanyuanxiaoyao.service.forest.service.PulsarService;
|
||||||
|
import io.micrometer.core.instrument.MeterRegistry;
|
||||||
|
import io.micrometer.core.instrument.Tag;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
import org.eclipse.collections.api.factory.Lists;
|
||||||
|
import org.eclipse.collections.api.factory.Maps;
|
||||||
|
import org.eclipse.collections.api.list.ImmutableList;
|
||||||
|
import org.eclipse.collections.api.map.MutableMap;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.scheduling.annotation.Scheduled;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import static com.lanyuanxiaoyao.service.common.Constants.MINUTE;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Pulsar
|
||||||
|
*
|
||||||
|
* @author lanyuanxiaoyao
|
||||||
|
* @date 2024-03-05
|
||||||
|
*/
|
||||||
|
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
|
||||||
|
@Service
|
||||||
|
public class PulsarBacklogMetrics extends Metrics {
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(PulsarBacklogMetrics.class);
|
||||||
|
|
||||||
|
private final MeterRegistry registry;
|
||||||
|
private final InfoService infoService;
|
||||||
|
private final PulsarService pulsarService;
|
||||||
|
private final HudiServiceProperties hudiServiceProperties;
|
||||||
|
|
||||||
|
private final MutableMap<String, AtomicLong> backlogMap;
|
||||||
|
|
||||||
|
public PulsarBacklogMetrics(MeterRegistry registry, InfoService infoService, PulsarService pulsarService, HudiServiceProperties hudiServiceProperties) {
|
||||||
|
this.registry = registry;
|
||||||
|
this.infoService = infoService;
|
||||||
|
this.pulsarService = pulsarService;
|
||||||
|
this.hudiServiceProperties = hudiServiceProperties;
|
||||||
|
|
||||||
|
backlogMap = Maps.mutable.empty();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String name() {
|
||||||
|
return "Pulsar backlog监控";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Scheduled(fixedDelay = 30 * MINUTE, initialDelay = MINUTE)
|
||||||
|
@Override
|
||||||
|
public void update() {
|
||||||
|
try {
|
||||||
|
start();
|
||||||
|
ImmutableList<TableMeta> metas = infoService.tableMetaList();
|
||||||
|
setTotal(metas.size());
|
||||||
|
metas
|
||||||
|
.asParallel(ExecutorProvider.EXECUTORS_2, 1)
|
||||||
|
.reject(meta -> StrUtil.isBlank(meta.getPulsarAddress()))
|
||||||
|
.forEach(meta -> {
|
||||||
|
try {
|
||||||
|
AtomicLong backlogCache = backlogMap.getIfAbsentPut(
|
||||||
|
meta.getAlias(),
|
||||||
|
registry.gauge(
|
||||||
|
Constants.METRICS_PULSAR_BACKLOG,
|
||||||
|
Lists.immutable.of(
|
||||||
|
Tag.of(Constants.METRICS_LABEL_FLINK_JOB_ID, meta.getJob().getId().toString()),
|
||||||
|
Tag.of(Constants.METRICS_LABEL_ALIAS, meta.getAlias()),
|
||||||
|
Tag.of(Constants.METRICS_LABEL_SCHEMA, meta.getSchema()),
|
||||||
|
Tag.of(Constants.METRICS_LABEL_TABLE, meta.getTable())
|
||||||
|
),
|
||||||
|
new AtomicLong(0)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
String name = pulsarService.name(meta.getPulsarAddress());
|
||||||
|
if (StrUtil.isNotBlank(name)) {
|
||||||
|
Long backlog = pulsarService.backlog(name, meta.getTopic(), NameHelper.pulsarSubscriptionName(meta.getJob().getId(), meta.getAlias(), hudiServiceProperties.getSignature()));
|
||||||
|
backlogCache.set(backlog);
|
||||||
|
infoService.savePulsarBacklog(meta.getJob().getId(), meta.getAlias(), backlog);
|
||||||
|
}
|
||||||
|
} catch (Exception exception) {
|
||||||
|
logger.warn("Update pulsar backlog fail for {}", meta.getAlias(), exception);
|
||||||
|
}
|
||||||
|
finished();
|
||||||
|
});
|
||||||
|
} finally {
|
||||||
|
reset();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,82 +0,0 @@
|
|||||||
package com.lanyuanxiaoyao.service.monitor.metric;
|
|
||||||
|
|
||||||
import cn.hutool.core.util.StrUtil;
|
|
||||||
import com.lanyuanxiaoyao.service.common.Constants;
|
|
||||||
import com.lanyuanxiaoyao.service.common.utils.NameHelper;
|
|
||||||
import com.lanyuanxiaoyao.service.configuration.ExecutorProvider;
|
|
||||||
import com.lanyuanxiaoyao.service.configuration.HudiServiceProperties;
|
|
||||||
import com.lanyuanxiaoyao.service.forest.service.InfoService;
|
|
||||||
import com.lanyuanxiaoyao.service.forest.service.PulsarService;
|
|
||||||
import io.micrometer.core.instrument.MeterRegistry;
|
|
||||||
import io.micrometer.core.instrument.Tag;
|
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
|
||||||
import org.eclipse.collections.api.factory.Lists;
|
|
||||||
import org.eclipse.collections.api.factory.Maps;
|
|
||||||
import org.eclipse.collections.api.map.MutableMap;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
import org.springframework.scheduling.annotation.Scheduled;
|
|
||||||
import org.springframework.stereotype.Service;
|
|
||||||
|
|
||||||
import static com.lanyuanxiaoyao.service.common.Constants.MINUTE;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Pulsar
|
|
||||||
*
|
|
||||||
* @author lanyuanxiaoyao
|
|
||||||
* @date 2024-03-05
|
|
||||||
*/
|
|
||||||
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
|
|
||||||
@Service
|
|
||||||
public class PulsarMetrics extends Metrics {
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(PulsarMetrics.class);
|
|
||||||
|
|
||||||
private final MeterRegistry registry;
|
|
||||||
private final InfoService infoService;
|
|
||||||
private final PulsarService pulsarService;
|
|
||||||
private final HudiServiceProperties hudiServiceProperties;
|
|
||||||
|
|
||||||
private final MutableMap<String, AtomicLong> backlogMap;
|
|
||||||
|
|
||||||
public PulsarMetrics(MeterRegistry registry, InfoService infoService, PulsarService pulsarService, HudiServiceProperties hudiServiceProperties) {
|
|
||||||
this.registry = registry;
|
|
||||||
this.infoService = infoService;
|
|
||||||
this.pulsarService = pulsarService;
|
|
||||||
this.hudiServiceProperties = hudiServiceProperties;
|
|
||||||
|
|
||||||
backlogMap = Maps.mutable.empty();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Scheduled(fixedDelay = 30 * MINUTE, initialDelay = MINUTE)
|
|
||||||
@Override
|
|
||||||
void update() {
|
|
||||||
infoService.tableMetaList()
|
|
||||||
.asParallel(ExecutorProvider.EXECUTORS_2, 1)
|
|
||||||
.reject(meta -> StrUtil.isBlank(meta.getPulsarAddress()))
|
|
||||||
.forEach(meta -> {
|
|
||||||
try {
|
|
||||||
AtomicLong backlogCache = backlogMap.getIfAbsentPut(
|
|
||||||
meta.getAlias(),
|
|
||||||
registry.gauge(
|
|
||||||
Constants.METRICS_PULSAR_BACKLOG,
|
|
||||||
Lists.immutable.of(
|
|
||||||
Tag.of(Constants.METRICS_LABEL_FLINK_JOB_ID, meta.getJob().getId().toString()),
|
|
||||||
Tag.of(Constants.METRICS_LABEL_ALIAS, meta.getAlias()),
|
|
||||||
Tag.of(Constants.METRICS_LABEL_SCHEMA, meta.getSchema()),
|
|
||||||
Tag.of(Constants.METRICS_LABEL_TABLE, meta.getTable())
|
|
||||||
),
|
|
||||||
new AtomicLong(0)
|
|
||||||
)
|
|
||||||
);
|
|
||||||
String name = pulsarService.name(meta.getPulsarAddress());
|
|
||||||
if (StrUtil.isNotBlank(name)) {
|
|
||||||
Long backlog = pulsarService.backlog(name, meta.getTopic(), NameHelper.pulsarSubscriptionName(meta.getJob().getId(), meta.getAlias(), hudiServiceProperties.getSignature()));
|
|
||||||
backlogCache.set(backlog);
|
|
||||||
infoService.savePulsarBacklog(meta.getJob().getId(), meta.getAlias(), backlog);
|
|
||||||
}
|
|
||||||
} catch (Exception exception) {
|
|
||||||
logger.warn("Update pulsar backlog fail for {}", meta.getAlias(), exception);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -6,10 +6,16 @@ import com.lanyuanxiaoyao.service.common.entity.FlinkJob;
|
|||||||
import com.lanyuanxiaoyao.service.common.utils.NameHelper;
|
import com.lanyuanxiaoyao.service.common.utils.NameHelper;
|
||||||
import com.lanyuanxiaoyao.service.configuration.ExecutorProvider;
|
import com.lanyuanxiaoyao.service.configuration.ExecutorProvider;
|
||||||
import com.lanyuanxiaoyao.service.configuration.entity.info.JobIdAndAlias;
|
import com.lanyuanxiaoyao.service.configuration.entity.info.JobIdAndAlias;
|
||||||
|
import com.lanyuanxiaoyao.service.configuration.entity.monitor.MetricsProgress;
|
||||||
import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnApplication;
|
import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnApplication;
|
||||||
import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnRootQueue;
|
import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnRootQueue;
|
||||||
import com.lanyuanxiaoyao.service.configuration.entity.zookeeper.ZookeeperNode;
|
import com.lanyuanxiaoyao.service.configuration.entity.zookeeper.ZookeeperNode;
|
||||||
import com.lanyuanxiaoyao.service.forest.service.*;
|
import com.lanyuanxiaoyao.service.forest.service.InfoService;
|
||||||
|
import com.lanyuanxiaoyao.service.forest.service.MonitorService;
|
||||||
|
import com.lanyuanxiaoyao.service.forest.service.QueueService;
|
||||||
|
import com.lanyuanxiaoyao.service.forest.service.ScheduleService;
|
||||||
|
import com.lanyuanxiaoyao.service.forest.service.YarnService;
|
||||||
|
import com.lanyuanxiaoyao.service.forest.service.ZookeeperService;
|
||||||
import com.lanyuanxiaoyao.service.web.controller.base.AmisCrudResponse;
|
import com.lanyuanxiaoyao.service.web.controller.base.AmisCrudResponse;
|
||||||
import com.lanyuanxiaoyao.service.web.controller.base.AmisMapResponse;
|
import com.lanyuanxiaoyao.service.web.controller.base.AmisMapResponse;
|
||||||
import com.lanyuanxiaoyao.service.web.controller.base.AmisResponse;
|
import com.lanyuanxiaoyao.service.web.controller.base.AmisResponse;
|
||||||
@@ -48,14 +54,16 @@ public class OverviewController extends BaseController {
|
|||||||
private final QueueService queueService;
|
private final QueueService queueService;
|
||||||
private final ScheduleService scheduleService;
|
private final ScheduleService scheduleService;
|
||||||
private final ZookeeperService zookeeperService;
|
private final ZookeeperService zookeeperService;
|
||||||
|
private final MonitorService monitorService;
|
||||||
|
|
||||||
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
|
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
|
||||||
public OverviewController(InfoService infoService, YarnService yarnService, QueueService queueService, ScheduleService scheduleService, ZookeeperService zookeeperService) {
|
public OverviewController(InfoService infoService, YarnService yarnService, QueueService queueService, ScheduleService scheduleService, ZookeeperService zookeeperService, MonitorService monitorService) {
|
||||||
this.infoService = infoService;
|
this.infoService = infoService;
|
||||||
this.yarnService = yarnService;
|
this.yarnService = yarnService;
|
||||||
this.queueService = queueService;
|
this.queueService = queueService;
|
||||||
this.scheduleService = scheduleService;
|
this.scheduleService = scheduleService;
|
||||||
this.zookeeperService = zookeeperService;
|
this.zookeeperService = zookeeperService;
|
||||||
|
this.monitorService = monitorService;
|
||||||
}
|
}
|
||||||
|
|
||||||
@GetMapping("")
|
@GetMapping("")
|
||||||
@@ -207,4 +215,12 @@ public class OverviewController extends BaseController {
|
|||||||
.setData("unRunningTable", unRunningTable.size())
|
.setData("unRunningTable", unRunningTable.size())
|
||||||
.setData("unRunningTableList", unRunningTable);
|
.setData("unRunningTableList", unRunningTable);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@GetMapping("monitor_progress")
|
||||||
|
public AmisMapResponse monitorProgress() {
|
||||||
|
return AmisCrudResponse.responseCrudData(
|
||||||
|
monitorService.progress()
|
||||||
|
.collect(p -> new MetricsProgress(p.getName(), p.getRunning(), p.getProgress()))
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
const commonInfo = {
|
const commonInfo = {
|
||||||
// baseUrl: 'http://132.126.207.130:35690/hudi_services/service_web',
|
baseUrl: 'http://132.126.207.130:35690/hudi_services/service_web',
|
||||||
baseUrl: '/hudi_services/service_web',
|
// baseUrl: '/hudi_services/service_web',
|
||||||
clusters: {
|
clusters: {
|
||||||
// hudi同步运行集群和yarn队列名称
|
// hudi同步运行集群和yarn队列名称
|
||||||
sync: {
|
sync: {
|
||||||
|
|||||||
@@ -408,26 +408,38 @@ function overviewTab() {
|
|||||||
}
|
}
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
/*{type: 'divider'},
|
{type: 'divider'},
|
||||||
{
|
{
|
||||||
type: 'service',
|
type: 'crud',
|
||||||
api: '${base}/overview/schedule_times',
|
title: '监控指标运行进度',
|
||||||
interval: 60000,
|
api: `\${base}/overview/monitor_progress`,
|
||||||
silentPolling: true,
|
...crudCommonOptions(),
|
||||||
body: [
|
interval: 2000,
|
||||||
'调度时间点',
|
loadDataOnce: true,
|
||||||
|
columns: [
|
||||||
{
|
{
|
||||||
type: 'each',
|
name: 'name',
|
||||||
name: 'items',
|
label: '名称',
|
||||||
className: 'grid',
|
width: 120,
|
||||||
items: {
|
},
|
||||||
type: 'tag',
|
{
|
||||||
color: '${color}',
|
name: 'running',
|
||||||
label: '${DATETOSTR(TIMESTAMP(time, \'x\'), \'HH:mm:ss\')}'
|
label: '状态',
|
||||||
|
type: 'mapping',
|
||||||
|
width: 50,
|
||||||
|
map: {
|
||||||
|
'true': '运行中',
|
||||||
|
'false': '未运行',
|
||||||
}
|
}
|
||||||
}
|
},
|
||||||
|
{
|
||||||
|
label: '进度',
|
||||||
|
type: 'progress',
|
||||||
|
value: '${ROUND(progress * 100)}',
|
||||||
|
map: 'bg-primary',
|
||||||
|
},
|
||||||
]
|
]
|
||||||
}*/
|
}
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Reference in New Issue
Block a user