fix(launcher): 规范指标名称 使用timer规范时间指标的输出

This commit is contained in:
v-zhangjc9
2024-04-30 10:15:43 +08:00
parent b36311863c
commit 97e1f86465
2 changed files with 38 additions and 42 deletions

View File

@@ -107,13 +107,13 @@ public interface Constants {
String METRICS_SYNC_SOURCE_CHANGE_PARTITION = METRICS_SYNC_PREFIX + "_source_change_partition"; String METRICS_SYNC_SOURCE_CHANGE_PARTITION = METRICS_SYNC_PREFIX + "_source_change_partition";
String METRICS_SYNC_SOURCE_BACK_LOGS = METRICS_SYNC_PREFIX + "_source_back_logs"; String METRICS_SYNC_SOURCE_BACK_LOGS = METRICS_SYNC_PREFIX + "_source_back_logs";
String METRICS_COMPACTION_PREFIX = METRICS_PREFIX + "_compaction"; String METRICS_LAUNCHER_PREFIX = METRICS_PREFIX + "_launcher";
String METRICS_COMPACTION_SUBMIT = METRICS_COMPACTION_PREFIX + "_submit"; String METRICS_LAUNCHER_COMPACTION_SUBMIT = METRICS_LAUNCHER_PREFIX + "_compaction_submit";
String METRICS_COMPACTION_SUBMIT_COST_MS = METRICS_COMPACTION_SUBMIT + "_cost_ms"; String METRICS_LAUNCHER_COMPACTION_SUBMIT_COST_MS = METRICS_LAUNCHER_COMPACTION_SUBMIT + "_time";
String METRICS_COMPACTION_SUBMIT_GET_TABLE_INFO_COST_MS = METRICS_COMPACTION_SUBMIT + "_get_table_info_cost_ms"; String METRICS_LAUNCHER_COMPACTION_SUBMIT_GET_TABLE_INFO_COST_MS = METRICS_LAUNCHER_COMPACTION_SUBMIT + "_get_table_info_time";
String METRICS_COMPACTION_SUBMIT_GET_HUDI_TABLE_EXISTS_COST_MS = METRICS_COMPACTION_SUBMIT + "_get_hudi_table_exists_cost_ms"; String METRICS_LAUNCHER_COMPACTION_SUBMIT_GET_HUDI_TABLE_EXISTS_COST_MS = METRICS_LAUNCHER_COMPACTION_SUBMIT + "_get_hudi_table_exists_time";
String METRICS_COMPACTION_SUBMIT_GET_COMPACT_INSTANTS_COST_MS = METRICS_COMPACTION_SUBMIT + "_get_compact_instants_cost_ms"; String METRICS_LAUNCHER_COMPACTION_SUBMIT_GET_COMPACT_INSTANTS_COST_MS = METRICS_LAUNCHER_COMPACTION_SUBMIT + "_get_compact_instants_time";
String METRICS_COMPACTION_SUBMIT_YARN_JOB_SUBMIT_COST_MS = METRICS_COMPACTION_SUBMIT + "_yarn_job_submit_cost_ms"; String METRICS_LAUNCHER_COMPACTION_SUBMIT_YARN_JOB_SUBMIT_COST_MS = METRICS_LAUNCHER_COMPACTION_SUBMIT + "_yarn_job_submit_time";
String METRICS_QUEUE_PREFIX = METRICS_PREFIX + "_queue"; String METRICS_QUEUE_PREFIX = METRICS_PREFIX + "_queue";
String METRICS_QUEUE_SIZE = METRICS_QUEUE_PREFIX + "_size"; String METRICS_QUEUE_SIZE = METRICS_QUEUE_PREFIX + "_size";

View File

@@ -25,6 +25,7 @@ import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy; import dev.failsafe.RetryPolicy;
import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag; import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer;
import java.time.Duration; import java.time.Duration;
import java.time.Instant; import java.time.Instant;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@@ -72,11 +73,11 @@ public class CompactionService {
private final ExecutorService executorService; private final ExecutorService executorService;
private final ObjectMapper mapper; private final ObjectMapper mapper;
// 关键指标 // 关键指标
private final AtomicLong compactionJobSubmitCost; private final Timer compactionJobSubmitCost;
private final AtomicLong compactionJobGetTableInfoCost; private final Timer compactionJobGetTableInfoCost;
private final AtomicLong compactionJobGetHudiTableExistsCost; private final Timer compactionJobGetHudiTableExistsCost;
private final AtomicLong compactionJobGetCompactInstantsCost; private final Timer compactionJobGetCompactInstantsCost;
private final AtomicLong compactionJobYarnJobSubmitCost; private final Timer compactionJobYarnJobSubmitCost;
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
public CompactionService(HadoopConfiguration hadoopConfiguration, ClusterConfiguration clusterConfiguration, ZookeeperConfiguration zookeeperConfiguration, DiscoveryClient discoveryClient, InfoService infoService, HudiService hudiService, ExecutorService executorService, Jackson2ObjectMapperBuilder builder, MeterRegistry registry) { public CompactionService(HadoopConfiguration hadoopConfiguration, ClusterConfiguration clusterConfiguration, ZookeeperConfiguration zookeeperConfiguration, DiscoveryClient discoveryClient, InfoService infoService, HudiService hudiService, ExecutorService executorService, Jackson2ObjectMapperBuilder builder, MeterRegistry registry) {
@@ -95,40 +96,35 @@ public class CompactionService {
.build(); .build();
this.zookeeperClient.start(); this.zookeeperClient.start();
this.compactionJobSubmitCost = registry.gauge( this.compactionJobSubmitCost = registry.timer(
Constants.METRICS_COMPACTION_SUBMIT_COST_MS, Constants.METRICS_LAUNCHER_COMPACTION_SUBMIT_COST_MS,
Lists.immutable.of( Lists.immutable.of(
Tag.of(Constants.METRICS_LABEL_CLUSTER, clusterConfiguration.getCluster()) Tag.of(Constants.METRICS_LABEL_CLUSTER, clusterConfiguration.getCluster())
), )
new AtomicLong(0)
); );
this.compactionJobGetTableInfoCost = registry.gauge( this.compactionJobGetTableInfoCost = registry.timer(
Constants.METRICS_COMPACTION_SUBMIT_GET_TABLE_INFO_COST_MS, Constants.METRICS_LAUNCHER_COMPACTION_SUBMIT_GET_TABLE_INFO_COST_MS,
Lists.immutable.of( Lists.immutable.of(
Tag.of(Constants.METRICS_LABEL_CLUSTER, clusterConfiguration.getCluster()) Tag.of(Constants.METRICS_LABEL_CLUSTER, clusterConfiguration.getCluster())
), )
new AtomicLong(0)
); );
this.compactionJobGetHudiTableExistsCost = registry.gauge( this.compactionJobGetHudiTableExistsCost = registry.timer(
Constants.METRICS_COMPACTION_SUBMIT_GET_HUDI_TABLE_EXISTS_COST_MS, Constants.METRICS_LAUNCHER_COMPACTION_SUBMIT_GET_HUDI_TABLE_EXISTS_COST_MS,
Lists.immutable.of( Lists.immutable.of(
Tag.of(Constants.METRICS_LABEL_CLUSTER, clusterConfiguration.getCluster()) Tag.of(Constants.METRICS_LABEL_CLUSTER, clusterConfiguration.getCluster())
), )
new AtomicLong(0)
); );
this.compactionJobGetCompactInstantsCost = registry.gauge( this.compactionJobGetCompactInstantsCost = registry.timer(
Constants.METRICS_COMPACTION_SUBMIT_GET_COMPACT_INSTANTS_COST_MS, Constants.METRICS_LAUNCHER_COMPACTION_SUBMIT_GET_COMPACT_INSTANTS_COST_MS,
Lists.immutable.of( Lists.immutable.of(
Tag.of(Constants.METRICS_LABEL_CLUSTER, clusterConfiguration.getCluster()) Tag.of(Constants.METRICS_LABEL_CLUSTER, clusterConfiguration.getCluster())
), )
new AtomicLong(0)
); );
this.compactionJobYarnJobSubmitCost = registry.gauge( this.compactionJobYarnJobSubmitCost = registry.timer(
Constants.METRICS_COMPACTION_SUBMIT_YARN_JOB_SUBMIT_COST_MS, Constants.METRICS_LAUNCHER_COMPACTION_SUBMIT_YARN_JOB_SUBMIT_COST_MS,
Lists.immutable.of( Lists.immutable.of(
Tag.of(Constants.METRICS_LABEL_CLUSTER, clusterConfiguration.getCluster()) Tag.of(Constants.METRICS_LABEL_CLUSTER, clusterConfiguration.getCluster())
), )
new AtomicLong(0)
); );
} }
@@ -205,7 +201,7 @@ public class CompactionService {
} }
public void compact(String batch, Long flinkJobId, String alias) throws Exception { public void compact(String batch, Long flinkJobId, String alias) throws Exception {
long submitStartTime = Instant.now().toEpochMilli(); Instant submitStartTime = Instant.now();
// 构造任务相关的锁 // 构造任务相关的锁
String lockPath = NameHelper.compactionLauncherLockPath(flinkJobId, alias); String lockPath = NameHelper.compactionLauncherLockPath(flinkJobId, alias);
InterProcessLock lock = new InterProcessMutex(zookeeperClient, lockPath); InterProcessLock lock = new InterProcessMutex(zookeeperClient, lockPath);
@@ -217,10 +213,10 @@ public class CompactionService {
throw new JobCannotRunningException(); throw new JobCannotRunningException();
} }
long getTableInfoStartTime = Instant.now().toEpochMilli(); Instant getTableInfoStartTime = Instant.now();
FlinkJob flinkJob = infoService.flinkJobDetail(flinkJobId); FlinkJob flinkJob = infoService.flinkJobDetail(flinkJobId);
TableMeta meta = infoService.tableMetaDetail(flinkJobId, alias); TableMeta meta = infoService.tableMetaDetail(flinkJobId, alias);
compactionJobGetTableInfoCost.set(Instant.now().toEpochMilli() - getTableInfoStartTime); compactionJobGetTableInfoCost.record(Duration.between(getTableInfoStartTime, Instant.now()));
if (TableMetaHelper.existsTag(meta, Constants.TAGS_NO_COMPACT)) { if (TableMetaHelper.existsTag(meta, Constants.TAGS_NO_COMPACT)) {
logger.warn("[{}] [{}] Table tags no compact", flinkJob.getId(), meta.getAlias()); logger.warn("[{}] [{}] Table tags no compact", flinkJob.getId(), meta.getAlias());
@@ -229,9 +225,9 @@ public class CompactionService {
logger.info("[{}] [{}] Execute job", flinkJob.getId(), meta.getAlias()); logger.info("[{}] [{}] Execute job", flinkJob.getId(), meta.getAlias());
// 判断是否存在 Hudi 表,提前结束掉 // 判断是否存在 Hudi 表,提前结束掉
long getHudiTableExistsStartTime = Instant.now().toEpochMilli(); Instant getHudiTableExistsStartTime = Instant.now();
boolean existsHudiTable = hudiService.existsHudiTable(flinkJob.getId(), meta.getAlias()); boolean existsHudiTable = hudiService.existsHudiTable(flinkJob.getId(), meta.getAlias());
compactionJobGetHudiTableExistsCost.set(Instant.now().toEpochMilli() - getHudiTableExistsStartTime); compactionJobGetHudiTableExistsCost.record(Duration.between(getHudiTableExistsStartTime, Instant.now()));
if (!existsHudiTable) { if (!existsHudiTable) {
logger.info("[{}] [{}] Hudi table not found", flinkJob.getId(), meta.getAlias()); logger.info("[{}] [{}] Hudi table not found", flinkJob.getId(), meta.getAlias());
@@ -239,9 +235,9 @@ public class CompactionService {
} }
// 获取待压缩的时间点 // 获取待压缩的时间点
long getCompactInstantsStartTime = Instant.now().toEpochMilli(); Instant getCompactInstantsStartTime = Instant.now();
ImmutableList<HudiInstant> selectedInstants = hudiService.timelinePendingCompactionList(flinkJob.getId(), meta.getAlias()); ImmutableList<HudiInstant> selectedInstants = hudiService.timelinePendingCompactionList(flinkJob.getId(), meta.getAlias());
compactionJobGetCompactInstantsCost.set(Instant.now().toEpochMilli() - getCompactInstantsStartTime); compactionJobGetCompactInstantsCost.record(Duration.between(getCompactInstantsStartTime, Instant.now()));
if (ObjectUtil.isEmpty(selectedInstants)) { if (ObjectUtil.isEmpty(selectedInstants)) {
logger.info("[{}] [{}] Table not need to compact", flinkJob.getId(), meta.getAlias()); logger.info("[{}] [{}] Table not need to compact", flinkJob.getId(), meta.getAlias());
@@ -259,7 +255,7 @@ public class CompactionService {
logger.info("[{}] [{}] Execution", flinkJob.getId(), meta.getAlias()); logger.info("[{}] [{}] Execution", flinkJob.getId(), meta.getAlias());
String applicationId = Failsafe.with(RETRY_POLICY) String applicationId = Failsafe.with(RETRY_POLICY)
.get(() -> { .get(() -> {
long yarnJobSubmitStartTime = Instant.now().toEpochMilli(); Instant yarnJobSubmitStartTime = Instant.now();
String id = executorService.runCompaction( String id = executorService.runCompaction(
batch, batch,
flinkJob, flinkJob,
@@ -269,12 +265,12 @@ public class CompactionService {
selectedInstants.collect(HudiInstant::getTimestamp).makeString(","), selectedInstants.collect(HudiInstant::getTimestamp).makeString(","),
clusterConfiguration.getCluster() clusterConfiguration.getCluster()
).toString(); ).toString();
compactionJobYarnJobSubmitCost.set(Instant.now().toEpochMilli() - yarnJobSubmitStartTime); compactionJobYarnJobSubmitCost.record(Duration.between(yarnJobSubmitStartTime, Instant.now()));
return id; return id;
}); });
// 记录任务提交耗时 // 记录任务提交耗时
compactionJobSubmitCost.set(Instant.now().toEpochMilli() - submitStartTime); compactionJobSubmitCost.record(Duration.between(submitStartTime, Instant.now()));
Failsafe.with(RETRY_POLICY) Failsafe.with(RETRY_POLICY)
.run(() -> infoService.saveCompactionId(flinkJob.getId(), meta.getAlias(), applicationId)); .run(() -> infoService.saveCompactionId(flinkJob.getId(), meta.getAlias(), applicationId));