fix(monitor): 修复pulsar backlog查询指标为NaN

Map.getOrDefault只会保证一个默认值,但不会重新放入map里,导致每次都新创建一个指标,重复的guage指标会覆盖导致指标值为NaN
This commit is contained in:
v-zhangjc9
2024-03-08 16:10:45 +08:00
parent e1bc8ac20c
commit 8cbbe97381

View File

@@ -3,7 +3,6 @@ package com.lanyuanxiaoyao.service.monitor.metric;
import cn.hutool.core.util.StrUtil;
import com.lanyuanxiaoyao.service.common.Constants;
import com.lanyuanxiaoyao.service.common.utils.NameHelper;
import com.lanyuanxiaoyao.service.configuration.ExecutorProvider;
import com.lanyuanxiaoyao.service.forest.service.InfoService;
import com.lanyuanxiaoyao.service.forest.service.PulsarService;
import io.micrometer.core.instrument.MeterRegistry;
@@ -12,6 +11,7 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.factory.Maps;
import org.eclipse.collections.api.map.MutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
@@ -34,7 +34,7 @@ public class PulsarMetrics extends Metrics {
private final InfoService infoService;
private final PulsarService pulsarService;
private final Map<String, AtomicLong> backlogMap;
private final MutableMap<String, AtomicLong> backlogMap;
public PulsarMetrics(MeterRegistry registry, InfoService infoService, PulsarService pulsarService) {
this.registry = registry;
@@ -52,9 +52,7 @@ public class PulsarMetrics extends Metrics {
.reject(meta -> StrUtil.isBlank(meta.getPulsarAddress()))
.forEach(meta -> {
try {
String name = pulsarService.name(meta.getPulsarAddress());
Long backlog = pulsarService.backlog(name, meta.getTopic(), NameHelper.pulsarSubscriptionName(meta.getJob().getId(), meta.getAlias()));
AtomicLong backlogCache = backlogMap.getOrDefault(
AtomicLong backlogCache = backlogMap.getIfAbsentPut(
meta.getAlias(),
registry.gauge(
Constants.METRICS_PULSAR_BACKLOG,
@@ -64,12 +62,15 @@ public class PulsarMetrics extends Metrics {
Tag.of(Constants.METRICS_LABEL_SCHEMA, meta.getSchema()),
Tag.of(Constants.METRICS_LABEL_TABLE, meta.getTable())
),
new AtomicLong()
new AtomicLong(0)
)
);
backlogCache.set(backlog);
} catch (Exception e) {
logger.warn("Something bad for " + meta.getAlias(), e);
String name = pulsarService.name(meta.getPulsarAddress());
if (StrUtil.isNotBlank(name)) {
Long backlog = pulsarService.backlog(name, meta.getTopic(), NameHelper.pulsarSubscriptionName(meta.getJob().getId(), meta.getAlias()));
backlogCache.set(backlog);
}
} catch (Exception ignored) {
}
});
}