diff --git a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java index 4d86b34..8c4daac 100644 --- a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java +++ b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java @@ -107,13 +107,13 @@ public interface Constants { String METRICS_SYNC_SOURCE_CHANGE_PARTITION = METRICS_SYNC_PREFIX + "_source_change_partition"; String METRICS_SYNC_SOURCE_BACK_LOGS = METRICS_SYNC_PREFIX + "_source_back_logs"; - String METRICS_COMPACTION_PREFIX = METRICS_PREFIX + "_compaction"; - String METRICS_COMPACTION_SUBMIT = METRICS_COMPACTION_PREFIX + "_submit"; - String METRICS_COMPACTION_SUBMIT_COST_MS = METRICS_COMPACTION_SUBMIT + "_cost_ms"; - String METRICS_COMPACTION_SUBMIT_GET_TABLE_INFO_COST_MS = METRICS_COMPACTION_SUBMIT + "_get_table_info_cost_ms"; - String METRICS_COMPACTION_SUBMIT_GET_HUDI_TABLE_EXISTS_COST_MS = METRICS_COMPACTION_SUBMIT + "_get_hudi_table_exists_cost_ms"; - String METRICS_COMPACTION_SUBMIT_GET_COMPACT_INSTANTS_COST_MS = METRICS_COMPACTION_SUBMIT + "_get_compact_instants_cost_ms"; - String METRICS_COMPACTION_SUBMIT_YARN_JOB_SUBMIT_COST_MS = METRICS_COMPACTION_SUBMIT + "_yarn_job_submit_cost_ms"; + String METRICS_LAUNCHER_PREFIX = METRICS_PREFIX + "_launcher"; + String METRICS_LAUNCHER_COMPACTION_SUBMIT = METRICS_LAUNCHER_PREFIX + "_compaction_submit"; + String METRICS_LAUNCHER_COMPACTION_SUBMIT_COST_MS = METRICS_LAUNCHER_COMPACTION_SUBMIT + "_time"; + String METRICS_LAUNCHER_COMPACTION_SUBMIT_GET_TABLE_INFO_COST_MS = METRICS_LAUNCHER_COMPACTION_SUBMIT + "_get_table_info_time"; + String METRICS_LAUNCHER_COMPACTION_SUBMIT_GET_HUDI_TABLE_EXISTS_COST_MS = METRICS_LAUNCHER_COMPACTION_SUBMIT + "_get_hudi_table_exists_time"; + String METRICS_LAUNCHER_COMPACTION_SUBMIT_GET_COMPACT_INSTANTS_COST_MS = METRICS_LAUNCHER_COMPACTION_SUBMIT + "_get_compact_instants_time"; + String METRICS_LAUNCHER_COMPACTION_SUBMIT_YARN_JOB_SUBMIT_COST_MS = METRICS_LAUNCHER_COMPACTION_SUBMIT + "_yarn_job_submit_time"; String METRICS_QUEUE_PREFIX = METRICS_PREFIX + "_queue"; String METRICS_QUEUE_SIZE = METRICS_QUEUE_PREFIX + "_size"; diff --git a/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/compaction/service/CompactionService.java b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/compaction/service/CompactionService.java index d0dc519..a990090 100644 --- a/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/compaction/service/CompactionService.java +++ b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/compaction/service/CompactionService.java @@ -25,6 +25,7 @@ import dev.failsafe.Failsafe; import dev.failsafe.RetryPolicy; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Tag; +import io.micrometer.core.instrument.Timer; import java.time.Duration; import java.time.Instant; import java.util.concurrent.Executors; @@ -72,11 +73,11 @@ public class CompactionService { private final ExecutorService executorService; private final ObjectMapper mapper; // 关键指标 - private final AtomicLong compactionJobSubmitCost; - private final AtomicLong compactionJobGetTableInfoCost; - private final AtomicLong compactionJobGetHudiTableExistsCost; - private final AtomicLong compactionJobGetCompactInstantsCost; - private final AtomicLong compactionJobYarnJobSubmitCost; + private final Timer compactionJobSubmitCost; + private final Timer compactionJobGetTableInfoCost; + private final Timer compactionJobGetHudiTableExistsCost; + private final Timer compactionJobGetCompactInstantsCost; + private final Timer compactionJobYarnJobSubmitCost; @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") public CompactionService(HadoopConfiguration hadoopConfiguration, ClusterConfiguration clusterConfiguration, ZookeeperConfiguration zookeeperConfiguration, DiscoveryClient discoveryClient, InfoService infoService, HudiService hudiService, ExecutorService executorService, Jackson2ObjectMapperBuilder builder, MeterRegistry registry) { @@ -95,40 +96,35 @@ public class CompactionService { .build(); this.zookeeperClient.start(); - this.compactionJobSubmitCost = registry.gauge( - Constants.METRICS_COMPACTION_SUBMIT_COST_MS, + this.compactionJobSubmitCost = registry.timer( + Constants.METRICS_LAUNCHER_COMPACTION_SUBMIT_COST_MS, Lists.immutable.of( Tag.of(Constants.METRICS_LABEL_CLUSTER, clusterConfiguration.getCluster()) - ), - new AtomicLong(0) + ) ); - this.compactionJobGetTableInfoCost = registry.gauge( - Constants.METRICS_COMPACTION_SUBMIT_GET_TABLE_INFO_COST_MS, + this.compactionJobGetTableInfoCost = registry.timer( + Constants.METRICS_LAUNCHER_COMPACTION_SUBMIT_GET_TABLE_INFO_COST_MS, Lists.immutable.of( Tag.of(Constants.METRICS_LABEL_CLUSTER, clusterConfiguration.getCluster()) - ), - new AtomicLong(0) + ) ); - this.compactionJobGetHudiTableExistsCost = registry.gauge( - Constants.METRICS_COMPACTION_SUBMIT_GET_HUDI_TABLE_EXISTS_COST_MS, + this.compactionJobGetHudiTableExistsCost = registry.timer( + Constants.METRICS_LAUNCHER_COMPACTION_SUBMIT_GET_HUDI_TABLE_EXISTS_COST_MS, Lists.immutable.of( Tag.of(Constants.METRICS_LABEL_CLUSTER, clusterConfiguration.getCluster()) - ), - new AtomicLong(0) + ) ); - this.compactionJobGetCompactInstantsCost = registry.gauge( - Constants.METRICS_COMPACTION_SUBMIT_GET_COMPACT_INSTANTS_COST_MS, + this.compactionJobGetCompactInstantsCost = registry.timer( + Constants.METRICS_LAUNCHER_COMPACTION_SUBMIT_GET_COMPACT_INSTANTS_COST_MS, Lists.immutable.of( Tag.of(Constants.METRICS_LABEL_CLUSTER, clusterConfiguration.getCluster()) - ), - new AtomicLong(0) + ) ); - this.compactionJobYarnJobSubmitCost = registry.gauge( - Constants.METRICS_COMPACTION_SUBMIT_YARN_JOB_SUBMIT_COST_MS, + this.compactionJobYarnJobSubmitCost = registry.timer( + Constants.METRICS_LAUNCHER_COMPACTION_SUBMIT_YARN_JOB_SUBMIT_COST_MS, Lists.immutable.of( Tag.of(Constants.METRICS_LABEL_CLUSTER, clusterConfiguration.getCluster()) - ), - new AtomicLong(0) + ) ); } @@ -205,7 +201,7 @@ public class CompactionService { } public void compact(String batch, Long flinkJobId, String alias) throws Exception { - long submitStartTime = Instant.now().toEpochMilli(); + Instant submitStartTime = Instant.now(); // 构造任务相关的锁 String lockPath = NameHelper.compactionLauncherLockPath(flinkJobId, alias); InterProcessLock lock = new InterProcessMutex(zookeeperClient, lockPath); @@ -217,10 +213,10 @@ public class CompactionService { throw new JobCannotRunningException(); } - long getTableInfoStartTime = Instant.now().toEpochMilli(); + Instant getTableInfoStartTime = Instant.now(); FlinkJob flinkJob = infoService.flinkJobDetail(flinkJobId); TableMeta meta = infoService.tableMetaDetail(flinkJobId, alias); - compactionJobGetTableInfoCost.set(Instant.now().toEpochMilli() - getTableInfoStartTime); + compactionJobGetTableInfoCost.record(Duration.between(getTableInfoStartTime, Instant.now())); if (TableMetaHelper.existsTag(meta, Constants.TAGS_NO_COMPACT)) { logger.warn("[{}] [{}] Table tags no compact", flinkJob.getId(), meta.getAlias()); @@ -229,9 +225,9 @@ public class CompactionService { logger.info("[{}] [{}] Execute job", flinkJob.getId(), meta.getAlias()); // 判断是否存在 Hudi 表,提前结束掉 - long getHudiTableExistsStartTime = Instant.now().toEpochMilli(); + Instant getHudiTableExistsStartTime = Instant.now(); boolean existsHudiTable = hudiService.existsHudiTable(flinkJob.getId(), meta.getAlias()); - compactionJobGetHudiTableExistsCost.set(Instant.now().toEpochMilli() - getHudiTableExistsStartTime); + compactionJobGetHudiTableExistsCost.record(Duration.between(getHudiTableExistsStartTime, Instant.now())); if (!existsHudiTable) { logger.info("[{}] [{}] Hudi table not found", flinkJob.getId(), meta.getAlias()); @@ -239,9 +235,9 @@ public class CompactionService { } // 获取待压缩的时间点 - long getCompactInstantsStartTime = Instant.now().toEpochMilli(); + Instant getCompactInstantsStartTime = Instant.now(); ImmutableList selectedInstants = hudiService.timelinePendingCompactionList(flinkJob.getId(), meta.getAlias()); - compactionJobGetCompactInstantsCost.set(Instant.now().toEpochMilli() - getCompactInstantsStartTime); + compactionJobGetCompactInstantsCost.record(Duration.between(getCompactInstantsStartTime, Instant.now())); if (ObjectUtil.isEmpty(selectedInstants)) { logger.info("[{}] [{}] Table not need to compact", flinkJob.getId(), meta.getAlias()); @@ -259,7 +255,7 @@ public class CompactionService { logger.info("[{}] [{}] Execution", flinkJob.getId(), meta.getAlias()); String applicationId = Failsafe.with(RETRY_POLICY) .get(() -> { - long yarnJobSubmitStartTime = Instant.now().toEpochMilli(); + Instant yarnJobSubmitStartTime = Instant.now(); String id = executorService.runCompaction( batch, flinkJob, @@ -269,12 +265,12 @@ public class CompactionService { selectedInstants.collect(HudiInstant::getTimestamp).makeString(","), clusterConfiguration.getCluster() ).toString(); - compactionJobYarnJobSubmitCost.set(Instant.now().toEpochMilli() - yarnJobSubmitStartTime); + compactionJobYarnJobSubmitCost.record(Duration.between(yarnJobSubmitStartTime, Instant.now())); return id; }); // 记录任务提交耗时 - compactionJobSubmitCost.set(Instant.now().toEpochMilli() - submitStartTime); + compactionJobSubmitCost.record(Duration.between(submitStartTime, Instant.now())); Failsafe.with(RETRY_POLICY) .run(() -> infoService.saveCompactionId(flinkJob.getId(), meta.getAlias(), applicationId));