feature(web): 增加 hdfs 路径补全
This commit is contained in:
@@ -105,4 +105,13 @@ public interface InfoService {
|
|||||||
|
|
||||||
@Get("/non_exists_table")
|
@Get("/non_exists_table")
|
||||||
Boolean nonExistsTable(@Query("flink_job_id") Long flinkJobId, @Query("alias") String alias);
|
Boolean nonExistsTable(@Query("flink_job_id") Long flinkJobId, @Query("alias") String alias);
|
||||||
|
|
||||||
|
@Get("/all_flink_job_id")
|
||||||
|
ImmutableList<Long> allFlinkJobId();
|
||||||
|
|
||||||
|
@Get("/all_alias")
|
||||||
|
ImmutableList<String> allAlias();
|
||||||
|
|
||||||
|
@Get("/all_hdfs")
|
||||||
|
ImmutableList<String> allHdfs();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -225,4 +225,19 @@ public class InfoController {
|
|||||||
public Boolean nonExistsTable(@RequestParam(value = "flink_job_id") Long flinkJobId, @RequestParam(value = "alias") String alias) {
|
public Boolean nonExistsTable(@RequestParam(value = "flink_job_id") Long flinkJobId, @RequestParam(value = "alias") String alias) {
|
||||||
return !infoService.existsTable(flinkJobId, alias);
|
return !infoService.existsTable(flinkJobId, alias);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@GetMapping("/all_flink_job_id")
|
||||||
|
public ImmutableList<Long> allFlinkJobId() {
|
||||||
|
return infoService.allFlinkJobId();
|
||||||
|
}
|
||||||
|
|
||||||
|
@GetMapping("/all_alias")
|
||||||
|
public ImmutableList<String> allAlias() {
|
||||||
|
return infoService.allAlias();
|
||||||
|
}
|
||||||
|
|
||||||
|
@GetMapping("/all_hdfs")
|
||||||
|
public ImmutableList<String> allHdfs() {
|
||||||
|
return infoService.allHdfs();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -242,6 +242,7 @@ public class InfoService {
|
|||||||
private static final String TABLE_INFO_ALIAS = column(TABLE_INFO, "alias");
|
private static final String TABLE_INFO_ALIAS = column(TABLE_INFO, "alias");
|
||||||
private static final String TABLE_INFO_PRIORITY = column(TABLE_INFO, "priority");
|
private static final String TABLE_INFO_PRIORITY = column(TABLE_INFO, "priority");
|
||||||
private static final String TABLE_INFO_STATUS = column(TABLE_INFO, "status");
|
private static final String TABLE_INFO_STATUS = column(TABLE_INFO, "status");
|
||||||
|
private static final String TABLE_INFO_TARGET_HDFS = column(TABLE_INFO, "tgt_hdfs_path");
|
||||||
|
|
||||||
private static final Alias TABLE_SYNC_STATE = Alias.of(StrUtil.format("{}.tb_app_hudi_sync_state", DATABASE_NAME), "tahss");
|
private static final Alias TABLE_SYNC_STATE = Alias.of(StrUtil.format("{}.tb_app_hudi_sync_state", DATABASE_NAME), "tahss");
|
||||||
private static final String TABLE_SYNC_STATE_ID = column(TABLE_SYNC_STATE, "id");
|
private static final String TABLE_SYNC_STATE_ID = column(TABLE_SYNC_STATE, "id");
|
||||||
@@ -745,4 +746,44 @@ public class InfoService {
|
|||||||
Boolean.class
|
Boolean.class
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Cacheable(value = "all-flink-job-id", sync = true, cacheManager = "long-cache")
|
||||||
|
@Retryable(Throwable.class)
|
||||||
|
public ImmutableList<Long> allFlinkJobId() {
|
||||||
|
return Lists.immutable.ofAll(mysqlJdbcTemplate.queryForList(
|
||||||
|
SqlBuilder.select(StrUtil.format("distinct {}", TABLE_FLINK_JOB_ID))
|
||||||
|
.from(TABLE_FLINK_JOB)
|
||||||
|
.whereEq(TABLE_FLINK_JOB_STATUS, "y")
|
||||||
|
.build(),
|
||||||
|
Long.class
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Cacheable(value = "all-alias", sync = true, cacheManager = "long-cache")
|
||||||
|
@Retryable(Throwable.class)
|
||||||
|
public ImmutableList<String> allAlias() {
|
||||||
|
return Lists.immutable.ofAll(mysqlJdbcTemplate.queryForList(
|
||||||
|
SqlBuilder.select(StrUtil.format("distinct {}", TABLE_INFO_ALIAS))
|
||||||
|
.from(TABLE_INFO, TABLE_FLINK_JOB)
|
||||||
|
.whereEq(TABLE_INFO_FLINK_JOB_ID, Column.as(TABLE_FLINK_JOB_ID))
|
||||||
|
.andEq(TABLE_FLINK_JOB_STATUS, "y")
|
||||||
|
.andEq(TABLE_INFO_STATUS, "y")
|
||||||
|
.build(),
|
||||||
|
String.class
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Cacheable(value = "all-hdfs", sync = true, cacheManager = "long-cache")
|
||||||
|
@Retryable(Throwable.class)
|
||||||
|
public ImmutableList<String> allHdfs() {
|
||||||
|
return Lists.immutable.ofAll(mysqlJdbcTemplate.queryForList(
|
||||||
|
SqlBuilder.select(StrUtil.format("distinct {}", TABLE_INFO_TARGET_HDFS))
|
||||||
|
.from(TABLE_INFO, TABLE_FLINK_JOB)
|
||||||
|
.whereEq(TABLE_INFO_FLINK_JOB_ID, Column.as(TABLE_FLINK_JOB_ID))
|
||||||
|
.andEq(TABLE_FLINK_JOB_STATUS, "y")
|
||||||
|
.andEq(TABLE_INFO_STATUS, "y")
|
||||||
|
.build(),
|
||||||
|
String.class
|
||||||
|
));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -43,11 +43,9 @@ public class SqlBuilderTests {
|
|||||||
|
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) {
|
||||||
System.out.println(SqlFormatter.format(
|
System.out.println(SqlFormatter.format(
|
||||||
SqlBuilder.select("count(*) > 0")
|
SqlBuilder.select(StrUtil.format("distinct {}", TABLE_INFO_ALIAS))
|
||||||
.from(TABLE_INFO, TABLE_FLINK_JOB)
|
.from(TABLE_INFO, TABLE_FLINK_JOB)
|
||||||
.whereEq(TABLE_INFO_FLINK_JOB_ID, Column.as(TABLE_FLINK_JOB_ID))
|
.whereEq(TABLE_INFO_FLINK_JOB_ID, Column.as(TABLE_FLINK_JOB_ID))
|
||||||
.andEq(TABLE_FLINK_JOB_ID, 1552408244680593408L)
|
|
||||||
.andEq(TABLE_INFO_ALIAS, "crm_order_ord_prod_inst_sub_his")
|
|
||||||
.andEq(TABLE_FLINK_JOB_STATUS, "y")
|
.andEq(TABLE_FLINK_JOB_STATUS, "y")
|
||||||
.andEq(TABLE_INFO_STATUS, "y")
|
.andEq(TABLE_INFO_STATUS, "y")
|
||||||
.build()
|
.build()
|
||||||
|
|||||||
@@ -8,6 +8,8 @@ import com.eshore.odcp.hudi.connector.entity.SyncState;
|
|||||||
import com.eshore.odcp.hudi.connector.entity.TableMeta;
|
import com.eshore.odcp.hudi.connector.entity.TableMeta;
|
||||||
import com.eshore.odcp.hudi.connector.utils.NameHelper;
|
import com.eshore.odcp.hudi.connector.utils.NameHelper;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.github.benmanes.caffeine.cache.Caffeine;
|
||||||
|
import com.github.benmanes.caffeine.cache.LoadingCache;
|
||||||
import com.lanyuanxiaoyao.service.configuration.ExecutorProvider;
|
import com.lanyuanxiaoyao.service.configuration.ExecutorProvider;
|
||||||
import com.lanyuanxiaoyao.service.configuration.entity.PageResponse;
|
import com.lanyuanxiaoyao.service.configuration.entity.PageResponse;
|
||||||
import com.lanyuanxiaoyao.service.configuration.entity.info.CompactionMetrics;
|
import com.lanyuanxiaoyao.service.configuration.entity.info.CompactionMetrics;
|
||||||
@@ -22,7 +24,9 @@ import com.lanyuanxiaoyao.service.web.entity.CompactionMetricsVO;
|
|||||||
import com.lanyuanxiaoyao.service.web.entity.FlinkJobVO;
|
import com.lanyuanxiaoyao.service.web.entity.FlinkJobVO;
|
||||||
import com.lanyuanxiaoyao.service.web.entity.SyncStateVO;
|
import com.lanyuanxiaoyao.service.web.entity.SyncStateVO;
|
||||||
import com.lanyuanxiaoyao.service.web.entity.TableVO;
|
import com.lanyuanxiaoyao.service.web.entity.TableVO;
|
||||||
|
import java.time.Duration;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Objects;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import org.eclipse.collections.api.factory.Lists;
|
import org.eclipse.collections.api.factory.Lists;
|
||||||
@@ -55,6 +59,20 @@ public class TableController extends BaseController {
|
|||||||
this.infoService = infoService;
|
this.infoService = infoService;
|
||||||
this.zookeeperService = zookeeperService;
|
this.zookeeperService = zookeeperService;
|
||||||
this.mapper = jackson2ObjectMapperBuilder.build();
|
this.mapper = jackson2ObjectMapperBuilder.build();
|
||||||
|
this.cache = Caffeine.newBuilder()
|
||||||
|
.expireAfterAccess(Duration.ofMinutes(5))
|
||||||
|
.build(key -> {
|
||||||
|
switch (key) {
|
||||||
|
case "flink_job_id":
|
||||||
|
return this.infoService.allFlinkJobId().collect(Objects::toString);
|
||||||
|
case "alias":
|
||||||
|
return this.infoService.allAlias();
|
||||||
|
case "hdfs":
|
||||||
|
return this.infoService.allHdfs();
|
||||||
|
default:
|
||||||
|
return Lists.immutable.empty();
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@GetMapping("list")
|
@GetMapping("list")
|
||||||
@@ -185,4 +203,33 @@ public class TableController extends BaseController {
|
|||||||
ImmutableList<CompactionMetricsVO> vos = Lists.immutable.ofAll(pageResponse.getData()).collect(CompactionMetricsVO::new);
|
ImmutableList<CompactionMetricsVO> vos = Lists.immutable.ofAll(pageResponse.getData()).collect(CompactionMetricsVO::new);
|
||||||
return AmisResponse.responseCrudData(vos, pageResponse.getTotal());
|
return AmisResponse.responseCrudData(vos, pageResponse.getTotal());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private final LoadingCache<String, ImmutableList<String>> cache;
|
||||||
|
|
||||||
|
@SuppressWarnings("DataFlowIssue")
|
||||||
|
@GetMapping("all_flink_job_id")
|
||||||
|
public AmisCrudResponse allFlinkJobId(@RequestParam(value = "key", required = false) String key) {
|
||||||
|
if (StrUtil.isBlank(key)) {
|
||||||
|
return AmisResponse.responseCrudData(Lists.immutable.empty());
|
||||||
|
}
|
||||||
|
return AmisResponse.responseCrudData(cache.get("flink_job_id").select(id -> StrUtil.contains(id, key)));
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("DataFlowIssue")
|
||||||
|
@GetMapping("all_alias")
|
||||||
|
public AmisCrudResponse allAlias(@RequestParam(value = "key", required = false) String key) {
|
||||||
|
if (StrUtil.isBlank(key)) {
|
||||||
|
return AmisResponse.responseCrudData(Lists.immutable.empty());
|
||||||
|
}
|
||||||
|
return AmisResponse.responseCrudData(cache.get("alias").select(id -> StrUtil.contains(id, key)));
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("DataFlowIssue")
|
||||||
|
@GetMapping("all_hdfs")
|
||||||
|
public AmisCrudResponse allHdfs(@RequestParam(value = "key", required = false) String key) {
|
||||||
|
if (StrUtil.isBlank(key)) {
|
||||||
|
return AmisResponse.responseCrudData(Lists.immutable.empty());
|
||||||
|
}
|
||||||
|
return AmisResponse.responseCrudData(cache.get("hdfs").select(id -> StrUtil.contains(id, key)));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -8,7 +8,7 @@ function toolTab() {
|
|||||||
actions: [
|
actions: [
|
||||||
{
|
{
|
||||||
type: 'submit',
|
type: 'submit',
|
||||||
label: '查询',
|
label: '查询时间线',
|
||||||
actionType: 'dialog',
|
actionType: 'dialog',
|
||||||
dialog: {
|
dialog: {
|
||||||
title: 'Hudi 表时间线',
|
title: 'Hudi 表时间线',
|
||||||
@@ -42,26 +42,10 @@ function toolTab() {
|
|||||||
columns: timelineColumns(),
|
columns: timelineColumns(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
],
|
|
||||||
body: [
|
|
||||||
{
|
|
||||||
type: 'input-text',
|
|
||||||
name: 'hdfs',
|
|
||||||
label: 'HDFS路经',
|
|
||||||
required: true,
|
|
||||||
clearable: true,
|
|
||||||
description: '输入表HDFS路径',
|
|
||||||
}
|
|
||||||
]
|
|
||||||
},
|
},
|
||||||
{
|
|
||||||
type: 'form',
|
|
||||||
title: '查询表结构',
|
|
||||||
actions: [
|
|
||||||
{
|
{
|
||||||
type: 'submit',
|
type: 'submit',
|
||||||
label: '查询',
|
label: '查询表结构',
|
||||||
actionType: 'dialog',
|
actionType: 'dialog',
|
||||||
dialog: {
|
dialog: {
|
||||||
title: 'Hudi 表结构',
|
title: 'Hudi 表结构',
|
||||||
@@ -96,6 +80,7 @@ function toolTab() {
|
|||||||
required: true,
|
required: true,
|
||||||
clearable: true,
|
clearable: true,
|
||||||
description: '输入表HDFS路径',
|
description: '输入表HDFS路径',
|
||||||
|
autoComplete: '${base}/table/all_hdfs?key=$term',
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
|
|||||||
Reference in New Issue
Block a user