diff --git a/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/info/CompactionMetrics.java b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/info/CompactionMetrics.java new file mode 100644 index 0000000..5c16479 --- /dev/null +++ b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/info/CompactionMetrics.java @@ -0,0 +1,78 @@ +package com.lanyuanxiaoyao.service.configuration.entity.info; + +import java.time.LocalDateTime; + +/** + * @author lanyuanxiaoyao + * @date 2023-06-14 + */ +public class CompactionMetrics { + private Long flinkJobId; + private String alias; + private String applicationId; + private String cluster; + private String compactionPlanInstant; + private Boolean complete; + private Long startedTime; + private Long finishedTime; + + public CompactionMetrics() { + } + + public CompactionMetrics(Long flinkJobId, String alias, String applicationId, String cluster, String compactionPlanInstant, Boolean complete, Long startedTime, Long finishedTime) { + this.flinkJobId = flinkJobId; + this.alias = alias; + this.applicationId = applicationId; + this.cluster = cluster; + this.compactionPlanInstant = compactionPlanInstant; + this.complete = complete; + this.startedTime = startedTime; + this.finishedTime = finishedTime; + } + + public Long getFlinkJobId() { + return flinkJobId; + } + + public String getAlias() { + return alias; + } + + public String getApplicationId() { + return applicationId; + } + + public String getCluster() { + return cluster; + } + + public String getCompactionPlanInstant() { + return compactionPlanInstant; + } + + public Boolean getComplete() { + return complete; + } + + public Long getStartedTime() { + return startedTime; + } + + public Long getFinishedTime() { + return finishedTime; + } + + @Override + public String toString() { + return "CompactionMetrics{" + + "flinkJobId=" + flinkJobId + + ", alias='" + alias + '\'' + + ", applicationId='" + applicationId + '\'' + + ", cluster='" + cluster + '\'' + + ", compactionPlanInstant='" + compactionPlanInstant + '\'' + + ", isComplete=" + complete + + ", startedTime=" + startedTime + + ", finishedTime=" + finishedTime + + '}'; + } +} diff --git a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/InfoService.java b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/InfoService.java index 2574054..3d57302 100644 --- a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/InfoService.java +++ b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/InfoService.java @@ -7,6 +7,7 @@ import com.eshore.odcp.hudi.connector.entity.FlinkJob; import com.eshore.odcp.hudi.connector.entity.SyncState; import com.eshore.odcp.hudi.connector.entity.TableMeta; import com.lanyuanxiaoyao.service.configuration.entity.PageResponse; +import com.lanyuanxiaoyao.service.configuration.entity.info.CompactionMetrics; import com.lanyuanxiaoyao.service.configuration.entity.info.JobAndMetas; import com.lanyuanxiaoyao.service.configuration.entity.info.JobIdAndAlias; import com.lanyuanxiaoyao.service.configuration.entity.info.VersionUpdated; @@ -69,6 +70,9 @@ public interface InfoService { @Get("/version_tables") PageResponse versionTables(@Query Map queryMap); + @Get("/compaction_metrics") + PageResponse compactionMetrics(@Query Map queryMap); + @Get("/job_metas") ImmutableList jobAndMetas(); diff --git a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/InfoController.java b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/InfoController.java index fa38125..5d61806 100644 --- a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/InfoController.java +++ b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/InfoController.java @@ -5,10 +5,12 @@ import com.eshore.odcp.hudi.connector.entity.FlinkJob; import com.eshore.odcp.hudi.connector.entity.SyncState; import com.eshore.odcp.hudi.connector.entity.TableMeta; import com.lanyuanxiaoyao.service.configuration.entity.PageResponse; +import com.lanyuanxiaoyao.service.configuration.entity.info.CompactionMetrics; import com.lanyuanxiaoyao.service.configuration.entity.info.JobAndMetas; import com.lanyuanxiaoyao.service.configuration.entity.info.JobIdAndAlias; import com.lanyuanxiaoyao.service.configuration.entity.info.VersionUpdated; import com.lanyuanxiaoyao.service.info.service.InfoService; +import java.util.List; import org.eclipse.collections.api.factory.Lists; import org.eclipse.collections.api.list.ImmutableList; import org.slf4j.Logger; @@ -18,8 +20,6 @@ import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; -import java.util.List; - /** * 信息接口 * @@ -194,4 +194,25 @@ public class InfoController { public Long hiveFocusCount() { return infoService.hiveFocusCount(); } + + @GetMapping("/compaction_metrics") + public PageResponse compactionMetrics( + @RequestParam(value = "page", defaultValue = "1") Integer page, + @RequestParam(value = "count", defaultValue = "10") Integer count, + @RequestParam(value = "flink_job_id") Long flinkJobId, + @RequestParam(value = "alias") String alias, + @RequestParam(value = "order", required = false) String order, + @RequestParam(value = "direction", required = false) String direction, + @RequestParam(value = "filter_completes", required = false) List filterCompletes + ) { + return infoService.findAllCompactionMetrics( + page, + count, + flinkJobId, + alias, + order, + direction, + ObjectUtil.isNull(filterCompletes) ? Lists.immutable.empty() : Lists.immutable.ofAll(filterCompletes) + ); + } } diff --git a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/InfoService.java b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/InfoService.java index d8900eb..e807889 100644 --- a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/InfoService.java +++ b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/InfoService.java @@ -13,9 +13,11 @@ import com.eshore.odcp.hudi.connector.entity.SyncState; import com.eshore.odcp.hudi.connector.entity.TableMeta; import com.eshore.odcp.hudi.connector.utils.database.DatabaseService; import com.lanyuanxiaoyao.service.configuration.entity.PageResponse; +import com.lanyuanxiaoyao.service.configuration.entity.info.CompactionMetrics; import com.lanyuanxiaoyao.service.configuration.entity.info.JobAndMetas; import com.lanyuanxiaoyao.service.configuration.entity.info.JobIdAndAlias; import com.lanyuanxiaoyao.service.configuration.entity.info.VersionUpdated; +import java.sql.Timestamp; import java.util.List; import org.eclipse.collections.api.factory.Lists; import org.eclipse.collections.api.list.ImmutableList; @@ -554,4 +556,129 @@ public class InfoService { Long.class ); } + + private static final Alias TABLE_COMPACTION_METRICS = Alias.of(StrUtil.format("{}.tb_app_hudi_compaction_metrics", DATABASE_NAME), "tahcm"); + private static final String TABLE_COMPACTION_METRICS_TYPE = column(TABLE_COMPACTION_METRICS, "type"); + private static final String TABLE_COMPACTION_METRICS_FLINK_JOB_ID = column(TABLE_COMPACTION_METRICS, "flink_job_id"); + private static final String TABLE_COMPACTION_METRICS_ALIAS = column(TABLE_COMPACTION_METRICS, "alias"); + + private SqlBuilder generateCompactionMetricsCriteria( + SelectSqlBuilder builder, + Integer page, + Integer count, + Long flinkJobId, + String alias, + String order, + String direction, + ImmutableList filterCompletes, + boolean limited + ) { + int limit = Math.max(count, 1); + int offset = limit * Math.max(page - 1, 0); + Alias m1 = Alias.of( + SqlBuilder.selectAll() + .from(TABLE_COMPACTION_METRICS) + .whereEq(TABLE_COMPACTION_METRICS_TYPE, "pre") + .andEq(TABLE_COMPACTION_METRICS_FLINK_JOB_ID, flinkJobId) + .andEq(TABLE_COMPACTION_METRICS_ALIAS, alias), + "m1" + ); + Alias m2 = Alias.of( + SqlBuilder.selectAll() + .from(TABLE_COMPACTION_METRICS) + .whereEq(TABLE_COMPACTION_METRICS_TYPE, "complete") + .andEq(TABLE_COMPACTION_METRICS_FLINK_JOB_ID, flinkJobId) + .andEq(TABLE_COMPACTION_METRICS_ALIAS, alias), + "m2" + ); + return builder + .from(m1) + .leftJoin(m2) + .onEq(column(m1, "flink_job_id"), Column.as(column(m2, "flink_job_id"))) + .andEq(column(m1, "alias"), Column.as(column(m2, "alias"))) + .andEq(column(m1, "application_id"), Column.as(column(m2, "application_id"))) + .andEq(column(m1, "compaction_plan_instant"), Column.as(column(m2, "compaction_plan_instant"))) + .whereNotNull(filterCompletes.anySatisfy(b -> b), column(m2, "type")) + .orNull(filterCompletes.anySatisfy(b -> !b), column(m2, "type")) + .orderBy(StrUtil.isNotBlank(order) && StrUtil.isNotBlank(direction), StrUtil.format("m1.{} {}", order, direction)) + .limit(limited, offset, count); + } + + @Cacheable(value = "compaction-metrics", sync = true) + @Retryable(Throwable.class) + public PageResponse findAllCompactionMetrics( + Integer page, + Integer count, + Long flinkJobId, + String alias, + String order, + String direction, + ImmutableList filterCompletes + ) { + return mysqlTransactionTemplate.execute(status -> { + Long total = mysqlJdbcTemplate.queryForObject( + generateCompactionMetricsCriteria( + SqlBuilder.select(COUNT), + page, + count, + flinkJobId, + alias, + order, + direction, + filterCompletes, + false + ).build(), + Long.class + ); + List list = mysqlJdbcTemplate.query( + generateCompactionMetricsCriteria( + SqlBuilder.select( + "m1.flink_job_id", + "m1.alias", + "m1.application_id", + "m1.cluster", + "m1.compaction_plan_instant", + "m2.type is not null as is_complete", + "m1.update_time as started_time", + "m2.update_time as finished_time" + ), + page, + count, + flinkJobId, + alias, + order, + direction, + filterCompletes, + true + ).build(), + (rs, row) -> { + boolean isComplete = rs.getBoolean(6); + if (ObjectUtil.isNull(isComplete)) { + isComplete = false; + } + Timestamp startedTimestamp = rs.getTimestamp(7); + long startedTime = 0; + if (ObjectUtil.isNotNull(startedTimestamp)) { + startedTime = startedTimestamp.getTime(); + } + Timestamp finishedTimestamp = rs.getTimestamp(8); + long finishedTime = 0; + if (ObjectUtil.isNotNull(finishedTimestamp)) { + finishedTime = finishedTimestamp.getTime(); + } + return new CompactionMetrics( + rs.getLong(1), + rs.getString(2), + rs.getString(3), + rs.getString(4), + rs.getString(5), + isComplete, + startedTime, + finishedTime + ); + } + ); + return new PageResponse<>(list, total); + }); + } } diff --git a/service-info-query/src/test/java/SqlBuilderTests.java b/service-info-query/src/test/java/SqlBuilderTests.java index d28fbe3..6edba40 100644 --- a/service-info-query/src/test/java/SqlBuilderTests.java +++ b/service-info-query/src/test/java/SqlBuilderTests.java @@ -1,7 +1,11 @@ +import club.kingon.sql.builder.SelectSqlBuilder; import club.kingon.sql.builder.SqlBuilder; import club.kingon.sql.builder.entry.Alias; +import club.kingon.sql.builder.entry.Column; import cn.hutool.core.util.StrUtil; import cn.hutool.db.sql.SqlFormatter; +import org.eclipse.collections.api.factory.Lists; +import org.eclipse.collections.api.list.ImmutableList; import static com.eshore.odcp.hudi.connector.Constants.DATABASE_NAME; @@ -31,13 +35,73 @@ public class SqlBuilderTests { return StrUtil.format("{}.{}", table.getAlias(), column); } + private static final Alias TABLE_COMPACTION_METRICS = Alias.of(StrUtil.format("{}.tb_app_hudi_compaction_metrics", DATABASE_NAME), "tahcm"); + private static final String TABLE_COMPACTION_METRICS_TYPE = column(TABLE_COMPACTION_METRICS, "type"); + private static final String TABLE_COMPACTION_METRICS_FLINK_JOB_ID = column(TABLE_COMPACTION_METRICS, "flink_job_id"); + private static final String TABLE_COMPACTION_METRICS_ALIAS = column(TABLE_COMPACTION_METRICS, "alias"); + + private static SqlBuilder generateCompactionMetricsCriteria( + SelectSqlBuilder builder, + Integer page, + Integer count, + Long flinkJobId, + String alias, + String order, + String direction, + ImmutableList filterCompletes + ) { + int limit = Math.max(count, 1); + int offset = limit * Math.max(page - 1, 0); + Alias m1 = Alias.of( + SqlBuilder.selectAll() + .from(TABLE_COMPACTION_METRICS) + .whereEq(TABLE_COMPACTION_METRICS_TYPE, "pre") + .andEq(TABLE_COMPACTION_METRICS_FLINK_JOB_ID, flinkJobId) + .andEq(TABLE_COMPACTION_METRICS_ALIAS, alias), + "m1" + ); + Alias m2 = Alias.of( + SqlBuilder.selectAll() + .from(TABLE_COMPACTION_METRICS) + .whereEq(TABLE_COMPACTION_METRICS_TYPE, "complete") + .andEq(TABLE_COMPACTION_METRICS_FLINK_JOB_ID, flinkJobId) + .andEq(TABLE_COMPACTION_METRICS_ALIAS, alias), + "m2" + ); + return builder + .from(m1) + .leftJoin(m2) + .onEq(column(m1, "flink_job_id"), Column.as(column(m2, "flink_job_id"))) + .andEq(column(m1, "alias"), Column.as(column(m2, "alias"))) + .andEq(column(m1, "application_id"), Column.as(column(m2, "application_id"))) + .andEq(column(m1, "compaction_plan_instant"), Column.as(column(m2, "compaction_plan_instant"))) + .whereNotNull(filterCompletes.anySatisfy(b -> b), column(m2, "type")) + .orNull(filterCompletes.anySatisfy(b -> !b), column(m2, "type")) + .orderBy(StrUtil.isNotBlank(order) && StrUtil.isNotBlank(direction), StrUtil.format("m1.{} {}", order, direction)) + .limit(offset, count); + } + public static void main(String[] args) { System.out.println(SqlFormatter.format( - SqlBuilder.select("count(distinct concat(src_schema, src_table))") - .from(TABLE_INFO) - .whereGe(TABLE_INFO_PRIORITY, 10000) - .andEq(TABLE_INFO_STATUS, "y") - .build() + generateCompactionMetricsCriteria( + SqlBuilder.select( + "m1.flink_job_id", + "m1.alias", + "m1.application_id", + "m1.cluster", + "m1.compaction_plan_instant", + "m2.type is not null as is_complete", + "m1.update_time as started_time", + "m2.update_time as finished_time" + ), + 1, + 100, + 1542097996099055616L, + "acct_acct_item_fs", + "update_time", + "asc", + Lists.immutable.of(false, true) + ).build() )); } }