From 28981c37952756f934ec92ccc39ad60ceb57db40 Mon Sep 17 00:00:00 2001 From: lanyuanxiaoyao Date: Wed, 7 Jun 2023 19:26:03 +0800 Subject: [PATCH] =?UTF-8?q?feature(info-query):=20=E5=A2=9E=E5=8A=A0?= =?UTF-8?q?=E8=B7=A8=E5=A4=A9=E7=89=88=E6=9C=AC=E8=A1=A8=E6=9F=A5=E8=AF=A2?= =?UTF-8?q?=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../entity/info/VersionUpdated.java | 52 +++++++++++++++ .../service/forest/service/InfoService.java | 4 ++ service-info-query/pom.xml | 5 ++ .../info/controller/InfoController.java | 24 +++++++ .../service/info/service/InfoService.java | 66 +++++++++++++++++++ .../src/test/java/SqlBuilderTests.java | 36 ++++++++++ 6 files changed, 187 insertions(+) create mode 100644 service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/info/VersionUpdated.java create mode 100644 service-info-query/src/test/java/SqlBuilderTests.java diff --git a/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/info/VersionUpdated.java b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/info/VersionUpdated.java new file mode 100644 index 0000000..6381477 --- /dev/null +++ b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/info/VersionUpdated.java @@ -0,0 +1,52 @@ +package com.lanyuanxiaoyao.service.configuration.entity.info; + +import java.time.LocalDate; + +/** + * 表跨天信息 + * + * @author lanyuanxiaoyao + * @date 2023-06-06 + */ +public class VersionUpdated { + private Long flinkJobId; + private String alias; + private String version; + private Boolean updated; + + public VersionUpdated() { + } + + public VersionUpdated(Long flinkJobId, String alias, String version, Boolean updated) { + this.flinkJobId = flinkJobId; + this.alias = alias; + this.version = version; + this.updated = updated; + } + + public Long getFlinkJobId() { + return flinkJobId; + } + + public String getAlias() { + return alias; + } + + public String getVersion() { + return version; + } + + public Boolean getUpdated() { + return updated; + } + + @Override + public String toString() { + return "VersionUpdated{" + + "flinkJobId=" + flinkJobId + + ", alias='" + alias + '\'' + + ", version='" + version + '\'' + + ", updated=" + updated + + '}'; + } +} diff --git a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/InfoService.java b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/InfoService.java index 56dc073..ea1d039 100644 --- a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/InfoService.java +++ b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/InfoService.java @@ -9,6 +9,7 @@ import com.eshore.odcp.hudi.connector.entity.TableMeta; import com.lanyuanxiaoyao.service.configuration.entity.PageResponse; import com.lanyuanxiaoyao.service.configuration.entity.info.JobAndMetas; import com.lanyuanxiaoyao.service.configuration.entity.info.JobIdAndAlias; +import com.lanyuanxiaoyao.service.configuration.entity.info.VersionUpdated; import java.util.Map; import org.eclipse.collections.api.list.ImmutableList; @@ -23,6 +24,9 @@ public interface InfoService { @Get("/job_id_alias") PageResponse jobIdAndAlias(@Query Map queryMap); + @Get("/version_tables") + PageResponse versionTables(@Query Map queryMap); + @Get("/job_metas") ImmutableList jobAndMetas(); diff --git a/service-info-query/pom.xml b/service-info-query/pom.xml index 9e8ddbd..a83c333 100644 --- a/service-info-query/pom.xml +++ b/service-info-query/pom.xml @@ -32,6 +32,11 @@ druid-spring-boot-starter 1.2.17 + + io.github.dragons96 + sql-builder + 0.0.5.3 + diff --git a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/InfoController.java b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/InfoController.java index 281afc4..b50642c 100644 --- a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/InfoController.java +++ b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/InfoController.java @@ -7,6 +7,7 @@ import com.eshore.odcp.hudi.connector.entity.TableMeta; import com.lanyuanxiaoyao.service.configuration.entity.PageResponse; import com.lanyuanxiaoyao.service.configuration.entity.info.JobAndMetas; import com.lanyuanxiaoyao.service.configuration.entity.info.JobIdAndAlias; +import com.lanyuanxiaoyao.service.configuration.entity.info.VersionUpdated; import com.lanyuanxiaoyao.service.info.service.InfoService; import java.util.List; import org.eclipse.collections.api.factory.Lists; @@ -58,6 +59,29 @@ public class InfoController { ); } + @GetMapping("/version_tables") + public PageResponse versionTables( + @RequestParam(value = "page", defaultValue = "1") Integer page, + @RequestParam(value = "count", defaultValue = "10") Integer count, + @RequestParam(value = "version", required = false) String version, + @RequestParam(value = "flink_job_id", required = false) Long flinkJobId, + @RequestParam(value = "alias", required = false) String alias, + @RequestParam(value = "order", required = false) String order, + @RequestParam(value = "direction", required = false) String direction, + @RequestParam(value = "filter_schedules", required = false) List filterSchedules + ) { + return infoService.findAllVersionTables( + page, + count, + version, + flinkJobId, + alias, + order, + direction, + Lists.immutable.ofAll(filterSchedules) + ); + } + @GetMapping("/job_metas") public ImmutableList jobAndMetas() { return infoService.jobAndMetas(); diff --git a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/InfoService.java b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/InfoService.java index 9e8207e..2411065 100644 --- a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/InfoService.java +++ b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/InfoService.java @@ -1,5 +1,9 @@ package com.lanyuanxiaoyao.service.info.service; +import club.kingon.sql.builder.SelectSqlBuilder; +import club.kingon.sql.builder.SqlBuilder; +import club.kingon.sql.builder.entry.Alias; +import club.kingon.sql.builder.entry.Column; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import com.eshore.odcp.hudi.connector.entity.FlinkJob; @@ -9,7 +13,9 @@ import com.eshore.odcp.hudi.connector.utils.database.DatabaseService; import com.lanyuanxiaoyao.service.configuration.entity.PageResponse; import com.lanyuanxiaoyao.service.configuration.entity.info.JobAndMetas; import com.lanyuanxiaoyao.service.configuration.entity.info.JobIdAndAlias; +import com.lanyuanxiaoyao.service.configuration.entity.info.VersionUpdated; import java.util.List; +import java.util.function.Function; import org.eclipse.collections.api.list.ImmutableList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,6 +32,7 @@ import static com.eshore.odcp.hudi.connector.Constants.DATABASE_NAME; * @author lanyuanxiaoyao * @date 2023-04-24 */ +@SuppressWarnings("OptionalGetWithoutIsPresent") @CacheConfig(cacheManager = "normal-cache") @Service public class InfoService { @@ -198,4 +205,63 @@ public class InfoService { public ImmutableList updatedVersionTables() { return databaseService.findAllScheduledTable(); } + + private static String column(Alias table, String column) { + return StrUtil.format("{}.{}", table.getAlias(), column); + } + + private static final String COUNT = "count(*)"; + + private static final Alias TABLE_VERSION = Alias.of(StrUtil.format("{}.tb_app_collect_table_version", DATABASE_NAME), "tactv"); + private static final String TABLE_VERSION_FLINK_JOB_ID = column(TABLE_VERSION, "flink_job_id"); + private static final String TABLE_VERSION_ALIAS = column(TABLE_VERSION, "alias"); + private static final String TABLE_VERSION_VERSION = column(TABLE_VERSION, "version"); + private static final String TABLE_VERSION_SCHEDULED = column(TABLE_VERSION, "scheduled"); + + private static final Alias TABLE_INFO = Alias.of(StrUtil.format("{}.tb_app_collect_table_info", DATABASE_NAME), "tacti"); + private static final String TABLE_INFO_FLINK_JOB_ID = column(TABLE_INFO, "flink_job_id"); + private static final String TABLE_INFO_ALIAS = column(TABLE_INFO, "alias"); + + private static final Alias TABLE_SYNC_STATE = Alias.of(StrUtil.format("{}.tb_app_hudi_sync_state", DATABASE_NAME), "tahss"); + private static final String TABLE_SYNC_STATE_ID = column(TABLE_SYNC_STATE, "id"); + + @Cacheable(value = "version-tables", sync = true) + @Retryable(Throwable.class) + public PageResponse findAllVersionTables( + Integer page, + Integer count, + String version, + Long flinkJobId, + String alias, + String order, + String direction, + ImmutableList filterSchedules + ) { + return mysqlTransactionTemplate.execute(status -> { + Function criteria = builder -> + builder.from(TABLE_INFO) + .join(TABLE_VERSION) + .onEq(TABLE_INFO_FLINK_JOB_ID, Column.as(TABLE_VERSION_FLINK_JOB_ID)) + .andEq(TABLE_INFO_ALIAS, Column.as(TABLE_VERSION_ALIAS)) + .join(TABLE_SYNC_STATE) + .on(StrUtil.format("{} = CONCAT({}, '-', {})", TABLE_SYNC_STATE_ID, TABLE_INFO_FLINK_JOB_ID, TABLE_INFO_ALIAS)) + .whereLike(ObjectUtil.isNotNull(flinkJobId), TABLE_INFO_FLINK_JOB_ID, flinkJobId) + .andLike(StrUtil.isNotBlank(alias), TABLE_INFO_ALIAS, alias) + .andEq(StrUtil.isNotBlank(version), TABLE_VERSION_VERSION, version) + .andIn(ObjectUtil.isNotEmpty(filterSchedules), TABLE_VERSION_SCHEDULED, filterSchedules) + .orderBy(StrUtil.isNotBlank(order) && StrUtil.isNotBlank(direction), () -> StrUtil.format("{} {}", order, direction)) + .limit(Math.max(count, 1) * Math.max(page - 1, 0), Math.max(count, 1)); + Long total = mysqlJdbcTemplate.queryForObject(criteria.apply(SqlBuilder.select(COUNT)).build(), Long.class); + List list = mysqlJdbcTemplate.query( + criteria.apply(SqlBuilder.select( + TABLE_INFO_FLINK_JOB_ID, + TABLE_INFO_ALIAS, + TABLE_VERSION_VERSION, + TABLE_VERSION_SCHEDULED + )).build(), + (rs, row) -> new VersionUpdated(rs.getLong(1), rs.getString(2), rs.getString(3), rs.getBoolean(4)) + ); + return new PageResponse<>(list, total); + }); + } } diff --git a/service-info-query/src/test/java/SqlBuilderTests.java b/service-info-query/src/test/java/SqlBuilderTests.java new file mode 100644 index 0000000..bf45359 --- /dev/null +++ b/service-info-query/src/test/java/SqlBuilderTests.java @@ -0,0 +1,36 @@ +import club.kingon.sql.builder.SqlBuilder; +import club.kingon.sql.builder.entry.Alias; +import club.kingon.sql.builder.entry.Column; +import cn.hutool.core.util.StrUtil; +import cn.hutool.db.sql.SqlFormatter; +import org.eclipse.collections.api.factory.Lists; + +import static com.eshore.odcp.hudi.connector.Constants.DATABASE_NAME; + +/** + * @author lanyuanxiaoyao + * @date 2023-06-07 + */ +public class SqlBuilderTests { + private static final Alias TABLE_VERSION = Alias.of(StrUtil.format("{}.tb_app_collect_table_version", DATABASE_NAME), "tactv"); + private static final Alias TABLE_INFO = Alias.of(StrUtil.format("{}.tb_app_collect_table_info", DATABASE_NAME), "tacti"); + + private static String column(Alias table, String column) { + return StrUtil.format("{}.{}", table.getAlias(), column); + } + + public static void main(String[] args) { + System.out.println(SqlFormatter.format( + SqlBuilder + .select("count(*)") + .from(TABLE_INFO) + .join(TABLE_VERSION) + .onEq(column(TABLE_INFO, "flink_job_id"), Column.as(column(TABLE_VERSION, "flink_job_id"))) + .andEq(column(TABLE_INFO, "alias"), Column.as(column(TABLE_VERSION, "alias"))) + .whereEq(false, "a", "b") + .andEq("b", "c") + .andIn("d", Lists.immutable.empty()) + .build() + )); + } +}