diff --git a/database/table_tb_app_hudi_compaction_metrics.sql b/database/table_tb_app_hudi_compaction_metrics.sql index b2a338b..904a7ad 100644 --- a/database/table_tb_app_hudi_compaction_metrics.sql +++ b/database/table_tb_app_hudi_compaction_metrics.sql @@ -14,6 +14,7 @@ CREATE TABLE `tb_app_hudi_compaction_metrics` `total_records_deleted` bigint(20) DEFAULT NULL COMMENT '删除记录数', `total_records_updated` bigint(20) DEFAULT NULL COMMENT '更新记录数', `total_records_compacted` bigint(20) DEFAULT NULL COMMENT '压缩记录数', + `metadata` mediumtext DEFAULT NULL COMMENT '压缩报告', `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '记录更新时间', KEY `tb_app_hudi_compaction_metrics_join_index` (`flink_job_id`, `alias`, `application_id`, `compaction_plan_instant`) ) DEFAULT CHARSET = utf8mb4 COMMENT ='压缩计划执行具体指标'; diff --git a/service-api/src/main/java/com/lanyuanxiaoyao/service/api/controller/ApiController.java b/service-api/src/main/java/com/lanyuanxiaoyao/service/api/controller/ApiController.java index 96dd51c..bcf1775 100644 --- a/service-api/src/main/java/com/lanyuanxiaoyao/service/api/controller/ApiController.java +++ b/service-api/src/main/java/com/lanyuanxiaoyao/service/api/controller/ApiController.java @@ -1,16 +1,18 @@ package com.lanyuanxiaoyao.service.api.controller; import cn.hutool.core.util.StrUtil; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import com.github.loki4j.slf4j.marker.LabelMarker; import com.lanyuanxiaoyao.service.api.entity.HoodieCommitMetadata; import com.lanyuanxiaoyao.service.api.service.SyncStateService; import com.lanyuanxiaoyao.service.api.service.VersionUpdateService; import com.lanyuanxiaoyao.service.common.Constants; -import javax.annotation.Resource; import org.eclipse.collections.api.factory.Maps; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.Marker; +import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; @@ -30,10 +32,15 @@ import org.springframework.web.bind.annotation.RestController; public class ApiController { private static final Logger logger = LoggerFactory.getLogger(ApiController.class); - @Resource - private SyncStateService syncStateService; - @Resource - private VersionUpdateService versionUpdateService; + private final SyncStateService syncStateService; + private final VersionUpdateService versionUpdateService; + private final ObjectMapper mapper; + + public ApiController(SyncStateService syncStateService, VersionUpdateService versionUpdateService, Jackson2ObjectMapperBuilder builder) { + this.syncStateService = syncStateService; + this.versionUpdateService = versionUpdateService; + this.mapper = builder.build(); + } private Marker makeMarker(Long flinkJobId, String alias) { return LabelMarker.of(() -> Maps.mutable.of(Constants.LOG_FLINK_JOB_ID, flinkJobId.toString(), Constants.LOG_ALIAS, alias)); @@ -121,9 +128,10 @@ public class ApiController { @RequestParam("instant") String instant, @RequestParam("cluster") String cluster, @RequestParam("application_id") String applicationId, - @RequestBody HoodieCommitMetadata metadata - ) { + @RequestBody String metadataText + ) throws JsonProcessingException { logger.info(makeMarker(flinkJobId, alias), "Compaction pre commit: {} {} {} {} {}", flinkJobId, alias, instant, cluster, applicationId); + HoodieCommitMetadata metadata = mapper.readValue(metadataText, HoodieCommitMetadata.class); syncStateService.compactionCommit( flinkJobId, alias, @@ -131,7 +139,8 @@ public class ApiController { instant, cluster, applicationId, - metadata + metadata, + metadataText ); } @@ -142,9 +151,10 @@ public class ApiController { @RequestParam("instant") String instant, @RequestParam("cluster") String cluster, @RequestParam("application_id") String applicationId, - @RequestBody HoodieCommitMetadata metadata - ) { + @RequestBody String metadataText + ) throws JsonProcessingException { logger.info(makeMarker(flinkJobId, alias), "Compaction commit: {} {} {} {} {}", flinkJobId, alias, instant, cluster, applicationId); + HoodieCommitMetadata metadata = mapper.readValue(metadataText, HoodieCommitMetadata.class); syncStateService.compactionFinish(flinkJobId, alias); syncStateService.compactionCommit( flinkJobId, @@ -153,7 +163,8 @@ public class ApiController { instant, cluster, applicationId, - metadata + metadata, + metadataText ); } diff --git a/service-api/src/main/java/com/lanyuanxiaoyao/service/api/service/SyncStateService.java b/service-api/src/main/java/com/lanyuanxiaoyao/service/api/service/SyncStateService.java index f8f7e17..29b398d 100644 --- a/service-api/src/main/java/com/lanyuanxiaoyao/service/api/service/SyncStateService.java +++ b/service-api/src/main/java/com/lanyuanxiaoyao/service/api/service/SyncStateService.java @@ -252,7 +252,7 @@ public class SyncStateService { ); } - public void compactionCommit(Long flinkJobId, String alias, String type, String instant, String cluster, String applicationId, HoodieCommitMetadata metadata) { + public void compactionCommit(Long flinkJobId, String alias, String type, String instant, String cluster, String applicationId, HoodieCommitMetadata metadata, String metadataText) { if (ObjectUtil.isNull(metadata)) { jdbcTemplate.update( SqlBuilder @@ -264,10 +264,11 @@ public class SyncStateService { TbAppHudiCompactionMetrics.COMPACTION_PLAN_INSTANT_O, TbAppHudiCompactionMetrics.CLUSTER_O, TbAppHudiCompactionMetrics.APPLICATION_ID_O, - TbAppHudiCompactionMetrics.UPDATE_TIME_O + TbAppHudiCompactionMetrics.UPDATE_TIME_O, + TbAppHudiCompactionMetrics.METADATA_O ) .values() - .addValue(null, null, null, null, null, null, null) + .addValue(null, null, null, null, null, null, null, null) .precompileSql(), flinkJobId, alias, @@ -275,7 +276,8 @@ public class SyncStateService { Date.from(Instant.ofEpochMilli(DateUtil.parse(instant).getTime())), cluster, applicationId, - now() + now(), + metadataText ); } else { jdbcTemplate.update( @@ -294,10 +296,11 @@ public class SyncStateService { TbAppHudiCompactionMetrics.TOTAL_RECORDS_DELETED_O, TbAppHudiCompactionMetrics.TOTAL_RECORDS_UPDATED_O, TbAppHudiCompactionMetrics.TOTAL_RECORDS_COMPACTED_O, - TbAppHudiCompactionMetrics.UPDATE_TIME_O + TbAppHudiCompactionMetrics.UPDATE_TIME_O, + TbAppHudiCompactionMetrics.METADATA_O ) .values() - .addValue(null, null, null, null, null, null, null, null, null, null, null, null, null) + .addValue(null, null, null, null, null, null, null, null, null, null, null, null, null, null) .precompileSql(), flinkJobId, alias, @@ -311,7 +314,8 @@ public class SyncStateService { metadata.getTotalRecordsDeleted(), metadata.getTotalCompactedRecordsUpdated(), metadata.getTotalLogRecordsCompacted(), - now() + now(), + metadataText ); } } diff --git a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/SQLConstants.java b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/SQLConstants.java index 5519486..dc5de66 100644 --- a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/SQLConstants.java +++ b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/SQLConstants.java @@ -690,6 +690,14 @@ public interface SQLConstants { * 字段 total_records_compacted 别名值 tahcm.total_records_compacted 压缩记录数 */ String TOTAL_RECORDS_COMPACTED_A = _alias_.getAlias() + "." + TOTAL_RECORDS_COMPACTED_O; + /** + * 字段 metadata 原始值 metadata 压缩报告元数据 + */ + String METADATA_O = "metadata"; + /** + * 字段 metadata 别名值 tahcm.metadata 压缩报告元数据 + */ + String METADATA_A = _alias_.getAlias() + "." + METADATA_O; /** * 字段 update_time 原始值 update_time 记录更新时间 */