feat(monitor): 增加关于hudi表文件数的监控指标
This commit is contained in:
@@ -0,0 +1,94 @@
|
||||
package com.lanyuanxiaoyao.service.monitor.metric;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.lanyuanxiaoyao.service.common.Constants;
|
||||
import com.lanyuanxiaoyao.service.configuration.ExecutorProvider;
|
||||
import com.lanyuanxiaoyao.service.forest.service.HudiService;
|
||||
import com.lanyuanxiaoyao.service.forest.service.InfoService;
|
||||
import io.micrometer.core.instrument.MeterRegistry;
|
||||
import io.micrometer.core.instrument.Tag;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import org.eclipse.collections.api.factory.Lists;
|
||||
import org.eclipse.collections.api.factory.Maps;
|
||||
import org.eclipse.collections.api.map.MutableMap;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
/**
|
||||
* Hudi表相关指标
|
||||
*
|
||||
* @author lanyuanxiaoyao
|
||||
* @date 2024-03-05
|
||||
*/
|
||||
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
|
||||
@Service
|
||||
public class HudiMetrics extends Metrics {
|
||||
private static final Logger logger = LoggerFactory.getLogger(HudiMetrics.class);
|
||||
|
||||
private final MeterRegistry registry;
|
||||
private final InfoService infoService;
|
||||
private final HudiService hudiService;
|
||||
|
||||
private final MutableMap<String, AtomicLong> fileCountCacheMap;
|
||||
private final MutableMap<String, AtomicLong> timelineFileCountCacheMap;
|
||||
|
||||
public HudiMetrics(MeterRegistry registry, InfoService infoService, HudiService hudiService) {
|
||||
this.registry = registry;
|
||||
this.infoService = infoService;
|
||||
this.hudiService = hudiService;
|
||||
|
||||
fileCountCacheMap = Maps.mutable.empty();
|
||||
timelineFileCountCacheMap = Maps.mutable.empty();
|
||||
}
|
||||
|
||||
@Scheduled(cron = "0 30 * * * ?")
|
||||
@Override
|
||||
public void update() {
|
||||
infoService.tableMetaList()
|
||||
.asParallel(ExecutorProvider.EXECUTORS_10, 1)
|
||||
.reject(meta -> StrUtil.isBlank(meta.getPulsarAddress()))
|
||||
.forEach(meta -> {
|
||||
try {
|
||||
AtomicLong filecountCache = fileCountCacheMap.getIfAbsentPut(
|
||||
meta.getAlias(),
|
||||
registry.gauge(
|
||||
Constants.METRICS_HUDI_TABLE_FILE_COUNT,
|
||||
Lists.immutable.of(
|
||||
Tag.of(Constants.METRICS_LABEL_FLINK_JOB_ID, meta.getJob().getId().toString()),
|
||||
Tag.of(Constants.METRICS_LABEL_ALIAS, meta.getAlias()),
|
||||
Tag.of(Constants.METRICS_LABEL_SCHEMA, meta.getSchema()),
|
||||
Tag.of(Constants.METRICS_LABEL_TABLE, meta.getTable())
|
||||
),
|
||||
new AtomicLong(0)
|
||||
)
|
||||
);
|
||||
AtomicLong timelineFileCountCache = timelineFileCountCacheMap.getIfAbsentPut(
|
||||
meta.getAlias(),
|
||||
registry.gauge(
|
||||
Constants.METRICS_HUDI_TABLE_TIMELINE_FILE_COUNT,
|
||||
Lists.immutable.of(
|
||||
Tag.of(Constants.METRICS_LABEL_FLINK_JOB_ID, meta.getJob().getId().toString()),
|
||||
Tag.of(Constants.METRICS_LABEL_ALIAS, meta.getAlias()),
|
||||
Tag.of(Constants.METRICS_LABEL_SCHEMA, meta.getSchema()),
|
||||
Tag.of(Constants.METRICS_LABEL_TABLE, meta.getTable())
|
||||
),
|
||||
new AtomicLong(0)
|
||||
)
|
||||
);
|
||||
|
||||
String hdfs = meta.getHudi().getTargetHdfsPath();
|
||||
if (hudiService.existsHudiTable(hdfs)) {
|
||||
Long count = hudiService.fileCount(hdfs);
|
||||
filecountCache.set(count);
|
||||
|
||||
String timelineHdfs = hdfs + "/.hoodie";
|
||||
timelineFileCountCache.set(hudiService.fileCount(timelineHdfs));
|
||||
}
|
||||
} catch (Exception exception) {
|
||||
logger.warn("Get file count fail for {}", meta.getAlias(), exception);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -75,7 +75,7 @@ public class PulsarMetrics extends Metrics {
|
||||
infoService.savePulsarBacklog(meta.getJob().getId(), meta.getAlias(), backlog);
|
||||
}
|
||||
} catch (Exception exception) {
|
||||
logger.warn("Update pulsar backlog fail for " + meta.getAlias(), exception);
|
||||
logger.warn("Update pulsar backlog fail for {}", meta.getAlias(), exception);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user