feat(monitor): 增加处理线程 输出错误日志
This commit is contained in:
@@ -3,6 +3,7 @@ package com.lanyuanxiaoyao.service.monitor.metric;
|
|||||||
import cn.hutool.core.util.StrUtil;
|
import cn.hutool.core.util.StrUtil;
|
||||||
import com.lanyuanxiaoyao.service.common.Constants;
|
import com.lanyuanxiaoyao.service.common.Constants;
|
||||||
import com.lanyuanxiaoyao.service.common.utils.NameHelper;
|
import com.lanyuanxiaoyao.service.common.utils.NameHelper;
|
||||||
|
import com.lanyuanxiaoyao.service.configuration.ExecutorProvider;
|
||||||
import com.lanyuanxiaoyao.service.configuration.HudiServiceProperties;
|
import com.lanyuanxiaoyao.service.configuration.HudiServiceProperties;
|
||||||
import com.lanyuanxiaoyao.service.forest.service.InfoService;
|
import com.lanyuanxiaoyao.service.forest.service.InfoService;
|
||||||
import com.lanyuanxiaoyao.service.forest.service.PulsarService;
|
import com.lanyuanxiaoyao.service.forest.service.PulsarService;
|
||||||
@@ -17,7 +18,6 @@ import org.slf4j.LoggerFactory;
|
|||||||
import org.springframework.scheduling.annotation.Scheduled;
|
import org.springframework.scheduling.annotation.Scheduled;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
import static com.lanyuanxiaoyao.service.common.Constants.HALF_HOUR;
|
|
||||||
import static com.lanyuanxiaoyao.service.common.Constants.MINUTE;
|
import static com.lanyuanxiaoyao.service.common.Constants.MINUTE;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -47,11 +47,11 @@ public class PulsarMetrics extends Metrics {
|
|||||||
backlogMap = Maps.mutable.empty();
|
backlogMap = Maps.mutable.empty();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Scheduled(fixedDelay = HALF_HOUR, initialDelay = MINUTE)
|
@Scheduled(fixedDelay = 10 * MINUTE, initialDelay = MINUTE)
|
||||||
@Override
|
@Override
|
||||||
void update() {
|
void update() {
|
||||||
infoService.tableMetaList()
|
infoService.tableMetaList()
|
||||||
// .asParallel(ExecutorProvider.EXECUTORS, 50)
|
.asParallel(ExecutorProvider.EXECUTORS, 2)
|
||||||
.reject(meta -> StrUtil.isBlank(meta.getPulsarAddress()))
|
.reject(meta -> StrUtil.isBlank(meta.getPulsarAddress()))
|
||||||
.forEach(meta -> {
|
.forEach(meta -> {
|
||||||
try {
|
try {
|
||||||
@@ -74,7 +74,8 @@ public class PulsarMetrics extends Metrics {
|
|||||||
backlogCache.set(backlog);
|
backlogCache.set(backlog);
|
||||||
infoService.savePulsarBacklog(meta.getJob().getId(), meta.getAlias(), backlog);
|
infoService.savePulsarBacklog(meta.getJob().getId(), meta.getAlias(), backlog);
|
||||||
}
|
}
|
||||||
} catch (Exception ignored) {
|
} catch (Exception exception) {
|
||||||
|
logger.warn("Update pulsar backlog fail for " + meta.getAlias(), exception);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user