fix(info-query): 修复查询跨天表第二页查询错误的问题

This commit is contained in:
2023-06-09 14:32:28 +08:00
parent 4024c83c38
commit 6864cf9287
2 changed files with 101 additions and 24 deletions

View File

@@ -2,8 +2,10 @@ package com.lanyuanxiaoyao.service.info.service;
import club.kingon.sql.builder.SelectSqlBuilder;
import club.kingon.sql.builder.SqlBuilder;
import club.kingon.sql.builder.WhereSqlBuilder;
import club.kingon.sql.builder.entry.Alias;
import club.kingon.sql.builder.entry.Column;
import cn.hutool.core.lang.Pair;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.eshore.odcp.hudi.connector.entity.FlinkJob;
@@ -15,8 +17,9 @@ import com.lanyuanxiaoyao.service.configuration.entity.info.JobAndMetas;
import com.lanyuanxiaoyao.service.configuration.entity.info.JobIdAndAlias;
import com.lanyuanxiaoyao.service.configuration.entity.info.VersionUpdated;
import java.util.List;
import java.util.function.Function;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.list.ImmutableList;
import org.eclipse.collections.api.map.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cache.annotation.CacheConfig;
@@ -225,6 +228,40 @@ public class InfoService {
private static final Alias TABLE_SYNC_STATE = Alias.of(StrUtil.format("{}.tb_app_hudi_sync_state", DATABASE_NAME), "tahss");
private static final String TABLE_SYNC_STATE_ID = column(TABLE_SYNC_STATE, "id");
private SqlBuilder generateVersionTableCriteria(
SelectSqlBuilder builder,
Integer page,
Integer count,
String version,
Long flinkJobId,
String alias,
String order,
String direction,
ImmutableList<Boolean> filterSchedules,
boolean limited,
boolean groupBy
) {
int limit = Math.max(count, 1);
int offset = limit * Math.max(page - 1, 0);
WhereSqlBuilder root = builder.from(TABLE_INFO)
.join(TABLE_VERSION)
.onEq(TABLE_INFO_FLINK_JOB_ID, Column.as(TABLE_VERSION_FLINK_JOB_ID))
.andEq(TABLE_INFO_ALIAS, Column.as(TABLE_VERSION_ALIAS))
.join(TABLE_SYNC_STATE)
.on(StrUtil.format("{} = CONCAT({}, '-', {})", TABLE_SYNC_STATE_ID, TABLE_INFO_FLINK_JOB_ID, TABLE_INFO_ALIAS))
.whereLike(ObjectUtil.isNotNull(flinkJobId), TABLE_INFO_FLINK_JOB_ID, flinkJobId)
.andLike(StrUtil.isNotBlank(alias), TABLE_INFO_ALIAS, alias)
.andEq(StrUtil.isNotBlank(version), TABLE_VERSION_VERSION, version)
.andIn(ObjectUtil.isNotEmpty(filterSchedules), TABLE_VERSION_SCHEDULED, filterSchedules);
if (groupBy) {
return root.groupBy(TABLE_VERSION_SCHEDULED);
} else {
return root
.orderBy(StrUtil.isNotBlank(order) && StrUtil.isNotBlank(direction), () -> StrUtil.format("{} {}", order, direction))
.limit(limited, offset, count);
}
}
@Cacheable(value = "version-tables", sync = true)
@Retryable(Throwable.class)
public PageResponse<VersionUpdated> findAllVersionTables(
@@ -238,30 +275,67 @@ public class InfoService {
ImmutableList<Boolean> filterSchedules
) {
return mysqlTransactionTemplate.execute(status -> {
Function<SelectSqlBuilder, SqlBuilder> criteria = builder ->
builder.from(TABLE_INFO)
.join(TABLE_VERSION)
.onEq(TABLE_INFO_FLINK_JOB_ID, Column.as(TABLE_VERSION_FLINK_JOB_ID))
.andEq(TABLE_INFO_ALIAS, Column.as(TABLE_VERSION_ALIAS))
.join(TABLE_SYNC_STATE)
.on(StrUtil.format("{} = CONCAT({}, '-', {})", TABLE_SYNC_STATE_ID, TABLE_INFO_FLINK_JOB_ID, TABLE_INFO_ALIAS))
.whereLike(ObjectUtil.isNotNull(flinkJobId), TABLE_INFO_FLINK_JOB_ID, flinkJobId)
.andLike(StrUtil.isNotBlank(alias), TABLE_INFO_ALIAS, alias)
.andEq(StrUtil.isNotBlank(version), TABLE_VERSION_VERSION, version)
.andIn(ObjectUtil.isNotEmpty(filterSchedules), TABLE_VERSION_SCHEDULED, filterSchedules)
.orderBy(StrUtil.isNotBlank(order) && StrUtil.isNotBlank(direction), () -> StrUtil.format("{} {}", order, direction))
.limit(Math.max(count, 1) * Math.max(page - 1, 0), Math.max(count, 1));
Long total = mysqlJdbcTemplate.queryForObject(criteria.apply(SqlBuilder.select(COUNT)).build(), Long.class);
Long total = mysqlJdbcTemplate.queryForObject(
generateVersionTableCriteria(
SqlBuilder.select(COUNT),
page,
count,
version,
flinkJobId,
alias,
order,
direction,
filterSchedules,
false,
false
).build(),
Long.class
);
List<Pair<Boolean, Integer>> groupMap = mysqlJdbcTemplate.query(
generateVersionTableCriteria(
SqlBuilder.select(TABLE_VERSION_SCHEDULED, COUNT),
page,
count,
version,
flinkJobId,
alias,
order,
direction,
filterSchedules,
false,
true
).build(),
(rs, row) -> new Pair<>(rs.getBoolean(1), rs.getInt(2))
);
ImmutableMap<Boolean, Integer> scheduleCount = Lists.immutable.ofAll(groupMap)
.groupBy(Pair::getKey)
.toMap()
.collectValues((key, list) -> list.getOnly().getValue())
.toImmutable();
List<VersionUpdated> list = mysqlJdbcTemplate.query(
criteria.apply(SqlBuilder.select(
TABLE_INFO_FLINK_JOB_ID,
TABLE_INFO_ALIAS,
TABLE_VERSION_VERSION,
TABLE_VERSION_SCHEDULED
)).build(),
generateVersionTableCriteria(
SqlBuilder.select(
TABLE_INFO_FLINK_JOB_ID,
TABLE_INFO_ALIAS,
TABLE_VERSION_VERSION,
TABLE_VERSION_SCHEDULED
),
page,
count,
version,
flinkJobId,
alias,
order,
direction,
filterSchedules,
true,
false
).build(),
(rs, row) -> new VersionUpdated(rs.getLong(1), rs.getString(2), rs.getString(3), rs.getBoolean(4))
);
return new PageResponse<>(list, total);
return new PageResponse<>(list, total)
.withMetadata("scheduled", scheduleCount.getOrDefault(true, 0))
.withMetadata("unScheduled", scheduleCount.getOrDefault(false, 0));
});
}
}