From 6864cf92870af8ca2dcb3862bb5a42474ce7a057 Mon Sep 17 00:00:00 2001 From: lanyuanxiaoyao Date: Fri, 9 Jun 2023 14:32:28 +0800 Subject: [PATCH] =?UTF-8?q?fix(info-query):=20=E4=BF=AE=E5=A4=8D=E6=9F=A5?= =?UTF-8?q?=E8=AF=A2=E8=B7=A8=E5=A4=A9=E8=A1=A8=E7=AC=AC=E4=BA=8C=E9=A1=B5?= =?UTF-8?q?=E6=9F=A5=E8=AF=A2=E9=94=99=E8=AF=AF=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/info/service/InfoService.java | 118 ++++++++++++++---- .../web/controller/VersionController.java | 7 +- 2 files changed, 101 insertions(+), 24 deletions(-) diff --git a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/InfoService.java b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/InfoService.java index 2411065..8cb8d81 100644 --- a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/InfoService.java +++ b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/InfoService.java @@ -2,8 +2,10 @@ package com.lanyuanxiaoyao.service.info.service; import club.kingon.sql.builder.SelectSqlBuilder; import club.kingon.sql.builder.SqlBuilder; +import club.kingon.sql.builder.WhereSqlBuilder; import club.kingon.sql.builder.entry.Alias; import club.kingon.sql.builder.entry.Column; +import cn.hutool.core.lang.Pair; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import com.eshore.odcp.hudi.connector.entity.FlinkJob; @@ -15,8 +17,9 @@ import com.lanyuanxiaoyao.service.configuration.entity.info.JobAndMetas; import com.lanyuanxiaoyao.service.configuration.entity.info.JobIdAndAlias; import com.lanyuanxiaoyao.service.configuration.entity.info.VersionUpdated; import java.util.List; -import java.util.function.Function; +import org.eclipse.collections.api.factory.Lists; import org.eclipse.collections.api.list.ImmutableList; +import org.eclipse.collections.api.map.ImmutableMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.cache.annotation.CacheConfig; @@ -225,6 +228,40 @@ public class InfoService { private static final Alias TABLE_SYNC_STATE = Alias.of(StrUtil.format("{}.tb_app_hudi_sync_state", DATABASE_NAME), "tahss"); private static final String TABLE_SYNC_STATE_ID = column(TABLE_SYNC_STATE, "id"); + private SqlBuilder generateVersionTableCriteria( + SelectSqlBuilder builder, + Integer page, + Integer count, + String version, + Long flinkJobId, + String alias, + String order, + String direction, + ImmutableList filterSchedules, + boolean limited, + boolean groupBy + ) { + int limit = Math.max(count, 1); + int offset = limit * Math.max(page - 1, 0); + WhereSqlBuilder root = builder.from(TABLE_INFO) + .join(TABLE_VERSION) + .onEq(TABLE_INFO_FLINK_JOB_ID, Column.as(TABLE_VERSION_FLINK_JOB_ID)) + .andEq(TABLE_INFO_ALIAS, Column.as(TABLE_VERSION_ALIAS)) + .join(TABLE_SYNC_STATE) + .on(StrUtil.format("{} = CONCAT({}, '-', {})", TABLE_SYNC_STATE_ID, TABLE_INFO_FLINK_JOB_ID, TABLE_INFO_ALIAS)) + .whereLike(ObjectUtil.isNotNull(flinkJobId), TABLE_INFO_FLINK_JOB_ID, flinkJobId) + .andLike(StrUtil.isNotBlank(alias), TABLE_INFO_ALIAS, alias) + .andEq(StrUtil.isNotBlank(version), TABLE_VERSION_VERSION, version) + .andIn(ObjectUtil.isNotEmpty(filterSchedules), TABLE_VERSION_SCHEDULED, filterSchedules); + if (groupBy) { + return root.groupBy(TABLE_VERSION_SCHEDULED); + } else { + return root + .orderBy(StrUtil.isNotBlank(order) && StrUtil.isNotBlank(direction), () -> StrUtil.format("{} {}", order, direction)) + .limit(limited, offset, count); + } + } + @Cacheable(value = "version-tables", sync = true) @Retryable(Throwable.class) public PageResponse findAllVersionTables( @@ -238,30 +275,67 @@ public class InfoService { ImmutableList filterSchedules ) { return mysqlTransactionTemplate.execute(status -> { - Function criteria = builder -> - builder.from(TABLE_INFO) - .join(TABLE_VERSION) - .onEq(TABLE_INFO_FLINK_JOB_ID, Column.as(TABLE_VERSION_FLINK_JOB_ID)) - .andEq(TABLE_INFO_ALIAS, Column.as(TABLE_VERSION_ALIAS)) - .join(TABLE_SYNC_STATE) - .on(StrUtil.format("{} = CONCAT({}, '-', {})", TABLE_SYNC_STATE_ID, TABLE_INFO_FLINK_JOB_ID, TABLE_INFO_ALIAS)) - .whereLike(ObjectUtil.isNotNull(flinkJobId), TABLE_INFO_FLINK_JOB_ID, flinkJobId) - .andLike(StrUtil.isNotBlank(alias), TABLE_INFO_ALIAS, alias) - .andEq(StrUtil.isNotBlank(version), TABLE_VERSION_VERSION, version) - .andIn(ObjectUtil.isNotEmpty(filterSchedules), TABLE_VERSION_SCHEDULED, filterSchedules) - .orderBy(StrUtil.isNotBlank(order) && StrUtil.isNotBlank(direction), () -> StrUtil.format("{} {}", order, direction)) - .limit(Math.max(count, 1) * Math.max(page - 1, 0), Math.max(count, 1)); - Long total = mysqlJdbcTemplate.queryForObject(criteria.apply(SqlBuilder.select(COUNT)).build(), Long.class); + Long total = mysqlJdbcTemplate.queryForObject( + generateVersionTableCriteria( + SqlBuilder.select(COUNT), + page, + count, + version, + flinkJobId, + alias, + order, + direction, + filterSchedules, + false, + false + ).build(), + Long.class + ); + List> groupMap = mysqlJdbcTemplate.query( + generateVersionTableCriteria( + SqlBuilder.select(TABLE_VERSION_SCHEDULED, COUNT), + page, + count, + version, + flinkJobId, + alias, + order, + direction, + filterSchedules, + false, + true + ).build(), + (rs, row) -> new Pair<>(rs.getBoolean(1), rs.getInt(2)) + ); + ImmutableMap scheduleCount = Lists.immutable.ofAll(groupMap) + .groupBy(Pair::getKey) + .toMap() + .collectValues((key, list) -> list.getOnly().getValue()) + .toImmutable(); List list = mysqlJdbcTemplate.query( - criteria.apply(SqlBuilder.select( - TABLE_INFO_FLINK_JOB_ID, - TABLE_INFO_ALIAS, - TABLE_VERSION_VERSION, - TABLE_VERSION_SCHEDULED - )).build(), + generateVersionTableCriteria( + SqlBuilder.select( + TABLE_INFO_FLINK_JOB_ID, + TABLE_INFO_ALIAS, + TABLE_VERSION_VERSION, + TABLE_VERSION_SCHEDULED + ), + page, + count, + version, + flinkJobId, + alias, + order, + direction, + filterSchedules, + true, + false + ).build(), (rs, row) -> new VersionUpdated(rs.getLong(1), rs.getString(2), rs.getString(3), rs.getBoolean(4)) ); - return new PageResponse<>(list, total); + return new PageResponse<>(list, total) + .withMetadata("scheduled", scheduleCount.getOrDefault(true, 0)) + .withMetadata("unScheduled", scheduleCount.getOrDefault(false, 0)); }); } } diff --git a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/VersionController.java b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/VersionController.java index c0fdee9..0c5f447 100644 --- a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/VersionController.java +++ b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/VersionController.java @@ -60,8 +60,9 @@ public class VersionController extends BaseController { queryMap.put("filter_schedules", filterSchedules); } PageResponse pageResponse = infoService.versionTables(queryMap); + ImmutableList source = Lists.immutable.ofAll(pageResponse.getData()); Long total = pageResponse.getTotal(); - ImmutableList vos = Lists.immutable.ofAll(pageResponse.getData()) + ImmutableList vos = source .asParallel(ExecutorProvider.EXECUTORS, 1) .collect(item -> { CompletableFuture flinkJobFuture = CompletableFuture.supplyAsync(() -> infoService.flinkJobDetail(item.getFlinkJobId()), ExecutorProvider.EXECUTORS); @@ -84,6 +85,8 @@ public class VersionController extends BaseController { .reject(ObjectUtil::isNull) .toList() .toImmutable(); - return responseCrudData(vos, total); + return responseCrudData(vos, total) + .withData("scheduled", pageResponse.getMetadata().get("scheduled")) + .withData("unScheduled", pageResponse.getMetadata().get("unScheduled")); } }