diff --git a/service-api/src/main/java/com/lanyuanxiaoyao/service/api/controller/ApiController.java b/service-api/src/main/java/com/lanyuanxiaoyao/service/api/controller/ApiController.java index cc10ebc..ce50302 100644 --- a/service-api/src/main/java/com/lanyuanxiaoyao/service/api/controller/ApiController.java +++ b/service-api/src/main/java/com/lanyuanxiaoyao/service/api/controller/ApiController.java @@ -104,7 +104,8 @@ public class ApiController { "pre", instant, cluster, - applicationId + applicationId, + metadata ); } @@ -125,7 +126,8 @@ public class ApiController { "complete", instant, cluster, - applicationId + applicationId, + metadata ); } diff --git a/service-api/src/main/java/com/lanyuanxiaoyao/service/api/service/SyncStateService.java b/service-api/src/main/java/com/lanyuanxiaoyao/service/api/service/SyncStateService.java index 9840661..14c5cad 100644 --- a/service-api/src/main/java/com/lanyuanxiaoyao/service/api/service/SyncStateService.java +++ b/service-api/src/main/java/com/lanyuanxiaoyao/service/api/service/SyncStateService.java @@ -3,6 +3,7 @@ package com.lanyuanxiaoyao.service.api.service; import club.kingon.sql.builder.SqlBuilder; import cn.hutool.core.date.DateUtil; import cn.hutool.core.util.ObjectUtil; +import com.lanyuanxiaoyao.service.api.entity.HoodieCommitMetadata; import com.lanyuanxiaoyao.service.common.Constants; import java.time.Instant; import java.util.Date; @@ -183,30 +184,68 @@ public class SyncStateService { ); } - public void saveCompactionMetrics(Long flinkJobId, String alias, String type, String instant, String cluster, String applicationId) { - jdbcTemplate.update( - SqlBuilder - .insertInto( - TbAppHudiCompactionMetrics._origin_, - TbAppHudiCompactionMetrics.FLINK_JOB_ID_O, - TbAppHudiCompactionMetrics.ALIAS_O, - TbAppHudiCompactionMetrics.TYPE_O, - TbAppHudiCompactionMetrics.COMPACTION_PLAN_INSTANT_O, - TbAppHudiCompactionMetrics.CLUSTER_O, - TbAppHudiCompactionMetrics.APPLICATION_ID_O, - TbAppHudiCompactionMetrics.UPDATE_TIME_O - ) - .values() - .addValue(null, null, null, null, null, null, null) - .precompileSql(), - flinkJobId, - alias, - type, - Date.from(Instant.ofEpochMilli(DateUtil.parse(instant).getTime())), - cluster, - applicationId, - now() - ); + public void saveCompactionMetrics(Long flinkJobId, String alias, String type, String instant, String cluster, String applicationId, HoodieCommitMetadata metadata) { + if (ObjectUtil.isNull(metadata)) { + jdbcTemplate.update( + SqlBuilder + .insertInto( + TbAppHudiCompactionMetrics._origin_, + TbAppHudiCompactionMetrics.FLINK_JOB_ID_O, + TbAppHudiCompactionMetrics.ALIAS_O, + TbAppHudiCompactionMetrics.TYPE_O, + TbAppHudiCompactionMetrics.COMPACTION_PLAN_INSTANT_O, + TbAppHudiCompactionMetrics.CLUSTER_O, + TbAppHudiCompactionMetrics.APPLICATION_ID_O, + TbAppHudiCompactionMetrics.UPDATE_TIME_O + ) + .values() + .addValue(null, null, null, null, null, null, null) + .precompileSql(), + flinkJobId, + alias, + type, + Date.from(Instant.ofEpochMilli(DateUtil.parse(instant).getTime())), + cluster, + applicationId, + now() + ); + } else { + jdbcTemplate.update( + SqlBuilder + .insertInto( + TbAppHudiCompactionMetrics._origin_, + TbAppHudiCompactionMetrics.FLINK_JOB_ID_O, + TbAppHudiCompactionMetrics.ALIAS_O, + TbAppHudiCompactionMetrics.TYPE_O, + TbAppHudiCompactionMetrics.COMPACTION_PLAN_INSTANT_O, + TbAppHudiCompactionMetrics.CLUSTER_O, + TbAppHudiCompactionMetrics.APPLICATION_ID_O, + TbAppHudiCompactionMetrics.TOTAL_SCAN_TIME_O, + TbAppHudiCompactionMetrics.TOTAL_LOG_FILES_COMPACTED_O, + TbAppHudiCompactionMetrics.TOTAL_LOG_FILES_SIZE_O, + TbAppHudiCompactionMetrics.TOTAL_RECORDS_DELETED_O, + TbAppHudiCompactionMetrics.TOTAL_RECORDS_UPDATED_O, + TbAppHudiCompactionMetrics.TOTAL_RECORDS_COMPACTED_O, + TbAppHudiCompactionMetrics.UPDATE_TIME_O + ) + .values() + .addValue(null, null, null, null, null, null, null, null, null, null, null, null, null) + .precompileSql(), + flinkJobId, + alias, + type, + Date.from(Instant.ofEpochMilli(DateUtil.parse(instant).getTime())), + cluster, + applicationId, + metadata.getTotalScanTime(), + metadata.getTotalLogFilesCompacted(), + metadata.getTotalLogFilesSize(), + metadata.getTotalRecordsDeleted(), + metadata.getTotalCompactedRecordsUpdated(), + metadata.getTotalLogRecordsCompacted(), + now() + ); + } } public void saveCompactionLatestOperationTime(Long flinkJobId, String alias, Long latestOperationTime) { diff --git a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/SQLConstants.java b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/SQLConstants.java index 35340bd..9afdaf5 100644 --- a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/SQLConstants.java +++ b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/SQLConstants.java @@ -618,22 +618,6 @@ public interface SQLConstants { * 字段 type 别名值 tahcm.type 类型 */ String TYPE_A = _alias_.getAlias() + "." + TYPE_O; - /** - * 字段 source_schema 原始值 source_schema 库名 - */ - String SOURCE_SCHEMA_O = "source_schema"; - /** - * 字段 source_schema 别名值 tahcm.source_schema 库名 - */ - String SOURCE_SCHEMA_A = _alias_.getAlias() + "." + SOURCE_SCHEMA_O; - /** - * 字段 source_table 原始值 source_table 表名 - */ - String SOURCE_TABLE_O = "source_table"; - /** - * 字段 source_table 别名值 tahcm.source_table 表名 - */ - String SOURCE_TABLE_A = _alias_.getAlias() + "." + SOURCE_TABLE_O; /** * 字段 compaction_plan_instant 原始值 compaction_plan_instant 压缩计划时间点 */