feat(api): 增加压缩参数记录

This commit is contained in:
v-zhangjc9
2024-04-29 15:31:44 +08:00
parent b30ce6d675
commit 6167a94fcf
3 changed files with 67 additions and 42 deletions

View File

@@ -104,7 +104,8 @@ public class ApiController {
"pre", "pre",
instant, instant,
cluster, cluster,
applicationId applicationId,
metadata
); );
} }
@@ -125,7 +126,8 @@ public class ApiController {
"complete", "complete",
instant, instant,
cluster, cluster,
applicationId applicationId,
metadata
); );
} }

View File

@@ -3,6 +3,7 @@ package com.lanyuanxiaoyao.service.api.service;
import club.kingon.sql.builder.SqlBuilder; import club.kingon.sql.builder.SqlBuilder;
import cn.hutool.core.date.DateUtil; import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.ObjectUtil;
import com.lanyuanxiaoyao.service.api.entity.HoodieCommitMetadata;
import com.lanyuanxiaoyao.service.common.Constants; import com.lanyuanxiaoyao.service.common.Constants;
import java.time.Instant; import java.time.Instant;
import java.util.Date; import java.util.Date;
@@ -183,30 +184,68 @@ public class SyncStateService {
); );
} }
public void saveCompactionMetrics(Long flinkJobId, String alias, String type, String instant, String cluster, String applicationId) { public void saveCompactionMetrics(Long flinkJobId, String alias, String type, String instant, String cluster, String applicationId, HoodieCommitMetadata metadata) {
jdbcTemplate.update( if (ObjectUtil.isNull(metadata)) {
SqlBuilder jdbcTemplate.update(
.insertInto( SqlBuilder
TbAppHudiCompactionMetrics._origin_, .insertInto(
TbAppHudiCompactionMetrics.FLINK_JOB_ID_O, TbAppHudiCompactionMetrics._origin_,
TbAppHudiCompactionMetrics.ALIAS_O, TbAppHudiCompactionMetrics.FLINK_JOB_ID_O,
TbAppHudiCompactionMetrics.TYPE_O, TbAppHudiCompactionMetrics.ALIAS_O,
TbAppHudiCompactionMetrics.COMPACTION_PLAN_INSTANT_O, TbAppHudiCompactionMetrics.TYPE_O,
TbAppHudiCompactionMetrics.CLUSTER_O, TbAppHudiCompactionMetrics.COMPACTION_PLAN_INSTANT_O,
TbAppHudiCompactionMetrics.APPLICATION_ID_O, TbAppHudiCompactionMetrics.CLUSTER_O,
TbAppHudiCompactionMetrics.UPDATE_TIME_O TbAppHudiCompactionMetrics.APPLICATION_ID_O,
) TbAppHudiCompactionMetrics.UPDATE_TIME_O
.values() )
.addValue(null, null, null, null, null, null, null) .values()
.precompileSql(), .addValue(null, null, null, null, null, null, null)
flinkJobId, .precompileSql(),
alias, flinkJobId,
type, alias,
Date.from(Instant.ofEpochMilli(DateUtil.parse(instant).getTime())), type,
cluster, Date.from(Instant.ofEpochMilli(DateUtil.parse(instant).getTime())),
applicationId, cluster,
now() applicationId,
); now()
);
} else {
jdbcTemplate.update(
SqlBuilder
.insertInto(
TbAppHudiCompactionMetrics._origin_,
TbAppHudiCompactionMetrics.FLINK_JOB_ID_O,
TbAppHudiCompactionMetrics.ALIAS_O,
TbAppHudiCompactionMetrics.TYPE_O,
TbAppHudiCompactionMetrics.COMPACTION_PLAN_INSTANT_O,
TbAppHudiCompactionMetrics.CLUSTER_O,
TbAppHudiCompactionMetrics.APPLICATION_ID_O,
TbAppHudiCompactionMetrics.TOTAL_SCAN_TIME_O,
TbAppHudiCompactionMetrics.TOTAL_LOG_FILES_COMPACTED_O,
TbAppHudiCompactionMetrics.TOTAL_LOG_FILES_SIZE_O,
TbAppHudiCompactionMetrics.TOTAL_RECORDS_DELETED_O,
TbAppHudiCompactionMetrics.TOTAL_RECORDS_UPDATED_O,
TbAppHudiCompactionMetrics.TOTAL_RECORDS_COMPACTED_O,
TbAppHudiCompactionMetrics.UPDATE_TIME_O
)
.values()
.addValue(null, null, null, null, null, null, null, null, null, null, null, null, null)
.precompileSql(),
flinkJobId,
alias,
type,
Date.from(Instant.ofEpochMilli(DateUtil.parse(instant).getTime())),
cluster,
applicationId,
metadata.getTotalScanTime(),
metadata.getTotalLogFilesCompacted(),
metadata.getTotalLogFilesSize(),
metadata.getTotalRecordsDeleted(),
metadata.getTotalCompactedRecordsUpdated(),
metadata.getTotalLogRecordsCompacted(),
now()
);
}
} }
public void saveCompactionLatestOperationTime(Long flinkJobId, String alias, Long latestOperationTime) { public void saveCompactionLatestOperationTime(Long flinkJobId, String alias, Long latestOperationTime) {

View File

@@ -618,22 +618,6 @@ public interface SQLConstants {
* 字段 type 别名值 tahcm.type 类型 * 字段 type 别名值 tahcm.type 类型
*/ */
String TYPE_A = _alias_.getAlias() + "." + TYPE_O; String TYPE_A = _alias_.getAlias() + "." + TYPE_O;
/**
* 字段 source_schema 原始值 source_schema 库名
*/
String SOURCE_SCHEMA_O = "source_schema";
/**
* 字段 source_schema 别名值 tahcm.source_schema 库名
*/
String SOURCE_SCHEMA_A = _alias_.getAlias() + "." + SOURCE_SCHEMA_O;
/**
* 字段 source_table 原始值 source_table 表名
*/
String SOURCE_TABLE_O = "source_table";
/**
* 字段 source_table 别名值 tahcm.source_table 表名
*/
String SOURCE_TABLE_A = _alias_.getAlias() + "." + SOURCE_TABLE_O;
/** /**
* 字段 compaction_plan_instant 原始值 compaction_plan_instant 压缩计划时间点 * 字段 compaction_plan_instant 原始值 compaction_plan_instant 压缩计划时间点
*/ */