refactor(info-query): 优化跨天信息和基本信息查询

This commit is contained in:
2023-06-13 10:45:33 +08:00
parent c1a9e5b24b
commit 04a23431f7
6 changed files with 229 additions and 137 deletions

View File

@@ -22,30 +22,42 @@ import java.util.Map;
*/
@BaseRequest(baseURL = "http://service-info-query/info")
public interface InfoService {
@Get("/table_total")
Long tableTotal();
@Get("/table_count")
Long tableCount();
@Get("/hudi_total")
Long hudiTotal();
@Get("/table_focus_count")
Long tableFocusCount();
@Get("/focus_count")
Long focusCount();
@Get("/hudi_count")
Long hudiCount();
@Get("/normal_count")
Long normalCount();
@Get("/hudi_focus_count")
Long hudiFocusCount();
@Get("/un_scheduled_normal_table_count")
Long unScheduledNormalTableCount(@Query("version") String version);
@Get("/un_scheduled_focus_table_count")
Long unScheduledFocusTableCount(@Query("version") String version);
@Get("/un_receive_version_normal_table")
ImmutableList<JobIdAndAlias> unReceiveVersionNormalTable(@Query("version") String version);
@Get("/un_receive_version_normal_table_count")
Long unReceiveVersionNormalTableCount(@Query("version") String version);
@Get("/un_receive_version_focus_table")
ImmutableList<JobIdAndAlias> unReceiveVersionFocusTable(@Query("version") String version);
@Get("/un_receive_version_focus_table_count")
Long unReceiveVersionFocusTableCount(@Query("version") String version);
@Get("/un_scheduled_normal_table")
ImmutableList<JobIdAndAlias> unScheduledNormalTable(@Query("version") String version);
@Get("/un_scheduled_normal_table_count")
Long unScheduledNormalTableCount(@Query("version") String version);
@Get("/un_scheduled_focus_table")
ImmutableList<JobIdAndAlias> unScheduledFocusTable(@Query("version") String version);
@Get("/un_scheduled_focus_table_count")
Long unScheduledFocusTableCount(@Query("version") String version);
@Get("/job_id_alias")
PageResponse<JobIdAndAlias> jobIdAndAlias(@Query Map<String, Object> queryMap);

View File

@@ -37,26 +37,6 @@ public class InfoController {
this.infoService = infoService;
}
@GetMapping("table_total")
public Long tableTotal() {
return infoService.tableTotal();
}
@GetMapping("hudi_total")
public Long hudiTotal() {
return infoService.hudiTotal();
}
@GetMapping("focus_count")
public Long focusCount() {
return infoService.focusCount();
}
@GetMapping("normal_count")
public Long normalCount() {
return infoService.normalCount();
}
@GetMapping("/job_id_alias")
public PageResponse<JobIdAndAlias> jobIdAndAlias(
@RequestParam(value = "page", defaultValue = "1") Integer page,
@@ -145,14 +125,9 @@ public class InfoController {
return infoService.nonUpdatedVersionTables();
}
@GetMapping("/un_scheduled_normal_table_count")
public Long unScheduledNormalTableCount(@RequestParam("version") String version) {
return infoService.unScheduledNormalTableCount(version);
}
@GetMapping("/un_scheduled_focus_table_count")
public Long unScheduledFocusTableCount(@RequestParam("version") String version) {
return infoService.unScheduledFocusTableCount(version);
@GetMapping("/un_receive_version_normal_table")
public ImmutableList<JobIdAndAlias> unReceiveVersionNormalTable(@RequestParam("version") String version) {
return infoService.unReceiveVersionNormalTable(version);
}
@GetMapping("/un_receive_version_normal_table_count")
@@ -160,8 +135,53 @@ public class InfoController {
return infoService.unReceiveVersionNormalTableCount(version);
}
@GetMapping("/un_receive_version_focus_table")
public ImmutableList<JobIdAndAlias> unReceiveVersionFocusTable(@RequestParam("version") String version) {
return infoService.unReceiveVersionFocusTable(version);
}
@GetMapping("/un_receive_version_focus_table_count")
public Long unReceiveVersionFocusTableCount(@RequestParam("version") String version) {
return infoService.unReceiveVersionFocusTableCount(version);
}
@GetMapping("/un_scheduled_normal_table")
public ImmutableList<JobIdAndAlias> unScheduledNormalTable(@RequestParam("version") String version) {
return infoService.unScheduledNormalTable(version);
}
@GetMapping("/un_scheduled_normal_table_count")
public Long unScheduledNormalTableCount(@RequestParam("version") String version) {
return infoService.unScheduledNormalTableCount(version);
}
@GetMapping("/un_scheduled_focus_table")
public ImmutableList<JobIdAndAlias> unScheduledFocusTable(@RequestParam("version") String version) {
return infoService.unScheduledFocusTable(version);
}
@GetMapping("/un_scheduled_focus_table_count")
public Long unScheduledFocusTableCount(@RequestParam("version") String version) {
return infoService.unScheduledFocusTableCount(version);
}
@GetMapping("/table_count")
public Long tableCount() {
return infoService.tableCount();
}
@GetMapping("/table_focus_count")
public Long tableFocusCount() {
return infoService.tableFocusCount();
}
@GetMapping("/hudi_count")
public Long hudiCount() {
return infoService.hudiCount();
}
@GetMapping("/hudi_focus_count")
public Long hudiFocusCount() {
return infoService.hudiFocusCount();
}
}

View File

@@ -343,119 +343,189 @@ public class InfoService {
});
}
private SqlBuilder generateUnReceiveVersionNormalTableSql(SelectSqlBuilder builder, String version) {
return builder
.from(TABLE_INFO)
.whereLt(TABLE_INFO_PRIORITY, 10000)
.andEq(TABLE_INFO_STATUS, "y")
.andNotIn(
StrUtil.format("concat({}, {})", TABLE_INFO_FLINK_JOB_ID, TABLE_INFO_ALIAS),
SqlBuilder.select(StrUtil.format("concat({}, {})", TABLE_VERSION_FLINK_JOB_ID, TABLE_VERSION_ALIAS))
.from(TABLE_VERSION)
.whereEq(TABLE_VERSION_VERSION, version)
);
}
@Cacheable(value = "un-receive-version-normal-table", sync = true)
@Retryable(Throwable.class)
public ImmutableList<JobIdAndAlias> unReceiveVersionNormalTable(String version) {
return Lists.immutable.ofAll(
mysqlJdbcTemplate.query(
generateUnReceiveVersionNormalTableSql(SqlBuilder.select(TABLE_INFO_FLINK_JOB_ID, TABLE_INFO_ALIAS), version)
.build(),
(rs, row) -> new JobIdAndAlias(rs.getLong(1), rs.getString(2))
)
);
}
@Cacheable(value = "un-receive-version-normal-table-count", sync = true)
@Retryable(Throwable.class)
public Long unReceiveVersionNormalTableCount(String version) {
return mysqlJdbcTemplate.queryForObject(
SqlBuilder.select(COUNT)
.from(TABLE_INFO)
.whereLt(TABLE_INFO_PRIORITY, 10000)
.andEq(TABLE_INFO_STATUS, "y")
.andNotIn(
StrUtil.format("concat({}, {})", TABLE_INFO_FLINK_JOB_ID, TABLE_INFO_ALIAS),
SqlBuilder.select(StrUtil.format("concat({}, {})", TABLE_VERSION_FLINK_JOB_ID, TABLE_VERSION_ALIAS))
.from(TABLE_VERSION)
.whereEq(TABLE_VERSION_VERSION, version)
)
generateUnReceiveVersionNormalTableSql(SqlBuilder.select(COUNT), version)
.build(),
Long.class
);
}
private SqlBuilder generateUnReceiveVersionFocusTable(SelectSqlBuilder builder, String version) {
return builder
.from(TABLE_INFO)
.whereGe(TABLE_INFO_PRIORITY, 10000)
.andEq(TABLE_INFO_STATUS, "y")
.andNotIn(
StrUtil.format("concat({}, {})", TABLE_INFO_FLINK_JOB_ID, TABLE_INFO_ALIAS),
SqlBuilder.select(StrUtil.format("concat({}, {})", TABLE_VERSION_FLINK_JOB_ID, TABLE_VERSION_ALIAS))
.from(TABLE_VERSION)
.whereEq(TABLE_VERSION_VERSION, version)
);
}
@Cacheable(value = "un-receive-version-focus-table", sync = true)
@Retryable(Throwable.class)
public ImmutableList<JobIdAndAlias> unReceiveVersionFocusTable(String version) {
return Lists.immutable.ofAll(
mysqlJdbcTemplate.query(
generateUnReceiveVersionFocusTable(SqlBuilder.select(TABLE_INFO_FLINK_JOB_ID, TABLE_INFO_ALIAS), version)
.build(),
(rs, row) -> new JobIdAndAlias(rs.getLong(1), rs.getString(2))
)
);
}
@Cacheable(value = "un-receive-version-focus-table-count", sync = true)
@Retryable(Throwable.class)
public Long unReceiveVersionFocusTableCount(String version) {
return mysqlJdbcTemplate.queryForObject(
SqlBuilder.select(COUNT)
.from(TABLE_INFO)
.whereGe(TABLE_INFO_PRIORITY, 10000)
.andEq(TABLE_INFO_STATUS, "y")
.andNotIn(
StrUtil.format("concat({}, {})", TABLE_INFO_FLINK_JOB_ID, TABLE_INFO_ALIAS),
SqlBuilder.select(StrUtil.format("concat({}, {})", TABLE_VERSION_FLINK_JOB_ID, TABLE_VERSION_ALIAS))
.from(TABLE_VERSION)
.whereEq(TABLE_VERSION_VERSION, version)
)
generateUnReceiveVersionFocusTable(SqlBuilder.select(COUNT), version)
.build(),
Long.class
);
}
private SqlBuilder generateUnScheduledNormalTableSql(SelectSqlBuilder builder, String version) {
return builder
.from(TABLE_INFO)
.join(TABLE_VERSION)
.onEq(TABLE_INFO_FLINK_JOB_ID, Column.as(TABLE_VERSION_FLINK_JOB_ID))
.andEq(TABLE_INFO_ALIAS, Column.as(TABLE_VERSION_ALIAS))
.whereLt(TABLE_INFO_PRIORITY, 10000)
.andEq(TABLE_VERSION_SCHEDULED, false)
.andEq(TABLE_VERSION_VERSION, version)
.andEq(TABLE_INFO_STATUS, "y");
}
@Cacheable(value = "un-scheduled-normal-table", sync = true)
@Retryable(Throwable.class)
public ImmutableList<JobIdAndAlias> unScheduledNormalTable(String version) {
return Lists.immutable.ofAll(
mysqlJdbcTemplate.query(
generateUnScheduledNormalTableSql(SqlBuilder.select(TABLE_INFO_FLINK_JOB_ID, TABLE_INFO_ALIAS), version)
.build(),
(rs, row) -> new JobIdAndAlias(rs.getLong(1), rs.getString(2))
)
);
}
@Cacheable(value = "un_scheduled_normal_table_count", sync = true)
@Retryable(Throwable.class)
public Long unScheduledNormalTableCount(String version) {
return mysqlJdbcTemplate.queryForObject(
SqlBuilder.select(COUNT)
.from(TABLE_INFO)
.join(TABLE_VERSION)
.onEq(TABLE_INFO_FLINK_JOB_ID, Column.as(TABLE_VERSION_FLINK_JOB_ID))
.andEq(TABLE_INFO_ALIAS, Column.as(TABLE_VERSION_ALIAS))
.whereLt(TABLE_INFO_PRIORITY, 10000)
.andEq(TABLE_VERSION_SCHEDULED, false)
.andEq(TABLE_VERSION_VERSION, version)
.andEq(TABLE_INFO_STATUS, "y")
generateUnScheduledNormalTableSql(SqlBuilder.select(COUNT), version)
.build(),
Long.class
);
}
private SqlBuilder generateUnScheduledFocusTableSql(SelectSqlBuilder builder, String version) {
return builder
.from(TABLE_INFO)
.join(TABLE_VERSION)
.onEq(TABLE_INFO_FLINK_JOB_ID, Column.as(TABLE_VERSION_FLINK_JOB_ID))
.andEq(TABLE_INFO_ALIAS, Column.as(TABLE_VERSION_ALIAS))
.whereGe(TABLE_INFO_PRIORITY, 10000)
.andEq(TABLE_VERSION_SCHEDULED, false)
.andEq(TABLE_VERSION_VERSION, version)
.andEq(TABLE_INFO_STATUS, "y");
}
@Cacheable(value = "un-scheduled-focus-table", sync = true)
@Retryable(Throwable.class)
public ImmutableList<JobIdAndAlias> unScheduledFocusTable(String version) {
return Lists.immutable.ofAll(
mysqlJdbcTemplate.query(
generateUnScheduledFocusTableSql(SqlBuilder.select(TABLE_INFO_FLINK_JOB_ID, TABLE_INFO_ALIAS), version)
.build(),
(rs, row) -> new JobIdAndAlias(rs.getLong(1), rs.getString(2))
)
);
}
@Cacheable(value = "un_scheduled_focus_table_count", sync = true)
@Retryable(Throwable.class)
public Long unScheduledFocusTableCount(String version) {
return mysqlJdbcTemplate.queryForObject(
SqlBuilder.select(COUNT)
.from(TABLE_INFO)
.join(TABLE_VERSION)
.onEq(TABLE_INFO_FLINK_JOB_ID, Column.as(TABLE_VERSION_FLINK_JOB_ID))
.andEq(TABLE_INFO_ALIAS, Column.as(TABLE_VERSION_ALIAS))
.whereGe(TABLE_INFO_PRIORITY, 10000)
.andEq(TABLE_VERSION_SCHEDULED, false)
.andEq(TABLE_VERSION_VERSION, version)
.andEq(TABLE_INFO_STATUS, "y")
generateUnScheduledFocusTableSql(SqlBuilder.select(COUNT), version)
.build(),
Long.class
);
}
@Cacheable(value = "table-total", sync = true)
@Cacheable(value = "table-count", sync = true)
@Retryable(Throwable.class)
public Long tableTotal() {
return mysqlJdbcTemplate.queryForObject(
SqlBuilder.select("count(distinct concat(src_schema, src_table))").from(TABLE_INFO).whereEq(TABLE_INFO_STATUS, "y").build(),
Long.class
);
}
@Cacheable(value = "hudi-total", sync = true)
@Retryable(Throwable.class)
public Long hudiTotal() {
return mysqlJdbcTemplate.queryForObject(
SqlBuilder.select("count(distinct tgt_hdfs_path) as count").from(TABLE_INFO).whereEq(TABLE_INFO_STATUS, "y").build(),
Long.class
);
}
@Cacheable(value = "focus-count", sync = true)
@Retryable(Throwable.class)
public Long focusCount() {
public Long tableCount() {
return mysqlJdbcTemplate.queryForObject(
SqlBuilder.select("count(distinct concat(src_schema, src_table))")
.from(TABLE_INFO)
.whereGe(TABLE_INFO_PRIORITY, 10000)
.andEq(TABLE_INFO_STATUS, "y")
.whereEq(TABLE_INFO_STATUS, "y")
.build(),
Long.class
);
}
@Cacheable(value = "normal-count", sync = true)
@Cacheable(value = "table-focus-count", sync = true)
@Retryable(Throwable.class)
public Long normalCount() {
public Long tableFocusCount() {
return mysqlJdbcTemplate.queryForObject(
SqlBuilder.select("count(distinct concat(src_schema, src_table))")
.from(TABLE_INFO)
.whereLt(TABLE_INFO_PRIORITY, 10000)
.andEq(TABLE_INFO_STATUS, "y")
.whereEq(TABLE_INFO_STATUS, "y")
.andGe(TABLE_INFO_PRIORITY, 10000)
.build(),
Long.class
);
}
@Cacheable(value = "hudi-count", sync = true)
@Retryable(Throwable.class)
public Long hudiCount() {
return mysqlJdbcTemplate.queryForObject(
SqlBuilder.select("count(distinct tgt_hdfs_path) as count")
.from(TABLE_INFO)
.whereEq(TABLE_INFO_STATUS, "y")
.build(),
Long.class
);
}
@Cacheable(value = "hudi-focus-count", sync = true)
@Retryable(Throwable.class)
public Long hudiFocusCount() {
return mysqlJdbcTemplate.queryForObject(
SqlBuilder.select("count(distinct tgt_hdfs_path) as count")
.from(TABLE_INFO)
.whereEq(TABLE_INFO_STATUS, "y")
.andGe(TABLE_INFO_PRIORITY, 10000)
.build(),
Long.class
);

View File

@@ -22,6 +22,7 @@ public class SqlBuilderTests {
private static final String TABLE_INFO_FLINK_JOB_ID = column(TABLE_INFO, "flink_job_id");
private static final String TABLE_INFO_ALIAS = column(TABLE_INFO, "alias");
private static final String TABLE_INFO_PRIORITY = column(TABLE_INFO, "priority");
private static final String TABLE_INFO_STATUS = column(TABLE_INFO, "status");
private static final Alias TABLE_SYNC_STATE = Alias.of(StrUtil.format("{}.tb_app_hudi_sync_state", DATABASE_NAME), "tahss");
private static final String TABLE_SYNC_STATE_ID = column(TABLE_SYNC_STATE, "id");
@@ -32,15 +33,10 @@ public class SqlBuilderTests {
public static void main(String[] args) {
System.out.println(SqlFormatter.format(
SqlBuilder.select(COUNT)
SqlBuilder.select("count(distinct concat(src_schema, src_table))")
.from(TABLE_INFO)
.whereGe(TABLE_INFO_PRIORITY, 10000)
.andNotIn(
StrUtil.format("concat({}, {})", TABLE_INFO_FLINK_JOB_ID, TABLE_INFO_ALIAS),
SqlBuilder.select(StrUtil.format("concat({}, {})", TABLE_VERSION_FLINK_JOB_ID, TABLE_VERSION_ALIAS))
.from(TABLE_VERSION)
.whereEq(TABLE_VERSION_VERSION, "20230611")
)
.andEq(TABLE_INFO_STATUS, "y")
.build()
));
}

View File

@@ -50,21 +50,21 @@ public class OverviewController extends BaseController {
@GetMapping("")
public AmisResponse overview() throws ExecutionException, InterruptedException {
CompletableFuture<Long> tableTotalFuture = CompletableFuture.supplyAsync(infoService::tableTotal, ExecutorProvider.EXECUTORS);
CompletableFuture<Long> hudiTotalFuture = CompletableFuture.supplyAsync(infoService::hudiTotal, ExecutorProvider.EXECUTORS);
CompletableFuture<Long> focusCountFuture = CompletableFuture.supplyAsync(infoService::focusCount, ExecutorProvider.EXECUTORS);
CompletableFuture<Long> normalCountFuture = CompletableFuture.supplyAsync(infoService::normalCount, ExecutorProvider.EXECUTORS);
CompletableFuture<Long> tableCountFuture = CompletableFuture.supplyAsync(infoService::tableCount, ExecutorProvider.EXECUTORS);
CompletableFuture<Long> tableFocusCountFuture = CompletableFuture.supplyAsync(infoService::tableFocusCount, ExecutorProvider.EXECUTORS);
CompletableFuture<Long> hudiCountFuture = CompletableFuture.supplyAsync(infoService::hudiCount, ExecutorProvider.EXECUTORS);
CompletableFuture<Long> hudiFocusCountFuture = CompletableFuture.supplyAsync(infoService::hudiFocusCount, ExecutorProvider.EXECUTORS);
CompletableFuture.allOf(
tableTotalFuture,
hudiTotalFuture,
focusCountFuture,
normalCountFuture
tableCountFuture,
tableFocusCountFuture,
hudiCountFuture,
hudiFocusCountFuture
);
return responseData()
.withData("table_total", tableTotalFuture.get())
.withData("hudi_total", hudiTotalFuture.get())
.withData("focus_count", focusCountFuture.get())
.withData("normal_count", normalCountFuture.get());
.withData("table_count", tableCountFuture.get())
.withData("table_focus_count", tableFocusCountFuture.get())
.withData("hudi_count", hudiCountFuture.get())
.withData("hudi_focus_count", hudiFocusCountFuture.get());
}
@GetMapping("yarn-job")

View File

@@ -35,6 +35,7 @@ function overviewYarnJob(cluster, search, queue, yarnQueue) {
}
]
},
' ',
{
type: 'service',
className: 'inline',
@@ -120,6 +121,7 @@ function overviewTab() {
]
},
{type: 'divider'},
'<span class="italic text-gray-500 my-2">表数量 (重点表数量, 普通表数量)</span>',
{
type: 'service',
api: '${base}/overview',
@@ -128,29 +130,21 @@ function overviewTab() {
body: [
{
type: 'tpl',
tpl: '逻辑表:<span class="font-bold mr-1">${table_total}</span>',
tpl: '逻辑表:<span class="font-bold mr-1">${PADSTART(table_count, 4)} (<span class="text-primary">${PADSTART(table_focus_count, 4)}</span>, ${PADSTART(table_count - table_focus_count, 4)})</span>',
},
'<br>',
{
type: 'tpl',
tpl: '湖底表:<span class="font-bold mr-1">${hudi_total}</span>',
},
'<br>',
{
type: 'tpl',
tpl: '重点表:<span class="font-bold mr-1">${focus_count}</span>',
},
'<br>',
{
type: 'tpl',
tpl: '普通表:<span class="font-bold">${normal_count}</span>',
tpl: '湖底表:<span class="font-bold mr-1">${PADSTART(hudi_count, 4)} (<span class="text-primary">${PADSTART(hudi_focus_count, 4)}</span>, ${PADSTART(hudi_count - hudi_focus_count, 4)})</span>',
},
]
},
{type: 'divider'},
'<span class="italic text-gray-500 my-2">集群 (集群总资源使用,队列资源使用)(总压缩任务数,调度中任务数,运行中任务数,失败任务数)</span>',
overviewYarnJob('b5-sync', 'Sync', undefined, 'default'),
{type: 'divider'},
{
className: 'my-2',
type: 'service',
api: `\${base}/overview/queue?queue=compaction-queue-pre`,
interval: 10000,
@@ -158,7 +152,7 @@ function overviewTab() {
body: [
{
type: 'tpl',
tpl: 'Pre<span class="font-bold">${size}</span>',
tpl: '预调度队列<span class="font-bold">${size}</span>',
}
]
},