feature(info-query): 增加跨天版本表查询接口
This commit is contained in:
@@ -7,6 +7,7 @@ import com.eshore.odcp.hudi.connector.entity.TableMeta;
|
||||
import com.lanyuanxiaoyao.service.configuration.entity.PageResponse;
|
||||
import com.lanyuanxiaoyao.service.configuration.entity.info.JobAndMetas;
|
||||
import com.lanyuanxiaoyao.service.configuration.entity.info.JobIdAndAlias;
|
||||
import com.lanyuanxiaoyao.service.configuration.entity.info.VersionUpdated;
|
||||
import com.lanyuanxiaoyao.service.info.service.InfoService;
|
||||
import java.util.List;
|
||||
import org.eclipse.collections.api.factory.Lists;
|
||||
@@ -58,6 +59,29 @@ public class InfoController {
|
||||
);
|
||||
}
|
||||
|
||||
@GetMapping("/version_tables")
|
||||
public PageResponse<VersionUpdated> versionTables(
|
||||
@RequestParam(value = "page", defaultValue = "1") Integer page,
|
||||
@RequestParam(value = "count", defaultValue = "10") Integer count,
|
||||
@RequestParam(value = "version", required = false) String version,
|
||||
@RequestParam(value = "flink_job_id", required = false) Long flinkJobId,
|
||||
@RequestParam(value = "alias", required = false) String alias,
|
||||
@RequestParam(value = "order", required = false) String order,
|
||||
@RequestParam(value = "direction", required = false) String direction,
|
||||
@RequestParam(value = "filter_schedules", required = false) List<Boolean> filterSchedules
|
||||
) {
|
||||
return infoService.findAllVersionTables(
|
||||
page,
|
||||
count,
|
||||
version,
|
||||
flinkJobId,
|
||||
alias,
|
||||
order,
|
||||
direction,
|
||||
Lists.immutable.ofAll(filterSchedules)
|
||||
);
|
||||
}
|
||||
|
||||
@GetMapping("/job_metas")
|
||||
public ImmutableList<JobAndMetas> jobAndMetas() {
|
||||
return infoService.jobAndMetas();
|
||||
|
||||
@@ -1,5 +1,9 @@
|
||||
package com.lanyuanxiaoyao.service.info.service;
|
||||
|
||||
import club.kingon.sql.builder.SelectSqlBuilder;
|
||||
import club.kingon.sql.builder.SqlBuilder;
|
||||
import club.kingon.sql.builder.entry.Alias;
|
||||
import club.kingon.sql.builder.entry.Column;
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.eshore.odcp.hudi.connector.entity.FlinkJob;
|
||||
@@ -9,7 +13,9 @@ import com.eshore.odcp.hudi.connector.utils.database.DatabaseService;
|
||||
import com.lanyuanxiaoyao.service.configuration.entity.PageResponse;
|
||||
import com.lanyuanxiaoyao.service.configuration.entity.info.JobAndMetas;
|
||||
import com.lanyuanxiaoyao.service.configuration.entity.info.JobIdAndAlias;
|
||||
import com.lanyuanxiaoyao.service.configuration.entity.info.VersionUpdated;
|
||||
import java.util.List;
|
||||
import java.util.function.Function;
|
||||
import org.eclipse.collections.api.list.ImmutableList;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@@ -26,6 +32,7 @@ import static com.eshore.odcp.hudi.connector.Constants.DATABASE_NAME;
|
||||
* @author lanyuanxiaoyao
|
||||
* @date 2023-04-24
|
||||
*/
|
||||
@SuppressWarnings("OptionalGetWithoutIsPresent")
|
||||
@CacheConfig(cacheManager = "normal-cache")
|
||||
@Service
|
||||
public class InfoService {
|
||||
@@ -198,4 +205,63 @@ public class InfoService {
|
||||
public ImmutableList<String> updatedVersionTables() {
|
||||
return databaseService.findAllScheduledTable();
|
||||
}
|
||||
|
||||
private static String column(Alias table, String column) {
|
||||
return StrUtil.format("{}.{}", table.getAlias(), column);
|
||||
}
|
||||
|
||||
private static final String COUNT = "count(*)";
|
||||
|
||||
private static final Alias TABLE_VERSION = Alias.of(StrUtil.format("{}.tb_app_collect_table_version", DATABASE_NAME), "tactv");
|
||||
private static final String TABLE_VERSION_FLINK_JOB_ID = column(TABLE_VERSION, "flink_job_id");
|
||||
private static final String TABLE_VERSION_ALIAS = column(TABLE_VERSION, "alias");
|
||||
private static final String TABLE_VERSION_VERSION = column(TABLE_VERSION, "version");
|
||||
private static final String TABLE_VERSION_SCHEDULED = column(TABLE_VERSION, "scheduled");
|
||||
|
||||
private static final Alias TABLE_INFO = Alias.of(StrUtil.format("{}.tb_app_collect_table_info", DATABASE_NAME), "tacti");
|
||||
private static final String TABLE_INFO_FLINK_JOB_ID = column(TABLE_INFO, "flink_job_id");
|
||||
private static final String TABLE_INFO_ALIAS = column(TABLE_INFO, "alias");
|
||||
|
||||
private static final Alias TABLE_SYNC_STATE = Alias.of(StrUtil.format("{}.tb_app_hudi_sync_state", DATABASE_NAME), "tahss");
|
||||
private static final String TABLE_SYNC_STATE_ID = column(TABLE_SYNC_STATE, "id");
|
||||
|
||||
@Cacheable(value = "version-tables", sync = true)
|
||||
@Retryable(Throwable.class)
|
||||
public PageResponse<VersionUpdated> findAllVersionTables(
|
||||
Integer page,
|
||||
Integer count,
|
||||
String version,
|
||||
Long flinkJobId,
|
||||
String alias,
|
||||
String order,
|
||||
String direction,
|
||||
ImmutableList<Boolean> filterSchedules
|
||||
) {
|
||||
return mysqlTransactionTemplate.execute(status -> {
|
||||
Function<SelectSqlBuilder, SqlBuilder> criteria = builder ->
|
||||
builder.from(TABLE_INFO)
|
||||
.join(TABLE_VERSION)
|
||||
.onEq(TABLE_INFO_FLINK_JOB_ID, Column.as(TABLE_VERSION_FLINK_JOB_ID))
|
||||
.andEq(TABLE_INFO_ALIAS, Column.as(TABLE_VERSION_ALIAS))
|
||||
.join(TABLE_SYNC_STATE)
|
||||
.on(StrUtil.format("{} = CONCAT({}, '-', {})", TABLE_SYNC_STATE_ID, TABLE_INFO_FLINK_JOB_ID, TABLE_INFO_ALIAS))
|
||||
.whereLike(ObjectUtil.isNotNull(flinkJobId), TABLE_INFO_FLINK_JOB_ID, flinkJobId)
|
||||
.andLike(StrUtil.isNotBlank(alias), TABLE_INFO_ALIAS, alias)
|
||||
.andEq(StrUtil.isNotBlank(version), TABLE_VERSION_VERSION, version)
|
||||
.andIn(ObjectUtil.isNotEmpty(filterSchedules), TABLE_VERSION_SCHEDULED, filterSchedules)
|
||||
.orderBy(StrUtil.isNotBlank(order) && StrUtil.isNotBlank(direction), () -> StrUtil.format("{} {}", order, direction))
|
||||
.limit(Math.max(count, 1) * Math.max(page - 1, 0), Math.max(count, 1));
|
||||
Long total = mysqlJdbcTemplate.queryForObject(criteria.apply(SqlBuilder.select(COUNT)).build(), Long.class);
|
||||
List<VersionUpdated> list = mysqlJdbcTemplate.query(
|
||||
criteria.apply(SqlBuilder.select(
|
||||
TABLE_INFO_FLINK_JOB_ID,
|
||||
TABLE_INFO_ALIAS,
|
||||
TABLE_VERSION_VERSION,
|
||||
TABLE_VERSION_SCHEDULED
|
||||
)).build(),
|
||||
(rs, row) -> new VersionUpdated(rs.getLong(1), rs.getString(2), rs.getString(3), rs.getBoolean(4))
|
||||
);
|
||||
return new PageResponse<>(list, total);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user