From b30ce6d67558ea98e03d6e36dd9918d127197f09 Mon Sep 17 00:00:00 2001 From: v-zhangjc9 Date: Mon, 29 Apr 2024 15:07:53 +0800 Subject: [PATCH] =?UTF-8?q?feat(launcher):=20=E5=A2=9E=E5=8A=A0=E8=AF=A6?= =?UTF-8?q?=E7=BB=86=E7=9A=84=E5=8E=8B=E7=BC=A9=E4=BB=BB=E5=8A=A1=E6=8F=90?= =?UTF-8?q?=E4=BA=A4=E8=80=97=E6=97=B6=E6=8C=87=E6=A0=87?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/common/Constants.java | 11 +++ .../compaction/service/CompactionService.java | 91 ++++++++++++++++--- 2 files changed, 91 insertions(+), 11 deletions(-) diff --git a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java index 4849988..4d86b34 100644 --- a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java +++ b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java @@ -89,7 +89,10 @@ public interface Constants { String METRICS_PREFIX = "service"; String METRICS_YARN_PREFIX = METRICS_PREFIX + "_yarn"; + String METRICS_YARN_JOB = METRICS_YARN_PREFIX + "_job"; + String METRICS_YARN_JOB_SUBMIT_TIME = METRICS_YARN_JOB + "_submit_time_ms"; + String METRICS_YARN_TABLE = METRICS_YARN_PREFIX + "_table"; String METRICS_SYNC_PREFIX = METRICS_PREFIX + "_sync"; @@ -104,6 +107,14 @@ public interface Constants { String METRICS_SYNC_SOURCE_CHANGE_PARTITION = METRICS_SYNC_PREFIX + "_source_change_partition"; String METRICS_SYNC_SOURCE_BACK_LOGS = METRICS_SYNC_PREFIX + "_source_back_logs"; + String METRICS_COMPACTION_PREFIX = METRICS_PREFIX + "_compaction"; + String METRICS_COMPACTION_SUBMIT = METRICS_COMPACTION_PREFIX + "_submit"; + String METRICS_COMPACTION_SUBMIT_COST_MS = METRICS_COMPACTION_SUBMIT + "_cost_ms"; + String METRICS_COMPACTION_SUBMIT_GET_TABLE_INFO_COST_MS = METRICS_COMPACTION_SUBMIT + "_get_table_info_cost_ms"; + String METRICS_COMPACTION_SUBMIT_GET_HUDI_TABLE_EXISTS_COST_MS = METRICS_COMPACTION_SUBMIT + "_get_hudi_table_exists_cost_ms"; + String METRICS_COMPACTION_SUBMIT_GET_COMPACT_INSTANTS_COST_MS = METRICS_COMPACTION_SUBMIT + "_get_compact_instants_cost_ms"; + String METRICS_COMPACTION_SUBMIT_YARN_JOB_SUBMIT_COST_MS = METRICS_COMPACTION_SUBMIT + "_yarn_job_submit_cost_ms"; + String METRICS_QUEUE_PREFIX = METRICS_PREFIX + "_queue"; String METRICS_QUEUE_SIZE = METRICS_QUEUE_PREFIX + "_size"; diff --git a/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/compaction/service/CompactionService.java b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/compaction/service/CompactionService.java index ced75ea..d0dc519 100644 --- a/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/compaction/service/CompactionService.java +++ b/service-launcher/src/main/java/com/lanyuanxiaoyao/service/launcher/compaction/service/CompactionService.java @@ -23,9 +23,13 @@ import com.lanyuanxiaoyao.service.launcher.configuration.HadoopConfiguration; import com.lanyuanxiaoyao.service.launcher.configuration.ZookeeperConfiguration; import dev.failsafe.Failsafe; import dev.failsafe.RetryPolicy; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tag; import java.time.Duration; +import java.time.Instant; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import javax.annotation.PreDestroy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; @@ -67,9 +71,15 @@ public class CompactionService { private final HudiService hudiService; private final ExecutorService executorService; private final ObjectMapper mapper; + // 关键指标 + private final AtomicLong compactionJobSubmitCost; + private final AtomicLong compactionJobGetTableInfoCost; + private final AtomicLong compactionJobGetHudiTableExistsCost; + private final AtomicLong compactionJobGetCompactInstantsCost; + private final AtomicLong compactionJobYarnJobSubmitCost; @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") - public CompactionService(HadoopConfiguration hadoopConfiguration, ClusterConfiguration clusterConfiguration, ZookeeperConfiguration zookeeperConfiguration, DiscoveryClient discoveryClient, InfoService infoService, HudiService hudiService, ExecutorService executorService, Jackson2ObjectMapperBuilder builder) { + public CompactionService(HadoopConfiguration hadoopConfiguration, ClusterConfiguration clusterConfiguration, ZookeeperConfiguration zookeeperConfiguration, DiscoveryClient discoveryClient, InfoService infoService, HudiService hudiService, ExecutorService executorService, Jackson2ObjectMapperBuilder builder, MeterRegistry registry) { this.hadoopConfiguration = hadoopConfiguration; this.clusterConfiguration = clusterConfiguration; this.discoveryClient = discoveryClient; @@ -84,6 +94,42 @@ public class CompactionService { .connectionTimeoutMs((int) (30 * Constants.SECOND)) .build(); this.zookeeperClient.start(); + + this.compactionJobSubmitCost = registry.gauge( + Constants.METRICS_COMPACTION_SUBMIT_COST_MS, + Lists.immutable.of( + Tag.of(Constants.METRICS_LABEL_CLUSTER, clusterConfiguration.getCluster()) + ), + new AtomicLong(0) + ); + this.compactionJobGetTableInfoCost = registry.gauge( + Constants.METRICS_COMPACTION_SUBMIT_GET_TABLE_INFO_COST_MS, + Lists.immutable.of( + Tag.of(Constants.METRICS_LABEL_CLUSTER, clusterConfiguration.getCluster()) + ), + new AtomicLong(0) + ); + this.compactionJobGetHudiTableExistsCost = registry.gauge( + Constants.METRICS_COMPACTION_SUBMIT_GET_HUDI_TABLE_EXISTS_COST_MS, + Lists.immutable.of( + Tag.of(Constants.METRICS_LABEL_CLUSTER, clusterConfiguration.getCluster()) + ), + new AtomicLong(0) + ); + this.compactionJobGetCompactInstantsCost = registry.gauge( + Constants.METRICS_COMPACTION_SUBMIT_GET_COMPACT_INSTANTS_COST_MS, + Lists.immutable.of( + Tag.of(Constants.METRICS_LABEL_CLUSTER, clusterConfiguration.getCluster()) + ), + new AtomicLong(0) + ); + this.compactionJobYarnJobSubmitCost = registry.gauge( + Constants.METRICS_COMPACTION_SUBMIT_YARN_JOB_SUBMIT_COST_MS, + Lists.immutable.of( + Tag.of(Constants.METRICS_LABEL_CLUSTER, clusterConfiguration.getCluster()) + ), + new AtomicLong(0) + ); } @PreDestroy @@ -159,6 +205,7 @@ public class CompactionService { } public void compact(String batch, Long flinkJobId, String alias) throws Exception { + long submitStartTime = Instant.now().toEpochMilli(); // 构造任务相关的锁 String lockPath = NameHelper.compactionLauncherLockPath(flinkJobId, alias); InterProcessLock lock = new InterProcessMutex(zookeeperClient, lockPath); @@ -169,20 +216,33 @@ public class CompactionService { logger.info("Job {} {} is running", flinkJobId, alias); throw new JobCannotRunningException(); } + + long getTableInfoStartTime = Instant.now().toEpochMilli(); FlinkJob flinkJob = infoService.flinkJobDetail(flinkJobId); TableMeta meta = infoService.tableMetaDetail(flinkJobId, alias); + compactionJobGetTableInfoCost.set(Instant.now().toEpochMilli() - getTableInfoStartTime); + if (TableMetaHelper.existsTag(meta, Constants.TAGS_NO_COMPACT)) { logger.warn("[{}] [{}] Table tags no compact", flinkJob.getId(), meta.getAlias()); return; } logger.info("[{}] [{}] Execute job", flinkJob.getId(), meta.getAlias()); + // 判断是否存在 Hudi 表,提前结束掉 - if (!hudiService.existsHudiTable(flinkJob.getId(), meta.getAlias())) { + long getHudiTableExistsStartTime = Instant.now().toEpochMilli(); + boolean existsHudiTable = hudiService.existsHudiTable(flinkJob.getId(), meta.getAlias()); + compactionJobGetHudiTableExistsCost.set(Instant.now().toEpochMilli() - getHudiTableExistsStartTime); + + if (!existsHudiTable) { logger.info("[{}] [{}] Hudi table not found", flinkJob.getId(), meta.getAlias()); return; } + // 获取待压缩的时间点 + long getCompactInstantsStartTime = Instant.now().toEpochMilli(); ImmutableList selectedInstants = hudiService.timelinePendingCompactionList(flinkJob.getId(), meta.getAlias()); + compactionJobGetCompactInstantsCost.set(Instant.now().toEpochMilli() - getCompactInstantsStartTime); + if (ObjectUtil.isEmpty(selectedInstants)) { logger.info("[{}] [{}] Table not need to compact", flinkJob.getId(), meta.getAlias()); return; @@ -198,15 +258,24 @@ public class CompactionService { } logger.info("[{}] [{}] Execution", flinkJob.getId(), meta.getAlias()); String applicationId = Failsafe.with(RETRY_POLICY) - .get(() -> executorService.runCompaction( - batch, - flinkJob, - meta, - hadoopConfiguration.getKerberosKeytabPath(), - hadoopConfiguration.getKerberosPrincipal(), - selectedInstants.collect(HudiInstant::getTimestamp).makeString(","), - clusterConfiguration.getCluster() - ).toString()); + .get(() -> { + long yarnJobSubmitStartTime = Instant.now().toEpochMilli(); + String id = executorService.runCompaction( + batch, + flinkJob, + meta, + hadoopConfiguration.getKerberosKeytabPath(), + hadoopConfiguration.getKerberosPrincipal(), + selectedInstants.collect(HudiInstant::getTimestamp).makeString(","), + clusterConfiguration.getCluster() + ).toString(); + compactionJobYarnJobSubmitCost.set(Instant.now().toEpochMilli() - yarnJobSubmitStartTime); + return id; + }); + + // 记录任务提交耗时 + compactionJobSubmitCost.set(Instant.now().toEpochMilli() - submitStartTime); + Failsafe.with(RETRY_POLICY) .run(() -> infoService.saveCompactionId(flinkJob.getId(), meta.getAlias(), applicationId)); } else {