fix(info-query): 修复取flink id和alias的方法没有完整校验表是否合法

This commit is contained in:
2024-01-24 14:39:42 +08:00
parent a5281b3d75
commit 347eb1ad86
4 changed files with 42 additions and 44 deletions

View File

@@ -6,6 +6,7 @@ import com.lanyuanxiaoyao.service.configuration.entity.info.JobAndMetas;
import com.lanyuanxiaoyao.service.configuration.entity.info.JobIdAndAlias;
import com.lanyuanxiaoyao.service.configuration.entity.info.TableInfoSearchCache;
import com.lanyuanxiaoyao.service.info.service.InfoService;
import com.lanyuanxiaoyao.service.info.service.TableMetaService;
import java.util.List;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.list.ImmutableList;
@@ -28,9 +29,11 @@ public class InfoController {
private static final Logger logger = LoggerFactory.getLogger(InfoController.class);
private final InfoService infoService;
private final TableMetaService tableMetaService;
public InfoController(InfoService infoService) {
public InfoController(InfoService infoService, TableMetaService tableMetaService) {
this.infoService = infoService;
this.tableMetaService = tableMetaService;
}
@GetMapping("/job_id_alias")
@@ -65,8 +68,9 @@ public class InfoController {
@GetMapping("/all_flink_job_id_and_alias")
public ImmutableList<JobIdAndAlias> allFlinkJobIdAndAlias() {
return infoService.allTableInfoSearchCache()
.collect(cache -> new JobIdAndAlias(cache.getFlinkJobId(), cache.getAlias()));
return tableMetaService.allTableInfoSearchCache()
.collect(cache -> new JobIdAndAlias(cache.getFlinkJobId(), cache.getAlias()))
.distinct();
}
@GetMapping("/all_flink_job_id")
@@ -74,7 +78,7 @@ public class InfoController {
@RequestParam(value = "key", required = false) String key,
@RequestParam(value = "alias", required = false) String alias
) {
return infoService.allTableInfoSearchCache()
return tableMetaService.allTableInfoSearchCache()
.select(cache -> StrUtil.isBlank(key) || StrUtil.contains(cache.getFlinkJobId().toString(), key))
.select(cache -> StrUtil.isBlank(alias) || StrUtil.contains(cache.getAlias(), alias))
.collect(TableInfoSearchCache::getFlinkJobId)
@@ -86,7 +90,7 @@ public class InfoController {
@RequestParam(value = "key", required = false) String key,
@RequestParam(value = "flink_job_id", required = false) String flinkJobId
) {
return infoService.allTableInfoSearchCache()
return tableMetaService.allTableInfoSearchCache()
.select(cache -> StrUtil.isBlank(key) || StrUtil.contains(cache.getAlias(), key))
.select(cache -> StrUtil.isBlank(flinkJobId) || StrUtil.contains(cache.getFlinkJobId().toString(), flinkJobId))
.collect(TableInfoSearchCache::getAlias)
@@ -95,7 +99,7 @@ public class InfoController {
@GetMapping("/all_hdfs")
public ImmutableList<String> allHdfs(@RequestParam(value = "key", required = false) String key) {
return infoService.allTableInfoSearchCache()
return tableMetaService.allTableInfoSearchCache()
.select(cache -> StrUtil.isBlank(key) || StrUtil.contains(cache.getAlias(), key) || StrUtil.contains(cache.getHdfs(), key))
.collect(TableInfoSearchCache::getHdfs)
.distinct();
@@ -103,7 +107,7 @@ public class InfoController {
@GetMapping("/all_pulsar")
public ImmutableList<String> allPulsar(@RequestParam(value = "key", required = false) String key) {
return infoService.allTableInfoSearchCache()
return tableMetaService.allTableInfoSearchCache()
.select(cache -> StrUtil.isBlank(key) || StrUtil.contains(cache.getAlias(), key) || StrUtil.contains(cache.getPulsar(), key))
.collect(TableInfoSearchCache::getPulsar)
.distinct();
@@ -111,7 +115,7 @@ public class InfoController {
@GetMapping("/all_pulsar_topic")
public ImmutableList<String> allPulsarTopic(@RequestParam(value = "key", required = false) String key) {
return infoService.allTableInfoSearchCache()
return tableMetaService.allTableInfoSearchCache()
.select(cache -> StrUtil.isBlank(key) || StrUtil.contains(cache.getAlias(), key) || StrUtil.contains(cache.getTopic(), key))
.collect(TableInfoSearchCache::getTopic)
.distinct();

View File

@@ -174,30 +174,4 @@ public class InfoService extends BaseService {
return flinkJobService.flinkJobs()
.collect(job -> new JobAndMetas(job, tableMetaService.tableMetaList(job.getId())));
}
@Cacheable(value = "all-table-info-search-cache", sync = true, cacheManager = "long-cache")
@Retryable(Throwable.class)
public ImmutableList<TableInfoSearchCache> allTableInfoSearchCache() {
return Lists.immutable.ofAll(mysqlJdbcTemplate.query(
SqlBuilder.select(
TbAppCollectTableInfo.FLINK_JOB_ID_A,
TbAppCollectTableInfo.ALIAS_A,
TbAppCollectTableInfo.TGT_HDFS_PATH_A,
TbAppCollectTableInfo.SRC_PULSAR_ADDR_A,
TbAppCollectTableInfo.SRC_TOPIC_A
)
.from(TbAppCollectTableInfo._alias_, TbAppFlinkJobConfig._alias_)
.whereEq(TbAppCollectTableInfo.FLINK_JOB_ID_A, Column.as(TbAppFlinkJobConfig.ID_A))
.andEq(TbAppFlinkJobConfig.STATUS_A, "y")
.andEq(TbAppCollectTableInfo.STATUS_A, "y")
.build(),
(rs, row) -> new TableInfoSearchCache(
rs.getLong(1),
rs.getString(2),
rs.getString(3),
rs.getString(4),
rs.getString(5)
)
));
}
}

View File

@@ -14,6 +14,7 @@ import com.eshore.odcp.hudi.connector.exception.ConfigException;
import com.eshore.odcp.hudi.connector.exception.TableMetaNotFoundException;
import com.lanyuanxiaoyao.service.configuration.ExecutorProvider;
import com.lanyuanxiaoyao.service.configuration.entity.info.SimpleTableMeta;
import com.lanyuanxiaoyao.service.configuration.entity.info.TableInfoSearchCache;
import com.lanyuanxiaoyao.service.configuration.entity.info.TableMetaAdd;
import java.util.List;
import java.util.Locale;
@@ -676,4 +677,27 @@ public class TableMetaService extends BaseService {
"CITY_ID"
);
}
@Cacheable(value = "all-table-info-search-cache", sync = true, cacheManager = "long-cache")
@Retryable(Throwable.class)
public ImmutableList<TableInfoSearchCache> allTableInfoSearchCache() {
return Lists.immutable.ofAll(mysqlJdbcTemplate.query(
generateTableMetaList(
SqlBuilder.select(
TbAppFlinkJobConfig.ID_A,
TbAppCollectTableInfo.ALIAS_A,
TbAppCollectTableInfo.TGT_HDFS_PATH_A,
TbAppCollectTableInfo.SRC_PULSAR_ADDR_A,
TbAppCollectTableInfo.SRC_TOPIC_A
)
).build(),
(rs, row) -> new TableInfoSearchCache(
rs.getLong(1),
rs.getString(2),
rs.getString(3),
rs.getString(4),
rs.getString(5)
)
)).distinct();
}
}

View File

@@ -67,18 +67,14 @@ public class SqlBuilderTests {
generateTableMetaList(
SqlBuilder.select(
TbAppFlinkJobConfig.ID_A,
TbAppFlinkJobConfig.NAME_A,
TbAppCollectTableInfo.ALIAS_A,
TbAppCollectTableInfo.SRC_SCHEMA_A,
TbAppCollectTableInfo.SRC_TABLE_A,
TbAppCollectTableInfo.TGT_DB_A,
TbAppCollectTableInfo.TGT_TABLE_A,
TbAppCollectTableInfo.TGT_HDFS_PATH_A
TbAppCollectTableInfo.TGT_HDFS_PATH_A,
TbAppCollectTableInfo.SRC_PULSAR_ADDR_A,
TbAppCollectTableInfo.SRC_TOPIC_A
),
flinkJobId,
alias
)
.build()
null,
null
).build()
));
}
}