From 6167a94fcf283bf16ad81f160dea1163ee8b9713 Mon Sep 17 00:00:00 2001 From: v-zhangjc9 Date: Mon, 29 Apr 2024 15:31:44 +0800 Subject: [PATCH] =?UTF-8?q?feat(api):=20=E5=A2=9E=E5=8A=A0=E5=8E=8B?= =?UTF-8?q?=E7=BC=A9=E5=8F=82=E6=95=B0=E8=AE=B0=E5=BD=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/api/controller/ApiController.java | 6 +- .../service/api/service/SyncStateService.java | 87 ++++++++++++++----- .../service/common/SQLConstants.java | 16 ---- 3 files changed, 67 insertions(+), 42 deletions(-) diff --git a/service-api/src/main/java/com/lanyuanxiaoyao/service/api/controller/ApiController.java b/service-api/src/main/java/com/lanyuanxiaoyao/service/api/controller/ApiController.java index cc10ebc..ce50302 100644 --- a/service-api/src/main/java/com/lanyuanxiaoyao/service/api/controller/ApiController.java +++ b/service-api/src/main/java/com/lanyuanxiaoyao/service/api/controller/ApiController.java @@ -104,7 +104,8 @@ public class ApiController { "pre", instant, cluster, - applicationId + applicationId, + metadata ); } @@ -125,7 +126,8 @@ public class ApiController { "complete", instant, cluster, - applicationId + applicationId, + metadata ); } diff --git a/service-api/src/main/java/com/lanyuanxiaoyao/service/api/service/SyncStateService.java b/service-api/src/main/java/com/lanyuanxiaoyao/service/api/service/SyncStateService.java index 9840661..14c5cad 100644 --- a/service-api/src/main/java/com/lanyuanxiaoyao/service/api/service/SyncStateService.java +++ b/service-api/src/main/java/com/lanyuanxiaoyao/service/api/service/SyncStateService.java @@ -3,6 +3,7 @@ package com.lanyuanxiaoyao.service.api.service; import club.kingon.sql.builder.SqlBuilder; import cn.hutool.core.date.DateUtil; import cn.hutool.core.util.ObjectUtil; +import com.lanyuanxiaoyao.service.api.entity.HoodieCommitMetadata; import com.lanyuanxiaoyao.service.common.Constants; import java.time.Instant; import java.util.Date; @@ -183,30 +184,68 @@ public class SyncStateService { ); } - public void saveCompactionMetrics(Long flinkJobId, String alias, String type, String instant, String cluster, String applicationId) { - jdbcTemplate.update( - SqlBuilder - .insertInto( - TbAppHudiCompactionMetrics._origin_, - TbAppHudiCompactionMetrics.FLINK_JOB_ID_O, - TbAppHudiCompactionMetrics.ALIAS_O, - TbAppHudiCompactionMetrics.TYPE_O, - TbAppHudiCompactionMetrics.COMPACTION_PLAN_INSTANT_O, - TbAppHudiCompactionMetrics.CLUSTER_O, - TbAppHudiCompactionMetrics.APPLICATION_ID_O, - TbAppHudiCompactionMetrics.UPDATE_TIME_O - ) - .values() - .addValue(null, null, null, null, null, null, null) - .precompileSql(), - flinkJobId, - alias, - type, - Date.from(Instant.ofEpochMilli(DateUtil.parse(instant).getTime())), - cluster, - applicationId, - now() - ); + public void saveCompactionMetrics(Long flinkJobId, String alias, String type, String instant, String cluster, String applicationId, HoodieCommitMetadata metadata) { + if (ObjectUtil.isNull(metadata)) { + jdbcTemplate.update( + SqlBuilder + .insertInto( + TbAppHudiCompactionMetrics._origin_, + TbAppHudiCompactionMetrics.FLINK_JOB_ID_O, + TbAppHudiCompactionMetrics.ALIAS_O, + TbAppHudiCompactionMetrics.TYPE_O, + TbAppHudiCompactionMetrics.COMPACTION_PLAN_INSTANT_O, + TbAppHudiCompactionMetrics.CLUSTER_O, + TbAppHudiCompactionMetrics.APPLICATION_ID_O, + TbAppHudiCompactionMetrics.UPDATE_TIME_O + ) + .values() + .addValue(null, null, null, null, null, null, null) + .precompileSql(), + flinkJobId, + alias, + type, + Date.from(Instant.ofEpochMilli(DateUtil.parse(instant).getTime())), + cluster, + applicationId, + now() + ); + } else { + jdbcTemplate.update( + SqlBuilder + .insertInto( + TbAppHudiCompactionMetrics._origin_, + TbAppHudiCompactionMetrics.FLINK_JOB_ID_O, + TbAppHudiCompactionMetrics.ALIAS_O, + TbAppHudiCompactionMetrics.TYPE_O, + TbAppHudiCompactionMetrics.COMPACTION_PLAN_INSTANT_O, + TbAppHudiCompactionMetrics.CLUSTER_O, + TbAppHudiCompactionMetrics.APPLICATION_ID_O, + TbAppHudiCompactionMetrics.TOTAL_SCAN_TIME_O, + TbAppHudiCompactionMetrics.TOTAL_LOG_FILES_COMPACTED_O, + TbAppHudiCompactionMetrics.TOTAL_LOG_FILES_SIZE_O, + TbAppHudiCompactionMetrics.TOTAL_RECORDS_DELETED_O, + TbAppHudiCompactionMetrics.TOTAL_RECORDS_UPDATED_O, + TbAppHudiCompactionMetrics.TOTAL_RECORDS_COMPACTED_O, + TbAppHudiCompactionMetrics.UPDATE_TIME_O + ) + .values() + .addValue(null, null, null, null, null, null, null, null, null, null, null, null, null) + .precompileSql(), + flinkJobId, + alias, + type, + Date.from(Instant.ofEpochMilli(DateUtil.parse(instant).getTime())), + cluster, + applicationId, + metadata.getTotalScanTime(), + metadata.getTotalLogFilesCompacted(), + metadata.getTotalLogFilesSize(), + metadata.getTotalRecordsDeleted(), + metadata.getTotalCompactedRecordsUpdated(), + metadata.getTotalLogRecordsCompacted(), + now() + ); + } } public void saveCompactionLatestOperationTime(Long flinkJobId, String alias, Long latestOperationTime) { diff --git a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/SQLConstants.java b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/SQLConstants.java index 35340bd..9afdaf5 100644 --- a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/SQLConstants.java +++ b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/SQLConstants.java @@ -618,22 +618,6 @@ public interface SQLConstants { * 字段 type 别名值 tahcm.type 类型 */ String TYPE_A = _alias_.getAlias() + "." + TYPE_O; - /** - * 字段 source_schema 原始值 source_schema 库名 - */ - String SOURCE_SCHEMA_O = "source_schema"; - /** - * 字段 source_schema 别名值 tahcm.source_schema 库名 - */ - String SOURCE_SCHEMA_A = _alias_.getAlias() + "." + SOURCE_SCHEMA_O; - /** - * 字段 source_table 原始值 source_table 表名 - */ - String SOURCE_TABLE_O = "source_table"; - /** - * 字段 source_table 别名值 tahcm.source_table 表名 - */ - String SOURCE_TABLE_A = _alias_.getAlias() + "." + SOURCE_TABLE_O; /** * 字段 compaction_plan_instant 原始值 compaction_plan_instant 压缩计划时间点 */