feat(api): 增加原始压缩报告的保存
This commit is contained in:
@@ -14,6 +14,7 @@ CREATE TABLE `tb_app_hudi_compaction_metrics`
|
|||||||
`total_records_deleted` bigint(20) DEFAULT NULL COMMENT '删除记录数',
|
`total_records_deleted` bigint(20) DEFAULT NULL COMMENT '删除记录数',
|
||||||
`total_records_updated` bigint(20) DEFAULT NULL COMMENT '更新记录数',
|
`total_records_updated` bigint(20) DEFAULT NULL COMMENT '更新记录数',
|
||||||
`total_records_compacted` bigint(20) DEFAULT NULL COMMENT '压缩记录数',
|
`total_records_compacted` bigint(20) DEFAULT NULL COMMENT '压缩记录数',
|
||||||
|
`metadata` mediumtext DEFAULT NULL COMMENT '压缩报告',
|
||||||
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '记录更新时间',
|
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '记录更新时间',
|
||||||
KEY `tb_app_hudi_compaction_metrics_join_index` (`flink_job_id`, `alias`, `application_id`, `compaction_plan_instant`)
|
KEY `tb_app_hudi_compaction_metrics_join_index` (`flink_job_id`, `alias`, `application_id`, `compaction_plan_instant`)
|
||||||
) DEFAULT CHARSET = utf8mb4 COMMENT ='压缩计划执行具体指标';
|
) DEFAULT CHARSET = utf8mb4 COMMENT ='压缩计划执行具体指标';
|
||||||
|
|||||||
@@ -1,16 +1,18 @@
|
|||||||
package com.lanyuanxiaoyao.service.api.controller;
|
package com.lanyuanxiaoyao.service.api.controller;
|
||||||
|
|
||||||
import cn.hutool.core.util.StrUtil;
|
import cn.hutool.core.util.StrUtil;
|
||||||
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.github.loki4j.slf4j.marker.LabelMarker;
|
import com.github.loki4j.slf4j.marker.LabelMarker;
|
||||||
import com.lanyuanxiaoyao.service.api.entity.HoodieCommitMetadata;
|
import com.lanyuanxiaoyao.service.api.entity.HoodieCommitMetadata;
|
||||||
import com.lanyuanxiaoyao.service.api.service.SyncStateService;
|
import com.lanyuanxiaoyao.service.api.service.SyncStateService;
|
||||||
import com.lanyuanxiaoyao.service.api.service.VersionUpdateService;
|
import com.lanyuanxiaoyao.service.api.service.VersionUpdateService;
|
||||||
import com.lanyuanxiaoyao.service.common.Constants;
|
import com.lanyuanxiaoyao.service.common.Constants;
|
||||||
import javax.annotation.Resource;
|
|
||||||
import org.eclipse.collections.api.factory.Maps;
|
import org.eclipse.collections.api.factory.Maps;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.slf4j.Marker;
|
import org.slf4j.Marker;
|
||||||
|
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
|
||||||
import org.springframework.web.bind.annotation.GetMapping;
|
import org.springframework.web.bind.annotation.GetMapping;
|
||||||
import org.springframework.web.bind.annotation.PostMapping;
|
import org.springframework.web.bind.annotation.PostMapping;
|
||||||
import org.springframework.web.bind.annotation.RequestBody;
|
import org.springframework.web.bind.annotation.RequestBody;
|
||||||
@@ -30,10 +32,15 @@ import org.springframework.web.bind.annotation.RestController;
|
|||||||
public class ApiController {
|
public class ApiController {
|
||||||
private static final Logger logger = LoggerFactory.getLogger(ApiController.class);
|
private static final Logger logger = LoggerFactory.getLogger(ApiController.class);
|
||||||
|
|
||||||
@Resource
|
private final SyncStateService syncStateService;
|
||||||
private SyncStateService syncStateService;
|
private final VersionUpdateService versionUpdateService;
|
||||||
@Resource
|
private final ObjectMapper mapper;
|
||||||
private VersionUpdateService versionUpdateService;
|
|
||||||
|
public ApiController(SyncStateService syncStateService, VersionUpdateService versionUpdateService, Jackson2ObjectMapperBuilder builder) {
|
||||||
|
this.syncStateService = syncStateService;
|
||||||
|
this.versionUpdateService = versionUpdateService;
|
||||||
|
this.mapper = builder.build();
|
||||||
|
}
|
||||||
|
|
||||||
private Marker makeMarker(Long flinkJobId, String alias) {
|
private Marker makeMarker(Long flinkJobId, String alias) {
|
||||||
return LabelMarker.of(() -> Maps.mutable.of(Constants.LOG_FLINK_JOB_ID, flinkJobId.toString(), Constants.LOG_ALIAS, alias));
|
return LabelMarker.of(() -> Maps.mutable.of(Constants.LOG_FLINK_JOB_ID, flinkJobId.toString(), Constants.LOG_ALIAS, alias));
|
||||||
@@ -121,9 +128,10 @@ public class ApiController {
|
|||||||
@RequestParam("instant") String instant,
|
@RequestParam("instant") String instant,
|
||||||
@RequestParam("cluster") String cluster,
|
@RequestParam("cluster") String cluster,
|
||||||
@RequestParam("application_id") String applicationId,
|
@RequestParam("application_id") String applicationId,
|
||||||
@RequestBody HoodieCommitMetadata metadata
|
@RequestBody String metadataText
|
||||||
) {
|
) throws JsonProcessingException {
|
||||||
logger.info(makeMarker(flinkJobId, alias), "Compaction pre commit: {} {} {} {} {}", flinkJobId, alias, instant, cluster, applicationId);
|
logger.info(makeMarker(flinkJobId, alias), "Compaction pre commit: {} {} {} {} {}", flinkJobId, alias, instant, cluster, applicationId);
|
||||||
|
HoodieCommitMetadata metadata = mapper.readValue(metadataText, HoodieCommitMetadata.class);
|
||||||
syncStateService.compactionCommit(
|
syncStateService.compactionCommit(
|
||||||
flinkJobId,
|
flinkJobId,
|
||||||
alias,
|
alias,
|
||||||
@@ -131,7 +139,8 @@ public class ApiController {
|
|||||||
instant,
|
instant,
|
||||||
cluster,
|
cluster,
|
||||||
applicationId,
|
applicationId,
|
||||||
metadata
|
metadata,
|
||||||
|
metadataText
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -142,9 +151,10 @@ public class ApiController {
|
|||||||
@RequestParam("instant") String instant,
|
@RequestParam("instant") String instant,
|
||||||
@RequestParam("cluster") String cluster,
|
@RequestParam("cluster") String cluster,
|
||||||
@RequestParam("application_id") String applicationId,
|
@RequestParam("application_id") String applicationId,
|
||||||
@RequestBody HoodieCommitMetadata metadata
|
@RequestBody String metadataText
|
||||||
) {
|
) throws JsonProcessingException {
|
||||||
logger.info(makeMarker(flinkJobId, alias), "Compaction commit: {} {} {} {} {}", flinkJobId, alias, instant, cluster, applicationId);
|
logger.info(makeMarker(flinkJobId, alias), "Compaction commit: {} {} {} {} {}", flinkJobId, alias, instant, cluster, applicationId);
|
||||||
|
HoodieCommitMetadata metadata = mapper.readValue(metadataText, HoodieCommitMetadata.class);
|
||||||
syncStateService.compactionFinish(flinkJobId, alias);
|
syncStateService.compactionFinish(flinkJobId, alias);
|
||||||
syncStateService.compactionCommit(
|
syncStateService.compactionCommit(
|
||||||
flinkJobId,
|
flinkJobId,
|
||||||
@@ -153,7 +163,8 @@ public class ApiController {
|
|||||||
instant,
|
instant,
|
||||||
cluster,
|
cluster,
|
||||||
applicationId,
|
applicationId,
|
||||||
metadata
|
metadata,
|
||||||
|
metadataText
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -252,7 +252,7 @@ public class SyncStateService {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void compactionCommit(Long flinkJobId, String alias, String type, String instant, String cluster, String applicationId, HoodieCommitMetadata metadata) {
|
public void compactionCommit(Long flinkJobId, String alias, String type, String instant, String cluster, String applicationId, HoodieCommitMetadata metadata, String metadataText) {
|
||||||
if (ObjectUtil.isNull(metadata)) {
|
if (ObjectUtil.isNull(metadata)) {
|
||||||
jdbcTemplate.update(
|
jdbcTemplate.update(
|
||||||
SqlBuilder
|
SqlBuilder
|
||||||
@@ -264,10 +264,11 @@ public class SyncStateService {
|
|||||||
TbAppHudiCompactionMetrics.COMPACTION_PLAN_INSTANT_O,
|
TbAppHudiCompactionMetrics.COMPACTION_PLAN_INSTANT_O,
|
||||||
TbAppHudiCompactionMetrics.CLUSTER_O,
|
TbAppHudiCompactionMetrics.CLUSTER_O,
|
||||||
TbAppHudiCompactionMetrics.APPLICATION_ID_O,
|
TbAppHudiCompactionMetrics.APPLICATION_ID_O,
|
||||||
TbAppHudiCompactionMetrics.UPDATE_TIME_O
|
TbAppHudiCompactionMetrics.UPDATE_TIME_O,
|
||||||
|
TbAppHudiCompactionMetrics.METADATA_O
|
||||||
)
|
)
|
||||||
.values()
|
.values()
|
||||||
.addValue(null, null, null, null, null, null, null)
|
.addValue(null, null, null, null, null, null, null, null)
|
||||||
.precompileSql(),
|
.precompileSql(),
|
||||||
flinkJobId,
|
flinkJobId,
|
||||||
alias,
|
alias,
|
||||||
@@ -275,7 +276,8 @@ public class SyncStateService {
|
|||||||
Date.from(Instant.ofEpochMilli(DateUtil.parse(instant).getTime())),
|
Date.from(Instant.ofEpochMilli(DateUtil.parse(instant).getTime())),
|
||||||
cluster,
|
cluster,
|
||||||
applicationId,
|
applicationId,
|
||||||
now()
|
now(),
|
||||||
|
metadataText
|
||||||
);
|
);
|
||||||
} else {
|
} else {
|
||||||
jdbcTemplate.update(
|
jdbcTemplate.update(
|
||||||
@@ -294,10 +296,11 @@ public class SyncStateService {
|
|||||||
TbAppHudiCompactionMetrics.TOTAL_RECORDS_DELETED_O,
|
TbAppHudiCompactionMetrics.TOTAL_RECORDS_DELETED_O,
|
||||||
TbAppHudiCompactionMetrics.TOTAL_RECORDS_UPDATED_O,
|
TbAppHudiCompactionMetrics.TOTAL_RECORDS_UPDATED_O,
|
||||||
TbAppHudiCompactionMetrics.TOTAL_RECORDS_COMPACTED_O,
|
TbAppHudiCompactionMetrics.TOTAL_RECORDS_COMPACTED_O,
|
||||||
TbAppHudiCompactionMetrics.UPDATE_TIME_O
|
TbAppHudiCompactionMetrics.UPDATE_TIME_O,
|
||||||
|
TbAppHudiCompactionMetrics.METADATA_O
|
||||||
)
|
)
|
||||||
.values()
|
.values()
|
||||||
.addValue(null, null, null, null, null, null, null, null, null, null, null, null, null)
|
.addValue(null, null, null, null, null, null, null, null, null, null, null, null, null, null)
|
||||||
.precompileSql(),
|
.precompileSql(),
|
||||||
flinkJobId,
|
flinkJobId,
|
||||||
alias,
|
alias,
|
||||||
@@ -311,7 +314,8 @@ public class SyncStateService {
|
|||||||
metadata.getTotalRecordsDeleted(),
|
metadata.getTotalRecordsDeleted(),
|
||||||
metadata.getTotalCompactedRecordsUpdated(),
|
metadata.getTotalCompactedRecordsUpdated(),
|
||||||
metadata.getTotalLogRecordsCompacted(),
|
metadata.getTotalLogRecordsCompacted(),
|
||||||
now()
|
now(),
|
||||||
|
metadataText
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -690,6 +690,14 @@ public interface SQLConstants {
|
|||||||
* 字段 total_records_compacted 别名值 tahcm.total_records_compacted 压缩记录数
|
* 字段 total_records_compacted 别名值 tahcm.total_records_compacted 压缩记录数
|
||||||
*/
|
*/
|
||||||
String TOTAL_RECORDS_COMPACTED_A = _alias_.getAlias() + "." + TOTAL_RECORDS_COMPACTED_O;
|
String TOTAL_RECORDS_COMPACTED_A = _alias_.getAlias() + "." + TOTAL_RECORDS_COMPACTED_O;
|
||||||
|
/**
|
||||||
|
* 字段 metadata 原始值 metadata 压缩报告元数据
|
||||||
|
*/
|
||||||
|
String METADATA_O = "metadata";
|
||||||
|
/**
|
||||||
|
* 字段 metadata 别名值 tahcm.metadata 压缩报告元数据
|
||||||
|
*/
|
||||||
|
String METADATA_A = _alias_.getAlias() + "." + METADATA_O;
|
||||||
/**
|
/**
|
||||||
* 字段 update_time 原始值 update_time 记录更新时间
|
* 字段 update_time 原始值 update_time 记录更新时间
|
||||||
*/
|
*/
|
||||||
|
|||||||
Reference in New Issue
Block a user