diff --git a/service-monitor/src/main/java/com/lanyuanxiaoyao/service/monitor/metric/PulsarMetrics.java b/service-monitor/src/main/java/com/lanyuanxiaoyao/service/monitor/metric/PulsarMetrics.java index f0d1458..fa71a0c 100644 --- a/service-monitor/src/main/java/com/lanyuanxiaoyao/service/monitor/metric/PulsarMetrics.java +++ b/service-monitor/src/main/java/com/lanyuanxiaoyao/service/monitor/metric/PulsarMetrics.java @@ -3,7 +3,6 @@ package com.lanyuanxiaoyao.service.monitor.metric; import cn.hutool.core.util.StrUtil; import com.lanyuanxiaoyao.service.common.Constants; import com.lanyuanxiaoyao.service.common.utils.NameHelper; -import com.lanyuanxiaoyao.service.configuration.ExecutorProvider; import com.lanyuanxiaoyao.service.forest.service.InfoService; import com.lanyuanxiaoyao.service.forest.service.PulsarService; import io.micrometer.core.instrument.MeterRegistry; @@ -12,6 +11,7 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicLong; import org.eclipse.collections.api.factory.Lists; import org.eclipse.collections.api.factory.Maps; +import org.eclipse.collections.api.map.MutableMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.scheduling.annotation.Scheduled; @@ -34,7 +34,7 @@ public class PulsarMetrics extends Metrics { private final InfoService infoService; private final PulsarService pulsarService; - private final Map backlogMap; + private final MutableMap backlogMap; public PulsarMetrics(MeterRegistry registry, InfoService infoService, PulsarService pulsarService) { this.registry = registry; @@ -52,9 +52,7 @@ public class PulsarMetrics extends Metrics { .reject(meta -> StrUtil.isBlank(meta.getPulsarAddress())) .forEach(meta -> { try { - String name = pulsarService.name(meta.getPulsarAddress()); - Long backlog = pulsarService.backlog(name, meta.getTopic(), NameHelper.pulsarSubscriptionName(meta.getJob().getId(), meta.getAlias())); - AtomicLong backlogCache = backlogMap.getOrDefault( + AtomicLong backlogCache = backlogMap.getIfAbsentPut( meta.getAlias(), registry.gauge( Constants.METRICS_PULSAR_BACKLOG, @@ -64,12 +62,15 @@ public class PulsarMetrics extends Metrics { Tag.of(Constants.METRICS_LABEL_SCHEMA, meta.getSchema()), Tag.of(Constants.METRICS_LABEL_TABLE, meta.getTable()) ), - new AtomicLong() + new AtomicLong(0) ) ); - backlogCache.set(backlog); - } catch (Exception e) { - logger.warn("Something bad for " + meta.getAlias(), e); + String name = pulsarService.name(meta.getPulsarAddress()); + if (StrUtil.isNotBlank(name)) { + Long backlog = pulsarService.backlog(name, meta.getTopic(), NameHelper.pulsarSubscriptionName(meta.getJob().getId(), meta.getAlias())); + backlogCache.set(backlog); + } + } catch (Exception ignored) { } }); }