feat(launcher): add detailed compaction job submission cost metrics
@@ -89,7 +89,10 @@ public interface Constants {
    String METRICS_PREFIX = "service";

    String METRICS_YARN_PREFIX = METRICS_PREFIX + "_yarn";

    String METRICS_YARN_JOB = METRICS_YARN_PREFIX + "_job";
    String METRICS_YARN_JOB_SUBMIT_TIME = METRICS_YARN_JOB + "_submit_time_ms";

    String METRICS_YARN_TABLE = METRICS_YARN_PREFIX + "_table";

    String METRICS_SYNC_PREFIX = METRICS_PREFIX + "_sync";

@@ -104,6 +107,14 @@ public interface Constants {
    String METRICS_SYNC_SOURCE_CHANGE_PARTITION = METRICS_SYNC_PREFIX + "_source_change_partition";
    String METRICS_SYNC_SOURCE_BACK_LOGS = METRICS_SYNC_PREFIX + "_source_back_logs";

    String METRICS_COMPACTION_PREFIX = METRICS_PREFIX + "_compaction";
    String METRICS_COMPACTION_SUBMIT = METRICS_COMPACTION_PREFIX + "_submit";
    String METRICS_COMPACTION_SUBMIT_COST_MS = METRICS_COMPACTION_SUBMIT + "_cost_ms";
    String METRICS_COMPACTION_SUBMIT_GET_TABLE_INFO_COST_MS = METRICS_COMPACTION_SUBMIT + "_get_table_info_cost_ms";
    String METRICS_COMPACTION_SUBMIT_GET_HUDI_TABLE_EXISTS_COST_MS = METRICS_COMPACTION_SUBMIT + "_get_hudi_table_exists_cost_ms";
    String METRICS_COMPACTION_SUBMIT_GET_COMPACT_INSTANTS_COST_MS = METRICS_COMPACTION_SUBMIT + "_get_compact_instants_cost_ms";
    String METRICS_COMPACTION_SUBMIT_YARN_JOB_SUBMIT_COST_MS = METRICS_COMPACTION_SUBMIT + "_yarn_job_submit_cost_ms";

    String METRICS_QUEUE_PREFIX = METRICS_PREFIX + "_queue";
    String METRICS_QUEUE_SIZE = METRICS_QUEUE_PREFIX + "_size";
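
For reference, with the shared "service" prefix the new constants resolve to metric names such as service_compaction_submit_cost_ms, service_compaction_submit_get_table_info_cost_ms, service_compaction_submit_get_hudi_table_exists_cost_ms, service_compaction_submit_get_compact_instants_cost_ms and service_compaction_submit_yarn_job_submit_cost_ms, plus service_yarn_job_submit_time_ms in the yarn group.
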
@@ -23,9 +23,13 @@ import com.lanyuanxiaoyao.service.launcher.configuration.HadoopConfiguration;
import com.lanyuanxiaoyao.service.launcher.configuration.ZookeeperConfiguration;
import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.PreDestroy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -67,9 +71,15 @@ public class CompactionService {
    private final HudiService hudiService;
    private final ExecutorService executorService;
    private final ObjectMapper mapper;
    // Key metrics
    private final AtomicLong compactionJobSubmitCost;
    private final AtomicLong compactionJobGetTableInfoCost;
    private final AtomicLong compactionJobGetHudiTableExistsCost;
    private final AtomicLong compactionJobGetCompactInstantsCost;
    private final AtomicLong compactionJobYarnJobSubmitCost;

    @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
    public CompactionService(HadoopConfiguration hadoopConfiguration, ClusterConfiguration clusterConfiguration, ZookeeperConfiguration zookeeperConfiguration, DiscoveryClient discoveryClient, InfoService infoService, HudiService hudiService, ExecutorService executorService, Jackson2ObjectMapperBuilder builder) {
    public CompactionService(HadoopConfiguration hadoopConfiguration, ClusterConfiguration clusterConfiguration, ZookeeperConfiguration zookeeperConfiguration, DiscoveryClient discoveryClient, InfoService infoService, HudiService hudiService, ExecutorService executorService, Jackson2ObjectMapperBuilder builder, MeterRegistry registry) {
        this.hadoopConfiguration = hadoopConfiguration;
        this.clusterConfiguration = clusterConfiguration;
        this.discoveryClient = discoveryClient;
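
The only signature change is the added MeterRegistry parameter; assuming the launcher already runs with Spring Boot's Micrometer auto-configuration, the registry bean is injected automatically and no extra wiring is needed beyond the new constructor argument.
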
@@ -84,6 +94,42 @@ public class CompactionService {
                .connectionTimeoutMs((int) (30 * Constants.SECOND))
                .build();
        this.zookeeperClient.start();

        this.compactionJobSubmitCost = registry.gauge(
                Constants.METRICS_COMPACTION_SUBMIT_COST_MS,
                Lists.immutable.of(
                        Tag.of(Constants.METRICS_LABEL_CLUSTER, clusterConfiguration.getCluster())
                ),
                new AtomicLong(0)
        );
        this.compactionJobGetTableInfoCost = registry.gauge(
                Constants.METRICS_COMPACTION_SUBMIT_GET_TABLE_INFO_COST_MS,
                Lists.immutable.of(
                        Tag.of(Constants.METRICS_LABEL_CLUSTER, clusterConfiguration.getCluster())
                ),
                new AtomicLong(0)
        );
        this.compactionJobGetHudiTableExistsCost = registry.gauge(
                Constants.METRICS_COMPACTION_SUBMIT_GET_HUDI_TABLE_EXISTS_COST_MS,
                Lists.immutable.of(
                        Tag.of(Constants.METRICS_LABEL_CLUSTER, clusterConfiguration.getCluster())
                ),
                new AtomicLong(0)
        );
        this.compactionJobGetCompactInstantsCost = registry.gauge(
                Constants.METRICS_COMPACTION_SUBMIT_GET_COMPACT_INSTANTS_COST_MS,
                Lists.immutable.of(
                        Tag.of(Constants.METRICS_LABEL_CLUSTER, clusterConfiguration.getCluster())
                ),
                new AtomicLong(0)
        );
        this.compactionJobYarnJobSubmitCost = registry.gauge(
                Constants.METRICS_COMPACTION_SUBMIT_YARN_JOB_SUBMIT_COST_MS,
                Lists.immutable.of(
                        Tag.of(Constants.METRICS_LABEL_CLUSTER, clusterConfiguration.getCluster())
                ),
                new AtomicLong(0)
        );
    }

    @PreDestroy
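
For context, Micrometer's MeterRegistry.gauge(name, tags, stateObject) registers a gauge that samples the given object and returns that same object, which is why each AtomicLong field above can later be updated with a plain set(...). A minimal standalone sketch of the pattern (SimpleMeterRegistry, java.util.List and the literal tag values are stand-ins chosen for this example, not part of the service):

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class GaugeSketch {
    public static void main(String[] args) {
        MeterRegistry registry = new SimpleMeterRegistry();
        // gauge(...) returns the AtomicLong it was handed; the registry only keeps a weak
        // reference to it, so holding it in a field (as CompactionService does) is what
        // keeps the gauge alive.
        AtomicLong submitCost = registry.gauge(
                "service_compaction_submit_cost_ms",
                List.of(Tag.of("cluster", "demo")),
                new AtomicLong(0));

        // After timing a phase, a single set(...) updates what the gauge reports.
        submitCost.set(1234L);
        System.out.println(registry.get("service_compaction_submit_cost_ms").gauge().value()); // 1234.0
    }
}
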
@@ -159,6 +205,7 @@ public class CompactionService {
    }

    public void compact(String batch, Long flinkJobId, String alias) throws Exception {
        long submitStartTime = Instant.now().toEpochMilli();
        // Build the lock associated with this job
        String lockPath = NameHelper.compactionLauncherLockPath(flinkJobId, alias);
        InterProcessLock lock = new InterProcessMutex(zookeeperClient, lockPath);
@@ -169,20 +216,33 @@ public class CompactionService {
                logger.info("Job {} {} is running", flinkJobId, alias);
                throw new JobCannotRunningException();
            }

            long getTableInfoStartTime = Instant.now().toEpochMilli();
            FlinkJob flinkJob = infoService.flinkJobDetail(flinkJobId);
            TableMeta meta = infoService.tableMetaDetail(flinkJobId, alias);
            compactionJobGetTableInfoCost.set(Instant.now().toEpochMilli() - getTableInfoStartTime);

            if (TableMetaHelper.existsTag(meta, Constants.TAGS_NO_COMPACT)) {
                logger.warn("[{}] [{}] Table tags no compact", flinkJob.getId(), meta.getAlias());
                return;
            }
            logger.info("[{}] [{}] Execute job", flinkJob.getId(), meta.getAlias());

            // Check whether the Hudi table exists and bail out early if it does not
            if (!hudiService.existsHudiTable(flinkJob.getId(), meta.getAlias())) {
            long getHudiTableExistsStartTime = Instant.now().toEpochMilli();
            boolean existsHudiTable = hudiService.existsHudiTable(flinkJob.getId(), meta.getAlias());
            compactionJobGetHudiTableExistsCost.set(Instant.now().toEpochMilli() - getHudiTableExistsStartTime);

            if (!existsHudiTable) {
                logger.info("[{}] [{}] Hudi table not found", flinkJob.getId(), meta.getAlias());
                return;
            }

            // Fetch the instants pending compaction
            long getCompactInstantsStartTime = Instant.now().toEpochMilli();
            ImmutableList<HudiInstant> selectedInstants = hudiService.timelinePendingCompactionList(flinkJob.getId(), meta.getAlias());
            compactionJobGetCompactInstantsCost.set(Instant.now().toEpochMilli() - getCompactInstantsStartTime);

            if (ObjectUtil.isEmpty(selectedInstants)) {
                logger.info("[{}] [{}] Table not need to compact", flinkJob.getId(), meta.getAlias());
                return;
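
The start/measure/set sequence above repeats for every phase, so a small helper could factor it out. The following is a hypothetical refactor sketch only, not part of the commit; Timed and record are invented names:

import java.time.Instant;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;

final class Timed {
    // Run one phase, then store the elapsed milliseconds into the gauge-backed AtomicLong,
    // mirroring the inline pattern used in compact().
    static <T> T record(AtomicLong gauge, Callable<T> phase) throws Exception {
        long start = Instant.now().toEpochMilli();
        try {
            return phase.call();
        } finally {
            gauge.set(Instant.now().toEpochMilli() - start);
        }
    }
}

// Usage sketch against the fields above:
//   TableMeta meta = Timed.record(compactionJobGetTableInfoCost,
//           () -> infoService.tableMetaDetail(flinkJobId, alias));
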
@@ -198,15 +258,24 @@ public class CompactionService {
            }
            logger.info("[{}] [{}] Execution", flinkJob.getId(), meta.getAlias());
            String applicationId = Failsafe.with(RETRY_POLICY)
                    .get(() -> executorService.runCompaction(
                            batch,
                            flinkJob,
                            meta,
                            hadoopConfiguration.getKerberosKeytabPath(),
                            hadoopConfiguration.getKerberosPrincipal(),
                            selectedInstants.collect(HudiInstant::getTimestamp).makeString(","),
                            clusterConfiguration.getCluster()
                    ).toString());
                    .get(() -> {
                        long yarnJobSubmitStartTime = Instant.now().toEpochMilli();
                        String id = executorService.runCompaction(
                                batch,
                                flinkJob,
                                meta,
                                hadoopConfiguration.getKerberosKeytabPath(),
                                hadoopConfiguration.getKerberosPrincipal(),
                                selectedInstants.collect(HudiInstant::getTimestamp).makeString(","),
                                clusterConfiguration.getCluster()
                        ).toString();
                        compactionJobYarnJobSubmitCost.set(Instant.now().toEpochMilli() - yarnJobSubmitStartTime);
                        return id;
                    });

            // Record how long the whole job submission took
            compactionJobSubmitCost.set(Instant.now().toEpochMilli() - submitStartTime);

            Failsafe.with(RETRY_POLICY)
                    .run(() -> infoService.saveCompactionId(flinkJob.getId(), meta.getAlias(), applicationId));
        } else {
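
Worth noting: the yarn-job-submit timing is set inside the Failsafe lambda, so each retry overwrites the gauge and the reported value reflects only the last attempt, while the overall submit cost is measured from the top of compact() and therefore also covers lock acquisition, the metadata lookups and any retries.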