diff --git a/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/info/TableInfoSearchCache.java b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/info/TableInfoSearchCache.java new file mode 100644 index 0000000..29ebb89 --- /dev/null +++ b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/info/TableInfoSearchCache.java @@ -0,0 +1,55 @@ +package com.lanyuanxiaoyao.service.configuration.entity.info; + +/** + * Table Info 表查询条件缓存 + * + * @author lanyuanxiaoyao + * @date 2023-07-14 + */ +public class TableInfoSearchCache { + private Long flinkJobId; + private String alias; + private String hdfs; + + public TableInfoSearchCache() { + } + + public TableInfoSearchCache(Long flinkJobId, String alias, String hdfs) { + this.flinkJobId = flinkJobId; + this.alias = alias; + this.hdfs = hdfs; + } + + public Long getFlinkJobId() { + return flinkJobId; + } + + public void setFlinkJobId(Long flinkJobId) { + this.flinkJobId = flinkJobId; + } + + public String getAlias() { + return alias; + } + + public void setAlias(String alias) { + this.alias = alias; + } + + public String getHdfs() { + return hdfs; + } + + public void setHdfs(String hdfs) { + this.hdfs = hdfs; + } + + @Override + public String toString() { + return "TableInfoSearchCache{" + + "flinkJobId=" + flinkJobId + + ", alias='" + alias + '\'' + + ", hdfs='" + hdfs + '\'' + + '}'; + } +} diff --git a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/InfoService.java b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/InfoService.java index fee5464..c3b198e 100644 --- a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/InfoService.java +++ b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/InfoService.java @@ -109,9 +109,24 @@ public interface InfoService { @Get("/info/all_flink_job_id") ImmutableList allFlinkJobId(); + @Get("/info/all_flink_job_id") + ImmutableList allFlinkJobId(@Query("key") String key); + + @Get("/info/all_flink_job_id") + ImmutableList allFlinkJobId(@Query("key") String key, @Query("alias") String alias); + @Get("/info/all_alias") ImmutableList allAlias(); + @Get("/info/all_alias") + ImmutableList allAlias(@Query("key") String key); + + @Get("/info/all_alias") + ImmutableList allAlias(@Query("key") String key, @Query("flink_job_id") String flinkJobId); + @Get("/info/all_hdfs") ImmutableList allHdfs(); + + @Get("/info/all_hdfs") + ImmutableList allHdfs(@Query("key") String key); } diff --git a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/InfoController.java b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/InfoController.java index 588acc5..1cc3441 100644 --- a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/InfoController.java +++ b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/InfoController.java @@ -1,14 +1,12 @@ package com.lanyuanxiaoyao.service.info.controller; import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.StrUtil; import com.eshore.odcp.hudi.connector.entity.FlinkJob; import com.eshore.odcp.hudi.connector.entity.SyncState; import com.eshore.odcp.hudi.connector.entity.TableMeta; import com.lanyuanxiaoyao.service.configuration.entity.PageResponse; -import com.lanyuanxiaoyao.service.configuration.entity.info.CompactionMetrics; -import com.lanyuanxiaoyao.service.configuration.entity.info.JobAndMetas; -import com.lanyuanxiaoyao.service.configuration.entity.info.JobIdAndAlias; -import com.lanyuanxiaoyao.service.configuration.entity.info.VersionUpdated; +import com.lanyuanxiaoyao.service.configuration.entity.info.*; import com.lanyuanxiaoyao.service.info.service.InfoService; import java.util.List; import org.eclipse.collections.api.factory.Lists; @@ -229,17 +227,34 @@ public class InfoController { } @GetMapping("/all_flink_job_id") - public ImmutableList allFlinkJobId() { - return infoService.allFlinkJobId(); + public ImmutableList allFlinkJobId( + @RequestParam(value = "key", required = false) String key, + @RequestParam(value = "alias", required = false) String alias + ) { + return infoService.allTableInfoSearchCache() + .select(cache -> StrUtil.isBlank(key) || StrUtil.contains(cache.getFlinkJobId().toString(), key)) + .select(cache -> StrUtil.isBlank(alias) || StrUtil.contains(cache.getAlias(), alias)) + .collect(TableInfoSearchCache::getFlinkJobId) + .distinct(); } @GetMapping("/all_alias") - public ImmutableList allAlias() { - return infoService.allAlias(); + public ImmutableList allAlias( + @RequestParam(value = "key", required = false) String key, + @RequestParam(value = "flink_job_id", required = false) String flinkJobId + ) { + return infoService.allTableInfoSearchCache() + .select(cache -> StrUtil.isBlank(key) || StrUtil.contains(cache.getAlias(), key)) + .select(cache -> StrUtil.isBlank(flinkJobId) || StrUtil.contains(cache.getFlinkJobId().toString(), flinkJobId)) + .collect(TableInfoSearchCache::getAlias) + .distinct(); } @GetMapping("/all_hdfs") - public ImmutableList allHdfs() { - return infoService.allHdfs(); + public ImmutableList allHdfs(@RequestParam(value = "key", required = false) String key) { + return infoService.allTableInfoSearchCache() + .select(cache -> StrUtil.isBlank(key) || StrUtil.contains(cache.getAlias(), key)) + .collect(TableInfoSearchCache::getHdfs) + .distinct(); } } diff --git a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/InfoService.java b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/InfoService.java index 90ad55c..cfefa24 100644 --- a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/InfoService.java +++ b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/InfoService.java @@ -8,16 +8,16 @@ import club.kingon.sql.builder.entry.Column; import cn.hutool.core.lang.Pair; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; +import com.eshore.odcp.hudi.connector.SQLConstants; import com.eshore.odcp.hudi.connector.entity.FlinkJob; import com.eshore.odcp.hudi.connector.entity.SyncState; import com.eshore.odcp.hudi.connector.entity.TableMeta; import com.eshore.odcp.hudi.connector.utils.database.DatabaseService; import com.lanyuanxiaoyao.service.configuration.entity.PageResponse; -import com.lanyuanxiaoyao.service.configuration.entity.info.CompactionMetrics; -import com.lanyuanxiaoyao.service.configuration.entity.info.JobAndMetas; -import com.lanyuanxiaoyao.service.configuration.entity.info.JobIdAndAlias; -import com.lanyuanxiaoyao.service.configuration.entity.info.VersionUpdated; +import com.lanyuanxiaoyao.service.configuration.entity.info.*; import com.lanyuanxiaoyao.service.info.configuration.SQLLoggerProvider; +import java.sql.ResultSet; +import java.sql.SQLException; import java.sql.Timestamp; import java.util.List; import org.eclipse.collections.api.factory.Lists; @@ -28,11 +28,15 @@ import org.slf4j.LoggerFactory; import org.springframework.cache.annotation.CacheConfig; import org.springframework.cache.annotation.Cacheable; import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.core.RowMapper; import org.springframework.retry.annotation.Retryable; import org.springframework.stereotype.Service; import org.springframework.transaction.support.TransactionTemplate; import static com.eshore.odcp.hudi.connector.Constants.DATABASE_NAME; +import static com.eshore.odcp.hudi.connector.SQLConstants.HudiCollectBuild.*; +import static com.eshore.odcp.hudi.connector.SQLConstants.HudiCollectBuild.TbAppCollectTableInfo; +import static com.eshore.odcp.hudi.connector.SQLConstants.HudiCollectBuild.TbAppFlinkJobConfig; /** * @author lanyuanxiaoyao @@ -622,18 +626,18 @@ public class InfoService { int offset = limit * Math.max(page - 1, 0); Alias m1 = Alias.of( SqlBuilder.selectAll() - .from(TABLE_COMPACTION_METRICS) - .whereEq(TABLE_COMPACTION_METRICS_TYPE, "pre") - .andEq(TABLE_COMPACTION_METRICS_FLINK_JOB_ID, flinkJobId) - .andEq(TABLE_COMPACTION_METRICS_ALIAS, alias), + .from(TbAppHudiCompactionMetrics._alias_) + .whereEq(TbAppHudiCompactionMetrics.TYPE_A, "pre") + .andEq(TbAppHudiCompactionMetrics.FLINK_JOB_ID_A, flinkJobId) + .andEq(TbAppHudiCompactionMetrics.ALIAS_A, alias), "m1" ); Alias m2 = Alias.of( SqlBuilder.selectAll() - .from(TABLE_COMPACTION_METRICS) - .whereEq(TABLE_COMPACTION_METRICS_TYPE, "complete") - .andEq(TABLE_COMPACTION_METRICS_FLINK_JOB_ID, flinkJobId) - .andEq(TABLE_COMPACTION_METRICS_ALIAS, alias), + .from(TbAppHudiCompactionMetrics._alias_) + .whereEq(TbAppHudiCompactionMetrics.TYPE_A, "complete") + .andEq(TbAppHudiCompactionMetrics.FLINK_JOB_ID_A, flinkJobId) + .andEq(TbAppHudiCompactionMetrics.ALIAS_A, alias), "m2" ); return builder @@ -762,54 +766,28 @@ public class InfoService { public Boolean existsTable(Long flinkJobId, String alias) { return mysqlJdbcTemplate.queryForObject( SqlBuilder.select("count(*) > 0") - .from(TABLE_INFO, TABLE_FLINK_JOB) - .whereEq(TABLE_INFO_FLINK_JOB_ID, Column.as(TABLE_FLINK_JOB_ID)) - .andEq(TABLE_FLINK_JOB_ID, flinkJobId) - .andEq(TABLE_INFO_ALIAS, alias) - .andEq(TABLE_FLINK_JOB_STATUS, "y") - .andEq(TABLE_INFO_STATUS, "y") + .from(TbAppCollectTableInfo._alias_, TbAppFlinkJobConfig._alias_) + .whereEq(TbAppCollectTableInfo.FLINK_JOB_ID_A, Column.as(TbAppFlinkJobConfig.ID_A)) + .andEq(TbAppFlinkJobConfig.ID_A, flinkJobId) + .andEq(TbAppCollectTableInfo.ALIAS_A, alias) + .andEq(TbAppFlinkJobConfig.STATUS_A, "y") + .andEq(TbAppCollectTableInfo.STATUS_A, "y") .build(), Boolean.class ); } - @Cacheable(value = "all-flink-job-id", sync = true, cacheManager = "long-cache") + @Cacheable(value = "all-table-info-search-cache", sync = true, cacheManager = "long-cache") @Retryable(Throwable.class) - public ImmutableList allFlinkJobId() { - return Lists.immutable.ofAll(mysqlJdbcTemplate.queryForList( - SqlBuilder.select(StrUtil.format("distinct {}", TABLE_FLINK_JOB_ID)) - .from(TABLE_FLINK_JOB) - .whereEq(TABLE_FLINK_JOB_STATUS, "y") + public ImmutableList allTableInfoSearchCache() { + return Lists.immutable.ofAll(mysqlJdbcTemplate.query( + SqlBuilder.select(TbAppCollectTableInfo.FLINK_JOB_ID_A, TbAppCollectTableInfo.ALIAS_A, TbAppCollectTableInfo.TGT_HDFS_PATH_A) + .from(TbAppCollectTableInfo._alias_, TbAppFlinkJobConfig._alias_) + .whereEq(TbAppCollectTableInfo.FLINK_JOB_ID_A, Column.as(TbAppFlinkJobConfig.ID_A)) + .andEq(TbAppFlinkJobConfig.STATUS_A, "y") + .andEq(TbAppCollectTableInfo.STATUS_A, "y") .build(), - Long.class - )); - } - - @Cacheable(value = "all-alias", sync = true, cacheManager = "long-cache") - @Retryable(Throwable.class) - public ImmutableList allAlias() { - return Lists.immutable.ofAll(mysqlJdbcTemplate.queryForList( - SqlBuilder.select(StrUtil.format("distinct {}", TABLE_INFO_ALIAS)) - .from(TABLE_INFO, TABLE_FLINK_JOB) - .whereEq(TABLE_INFO_FLINK_JOB_ID, Column.as(TABLE_FLINK_JOB_ID)) - .andEq(TABLE_FLINK_JOB_STATUS, "y") - .andEq(TABLE_INFO_STATUS, "y") - .build(), - String.class - )); - } - - @Cacheable(value = "all-hdfs", sync = true, cacheManager = "long-cache") - @Retryable(Throwable.class) - public ImmutableList allHdfs() { - return Lists.immutable.ofAll(mysqlJdbcTemplate.queryForList( - SqlBuilder.select(StrUtil.format("distinct {}", TABLE_INFO_TARGET_HDFS)) - .from(TABLE_INFO, TABLE_FLINK_JOB) - .whereEq(TABLE_INFO_FLINK_JOB_ID, Column.as(TABLE_FLINK_JOB_ID)) - .andEq(TABLE_FLINK_JOB_STATUS, "y") - .andEq(TABLE_INFO_STATUS, "y") - .build(), - String.class + (rs, row) -> new TableInfoSearchCache(rs.getLong(1), rs.getString(2), rs.getString(3)) )); } } diff --git a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/TableController.java b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/TableController.java index 707f4ef..fe05512 100644 --- a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/TableController.java +++ b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/TableController.java @@ -60,20 +60,6 @@ public class TableController extends BaseController { this.infoService = infoService; this.zookeeperService = zookeeperService; this.mapper = jackson2ObjectMapperBuilder.build(); - this.cache = Caffeine.newBuilder() - .expireAfterAccess(Duration.ofMinutes(5)) - .build(key -> { - switch (key) { - case "flink_job_id": - return this.infoService.allFlinkJobId().collect(Objects::toString); - case "alias": - return this.infoService.allAlias(); - case "hdfs": - return this.infoService.allHdfs(); - default: - return Lists.immutable.empty(); - } - }); } @GetMapping("list") @@ -209,24 +195,36 @@ public class TableController extends BaseController { return AmisResponse.responseCrudData(vos, pageResponse.getTotal()); } - private final LoadingCache> cache; - @SuppressWarnings("DataFlowIssue") @GetMapping("all_flink_job_id") - public AmisCrudResponse allFlinkJobId(@RequestParam(value = "key", required = false) String key) { + public AmisCrudResponse allFlinkJobId( + @RequestParam(value = "key", required = false) String key, + @RequestParam(value = "alias", required = false) String alias + ) { if (StrUtil.isBlank(key)) { return AmisResponse.responseCrudData(Lists.immutable.empty()); } - return AmisResponse.responseCrudData(cache.get("flink_job_id").select(id -> StrUtil.contains(id, key))); + if (StrUtil.isBlank(alias)) { + return AmisResponse.responseCrudData(infoService.allFlinkJobId(key).collect(Objects::toString)); + } else { + return AmisResponse.responseCrudData(infoService.allFlinkJobId(key, alias).collect(Objects::toString)); + } } @SuppressWarnings("DataFlowIssue") @GetMapping("all_alias") - public AmisCrudResponse allAlias(@RequestParam(value = "key", required = false) String key) { - if (StrUtil.isBlank(key)) { + public AmisCrudResponse allAlias( + @RequestParam(value = "key", required = false) String key, + @RequestParam(value = "flink_job_id", required = false) String flinkJobId + ) { + if (StrUtil.isBlank(key) && StrUtil.isBlank(flinkJobId)) { return AmisResponse.responseCrudData(Lists.immutable.empty()); } - return AmisResponse.responseCrudData(cache.get("alias").select(id -> StrUtil.contains(id, key))); + if (StrUtil.isBlank(flinkJobId)) { + return AmisResponse.responseCrudData(infoService.allAlias(key)); + } else { + return AmisResponse.responseCrudData(infoService.allAlias(key, flinkJobId)); + } } @SuppressWarnings("DataFlowIssue") @@ -235,6 +233,6 @@ public class TableController extends BaseController { if (StrUtil.isBlank(key)) { return AmisResponse.responseCrudData(Lists.immutable.empty()); } - return AmisResponse.responseCrudData(cache.get("hdfs").select(id -> StrUtil.contains(id, key))); + return AmisResponse.responseCrudData(infoService.allHdfs(key)); } } diff --git a/web/components/common.js b/web/components/common.js index 8c53d75..6bfa2d4 100644 --- a/web/components/common.js +++ b/web/components/common.js @@ -1934,7 +1934,13 @@ function flinkJobIdTextInput(require = false) { placeholder: '通过 ID 搜索', clearable: true, required: require, - autoComplete: '${base}/table/all_flink_job_id?key=$term', + autoComplete: { + method: 'get', + url: '${base}/table/all_flink_job_id?key=$term', + data: { + alias: '${alias|default:undefined}' + } + }, } } @@ -1946,6 +1952,13 @@ function aliasTextInput(require = false) { placeholder: '通过别名搜索', clearable: true, required: require, - autoComplete: '${base}/table/all_alias?key=$term', + creatable: false, + autoComplete: { + method: 'get', + url: '${base}/table/all_alias?key=$term', + data: { + flink_job_id: '${flinkJobId|default:undefined}' + } + }, } }