feature(web): 优化flink job id和alias的自动匹配获取

This commit is contained in:
2023-07-14 14:56:19 +08:00
parent 5c43617315
commit 1a0bc8b7ef
6 changed files with 161 additions and 87 deletions

View File

@@ -1,14 +1,12 @@
package com.lanyuanxiaoyao.service.info.controller;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.eshore.odcp.hudi.connector.entity.FlinkJob;
import com.eshore.odcp.hudi.connector.entity.SyncState;
import com.eshore.odcp.hudi.connector.entity.TableMeta;
import com.lanyuanxiaoyao.service.configuration.entity.PageResponse;
import com.lanyuanxiaoyao.service.configuration.entity.info.CompactionMetrics;
import com.lanyuanxiaoyao.service.configuration.entity.info.JobAndMetas;
import com.lanyuanxiaoyao.service.configuration.entity.info.JobIdAndAlias;
import com.lanyuanxiaoyao.service.configuration.entity.info.VersionUpdated;
import com.lanyuanxiaoyao.service.configuration.entity.info.*;
import com.lanyuanxiaoyao.service.info.service.InfoService;
import java.util.List;
import org.eclipse.collections.api.factory.Lists;
@@ -229,17 +227,34 @@ public class InfoController {
}
@GetMapping("/all_flink_job_id")
public ImmutableList<Long> allFlinkJobId() {
return infoService.allFlinkJobId();
public ImmutableList<Long> allFlinkJobId(
@RequestParam(value = "key", required = false) String key,
@RequestParam(value = "alias", required = false) String alias
) {
return infoService.allTableInfoSearchCache()
.select(cache -> StrUtil.isBlank(key) || StrUtil.contains(cache.getFlinkJobId().toString(), key))
.select(cache -> StrUtil.isBlank(alias) || StrUtil.contains(cache.getAlias(), alias))
.collect(TableInfoSearchCache::getFlinkJobId)
.distinct();
}
@GetMapping("/all_alias")
public ImmutableList<String> allAlias() {
return infoService.allAlias();
public ImmutableList<String> allAlias(
@RequestParam(value = "key", required = false) String key,
@RequestParam(value = "flink_job_id", required = false) String flinkJobId
) {
return infoService.allTableInfoSearchCache()
.select(cache -> StrUtil.isBlank(key) || StrUtil.contains(cache.getAlias(), key))
.select(cache -> StrUtil.isBlank(flinkJobId) || StrUtil.contains(cache.getFlinkJobId().toString(), flinkJobId))
.collect(TableInfoSearchCache::getAlias)
.distinct();
}
@GetMapping("/all_hdfs")
public ImmutableList<String> allHdfs() {
return infoService.allHdfs();
public ImmutableList<String> allHdfs(@RequestParam(value = "key", required = false) String key) {
return infoService.allTableInfoSearchCache()
.select(cache -> StrUtil.isBlank(key) || StrUtil.contains(cache.getAlias(), key))
.collect(TableInfoSearchCache::getHdfs)
.distinct();
}
}

View File

@@ -8,16 +8,16 @@ import club.kingon.sql.builder.entry.Column;
import cn.hutool.core.lang.Pair;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.eshore.odcp.hudi.connector.SQLConstants;
import com.eshore.odcp.hudi.connector.entity.FlinkJob;
import com.eshore.odcp.hudi.connector.entity.SyncState;
import com.eshore.odcp.hudi.connector.entity.TableMeta;
import com.eshore.odcp.hudi.connector.utils.database.DatabaseService;
import com.lanyuanxiaoyao.service.configuration.entity.PageResponse;
import com.lanyuanxiaoyao.service.configuration.entity.info.CompactionMetrics;
import com.lanyuanxiaoyao.service.configuration.entity.info.JobAndMetas;
import com.lanyuanxiaoyao.service.configuration.entity.info.JobIdAndAlias;
import com.lanyuanxiaoyao.service.configuration.entity.info.VersionUpdated;
import com.lanyuanxiaoyao.service.configuration.entity.info.*;
import com.lanyuanxiaoyao.service.info.configuration.SQLLoggerProvider;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.List;
import org.eclipse.collections.api.factory.Lists;
@@ -28,11 +28,15 @@ import org.slf4j.LoggerFactory;
import org.springframework.cache.annotation.CacheConfig;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Service;
import org.springframework.transaction.support.TransactionTemplate;
import static com.eshore.odcp.hudi.connector.Constants.DATABASE_NAME;
import static com.eshore.odcp.hudi.connector.SQLConstants.HudiCollectBuild.*;
import static com.eshore.odcp.hudi.connector.SQLConstants.HudiCollectBuild.TbAppCollectTableInfo;
import static com.eshore.odcp.hudi.connector.SQLConstants.HudiCollectBuild.TbAppFlinkJobConfig;
/**
* @author lanyuanxiaoyao
@@ -622,18 +626,18 @@ public class InfoService {
int offset = limit * Math.max(page - 1, 0);
Alias m1 = Alias.of(
SqlBuilder.selectAll()
.from(TABLE_COMPACTION_METRICS)
.whereEq(TABLE_COMPACTION_METRICS_TYPE, "pre")
.andEq(TABLE_COMPACTION_METRICS_FLINK_JOB_ID, flinkJobId)
.andEq(TABLE_COMPACTION_METRICS_ALIAS, alias),
.from(TbAppHudiCompactionMetrics._alias_)
.whereEq(TbAppHudiCompactionMetrics.TYPE_A, "pre")
.andEq(TbAppHudiCompactionMetrics.FLINK_JOB_ID_A, flinkJobId)
.andEq(TbAppHudiCompactionMetrics.ALIAS_A, alias),
"m1"
);
Alias m2 = Alias.of(
SqlBuilder.selectAll()
.from(TABLE_COMPACTION_METRICS)
.whereEq(TABLE_COMPACTION_METRICS_TYPE, "complete")
.andEq(TABLE_COMPACTION_METRICS_FLINK_JOB_ID, flinkJobId)
.andEq(TABLE_COMPACTION_METRICS_ALIAS, alias),
.from(TbAppHudiCompactionMetrics._alias_)
.whereEq(TbAppHudiCompactionMetrics.TYPE_A, "complete")
.andEq(TbAppHudiCompactionMetrics.FLINK_JOB_ID_A, flinkJobId)
.andEq(TbAppHudiCompactionMetrics.ALIAS_A, alias),
"m2"
);
return builder
@@ -762,54 +766,28 @@ public class InfoService {
public Boolean existsTable(Long flinkJobId, String alias) {
return mysqlJdbcTemplate.queryForObject(
SqlBuilder.select("count(*) > 0")
.from(TABLE_INFO, TABLE_FLINK_JOB)
.whereEq(TABLE_INFO_FLINK_JOB_ID, Column.as(TABLE_FLINK_JOB_ID))
.andEq(TABLE_FLINK_JOB_ID, flinkJobId)
.andEq(TABLE_INFO_ALIAS, alias)
.andEq(TABLE_FLINK_JOB_STATUS, "y")
.andEq(TABLE_INFO_STATUS, "y")
.from(TbAppCollectTableInfo._alias_, TbAppFlinkJobConfig._alias_)
.whereEq(TbAppCollectTableInfo.FLINK_JOB_ID_A, Column.as(TbAppFlinkJobConfig.ID_A))
.andEq(TbAppFlinkJobConfig.ID_A, flinkJobId)
.andEq(TbAppCollectTableInfo.ALIAS_A, alias)
.andEq(TbAppFlinkJobConfig.STATUS_A, "y")
.andEq(TbAppCollectTableInfo.STATUS_A, "y")
.build(),
Boolean.class
);
}
@Cacheable(value = "all-flink-job-id", sync = true, cacheManager = "long-cache")
@Cacheable(value = "all-table-info-search-cache", sync = true, cacheManager = "long-cache")
@Retryable(Throwable.class)
public ImmutableList<Long> allFlinkJobId() {
return Lists.immutable.ofAll(mysqlJdbcTemplate.queryForList(
SqlBuilder.select(StrUtil.format("distinct {}", TABLE_FLINK_JOB_ID))
.from(TABLE_FLINK_JOB)
.whereEq(TABLE_FLINK_JOB_STATUS, "y")
public ImmutableList<TableInfoSearchCache> allTableInfoSearchCache() {
return Lists.immutable.ofAll(mysqlJdbcTemplate.query(
SqlBuilder.select(TbAppCollectTableInfo.FLINK_JOB_ID_A, TbAppCollectTableInfo.ALIAS_A, TbAppCollectTableInfo.TGT_HDFS_PATH_A)
.from(TbAppCollectTableInfo._alias_, TbAppFlinkJobConfig._alias_)
.whereEq(TbAppCollectTableInfo.FLINK_JOB_ID_A, Column.as(TbAppFlinkJobConfig.ID_A))
.andEq(TbAppFlinkJobConfig.STATUS_A, "y")
.andEq(TbAppCollectTableInfo.STATUS_A, "y")
.build(),
Long.class
));
}
@Cacheable(value = "all-alias", sync = true, cacheManager = "long-cache")
@Retryable(Throwable.class)
public ImmutableList<String> allAlias() {
return Lists.immutable.ofAll(mysqlJdbcTemplate.queryForList(
SqlBuilder.select(StrUtil.format("distinct {}", TABLE_INFO_ALIAS))
.from(TABLE_INFO, TABLE_FLINK_JOB)
.whereEq(TABLE_INFO_FLINK_JOB_ID, Column.as(TABLE_FLINK_JOB_ID))
.andEq(TABLE_FLINK_JOB_STATUS, "y")
.andEq(TABLE_INFO_STATUS, "y")
.build(),
String.class
));
}
@Cacheable(value = "all-hdfs", sync = true, cacheManager = "long-cache")
@Retryable(Throwable.class)
public ImmutableList<String> allHdfs() {
return Lists.immutable.ofAll(mysqlJdbcTemplate.queryForList(
SqlBuilder.select(StrUtil.format("distinct {}", TABLE_INFO_TARGET_HDFS))
.from(TABLE_INFO, TABLE_FLINK_JOB)
.whereEq(TABLE_INFO_FLINK_JOB_ID, Column.as(TABLE_FLINK_JOB_ID))
.andEq(TABLE_FLINK_JOB_STATUS, "y")
.andEq(TABLE_INFO_STATUS, "y")
.build(),
String.class
(rs, row) -> new TableInfoSearchCache(rs.getLong(1), rs.getString(2), rs.getString(3))
));
}
}